vector/sinks/aws_kinesis/
service.rs1use std::{
2 marker::PhantomData,
3 task::{Context, Poll},
4};
5
6use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
7use aws_types::region::Region;
8
9use super::{
10 record::{Record, SendRecord},
11 sink::BatchKinesisRequest,
12};
13use crate::{event::EventStatus, sinks::prelude::*};
14
15pub struct KinesisService<C, T, E> {
16 pub client: C,
17 pub stream_name: String,
18 pub region: Option<Region>,
19 pub _phantom_t: PhantomData<T>,
20 pub _phantom_e: PhantomData<E>,
21}
22
23impl<C, T, E> Clone for KinesisService<C, T, E>
24where
25 C: Clone,
26{
27 fn clone(&self) -> Self {
28 Self {
29 client: self.client.clone(),
30 stream_name: self.stream_name.clone(),
31 region: self.region.clone(),
32 _phantom_e: self._phantom_e,
33 _phantom_t: self._phantom_t,
34 }
35 }
36}
37
38pub struct KinesisResponse {
39 pub(crate) failure_count: usize,
40 pub(crate) events_byte_size: GroupedCountByteSize,
41}
42
43impl DriverResponse for KinesisResponse {
44 fn event_status(&self) -> EventStatus {
45 EventStatus::Delivered
46 }
47
48 fn events_sent(&self) -> &GroupedCountByteSize {
49 &self.events_byte_size
50 }
51}
52
53impl<R, C, T, E> Service<BatchKinesisRequest<R>> for KinesisService<C, T, E>
54where
55 R: Record<T = T> + Clone,
56 C: SendRecord + Clone + Sync + Send + 'static,
57 Vec<<C as SendRecord>::T>: FromIterator<T>,
58 <C as SendRecord>::T: Send,
59{
60 type Response = KinesisResponse;
61 type Error = SdkError<<C as SendRecord>::E, HttpResponse>;
62 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
63
64 fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
66 Poll::Ready(Ok(()))
67 }
68
69 fn call(&mut self, mut requests: BatchKinesisRequest<R>) -> Self::Future {
71 let metadata = std::mem::take(requests.metadata_mut());
72 let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
73
74 let records = requests
75 .events
76 .into_iter()
77 .map(|req| req.record.get())
78 .collect();
79
80 let client = self.client.clone();
81 let stream_name = self.stream_name.clone();
82
83 Box::pin(async move {
84 client.send(records, stream_name).await.map(|mut r| {
85 r.events_byte_size = events_byte_size;
87 r
88 })
89 })
90 }
91}