vector/sinks/aws_s_s/
service.rs1use std::marker::PhantomData;
2use std::task::{Context, Poll};
3
4use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
5use aws_smithy_runtime_api::client::result::SdkError;
6use futures::future::BoxFuture;
7use tower::Service;
8use vector_lib::request_metadata::GroupedCountByteSize;
9use vector_lib::stream::DriverResponse;
10use vector_lib::{event::EventStatus, ByteSizeOf};
11
12use super::{client::Client, request_builder::SendMessageEntry};
13
14pub(super) struct SSService<C, E>
15where
16 C: Client<E> + Clone + Send + Sync + 'static,
17 E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
18{
19 client: C,
20 _phantom: PhantomData<fn() -> E>,
21}
22
23impl<C, E> SSService<C, E>
24where
25 C: Client<E> + Clone + Send + Sync + 'static,
26 E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
27{
28 pub(super) const fn new(client: C) -> Self {
29 Self {
30 client,
31 _phantom: PhantomData,
32 }
33 }
34}
35
36impl<C, E> Clone for SSService<C, E>
37where
38 C: Client<E> + Clone + Send + Sync + 'static,
39 E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
40{
41 fn clone(&self) -> SSService<C, E> {
42 SSService {
43 client: self.client.clone(),
44 _phantom: PhantomData,
45 }
46 }
47}
48
49impl<C, E> Service<SendMessageEntry> for SSService<C, E>
50where
51 C: Client<E> + Clone + Send + Sync + 'static,
52 E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
53{
54 type Response = SendMessageResponse;
55 type Error = SdkError<E, HttpResponse>;
56 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
57
58 fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
60 Poll::Ready(Ok(()))
61 }
62
63 fn call(&mut self, entry: SendMessageEntry) -> Self::Future {
65 let byte_size = entry.size_of();
66 let client = self.client.clone();
67
68 Box::pin(async move { client.send_message(entry, byte_size).await })
69 }
70}
71
72pub(super) struct SendMessageResponse {
73 pub(crate) byte_size: usize,
74 pub(crate) json_byte_size: GroupedCountByteSize,
75}
76
77impl DriverResponse for SendMessageResponse {
78 fn event_status(&self) -> EventStatus {
79 EventStatus::Delivered
80 }
81
82 fn events_sent(&self) -> &GroupedCountByteSize {
83 &self.json_byte_size
84 }
85
86 fn bytes_sent(&self) -> Option<usize> {
87 Some(self.byte_size)
88 }
89}