vector/sinks/greptimedb/metrics/
service.rs

1use std::{sync::Arc, task::Poll};
2
3use greptimedb_ingester::{
4    Client, ClientBuilder, Compression, Database, Error as GreptimeError,
5    api::v1::{auth_header::AuthScheme, *},
6    channel_manager::*,
7};
8use vector_lib::sensitive_string::SensitiveString;
9
10use crate::sinks::{
11    greptimedb::metrics::{
12        config::GreptimeDBMetricsConfig,
13        request::{GreptimeDBGrpcBatchOutput, GreptimeDBGrpcRequest},
14    },
15    prelude::*,
16};
17
18#[derive(Debug, Clone)]
19pub struct GreptimeDBGrpcService {
20    /// the client that connects to greptimedb
21    client: Arc<Database>,
22}
23
24fn new_client_from_config(config: &GreptimeDBGrpcServiceConfig) -> crate::Result<Client> {
25    let mut builder = ClientBuilder::default().peers(vec![&config.endpoint]);
26
27    if let Some(compression) = config.compression.as_ref() {
28        let compression = match compression.as_str() {
29            "gzip" => Compression::Gzip,
30            "zstd" => Compression::Zstd,
31            _ => {
32                warn!(message = "Unknown gRPC compression type: {compression}, disabled.");
33                Compression::None
34            }
35        };
36        builder = builder.compression(compression);
37    }
38
39    if let Some(tls_config) = &config.tls {
40        let channel_config = ChannelConfig {
41            client_tls: Some(try_from_tls_config(tls_config)?),
42            ..Default::default()
43        };
44
45        builder = builder
46            .channel_manager(ChannelManager::with_tls_config(channel_config).map_err(Box::new)?);
47    }
48
49    Ok(builder.build())
50}
51
52fn try_from_tls_config(tls_config: &TlsConfig) -> crate::Result<ClientTlsOption> {
53    if tls_config.key_pass.is_some()
54        || tls_config.alpn_protocols.is_some()
55        || tls_config.verify_certificate.is_some()
56        || tls_config.verify_hostname.is_some()
57    {
58        warn!(
59            message = "TlsConfig: key_pass, alpn_protocols, verify_certificate and verify_hostname are not supported by greptimedb client at the moment."
60        );
61    }
62
63    Ok(ClientTlsOption {
64        server_ca_cert_path: tls_config.ca_file.clone(),
65        client_cert_path: tls_config.crt_file.clone(),
66        client_key_path: tls_config.key_file.clone(),
67    })
68}
69
70impl GreptimeDBGrpcService {
71    pub fn try_new(config: impl Into<GreptimeDBGrpcServiceConfig>) -> crate::Result<Self> {
72        let config = config.into();
73
74        let grpc_client = new_client_from_config(&config)?;
75
76        let mut client = Database::new_with_dbname(&config.dbname, grpc_client);
77
78        if let (Some(username), Some(password)) = (&config.username, &config.password) {
79            client.set_auth(AuthScheme::Basic(Basic {
80                username: username.to_owned(),
81                password: password.clone().into(),
82            }))
83        };
84
85        Ok(GreptimeDBGrpcService {
86            client: Arc::new(client),
87        })
88    }
89}
90
91impl Service<GreptimeDBGrpcRequest> for GreptimeDBGrpcService {
92    type Response = GreptimeDBGrpcBatchOutput;
93    type Error = GreptimeError;
94    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
95
96    fn poll_ready(&mut self, _cx: &mut std::task::Context) -> Poll<Result<(), Self::Error>> {
97        Poll::Ready(Ok(()))
98    }
99
100    // Convert vector metrics into GreptimeDB format and send them in batch
101    fn call(&mut self, req: GreptimeDBGrpcRequest) -> Self::Future {
102        let client = Arc::clone(&self.client);
103
104        Box::pin(async move {
105            let metadata = req.metadata;
106            let result = client.row_insert(req.items).await?;
107
108            Ok(GreptimeDBGrpcBatchOutput {
109                _item_count: result,
110                metadata,
111            })
112        })
113    }
114}
115
116/// Configuration for the GreptimeDB gRPC service
117pub(super) struct GreptimeDBGrpcServiceConfig {
118    endpoint: String,
119    dbname: String,
120    username: Option<String>,
121    password: Option<SensitiveString>,
122    compression: Option<String>,
123    tls: Option<TlsConfig>,
124}
125
126impl From<&GreptimeDBMetricsConfig> for GreptimeDBGrpcServiceConfig {
127    fn from(val: &GreptimeDBMetricsConfig) -> Self {
128        GreptimeDBGrpcServiceConfig {
129            endpoint: val.endpoint.clone(),
130            dbname: val.dbname.clone(),
131            username: val.username.clone(),
132            password: val.password.clone(),
133            compression: val.grpc_compression.clone(),
134            tls: val.tls.clone(),
135        }
136    }
137}
138
139pub(super) fn healthcheck(
140    config: impl Into<GreptimeDBGrpcServiceConfig>,
141) -> crate::Result<Healthcheck> {
142    let config = config.into();
143    let client = new_client_from_config(&config)?;
144
145    Ok(async move { client.health_check().await.map_err(|error| error.into()) }.boxed())
146}