vector/sinks/greptimedb/metrics/
service.rs1use std::{sync::Arc, task::Poll};
2
3use greptimedb_ingester::{
4 Error as GreptimeError, GrpcCompression as IngesterGrpcCompression,
5 api::v1::Basic,
6 api::v1::auth_header::AuthScheme,
7 channel_manager::{ChannelConfig, ChannelManager, ClientTlsOption},
8 client::Client,
9 database::Database,
10};
11use vector_lib::sensitive_string::SensitiveString;
12
13use crate::sinks::{
14 greptimedb::{
15 GrpcCompression,
16 metrics::{
17 config::GreptimeDBMetricsConfig,
18 request::{GreptimeDBGrpcBatchOutput, GreptimeDBGrpcRequest},
19 },
20 },
21 prelude::*,
22};
23
24#[derive(Debug, Clone)]
25pub struct GreptimeDBGrpcService {
26 client: Arc<Database>,
28}
29
30fn new_client_from_config(config: &GreptimeDBGrpcServiceConfig) -> crate::Result<Client> {
31 let send_compression_encoding = match config.compression {
32 GrpcCompression::None => None,
33 GrpcCompression::Gzip => Some(IngesterGrpcCompression::Gzip),
34 GrpcCompression::Zstd => Some(IngesterGrpcCompression::Zstd),
35 };
36
37 let mut channel_config = ChannelConfig {
38 send_compression_encoding,
39 ..Default::default()
40 };
41
42 let channel_manager = if let Some(tls_config) = &config.tls {
43 let TlsConfig {
44 verify_certificate,
45 verify_hostname,
46 alpn_protocols,
47 ca_file,
48 crt_file,
49 key_file,
50 key_pass,
51 server_name,
52 } = tls_config;
53
54 if verify_certificate.is_some()
55 || verify_hostname.is_some()
56 || alpn_protocols.is_some()
57 || key_pass.is_some()
58 || server_name.is_some()
59 {
60 warn!(
61 message = "TlsConfig: verify_certificate, verify_hostname, alpn_protocols, key_pass and server_name are not supported by greptimedb client at the moment."
62 );
63 }
64
65 match (ca_file, crt_file, key_file) {
68 (Some(ca), Some(crt), Some(key)) => {
69 channel_config.client_tls = Some(ClientTlsOption {
70 server_ca_cert_path: ca.to_string_lossy().into_owned(),
71 client_cert_path: crt.to_string_lossy().into_owned(),
72 client_key_path: key.to_string_lossy().into_owned(),
73 });
74 ChannelManager::with_tls_config(channel_config).map_err(Box::new)?
75 }
76 _ => {
77 return Err(
78 "GreptimeDB TLS requires ca_file, crt_file, and key_file to all be set.".into(),
79 );
80 }
81 }
82 } else {
83 ChannelManager::with_config(channel_config)
84 };
85 let client = Client::with_manager_and_urls(channel_manager, vec![&config.endpoint]);
86
87 Ok(client)
88}
89
90impl GreptimeDBGrpcService {
91 pub fn try_new(config: impl Into<GreptimeDBGrpcServiceConfig>) -> crate::Result<Self> {
92 let config = config.into();
93
94 let grpc_client = new_client_from_config(&config)?;
95
96 let mut client = Database::new_with_dbname(&config.dbname, grpc_client);
97
98 if let (Some(username), Some(password)) = (&config.username, &config.password) {
99 client.set_auth(AuthScheme::Basic(Basic {
100 username: username.to_owned(),
101 password: password.clone().into(),
102 }))
103 };
104
105 Ok(GreptimeDBGrpcService {
106 client: Arc::new(client),
107 })
108 }
109}
110
111impl Service<GreptimeDBGrpcRequest> for GreptimeDBGrpcService {
112 type Response = GreptimeDBGrpcBatchOutput;
113 type Error = GreptimeError;
114 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
115
116 fn poll_ready(&mut self, _cx: &mut std::task::Context) -> Poll<Result<(), Self::Error>> {
117 Poll::Ready(Ok(()))
118 }
119
120 fn call(&mut self, req: GreptimeDBGrpcRequest) -> Self::Future {
122 let client = Arc::clone(&self.client);
123
124 Box::pin(async move {
125 let metadata = req.metadata;
126 let result = client.insert(req.items).await?;
127
128 Ok(GreptimeDBGrpcBatchOutput {
129 _item_count: result,
130 metadata,
131 })
132 })
133 }
134}
135
136pub(super) struct GreptimeDBGrpcServiceConfig {
138 endpoint: String,
139 dbname: String,
140 username: Option<String>,
141 password: Option<SensitiveString>,
142 compression: GrpcCompression,
143 tls: Option<TlsConfig>,
144}
145
146impl From<&GreptimeDBMetricsConfig> for GreptimeDBGrpcServiceConfig {
147 fn from(val: &GreptimeDBMetricsConfig) -> Self {
148 GreptimeDBGrpcServiceConfig {
149 endpoint: val.endpoint.clone(),
150 dbname: val.dbname.clone(),
151 username: val.username.clone(),
152 password: val.password.clone(),
153 compression: val.grpc_compression,
154 tls: val.tls.clone(),
155 }
156 }
157}
158
159pub(super) fn healthcheck(
160 config: impl Into<GreptimeDBGrpcServiceConfig>,
161) -> crate::Result<Healthcheck> {
162 let config = config.into();
163 let client = new_client_from_config(&config)?;
164
165 Ok(async move { client.health_check().await.map_err(|error| error.into()) }.boxed())
166}