vector/sinks/databend/
config.rs

1use std::{collections::BTreeMap, sync::Arc};
2
3use databend_client::APIClient as DatabendAPIClient;
4use futures::future::FutureExt;
5use tower::ServiceBuilder;
6use vector_lib::{
7    codecs::encoding::{Framer, FramingConfig},
8    configurable::{component::GenerateConfig, configurable_component},
9};
10
11use super::{
12    compression::DatabendCompression,
13    encoding::{DatabendEncodingConfig, DatabendMissingFieldAS, DatabendSerializerConfig},
14    request_builder::DatabendRequestBuilder,
15    service::{DatabendRetryLogic, DatabendService},
16    sink::DatabendSink,
17};
18use crate::{
19    codecs::{Encoder, EncodingConfig},
20    config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
21    http::{Auth, MaybeAuth},
22    sinks::{
23        Healthcheck, VectorSink,
24        util::{
25            BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, ServiceBuilderExt,
26            TowerRequestConfig, UriSerde,
27        },
28    },
29    tls::TlsConfig,
30    vector_version,
31};
32
// User-facing settings of the `databend` sink. Unknown keys are rejected at
// deserialization time (`deny_unknown_fields`).
/// Configuration for the `databend` sink.
#[configurable_component(sink("databend", "Deliver log data to a Databend database."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct DatabendConfig {
    // When the sink is built, a `databend://` endpoint is used as given, while
    // `http://` and `https://` endpoints are rewritten into a `databend://` DSN
    // for backward compatibility. Any other (or a missing) scheme is rejected.
    /// The DSN of the Databend server.
    #[configurable(metadata(
        docs::examples = "databend://localhost:8000/default?sslmode=disable"
    ))]
    pub endpoint: UriSerde,

    // NOTE(review): nothing in this file reads this field when building the
    // sink; it appears to be kept only so existing configurations still parse.
    /// The TLS configuration to use when connecting to the Databend server.
    #[configurable(
        deprecated = "This option has been deprecated, use arguments in the DSN instead."
    )]
    pub tls: Option<TlsConfig>,

    // When set, the value replaces the path component of the DSN as `/{database}`.
    /// The database that contains the table that data is inserted into. Overrides the database in DSN.
    #[configurable(metadata(docs::examples = "mydatabase"))]
    pub database: Option<String>,

    // Only the basic (user/password) strategy is written into the DSN; bearer
    // and custom strategies make the sink build fail with an error.
    /// The username and password to authenticate with. Overrides the username and password in DSN.
    #[configurable(derived)]
    pub auth: Option<Auth>,

    /// The table that data is inserted into.
    #[configurable(metadata(docs::examples = "mytable"))]
    pub table: String,

    // Forwarded as the `missing_field_as` file format option, and only when the
    // JSON serializer is selected.
    #[configurable(derived)]
    #[serde(default)]
    pub missing_field_as: DatabendMissingFieldAS,

    // Selects the serializer; JSON maps to the `NDJSON` file format type and CSV
    // to the `CSV` file format type when the sink is built.
    #[configurable(derived)]
    #[serde(default)]
    pub encoding: DatabendEncodingConfig,

    // Drives both the request body compression and the `compression` file
    // format option (`GZIP` or `NONE`).
    #[configurable(derived)]
    #[serde(default)]
    pub compression: DatabendCompression,

    // Batching settings, converted into batcher settings when the sink is built.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    // Request settings applied to the service through the tower `ServiceBuilder`.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    // Deserialized through `crate::serde::bool_or_struct` and omitted from
    // serialized output when it equals the default value.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
90
91impl GenerateConfig for DatabendConfig {
92    fn generate_config() -> toml::Value {
93        toml::from_str(
94            r#"endpoint = "databend://localhost:8000/default?sslmode=disable"
95            table = "default"
96        "#,
97        )
98        .unwrap()
99    }
100}
101
102#[async_trait::async_trait]
103#[typetag::serde(name = "databend")]
104impl SinkConfig for DatabendConfig {
105    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
106        let ua = format!("vector/{}", vector_version());
107        let auth = self.auth.choose_one(&self.endpoint.auth)?;
108        let authority = self
109            .endpoint
110            .uri
111            .authority()
112            .ok_or("Endpoint missing authority")?;
113        let endpoint = match self.endpoint.uri.scheme().map(|s| s.as_str()) {
114            Some("databend") => self.endpoint.to_string(),
115            // for backward compatibility, build DSN from endpoint
116            Some("http") => format!("databend://{authority}/?sslmode=disable"),
117            Some("https") => format!("databend://{authority}"),
118            None => {
119                return Err("Missing scheme for Databend endpoint. Expected `databend`.".into());
120            }
121            Some(s) => {
122                return Err(format!("Unsupported scheme for Databend endpoint: {s}").into());
123            }
124        };
125        let mut endpoint = url::Url::parse(&endpoint)?;
126        match auth {
127            Some(Auth::Basic { user, password }) => {
128                let _ = endpoint.set_username(&user);
129                let _ = endpoint.set_password(Some(password.inner()));
130            }
131            Some(Auth::Bearer { .. }) => {
132                return Err("Bearer authentication is not supported currently".into());
133            }
134            Some(Auth::Custom { .. }) => {
135                return Err("Custom authentication is not supported currently".into());
136            }
137            None => {}
138            #[cfg(feature = "aws-core")]
139            _ => {}
140        }
141        if let Some(database) = &self.database {
142            endpoint.set_path(&format!("/{database}"));
143        }
144        let endpoint = endpoint.to_string();
145        let health_client = DatabendAPIClient::new(&endpoint, Some(ua.clone())).await?;
146        let healthcheck = select_one(health_client).boxed();
147
148        let request_settings = self.request.into_settings();
149        let batch_settings = self.batch.into_batcher_settings()?;
150
151        let mut file_format_options = BTreeMap::new();
152        let compression = match self.compression {
153            DatabendCompression::Gzip => {
154                file_format_options.insert("compression", "GZIP");
155                Compression::gzip_default()
156            }
157            DatabendCompression::None => {
158                file_format_options.insert("compression", "NONE");
159                Compression::None
160            }
161        };
162        let encoding: EncodingConfig = self.encoding.clone().into();
163        let serializer = match self.encoding.config() {
164            DatabendSerializerConfig::Json(_) => {
165                file_format_options.insert("type", "NDJSON");
166                file_format_options.insert("missing_field_as", self.missing_field_as.as_str());
167                encoding.build()?
168            }
169            DatabendSerializerConfig::Csv(_) => {
170                file_format_options.insert("type", "CSV");
171                file_format_options.insert("field_delimiter", ",");
172                file_format_options.insert("record_delimiter", "\n");
173                file_format_options.insert("skip_header", "0");
174                encoding.build()?
175            }
176        };
177        let framer = FramingConfig::NewlineDelimited.build();
178        let transformer = encoding.transformer();
179
180        let mut copy_options = BTreeMap::new();
181        copy_options.insert("purge", "true");
182
183        let client = DatabendAPIClient::new(&endpoint, Some(ua)).await?;
184        let service = DatabendService::new(
185            client,
186            self.table.clone(),
187            file_format_options,
188            copy_options,
189        )?;
190        let service = ServiceBuilder::new()
191            .settings(request_settings, DatabendRetryLogic)
192            .service(service);
193
194        let encoder = Encoder::<Framer>::new(framer, serializer);
195        let request_builder = DatabendRequestBuilder::new(compression, (transformer, encoder));
196
197        let sink = DatabendSink::new(batch_settings, request_builder, service);
198
199        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
200    }
201
202    fn input(&self) -> Input {
203        Input::log()
204    }
205
206    fn acknowledgements(&self) -> &AcknowledgementsConfig {
207        &self.acknowledgements
208    }
209}
210
211async fn select_one(client: Arc<DatabendAPIClient>) -> crate::Result<()> {
212    client.query_all("SELECT 1").await?;
213    Ok(())
214}
215
#[cfg(test)]
mod tests {
    use super::*;

    /// Endpoint shared by the parsing tests below.
    const ENDPOINT: &str = "databend://localhost:8000/mydatabase?sslmode=disable";

    /// Deserializes a TOML snippet into a `DatabendConfig`, panicking when the
    /// snippet is not a valid configuration.
    #[track_caller]
    fn parse(raw: &str) -> DatabendConfig {
        toml::from_str::<DatabendConfig>(raw).unwrap()
    }

    /// The generated example configuration must itself be a valid configuration.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatabendConfig>();
    }

    /// With only the required keys, encoding is JSON and compression is off.
    #[test]
    fn parse_config() {
        let config = parse(
            r#"
            endpoint = "databend://localhost:8000/mydatabase?sslmode=disable"
            table = "mytable"
        "#,
        );

        assert_eq!(config.endpoint.uri, ENDPOINT);
        assert_eq!(config.table, "mytable");
        assert!(matches!(
            config.encoding.config(),
            DatabendSerializerConfig::Json(_)
        ));
        assert!(matches!(config.compression, DatabendCompression::None));
    }

    /// An explicit CSV codec and gzip compression are picked up from the config.
    #[test]
    fn parse_config_with_encoding_compression() {
        let config = parse(
            r#"
            endpoint = "databend://localhost:8000/mydatabase?sslmode=disable"
            table = "mytable"
            encoding.codec = "csv"
            encoding.csv.fields = ["host", "timestamp", "message"]
            compression = "gzip"
        "#,
        );

        assert_eq!(config.endpoint.uri, ENDPOINT);
        assert_eq!(config.table, "mytable");
        assert!(matches!(
            config.encoding.config(),
            DatabendSerializerConfig::Csv(_)
        ));
        assert!(matches!(config.compression, DatabendCompression::Gzip));
    }
}
269}