vector/sinks/greptimedb/metrics/
service.rs

1use std::{sync::Arc, task::Poll};
2
3use greptimedb_ingester::{
4    Error as GreptimeError, GrpcCompression as IngesterGrpcCompression,
5    api::v1::Basic,
6    api::v1::auth_header::AuthScheme,
7    channel_manager::{ChannelConfig, ChannelManager, ClientTlsOption},
8    client::Client,
9    database::Database,
10};
11use vector_lib::sensitive_string::SensitiveString;
12
13use crate::sinks::{
14    greptimedb::{
15        GrpcCompression,
16        metrics::{
17            config::GreptimeDBMetricsConfig,
18            request::{GreptimeDBGrpcBatchOutput, GreptimeDBGrpcRequest},
19        },
20    },
21    prelude::*,
22};
23
24#[derive(Debug, Clone)]
25pub struct GreptimeDBGrpcService {
26    /// the client that connects to greptimedb
27    client: Arc<Database>,
28}
29
30fn new_client_from_config(config: &GreptimeDBGrpcServiceConfig) -> crate::Result<Client> {
31    let send_compression_encoding = match config.compression {
32        GrpcCompression::None => None,
33        GrpcCompression::Gzip => Some(IngesterGrpcCompression::Gzip),
34        GrpcCompression::Zstd => Some(IngesterGrpcCompression::Zstd),
35    };
36
37    let mut channel_config = ChannelConfig {
38        send_compression_encoding,
39        ..Default::default()
40    };
41
42    let channel_manager = if let Some(tls_config) = &config.tls {
43        let TlsConfig {
44            verify_certificate,
45            verify_hostname,
46            alpn_protocols,
47            ca_file,
48            crt_file,
49            key_file,
50            key_pass,
51            server_name,
52        } = tls_config;
53
54        if verify_certificate.is_some()
55            || verify_hostname.is_some()
56            || alpn_protocols.is_some()
57            || key_pass.is_some()
58            || server_name.is_some()
59        {
60            warn!(
61                message = "TlsConfig: verify_certificate, verify_hostname, alpn_protocols, key_pass and server_name are not supported by greptimedb client at the moment."
62            );
63        }
64
65        // The greptimedb ingester requires all three TLS paths (ca_file, crt_file,
66        // key_file) to be set. Refuse a partial config rather than downgrading to plaintext.
67        match (ca_file, crt_file, key_file) {
68            (Some(ca), Some(crt), Some(key)) => {
69                channel_config.client_tls = Some(ClientTlsOption {
70                    server_ca_cert_path: ca.to_string_lossy().into_owned(),
71                    client_cert_path: crt.to_string_lossy().into_owned(),
72                    client_key_path: key.to_string_lossy().into_owned(),
73                });
74                ChannelManager::with_tls_config(channel_config).map_err(Box::new)?
75            }
76            _ => {
77                return Err(
78                    "GreptimeDB TLS requires ca_file, crt_file, and key_file to all be set.".into(),
79                );
80            }
81        }
82    } else {
83        ChannelManager::with_config(channel_config)
84    };
85    let client = Client::with_manager_and_urls(channel_manager, vec![&config.endpoint]);
86
87    Ok(client)
88}
89
90impl GreptimeDBGrpcService {
91    pub fn try_new(config: impl Into<GreptimeDBGrpcServiceConfig>) -> crate::Result<Self> {
92        let config = config.into();
93
94        let grpc_client = new_client_from_config(&config)?;
95
96        let mut client = Database::new_with_dbname(&config.dbname, grpc_client);
97
98        if let (Some(username), Some(password)) = (&config.username, &config.password) {
99            client.set_auth(AuthScheme::Basic(Basic {
100                username: username.to_owned(),
101                password: password.clone().into(),
102            }))
103        };
104
105        Ok(GreptimeDBGrpcService {
106            client: Arc::new(client),
107        })
108    }
109}
110
111impl Service<GreptimeDBGrpcRequest> for GreptimeDBGrpcService {
112    type Response = GreptimeDBGrpcBatchOutput;
113    type Error = GreptimeError;
114    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
115
116    fn poll_ready(&mut self, _cx: &mut std::task::Context) -> Poll<Result<(), Self::Error>> {
117        Poll::Ready(Ok(()))
118    }
119
120    // Convert vector metrics into GreptimeDB format and send them in batch
121    fn call(&mut self, req: GreptimeDBGrpcRequest) -> Self::Future {
122        let client = Arc::clone(&self.client);
123
124        Box::pin(async move {
125            let metadata = req.metadata;
126            let result = client.insert(req.items).await?;
127
128            Ok(GreptimeDBGrpcBatchOutput {
129                _item_count: result,
130                metadata,
131            })
132        })
133    }
134}
135
136/// Configuration for the GreptimeDB gRPC service
137pub(super) struct GreptimeDBGrpcServiceConfig {
138    endpoint: String,
139    dbname: String,
140    username: Option<String>,
141    password: Option<SensitiveString>,
142    compression: GrpcCompression,
143    tls: Option<TlsConfig>,
144}
145
146impl From<&GreptimeDBMetricsConfig> for GreptimeDBGrpcServiceConfig {
147    fn from(val: &GreptimeDBMetricsConfig) -> Self {
148        GreptimeDBGrpcServiceConfig {
149            endpoint: val.endpoint.clone(),
150            dbname: val.dbname.clone(),
151            username: val.username.clone(),
152            password: val.password.clone(),
153            compression: val.grpc_compression,
154            tls: val.tls.clone(),
155        }
156    }
157}
158
159pub(super) fn healthcheck(
160    config: impl Into<GreptimeDBGrpcServiceConfig>,
161) -> crate::Result<Healthcheck> {
162    let config = config.into();
163    let client = new_client_from_config(&config)?;
164
165    Ok(async move { client.health_check().await.map_err(|error| error.into()) }.boxed())
166}