vector/sinks/aws_kinesis/
service.rs

1use std::{
2    marker::PhantomData,
3    task::{Context, Poll},
4};
5
6use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
7use aws_types::region::Region;
8
9use super::{
10    record::{Record, SendRecord},
11    sink::BatchKinesisRequest,
12};
13use crate::{event::EventStatus, sinks::prelude::*};
14
15pub struct KinesisService<C, T, E> {
16    pub client: C,
17    pub stream_name: String,
18    pub region: Option<Region>,
19    pub _phantom_t: PhantomData<T>,
20    pub _phantom_e: PhantomData<E>,
21}
22
23impl<C, T, E> Clone for KinesisService<C, T, E>
24where
25    C: Clone,
26{
27    fn clone(&self) -> Self {
28        Self {
29            client: self.client.clone(),
30            stream_name: self.stream_name.clone(),
31            region: self.region.clone(),
32            _phantom_e: self._phantom_e,
33            _phantom_t: self._phantom_t,
34        }
35    }
36}
37
38pub struct KinesisResponse {
39    pub(crate) failure_count: usize,
40    pub(crate) events_byte_size: GroupedCountByteSize,
41    #[cfg(feature = "sinks-aws_kinesis_streams")]
42    /// Track individual failed records for retry logic (Streams only)
43    pub(crate) failed_records: Vec<RecordResult>,
44}
45
46#[derive(Clone)]
47pub struct RecordResult {
48    pub index: usize, // Original position in batch
49    pub success: bool,
50    pub error_code: Option<String>,
51    pub error_message: Option<String>,
52}
53
54impl DriverResponse for KinesisResponse {
55    fn event_status(&self) -> EventStatus {
56        EventStatus::Delivered
57    }
58
59    fn events_sent(&self) -> &GroupedCountByteSize {
60        &self.events_byte_size
61    }
62}
63
64impl<R, C, T, E> Service<BatchKinesisRequest<R>> for KinesisService<C, T, E>
65where
66    R: Record<T = T> + Clone,
67    C: SendRecord + Clone + Sync + Send + 'static,
68    Vec<<C as SendRecord>::T>: FromIterator<T>,
69    <C as SendRecord>::T: Send,
70{
71    type Response = KinesisResponse;
72    type Error = SdkError<<C as SendRecord>::E, HttpResponse>;
73    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
74
75    // Emission of an internal event in case of errors is handled upstream by the caller.
76    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
77        Poll::Ready(Ok(()))
78    }
79
80    // Emission of internal events for errors and dropped events is handled upstream by the caller.
81    fn call(&mut self, mut requests: BatchKinesisRequest<R>) -> Self::Future {
82        let metadata = std::mem::take(requests.metadata_mut());
83        let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
84
85        let records = requests
86            .events
87            .into_iter()
88            .map(|req| req.record.get())
89            .collect();
90
91        let client = self.client.clone();
92        let stream_name = self.stream_name.clone();
93
94        Box::pin(async move {
95            client.send(records, stream_name).await.map(|mut r| {
96                // augment the response
97                r.events_byte_size = events_byte_size;
98                r
99            })
100        })
101    }
102}