vector/sinks/aws_s_s/
request_builder.rs1use bytes::Bytes;
2use vector_lib::{
3 ByteSizeOf,
4 request_metadata::{MetaDescriptive, RequestMetadata},
5};
6
7use crate::{
8 codecs::{Encoder, EncodingConfig, Transformer},
9 event::{Event, EventFinalizers, Finalizable},
10 internal_events::TemplateRenderingError,
11 sinks::util::{
12 Compression, EncodedLength, RequestBuilder, metadata::RequestMetadataBuilder,
13 request_builder::EncodeResult,
14 },
15 template::Template,
16};
17
18#[derive(Clone)]
19pub(super) struct SSMetadata {
20 pub(super) finalizers: EventFinalizers,
21 pub(super) message_group_id: Option<String>,
22 pub(super) message_deduplication_id: Option<String>,
23}
24
25#[derive(Clone)]
26pub(super) struct SSRequestBuilder {
27 encoder: (Transformer, Encoder<()>),
28 message_group_id: Option<Template>,
29 message_deduplication_id: Option<Template>,
30}
31
32impl SSRequestBuilder {
33 pub(super) fn new(
34 message_group_id: Option<Template>,
35 message_deduplication_id: Option<Template>,
36 encoding_config: EncodingConfig,
37 ) -> crate::Result<Self> {
38 let transformer = encoding_config.transformer();
39 let serializer = encoding_config.build()?;
40 let encoder = Encoder::<()>::new(serializer);
41
42 Ok(Self {
43 encoder: (transformer, encoder),
44 message_group_id,
45 message_deduplication_id,
46 })
47 }
48}
49
50impl RequestBuilder<Event> for SSRequestBuilder {
51 type Metadata = SSMetadata;
52 type Events = Event;
53 type Encoder = (Transformer, Encoder<()>);
54 type Payload = Bytes;
55 type Request = SendMessageEntry;
56 type Error = std::io::Error;
57
58 fn compression(&self) -> Compression {
59 Compression::None
60 }
61
62 fn encoder(&self) -> &Self::Encoder {
63 &self.encoder
64 }
65
66 fn split_input(
67 &self,
68 mut event: Event,
69 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
70 let message_group_id = match self.message_group_id {
71 Some(ref tpl) => match tpl.render_string(&event) {
72 Ok(value) => Some(value),
73 Err(error) => {
74 emit!(TemplateRenderingError {
75 error,
76 field: Some("message_group_id"),
77 drop_event: true,
78 });
79 None
80 }
81 },
82 None => None,
83 };
84 let message_deduplication_id = match self.message_deduplication_id {
85 Some(ref tpl) => match tpl.render_string(&event) {
86 Ok(value) => Some(value),
87 Err(error) => {
88 emit!(TemplateRenderingError {
89 error,
90 field: Some("message_deduplication_id"),
91 drop_event: true,
92 });
93 None
94 }
95 },
96 None => None,
97 };
98
99 let builder = RequestMetadataBuilder::from_event(&event);
100
101 let metadata = SSMetadata {
102 finalizers: event.take_finalizers(),
103 message_group_id,
104 message_deduplication_id,
105 };
106 (metadata, builder, event)
107 }
108
109 fn build_request(
110 &self,
111 client_metadata: Self::Metadata,
112 metadata: RequestMetadata,
113 payload: EncodeResult<Self::Payload>,
114 ) -> Self::Request {
115 let payload_bytes = payload.into_payload();
116 let message_body = String::from(std::str::from_utf8(&payload_bytes).unwrap());
117
118 SendMessageEntry {
119 message_body,
120 message_group_id: client_metadata.message_group_id,
121 message_deduplication_id: client_metadata.message_deduplication_id,
122 finalizers: client_metadata.finalizers,
123 metadata,
124 }
125 }
126}
127
128#[derive(Debug, Clone)]
129pub(super) struct SendMessageEntry {
130 pub(super) message_body: String,
131 pub(super) message_group_id: Option<String>,
132 pub(super) message_deduplication_id: Option<String>,
133 pub(super) finalizers: EventFinalizers,
134 pub(super) metadata: RequestMetadata,
135}
136
137impl ByteSizeOf for SendMessageEntry {
138 fn allocated_bytes(&self) -> usize {
139 self.message_body.size_of()
140 + self.message_group_id.size_of()
141 + self.message_deduplication_id.size_of()
142 }
143}
144
145impl EncodedLength for SendMessageEntry {
146 fn encoded_length(&self) -> usize {
147 self.message_body.len()
148 }
149}
150
151impl Finalizable for SendMessageEntry {
152 fn take_finalizers(&mut self) -> EventFinalizers {
153 self.finalizers.take_finalizers()
154 }
155}
156
157impl MetaDescriptive for SendMessageEntry {
158 fn get_metadata(&self) -> &RequestMetadata {
159 &self.metadata
160 }
161
162 fn metadata_mut(&mut self) -> &mut RequestMetadata {
163 &mut self.metadata
164 }
165}