1use std::{collections::BTreeMap, sync::Arc};
2
3use databend_client::APIClient as DatabendAPIClient;
4use futures::future::FutureExt;
5use tower::ServiceBuilder;
6use vector_lib::{
7 codecs::encoding::{Framer, FramingConfig},
8 configurable::{component::GenerateConfig, configurable_component},
9};
10
11use super::{
12 compression::DatabendCompression,
13 encoding::{DatabendEncodingConfig, DatabendMissingFieldAS, DatabendSerializerConfig},
14 request_builder::DatabendRequestBuilder,
15 service::{DatabendRetryLogic, DatabendService},
16 sink::DatabendSink,
17};
18use crate::{
19 codecs::{Encoder, EncodingConfig},
20 config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
21 http::{Auth, MaybeAuth},
22 sinks::{
23 Healthcheck, VectorSink,
24 util::{
25 BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, ServiceBuilderExt,
26 TowerRequestConfig, UriSerde,
27 },
28 },
29 tls::TlsConfig,
30 vector_version,
31};
32
33#[configurable_component(sink("databend", "Deliver log data to a Databend database."))]
35#[derive(Clone, Debug)]
36#[serde(deny_unknown_fields)]
37pub struct DatabendConfig {
38 #[configurable(metadata(
40 docs::examples = "databend://localhost:8000/default?sslmode=disable"
41 ))]
42 pub endpoint: UriSerde,
43
44 #[configurable(
46 deprecated = "This option has been deprecated, use arguments in the DSN instead."
47 )]
48 pub tls: Option<TlsConfig>,
49
50 #[configurable(metadata(docs::examples = "mydatabase"))]
52 pub database: Option<String>,
53
54 #[configurable(derived)]
56 pub auth: Option<Auth>,
57
58 #[configurable(metadata(docs::examples = "mytable"))]
60 pub table: String,
61
62 #[configurable(derived)]
63 #[serde(default)]
64 pub missing_field_as: DatabendMissingFieldAS,
65
66 #[configurable(derived)]
67 #[serde(default)]
68 pub encoding: DatabendEncodingConfig,
69
70 #[configurable(derived)]
71 #[serde(default)]
72 pub compression: DatabendCompression,
73
74 #[configurable(derived)]
75 #[serde(default)]
76 pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
77
78 #[configurable(derived)]
79 #[serde(default)]
80 pub request: TowerRequestConfig,
81
82 #[configurable(derived)]
83 #[serde(
84 default,
85 deserialize_with = "crate::serde::bool_or_struct",
86 skip_serializing_if = "crate::serde::is_default"
87 )]
88 pub acknowledgements: AcknowledgementsConfig,
89}
90
91impl GenerateConfig for DatabendConfig {
92 fn generate_config() -> toml::Value {
93 toml::from_str(
94 r#"endpoint = "databend://localhost:8000/default?sslmode=disable"
95 table = "default"
96 "#,
97 )
98 .unwrap()
99 }
100}
101
102#[async_trait::async_trait]
103#[typetag::serde(name = "databend")]
104impl SinkConfig for DatabendConfig {
105 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
106 let ua = format!("vector/{}", vector_version());
107 let auth = self.auth.choose_one(&self.endpoint.auth)?;
108 let authority = self
109 .endpoint
110 .uri
111 .authority()
112 .ok_or("Endpoint missing authority")?;
113 let endpoint = match self.endpoint.uri.scheme().map(|s| s.as_str()) {
114 Some("databend") => self.endpoint.to_string(),
115 Some("http") => format!("databend://{authority}/?sslmode=disable"),
117 Some("https") => format!("databend://{authority}"),
118 None => {
119 return Err("Missing scheme for Databend endpoint. Expected `databend`.".into());
120 }
121 Some(s) => {
122 return Err(format!("Unsupported scheme for Databend endpoint: {s}").into());
123 }
124 };
125 let mut endpoint = url::Url::parse(&endpoint)?;
126 match auth {
127 Some(Auth::Basic { user, password }) => {
128 let _ = endpoint.set_username(&user);
129 let _ = endpoint.set_password(Some(password.inner()));
130 }
131 Some(Auth::Bearer { .. }) => {
132 return Err("Bearer authentication is not supported currently".into());
133 }
134 None => {}
135 #[cfg(feature = "aws-core")]
136 _ => {}
137 }
138 if let Some(database) = &self.database {
139 endpoint.set_path(&format!("/{database}"));
140 }
141 let endpoint = endpoint.to_string();
142 let health_client = DatabendAPIClient::new(&endpoint, Some(ua.clone())).await?;
143 let healthcheck = select_one(health_client).boxed();
144
145 let request_settings = self.request.into_settings();
146 let batch_settings = self.batch.into_batcher_settings()?;
147
148 let mut file_format_options = BTreeMap::new();
149 let compression = match self.compression {
150 DatabendCompression::Gzip => {
151 file_format_options.insert("compression", "GZIP");
152 Compression::gzip_default()
153 }
154 DatabendCompression::None => {
155 file_format_options.insert("compression", "NONE");
156 Compression::None
157 }
158 };
159 let encoding: EncodingConfig = self.encoding.clone().into();
160 let serializer = match self.encoding.config() {
161 DatabendSerializerConfig::Json(_) => {
162 file_format_options.insert("type", "NDJSON");
163 file_format_options.insert("missing_field_as", self.missing_field_as.as_str());
164 encoding.build()?
165 }
166 DatabendSerializerConfig::Csv(_) => {
167 file_format_options.insert("type", "CSV");
168 file_format_options.insert("field_delimiter", ",");
169 file_format_options.insert("record_delimiter", "\n");
170 file_format_options.insert("skip_header", "0");
171 encoding.build()?
172 }
173 };
174 let framer = FramingConfig::NewlineDelimited.build();
175 let transformer = encoding.transformer();
176
177 let mut copy_options = BTreeMap::new();
178 copy_options.insert("purge", "true");
179
180 let client = DatabendAPIClient::new(&endpoint, Some(ua)).await?;
181 let service = DatabendService::new(
182 client,
183 self.table.clone(),
184 file_format_options,
185 copy_options,
186 )?;
187 let service = ServiceBuilder::new()
188 .settings(request_settings, DatabendRetryLogic)
189 .service(service);
190
191 let encoder = Encoder::<Framer>::new(framer, serializer);
192 let request_builder = DatabendRequestBuilder::new(compression, (transformer, encoder));
193
194 let sink = DatabendSink::new(batch_settings, request_builder, service);
195
196 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
197 }
198
199 fn input(&self) -> Input {
200 Input::log()
201 }
202
203 fn acknowledgements(&self) -> &AcknowledgementsConfig {
204 &self.acknowledgements
205 }
206}
207
208async fn select_one(client: Arc<DatabendAPIClient>) -> crate::Result<()> {
209 client.query_all("SELECT 1").await?;
210 Ok(())
211}
212
#[cfg(test)]
mod tests {
    use super::*;

    /// Endpoint DSN used by the parsing tests; must match the TOML fixtures below.
    const EXPECTED_ENDPOINT: &str = "databend://localhost:8000/mydatabase?sslmode=disable";

    /// The generated example configuration must itself be a valid `DatabendConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatabendConfig>();
    }

    /// A minimal configuration parses and falls back to JSON encoding without compression.
    #[test]
    fn parse_config() {
        let parsed: DatabendConfig = toml::from_str(
            r#"
            endpoint = "databend://localhost:8000/mydatabase?sslmode=disable"
            table = "mytable"
        "#,
        )
        .unwrap();

        assert_eq!(parsed.endpoint.uri, EXPECTED_ENDPOINT);
        assert_eq!(parsed.table, "mytable");
        assert!(matches!(
            parsed.encoding.config(),
            DatabendSerializerConfig::Json(_)
        ));
        assert!(matches!(parsed.compression, DatabendCompression::None));
    }

    /// Explicit CSV encoding and gzip compression are both picked up from the configuration.
    #[test]
    fn parse_config_with_encoding_compression() {
        let parsed: DatabendConfig = toml::from_str(
            r#"
            endpoint = "databend://localhost:8000/mydatabase?sslmode=disable"
            table = "mytable"
            encoding.codec = "csv"
            encoding.csv.fields = ["host", "timestamp", "message"]
            compression = "gzip"
        "#,
        )
        .unwrap();

        assert_eq!(parsed.endpoint.uri, EXPECTED_ENDPOINT);
        assert_eq!(parsed.table, "mytable");
        assert!(matches!(
            parsed.encoding.config(),
            DatabendSerializerConfig::Csv(_)
        ));
        assert!(matches!(parsed.compression, DatabendCompression::Gzip));
    }
}