1use std::fmt;
4
5use http::{Request, StatusCode, Uri};
6use hyper::Body;
7use vector_lib::codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer};
8
9use super::{
10 request_builder::ClickhouseRequestBuilder,
11 service::{ClickhouseRetryLogic, ClickhouseServiceRequestBuilder},
12 sink::{ClickhouseSink, PartitionKey},
13};
14use crate::{
15 http::{Auth, HttpClient, MaybeAuth},
16 sinks::{
17 prelude::*,
18 util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, http::HttpService},
19 },
20};
21
22#[configurable_component]
28#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq, Hash)]
29#[serde(rename_all = "snake_case")]
30#[derivative(Default)]
31#[allow(clippy::enum_variant_names)]
32pub enum Format {
33 #[derivative(Default)]
34 JsonEachRow,
36
37 JsonAsObject,
39
40 JsonAsString,
42}
43
44impl fmt::Display for Format {
45 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46 match self {
47 Format::JsonEachRow => write!(f, "JSONEachRow"),
48 Format::JsonAsObject => write!(f, "JSONAsObject"),
49 Format::JsonAsString => write!(f, "JSONAsString"),
50 }
51 }
52}
53
54#[configurable_component(sink("clickhouse", "Deliver log data to a ClickHouse database."))]
56#[derive(Clone, Debug, Default)]
57#[serde(deny_unknown_fields)]
58pub struct ClickhouseConfig {
59 #[serde(alias = "host")]
61 #[configurable(metadata(docs::examples = "http://localhost:8123"))]
62 pub endpoint: UriSerde,
63
64 #[configurable(metadata(docs::examples = "mytable"))]
66 pub table: Template,
67
68 #[configurable(metadata(docs::examples = "mydatabase"))]
70 pub database: Option<Template>,
71
72 #[serde(default)]
74 pub format: Format,
75
76 #[serde(default)]
80 pub skip_unknown_fields: Option<bool>,
81
82 #[serde(default)]
84 pub date_time_best_effort: bool,
85
86 #[serde(default)]
88 pub insert_random_shard: bool,
89
90 #[configurable(derived)]
91 #[serde(default = "Compression::gzip_default")]
92 pub compression: Compression,
93
94 #[configurable(derived)]
95 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
96 pub encoding: Transformer,
97
98 #[configurable(derived)]
99 #[serde(default)]
100 pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
101
102 #[configurable(derived)]
103 pub auth: Option<Auth>,
104
105 #[configurable(derived)]
106 #[serde(default)]
107 pub request: TowerRequestConfig,
108
109 #[configurable(derived)]
110 pub tls: Option<TlsConfig>,
111
112 #[configurable(derived)]
113 #[serde(
114 default,
115 deserialize_with = "crate::serde::bool_or_struct",
116 skip_serializing_if = "crate::serde::is_default"
117 )]
118 pub acknowledgements: AcknowledgementsConfig,
119
120 #[configurable(derived)]
121 #[serde(default)]
122 pub query_settings: QuerySettingsConfig,
123}
124
125#[configurable_component]
127#[derive(Clone, Copy, Debug, Default)]
128#[serde(deny_unknown_fields)]
129pub struct QuerySettingsConfig {
130 #[serde(default)]
132 pub async_insert_settings: AsyncInsertSettingsConfig,
133}
134
135#[configurable_component]
137#[derive(Clone, Copy, Debug, Default)]
138#[serde(deny_unknown_fields)]
139pub struct AsyncInsertSettingsConfig {
140 #[serde(default)]
144 pub enabled: Option<bool>,
145
146 #[serde(default)]
150 pub wait_for_processing: Option<bool>,
151
152 #[serde(default)]
156 pub wait_for_processing_timeout: Option<u64>,
157
158 #[serde(default)]
162 pub deduplicate: Option<bool>,
163
164 #[serde(default)]
168 pub max_data_size: Option<u64>,
169
170 #[serde(default)]
174 pub max_query_number: Option<u64>,
175}
176
// NOTE(review): presumably derives the sink's generated example configuration from
// `ClickhouseConfig::default()`; the `generate_config` test below exercises it.
impl_generate_config_from_default!(ClickhouseConfig);
178
179#[async_trait::async_trait]
180#[typetag::serde(name = "clickhouse")]
181impl SinkConfig for ClickhouseConfig {
182 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
183 let endpoint = self.endpoint.with_default_parts().uri;
184
185 let auth = self.auth.choose_one(&self.endpoint.auth)?;
186
187 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
188
189 let client = HttpClient::new(tls_settings, &cx.proxy)?;
190
191 let clickhouse_service_request_builder = ClickhouseServiceRequestBuilder {
192 auth: auth.clone(),
193 endpoint: endpoint.clone(),
194 skip_unknown_fields: self.skip_unknown_fields,
195 date_time_best_effort: self.date_time_best_effort,
196 insert_random_shard: self.insert_random_shard,
197 compression: self.compression,
198 query_settings: self.query_settings,
199 };
200
201 let service: HttpService<ClickhouseServiceRequestBuilder, PartitionKey> =
202 HttpService::new(client.clone(), clickhouse_service_request_builder);
203
204 let request_limits = self.request.into_settings();
205
206 let service = ServiceBuilder::new()
207 .settings(request_limits, ClickhouseRetryLogic::default())
208 .service(service);
209
210 let batch_settings = self.batch.into_batcher_settings()?;
211
212 let database = self.database.clone().unwrap_or_else(|| {
213 "default"
214 .try_into()
215 .expect("'default' should be a valid template")
216 });
217
218 let request_builder = ClickhouseRequestBuilder {
219 compression: self.compression,
220 encoding: (
221 self.encoding.clone(),
222 Encoder::<Framer>::new(
223 NewlineDelimitedEncoderConfig.build().into(),
224 JsonSerializerConfig::default().build().into(),
225 ),
226 ),
227 };
228
229 let sink = ClickhouseSink::new(
230 batch_settings,
231 service,
232 database,
233 self.table.clone(),
234 self.format,
235 request_builder,
236 );
237
238 let healthcheck = Box::pin(healthcheck(client, endpoint, auth));
239
240 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
241 }
242
243 fn input(&self) -> Input {
244 Input::log()
245 }
246
247 fn acknowledgements(&self) -> &AcknowledgementsConfig {
248 &self.acknowledgements
249 }
250}
251
252fn get_healthcheck_uri(endpoint: &Uri) -> String {
253 let mut uri = endpoint.to_string();
254 if !uri.ends_with('/') {
255 uri.push('/');
256 }
257 uri.push_str("?query=SELECT%201");
258 uri
259}
260
261async fn healthcheck(client: HttpClient, endpoint: Uri, auth: Option<Auth>) -> crate::Result<()> {
262 let uri = get_healthcheck_uri(&endpoint);
263 let mut request = Request::get(uri).body(Body::empty()).unwrap();
264
265 if let Some(auth) = auth {
266 auth.apply(&mut request);
267 }
268
269 let response = client.send(request).await?;
270
271 match response.status() {
272 StatusCode::OK => Ok(()),
273 status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
274 }
275}
276
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the shared config-generation check against `ClickhouseConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ClickhouseConfig>();
    }

    /// The healthcheck URI gets a trailing slash when one is missing and always ends with
    /// the `SELECT 1` probe query.
    #[test]
    fn test_get_healthcheck_uri() {
        // (endpoint, expected healthcheck URI)
        let cases = [
            ("http://localhost:8123", "http://localhost:8123/?query=SELECT%201"),
            ("http://localhost:8123/", "http://localhost:8123/?query=SELECT%201"),
            ("http://localhost:8123/path/", "http://localhost:8123/path/?query=SELECT%201"),
        ];

        for (endpoint, expected) in cases {
            assert_eq!(get_healthcheck_uri(&endpoint.parse().unwrap()), expected);
        }
    }
}