vector/sinks/aws_s_s/
sink.rs

1use super::{client::Client, request_builder::SSRequestBuilder, service::SSService};
2use crate::sinks::{aws_s_s::retry::SSRetryLogic, prelude::*};
3
4#[derive(Clone)]
5pub(super) struct SSSink<C, E>
6where
7    C: Client<E> + Clone + Send + Sync + 'static,
8    E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
9{
10    request_builder: SSRequestBuilder,
11    service: SSService<C, E>,
12    request: TowerRequestConfig,
13}
14
15impl<C, E> SSSink<C, E>
16where
17    C: Client<E> + Clone + Send + Sync + 'static,
18    E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
19{
20    pub(super) fn new(
21        request_builder: SSRequestBuilder,
22        request: TowerRequestConfig,
23        publisher: C,
24    ) -> crate::Result<Self> {
25        Ok(SSSink {
26            request_builder,
27            service: SSService::new(publisher),
28            request,
29        })
30    }
31
32    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
33        let request = self.request.into_settings();
34        let retry_logic: SSRetryLogic<E> = super::retry::SSRetryLogic::new();
35        let service = tower::ServiceBuilder::new()
36            .settings(request, retry_logic)
37            .service(self.service);
38
39        input
40            .request_builder(
41                default_request_builder_concurrency_limit(),
42                self.request_builder,
43            )
44            .filter_map(|req| async move {
45                req.map_err(|error| {
46                    emit!(SinkRequestBuildError { error });
47                })
48                .ok()
49            })
50            .into_driver(service)
51            .run()
52            .await
53    }
54}
55
56#[async_trait::async_trait]
57impl<C, E> StreamSink<Event> for SSSink<C, E>
58where
59    C: Client<E> + Clone + Send + Sync + 'static,
60    E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
61{
62    async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
63        self.run_inner(input).await
64    }
65}