vector/sinks/aws_kinesis/
request_builder.rs1use std::{io, marker::PhantomData};
2
3use bytes::Bytes;
4use vector_lib::request_metadata::{MetaDescriptive, RequestMetadata};
5use vector_lib::ByteSizeOf;
6
7use super::{
8 record::Record,
9 sink::{KinesisKey, KinesisProcessedEvent},
10};
11use crate::{
12 codecs::{Encoder, Transformer},
13 event::{Event, EventFinalizers, Finalizable},
14 sinks::util::{
15 metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression,
16 RequestBuilder,
17 },
18};
19
20#[derive(Clone)]
21pub struct KinesisRequestBuilder<R> {
22 pub compression: Compression,
23 pub encoder: (Transformer, Encoder<()>),
24 pub _phantom: PhantomData<R>,
25}
26
27pub struct KinesisMetadata {
28 pub finalizers: EventFinalizers,
29 pub partition_key: String,
30}
31
32#[derive(Clone)]
33pub struct KinesisRequest<R>
34where
35 R: Record,
36{
37 pub key: KinesisKey,
38 pub record: R,
39 pub finalizers: EventFinalizers,
40 metadata: RequestMetadata,
41}
42
43impl<R> Finalizable for KinesisRequest<R>
44where
45 R: Record,
46{
47 fn take_finalizers(&mut self) -> EventFinalizers {
48 std::mem::take(&mut self.finalizers)
49 }
50}
51
52impl<R> MetaDescriptive for KinesisRequest<R>
53where
54 R: Record,
55{
56 fn get_metadata(&self) -> &RequestMetadata {
57 &self.metadata
58 }
59
60 fn metadata_mut(&mut self) -> &mut RequestMetadata {
61 &mut self.metadata
62 }
63}
64
65impl<R> ByteSizeOf for KinesisRequest<R>
66where
67 R: Record,
68{
69 fn size_of(&self) -> usize {
70 self.record.encoded_length()
74 }
75
76 fn allocated_bytes(&self) -> usize {
77 0
78 }
79}
80
81impl<R> RequestBuilder<KinesisProcessedEvent> for KinesisRequestBuilder<R>
82where
83 R: Record,
84{
85 type Metadata = KinesisMetadata;
86 type Events = Event;
87 type Encoder = (Transformer, Encoder<()>);
88 type Payload = Bytes;
89 type Request = KinesisRequest<R>;
90 type Error = io::Error;
91
92 fn compression(&self) -> Compression {
93 self.compression
94 }
95
96 fn encoder(&self) -> &Self::Encoder {
97 &self.encoder
98 }
99
100 fn split_input(
101 &self,
102 mut processed_event: KinesisProcessedEvent,
103 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
104 let kinesis_metadata = KinesisMetadata {
105 finalizers: processed_event.event.take_finalizers(),
106 partition_key: processed_event.metadata.partition_key,
107 };
108 let event = Event::from(processed_event.event);
109 let builder = RequestMetadataBuilder::from_event(&event);
110
111 (kinesis_metadata, builder, event)
112 }
113
114 fn build_request(
115 &self,
116 kinesis_metadata: Self::Metadata,
117 metadata: RequestMetadata,
118 payload: EncodeResult<Self::Payload>,
119 ) -> Self::Request {
120 let payload_bytes = payload.into_payload();
121
122 let record = R::new(&payload_bytes, &kinesis_metadata.partition_key);
123
124 KinesisRequest {
125 key: KinesisKey {
126 partition_key: kinesis_metadata.partition_key.clone(),
127 },
128 record,
129 finalizers: kinesis_metadata.finalizers,
130 metadata,
131 }
132 }
133}