1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use databend_client::APIClient as DatabendAPIClient;
5use futures::future::FutureExt;
6use tower::ServiceBuilder;
7use vector_lib::codecs::encoding::{Framer, FramingConfig};
8use vector_lib::configurable::{component::GenerateConfig, configurable_component};
9
10use crate::{
11 codecs::{Encoder, EncodingConfig},
12 config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
13 http::{Auth, MaybeAuth},
14 sinks::{
15 util::{
16 BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, ServiceBuilderExt,
17 TowerRequestConfig, UriSerde,
18 },
19 Healthcheck, VectorSink,
20 },
21 tls::TlsConfig,
22 vector_version,
23};
24
25use super::{
26 compression::DatabendCompression,
27 encoding::{DatabendEncodingConfig, DatabendMissingFieldAS, DatabendSerializerConfig},
28 request_builder::DatabendRequestBuilder,
29 service::{DatabendRetryLogic, DatabendService},
30 sink::DatabendSink,
31};
32
33#[configurable_component(sink("databend", "Deliver log data to a Databend database."))]
35#[derive(Clone, Debug)]
36#[serde(deny_unknown_fields)]
37pub struct DatabendConfig {
38 #[configurable(metadata(
40 docs::examples = "databend://localhost:8000/default?sslmode=disable"
41 ))]
42 pub endpoint: UriSerde,
43
44 #[configurable(
46 deprecated = "This option has been deprecated, use arguments in the DSN instead."
47 )]
48 pub tls: Option<TlsConfig>,
49
50 #[configurable(metadata(docs::examples = "mydatabase"))]
52 pub database: Option<String>,
53
54 #[configurable(derived)]
56 pub auth: Option<Auth>,
57
58 #[configurable(metadata(docs::examples = "mytable"))]
60 pub table: String,
61
62 #[configurable(derived)]
63 #[serde(default)]
64 pub missing_field_as: DatabendMissingFieldAS,
65
66 #[configurable(derived)]
67 #[serde(default)]
68 pub encoding: DatabendEncodingConfig,
69
70 #[configurable(derived)]
71 #[serde(default)]
72 pub compression: DatabendCompression,
73
74 #[configurable(derived)]
75 #[serde(default)]
76 pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
77
78 #[configurable(derived)]
79 #[serde(default)]
80 pub request: TowerRequestConfig,
81
82 #[configurable(derived)]
83 #[serde(
84 default,
85 deserialize_with = "crate::serde::bool_or_struct",
86 skip_serializing_if = "crate::serde::is_default"
87 )]
88 pub acknowledgements: AcknowledgementsConfig,
89}
90
91impl GenerateConfig for DatabendConfig {
92 fn generate_config() -> toml::Value {
93 toml::from_str(
94 r#"endpoint = "databend://localhost:8000/default?sslmode=disable"
95 table = "default"
96 "#,
97 )
98 .unwrap()
99 }
100}
101
102#[async_trait::async_trait]
103#[typetag::serde(name = "databend")]
104impl SinkConfig for DatabendConfig {
105 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
106 let ua = format!("vector/{}", vector_version());
107 let auth = self.auth.choose_one(&self.endpoint.auth)?;
108 let authority = self
109 .endpoint
110 .uri
111 .authority()
112 .ok_or("Endpoint missing authority")?;
113 let endpoint = match self.endpoint.uri.scheme().map(|s| s.as_str()) {
114 Some("databend") => self.endpoint.to_string(),
115 Some("http") => format!("databend://{authority}/?sslmode=disable"),
117 Some("https") => format!("databend://{authority}"),
118 None => {
119 return Err("Missing scheme for Databend endpoint. Expected `databend`.".into());
120 }
121 Some(s) => {
122 return Err(format!("Unsupported scheme for Databend endpoint: {s}").into());
123 }
124 };
125 let mut endpoint = url::Url::parse(&endpoint)?;
126 match auth {
127 Some(Auth::Basic { user, password }) => {
128 let _ = endpoint.set_username(&user);
129 let _ = endpoint.set_password(Some(password.inner()));
130 }
131 Some(Auth::Bearer { .. }) => {
132 return Err("Bearer authentication is not supported currently".into());
133 }
134 None => {}
135 #[cfg(feature = "aws-core")]
136 _ => {}
137 }
138 if let Some(database) = &self.database {
139 endpoint.set_path(&format!("/{database}"));
140 }
141 let endpoint = endpoint.to_string();
142 let health_client = DatabendAPIClient::new(&endpoint, Some(ua.clone())).await?;
143 let healthcheck = select_one(health_client).boxed();
144
145 let request_settings = self.request.into_settings();
146 let batch_settings = self.batch.into_batcher_settings()?;
147
148 let mut file_format_options = BTreeMap::new();
149 let compression = match self.compression {
150 DatabendCompression::Gzip => {
151 file_format_options.insert("compression", "GZIP");
152 Compression::gzip_default()
153 }
154 DatabendCompression::None => {
155 file_format_options.insert("compression", "NONE");
156 Compression::None
157 }
158 };
159 let encoding: EncodingConfig = self.encoding.clone().into();
160 let serializer = match self.encoding.config() {
161 DatabendSerializerConfig::Json(_) => {
162 file_format_options.insert("type", "NDJSON");
163 file_format_options.insert("missing_field_as", self.missing_field_as.as_str());
164 encoding.build()?
165 }
166 DatabendSerializerConfig::Csv(_) => {
167 file_format_options.insert("type", "CSV");
168 file_format_options.insert("field_delimiter", ",");
169 file_format_options.insert("record_delimiter", "\n");
170 file_format_options.insert("skip_header", "0");
171 encoding.build()?
172 }
173 };
174 let framer = FramingConfig::NewlineDelimited.build();
175 let transformer = encoding.transformer();
176
177 let mut copy_options = BTreeMap::new();
178 copy_options.insert("purge", "true");
179
180 let client = DatabendAPIClient::new(&endpoint, Some(ua)).await?;
181 let service = DatabendService::new(
182 client,
183 self.table.clone(),
184 file_format_options,
185 copy_options,
186 )?;
187 let service = ServiceBuilder::new()
188 .settings(request_settings, DatabendRetryLogic)
189 .service(service);
190
191 let encoder = Encoder::<Framer>::new(framer, serializer);
192 let request_builder = DatabendRequestBuilder::new(compression, (transformer, encoder));
193
194 let sink = DatabendSink::new(batch_settings, request_builder, service);
195
196 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
197 }
198
199 fn input(&self) -> Input {
200 Input::log()
201 }
202
203 fn acknowledgements(&self) -> &AcknowledgementsConfig {
204 &self.acknowledgements
205 }
206}
207
208async fn select_one(client: Arc<DatabendAPIClient>) -> crate::Result<()> {
209 client.query_all("SELECT 1").await?;
210 Ok(())
211}
212
#[cfg(test)]
mod tests {
    use super::*;

    /// DSN used by every parsing fixture below.
    const DSN: &str = "databend://localhost:8000/mydatabase?sslmode=disable";

    /// Deserializes a TOML snippet into a [`DatabendConfig`], panicking if it is rejected.
    fn parse(raw: &str) -> DatabendConfig {
        toml::from_str::<DatabendConfig>(raw).unwrap()
    }

    /// Checks the endpoint and table shared by the parsing fixtures.
    fn assert_endpoint_and_table(cfg: &DatabendConfig) {
        assert_eq!(cfg.endpoint.uri, DSN);
        assert_eq!(cfg.table, "mytable");
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatabendConfig>();
    }

    #[test]
    fn parse_config() {
        let cfg = parse(
            r#"
            endpoint = "databend://localhost:8000/mydatabase?sslmode=disable"
            table = "mytable"
        "#,
        );
        assert_endpoint_and_table(&cfg);
        // Defaults: JSON encoding, no compression.
        assert!(matches!(
            cfg.encoding.config(),
            DatabendSerializerConfig::Json(_)
        ));
        assert!(matches!(cfg.compression, DatabendCompression::None));
    }

    #[test]
    fn parse_config_with_encoding_compression() {
        let cfg = parse(
            r#"
            endpoint = "databend://localhost:8000/mydatabase?sslmode=disable"
            table = "mytable"
            encoding.codec = "csv"
            encoding.csv.fields = ["host", "timestamp", "message"]
            compression = "gzip"
        "#,
        );
        assert_endpoint_and_table(&cfg);
        assert!(matches!(
            cfg.encoding.config(),
            DatabendSerializerConfig::Csv(_)
        ));
        assert!(matches!(cfg.compression, DatabendCompression::Gzip));
    }
}