vector/sinks/aws_kinesis/
request_builder.rs

1use std::{io, marker::PhantomData};
2
3use bytes::Bytes;
4use vector_lib::request_metadata::{MetaDescriptive, RequestMetadata};
5use vector_lib::ByteSizeOf;
6
7use super::{
8    record::Record,
9    sink::{KinesisKey, KinesisProcessedEvent},
10};
11use crate::{
12    codecs::{Encoder, Transformer},
13    event::{Event, EventFinalizers, Finalizable},
14    sinks::util::{
15        metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression,
16        RequestBuilder,
17    },
18};
19
20#[derive(Clone)]
21pub struct KinesisRequestBuilder<R> {
22    pub compression: Compression,
23    pub encoder: (Transformer, Encoder<()>),
24    pub _phantom: PhantomData<R>,
25}
26
27pub struct KinesisMetadata {
28    pub finalizers: EventFinalizers,
29    pub partition_key: String,
30}
31
32#[derive(Clone)]
33pub struct KinesisRequest<R>
34where
35    R: Record,
36{
37    pub key: KinesisKey,
38    pub record: R,
39    pub finalizers: EventFinalizers,
40    metadata: RequestMetadata,
41}
42
43impl<R> Finalizable for KinesisRequest<R>
44where
45    R: Record,
46{
47    fn take_finalizers(&mut self) -> EventFinalizers {
48        std::mem::take(&mut self.finalizers)
49    }
50}
51
52impl<R> MetaDescriptive for KinesisRequest<R>
53where
54    R: Record,
55{
56    fn get_metadata(&self) -> &RequestMetadata {
57        &self.metadata
58    }
59
60    fn metadata_mut(&mut self) -> &mut RequestMetadata {
61        &mut self.metadata
62    }
63}
64
65impl<R> ByteSizeOf for KinesisRequest<R>
66where
67    R: Record,
68{
69    fn size_of(&self) -> usize {
70        // `ByteSizeOf` is being somewhat abused here. This is
71        // used by the batcher. `encoded_length` is needed so that final
72        // batched size doesn't exceed the Kinesis limits (5Mb)
73        self.record.encoded_length()
74    }
75
76    fn allocated_bytes(&self) -> usize {
77        0
78    }
79}
80
81impl<R> RequestBuilder<KinesisProcessedEvent> for KinesisRequestBuilder<R>
82where
83    R: Record,
84{
85    type Metadata = KinesisMetadata;
86    type Events = Event;
87    type Encoder = (Transformer, Encoder<()>);
88    type Payload = Bytes;
89    type Request = KinesisRequest<R>;
90    type Error = io::Error;
91
92    fn compression(&self) -> Compression {
93        self.compression
94    }
95
96    fn encoder(&self) -> &Self::Encoder {
97        &self.encoder
98    }
99
100    fn split_input(
101        &self,
102        mut processed_event: KinesisProcessedEvent,
103    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
104        let kinesis_metadata = KinesisMetadata {
105            finalizers: processed_event.event.take_finalizers(),
106            partition_key: processed_event.metadata.partition_key,
107        };
108        let event = Event::from(processed_event.event);
109        let builder = RequestMetadataBuilder::from_event(&event);
110
111        (kinesis_metadata, builder, event)
112    }
113
114    fn build_request(
115        &self,
116        kinesis_metadata: Self::Metadata,
117        metadata: RequestMetadata,
118        payload: EncodeResult<Self::Payload>,
119    ) -> Self::Request {
120        let payload_bytes = payload.into_payload();
121
122        let record = R::new(&payload_bytes, &kinesis_metadata.partition_key);
123
124        KinesisRequest {
125            key: KinesisKey {
126                partition_key: kinesis_metadata.partition_key.clone(),
127            },
128            record,
129            finalizers: kinesis_metadata.finalizers,
130            metadata,
131        }
132    }
133}