vector/sinks/greptimedb/logs/
sink.rs

1use crate::sinks::{
2    greptimedb::logs::http_request_builder::{
3        GreptimeDBLogsHttpRequestBuilder, KeyPartitioner, PartitionKey,
4    },
5    prelude::*,
6    util::http::HttpRequest,
7};
8
9pub struct LogsSinkSetting {
10    pub dbname: Template,
11    pub table: Template,
12    pub pipeline_name: Template,
13    pub pipeline_version: Option<Template>,
14}
15
16/// A sink that ingests logs into GreptimeDB.
17pub struct GreptimeDBLogsHttpSink<S> {
18    batcher_settings: BatcherSettings,
19    service: S,
20    request_builder: GreptimeDBLogsHttpRequestBuilder,
21    logs_sink_setting: LogsSinkSetting,
22}
23
24impl<S> GreptimeDBLogsHttpSink<S>
25where
26    S: Service<HttpRequest<PartitionKey>> + Send + 'static,
27    S::Future: Send + 'static,
28    S::Response: DriverResponse + Send + 'static,
29    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
30{
31    pub const fn new(
32        batcher_settings: BatcherSettings,
33        service: S,
34        request_builder: GreptimeDBLogsHttpRequestBuilder,
35        logs_sink_setting: LogsSinkSetting,
36    ) -> Self {
37        Self {
38            batcher_settings,
39            service,
40            request_builder,
41            logs_sink_setting,
42        }
43    }
44
45    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
46        let batcher_settings = self.batcher_settings;
47        input
48            .batched_partitioned(
49                KeyPartitioner::new(
50                    self.logs_sink_setting.dbname,
51                    self.logs_sink_setting.table,
52                    self.logs_sink_setting.pipeline_name,
53                    self.logs_sink_setting.pipeline_version,
54                ),
55                || batcher_settings.as_byte_size_config(),
56            )
57            .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) })
58            .request_builder(
59                default_request_builder_concurrency_limit(),
60                self.request_builder,
61            )
62            .filter_map(|request| async {
63                match request {
64                    Err(error) => {
65                        emit!(SinkRequestBuildError { error });
66                        None
67                    }
68                    Ok(req) => Some(req),
69                }
70            })
71            .into_driver(self.service)
72            .run()
73            .await
74    }
75}
76
77#[async_trait::async_trait]
78impl<S> StreamSink<Event> for GreptimeDBLogsHttpSink<S>
79where
80    S: Service<HttpRequest<PartitionKey>> + Send + 'static,
81    S::Future: Send + 'static,
82    S::Response: DriverResponse + Send + 'static,
83    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
84{
85    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
86        self.run_inner(input).await
87    }
88}