vector/sinks/greptimedb/logs/
sink.rs1use crate::sinks::{
2 greptimedb::logs::http_request_builder::{
3 GreptimeDBLogsHttpRequestBuilder, KeyPartitioner, PartitionKey,
4 },
5 prelude::*,
6 util::http::HttpRequest,
7};
8
9pub struct LogsSinkSetting {
10 pub dbname: Template,
11 pub table: Template,
12 pub pipeline_name: Template,
13 pub pipeline_version: Option<Template>,
14}
15
16pub struct GreptimeDBLogsHttpSink<S> {
18 batcher_settings: BatcherSettings,
19 service: S,
20 request_builder: GreptimeDBLogsHttpRequestBuilder,
21 logs_sink_setting: LogsSinkSetting,
22}
23
24impl<S> GreptimeDBLogsHttpSink<S>
25where
26 S: Service<HttpRequest<PartitionKey>> + Send + 'static,
27 S::Future: Send + 'static,
28 S::Response: DriverResponse + Send + 'static,
29 S::Error: std::fmt::Debug + Into<crate::Error> + Send,
30{
31 pub const fn new(
32 batcher_settings: BatcherSettings,
33 service: S,
34 request_builder: GreptimeDBLogsHttpRequestBuilder,
35 logs_sink_setting: LogsSinkSetting,
36 ) -> Self {
37 Self {
38 batcher_settings,
39 service,
40 request_builder,
41 logs_sink_setting,
42 }
43 }
44
45 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
46 let batcher_settings = self.batcher_settings;
47 input
48 .batched_partitioned(
49 KeyPartitioner::new(
50 self.logs_sink_setting.dbname,
51 self.logs_sink_setting.table,
52 self.logs_sink_setting.pipeline_name,
53 self.logs_sink_setting.pipeline_version,
54 ),
55 || batcher_settings.as_byte_size_config(),
56 )
57 .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) })
58 .request_builder(
59 default_request_builder_concurrency_limit(),
60 self.request_builder,
61 )
62 .filter_map(|request| async {
63 match request {
64 Err(error) => {
65 emit!(SinkRequestBuildError { error });
66 None
67 }
68 Ok(req) => Some(req),
69 }
70 })
71 .into_driver(self.service)
72 .run()
73 .await
74 }
75}
76
77#[async_trait::async_trait]
78impl<S> StreamSink<Event> for GreptimeDBLogsHttpSink<S>
79where
80 S: Service<HttpRequest<PartitionKey>> + Send + 'static,
81 S::Future: Send + 'static,
82 S::Response: DriverResponse + Send + 'static,
83 S::Error: std::fmt::Debug + Into<crate::Error> + Send,
84{
85 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
86 self.run_inner(input).await
87 }
88}