vector/sinks/clickhouse/
config.rs

1//! Configuration for the `Clickhouse` sink.
2
3use super::{
4    request_builder::ClickhouseRequestBuilder,
5    service::{ClickhouseRetryLogic, ClickhouseServiceRequestBuilder},
6    sink::{ClickhouseSink, PartitionKey},
7};
8use crate::{
9    http::{Auth, HttpClient, MaybeAuth},
10    sinks::{
11        prelude::*,
12        util::{http::HttpService, RealtimeSizeBasedDefaultBatchSettings, UriSerde},
13    },
14};
15use http::{Request, StatusCode, Uri};
16use hyper::Body;
17use std::fmt;
18use vector_lib::codecs::{encoding::Framer, JsonSerializerConfig, NewlineDelimitedEncoderConfig};
19
/// Data format.
///
/// The [format][formats] used to parse input/output data.
///
/// [formats]: https://clickhouse.com/docs/en/interfaces/formats
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq, Hash)]
#[serde(rename_all = "snake_case")]
#[derivative(Default)]
// Every variant shares the `Json` prefix on purpose, which is exactly what the
// `enum_variant_names` lint complains about.
#[allow(clippy::enum_variant_names)]
pub enum Format {
    // Marked as the default variant through `derivative`.
    #[derivative(Default)]
    /// JSONEachRow.
    JsonEachRow,

    /// JSONAsObject.
    JsonAsObject,

    /// JSONAsString.
    JsonAsString,
}
41
42impl fmt::Display for Format {
43    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44        match self {
45            Format::JsonEachRow => write!(f, "JSONEachRow"),
46            Format::JsonAsObject => write!(f, "JSONAsObject"),
47            Format::JsonAsString => write!(f, "JSONAsString"),
48        }
49    }
50}
51
/// Configuration for the `clickhouse` sink.
#[configurable_component(sink("clickhouse", "Deliver log data to a ClickHouse database."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct ClickhouseConfig {
    /// The endpoint of the ClickHouse server.
    // `host` is accepted as an alternate key for this field when deserializing.
    #[serde(alias = "host")]
    #[configurable(metadata(docs::examples = "http://localhost:8123"))]
    pub endpoint: UriSerde,

    /// The table that data is inserted into.
    #[configurable(metadata(docs::examples = "mytable"))]
    pub table: Template,

    /// The database that contains the table that data is inserted into.
    // When unset, `build` substitutes the literal template `default`.
    #[configurable(metadata(docs::examples = "mydatabase"))]
    pub database: Option<Template>,

    /// The format to parse input data.
    #[serde(default)]
    pub format: Format,

    /// Sets `input_format_skip_unknown_fields`, allowing ClickHouse to discard fields not present in the table schema.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    #[serde(default)]
    pub skip_unknown_fields: Option<bool>,

    /// Sets `date_time_input_format` to `best_effort`, allowing ClickHouse to properly parse RFC3339/ISO 8601.
    #[serde(default)]
    pub date_time_best_effort: bool,

    /// Sets `insert_distributed_one_random_shard`, allowing ClickHouse to insert data into a random shard when using Distributed Table Engine.
    #[serde(default)]
    pub insert_random_shard: bool,

    // An omitted key falls back to `Compression::gzip_default` rather than `Compression::default()`.
    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,

    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub encoding: Transformer,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    // Reconciled in `build` with any auth carried by `endpoint` via `choose_one`.
    #[configurable(derived)]
    pub auth: Option<Auth>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    // NOTE(review): `bool_or_struct` presumably accepts either a bare boolean or the full
    // struct form for this key — confirm in `crate::serde`.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub query_settings: QuerySettingsConfig,
}
122
/// Query settings for the `clickhouse` sink.
#[configurable_component]
// `Copy` is relied upon by `ClickhouseConfig::build`, which copies this value out of `&self`
// into the service request builder.
#[derive(Clone, Copy, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct QuerySettingsConfig {
    /// Async insert-related settings.
    #[serde(default)]
    pub async_insert_settings: AsyncInsertSettingsConfig,
}
132
/// Async insert-related settings for the `clickhouse` sink.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct AsyncInsertSettingsConfig {
    /// Sets `async_insert`, allowing ClickHouse to queue the inserted data and later flush to table in the background.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    #[serde(default)]
    pub enabled: Option<bool>,

    // NOTE(review): the setting names in the next two doc comments follow ClickHouse's
    // documented `wait_for_async_insert` / `wait_for_async_insert_timeout` settings —
    // confirm they match what the service request builder actually sends.
    /// Sets `wait_for_async_insert`, allowing ClickHouse to wait for processing of asynchronous insertion.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    #[serde(default)]
    pub wait_for_processing: Option<bool>,

    /// Sets `wait_for_async_insert_timeout`, to control the timeout for waiting for processing asynchronous insertion.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    #[serde(default)]
    pub wait_for_processing_timeout: Option<u64>,

    /// Sets `async_insert_deduplicate`, allowing ClickHouse to perform deduplication when inserting blocks in the replicated table.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    #[serde(default)]
    pub deduplicate: Option<bool>,

    /// Sets `async_insert_max_data_size`, the maximum size in bytes of unparsed data collected per query before being inserted.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    #[serde(default)]
    pub max_data_size: Option<u64>,

    /// Sets `async_insert_max_query_number`, the maximum number of insert queries before being inserted.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    #[serde(default)]
    pub max_query_number: Option<u64>,
}
174
// NOTE(review): presumably derives the example-config generation for `ClickhouseConfig`
// from its `Default` implementation — confirm against the macro definition. The
// `generate_config` test in this file exercises the result.
impl_generate_config_from_default!(ClickhouseConfig);
176
177#[async_trait::async_trait]
178#[typetag::serde(name = "clickhouse")]
179impl SinkConfig for ClickhouseConfig {
180    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
181        let endpoint = self.endpoint.with_default_parts().uri;
182
183        let auth = self.auth.choose_one(&self.endpoint.auth)?;
184
185        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
186
187        let client = HttpClient::new(tls_settings, &cx.proxy)?;
188
189        let clickhouse_service_request_builder = ClickhouseServiceRequestBuilder {
190            auth: auth.clone(),
191            endpoint: endpoint.clone(),
192            skip_unknown_fields: self.skip_unknown_fields,
193            date_time_best_effort: self.date_time_best_effort,
194            insert_random_shard: self.insert_random_shard,
195            compression: self.compression,
196            query_settings: self.query_settings,
197        };
198
199        let service: HttpService<ClickhouseServiceRequestBuilder, PartitionKey> =
200            HttpService::new(client.clone(), clickhouse_service_request_builder);
201
202        let request_limits = self.request.into_settings();
203
204        let service = ServiceBuilder::new()
205            .settings(request_limits, ClickhouseRetryLogic::default())
206            .service(service);
207
208        let batch_settings = self.batch.into_batcher_settings()?;
209
210        let database = self.database.clone().unwrap_or_else(|| {
211            "default"
212                .try_into()
213                .expect("'default' should be a valid template")
214        });
215
216        let request_builder = ClickhouseRequestBuilder {
217            compression: self.compression,
218            encoding: (
219                self.encoding.clone(),
220                Encoder::<Framer>::new(
221                    NewlineDelimitedEncoderConfig.build().into(),
222                    JsonSerializerConfig::default().build().into(),
223                ),
224            ),
225        };
226
227        let sink = ClickhouseSink::new(
228            batch_settings,
229            service,
230            database,
231            self.table.clone(),
232            self.format,
233            request_builder,
234        );
235
236        let healthcheck = Box::pin(healthcheck(client, endpoint, auth));
237
238        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
239    }
240
241    fn input(&self) -> Input {
242        Input::log()
243    }
244
245    fn acknowledgements(&self) -> &AcknowledgementsConfig {
246        &self.acknowledgements
247    }
248}
249
250fn get_healthcheck_uri(endpoint: &Uri) -> String {
251    let mut uri = endpoint.to_string();
252    if !uri.ends_with('/') {
253        uri.push('/');
254    }
255    uri.push_str("?query=SELECT%201");
256    uri
257}
258
259async fn healthcheck(client: HttpClient, endpoint: Uri, auth: Option<Auth>) -> crate::Result<()> {
260    let uri = get_healthcheck_uri(&endpoint);
261    let mut request = Request::get(uri).body(Body::empty()).unwrap();
262
263    if let Some(auth) = auth {
264        auth.apply(&mut request);
265    }
266
267    let response = client.send(request).await?;
268
269    match response.status() {
270        StatusCode::OK => Ok(()),
271        status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
272    }
273}
274
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the shared config-generation check against `ClickhouseConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ClickhouseConfig>();
    }

    /// The probe URI ends in `/?query=SELECT%201` whether or not the endpoint already
    /// has a trailing slash.
    #[test]
    fn test_get_healthcheck_uri() {
        let cases = [
            (
                "http://localhost:8123",
                "http://localhost:8123/?query=SELECT%201",
            ),
            (
                "http://localhost:8123/",
                "http://localhost:8123/?query=SELECT%201",
            ),
            (
                "http://localhost:8123/path/",
                "http://localhost:8123/path/?query=SELECT%201",
            ),
        ];

        for (endpoint, expected) in cases {
            assert_eq!(get_healthcheck_uri(&endpoint.parse().unwrap()), expected);
        }
    }
}