1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
use crate::sinks::{
    greptimedb::metrics::config::GreptimeDBMetricsConfig,
    greptimedb::metrics::request::{GreptimeDBGrpcBatchOutput, GreptimeDBGrpcRequest},
    prelude::*,
};
use greptimedb_ingester::{
    api::v1::auth_header::AuthScheme, api::v1::*, channel_manager::*, Client, ClientBuilder,
    Compression, Database, Error as GreptimeError,
};
use std::{sync::Arc, task::Poll};
use vector_lib::sensitive_string::SensitiveString;

/// Service that sends row insert requests to GreptimeDB over gRPC.
///
/// The underlying [`Database`] handle is held behind an [`Arc`], so cloning
/// the service shares the same handle rather than building a new one.
#[derive(Debug, Clone)]
pub struct GreptimeDBGrpcService {
    /// The database handle used to send requests to GreptimeDB.
    client: Arc<Database>,
}

/// Builds a GreptimeDB gRPC [`Client`] from the service configuration.
///
/// The client uses `config.endpoint` as its only peer. When a compression
/// name is configured, `"gzip"` and `"zstd"` select the matching codec; any
/// other value logs a warning and explicitly disables compression. When TLS
/// options are configured, a TLS-enabled channel manager is installed.
///
/// # Errors
///
/// Returns an error if the TLS options cannot be converted or if the TLS
/// channel manager cannot be created.
fn new_client_from_config(config: &GreptimeDBGrpcServiceConfig) -> crate::Result<Client> {
    let mut builder = ClientBuilder::default().peers(vec![&config.endpoint]);

    if let Some(compression) = config.compression.as_deref() {
        let compression = match compression {
            "gzip" => Compression::Gzip,
            "zstd" => Compression::Zstd,
            unknown => {
                // `message = "..."` is a plain field value, not a format string,
                // so the rejected name is recorded as its own structured field
                // instead of being interpolated into the message text.
                warn!(
                    message = "Unknown gRPC compression type, disabled.",
                    compression = %unknown
                );
                Compression::None
            }
        };
        builder = builder.compression(compression);
    }

    if let Some(tls_config) = &config.tls {
        let channel_config = ChannelConfig {
            client_tls: Some(try_from_tls_config(tls_config)?),
            ..Default::default()
        };

        builder = builder
            .channel_manager(ChannelManager::with_tls_config(channel_config).map_err(Box::new)?);
    }

    Ok(builder.build())
}

/// Translates Vector's TLS settings into the GreptimeDB client's TLS options.
///
/// Only the CA file, certificate file and key file paths are forwarded. If any
/// of `key_pass`, `alpn_protocols`, `verify_certificate` or `verify_hostname`
/// is set, a warning is logged and those settings are not passed on.
fn try_from_tls_config(tls_config: &TlsConfig) -> crate::Result<ClientTlsOption> {
    // Presence flags for every option that has no counterpart in `ClientTlsOption`.
    let unsupported_options = [
        tls_config.key_pass.is_some(),
        tls_config.alpn_protocols.is_some(),
        tls_config.verify_certificate.is_some(),
        tls_config.verify_hostname.is_some(),
    ];

    if unsupported_options.contains(&true) {
        warn!(message = "TlsConfig: key_pass, alpn_protocols, verify_certificate and verify_hostname are not supported by greptimedb client at the moment.");
    }

    let tls_option = ClientTlsOption {
        server_ca_cert_path: tls_config.ca_file.clone(),
        client_cert_path: tls_config.crt_file.clone(),
        client_key_path: tls_config.key_file.clone(),
    };

    Ok(tls_option)
}

impl GreptimeDBGrpcService {
    /// Creates a service from anything convertible into a
    /// [`GreptimeDBGrpcServiceConfig`].
    ///
    /// A gRPC client is built from the configuration and bound to the
    /// configured database name. Basic authentication is applied only when
    /// both a username and a password are present.
    ///
    /// # Errors
    ///
    /// Returns an error if the gRPC client cannot be built from the
    /// configuration.
    pub fn try_new(config: impl Into<GreptimeDBGrpcServiceConfig>) -> crate::Result<Self> {
        let service_config: GreptimeDBGrpcServiceConfig = config.into();

        let grpc_client = new_client_from_config(&service_config)?;
        let mut database = Database::new_with_dbname(&service_config.dbname, grpc_client);

        // Credentials are only usable as a pair; a lone username or password is ignored.
        let credentials = service_config
            .username
            .as_ref()
            .zip(service_config.password.as_ref());
        if let Some((username, password)) = credentials {
            let basic = Basic {
                username: username.clone(),
                password: password.clone().into(),
            };
            database.set_auth(AuthScheme::Basic(basic));
        }

        Ok(Self {
            client: Arc::new(database),
        })
    }
}

impl Service<GreptimeDBGrpcRequest> for GreptimeDBGrpcService {
    type Response = GreptimeDBGrpcBatchOutput;
    type Error = GreptimeError;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut std::task::Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    // Convert vector metrics into GreptimeDB format and send them in batch
    fn call(&mut self, req: GreptimeDBGrpcRequest) -> Self::Future {
        let client = Arc::clone(&self.client);

        Box::pin(async move {
            let metadata = req.metadata;
            let result = client.row_insert(req.items).await?;

            Ok(GreptimeDBGrpcBatchOutput {
                _item_count: result,
                metadata,
            })
        })
    }
}

/// Configuration for the GreptimeDB gRPC service
///
/// Holds the settings needed to build the gRPC client and the database
/// handle used by the service and by the healthcheck.
pub(super) struct GreptimeDBGrpcServiceConfig {
    /// Address handed to the client builder as its only peer.
    endpoint: String,
    /// Name of the database the service is bound to.
    dbname: String,
    /// Username for basic authentication; only used when `password` is also set.
    username: Option<String>,
    /// Password for basic authentication; only used when `username` is also set.
    password: Option<SensitiveString>,
    /// gRPC compression name. `"gzip"` and `"zstd"` are recognized; any other
    /// value disables compression with a warning.
    compression: Option<String>,
    /// Optional TLS settings; when present the client uses a TLS channel manager.
    tls: Option<TlsConfig>,
}

impl From<&GreptimeDBMetricsConfig> for GreptimeDBGrpcServiceConfig {
    fn from(val: &GreptimeDBMetricsConfig) -> Self {
        GreptimeDBGrpcServiceConfig {
            endpoint: val.endpoint.clone(),
            dbname: val.dbname.clone(),
            username: val.username.clone(),
            password: val.password.clone(),
            compression: val.grpc_compression.clone(),
            tls: val.tls.clone(),
        }
    }
}

/// Builds a healthcheck future for the configured GreptimeDB endpoint.
///
/// A dedicated gRPC client is created up front; the returned future calls
/// the client's `health_check` and converts any failure into the crate's
/// error type.
///
/// # Errors
///
/// Returns an error immediately if the client cannot be built from the
/// configuration.
pub(super) fn healthcheck(
    config: impl Into<GreptimeDBGrpcServiceConfig>,
) -> crate::Result<Healthcheck> {
    let service_config: GreptimeDBGrpcServiceConfig = config.into();
    let grpc_client = new_client_from_config(&service_config)?;

    let check = async move {
        let outcome = grpc_client.health_check().await;
        outcome.map_err(|error| error.into())
    };

    Ok(check.boxed())
}