vector/sinks/gcp_chronicle/
sink.rs

1use std::fmt;
2
3use crate::sinks::{
4    gcp_chronicle::partitioner::{ChroniclePartitionKey, ChroniclePartitioner},
5    prelude::*,
6};
7
8pub struct ChronicleSink<Svc, RB> {
9    service: Svc,
10    request_builder: RB,
11    partitioner: ChroniclePartitioner,
12    batcher_settings: BatcherSettings,
13    protocol: &'static str,
14}
15
16impl<Svc, RB> ChronicleSink<Svc, RB> {
17    pub const fn new(
18        service: Svc,
19        request_builder: RB,
20        partitioner: ChroniclePartitioner,
21        batcher_settings: BatcherSettings,
22        protocol: &'static str,
23    ) -> Self {
24        Self {
25            service,
26            request_builder,
27            partitioner,
28            batcher_settings,
29            protocol,
30        }
31    }
32}
33
34impl<Svc, RB> ChronicleSink<Svc, RB>
35where
36    Svc: Service<RB::Request> + Send + 'static,
37    Svc::Future: Send + 'static,
38    Svc::Response: DriverResponse + Send + 'static,
39    Svc::Error: fmt::Debug + Into<crate::Error> + Send,
40    RB: RequestBuilder<(ChroniclePartitionKey, Vec<Event>)> + Send + Sync + 'static,
41    RB::Error: fmt::Display + Send,
42    RB::Request: Finalizable + MetaDescriptive + Send,
43{
44    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
45        let partitioner = self.partitioner;
46        let settings = self.batcher_settings;
47
48        let request_builder = self.request_builder;
49
50        input
51            .batched_partitioned(partitioner, || settings.as_byte_size_config())
52            .filter_map(|(key, batch)| async move {
53                // A `TemplateRenderingError` will have been emitted by `KeyPartitioner` if the key here is `None`,
54                // thus no further `EventsDropped` event needs emitting at this stage.
55                key.map(move |k| (k, batch))
56            })
57            .request_builder(default_request_builder_concurrency_limit(), request_builder)
58            .filter_map(|request| async move {
59                match request {
60                    Err(error) => {
61                        emit!(SinkRequestBuildError { error });
62                        None
63                    }
64                    Ok(req) => Some(req),
65                }
66            })
67            .into_driver(self.service)
68            .protocol(self.protocol)
69            .run()
70            .await
71    }
72}
73
74#[async_trait]
75impl<Svc, RB> StreamSink<Event> for ChronicleSink<Svc, RB>
76where
77    Svc: Service<RB::Request> + Send + 'static,
78    Svc::Future: Send + 'static,
79    Svc::Response: DriverResponse + Send + 'static,
80    Svc::Error: fmt::Debug + Into<crate::Error> + Send,
81    RB: RequestBuilder<(ChroniclePartitionKey, Vec<Event>)> + Send + Sync + 'static,
82    RB::Error: fmt::Display + Send,
83    RB::Request: Finalizable + MetaDescriptive + Send,
84{
85    async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
86        self.run_inner(input).await
87    }
88}