vector/sinks/aws_s_s/
request_builder.rs1use bytes::Bytes;
2use vector_lib::request_metadata::{MetaDescriptive, RequestMetadata};
3use vector_lib::ByteSizeOf;
4
5use crate::codecs::EncodingConfig;
6use crate::{
7 codecs::{Encoder, Transformer},
8 event::{Event, EventFinalizers, Finalizable},
9 internal_events::TemplateRenderingError,
10 sinks::util::{
11 metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression,
12 EncodedLength, RequestBuilder,
13 },
14 template::Template,
15};
16
17#[derive(Clone)]
18pub(super) struct SSMetadata {
19 pub(super) finalizers: EventFinalizers,
20 pub(super) message_group_id: Option<String>,
21 pub(super) message_deduplication_id: Option<String>,
22}
23
24#[derive(Clone)]
25pub(super) struct SSRequestBuilder {
26 encoder: (Transformer, Encoder<()>),
27 message_group_id: Option<Template>,
28 message_deduplication_id: Option<Template>,
29}
30
31impl SSRequestBuilder {
32 pub(super) fn new(
33 message_group_id: Option<Template>,
34 message_deduplication_id: Option<Template>,
35 encoding_config: EncodingConfig,
36 ) -> crate::Result<Self> {
37 let transformer = encoding_config.transformer();
38 let serializer = encoding_config.build()?;
39 let encoder = Encoder::<()>::new(serializer);
40
41 Ok(Self {
42 encoder: (transformer, encoder),
43 message_group_id,
44 message_deduplication_id,
45 })
46 }
47}
48
49impl RequestBuilder<Event> for SSRequestBuilder {
50 type Metadata = SSMetadata;
51 type Events = Event;
52 type Encoder = (Transformer, Encoder<()>);
53 type Payload = Bytes;
54 type Request = SendMessageEntry;
55 type Error = std::io::Error;
56
57 fn compression(&self) -> Compression {
58 Compression::None
59 }
60
61 fn encoder(&self) -> &Self::Encoder {
62 &self.encoder
63 }
64
65 fn split_input(
66 &self,
67 mut event: Event,
68 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
69 let message_group_id = match self.message_group_id {
70 Some(ref tpl) => match tpl.render_string(&event) {
71 Ok(value) => Some(value),
72 Err(error) => {
73 emit!(TemplateRenderingError {
74 error,
75 field: Some("message_group_id"),
76 drop_event: true,
77 });
78 None
79 }
80 },
81 None => None,
82 };
83 let message_deduplication_id = match self.message_deduplication_id {
84 Some(ref tpl) => match tpl.render_string(&event) {
85 Ok(value) => Some(value),
86 Err(error) => {
87 emit!(TemplateRenderingError {
88 error,
89 field: Some("message_deduplication_id"),
90 drop_event: true,
91 });
92 None
93 }
94 },
95 None => None,
96 };
97
98 let builder = RequestMetadataBuilder::from_event(&event);
99
100 let metadata = SSMetadata {
101 finalizers: event.take_finalizers(),
102 message_group_id,
103 message_deduplication_id,
104 };
105 (metadata, builder, event)
106 }
107
108 fn build_request(
109 &self,
110 client_metadata: Self::Metadata,
111 metadata: RequestMetadata,
112 payload: EncodeResult<Self::Payload>,
113 ) -> Self::Request {
114 let payload_bytes = payload.into_payload();
115 let message_body = String::from(std::str::from_utf8(&payload_bytes).unwrap());
116
117 SendMessageEntry {
118 message_body,
119 message_group_id: client_metadata.message_group_id,
120 message_deduplication_id: client_metadata.message_deduplication_id,
121 finalizers: client_metadata.finalizers,
122 metadata,
123 }
124 }
125}
126
127#[derive(Debug, Clone)]
128pub(super) struct SendMessageEntry {
129 pub(super) message_body: String,
130 pub(super) message_group_id: Option<String>,
131 pub(super) message_deduplication_id: Option<String>,
132 pub(super) finalizers: EventFinalizers,
133 pub(super) metadata: RequestMetadata,
134}
135
136impl ByteSizeOf for SendMessageEntry {
137 fn allocated_bytes(&self) -> usize {
138 self.message_body.size_of()
139 + self.message_group_id.size_of()
140 + self.message_deduplication_id.size_of()
141 }
142}
143
144impl EncodedLength for SendMessageEntry {
145 fn encoded_length(&self) -> usize {
146 self.message_body.len()
147 }
148}
149
150impl Finalizable for SendMessageEntry {
151 fn take_finalizers(&mut self) -> EventFinalizers {
152 self.finalizers.take_finalizers()
153 }
154}
155
156impl MetaDescriptive for SendMessageEntry {
157 fn get_metadata(&self) -> &RequestMetadata {
158 &self.metadata
159 }
160
161 fn metadata_mut(&mut self) -> &mut RequestMetadata {
162 &mut self.metadata
163 }
164}