vector/sinks/gcp_chronicle/
sink.rs

1use std::fmt;
2
3use crate::sinks::{
4    gcp_chronicle::partitioner::{ChroniclePartitionKey, ChroniclePartitioner},
5    prelude::*,
6};
7
8pub struct ChronicleSink<Svc, RB> {
9    service: Svc,
10    request_builder: RB,
11    partitioner: ChroniclePartitioner,
12    batcher_settings: BatcherSettings,
13    protocol: &'static str,
14}
15
16impl<Svc, RB> ChronicleSink<Svc, RB> {
17    pub const fn new(
18        service: Svc,
19        request_builder: RB,
20        partitioner: ChroniclePartitioner,
21        batcher_settings: BatcherSettings,
22        protocol: &'static str,
23    ) -> Self {
24        Self {
25            service,
26            request_builder,
27            partitioner,
28            batcher_settings,
29            protocol,
30        }
31    }
32}
33
34impl<Svc, RB> ChronicleSink<Svc, RB>
35where
36    Svc: Service<RB::Request> + Send + 'static,
37    Svc::Future: Send + 'static,
38    Svc::Response: DriverResponse + Send + 'static,
39    Svc::Error: fmt::Debug + Into<crate::Error> + Send,
40    RB: RequestBuilder<(ChroniclePartitionKey, Vec<Event>)> + Send + Sync + 'static,
41    RB::Error: fmt::Display + Send,
42    RB::Request: Finalizable + MetaDescriptive + Send,
43{
44    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
45        let partitioner = self.partitioner;
46        let settings = self.batcher_settings;
47
48        let request_builder = self.request_builder;
49
50        input
51            .batched_partitioned(partitioner, settings.timeout, |_| {
52                settings.as_byte_size_config()
53            })
54            .filter_map(|(key, batch)| async move {
55                // A `TemplateRenderingError` will have been emitted by `KeyPartitioner` if the key here is `None`,
56                // thus no further `EventsDropped` event needs emitting at this stage.
57                key.map(move |k| (k, batch))
58            })
59            .request_builder(default_request_builder_concurrency_limit(), request_builder)
60            .filter_map(|request| async move {
61                match request {
62                    Err(error) => {
63                        emit!(SinkRequestBuildError { error });
64                        None
65                    }
66                    Ok(req) => Some(req),
67                }
68            })
69            .into_driver(self.service)
70            .protocol(self.protocol)
71            .run()
72            .await
73    }
74}
75
76#[async_trait]
77impl<Svc, RB> StreamSink<Event> for ChronicleSink<Svc, RB>
78where
79    Svc: Service<RB::Request> + Send + 'static,
80    Svc::Future: Send + 'static,
81    Svc::Response: DriverResponse + Send + 'static,
82    Svc::Error: fmt::Debug + Into<crate::Error> + Send,
83    RB: RequestBuilder<(ChroniclePartitionKey, Vec<Event>)> + Send + Sync + 'static,
84    RB::Error: fmt::Display + Send,
85    RB::Request: Finalizable + MetaDescriptive + Send,
86{
87    async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
88        self.run_inner(input).await
89    }
90}