vector/sinks/aws_s_s/
sink.rs

1use super::{client::Client, request_builder::SSRequestBuilder, service::SSService};
2use crate::sinks::aws_s_s::retry::SSRetryLogic;
3use crate::sinks::prelude::*;
4
5#[derive(Clone)]
6pub(super) struct SSSink<C, E>
7where
8    C: Client<E> + Clone + Send + Sync + 'static,
9    E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
10{
11    request_builder: SSRequestBuilder,
12    service: SSService<C, E>,
13    request: TowerRequestConfig,
14}
15
16impl<C, E> SSSink<C, E>
17where
18    C: Client<E> + Clone + Send + Sync + 'static,
19    E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
20{
21    pub(super) fn new(
22        request_builder: SSRequestBuilder,
23        request: TowerRequestConfig,
24        publisher: C,
25    ) -> crate::Result<Self> {
26        Ok(SSSink {
27            request_builder,
28            service: SSService::new(publisher),
29            request,
30        })
31    }
32
33    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
34        let request = self.request.into_settings();
35        let retry_logic: SSRetryLogic<E> = super::retry::SSRetryLogic::new();
36        let service = tower::ServiceBuilder::new()
37            .settings(request, retry_logic)
38            .service(self.service);
39
40        input
41            .request_builder(
42                default_request_builder_concurrency_limit(),
43                self.request_builder,
44            )
45            .filter_map(|req| async move {
46                req.map_err(|error| {
47                    emit!(SinkRequestBuildError { error });
48                })
49                .ok()
50            })
51            .into_driver(service)
52            .run()
53            .await
54    }
55}
56
57#[async_trait::async_trait]
58impl<C, E> StreamSink<Event> for SSSink<C, E>
59where
60    C: Client<E> + Clone + Send + Sync + 'static,
61    E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
62{
63    async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
64        self.run_inner(input).await
65    }
66}