vector/sinks/clickhouse/
sink.rs1use super::{config::Format, request_builder::ClickhouseRequestBuilder};
4use crate::sinks::{prelude::*, util::http::HttpRequest};
5
/// Stream sink that partitions incoming events by rendered database/table,
/// batches each partition, builds a request per batch and drives the requests
/// through the service `S`.
pub struct ClickhouseSink<S> {
    /// Batching settings; `as_byte_size_config()` is called on them to create
    /// the per-partition batch configuration.
    batch_settings: BatcherSettings,
    /// Service the driver hands the built requests to.
    service: S,
    /// Template rendered against each event to obtain the partition's database.
    database: Template,
    /// Template rendered against each event to obtain the partition's table.
    table: Template,
    /// Format copied into every partition key.
    format: Format,
    /// Builder that turns each keyed batch into a request.
    request_builder: ClickhouseRequestBuilder,
}
14
15impl<S> ClickhouseSink<S>
16where
17 S: Service<HttpRequest<PartitionKey>> + Send + 'static,
18 S::Future: Send + 'static,
19 S::Response: DriverResponse + Send + 'static,
20 S::Error: std::fmt::Debug + Into<crate::Error> + Send,
21{
22 pub const fn new(
23 batch_settings: BatcherSettings,
24 service: S,
25 database: Template,
26 table: Template,
27 format: Format,
28 request_builder: ClickhouseRequestBuilder,
29 ) -> Self {
30 Self {
31 batch_settings,
32 service,
33 database,
34 table,
35 format,
36 request_builder,
37 }
38 }
39
40 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
41 let batch_settings = self.batch_settings;
42
43 input
44 .batched_partitioned(
45 KeyPartitioner::new(self.database, self.table, self.format),
46 || batch_settings.as_byte_size_config(),
47 )
48 .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) })
49 .request_builder(
50 default_request_builder_concurrency_limit(),
51 self.request_builder,
52 )
53 .filter_map(|request| async {
54 match request {
55 Err(error) => {
56 emit!(SinkRequestBuildError { error });
57 None
58 }
59 Ok(req) => Some(req),
60 }
61 })
62 .into_driver(self.service)
63 .run()
64 .await
65 }
66}
67
68#[async_trait::async_trait]
69impl<S> StreamSink<Event> for ClickhouseSink<S>
70where
71 S: Service<HttpRequest<PartitionKey>> + Send + 'static,
72 S::Future: Send + 'static,
73 S::Response: DriverResponse + Send + 'static,
74 S::Error: std::fmt::Debug + Into<crate::Error> + Send,
75{
76 async fn run(
77 self: Box<Self>,
78 input: futures_util::stream::BoxStream<'_, Event>,
79 ) -> Result<(), ()> {
80 self.run_inner(input).await
81 }
82}
83
/// Key under which events are grouped into batches: the rendered database and
/// table names plus the configured format.
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub struct PartitionKey {
    /// Database name rendered from the sink's database template.
    pub database: String,
    /// Table name rendered from the sink's table template.
    pub table: String,
    /// Format carried over from the partitioner that produced this key.
    pub format: Format,
}
91
/// Partitioner that derives a `PartitionKey` for each event by rendering the
/// database and table templates against it.
struct KeyPartitioner {
    /// Template rendered per event to produce the key's database.
    database: Template,
    /// Template rendered per event to produce the key's table.
    table: Template,
    /// Format copied into every produced key.
    format: Format,
}
98
99impl KeyPartitioner {
100 const fn new(database: Template, table: Template, format: Format) -> Self {
101 Self {
102 database,
103 table,
104 format,
105 }
106 }
107
108 fn render(template: &Template, item: &Event, field: &'static str) -> Option<String> {
109 template
110 .render_string(item)
111 .map_err(|error| {
112 emit!(TemplateRenderingError {
113 error,
114 field: Some(field),
115 drop_event: true,
116 });
117 })
118 .ok()
119 }
120}
121
122impl Partitioner for KeyPartitioner {
123 type Item = Event;
124 type Key = Option<PartitionKey>;
125
126 fn partition(&self, item: &Self::Item) -> Self::Key {
127 let database = Self::render(&self.database, item, "database_key")?;
128 let table = Self::render(&self.table, item, "table_key")?;
129 Some(PartitionKey {
130 database,
131 table,
132 format: self.format,
133 })
134 }
135}