vector/sinks/clickhouse/
sink.rs

1//! Implementation of the `clickhouse` sink.
2
3use super::{config::Format, request_builder::ClickhouseRequestBuilder};
4use crate::sinks::{prelude::*, util::http::HttpRequest};
5
6pub struct ClickhouseSink<S> {
7    batch_settings: BatcherSettings,
8    service: S,
9    database: Template,
10    table: Template,
11    format: Format,
12    request_builder: ClickhouseRequestBuilder,
13}
14
15impl<S> ClickhouseSink<S>
16where
17    S: Service<HttpRequest<PartitionKey>> + Send + 'static,
18    S::Future: Send + 'static,
19    S::Response: DriverResponse + Send + 'static,
20    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
21{
22    pub const fn new(
23        batch_settings: BatcherSettings,
24        service: S,
25        database: Template,
26        table: Template,
27        format: Format,
28        request_builder: ClickhouseRequestBuilder,
29    ) -> Self {
30        Self {
31            batch_settings,
32            service,
33            database,
34            table,
35            format,
36            request_builder,
37        }
38    }
39
40    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
41        let batch_settings = self.batch_settings;
42
43        input
44            .batched_partitioned(
45                KeyPartitioner::new(self.database, self.table, self.format),
46                || batch_settings.as_byte_size_config(),
47            )
48            .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) })
49            .request_builder(
50                default_request_builder_concurrency_limit(),
51                self.request_builder,
52            )
53            .filter_map(|request| async {
54                match request {
55                    Err(error) => {
56                        emit!(SinkRequestBuildError { error });
57                        None
58                    }
59                    Ok(req) => Some(req),
60                }
61            })
62            .into_driver(self.service)
63            .run()
64            .await
65    }
66}
67
68#[async_trait::async_trait]
69impl<S> StreamSink<Event> for ClickhouseSink<S>
70where
71    S: Service<HttpRequest<PartitionKey>> + Send + 'static,
72    S::Future: Send + 'static,
73    S::Response: DriverResponse + Send + 'static,
74    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
75{
76    async fn run(
77        self: Box<Self>,
78        input: futures_util::stream::BoxStream<'_, Event>,
79    ) -> Result<(), ()> {
80        self.run_inner(input).await
81    }
82}
83
84/// PartitionKey used to partition events by (database, table) pair.
85#[derive(Hash, Eq, PartialEq, Clone, Debug)]
86pub struct PartitionKey {
87    pub database: String,
88    pub table: String,
89    pub format: Format,
90}
91
92/// KeyPartitioner that partitions events by (database, table) pair.
93struct KeyPartitioner {
94    database: Template,
95    table: Template,
96    format: Format,
97}
98
99impl KeyPartitioner {
100    const fn new(database: Template, table: Template, format: Format) -> Self {
101        Self {
102            database,
103            table,
104            format,
105        }
106    }
107
108    fn render(template: &Template, item: &Event, field: &'static str) -> Option<String> {
109        template
110            .render_string(item)
111            .map_err(|error| {
112                emit!(TemplateRenderingError {
113                    error,
114                    field: Some(field),
115                    drop_event: true,
116                });
117            })
118            .ok()
119    }
120}
121
122impl Partitioner for KeyPartitioner {
123    type Item = Event;
124    type Key = Option<PartitionKey>;
125
126    fn partition(&self, item: &Self::Item) -> Self::Key {
127        let database = Self::render(&self.database, item, "database_key")?;
128        let table = Self::render(&self.table, item, "table_key")?;
129        Some(PartitionKey {
130            database,
131            table,
132            format: self.format,
133        })
134    }
135}