vector/sinks/greptimedb/metrics/
service.rs1use std::{sync::Arc, task::Poll};
2
3use greptimedb_ingester::{
4 Client, ClientBuilder, Compression, Database, Error as GreptimeError,
5 api::v1::{auth_header::AuthScheme, *},
6 channel_manager::*,
7};
8use vector_lib::sensitive_string::SensitiveString;
9
10use crate::sinks::{
11 greptimedb::metrics::{
12 config::GreptimeDBMetricsConfig,
13 request::{GreptimeDBGrpcBatchOutput, GreptimeDBGrpcRequest},
14 },
15 prelude::*,
16};
17
18#[derive(Debug, Clone)]
19pub struct GreptimeDBGrpcService {
20 client: Arc<Database>,
22}
23
24fn new_client_from_config(config: &GreptimeDBGrpcServiceConfig) -> crate::Result<Client> {
25 let mut builder = ClientBuilder::default().peers(vec![&config.endpoint]);
26
27 if let Some(compression) = config.compression.as_ref() {
28 let compression = match compression.as_str() {
29 "gzip" => Compression::Gzip,
30 "zstd" => Compression::Zstd,
31 _ => {
32 warn!(message = "Unknown gRPC compression type: {compression}, disabled.");
33 Compression::None
34 }
35 };
36 builder = builder.compression(compression);
37 }
38
39 if let Some(tls_config) = &config.tls {
40 let channel_config = ChannelConfig {
41 client_tls: Some(try_from_tls_config(tls_config)?),
42 ..Default::default()
43 };
44
45 builder = builder
46 .channel_manager(ChannelManager::with_tls_config(channel_config).map_err(Box::new)?);
47 }
48
49 Ok(builder.build())
50}
51
52fn try_from_tls_config(tls_config: &TlsConfig) -> crate::Result<ClientTlsOption> {
53 if tls_config.key_pass.is_some()
54 || tls_config.alpn_protocols.is_some()
55 || tls_config.verify_certificate.is_some()
56 || tls_config.verify_hostname.is_some()
57 {
58 warn!(
59 message = "TlsConfig: key_pass, alpn_protocols, verify_certificate and verify_hostname are not supported by greptimedb client at the moment."
60 );
61 }
62
63 Ok(ClientTlsOption {
64 server_ca_cert_path: tls_config.ca_file.clone(),
65 client_cert_path: tls_config.crt_file.clone(),
66 client_key_path: tls_config.key_file.clone(),
67 })
68}
69
70impl GreptimeDBGrpcService {
71 pub fn try_new(config: impl Into<GreptimeDBGrpcServiceConfig>) -> crate::Result<Self> {
72 let config = config.into();
73
74 let grpc_client = new_client_from_config(&config)?;
75
76 let mut client = Database::new_with_dbname(&config.dbname, grpc_client);
77
78 if let (Some(username), Some(password)) = (&config.username, &config.password) {
79 client.set_auth(AuthScheme::Basic(Basic {
80 username: username.to_owned(),
81 password: password.clone().into(),
82 }))
83 };
84
85 Ok(GreptimeDBGrpcService {
86 client: Arc::new(client),
87 })
88 }
89}
90
91impl Service<GreptimeDBGrpcRequest> for GreptimeDBGrpcService {
92 type Response = GreptimeDBGrpcBatchOutput;
93 type Error = GreptimeError;
94 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
95
96 fn poll_ready(&mut self, _cx: &mut std::task::Context) -> Poll<Result<(), Self::Error>> {
97 Poll::Ready(Ok(()))
98 }
99
100 fn call(&mut self, req: GreptimeDBGrpcRequest) -> Self::Future {
102 let client = Arc::clone(&self.client);
103
104 Box::pin(async move {
105 let metadata = req.metadata;
106 let result = client.row_insert(req.items).await?;
107
108 Ok(GreptimeDBGrpcBatchOutput {
109 _item_count: result,
110 metadata,
111 })
112 })
113 }
114}
115
116pub(super) struct GreptimeDBGrpcServiceConfig {
118 endpoint: String,
119 dbname: String,
120 username: Option<String>,
121 password: Option<SensitiveString>,
122 compression: Option<String>,
123 tls: Option<TlsConfig>,
124}
125
126impl From<&GreptimeDBMetricsConfig> for GreptimeDBGrpcServiceConfig {
127 fn from(val: &GreptimeDBMetricsConfig) -> Self {
128 GreptimeDBGrpcServiceConfig {
129 endpoint: val.endpoint.clone(),
130 dbname: val.dbname.clone(),
131 username: val.username.clone(),
132 password: val.password.clone(),
133 compression: val.grpc_compression.clone(),
134 tls: val.tls.clone(),
135 }
136 }
137}
138
139pub(super) fn healthcheck(
140 config: impl Into<GreptimeDBGrpcServiceConfig>,
141) -> crate::Result<Healthcheck> {
142 let config = config.into();
143 let client = new_client_from_config(&config)?;
144
145 Ok(async move { client.health_check().await.map_err(|error| error.into()) }.boxed())
146}