vector/sinks/aws_s_s/
service.rs

1use std::marker::PhantomData;
2use std::task::{Context, Poll};
3
4use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
5use aws_smithy_runtime_api::client::result::SdkError;
6use futures::future::BoxFuture;
7use tower::Service;
8use vector_lib::request_metadata::GroupedCountByteSize;
9use vector_lib::stream::DriverResponse;
10use vector_lib::{event::EventStatus, ByteSizeOf};
11
12use super::{client::Client, request_builder::SendMessageEntry};
13
14pub(super) struct SSService<C, E>
15where
16    C: Client<E> + Clone + Send + Sync + 'static,
17    E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
18{
19    client: C,
20    _phantom: PhantomData<fn() -> E>,
21}
22
23impl<C, E> SSService<C, E>
24where
25    C: Client<E> + Clone + Send + Sync + 'static,
26    E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
27{
28    pub(super) const fn new(client: C) -> Self {
29        Self {
30            client,
31            _phantom: PhantomData,
32        }
33    }
34}
35
36impl<C, E> Clone for SSService<C, E>
37where
38    C: Client<E> + Clone + Send + Sync + 'static,
39    E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
40{
41    fn clone(&self) -> SSService<C, E> {
42        SSService {
43            client: self.client.clone(),
44            _phantom: PhantomData,
45        }
46    }
47}
48
49impl<C, E> Service<SendMessageEntry> for SSService<C, E>
50where
51    C: Client<E> + Clone + Send + Sync + 'static,
52    E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
53{
54    type Response = SendMessageResponse;
55    type Error = SdkError<E, HttpResponse>;
56    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
57
58    // Emission of an internal event in case of errors is handled upstream by the caller.
59    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
60        Poll::Ready(Ok(()))
61    }
62
63    // Emission of internal events for errors and dropped events is handled upstream by the caller.
64    fn call(&mut self, entry: SendMessageEntry) -> Self::Future {
65        let byte_size = entry.size_of();
66        let client = self.client.clone();
67
68        Box::pin(async move { client.send_message(entry, byte_size).await })
69    }
70}
71
72pub(super) struct SendMessageResponse {
73    pub(crate) byte_size: usize,
74    pub(crate) json_byte_size: GroupedCountByteSize,
75}
76
77impl DriverResponse for SendMessageResponse {
78    fn event_status(&self) -> EventStatus {
79        EventStatus::Delivered
80    }
81
82    fn events_sent(&self) -> &GroupedCountByteSize {
83        &self.json_byte_size
84    }
85
86    fn bytes_sent(&self) -> Option<usize> {
87        Some(self.byte_size)
88    }
89}