vector/sinks/aws_s_s/
service.rs

1use std::{
2    marker::PhantomData,
3    task::{Context, Poll},
4};
5
6use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
7use futures::future::BoxFuture;
8use tower::Service;
9use vector_lib::{
10    ByteSizeOf, event::EventStatus, request_metadata::GroupedCountByteSize, stream::DriverResponse,
11};
12
13use super::{client::Client, request_builder::SendMessageEntry};
14
15pub(super) struct SSService<C, E>
16where
17    C: Client<E> + Clone + Send + Sync + 'static,
18    E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
19{
20    client: C,
21    _phantom: PhantomData<fn() -> E>,
22}
23
24impl<C, E> SSService<C, E>
25where
26    C: Client<E> + Clone + Send + Sync + 'static,
27    E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
28{
29    pub(super) const fn new(client: C) -> Self {
30        Self {
31            client,
32            _phantom: PhantomData,
33        }
34    }
35}
36
37impl<C, E> Clone for SSService<C, E>
38where
39    C: Client<E> + Clone + Send + Sync + 'static,
40    E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
41{
42    fn clone(&self) -> SSService<C, E> {
43        SSService {
44            client: self.client.clone(),
45            _phantom: PhantomData,
46        }
47    }
48}
49
50impl<C, E> Service<SendMessageEntry> for SSService<C, E>
51where
52    C: Client<E> + Clone + Send + Sync + 'static,
53    E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
54{
55    type Response = SendMessageResponse;
56    type Error = SdkError<E, HttpResponse>;
57    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
58
59    // Emission of an internal event in case of errors is handled upstream by the caller.
60    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
61        Poll::Ready(Ok(()))
62    }
63
64    // Emission of internal events for errors and dropped events is handled upstream by the caller.
65    fn call(&mut self, entry: SendMessageEntry) -> Self::Future {
66        let byte_size = entry.size_of();
67        let client = self.client.clone();
68
69        Box::pin(async move { client.send_message(entry, byte_size).await })
70    }
71}
72
73pub(super) struct SendMessageResponse {
74    pub(crate) byte_size: usize,
75    pub(crate) json_byte_size: GroupedCountByteSize,
76}
77
78impl DriverResponse for SendMessageResponse {
79    fn event_status(&self) -> EventStatus {
80        EventStatus::Delivered
81    }
82
83    fn events_sent(&self) -> &GroupedCountByteSize {
84        &self.json_byte_size
85    }
86
87    fn bytes_sent(&self) -> Option<usize> {
88        Some(self.byte_size)
89    }
90}