vector/sinks/clickhouse/
sink.rs1use super::{config::Format, request_builder::ClickhouseRequestBuilder};
4use crate::sinks::{prelude::*, util::http::HttpRequest};
5
6pub struct ClickhouseSink<S> {
7 batch_settings: BatcherSettings,
8 service: S,
9 database: Template,
10 table: Template,
11 format: Format,
12 request_builder: ClickhouseRequestBuilder,
13}
14
15impl<S> ClickhouseSink<S>
16where
17 S: Service<HttpRequest<PartitionKey>> + Send + 'static,
18 S::Future: Send + 'static,
19 S::Response: DriverResponse + Send + 'static,
20 S::Error: std::fmt::Debug + Into<crate::Error> + Send,
21{
22 pub const fn new(
23 batch_settings: BatcherSettings,
24 service: S,
25 database: Template,
26 table: Template,
27 format: Format,
28 request_builder: ClickhouseRequestBuilder,
29 ) -> Self {
30 Self {
31 batch_settings,
32 service,
33 database,
34 table,
35 format,
36 request_builder,
37 }
38 }
39
40 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
41 let batch_settings = self.batch_settings;
42
43 input
44 .batched_partitioned(
45 KeyPartitioner::new(self.database, self.table, self.format),
46 batch_settings.timeout,
47 |_| batch_settings.as_byte_size_config(),
48 )
49 .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) })
50 .request_builder(
51 default_request_builder_concurrency_limit(),
52 self.request_builder,
53 )
54 .filter_map(|request| async {
55 match request {
56 Err(error) => {
57 emit!(SinkRequestBuildError { error });
58 None
59 }
60 Ok(req) => Some(req),
61 }
62 })
63 .into_driver(self.service)
64 .run()
65 .await
66 }
67}
68
69#[async_trait::async_trait]
70impl<S> StreamSink<Event> for ClickhouseSink<S>
71where
72 S: Service<HttpRequest<PartitionKey>> + Send + 'static,
73 S::Future: Send + 'static,
74 S::Response: DriverResponse + Send + 'static,
75 S::Error: std::fmt::Debug + Into<crate::Error> + Send,
76{
77 async fn run(
78 self: Box<Self>,
79 input: futures_util::stream::BoxStream<'_, Event>,
80 ) -> Result<(), ()> {
81 self.run_inner(input).await
82 }
83}
84
85#[derive(Hash, Eq, PartialEq, Clone, Debug)]
87pub struct PartitionKey {
88 pub database: String,
89 pub table: String,
90 pub format: Format,
91}
92
93struct KeyPartitioner {
95 database: Template,
96 table: Template,
97 format: Format,
98}
99
100impl KeyPartitioner {
101 const fn new(database: Template, table: Template, format: Format) -> Self {
102 Self {
103 database,
104 table,
105 format,
106 }
107 }
108
109 fn render(template: &Template, item: &Event, field: &'static str) -> Option<String> {
110 template
111 .render_string(item)
112 .map_err(|error| {
113 emit!(TemplateRenderingError {
114 error,
115 field: Some(field),
116 drop_event: true,
117 });
118 })
119 .ok()
120 }
121}
122
123impl Partitioner for KeyPartitioner {
124 type Item = Event;
125 type Key = Option<PartitionKey>;
126
127 fn partition(&self, item: &Self::Item) -> Self::Key {
128 let database = Self::render(&self.database, item, "database_key")?;
129 let table = Self::render(&self.table, item, "table_key")?;
130 Some(PartitionKey {
131 database,
132 table,
133 format: self.format,
134 })
135 }
136}