vector/sinks/aws_s_s/
sink.rs1use super::{client::Client, request_builder::SSRequestBuilder, service::SSService};
2use crate::sinks::{aws_s_s::retry::SSRetryLogic, prelude::*};
3
4#[derive(Clone)]
5pub(super) struct SSSink<C, E>
6where
7 C: Client<E> + Clone + Send + Sync + 'static,
8 E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
9{
10 request_builder: SSRequestBuilder,
11 service: SSService<C, E>,
12 request: TowerRequestConfig,
13}
14
15impl<C, E> SSSink<C, E>
16where
17 C: Client<E> + Clone + Send + Sync + 'static,
18 E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
19{
20 pub(super) fn new(
21 request_builder: SSRequestBuilder,
22 request: TowerRequestConfig,
23 publisher: C,
24 ) -> crate::Result<Self> {
25 Ok(SSSink {
26 request_builder,
27 service: SSService::new(publisher),
28 request,
29 })
30 }
31
32 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
33 let request = self.request.into_settings();
34 let retry_logic: SSRetryLogic<E> = super::retry::SSRetryLogic::new();
35 let service = tower::ServiceBuilder::new()
36 .settings(request, retry_logic)
37 .service(self.service);
38
39 input
40 .request_builder(
41 default_request_builder_concurrency_limit(),
42 self.request_builder,
43 )
44 .filter_map(|req| async move {
45 req.map_err(|error| {
46 emit!(SinkRequestBuildError { error });
47 })
48 .ok()
49 })
50 .into_driver(service)
51 .run()
52 .await
53 }
54}
55
56#[async_trait::async_trait]
57impl<C, E> StreamSink<Event> for SSSink<C, E>
58where
59 C: Client<E> + Clone + Send + Sync + 'static,
60 E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
61{
62 async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
63 self.run_inner(input).await
64 }
65}