1use std::fmt;
4
5use http::{Request, StatusCode, Uri};
6use hyper::Body;
7use vector_lib::codecs::encoding::format::SchemaProvider;
8use vector_lib::codecs::encoding::{ArrowStreamSerializerConfig, BatchSerializerConfig};
9
10use super::{
11 request_builder::ClickhouseRequestBuilder,
12 service::{ClickhouseRetryLogic, ClickhouseServiceRequestBuilder},
13 sink::{ClickhouseSink, PartitionKey},
14};
15use crate::{
16 http::{Auth, HttpClient, MaybeAuth},
17 sinks::{
18 prelude::*,
19 util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, http::HttpService},
20 },
21};
22
/// The data format to use when inserting rows into ClickHouse.
///
/// Each variant maps to a ClickHouse input format name (see the `Display`
/// impl below for the exact wire names).
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Hash)]
#[serde(rename_all = "snake_case")]
#[allow(clippy::enum_variant_names)]
pub enum Format {
    /// ClickHouse's `JSONEachRow` format: one JSON object per row (default).
    #[default]
    JsonEachRow,

    /// ClickHouse's `JSONAsObject` format.
    JsonAsObject,

    /// ClickHouse's `JSONAsString` format.
    JsonAsString,

    /// ClickHouse's `ArrowStream` format (beta).
    #[configurable(metadata(status = "beta"))]
    ArrowStream,
}
47
48impl fmt::Display for Format {
49 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50 match self {
51 Format::JsonEachRow => write!(f, "JSONEachRow"),
52 Format::JsonAsObject => write!(f, "JSONAsObject"),
53 Format::JsonAsString => write!(f, "JSONAsString"),
54 Format::ArrowStream => write!(f, "ArrowStream"),
55 }
56 }
57}
58
/// Configuration for the `clickhouse` sink.
#[configurable_component(sink("clickhouse", "Deliver log data to a ClickHouse database."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct ClickhouseConfig {
    /// The endpoint of the ClickHouse server.
    #[serde(alias = "host")]
    #[configurable(metadata(docs::examples = "http://localhost:8123"))]
    pub endpoint: UriSerde,

    /// The table that data is inserted into (templateable).
    #[configurable(metadata(docs::examples = "mytable"))]
    pub table: Template,

    /// The database that contains the table. Falls back to `default` when unset.
    #[configurable(metadata(docs::examples = "mydatabase"))]
    pub database: Option<Template>,

    /// The input format used when inserting data (see [`Format`]).
    #[serde(default)]
    pub format: Format,

    /// Whether ClickHouse should skip fields unknown to the table schema.
    /// Forwarded per request via the service request builder.
    #[serde(default)]
    pub skip_unknown_fields: Option<bool>,

    /// Enables best-effort date/time parsing on the server side.
    /// Forwarded per request via the service request builder.
    #[serde(default)]
    pub date_time_best_effort: bool,

    /// Inserts into a random shard. Forwarded per request via the service
    /// request builder.
    #[serde(default)]
    pub insert_random_shard: bool,

    /// Compression applied to request payloads (gzip by default).
    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,

    /// Event transformation applied before encoding.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub encoding: Transformer,

    /// Optional batch-level serializer. Only compatible with
    /// `format: arrow_stream`; validated in `resolve_strategy`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch_encoding: Option<BatchSerializerConfig>,

    /// Batching behavior (size-based defaults).
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    /// Authentication credentials. Mutually resolved against credentials
    /// embedded in `endpoint` at build time.
    #[configurable(derived)]
    pub auth: Option<Auth>,

    /// Request-level tuning (concurrency, rate limits, retries).
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    /// TLS options for connecting to the server.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// End-to-end acknowledgements behavior.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    /// ClickHouse query settings forwarded with each insert request.
    #[configurable(derived)]
    #[serde(default)]
    pub query_settings: QuerySettingsConfig,
}
137
/// ClickHouse query settings forwarded with insert requests.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct QuerySettingsConfig {
    /// Settings controlling ClickHouse asynchronous inserts.
    #[serde(default)]
    pub async_insert_settings: AsyncInsertSettingsConfig,
}
147
/// Settings for ClickHouse asynchronous inserts.
///
/// All fields are optional; unset fields leave the server-side behavior
/// unchanged (NOTE(review): presumably omitted from the request — confirm
/// in the service layer).
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct AsyncInsertSettingsConfig {
    /// Whether asynchronous inserts are enabled.
    #[serde(default)]
    pub enabled: Option<bool>,

    /// Whether to wait for the insert to be processed before responding.
    #[serde(default)]
    pub wait_for_processing: Option<bool>,

    /// Timeout for waiting on insert processing.
    #[serde(default)]
    pub wait_for_processing_timeout: Option<u64>,

    /// Whether deduplication is applied to asynchronous inserts.
    #[serde(default)]
    pub deduplicate: Option<bool>,

    /// Maximum buffered data size for asynchronous inserts.
    #[serde(default)]
    pub max_data_size: Option<u64>,

    /// Maximum number of queries buffered for asynchronous inserts.
    #[serde(default)]
    pub max_query_number: Option<u64>,
}
189
// Provides example-config generation from the `Default` impl (exercised by
// the `generate_config` test in this file).
impl_generate_config_from_default!(ClickhouseConfig);
191
#[async_trait::async_trait]
#[typetag::serde(name = "clickhouse")]
impl SinkConfig for ClickhouseConfig {
    /// Builds the ClickHouse sink and its healthcheck from this configuration.
    ///
    /// May contact the server during the build (Arrow schema inference via
    /// `resolve_strategy`) before the sink starts processing events.
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let endpoint = self.endpoint.with_default_parts().uri;

        // Credentials may come from either the `auth` option or the
        // endpoint's own authentication; exactly one source is chosen.
        let auth = self.auth.choose_one(&self.endpoint.auth)?;

        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;

        let client = HttpClient::new(tls_settings, &cx.proxy)?;

        // Per-request parameters forwarded to the HTTP service layer.
        let clickhouse_service_request_builder = ClickhouseServiceRequestBuilder {
            auth: auth.clone(),
            endpoint: endpoint.clone(),
            skip_unknown_fields: self.skip_unknown_fields,
            date_time_best_effort: self.date_time_best_effort,
            insert_random_shard: self.insert_random_shard,
            compression: self.compression,
            query_settings: self.query_settings,
        };

        let service: HttpService<ClickhouseServiceRequestBuilder, PartitionKey> =
            HttpService::new(client.clone(), clickhouse_service_request_builder);

        let request_limits = self.request.into_settings();

        // Wrap the HTTP service with retry/rate-limit middleware.
        let service = ServiceBuilder::new()
            .settings(request_limits, ClickhouseRetryLogic::default())
            .service(service);

        let batch_settings = self.batch.into_batcher_settings()?;

        // Fall back to ClickHouse's conventional `default` database when
        // none is configured.
        let database = self.database.clone().unwrap_or_else(|| {
            "default"
                .try_into()
                .expect("'default' should be a valid template")
        });

        // Resolve the wire format and encoder; for Arrow this fetches the
        // table schema from the server.
        let (format, encoder_kind) = self
            .resolve_strategy(&client, &endpoint, &database, auth.as_ref())
            .await?;

        let request_builder = ClickhouseRequestBuilder {
            compression: self.compression,
            encoder: (self.encoding.clone(), encoder_kind),
        };

        let sink = ClickhouseSink::new(
            batch_settings,
            service,
            database,
            self.table.clone(),
            format,
            request_builder,
        );

        let healthcheck = Box::pin(healthcheck(client, endpoint, auth));

        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
    }

    fn input(&self) -> Input {
        Input::log()
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
263
impl ClickhouseConfig {
    /// Determines the wire [`Format`] and the encoder this sink will use.
    ///
    /// When `batch_encoding` is configured, validates that `format` is
    /// `arrow_stream`, fetches the table's Arrow schema from the server, and
    /// returns a batch encoder. Otherwise returns the configured `format`
    /// with a newline-delimited JSON encoder.
    ///
    /// # Errors
    /// - `batch_encoding` combined with a non-Arrow `format`.
    /// - A Parquet `batch_encoding` (unsupported by this sink).
    /// - Schema-resolution failures (see `resolve_arrow_schema`).
    async fn resolve_strategy(
        &self,
        client: &HttpClient,
        endpoint: &Uri,
        database: &Template,
        auth: Option<&Auth>,
    ) -> crate::Result<(Format, vector_lib::codecs::EncoderKind)> {
        use vector_lib::codecs::EncoderKind;
        use vector_lib::codecs::{
            JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer,
        };

        if let Some(batch_encoding) = &self.batch_encoding {
            use vector_lib::codecs::BatchEncoder;

            // Batch encoding only makes sense for the Arrow wire format.
            if self.format != Format::ArrowStream {
                return Err(format!(
                    "'batch_encoding' is only compatible with 'format: arrow_stream'. Found 'format: {}'.",
                    self.format
                )
                .into());
            }

            let mut arrow_config = match batch_encoding {
                BatchSerializerConfig::ArrowStream(config) => config.clone(),
                // The Parquet variant only exists when the feature is enabled.
                #[cfg(feature = "codecs-parquet")]
                BatchSerializerConfig::Parquet(_) => {
                    return Err(
                        "ClickHouse sink does not support Parquet batch encoding. Use 'arrow_stream' instead."
                            .into(),
                    );
                }
            };

            // Populate the Arrow config's schema by querying the server.
            self.resolve_arrow_schema(
                client,
                endpoint.to_string(),
                database,
                auth,
                &mut arrow_config,
            )
            .await?;

            let resolved_batch_config = BatchSerializerConfig::ArrowStream(arrow_config);
            let batch_serializer = resolved_batch_config.build_batch_serializer()?;
            let encoder = EncoderKind::Batch(BatchEncoder::new(batch_serializer));

            return Ok((Format::ArrowStream, encoder));
        }

        // Default path: newline-delimited JSON, regardless of the JSON format
        // variant (the format name is sent to the server separately).
        let encoder = EncoderKind::Framed(Box::new(Encoder::<Framer>::new(
            NewlineDelimitedEncoderConfig.build().into(),
            JsonSerializerConfig::default().build().into(),
        )));

        Ok((self.format, encoder))
    }

    /// Fetches the Arrow schema for the target table and stores it in
    /// `config.schema`.
    ///
    /// Requires `table` and `database` to be static templates: the schema is
    /// fetched once at startup for a single, known table.
    ///
    /// # Errors
    /// - Dynamic `table` or `database` templates.
    /// - Failure to fetch the schema from the server.
    async fn resolve_arrow_schema(
        &self,
        client: &HttpClient,
        endpoint: String,
        database: &Template,
        auth: Option<&Auth>,
        config: &mut ArrowStreamSerializerConfig,
    ) -> crate::Result<()> {
        use super::arrow;

        if self.table.is_dynamic() || database.is_dynamic() {
            return Err(
                "Arrow codec requires a static table and database. Dynamic schema inference is not supported."
                    .into(),
            );
        }

        let table_str = self.table.get_ref();
        let database_str = database.get_ref();

        debug!(
            "Fetching schema for table {}.{} at startup.",
            database_str, table_str
        );

        let provider = arrow::ClickHouseSchemaProvider::new(
            client.clone(),
            endpoint,
            database_str.to_string(),
            table_str.to_string(),
            auth.cloned(),
        );

        let schema = provider.get_schema().await.map_err(|e| {
            format!(
                "Failed to fetch schema for {}.{}: {}.",
                database_str, table_str, e
            )
        })?;

        config.schema = Some(schema);

        debug!(
            "Successfully fetched Arrow schema with {} fields.",
            config
                .schema
                .as_ref()
                .map(|s| s.fields().len())
                .unwrap_or(0)
        );

        Ok(())
    }
}
382
383fn get_healthcheck_uri(endpoint: &Uri) -> String {
384 let mut uri = endpoint.to_string();
385 if !uri.ends_with('/') {
386 uri.push('/');
387 }
388 uri.push_str("?query=SELECT%201");
389 uri
390}
391
392async fn healthcheck(client: HttpClient, endpoint: Uri, auth: Option<Auth>) -> crate::Result<()> {
393 let uri = get_healthcheck_uri(&endpoint);
394 let mut request = Request::get(uri).body(Body::empty()).unwrap();
395
396 if let Some(auth) = auth {
397 auth.apply(&mut request);
398 }
399
400 let response = client.send(request).await?;
401
402 match response.status() {
403 StatusCode::OK => Ok(()),
404 status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
405 }
406}
407
#[cfg(test)]
mod tests {
    use super::*;
    use vector_lib::codecs::encoding::ArrowStreamSerializerConfig;

    // Verifies that an example config can be generated from defaults.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ClickhouseConfig>();
    }

    #[test]
    fn test_get_healthcheck_uri() {
        // `Uri` normalizes a bare authority to a "/" path, so both of the
        // first two inputs yield the same URL.
        assert_eq!(
            get_healthcheck_uri(&"http://localhost:8123".parse().unwrap()),
            "http://localhost:8123/?query=SELECT%201"
        );
        assert_eq!(
            get_healthcheck_uri(&"http://localhost:8123/".parse().unwrap()),
            "http://localhost:8123/?query=SELECT%201"
        );
        // An existing path is preserved; the query is appended after it.
        assert_eq!(
            get_healthcheck_uri(&"http://localhost:8123/path/".parse().unwrap()),
            "http://localhost:8123/path/?query=SELECT%201"
        );
    }

    // Builds a minimal config with the given format/batch_encoding; all
    // other fields take their defaults.
    fn create_test_config(
        format: Format,
        batch_encoding: Option<BatchSerializerConfig>,
    ) -> ClickhouseConfig {
        ClickhouseConfig {
            endpoint: "http://localhost:8123".parse::<http::Uri>().unwrap().into(),
            table: "test_table".try_into().unwrap(),
            database: Some("test_db".try_into().unwrap()),
            format,
            batch_encoding,
            ..Default::default()
        }
    }

    // `batch_encoding` must be rejected for every non-Arrow format. The
    // validation fails before any network call, so no server is needed.
    #[tokio::test]
    async fn test_format_selection_with_batch_encoding() {
        use crate::http::HttpClient;
        use crate::tls::TlsSettings;

        let tls = TlsSettings::default();
        let client = HttpClient::new(tls, &Default::default()).unwrap();
        let endpoint: http::Uri = "http://localhost:8123".parse().unwrap();
        let database: Template = "test_db".try_into().unwrap();

        let incompatible_formats = vec![
            (Format::JsonEachRow, "json_each_row"),
            (Format::JsonAsObject, "json_as_object"),
            (Format::JsonAsString, "json_as_string"),
        ];

        for (format, format_name) in incompatible_formats {
            let config = create_test_config(
                format,
                Some(BatchSerializerConfig::ArrowStream(
                    ArrowStreamSerializerConfig::default(),
                )),
            );

            let result = config
                .resolve_strategy(&client, &endpoint, &database, None)
                .await;

            assert!(
                result.is_err(),
                "Expected error for format {} with batch_encoding, but got success",
                format_name
            );
        }
    }

    // Sanity-checks that configs built without batch_encoding carry the
    // configured format unchanged.
    #[test]
    fn test_format_selection_without_batch_encoding() {
        let configs = vec![
            Format::JsonEachRow,
            Format::JsonAsObject,
            Format::JsonAsString,
            Format::ArrowStream,
        ];

        for format in configs {
            let config = create_test_config(format, None);

            assert!(
                config.batch_encoding.is_none(),
                "batch_encoding should be None for format {:?}",
                format
            );
            assert_eq!(
                config.format, format,
                "format should match configured value"
            );
        }
    }
}