vector/sinks/clickhouse/
config.rs

1//! Configuration for the `Clickhouse` sink.
2
3use std::fmt;
4
5use http::{Request, StatusCode, Uri};
6use hyper::Body;
7use vector_lib::codecs::encoding::format::SchemaProvider;
8use vector_lib::codecs::encoding::{ArrowStreamSerializerConfig, BatchSerializerConfig};
9
10use super::{
11    request_builder::ClickhouseRequestBuilder,
12    service::{ClickhouseRetryLogic, ClickhouseServiceRequestBuilder},
13    sink::{ClickhouseSink, PartitionKey},
14};
15use crate::{
16    http::{Auth, HttpClient, MaybeAuth},
17    sinks::{
18        prelude::*,
19        util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, http::HttpService},
20    },
21};
22
/// Data format.
///
/// The format used to parse input/output data.
///
/// Note: configuration values are snake_case (e.g. `json_each_row`), per the
/// `serde(rename_all)` attribute below, while the `Display` impl renders
/// ClickHouse's official format names (e.g. `JSONEachRow`).
///
/// [formats]: https://clickhouse.com/docs/en/interfaces/formats
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Hash)]
#[serde(rename_all = "snake_case")]
#[allow(clippy::enum_variant_names)]
pub enum Format {
    #[default]
    /// JSONEachRow.
    JsonEachRow,

    /// JSONAsObject.
    JsonAsObject,

    /// JSONAsString.
    JsonAsString,

    /// ArrowStream (beta).
    #[configurable(metadata(status = "beta"))]
    ArrowStream,
}
47
48impl fmt::Display for Format {
49    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50        match self {
51            Format::JsonEachRow => write!(f, "JSONEachRow"),
52            Format::JsonAsObject => write!(f, "JSONAsObject"),
53            Format::JsonAsString => write!(f, "JSONAsString"),
54            Format::ArrowStream => write!(f, "ArrowStream"),
55        }
56    }
57}
58
/// Configuration for the `clickhouse` sink.
#[configurable_component(sink("clickhouse", "Deliver log data to a ClickHouse database."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct ClickhouseConfig {
    /// The endpoint of the ClickHouse server.
    #[serde(alias = "host")]
    #[configurable(metadata(docs::examples = "http://localhost:8123"))]
    pub endpoint: UriSerde,

    /// The table that data is inserted into.
    // Template: may reference event fields, making the table name dynamic per event.
    #[configurable(metadata(docs::examples = "mytable"))]
    pub table: Template,

    /// The database that contains the table that data is inserted into.
    // Falls back to "default" in `build` when unset.
    #[configurable(metadata(docs::examples = "mydatabase"))]
    pub database: Option<Template>,

    /// The format to parse input data.
    #[serde(default)]
    pub format: Format,

    /// Sets `input_format_skip_unknown_fields`, allowing ClickHouse to discard fields not present in the table schema.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    #[serde(default)]
    pub skip_unknown_fields: Option<bool>,

    /// Sets `date_time_input_format` to `best_effort`, allowing ClickHouse to properly parse RFC3339/ISO 8601.
    #[serde(default)]
    pub date_time_best_effort: bool,

    /// Sets `insert_distributed_one_random_shard`, allowing ClickHouse to insert data into a random shard when using Distributed Table Engine.
    #[serde(default)]
    pub insert_random_shard: bool,

    // Request-body compression; defaults to gzip rather than `Compression::default()`.
    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,

    // Event-level transformer (field filtering/renaming) applied before encoding.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub encoding: Transformer,

    /// The batch encoding configuration for encoding events in batches.
    ///
    /// When specified, events are encoded together as a single batch.
    /// This is mutually exclusive with per-event encoding based on the `format` field.
    // Validated in `resolve_strategy`: only valid together with `format: arrow_stream`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch_encoding: Option<BatchSerializerConfig>,

    // Batching limits (size/time); defaults are realtime size-based settings.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    // Authentication; may alternatively be embedded in the endpoint URI
    // (`choose_one` in `build` rejects specifying both).
    #[configurable(derived)]
    pub auth: Option<Auth>,

    // Tower middleware settings (concurrency, retries, timeouts).
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    // TLS settings for HTTPS endpoints.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    // Per-query ClickHouse settings (currently async-insert related).
    #[configurable(derived)]
    #[serde(default)]
    pub query_settings: QuerySettingsConfig,
}
137
/// Query settings for the `clickhouse` sink.
// Wrapper struct so future query-setting groups can be added without
// breaking the configuration schema.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct QuerySettingsConfig {
    /// Async insert-related settings.
    #[serde(default)]
    pub async_insert_settings: AsyncInsertSettingsConfig,
}
147
/// Async insert related settings for the `clickhouse` sink.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct AsyncInsertSettingsConfig {
    /// Sets `async_insert`, allowing ClickHouse to queue the inserted data and later flush to table in the background.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    #[serde(default)]
    pub enabled: Option<bool>,

    /// Sets `wait_for_async_insert`, allowing ClickHouse to wait for processing of asynchronous insertion.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    #[serde(default)]
    pub wait_for_processing: Option<bool>,

    /// Sets `wait_for_async_insert_timeout`, to control the timeout for waiting for processing asynchronous insertion.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    #[serde(default)]
    pub wait_for_processing_timeout: Option<u64>,

    /// Sets `async_insert_deduplicate`, allowing ClickHouse to perform deduplication when inserting blocks in the replicated table.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    #[serde(default)]
    pub deduplicate: Option<bool>,

    /// Sets `async_insert_max_data_size`, the maximum size in bytes of unparsed data collected per query before being inserted.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    #[serde(default)]
    pub max_data_size: Option<u64>,

    /// Sets `async_insert_max_query_number`, the maximum number of insert queries before being inserted.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    #[serde(default)]
    pub max_query_number: Option<u64>,
}
189
190impl_generate_config_from_default!(ClickhouseConfig);
191
#[async_trait::async_trait]
#[typetag::serde(name = "clickhouse")]
impl SinkConfig for ClickhouseConfig {
    // Builds the running sink plus its healthcheck future from this config.
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        // Normalize the endpoint, filling in default URI parts.
        let endpoint = self.endpoint.with_default_parts().uri;

        // Auth may come from the `auth` field or be embedded in the endpoint
        // URI; `choose_one` resolves the two sources (erroring on conflict).
        let auth = self.auth.choose_one(&self.endpoint.auth)?;

        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;

        let client = HttpClient::new(tls_settings, &cx.proxy)?;

        // Per-request builder: turns partitioned payloads into HTTP requests,
        // carrying the query-level ClickHouse settings along.
        let clickhouse_service_request_builder = ClickhouseServiceRequestBuilder {
            auth: auth.clone(),
            endpoint: endpoint.clone(),
            skip_unknown_fields: self.skip_unknown_fields,
            date_time_best_effort: self.date_time_best_effort,
            insert_random_shard: self.insert_random_shard,
            compression: self.compression,
            query_settings: self.query_settings,
        };

        let service: HttpService<ClickhouseServiceRequestBuilder, PartitionKey> =
            HttpService::new(client.clone(), clickhouse_service_request_builder);

        let request_limits = self.request.into_settings();

        // Wrap the HTTP service with retry/concurrency middleware.
        let service = ServiceBuilder::new()
            .settings(request_limits, ClickhouseRetryLogic::default())
            .service(service);

        let batch_settings = self.batch.into_batcher_settings()?;

        // Unspecified database falls back to ClickHouse's "default" database.
        let database = self.database.clone().unwrap_or_else(|| {
            "default"
                .try_into()
                .expect("'default' should be a valid template")
        });

        // Resolve the encoding strategy (format + encoder) based on configuration.
        // For Arrow batch encoding this fetches the table schema from the server.
        let (format, encoder_kind) = self
            .resolve_strategy(&client, &endpoint, &database, auth.as_ref())
            .await?;

        let request_builder = ClickhouseRequestBuilder {
            compression: self.compression,
            encoder: (self.encoding.clone(), encoder_kind),
        };

        let sink = ClickhouseSink::new(
            batch_settings,
            service,
            database,
            self.table.clone(),
            format,
            request_builder,
        );

        // Healthcheck issues `SELECT 1` against the endpoint.
        let healthcheck = Box::pin(healthcheck(client, endpoint, auth));

        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
    }

    // This sink accepts log events only.
    fn input(&self) -> Input {
        Input::log()
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
263
impl ClickhouseConfig {
    /// Resolves the encoding strategy (format + encoder) based on configuration.
    ///
    /// This method determines the appropriate ClickHouse format and Vector encoder
    /// based on the user's configuration, ensuring they are consistent.
    ///
    /// With `batch_encoding` set, the format must be `arrow_stream`, and the
    /// table's Arrow schema is fetched from the server up front. Otherwise a
    /// per-event newline-delimited JSON encoder is used.
    async fn resolve_strategy(
        &self,
        client: &HttpClient,
        endpoint: &Uri,
        database: &Template,
        auth: Option<&Auth>,
    ) -> crate::Result<(Format, vector_lib::codecs::EncoderKind)> {
        use vector_lib::codecs::EncoderKind;
        use vector_lib::codecs::{
            JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer,
        };

        if let Some(batch_encoding) = &self.batch_encoding {
            use vector_lib::codecs::BatchEncoder;

            // Validate that batch_encoding is only compatible with ArrowStream format
            if self.format != Format::ArrowStream {
                return Err(format!(
                    "'batch_encoding' is only compatible with 'format: arrow_stream'. Found 'format: {}'.",
                    self.format
                )
                .into());
            }

            // Only the ArrowStream batch serializer is supported by this sink.
            let mut arrow_config = match batch_encoding {
                BatchSerializerConfig::ArrowStream(config) => config.clone(),
                #[cfg(feature = "codecs-parquet")]
                BatchSerializerConfig::Parquet(_) => {
                    return Err(
                        "ClickHouse sink does not support Parquet batch encoding. Use 'arrow_stream' instead."
                            .into(),
                    );
                }
            };

            // Populate `arrow_config.schema` by querying the server at startup.
            self.resolve_arrow_schema(
                client,
                endpoint.to_string(),
                database,
                auth,
                &mut arrow_config,
            )
            .await?;

            let resolved_batch_config = BatchSerializerConfig::ArrowStream(arrow_config);
            let batch_serializer = resolved_batch_config.build_batch_serializer()?;
            let encoder = EncoderKind::Batch(BatchEncoder::new(batch_serializer));

            return Ok((Format::ArrowStream, encoder));
        }

        // No batch encoding: fall back to per-event newline-delimited JSON.
        // NOTE(review): this encoder is used regardless of `self.format` —
        // including `format: arrow_stream` with no `batch_encoding` — confirm
        // that combination is rejected or meaningfully handled upstream.
        let encoder = EncoderKind::Framed(Box::new(Encoder::<Framer>::new(
            NewlineDelimitedEncoderConfig.build().into(),
            JsonSerializerConfig::default().build().into(),
        )));

        Ok((self.format, encoder))
    }

    /// Fetches the table's Arrow schema from the ClickHouse server and stores
    /// it in `config.schema`.
    ///
    /// Errors if the table or database templates are dynamic (schema inference
    /// requires a single static table), or if the schema fetch fails.
    async fn resolve_arrow_schema(
        &self,
        client: &HttpClient,
        endpoint: String,
        database: &Template,
        auth: Option<&Auth>,
        config: &mut ArrowStreamSerializerConfig,
    ) -> crate::Result<()> {
        use super::arrow;

        // Dynamic templates would require a schema per rendered table name,
        // which is not supported.
        if self.table.is_dynamic() || database.is_dynamic() {
            return Err(
                "Arrow codec requires a static table and database. Dynamic schema inference is not supported."
                    .into(),
            );
        }

        let table_str = self.table.get_ref();
        let database_str = database.get_ref();

        debug!(
            "Fetching schema for table {}.{} at startup.",
            database_str, table_str
        );

        let provider = arrow::ClickHouseSchemaProvider::new(
            client.clone(),
            endpoint,
            database_str.to_string(),
            table_str.to_string(),
            auth.cloned(),
        );

        let schema = provider.get_schema().await.map_err(|e| {
            format!(
                "Failed to fetch schema for {}.{}: {}.",
                database_str, table_str, e
            )
        })?;

        config.schema = Some(schema);

        debug!(
            "Successfully fetched Arrow schema with {} fields.",
            config
                .schema
                .as_ref()
                .map(|s| s.fields().len())
                .unwrap_or(0)
        );

        Ok(())
    }
}
382
383fn get_healthcheck_uri(endpoint: &Uri) -> String {
384    let mut uri = endpoint.to_string();
385    if !uri.ends_with('/') {
386        uri.push('/');
387    }
388    uri.push_str("?query=SELECT%201");
389    uri
390}
391
392async fn healthcheck(client: HttpClient, endpoint: Uri, auth: Option<Auth>) -> crate::Result<()> {
393    let uri = get_healthcheck_uri(&endpoint);
394    let mut request = Request::get(uri).body(Body::empty()).unwrap();
395
396    if let Some(auth) = auth {
397        auth.apply(&mut request);
398    }
399
400    let response = client.send(request).await?;
401
402    match response.status() {
403        StatusCode::OK => Ok(()),
404        status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
405    }
406}
407
#[cfg(test)]
mod tests {
    use super::*;
    use vector_lib::codecs::encoding::ArrowStreamSerializerConfig;

    // Round-trips the example config produced from `Default`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ClickhouseConfig>();
    }

    // Verifies trailing-slash handling and path preservation in the
    // healthcheck URL builder.
    #[test]
    fn test_get_healthcheck_uri() {
        assert_eq!(
            get_healthcheck_uri(&"http://localhost:8123".parse().unwrap()),
            "http://localhost:8123/?query=SELECT%201"
        );
        assert_eq!(
            get_healthcheck_uri(&"http://localhost:8123/".parse().unwrap()),
            "http://localhost:8123/?query=SELECT%201"
        );
        assert_eq!(
            get_healthcheck_uri(&"http://localhost:8123/path/".parse().unwrap()),
            "http://localhost:8123/path/?query=SELECT%201"
        );
    }

    /// Helper to create a minimal ClickhouseConfig for testing
    fn create_test_config(
        format: Format,
        batch_encoding: Option<BatchSerializerConfig>,
    ) -> ClickhouseConfig {
        ClickhouseConfig {
            endpoint: "http://localhost:8123".parse::<http::Uri>().unwrap().into(),
            table: "test_table".try_into().unwrap(),
            database: Some("test_db".try_into().unwrap()),
            format,
            batch_encoding,
            ..Default::default()
        }
    }

    // `batch_encoding` must be rejected for every non-ArrowStream format;
    // the validation happens before any network access, so no server is needed.
    #[tokio::test]
    async fn test_format_selection_with_batch_encoding() {
        use crate::http::HttpClient;
        use crate::tls::TlsSettings;

        // Create minimal dependencies for resolve_strategy
        let tls = TlsSettings::default();
        let client = HttpClient::new(tls, &Default::default()).unwrap();
        let endpoint: http::Uri = "http://localhost:8123".parse().unwrap();
        let database: Template = "test_db".try_into().unwrap();

        // Test incompatible formats - should all return errors
        let incompatible_formats = vec![
            (Format::JsonEachRow, "json_each_row"),
            (Format::JsonAsObject, "json_as_object"),
            (Format::JsonAsString, "json_as_string"),
        ];

        for (format, format_name) in incompatible_formats {
            let config = create_test_config(
                format,
                Some(BatchSerializerConfig::ArrowStream(
                    ArrowStreamSerializerConfig::default(),
                )),
            );

            let result = config
                .resolve_strategy(&client, &endpoint, &database, None)
                .await;

            assert!(
                result.is_err(),
                "Expected error for format {} with batch_encoding, but got success",
                format_name
            );
        }
    }

    #[test]
    fn test_format_selection_without_batch_encoding() {
        // When batch_encoding is None, the configured format should be used
        let configs = vec![
            Format::JsonEachRow,
            Format::JsonAsObject,
            Format::JsonAsString,
            Format::ArrowStream,
        ];

        for format in configs {
            let config = create_test_config(format, None);

            assert!(
                config.batch_encoding.is_none(),
                "batch_encoding should be None for format {:?}",
                format
            );
            assert_eq!(
                config.format, format,
                "format should match configured value"
            );
        }
    }
}
511}