vector/sinks/clickhouse/
config.rs

1//! Configuration for the `Clickhouse` sink.
2
3use std::fmt;
4
5use http::{Request, StatusCode, Uri};
6use hyper::Body;
7use vector_lib::codecs::encoding::format::SchemaProvider;
8use vector_lib::codecs::encoding::{ArrowStreamSerializerConfig, BatchSerializerConfig};
9
10use super::{
11    request_builder::ClickhouseRequestBuilder,
12    service::{ClickhouseRetryLogic, ClickhouseServiceRequestBuilder},
13    sink::{ClickhouseSink, PartitionKey},
14};
15use crate::{
16    http::{Auth, HttpClient, MaybeAuth},
17    sinks::{
18        prelude::*,
19        util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, http::HttpService},
20    },
21};
22
/// Data format.
///
/// The format used to parse input/output data.
///
/// [formats]: https://clickhouse.com/docs/en/interfaces/formats
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq, Hash)]
#[serde(rename_all = "snake_case")]
#[derivative(Default)]
// The variants intentionally share a common `Json` prefix (they mirror the
// ClickHouse format names); suppress the clippy lint that flags this.
#[allow(clippy::enum_variant_names)]
pub enum Format {
    // Default format when the user does not set `format` in the config.
    #[derivative(Default)]
    /// JSONEachRow.
    JsonEachRow,

    /// JSONAsObject.
    JsonAsObject,

    /// JSONAsString.
    JsonAsString,

    /// ArrowStream (beta).
    #[configurable(metadata(status = "beta"))]
    ArrowStream,
}
48
49impl fmt::Display for Format {
50    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51        match self {
52            Format::JsonEachRow => write!(f, "JSONEachRow"),
53            Format::JsonAsObject => write!(f, "JSONAsObject"),
54            Format::JsonAsString => write!(f, "JSONAsString"),
55            Format::ArrowStream => write!(f, "ArrowStream"),
56        }
57    }
58}
59
/// Configuration for the `clickhouse` sink.
#[configurable_component(sink("clickhouse", "Deliver log data to a ClickHouse database."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct ClickhouseConfig {
    /// The endpoint of the ClickHouse server.
    // `host` is accepted as a deserialization alias for backwards compatibility.
    #[serde(alias = "host")]
    #[configurable(metadata(docs::examples = "http://localhost:8123"))]
    pub endpoint: UriSerde,

    /// The table that data is inserted into.
    // `Template` type: the value may be templated from event fields.
    #[configurable(metadata(docs::examples = "mytable"))]
    pub table: Template,

    /// The database that contains the table that data is inserted into.
    // Optional; `build()` falls back to the literal "default" database.
    #[configurable(metadata(docs::examples = "mydatabase"))]
    pub database: Option<Template>,

    /// The format to parse input data.
    #[serde(default)]
    pub format: Format,

    /// Sets `input_format_skip_unknown_fields`, allowing ClickHouse to discard fields not present in the table schema.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    // `None` means "don't send the setting at all", not "send false".
    #[serde(default)]
    pub skip_unknown_fields: Option<bool>,

    /// Sets `date_time_input_format` to `best_effort`, allowing ClickHouse to properly parse RFC3339/ISO 8601.
    #[serde(default)]
    pub date_time_best_effort: bool,

    /// Sets `insert_distributed_one_random_shard`, allowing ClickHouse to insert data into a random shard when using Distributed Table Engine.
    #[serde(default)]
    pub insert_random_shard: bool,

    // Unlike most sinks, compression defaults to gzip rather than `none`.
    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,

    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub encoding: Transformer,

    /// The batch encoding configuration for encoding events in batches.
    ///
    /// When specified, events are encoded together as a single batch.
    /// This is mutually exclusive with per-event encoding based on the `format` field.
    // Validated in `resolve_strategy`: only valid with `format: arrow_stream`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch_encoding: Option<BatchSerializerConfig>,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    // Auth may alternatively be embedded in `endpoint`; `build()` rejects
    // configurations that set both.
    #[configurable(derived)]
    pub auth: Option<Auth>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub query_settings: QuerySettingsConfig,
}
138
/// Query settings for the `clickhouse` sink.
// Passed through to `ClickhouseServiceRequestBuilder`, which applies the
// settings to each insert request.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct QuerySettingsConfig {
    /// Async insert-related settings.
    #[serde(default)]
    pub async_insert_settings: AsyncInsertSettingsConfig,
}
148
/// Async insert related settings for the `clickhouse` sink.
// Every field is optional: `None` means the setting is omitted from the
// request so the server-side default applies.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct AsyncInsertSettingsConfig {
    /// Sets `async_insert`, allowing ClickHouse to queue the inserted data and later flush to table in the background.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    #[serde(default)]
    pub enabled: Option<bool>,

    /// Sets `wait_for`, allowing ClickHouse to wait for processing of asynchronous insertion.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    // NOTE(review): the ClickHouse setting is named `wait_for_async_insert`;
    // confirm against the request builder in service.rs and align the doc.
    #[serde(default)]
    pub wait_for_processing: Option<bool>,

    /// Sets `wait_for_processing_timeout`, to control the timeout for waiting for processing asynchronous insertion.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    #[serde(default)]
    pub wait_for_processing_timeout: Option<u64>,

    /// Sets `async_insert_deduplicate`, allowing ClickHouse to perform deduplication when inserting blocks in the replicated table.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    #[serde(default)]
    pub deduplicate: Option<bool>,

    /// Sets `async_insert_max_data_size`, the maximum size in bytes of unparsed data collected per query before being inserted.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    #[serde(default)]
    pub max_data_size: Option<u64>,

    /// Sets `async_insert_max_query_number`, the maximum number of insert queries before being inserted.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    #[serde(default)]
    pub max_query_number: Option<u64>,
}
190
// Provides `GenerateConfig` for this sink based on the struct's `Default` impl.
impl_generate_config_from_default!(ClickhouseConfig);
192
#[async_trait::async_trait]
#[typetag::serde(name = "clickhouse")]
impl SinkConfig for ClickhouseConfig {
    /// Builds the running sink and its healthcheck future from this configuration.
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        // Fill in any missing URI parts (e.g. default scheme/port) first.
        let endpoint = self.endpoint.with_default_parts().uri;

        // Credentials may come from `auth` or be embedded in the endpoint;
        // `choose_one` errors if both are configured.
        let auth = self.auth.choose_one(&self.endpoint.auth)?;

        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;

        let client = HttpClient::new(tls_settings, &cx.proxy)?;

        // Carries the per-request options (auth, query settings, compression)
        // the HTTP service needs to build each insert request.
        let clickhouse_service_request_builder = ClickhouseServiceRequestBuilder {
            auth: auth.clone(),
            endpoint: endpoint.clone(),
            skip_unknown_fields: self.skip_unknown_fields,
            date_time_best_effort: self.date_time_best_effort,
            insert_random_shard: self.insert_random_shard,
            compression: self.compression,
            query_settings: self.query_settings,
        };

        let service: HttpService<ClickhouseServiceRequestBuilder, PartitionKey> =
            HttpService::new(client.clone(), clickhouse_service_request_builder);

        let request_limits = self.request.into_settings();

        // Layer request limits and ClickHouse-specific retry logic on top of
        // the raw HTTP service.
        let service = ServiceBuilder::new()
            .settings(request_limits, ClickhouseRetryLogic::default())
            .service(service);

        let batch_settings = self.batch.into_batcher_settings()?;

        // Fall back to the literal "default" database when none is configured.
        let database = self.database.clone().unwrap_or_else(|| {
            "default"
                .try_into()
                .expect("'default' should be a valid template")
        });

        // Resolve the encoding strategy (format + encoder) based on configuration
        let (format, encoder_kind) = self
            .resolve_strategy(&client, &endpoint, &database, auth.as_ref())
            .await?;

        let request_builder = ClickhouseRequestBuilder {
            compression: self.compression,
            encoder: (self.encoding.clone(), encoder_kind),
        };

        let sink = ClickhouseSink::new(
            batch_settings,
            service,
            database,
            self.table.clone(),
            format,
            request_builder,
        );

        let healthcheck = Box::pin(healthcheck(client, endpoint, auth));

        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
    }

    /// This sink consumes log events only.
    fn input(&self) -> Input {
        Input::log()
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
264
265impl ClickhouseConfig {
266    /// Resolves the encoding strategy (format + encoder) based on configuration.
267    ///
268    /// This method determines the appropriate ClickHouse format and Vector encoder
269    /// based on the user's configuration, ensuring they are consistent.
270    async fn resolve_strategy(
271        &self,
272        client: &HttpClient,
273        endpoint: &Uri,
274        database: &Template,
275        auth: Option<&Auth>,
276    ) -> crate::Result<(Format, vector_lib::codecs::EncoderKind)> {
277        use vector_lib::codecs::EncoderKind;
278        use vector_lib::codecs::{
279            JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer,
280        };
281
282        if let Some(batch_encoding) = &self.batch_encoding {
283            use vector_lib::codecs::{BatchEncoder, BatchSerializer};
284
285            // Validate that batch_encoding is only compatible with ArrowStream format
286            if self.format != Format::ArrowStream {
287                return Err(format!(
288                    "'batch_encoding' is only compatible with 'format: arrow_stream'. Found 'format: {}'.",
289                    self.format
290                )
291                .into());
292            }
293
294            let mut arrow_config = match batch_encoding {
295                BatchSerializerConfig::ArrowStream(config) => config.clone(),
296            };
297
298            self.resolve_arrow_schema(
299                client,
300                endpoint.to_string(),
301                database,
302                auth,
303                &mut arrow_config,
304            )
305            .await?;
306
307            let resolved_batch_config = BatchSerializerConfig::ArrowStream(arrow_config);
308            let arrow_serializer = resolved_batch_config.build()?;
309            let batch_serializer = BatchSerializer::Arrow(arrow_serializer);
310            let encoder = EncoderKind::Batch(BatchEncoder::new(batch_serializer));
311
312            return Ok((Format::ArrowStream, encoder));
313        }
314
315        let encoder = EncoderKind::Framed(Box::new(Encoder::<Framer>::new(
316            NewlineDelimitedEncoderConfig.build().into(),
317            JsonSerializerConfig::default().build().into(),
318        )));
319
320        Ok((self.format, encoder))
321    }
322
323    async fn resolve_arrow_schema(
324        &self,
325        client: &HttpClient,
326        endpoint: String,
327        database: &Template,
328        auth: Option<&Auth>,
329        config: &mut ArrowStreamSerializerConfig,
330    ) -> crate::Result<()> {
331        use super::arrow;
332
333        if self.table.is_dynamic() || database.is_dynamic() {
334            return Err(
335                "Arrow codec requires a static table and database. Dynamic schema inference is not supported."
336                    .into(),
337            );
338        }
339
340        let table_str = self.table.get_ref();
341        let database_str = database.get_ref();
342
343        debug!(
344            "Fetching schema for table {}.{} at startup.",
345            database_str, table_str
346        );
347
348        let provider = arrow::ClickHouseSchemaProvider::new(
349            client.clone(),
350            endpoint,
351            database_str.to_string(),
352            table_str.to_string(),
353            auth.cloned(),
354        );
355
356        let schema = provider.get_schema().await.map_err(|e| {
357            format!(
358                "Failed to fetch schema for {}.{}: {}.",
359                database_str, table_str, e
360            )
361        })?;
362
363        config.schema = Some(schema);
364
365        debug!(
366            "Successfully fetched Arrow schema with {} fields.",
367            config
368                .schema
369                .as_ref()
370                .map(|s| s.fields().len())
371                .unwrap_or(0)
372        );
373
374        Ok(())
375    }
376}
377
378fn get_healthcheck_uri(endpoint: &Uri) -> String {
379    let mut uri = endpoint.to_string();
380    if !uri.ends_with('/') {
381        uri.push('/');
382    }
383    uri.push_str("?query=SELECT%201");
384    uri
385}
386
387async fn healthcheck(client: HttpClient, endpoint: Uri, auth: Option<Auth>) -> crate::Result<()> {
388    let uri = get_healthcheck_uri(&endpoint);
389    let mut request = Request::get(uri).body(Body::empty()).unwrap();
390
391    if let Some(auth) = auth {
392        auth.apply(&mut request);
393    }
394
395    let response = client.send(request).await?;
396
397    match response.status() {
398        StatusCode::OK => Ok(()),
399        status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
400    }
401}
402
#[cfg(test)]
mod tests {
    use super::*;
    use vector_lib::codecs::encoding::ArrowStreamSerializerConfig;

    /// Helper to create a minimal ClickhouseConfig for testing
    fn create_test_config(
        format: Format,
        batch_encoding: Option<BatchSerializerConfig>,
    ) -> ClickhouseConfig {
        let endpoint = "http://localhost:8123".parse::<http::Uri>().unwrap().into();
        ClickhouseConfig {
            endpoint,
            table: "test_table".try_into().unwrap(),
            database: Some("test_db".try_into().unwrap()),
            format,
            batch_encoding,
            ..Default::default()
        }
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ClickhouseConfig>();
    }

    #[test]
    fn test_get_healthcheck_uri() {
        // Table-driven: endpoints with/without trailing slash and with a path.
        let cases = [
            (
                "http://localhost:8123",
                "http://localhost:8123/?query=SELECT%201",
            ),
            (
                "http://localhost:8123/",
                "http://localhost:8123/?query=SELECT%201",
            ),
            (
                "http://localhost:8123/path/",
                "http://localhost:8123/path/?query=SELECT%201",
            ),
        ];
        for (input, expected) in cases {
            assert_eq!(get_healthcheck_uri(&input.parse().unwrap()), expected);
        }
    }

    #[tokio::test]
    async fn test_format_selection_with_batch_encoding() {
        use crate::http::HttpClient;
        use crate::tls::TlsSettings;

        // Create minimal dependencies for resolve_strategy
        let client = HttpClient::new(TlsSettings::default(), &Default::default()).unwrap();
        let endpoint: http::Uri = "http://localhost:8123".parse().unwrap();
        let database: Template = "test_db".try_into().unwrap();

        // Test incompatible formats - should all return errors
        let incompatible_formats = [
            (Format::JsonEachRow, "json_each_row"),
            (Format::JsonAsObject, "json_as_object"),
            (Format::JsonAsString, "json_as_string"),
        ];

        for (format, format_name) in incompatible_formats {
            let batch = BatchSerializerConfig::ArrowStream(ArrowStreamSerializerConfig::default());
            let config = create_test_config(format, Some(batch));

            let result = config
                .resolve_strategy(&client, &endpoint, &database, None)
                .await;

            assert!(
                result.is_err(),
                "Expected error for format {} with batch_encoding, but got success",
                format_name
            );
        }
    }

    #[test]
    fn test_format_selection_without_batch_encoding() {
        // When batch_encoding is None, the configured format should be used
        let all_formats = [
            Format::JsonEachRow,
            Format::JsonAsObject,
            Format::JsonAsString,
            Format::ArrowStream,
        ];

        for format in all_formats {
            let config = create_test_config(format, None);

            assert!(
                config.batch_encoding.is_none(),
                "batch_encoding should be None for format {:?}",
                format
            );
            assert_eq!(
                config.format, format,
                "format should match configured value"
            );
        }
    }
}