vector/sinks/clickhouse/
sink.rs

//! Implementation of the `clickhouse` sink.
2
3use super::{config::Format, request_builder::ClickhouseRequestBuilder};
4use crate::sinks::{prelude::*, util::http::HttpRequest};
5
6pub struct ClickhouseSink<S> {
7    batch_settings: BatcherSettings,
8    service: S,
9    database: Template,
10    table: Template,
11    format: Format,
12    request_builder: ClickhouseRequestBuilder,
13}
14
15impl<S> ClickhouseSink<S>
16where
17    S: Service<HttpRequest<PartitionKey>> + Send + 'static,
18    S::Future: Send + 'static,
19    S::Response: DriverResponse + Send + 'static,
20    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
21{
22    pub const fn new(
23        batch_settings: BatcherSettings,
24        service: S,
25        database: Template,
26        table: Template,
27        format: Format,
28        request_builder: ClickhouseRequestBuilder,
29    ) -> Self {
30        Self {
31            batch_settings,
32            service,
33            database,
34            table,
35            format,
36            request_builder,
37        }
38    }
39
40    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
41        let batch_settings = self.batch_settings;
42
43        input
44            .batched_partitioned(
45                KeyPartitioner::new(self.database, self.table, self.format),
46                batch_settings.timeout,
47                |_| batch_settings.as_byte_size_config(),
48            )
49            .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) })
50            .request_builder(
51                default_request_builder_concurrency_limit(),
52                self.request_builder,
53            )
54            .filter_map(|request| async {
55                match request {
56                    Err(error) => {
57                        emit!(SinkRequestBuildError { error });
58                        None
59                    }
60                    Ok(req) => Some(req),
61                }
62            })
63            .into_driver(self.service)
64            .run()
65            .await
66    }
67}
68
69#[async_trait::async_trait]
70impl<S> StreamSink<Event> for ClickhouseSink<S>
71where
72    S: Service<HttpRequest<PartitionKey>> + Send + 'static,
73    S::Future: Send + 'static,
74    S::Response: DriverResponse + Send + 'static,
75    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
76{
77    async fn run(
78        self: Box<Self>,
79        input: futures_util::stream::BoxStream<'_, Event>,
80    ) -> Result<(), ()> {
81        self.run_inner(input).await
82    }
83}
84
85/// PartitionKey used to partition events by (database, table) pair.
86#[derive(Hash, Eq, PartialEq, Clone, Debug)]
87pub struct PartitionKey {
88    pub database: String,
89    pub table: String,
90    pub format: Format,
91}
92
93/// KeyPartitioner that partitions events by (database, table) pair.
94struct KeyPartitioner {
95    database: Template,
96    table: Template,
97    format: Format,
98}
99
100impl KeyPartitioner {
101    const fn new(database: Template, table: Template, format: Format) -> Self {
102        Self {
103            database,
104            table,
105            format,
106        }
107    }
108
109    fn render(template: &Template, item: &Event, field: &'static str) -> Option<String> {
110        template
111            .render_string(item)
112            .map_err(|error| {
113                emit!(TemplateRenderingError {
114                    error,
115                    field: Some(field),
116                    drop_event: true,
117                });
118            })
119            .ok()
120    }
121}
122
123impl Partitioner for KeyPartitioner {
124    type Item = Event;
125    type Key = Option<PartitionKey>;
126
127    fn partition(&self, item: &Self::Item) -> Self::Key {
128        let database = Self::render(&self.database, item, "database_key")?;
129        let table = Self::render(&self.table, item, "table_key")?;
130        Some(PartitionKey {
131            database,
132            table,
133            format: self.format,
134        })
135    }
136}