vector/sinks/aws_s_s/
service.rs1use std::{
2 marker::PhantomData,
3 task::{Context, Poll},
4};
5
6use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
7use futures::future::BoxFuture;
8use tower::Service;
9use vector_lib::{
10 ByteSizeOf, event::EventStatus, request_metadata::GroupedCountByteSize, stream::DriverResponse,
11};
12
13use super::{client::Client, request_builder::SendMessageEntry};
14
15pub(super) struct SSService<C, E>
16where
17 C: Client<E> + Clone + Send + Sync + 'static,
18 E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
19{
20 client: C,
21 _phantom: PhantomData<fn() -> E>,
22}
23
24impl<C, E> SSService<C, E>
25where
26 C: Client<E> + Clone + Send + Sync + 'static,
27 E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
28{
29 pub(super) const fn new(client: C) -> Self {
30 Self {
31 client,
32 _phantom: PhantomData,
33 }
34 }
35}
36
37impl<C, E> Clone for SSService<C, E>
38where
39 C: Client<E> + Clone + Send + Sync + 'static,
40 E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
41{
42 fn clone(&self) -> SSService<C, E> {
43 SSService {
44 client: self.client.clone(),
45 _phantom: PhantomData,
46 }
47 }
48}
49
50impl<C, E> Service<SendMessageEntry> for SSService<C, E>
51where
52 C: Client<E> + Clone + Send + Sync + 'static,
53 E: std::fmt::Debug + std::fmt::Display + std::error::Error + Sync + Send + 'static,
54{
55 type Response = SendMessageResponse;
56 type Error = SdkError<E, HttpResponse>;
57 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
58
59 fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
61 Poll::Ready(Ok(()))
62 }
63
64 fn call(&mut self, entry: SendMessageEntry) -> Self::Future {
66 let byte_size = entry.size_of();
67 let client = self.client.clone();
68
69 Box::pin(async move { client.send_message(entry, byte_size).await })
70 }
71}
72
73pub(super) struct SendMessageResponse {
74 pub(crate) byte_size: usize,
75 pub(crate) json_byte_size: GroupedCountByteSize,
76}
77
78impl DriverResponse for SendMessageResponse {
79 fn event_status(&self) -> EventStatus {
80 EventStatus::Delivered
81 }
82
83 fn events_sent(&self) -> &GroupedCountByteSize {
84 &self.json_byte_size
85 }
86
87 fn bytes_sent(&self) -> Option<usize> {
88 Some(self.byte_size)
89 }
90}