1use super::{
4 request_builder::ClickhouseRequestBuilder,
5 service::{ClickhouseRetryLogic, ClickhouseServiceRequestBuilder},
6 sink::{ClickhouseSink, PartitionKey},
7};
8use crate::{
9 http::{Auth, HttpClient, MaybeAuth},
10 sinks::{
11 prelude::*,
12 util::{http::HttpService, RealtimeSizeBasedDefaultBatchSettings, UriSerde},
13 },
14};
15use http::{Request, StatusCode, Uri};
16use hyper::Body;
17use std::fmt;
18use vector_lib::codecs::{encoding::Framer, JsonSerializerConfig, NewlineDelimitedEncoderConfig};
19
20#[configurable_component]
26#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq, Hash)]
27#[serde(rename_all = "snake_case")]
28#[derivative(Default)]
29#[allow(clippy::enum_variant_names)]
30pub enum Format {
31 #[derivative(Default)]
32 JsonEachRow,
34
35 JsonAsObject,
37
38 JsonAsString,
40}
41
42impl fmt::Display for Format {
43 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44 match self {
45 Format::JsonEachRow => write!(f, "JSONEachRow"),
46 Format::JsonAsObject => write!(f, "JSONAsObject"),
47 Format::JsonAsString => write!(f, "JSONAsString"),
48 }
49 }
50}
51
#[configurable_component(sink("clickhouse", "Deliver log data to a ClickHouse database."))]
/// Configuration for the `clickhouse` sink.
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct ClickhouseConfig {
    /// The endpoint of the ClickHouse server.
    ///
    /// Also accepted under the key `host`.
    // NOTE(review): `build` passes credentials embedded in this URI to `choose_one`
    // together with `auth`; how the two are reconciled is defined by `choose_one`.
    #[serde(alias = "host")]
    #[configurable(metadata(docs::examples = "http://localhost:8123"))]
    pub endpoint: UriSerde,

    /// The table that data is inserted into.
    #[configurable(metadata(docs::examples = "mytable"))]
    pub table: Template,

    /// The database that contains the table.
    ///
    /// When not set, the database name `default` is used.
    #[configurable(metadata(docs::examples = "mydatabase"))]
    pub database: Option<Template>,

    /// The data format used when writing to ClickHouse.
    ///
    /// Defaults to `json_each_row`.
    #[serde(default)]
    pub format: Format,

    /// Whether ClickHouse should skip unknown fields.
    ///
    /// Left unset (`None`) unless configured.
    // NOTE(review): forwarded to `ClickhouseServiceRequestBuilder`; presumably becomes
    // ClickHouse's `input_format_skip_unknown_fields` setting — confirm in the service.
    #[serde(default)]
    pub skip_unknown_fields: Option<bool>,

    /// Whether date/time values are parsed in best-effort mode.
    ///
    /// Defaults to `false`.
    // NOTE(review): presumably maps to ClickHouse's `date_time_input_format=best_effort`
    // — confirm in `ClickhouseServiceRequestBuilder`.
    #[serde(default)]
    pub date_time_best_effort: bool,

    /// Whether inserts go to a random shard.
    ///
    /// Defaults to `false`.
    // NOTE(review): presumably maps to ClickHouse's `insert_distributed_one_random_shard`
    // — confirm in `ClickhouseServiceRequestBuilder`.
    #[serde(default)]
    pub insert_random_shard: bool,

    // Payload compression; defaults to `Compression::gzip_default`. `build` hands the
    // same value to both the request builder and the service request builder.
    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,

    // Event transformer applied before encoding (events are always encoded as
    // newline-delimited JSON in `build`); omitted when serializing if left at default.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub encoding: Transformer,

    // Batching limits; unset values fall back to `RealtimeSizeBasedDefaultBatchSettings`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    // Explicit authentication; reconciled with endpoint credentials via `choose_one`.
    #[configurable(derived)]
    pub auth: Option<Auth>,

    // Request-level settings, converted with `into_settings` in `build`.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    // Optional TLS options used to build the HTTP client.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    // Accepts either a bare bool or a struct (`bool_or_struct`); omitted when serializing
    // if left at default.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    // Extra query settings forwarded to `ClickhouseServiceRequestBuilder`.
    #[configurable(derived)]
    #[serde(default)]
    pub query_settings: QuerySettingsConfig,
}
122
#[configurable_component]
/// Query settings for the `clickhouse` sink.
#[derive(Clone, Copy, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct QuerySettingsConfig {
    /// Async insert related settings.
    #[serde(default)]
    pub async_insert_settings: AsyncInsertSettingsConfig,
}
132
#[configurable_component]
/// Async insert related settings for the `clickhouse` sink.
///
/// Every field is optional; a field left out of the configuration stays `None`.
// NOTE(review): the mapping of each field onto a ClickHouse setting happens in the
// service request builder, which is not visible here — the per-field notes are
// presumptions to confirm there.
#[derive(Clone, Copy, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct AsyncInsertSettingsConfig {
    /// Whether asynchronous inserts are enabled.
    // NOTE(review): presumably ClickHouse's `async_insert`.
    #[serde(default)]
    pub enabled: Option<bool>,

    /// Whether to wait for an asynchronous insert to be processed.
    // NOTE(review): presumably ClickHouse's `wait_for_async_insert`.
    #[serde(default)]
    pub wait_for_processing: Option<bool>,

    /// Timeout for waiting on the processing of an asynchronous insert.
    // NOTE(review): presumably ClickHouse's `wait_for_async_insert_timeout`; the unit
    // (likely seconds) is not established here — confirm before documenting it.
    #[serde(default)]
    pub wait_for_processing_timeout: Option<u64>,

    /// Whether asynchronous inserts are deduplicated.
    // NOTE(review): presumably ClickHouse's `async_insert_deduplicate`.
    #[serde(default)]
    pub deduplicate: Option<bool>,

    /// Maximum size of the data collected for an asynchronous insert.
    // NOTE(review): presumably ClickHouse's `async_insert_max_data_size` (likely bytes).
    #[serde(default)]
    pub max_data_size: Option<u64>,

    /// Maximum number of insert queries collected for an asynchronous insert.
    // NOTE(review): presumably ClickHouse's `async_insert_max_query_number`.
    #[serde(default)]
    pub max_query_number: Option<u64>,
}
174
// Provides the generated example configuration for this sink from
// `ClickhouseConfig::default()` (as the macro name indicates); exercised by the
// `generate_config` unit test.
impl_generate_config_from_default!(ClickhouseConfig);
176
177#[async_trait::async_trait]
178#[typetag::serde(name = "clickhouse")]
179impl SinkConfig for ClickhouseConfig {
180 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
181 let endpoint = self.endpoint.with_default_parts().uri;
182
183 let auth = self.auth.choose_one(&self.endpoint.auth)?;
184
185 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
186
187 let client = HttpClient::new(tls_settings, &cx.proxy)?;
188
189 let clickhouse_service_request_builder = ClickhouseServiceRequestBuilder {
190 auth: auth.clone(),
191 endpoint: endpoint.clone(),
192 skip_unknown_fields: self.skip_unknown_fields,
193 date_time_best_effort: self.date_time_best_effort,
194 insert_random_shard: self.insert_random_shard,
195 compression: self.compression,
196 query_settings: self.query_settings,
197 };
198
199 let service: HttpService<ClickhouseServiceRequestBuilder, PartitionKey> =
200 HttpService::new(client.clone(), clickhouse_service_request_builder);
201
202 let request_limits = self.request.into_settings();
203
204 let service = ServiceBuilder::new()
205 .settings(request_limits, ClickhouseRetryLogic::default())
206 .service(service);
207
208 let batch_settings = self.batch.into_batcher_settings()?;
209
210 let database = self.database.clone().unwrap_or_else(|| {
211 "default"
212 .try_into()
213 .expect("'default' should be a valid template")
214 });
215
216 let request_builder = ClickhouseRequestBuilder {
217 compression: self.compression,
218 encoding: (
219 self.encoding.clone(),
220 Encoder::<Framer>::new(
221 NewlineDelimitedEncoderConfig.build().into(),
222 JsonSerializerConfig::default().build().into(),
223 ),
224 ),
225 };
226
227 let sink = ClickhouseSink::new(
228 batch_settings,
229 service,
230 database,
231 self.table.clone(),
232 self.format,
233 request_builder,
234 );
235
236 let healthcheck = Box::pin(healthcheck(client, endpoint, auth));
237
238 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
239 }
240
241 fn input(&self) -> Input {
242 Input::log()
243 }
244
245 fn acknowledgements(&self) -> &AcknowledgementsConfig {
246 &self.acknowledgements
247 }
248}
249
250fn get_healthcheck_uri(endpoint: &Uri) -> String {
251 let mut uri = endpoint.to_string();
252 if !uri.ends_with('/') {
253 uri.push('/');
254 }
255 uri.push_str("?query=SELECT%201");
256 uri
257}
258
259async fn healthcheck(client: HttpClient, endpoint: Uri, auth: Option<Auth>) -> crate::Result<()> {
260 let uri = get_healthcheck_uri(&endpoint);
261 let mut request = Request::get(uri).body(Body::empty()).unwrap();
262
263 if let Some(auth) = auth {
264 auth.apply(&mut request);
265 }
266
267 let response = client.send(request).await?;
268
269 match response.status() {
270 StatusCode::OK => Ok(()),
271 status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
272 }
273}
274
#[cfg(test)]
mod tests {
    use super::*;

    /// The generated example configuration must round-trip.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ClickhouseConfig>();
    }

    /// The healthcheck URI gets exactly one `/` before the `SELECT 1` query.
    #[test]
    fn test_get_healthcheck_uri() {
        let cases = [
            ("http://localhost:8123", "http://localhost:8123/?query=SELECT%201"),
            ("http://localhost:8123/", "http://localhost:8123/?query=SELECT%201"),
            (
                "http://localhost:8123/path/",
                "http://localhost:8123/path/?query=SELECT%201",
            ),
        ];

        for (endpoint, expected) in cases {
            let parsed = endpoint.parse().expect("test endpoint should parse");
            assert_eq!(get_healthcheck_uri(&parsed), expected, "endpoint: {endpoint}");
        }
    }
}