vector/sinks/aws_s_s/
sink.rs1use super::{client::Client, request_builder::SSRequestBuilder, service::SSService};
2use crate::sinks::aws_s_s::retry::SSRetryLogic;
3use crate::sinks::prelude::*;
4
5#[derive(Clone)]
6pub(super) struct SSSink<C, E>
7where
8 C: Client<E> + Clone + Send + Sync + 'static,
9 E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
10{
11 request_builder: SSRequestBuilder,
12 service: SSService<C, E>,
13 request: TowerRequestConfig,
14}
15
16impl<C, E> SSSink<C, E>
17where
18 C: Client<E> + Clone + Send + Sync + 'static,
19 E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
20{
21 pub(super) fn new(
22 request_builder: SSRequestBuilder,
23 request: TowerRequestConfig,
24 publisher: C,
25 ) -> crate::Result<Self> {
26 Ok(SSSink {
27 request_builder,
28 service: SSService::new(publisher),
29 request,
30 })
31 }
32
33 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
34 let request = self.request.into_settings();
35 let retry_logic: SSRetryLogic<E> = super::retry::SSRetryLogic::new();
36 let service = tower::ServiceBuilder::new()
37 .settings(request, retry_logic)
38 .service(self.service);
39
40 input
41 .request_builder(
42 default_request_builder_concurrency_limit(),
43 self.request_builder,
44 )
45 .filter_map(|req| async move {
46 req.map_err(|error| {
47 emit!(SinkRequestBuildError { error });
48 })
49 .ok()
50 })
51 .into_driver(service)
52 .run()
53 .await
54 }
55}
56
57#[async_trait::async_trait]
58impl<C, E> StreamSink<Event> for SSSink<C, E>
59where
60 C: Client<E> + Clone + Send + Sync + 'static,
61 E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
62{
63 async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
64 self.run_inner(input).await
65 }
66}