vector/sinks/aws_kinesis/
request_builder.rs1use std::{io, marker::PhantomData};
2
3use bytes::Bytes;
4use vector_lib::{
5 ByteSizeOf,
6 request_metadata::{MetaDescriptive, RequestMetadata},
7};
8
9use super::{
10 record::Record,
11 sink::{KinesisKey, KinesisProcessedEvent},
12};
13use crate::{
14 codecs::{Encoder, Transformer},
15 event::{Event, EventFinalizers, Finalizable},
16 sinks::util::{
17 Compression, RequestBuilder, metadata::RequestMetadataBuilder,
18 request_builder::EncodeResult,
19 },
20};
21
22#[derive(Clone)]
23pub struct KinesisRequestBuilder<R> {
24 pub compression: Compression,
25 pub encoder: (Transformer, Encoder<()>),
26 pub _phantom: PhantomData<R>,
27}
28
29pub struct KinesisMetadata {
30 pub finalizers: EventFinalizers,
31 pub partition_key: String,
32}
33
34#[derive(Clone)]
35pub struct KinesisRequest<R>
36where
37 R: Record,
38{
39 pub key: KinesisKey,
40 pub record: R,
41 pub finalizers: EventFinalizers,
42 metadata: RequestMetadata,
43}
44
45impl<R> Finalizable for KinesisRequest<R>
46where
47 R: Record,
48{
49 fn take_finalizers(&mut self) -> EventFinalizers {
50 std::mem::take(&mut self.finalizers)
51 }
52}
53
54impl<R> MetaDescriptive for KinesisRequest<R>
55where
56 R: Record,
57{
58 fn get_metadata(&self) -> &RequestMetadata {
59 &self.metadata
60 }
61
62 fn metadata_mut(&mut self) -> &mut RequestMetadata {
63 &mut self.metadata
64 }
65}
66
67impl<R> ByteSizeOf for KinesisRequest<R>
68where
69 R: Record,
70{
71 fn size_of(&self) -> usize {
72 self.record.encoded_length()
76 }
77
78 fn allocated_bytes(&self) -> usize {
79 0
80 }
81}
82
83impl<R> RequestBuilder<KinesisProcessedEvent> for KinesisRequestBuilder<R>
84where
85 R: Record,
86{
87 type Metadata = KinesisMetadata;
88 type Events = Event;
89 type Encoder = (Transformer, Encoder<()>);
90 type Payload = Bytes;
91 type Request = KinesisRequest<R>;
92 type Error = io::Error;
93
94 fn compression(&self) -> Compression {
95 self.compression
96 }
97
98 fn encoder(&self) -> &Self::Encoder {
99 &self.encoder
100 }
101
102 fn split_input(
103 &self,
104 mut processed_event: KinesisProcessedEvent,
105 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
106 let kinesis_metadata = KinesisMetadata {
107 finalizers: processed_event.event.take_finalizers(),
108 partition_key: processed_event.metadata.partition_key,
109 };
110 let event = Event::from(processed_event.event);
111 let builder = RequestMetadataBuilder::from_event(&event);
112
113 (kinesis_metadata, builder, event)
114 }
115
116 fn build_request(
117 &self,
118 kinesis_metadata: Self::Metadata,
119 metadata: RequestMetadata,
120 payload: EncodeResult<Self::Payload>,
121 ) -> Self::Request {
122 let payload_bytes = payload.into_payload();
123
124 let record = R::new(&payload_bytes, &kinesis_metadata.partition_key);
125
126 KinesisRequest {
127 key: KinesisKey {
128 partition_key: kinesis_metadata.partition_key.clone(),
129 },
130 record,
131 finalizers: kinesis_metadata.finalizers,
132 metadata,
133 }
134 }
135}