vector/sinks/clickhouse/
config.rs

1//! Configuration for the `Clickhouse` sink.
2
3use std::fmt;
4
5use http::{Request, StatusCode, Uri};
6use hyper::Body;
7use vector_lib::codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer};
8
9use super::{
10    request_builder::ClickhouseRequestBuilder,
11    service::{ClickhouseRetryLogic, ClickhouseServiceRequestBuilder},
12    sink::{ClickhouseSink, PartitionKey},
13};
14use crate::{
15    http::{Auth, HttpClient, MaybeAuth},
16    sinks::{
17        prelude::*,
18        util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, http::HttpService},
19    },
20};
21
/// Data format.
///
/// The format used to parse input/output data.
///
/// See the [ClickHouse documentation on input and output formats][formats] for details.
///
/// [formats]: https://clickhouse.com/docs/en/interfaces/formats
// Config values are spelled in snake_case (`json_each_row`, ...) via `rename_all`;
// the `Display` impl below renders the upper-case names (`JSONEachRow`, ...).
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq, Hash)]
#[serde(rename_all = "snake_case")]
#[derivative(Default)]
// Every variant is named after a `JSON…` format, so the shared `Json` prefix is intentional.
#[allow(clippy::enum_variant_names)]
pub enum Format {
    // Marked as the `Default` variant through `derivative`.
    #[derivative(Default)]
    /// JSONEachRow.
    JsonEachRow,

    /// JSONAsObject.
    JsonAsObject,

    /// JSONAsString.
    JsonAsString,
}
43
44impl fmt::Display for Format {
45    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46        match self {
47            Format::JsonEachRow => write!(f, "JSONEachRow"),
48            Format::JsonAsObject => write!(f, "JSONAsObject"),
49            Format::JsonAsString => write!(f, "JSONAsString"),
50        }
51    }
52}
53
/// Configuration for the `clickhouse` sink.
#[configurable_component(sink("clickhouse", "Deliver log data to a ClickHouse database."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct ClickhouseConfig {
    /// The endpoint of the ClickHouse server.
    // Also accepted under the `host` key. Credentials embedded in the endpoint are
    // resolved against `auth` (via `choose_one`) when the sink is built.
    #[serde(alias = "host")]
    #[configurable(metadata(docs::examples = "http://localhost:8123"))]
    pub endpoint: UriSerde,

    /// The table that data is inserted into.
    #[configurable(metadata(docs::examples = "mytable"))]
    pub table: Template,

    /// The database that contains the table that data is inserted into.
    // When unset, `build` falls back to a template for the `default` database.
    #[configurable(metadata(docs::examples = "mydatabase"))]
    pub database: Option<Template>,

    /// The format to parse input data.
    #[serde(default)]
    pub format: Format,

    /// Sets `input_format_skip_unknown_fields`, allowing ClickHouse to discard fields not present in the table schema.
    ///
    /// If left unspecified, use the default provided by the `ClickHouse` server.
    #[serde(default)]
    pub skip_unknown_fields: Option<bool>,

    /// Sets `date_time_input_format` to `best_effort`, allowing ClickHouse to properly parse RFC3339/ISO 8601.
    #[serde(default)]
    pub date_time_best_effort: bool,

    /// Sets `insert_distributed_one_random_shard`, allowing ClickHouse to insert data into a random shard when using Distributed Table Engine.
    #[serde(default)]
    pub insert_random_shard: bool,

    // Passed to both the request builder (body encoding) and the service request
    // builder in `build`; defaults to `Compression::gzip_default`.
    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,

    // Applied to events before they are serialized as newline-delimited JSON in `build`.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub encoding: Transformer,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    // Used for both data requests and the healthcheck.
    #[configurable(derived)]
    pub auth: Option<Auth>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    // Accepts either a bare boolean or the full acknowledgements struct.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub query_settings: QuerySettingsConfig,
}
124
/// Query settings for the `clickhouse` sink.
// `Copy` is required: `build` moves this value out of `&self` into the service request builder.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct QuerySettingsConfig {
    /// Async insert-related settings.
    #[serde(default)]
    pub async_insert_settings: AsyncInsertSettingsConfig,
}
134
135/// Async insert related settings for the `clickhouse` sink.
136#[configurable_component]
137#[derive(Clone, Copy, Debug, Default)]
138#[serde(deny_unknown_fields)]
139pub struct AsyncInsertSettingsConfig {
140    /// Sets `async_insert`, allowing ClickHouse to queue the inserted data and later flush to table in the background.
141    ///
142    /// If left unspecified, use the default provided by the `ClickHouse` server.
143    #[serde(default)]
144    pub enabled: Option<bool>,
145
146    /// Sets `wait_for`, allowing ClickHouse to wait for processing of asynchronous insertion.
147    ///
148    /// If left unspecified, use the default provided by the `ClickHouse` server.
149    #[serde(default)]
150    pub wait_for_processing: Option<bool>,
151
152    /// Sets 'wait_for_processing_timeout`, to control the timeout for waiting for processing asynchronous insertion.
153    ///
154    /// If left unspecified, use the default provided by the `ClickHouse` server.
155    #[serde(default)]
156    pub wait_for_processing_timeout: Option<u64>,
157
158    /// Sets `async_insert_deduplicate`, allowing ClickHouse to perform deduplication when inserting blocks in the replicated table.
159    ///
160    /// If left unspecified, use the default provided by the `ClickHouse` server.
161    #[serde(default)]
162    pub deduplicate: Option<bool>,
163
164    /// Sets `async_insert_max_data_size`, the maximum size in bytes of unparsed data collected per query before being inserted.
165    ///
166    /// If left unspecified, use the default provided by the `ClickHouse` server.
167    #[serde(default)]
168    pub max_data_size: Option<u64>,
169
170    /// Sets `async_insert_max_query_number`, the maximum number of insert queries before being inserted
171    ///
172    /// If left unspecified, use the default provided by the `ClickHouse` server.
173    #[serde(default)]
174    pub max_query_number: Option<u64>,
175}
176
// Provides `GenerateConfig` for `ClickhouseConfig`; per the macro's name the example
// config comes from `ClickhouseConfig::default()`. Exercised by the `generate_config` test.
impl_generate_config_from_default!(ClickhouseConfig);
178
179#[async_trait::async_trait]
180#[typetag::serde(name = "clickhouse")]
181impl SinkConfig for ClickhouseConfig {
182    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
183        let endpoint = self.endpoint.with_default_parts().uri;
184
185        let auth = self.auth.choose_one(&self.endpoint.auth)?;
186
187        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
188
189        let client = HttpClient::new(tls_settings, &cx.proxy)?;
190
191        let clickhouse_service_request_builder = ClickhouseServiceRequestBuilder {
192            auth: auth.clone(),
193            endpoint: endpoint.clone(),
194            skip_unknown_fields: self.skip_unknown_fields,
195            date_time_best_effort: self.date_time_best_effort,
196            insert_random_shard: self.insert_random_shard,
197            compression: self.compression,
198            query_settings: self.query_settings,
199        };
200
201        let service: HttpService<ClickhouseServiceRequestBuilder, PartitionKey> =
202            HttpService::new(client.clone(), clickhouse_service_request_builder);
203
204        let request_limits = self.request.into_settings();
205
206        let service = ServiceBuilder::new()
207            .settings(request_limits, ClickhouseRetryLogic::default())
208            .service(service);
209
210        let batch_settings = self.batch.into_batcher_settings()?;
211
212        let database = self.database.clone().unwrap_or_else(|| {
213            "default"
214                .try_into()
215                .expect("'default' should be a valid template")
216        });
217
218        let request_builder = ClickhouseRequestBuilder {
219            compression: self.compression,
220            encoding: (
221                self.encoding.clone(),
222                Encoder::<Framer>::new(
223                    NewlineDelimitedEncoderConfig.build().into(),
224                    JsonSerializerConfig::default().build().into(),
225                ),
226            ),
227        };
228
229        let sink = ClickhouseSink::new(
230            batch_settings,
231            service,
232            database,
233            self.table.clone(),
234            self.format,
235            request_builder,
236        );
237
238        let healthcheck = Box::pin(healthcheck(client, endpoint, auth));
239
240        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
241    }
242
243    fn input(&self) -> Input {
244        Input::log()
245    }
246
247    fn acknowledgements(&self) -> &AcknowledgementsConfig {
248        &self.acknowledgements
249    }
250}
251
252fn get_healthcheck_uri(endpoint: &Uri) -> String {
253    let mut uri = endpoint.to_string();
254    if !uri.ends_with('/') {
255        uri.push('/');
256    }
257    uri.push_str("?query=SELECT%201");
258    uri
259}
260
261async fn healthcheck(client: HttpClient, endpoint: Uri, auth: Option<Auth>) -> crate::Result<()> {
262    let uri = get_healthcheck_uri(&endpoint);
263    let mut request = Request::get(uri).body(Body::empty()).unwrap();
264
265    if let Some(auth) = auth {
266        auth.apply(&mut request);
267    }
268
269    let response = client.send(request).await?;
270
271    match response.status() {
272        StatusCode::OK => Ok(()),
273        status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
274    }
275}
276
#[cfg(test)]
mod tests {
    use super::*;

    /// The generated example config must round-trip.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ClickhouseConfig>();
    }

    /// The healthcheck URI always has a `/` before the `SELECT 1` query.
    #[test]
    fn test_get_healthcheck_uri() {
        let cases = [
            ("http://localhost:8123", "http://localhost:8123/?query=SELECT%201"),
            ("http://localhost:8123/", "http://localhost:8123/?query=SELECT%201"),
            (
                "http://localhost:8123/path/",
                "http://localhost:8123/path/?query=SELECT%201",
            ),
        ];

        for (endpoint, expected) in cases {
            assert_eq!(get_healthcheck_uri(&endpoint.parse().unwrap()), expected);
        }
    }
}