1use std::{collections::BTreeMap, sync::Arc};
2
3use databend_client::APIClient as DatabendAPIClient;
4use futures::future::FutureExt;
5use tower::ServiceBuilder;
6use vector_lib::{
7 codecs::encoding::{Framer, FramingConfig},
8 configurable::{component::GenerateConfig, configurable_component},
9};
10
11use super::{
12 compression::DatabendCompression,
13 encoding::{DatabendEncodingConfig, DatabendMissingFieldAS, DatabendSerializerConfig},
14 request_builder::DatabendRequestBuilder,
15 service::{DatabendRetryLogic, DatabendService},
16 sink::DatabendSink,
17};
18use crate::{
19 codecs::{Encoder, EncodingConfig},
20 config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
21 http::{Auth, MaybeAuth},
22 sinks::{
23 Healthcheck, VectorSink,
24 util::{
25 BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, ServiceBuilderExt,
26 TowerRequestConfig, UriSerde,
27 },
28 },
29 tls::TlsConfig,
30 vector_version,
31};
32
33#[configurable_component(sink("databend", "Deliver log data to a Databend database."))]
35#[derive(Clone, Debug)]
36#[serde(deny_unknown_fields)]
37pub struct DatabendConfig {
38 #[configurable(metadata(
40 docs::examples = "databend://localhost:8000/default?sslmode=disable"
41 ))]
42 pub endpoint: UriSerde,
43
44 #[configurable(
46 deprecated = "This option has been deprecated, use arguments in the DSN instead."
47 )]
48 pub tls: Option<TlsConfig>,
49
50 #[configurable(metadata(docs::examples = "mydatabase"))]
52 pub database: Option<String>,
53
54 #[configurable(derived)]
56 pub auth: Option<Auth>,
57
58 #[configurable(metadata(docs::examples = "mytable"))]
60 pub table: String,
61
62 #[configurable(derived)]
63 #[serde(default)]
64 pub missing_field_as: DatabendMissingFieldAS,
65
66 #[configurable(derived)]
67 #[serde(default)]
68 pub encoding: DatabendEncodingConfig,
69
70 #[configurable(derived)]
71 #[serde(default)]
72 pub compression: DatabendCompression,
73
74 #[configurable(derived)]
75 #[serde(default)]
76 pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
77
78 #[configurable(derived)]
79 #[serde(default)]
80 pub request: TowerRequestConfig,
81
82 #[configurable(derived)]
83 #[serde(
84 default,
85 deserialize_with = "crate::serde::bool_or_struct",
86 skip_serializing_if = "crate::serde::is_default"
87 )]
88 pub acknowledgements: AcknowledgementsConfig,
89}
90
91impl GenerateConfig for DatabendConfig {
92 fn generate_config() -> toml::Value {
93 toml::from_str(
94 r#"endpoint = "databend://localhost:8000/default?sslmode=disable"
95 table = "default"
96 "#,
97 )
98 .unwrap()
99 }
100}
101
102#[async_trait::async_trait]
103#[typetag::serde(name = "databend")]
104impl SinkConfig for DatabendConfig {
105 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
106 let ua = format!("vector/{}", vector_version());
107 let auth = self.auth.choose_one(&self.endpoint.auth)?;
108 let authority = self
109 .endpoint
110 .uri
111 .authority()
112 .ok_or("Endpoint missing authority")?;
113 let endpoint = match self.endpoint.uri.scheme().map(|s| s.as_str()) {
114 Some("databend") => self.endpoint.to_string(),
115 Some("http") => format!("databend://{authority}/?sslmode=disable"),
117 Some("https") => format!("databend://{authority}"),
118 None => {
119 return Err("Missing scheme for Databend endpoint. Expected `databend`.".into());
120 }
121 Some(s) => {
122 return Err(format!("Unsupported scheme for Databend endpoint: {s}").into());
123 }
124 };
125 let mut endpoint = url::Url::parse(&endpoint)?;
126 match auth {
127 Some(Auth::Basic { user, password }) => {
128 let _ = endpoint.set_username(&user);
129 let _ = endpoint.set_password(Some(password.inner()));
130 }
131 Some(Auth::Bearer { .. }) => {
132 return Err("Bearer authentication is not supported currently".into());
133 }
134 Some(Auth::Custom { .. }) => {
135 return Err("Custom authentication is not supported currently".into());
136 }
137 None => {}
138 #[cfg(feature = "aws-core")]
139 _ => {}
140 }
141 if let Some(database) = &self.database {
142 endpoint.set_path(&format!("/{database}"));
143 }
144 let endpoint = endpoint.to_string();
145 let health_client = DatabendAPIClient::new(&endpoint, Some(ua.clone())).await?;
146 let healthcheck = select_one(health_client).boxed();
147
148 let request_settings = self.request.into_settings();
149 let batch_settings = self.batch.into_batcher_settings()?;
150
151 let mut file_format_options = BTreeMap::new();
152 let compression = match self.compression {
153 DatabendCompression::Gzip => {
154 file_format_options.insert("compression", "GZIP");
155 Compression::gzip_default()
156 }
157 DatabendCompression::None => {
158 file_format_options.insert("compression", "NONE");
159 Compression::None
160 }
161 };
162 let encoding: EncodingConfig = self.encoding.clone().into();
163 let serializer = match self.encoding.config() {
164 DatabendSerializerConfig::Json(_) => {
165 file_format_options.insert("type", "NDJSON");
166 file_format_options.insert("missing_field_as", self.missing_field_as.as_str());
167 encoding.build()?
168 }
169 DatabendSerializerConfig::Csv(_) => {
170 file_format_options.insert("type", "CSV");
171 file_format_options.insert("field_delimiter", ",");
172 file_format_options.insert("record_delimiter", "\n");
173 file_format_options.insert("skip_header", "0");
174 encoding.build()?
175 }
176 };
177 let framer = FramingConfig::NewlineDelimited.build();
178 let transformer = encoding.transformer();
179
180 let mut copy_options = BTreeMap::new();
181 copy_options.insert("purge", "true");
182
183 let client = DatabendAPIClient::new(&endpoint, Some(ua)).await?;
184 let service = DatabendService::new(
185 client,
186 self.table.clone(),
187 file_format_options,
188 copy_options,
189 )?;
190 let service = ServiceBuilder::new()
191 .settings(request_settings, DatabendRetryLogic)
192 .service(service);
193
194 let encoder = Encoder::<Framer>::new(framer, serializer);
195 let request_builder = DatabendRequestBuilder::new(compression, (transformer, encoder));
196
197 let sink = DatabendSink::new(batch_settings, request_builder, service);
198
199 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
200 }
201
202 fn input(&self) -> Input {
203 Input::log()
204 }
205
206 fn acknowledgements(&self) -> &AcknowledgementsConfig {
207 &self.acknowledgements
208 }
209}
210
211async fn select_one(client: Arc<DatabendAPIClient>) -> crate::Result<()> {
212 client.query_all("SELECT 1").await?;
213 Ok(())
214}
215
#[cfg(test)]
mod tests {
    use super::*;

    /// DSN used by the parsing tests below.
    const ENDPOINT: &str = "databend://localhost:8000/mydatabase?sslmode=disable";

    /// Deserializes a sink configuration from TOML, panicking on invalid input.
    fn parse(source: &str) -> DatabendConfig {
        toml::from_str::<DatabendConfig>(source).unwrap()
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatabendConfig>();
    }

    /// With only `endpoint` and `table` set, the defaults are JSON without compression.
    #[test]
    fn parse_config() {
        let parsed = parse(
            r#"
            endpoint = "databend://localhost:8000/mydatabase?sslmode=disable"
            table = "mytable"
        "#,
        );

        assert_eq!(parsed.endpoint.uri, ENDPOINT);
        assert_eq!(parsed.table, "mytable");

        let is_json = matches!(parsed.encoding.config(), DatabendSerializerConfig::Json(_));
        assert!(is_json);
        let is_uncompressed = matches!(parsed.compression, DatabendCompression::None);
        assert!(is_uncompressed);
    }

    /// An explicit CSV codec and gzip compression are picked up from the configuration.
    #[test]
    fn parse_config_with_encoding_compression() {
        let parsed = parse(
            r#"
            endpoint = "databend://localhost:8000/mydatabase?sslmode=disable"
            table = "mytable"
            encoding.codec = "csv"
            encoding.csv.fields = ["host", "timestamp", "message"]
            compression = "gzip"
        "#,
        );

        assert_eq!(parsed.endpoint.uri, ENDPOINT);
        assert_eq!(parsed.table, "mytable");

        let is_csv = matches!(parsed.encoding.config(), DatabendSerializerConfig::Csv(_));
        assert!(is_csv);
        let is_gzip = matches!(parsed.compression, DatabendCompression::Gzip);
        assert!(is_gzip);
    }
}