vector/sinks/databend/
config.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use databend_client::APIClient as DatabendAPIClient;
5use futures::future::FutureExt;
6use tower::ServiceBuilder;
7use vector_lib::codecs::encoding::{Framer, FramingConfig};
8use vector_lib::configurable::{component::GenerateConfig, configurable_component};
9
10use crate::{
11    codecs::{Encoder, EncodingConfig},
12    config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
13    http::{Auth, MaybeAuth},
14    sinks::{
15        util::{
16            BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, ServiceBuilderExt,
17            TowerRequestConfig, UriSerde,
18        },
19        Healthcheck, VectorSink,
20    },
21    tls::TlsConfig,
22    vector_version,
23};
24
25use super::{
26    compression::DatabendCompression,
27    encoding::{DatabendEncodingConfig, DatabendMissingFieldAS, DatabendSerializerConfig},
28    request_builder::DatabendRequestBuilder,
29    service::{DatabendRetryLogic, DatabendService},
30    sink::DatabendSink,
31};
32
/// Configuration for the `databend` sink.
#[configurable_component(sink("databend", "Deliver log data to a Databend database."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct DatabendConfig {
    // Accepted schemes (see `SinkConfig::build`): `databend` is used as the DSN directly;
    // the legacy `http` / `https` schemes are converted into a `databend://` DSN.
    /// The DSN of the Databend server.
    #[configurable(metadata(
        docs::examples = "databend://localhost:8000/default?sslmode=disable"
    ))]
    pub endpoint: UriSerde,

    // Not read by `SinkConfig::build` in this file; TLS is selected through DSN
    // arguments such as `sslmode` instead.
    /// The TLS configuration to use when connecting to the Databend server.
    #[configurable(
        deprecated = "This option has been deprecated, use arguments in the DSN instead."
    )]
    pub tls: Option<TlsConfig>,

    // When set, the DSN path is replaced with `/<database>`.
    /// The database that contains the table that data is inserted into. Overrides the database in DSN.
    #[configurable(metadata(docs::examples = "mydatabase"))]
    pub database: Option<String>,

    // Basic credentials are written into the DSN's userinfo; bearer auth is rejected
    // when the sink is built.
    /// The username and password to authenticate with. Overrides the username and password in DSN.
    #[configurable(derived)]
    pub auth: Option<Auth>,

    // Passed to `DatabendService::new` as the insert target.
    /// The table that data is inserted into.
    #[configurable(metadata(docs::examples = "mytable"))]
    pub table: String,

    // Sent as the `missing_field_as` file-format option, only for JSON (NDJSON) encoding.
    #[configurable(derived)]
    #[serde(default)]
    pub missing_field_as: DatabendMissingFieldAS,

    // Selects the load file format: NDJSON for JSON, CSV for CSV.
    #[configurable(derived)]
    #[serde(default)]
    pub encoding: DatabendEncodingConfig,

    // Drives both the request payload compression and the `compression` file-format option.
    #[configurable(derived)]
    #[serde(default)]
    pub compression: DatabendCompression,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    // Accepts either a bare bool or a struct; omitted from serialized output when default.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
90
91impl GenerateConfig for DatabendConfig {
92    fn generate_config() -> toml::Value {
93        toml::from_str(
94            r#"endpoint = "databend://localhost:8000/default?sslmode=disable"
95            table = "default"
96        "#,
97        )
98        .unwrap()
99    }
100}
101
102#[async_trait::async_trait]
103#[typetag::serde(name = "databend")]
104impl SinkConfig for DatabendConfig {
105    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
106        let ua = format!("vector/{}", vector_version());
107        let auth = self.auth.choose_one(&self.endpoint.auth)?;
108        let authority = self
109            .endpoint
110            .uri
111            .authority()
112            .ok_or("Endpoint missing authority")?;
113        let endpoint = match self.endpoint.uri.scheme().map(|s| s.as_str()) {
114            Some("databend") => self.endpoint.to_string(),
115            // for backward compatibility, build DSN from endpoint
116            Some("http") => format!("databend://{authority}/?sslmode=disable"),
117            Some("https") => format!("databend://{authority}"),
118            None => {
119                return Err("Missing scheme for Databend endpoint. Expected `databend`.".into());
120            }
121            Some(s) => {
122                return Err(format!("Unsupported scheme for Databend endpoint: {s}").into());
123            }
124        };
125        let mut endpoint = url::Url::parse(&endpoint)?;
126        match auth {
127            Some(Auth::Basic { user, password }) => {
128                let _ = endpoint.set_username(&user);
129                let _ = endpoint.set_password(Some(password.inner()));
130            }
131            Some(Auth::Bearer { .. }) => {
132                return Err("Bearer authentication is not supported currently".into());
133            }
134            None => {}
135            #[cfg(feature = "aws-core")]
136            _ => {}
137        }
138        if let Some(database) = &self.database {
139            endpoint.set_path(&format!("/{database}"));
140        }
141        let endpoint = endpoint.to_string();
142        let health_client = DatabendAPIClient::new(&endpoint, Some(ua.clone())).await?;
143        let healthcheck = select_one(health_client).boxed();
144
145        let request_settings = self.request.into_settings();
146        let batch_settings = self.batch.into_batcher_settings()?;
147
148        let mut file_format_options = BTreeMap::new();
149        let compression = match self.compression {
150            DatabendCompression::Gzip => {
151                file_format_options.insert("compression", "GZIP");
152                Compression::gzip_default()
153            }
154            DatabendCompression::None => {
155                file_format_options.insert("compression", "NONE");
156                Compression::None
157            }
158        };
159        let encoding: EncodingConfig = self.encoding.clone().into();
160        let serializer = match self.encoding.config() {
161            DatabendSerializerConfig::Json(_) => {
162                file_format_options.insert("type", "NDJSON");
163                file_format_options.insert("missing_field_as", self.missing_field_as.as_str());
164                encoding.build()?
165            }
166            DatabendSerializerConfig::Csv(_) => {
167                file_format_options.insert("type", "CSV");
168                file_format_options.insert("field_delimiter", ",");
169                file_format_options.insert("record_delimiter", "\n");
170                file_format_options.insert("skip_header", "0");
171                encoding.build()?
172            }
173        };
174        let framer = FramingConfig::NewlineDelimited.build();
175        let transformer = encoding.transformer();
176
177        let mut copy_options = BTreeMap::new();
178        copy_options.insert("purge", "true");
179
180        let client = DatabendAPIClient::new(&endpoint, Some(ua)).await?;
181        let service = DatabendService::new(
182            client,
183            self.table.clone(),
184            file_format_options,
185            copy_options,
186        )?;
187        let service = ServiceBuilder::new()
188            .settings(request_settings, DatabendRetryLogic)
189            .service(service);
190
191        let encoder = Encoder::<Framer>::new(framer, serializer);
192        let request_builder = DatabendRequestBuilder::new(compression, (transformer, encoder));
193
194        let sink = DatabendSink::new(batch_settings, request_builder, service);
195
196        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
197    }
198
199    fn input(&self) -> Input {
200        Input::log()
201    }
202
203    fn acknowledgements(&self) -> &AcknowledgementsConfig {
204        &self.acknowledgements
205    }
206}
207
208async fn select_one(client: Arc<DatabendAPIClient>) -> crate::Result<()> {
209    client.query_all("SELECT 1").await?;
210    Ok(())
211}
212
#[cfg(test)]
mod tests {
    use super::*;

    /// DSN shared by the parsing fixtures below.
    const FIXTURE_DSN: &str = "databend://localhost:8000/mydatabase?sslmode=disable";

    /// Deserializes a `DatabendConfig` from a TOML snippet, panicking if it is rejected.
    fn parse_toml(source: &str) -> DatabendConfig {
        toml::from_str::<DatabendConfig>(source).unwrap()
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatabendConfig>();
    }

    /// A minimal config falls back to JSON encoding and no compression.
    #[test]
    fn parse_config() {
        let cfg = parse_toml(
            r#"
            endpoint = "databend://localhost:8000/mydatabase?sslmode=disable"
            table = "mytable"
        "#,
        );
        assert_eq!(cfg.endpoint.uri, FIXTURE_DSN);
        assert_eq!(cfg.table, "mytable");
        assert!(matches!(cfg.encoding.config(), DatabendSerializerConfig::Json(_)));
        assert!(matches!(cfg.compression, DatabendCompression::None));
    }

    /// Explicit CSV encoding and gzip compression are honoured.
    #[test]
    fn parse_config_with_encoding_compression() {
        let cfg = parse_toml(
            r#"
            endpoint = "databend://localhost:8000/mydatabase?sslmode=disable"
            table = "mytable"
            encoding.codec = "csv"
            encoding.csv.fields = ["host", "timestamp", "message"]
            compression = "gzip"
        "#,
        );
        assert_eq!(cfg.endpoint.uri, FIXTURE_DSN);
        assert_eq!(cfg.table, "mytable");
        assert!(matches!(cfg.encoding.config(), DatabendSerializerConfig::Csv(_)));
        assert!(matches!(cfg.compression, DatabendCompression::Gzip));
    }
}