vector/sinks/aws_kinesis/
request_builder.rs

1use std::{io, marker::PhantomData};
2
3use bytes::Bytes;
4use vector_lib::{
5    ByteSizeOf,
6    request_metadata::{MetaDescriptive, RequestMetadata},
7};
8
9use super::{
10    record::Record,
11    sink::{KinesisKey, KinesisProcessedEvent},
12};
13use crate::{
14    codecs::{Encoder, Transformer},
15    event::{Event, EventFinalizers, Finalizable},
16    sinks::util::{
17        Compression, RequestBuilder, metadata::RequestMetadataBuilder,
18        request_builder::EncodeResult,
19    },
20};
21
22#[derive(Clone)]
23pub struct KinesisRequestBuilder<R> {
24    pub compression: Compression,
25    pub encoder: (Transformer, Encoder<()>),
26    pub _phantom: PhantomData<R>,
27}
28
29pub struct KinesisMetadata {
30    pub finalizers: EventFinalizers,
31    pub partition_key: String,
32}
33
34#[derive(Clone)]
35pub struct KinesisRequest<R>
36where
37    R: Record,
38{
39    pub key: KinesisKey,
40    pub record: R,
41    pub finalizers: EventFinalizers,
42    metadata: RequestMetadata,
43}
44
45impl<R> Finalizable for KinesisRequest<R>
46where
47    R: Record,
48{
49    fn take_finalizers(&mut self) -> EventFinalizers {
50        std::mem::take(&mut self.finalizers)
51    }
52}
53
54impl<R> MetaDescriptive for KinesisRequest<R>
55where
56    R: Record,
57{
58    fn get_metadata(&self) -> &RequestMetadata {
59        &self.metadata
60    }
61
62    fn metadata_mut(&mut self) -> &mut RequestMetadata {
63        &mut self.metadata
64    }
65}
66
67impl<R> ByteSizeOf for KinesisRequest<R>
68where
69    R: Record,
70{
71    fn size_of(&self) -> usize {
72        // `ByteSizeOf` is being somewhat abused here. This is
73        // used by the batcher. `encoded_length` is needed so that final
74        // batched size doesn't exceed the Kinesis limits (5Mb)
75        self.record.encoded_length()
76    }
77
78    fn allocated_bytes(&self) -> usize {
79        0
80    }
81}
82
83impl<R> RequestBuilder<KinesisProcessedEvent> for KinesisRequestBuilder<R>
84where
85    R: Record,
86{
87    type Metadata = KinesisMetadata;
88    type Events = Event;
89    type Encoder = (Transformer, Encoder<()>);
90    type Payload = Bytes;
91    type Request = KinesisRequest<R>;
92    type Error = io::Error;
93
94    fn compression(&self) -> Compression {
95        self.compression
96    }
97
98    fn encoder(&self) -> &Self::Encoder {
99        &self.encoder
100    }
101
102    fn split_input(
103        &self,
104        mut processed_event: KinesisProcessedEvent,
105    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
106        let kinesis_metadata = KinesisMetadata {
107            finalizers: processed_event.event.take_finalizers(),
108            partition_key: processed_event.metadata.partition_key,
109        };
110        let event = Event::from(processed_event.event);
111        let builder = RequestMetadataBuilder::from_event(&event);
112
113        (kinesis_metadata, builder, event)
114    }
115
116    fn build_request(
117        &self,
118        kinesis_metadata: Self::Metadata,
119        metadata: RequestMetadata,
120        payload: EncodeResult<Self::Payload>,
121    ) -> Self::Request {
122        let payload_bytes = payload.into_payload();
123
124        let record = R::new(&payload_bytes, &kinesis_metadata.partition_key);
125
126        KinesisRequest {
127            key: KinesisKey {
128                partition_key: kinesis_metadata.partition_key.clone(),
129            },
130            record,
131            finalizers: kinesis_metadata.finalizers,
132            metadata,
133        }
134    }
135}