use std::fmt;

use http::{Request, StatusCode, Uri};
use hyper::Body;
use vector_lib::codecs::encoding::format::SchemaProvider;
use vector_lib::codecs::encoding::{ArrowStreamSerializerConfig, BatchSerializerConfig};

use super::{
    request_builder::ClickhouseRequestBuilder,
    service::{ClickhouseRetryLogic, ClickhouseServiceRequestBuilder},
    sink::{ClickhouseSink, PartitionKey},
};
use crate::{
    http::{Auth, HttpClient, MaybeAuth},
    sinks::{
        prelude::*,
        util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, http::HttpService},
    },
};

#[configurable_component]
/// The data format used when inserting events into ClickHouse.
#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq, Hash)]
#[serde(rename_all = "snake_case")]
#[derivative(Default)]
#[allow(clippy::enum_variant_names)]
pub enum Format {
    /// JSONEachRow: one JSON object per line, with table columns as keys.
    #[derivative(Default)]
    JsonEachRow,

    /// JSONAsObject: each event is inserted as a single JSON object.
    JsonAsObject,

    /// JSONAsString: each event is inserted as a single JSON string.
    JsonAsString,

    /// ArrowStream: events are batch-encoded as an Apache Arrow IPC stream.
    #[configurable(metadata(status = "beta"))]
    ArrowStream,
}

impl fmt::Display for Format {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Format::JsonEachRow => write!(f, "JSONEachRow"),
            Format::JsonAsObject => write!(f, "JSONAsObject"),
            Format::JsonAsString => write!(f, "JSONAsString"),
            Format::ArrowStream => write!(f, "ArrowStream"),
        }
    }
}

#[configurable_component(sink("clickhouse", "Deliver log data to a ClickHouse database."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct ClickhouseConfig {
    /// The endpoint of the ClickHouse server.
    #[serde(alias = "host")]
    #[configurable(metadata(docs::examples = "http://localhost:8123"))]
    pub endpoint: UriSerde,

    /// The table that data is inserted into.
    #[configurable(metadata(docs::examples = "mytable"))]
    pub table: Template,

    /// The database that contains the table that data is inserted into.
    #[configurable(metadata(docs::examples = "mydatabase"))]
    pub database: Option<Template>,

    /// The format used when inserting events into ClickHouse.
    #[serde(default)]
    pub format: Format,

    /// Sets `input_format_skip_unknown_fields`, allowing ClickHouse to discard
    /// fields that are not present in the target table schema.
    #[serde(default)]
    pub skip_unknown_fields: Option<bool>,

    /// Sets `date_time_input_format` to `best_effort`, allowing ClickHouse to
    /// parse a wider range of date/time representations.
    #[serde(default)]
    pub date_time_best_effort: bool,

    /// Sets `insert_distributed_one_random_shard`, allowing ClickHouse to insert
    /// data into a random shard of a distributed table.
    #[serde(default)]
    pub insert_random_shard: bool,

    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,

    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub encoding: Transformer,

    /// Batch-level encoding configuration.
    ///
    /// When set, events in a batch are encoded together (currently as an Arrow
    /// stream), which requires `format: arrow_stream`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch_encoding: Option<BatchSerializerConfig>,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    #[configurable(derived)]
    pub auth: Option<Auth>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub query_settings: QuerySettingsConfig,
}

#[configurable_component]
/// ClickHouse query settings applied to insert requests.
#[derive(Clone, Copy, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct QuerySettingsConfig {
    /// Settings for asynchronous inserts.
    #[serde(default)]
    pub async_insert_settings: AsyncInsertSettingsConfig,
}

#[configurable_component]
/// Settings controlling ClickHouse asynchronous inserts.
#[derive(Clone, Copy, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct AsyncInsertSettingsConfig {
    /// Whether to enable asynchronous inserts (ClickHouse's `async_insert` setting).
    #[serde(default)]
    pub enabled: Option<bool>,

    /// Whether the insert should wait until the data is actually processed
    /// (ClickHouse's `wait_for_async_insert` setting).
    #[serde(default)]
    pub wait_for_processing: Option<bool>,

    /// Timeout, in seconds, to wait for the processing of an asynchronous insert
    /// (ClickHouse's `wait_for_async_insert_timeout` setting).
    #[serde(default)]
    pub wait_for_processing_timeout: Option<u64>,

    /// Whether to deduplicate data in asynchronous inserts
    /// (ClickHouse's `async_insert_deduplicate` setting).
    #[serde(default)]
    pub deduplicate: Option<bool>,

    /// Maximum size, in bytes, of unparsed data collected per query before it is
    /// inserted (ClickHouse's `async_insert_max_data_size` setting).
    #[serde(default)]
    pub max_data_size: Option<u64>,

    /// Maximum number of insert queries collected before the data is inserted
    /// (ClickHouse's `async_insert_max_query_number` setting).
    #[serde(default)]
    pub max_query_number: Option<u64>,
}

impl_generate_config_from_default!(ClickhouseConfig);

#[async_trait::async_trait]
#[typetag::serde(name = "clickhouse")]
impl SinkConfig for ClickhouseConfig {
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let endpoint = self.endpoint.with_default_parts().uri;

        let auth = self.auth.choose_one(&self.endpoint.auth)?;

        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;

        let client = HttpClient::new(tls_settings, &cx.proxy)?;

        let clickhouse_service_request_builder = ClickhouseServiceRequestBuilder {
            auth: auth.clone(),
            endpoint: endpoint.clone(),
            skip_unknown_fields: self.skip_unknown_fields,
            date_time_best_effort: self.date_time_best_effort,
            insert_random_shard: self.insert_random_shard,
            compression: self.compression,
            query_settings: self.query_settings,
        };

        let service: HttpService<ClickhouseServiceRequestBuilder, PartitionKey> =
            HttpService::new(client.clone(), clickhouse_service_request_builder);

        let request_limits = self.request.into_settings();

        let service = ServiceBuilder::new()
            .settings(request_limits, ClickhouseRetryLogic::default())
            .service(service);

        let batch_settings = self.batch.into_batcher_settings()?;

        let database = self.database.clone().unwrap_or_else(|| {
            "default"
                .try_into()
                .expect("'default' should be a valid template")
        });

        let (format, encoder_kind) = self
            .resolve_strategy(&client, &endpoint, &database, auth.as_ref())
            .await?;

        let request_builder = ClickhouseRequestBuilder {
            compression: self.compression,
            encoder: (self.encoding.clone(), encoder_kind),
        };

        let sink = ClickhouseSink::new(
            batch_settings,
            service,
            database,
            self.table.clone(),
            format,
            request_builder,
        );

        let healthcheck = Box::pin(healthcheck(client, endpoint, auth));

        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
    }

    fn input(&self) -> Input {
        Input::log()
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}

impl ClickhouseConfig {
    /// Determines the ClickHouse `FORMAT` and the event encoder to use.
    ///
    /// When `batch_encoding` is configured, the table schema is fetched from
    /// ClickHouse and an Arrow stream encoder is built; otherwise the configured
    /// format is used with a newline-delimited JSON encoder.
    async fn resolve_strategy(
        &self,
        client: &HttpClient,
        endpoint: &Uri,
        database: &Template,
        auth: Option<&Auth>,
    ) -> crate::Result<(Format, vector_lib::codecs::EncoderKind)> {
        use vector_lib::codecs::EncoderKind;
        use vector_lib::codecs::{
            JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer,
        };

        if let Some(batch_encoding) = &self.batch_encoding {
            use vector_lib::codecs::{BatchEncoder, BatchSerializer};

            if self.format != Format::ArrowStream {
                return Err(format!(
                    "'batch_encoding' is only compatible with 'format: arrow_stream'. Found 'format: {}'.",
                    self.format
                )
                .into());
            }

            let mut arrow_config = match batch_encoding {
                BatchSerializerConfig::ArrowStream(config) => config.clone(),
            };

            self.resolve_arrow_schema(
                client,
                endpoint.to_string(),
                database,
                auth,
                &mut arrow_config,
            )
            .await?;

            let resolved_batch_config = BatchSerializerConfig::ArrowStream(arrow_config);
            let arrow_serializer = resolved_batch_config.build()?;
            let batch_serializer = BatchSerializer::Arrow(arrow_serializer);
            let encoder = EncoderKind::Batch(BatchEncoder::new(batch_serializer));

            return Ok((Format::ArrowStream, encoder));
        }

        let encoder = EncoderKind::Framed(Box::new(Encoder::<Framer>::new(
            NewlineDelimitedEncoderConfig.build().into(),
            JsonSerializerConfig::default().build().into(),
        )));

        Ok((self.format, encoder))
    }

    async fn resolve_arrow_schema(
        &self,
        client: &HttpClient,
        endpoint: String,
        database: &Template,
        auth: Option<&Auth>,
        config: &mut ArrowStreamSerializerConfig,
    ) -> crate::Result<()> {
        use super::arrow;

        if self.table.is_dynamic() || database.is_dynamic() {
            return Err(
                "Arrow codec requires a static table and database. Dynamic schema inference is not supported."
                    .into(),
            );
        }

        let table_str = self.table.get_ref();
        let database_str = database.get_ref();

        debug!(
            "Fetching schema for table {}.{} at startup.",
            database_str, table_str
        );

        let provider = arrow::ClickHouseSchemaProvider::new(
            client.clone(),
            endpoint,
            database_str.to_string(),
            table_str.to_string(),
            auth.cloned(),
        );

        let schema = provider.get_schema().await.map_err(|e| {
            format!(
                "Failed to fetch schema for {}.{}: {}.",
                database_str, table_str, e
            )
        })?;

        config.schema = Some(schema);

        debug!(
            "Successfully fetched Arrow schema with {} fields.",
            config
                .schema
                .as_ref()
                .map(|s| s.fields().len())
                .unwrap_or(0)
        );

        Ok(())
    }
}

fn get_healthcheck_uri(endpoint: &Uri) -> String {
    let mut uri = endpoint.to_string();
    if !uri.ends_with('/') {
        uri.push('/');
    }
    uri.push_str("?query=SELECT%201");
    uri
}

async fn healthcheck(client: HttpClient, endpoint: Uri, auth: Option<Auth>) -> crate::Result<()> {
    let uri = get_healthcheck_uri(&endpoint);
    let mut request = Request::get(uri).body(Body::empty()).unwrap();

    if let Some(auth) = auth {
        auth.apply(&mut request);
    }

    let response = client.send(request).await?;

    match response.status() {
        StatusCode::OK => Ok(()),
        status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use vector_lib::codecs::encoding::ArrowStreamSerializerConfig;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ClickhouseConfig>();
    }
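
    // A sketch of a config round-trip, not part of the original suite; it assumes
    // the `toml` crate is available to tests. A minimal configuration built from
    // the documented example values should deserialize into `ClickhouseConfig`.
    #[test]
    fn parse_minimal_config() {
        let config: ClickhouseConfig = toml::from_str(
            r#"
            endpoint = "http://localhost:8123"
            table = "mytable"
            database = "mydatabase"
            format = "json_each_row"
            "#,
        )
        .unwrap();

        assert_eq!(config.format, Format::JsonEachRow);
        assert!(config.batch_encoding.is_none());
    }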

    #[test]
    fn test_get_healthcheck_uri() {
        assert_eq!(
            get_healthcheck_uri(&"http://localhost:8123".parse().unwrap()),
            "http://localhost:8123/?query=SELECT%201"
        );
        assert_eq!(
            get_healthcheck_uri(&"http://localhost:8123/".parse().unwrap()),
            "http://localhost:8123/?query=SELECT%201"
        );
        assert_eq!(
            get_healthcheck_uri(&"http://localhost:8123/path/".parse().unwrap()),
            "http://localhost:8123/path/?query=SELECT%201"
        );
    }
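
    // Additional check, a sketch not in the original suite: the `Display` impl
    // should yield the ClickHouse-facing format names, and the derived default
    // format should be `JsonEachRow`.
    #[test]
    fn test_format_display_and_default() {
        assert_eq!(Format::JsonEachRow.to_string(), "JSONEachRow");
        assert_eq!(Format::JsonAsObject.to_string(), "JSONAsObject");
        assert_eq!(Format::JsonAsString.to_string(), "JSONAsString");
        assert_eq!(Format::ArrowStream.to_string(), "ArrowStream");
        assert_eq!(Format::default(), Format::JsonEachRow);
    }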

    fn create_test_config(
        format: Format,
        batch_encoding: Option<BatchSerializerConfig>,
    ) -> ClickhouseConfig {
        ClickhouseConfig {
            endpoint: "http://localhost:8123".parse::<http::Uri>().unwrap().into(),
            table: "test_table".try_into().unwrap(),
            database: Some("test_db".try_into().unwrap()),
            format,
            batch_encoding,
            ..Default::default()
        }
    }

    #[tokio::test]
    async fn test_format_selection_with_batch_encoding() {
        use crate::http::HttpClient;
        use crate::tls::TlsSettings;

        let tls = TlsSettings::default();
        let client = HttpClient::new(tls, &Default::default()).unwrap();
        let endpoint: http::Uri = "http://localhost:8123".parse().unwrap();
        let database: Template = "test_db".try_into().unwrap();

        let incompatible_formats = vec![
            (Format::JsonEachRow, "json_each_row"),
            (Format::JsonAsObject, "json_as_object"),
            (Format::JsonAsString, "json_as_string"),
        ];

        for (format, format_name) in incompatible_formats {
            let config = create_test_config(
                format,
                Some(BatchSerializerConfig::ArrowStream(
                    ArrowStreamSerializerConfig::default(),
                )),
            );

            let result = config
                .resolve_strategy(&client, &endpoint, &database, None)
                .await;

            assert!(
                result.is_err(),
                "Expected error for format {} with batch_encoding, but got success",
                format_name
            );
        }
    }

    #[test]
    fn test_format_selection_without_batch_encoding() {
        let configs = vec![
            Format::JsonEachRow,
            Format::JsonAsObject,
            Format::JsonAsString,
            Format::ArrowStream,
        ];

        for format in configs {
            let config = create_test_config(format, None);

            assert!(
                config.batch_encoding.is_none(),
                "batch_encoding should be None for format {:?}",
                format
            );
            assert_eq!(
                config.format, format,
                "format should match configured value"
            );
        }
    }
}