vector/sinks/aws_s_s/
request_builder.rs1use bytes::Bytes;
2use vector_lib::{
3    ByteSizeOf,
4    request_metadata::{MetaDescriptive, RequestMetadata},
5};
6
7use crate::{
8    codecs::{Encoder, EncodingConfig, Transformer},
9    event::{Event, EventFinalizers, Finalizable},
10    internal_events::TemplateRenderingError,
11    sinks::util::{
12        Compression, EncodedLength, RequestBuilder, metadata::RequestMetadataBuilder,
13        request_builder::EncodeResult,
14    },
15    template::Template,
16};
17
18#[derive(Clone)]
19pub(super) struct SSMetadata {
20    pub(super) finalizers: EventFinalizers,
21    pub(super) message_group_id: Option<String>,
22    pub(super) message_deduplication_id: Option<String>,
23}
24
25#[derive(Clone)]
26pub(super) struct SSRequestBuilder {
27    encoder: (Transformer, Encoder<()>),
28    message_group_id: Option<Template>,
29    message_deduplication_id: Option<Template>,
30}
31
32impl SSRequestBuilder {
33    pub(super) fn new(
34        message_group_id: Option<Template>,
35        message_deduplication_id: Option<Template>,
36        encoding_config: EncodingConfig,
37    ) -> crate::Result<Self> {
38        let transformer = encoding_config.transformer();
39        let serializer = encoding_config.build()?;
40        let encoder = Encoder::<()>::new(serializer);
41
42        Ok(Self {
43            encoder: (transformer, encoder),
44            message_group_id,
45            message_deduplication_id,
46        })
47    }
48}
49
50impl RequestBuilder<Event> for SSRequestBuilder {
51    type Metadata = SSMetadata;
52    type Events = Event;
53    type Encoder = (Transformer, Encoder<()>);
54    type Payload = Bytes;
55    type Request = SendMessageEntry;
56    type Error = std::io::Error;
57
58    fn compression(&self) -> Compression {
59        Compression::None
60    }
61
62    fn encoder(&self) -> &Self::Encoder {
63        &self.encoder
64    }
65
66    fn split_input(
67        &self,
68        mut event: Event,
69    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
70        let message_group_id = match self.message_group_id {
71            Some(ref tpl) => match tpl.render_string(&event) {
72                Ok(value) => Some(value),
73                Err(error) => {
74                    emit!(TemplateRenderingError {
75                        error,
76                        field: Some("message_group_id"),
77                        drop_event: true,
78                    });
79                    None
80                }
81            },
82            None => None,
83        };
84        let message_deduplication_id = match self.message_deduplication_id {
85            Some(ref tpl) => match tpl.render_string(&event) {
86                Ok(value) => Some(value),
87                Err(error) => {
88                    emit!(TemplateRenderingError {
89                        error,
90                        field: Some("message_deduplication_id"),
91                        drop_event: true,
92                    });
93                    None
94                }
95            },
96            None => None,
97        };
98
99        let builder = RequestMetadataBuilder::from_event(&event);
100
101        let metadata = SSMetadata {
102            finalizers: event.take_finalizers(),
103            message_group_id,
104            message_deduplication_id,
105        };
106        (metadata, builder, event)
107    }
108
109    fn build_request(
110        &self,
111        client_metadata: Self::Metadata,
112        metadata: RequestMetadata,
113        payload: EncodeResult<Self::Payload>,
114    ) -> Self::Request {
115        let payload_bytes = payload.into_payload();
116        let message_body = String::from(std::str::from_utf8(&payload_bytes).unwrap());
117
118        SendMessageEntry {
119            message_body,
120            message_group_id: client_metadata.message_group_id,
121            message_deduplication_id: client_metadata.message_deduplication_id,
122            finalizers: client_metadata.finalizers,
123            metadata,
124        }
125    }
126}
127
128#[derive(Debug, Clone)]
129pub(super) struct SendMessageEntry {
130    pub(super) message_body: String,
131    pub(super) message_group_id: Option<String>,
132    pub(super) message_deduplication_id: Option<String>,
133    pub(super) finalizers: EventFinalizers,
134    pub(super) metadata: RequestMetadata,
135}
136
137impl ByteSizeOf for SendMessageEntry {
138    fn allocated_bytes(&self) -> usize {
139        self.message_body.size_of()
140            + self.message_group_id.size_of()
141            + self.message_deduplication_id.size_of()
142    }
143}
144
145impl EncodedLength for SendMessageEntry {
146    fn encoded_length(&self) -> usize {
147        self.message_body.len()
148    }
149}
150
151impl Finalizable for SendMessageEntry {
152    fn take_finalizers(&mut self) -> EventFinalizers {
153        self.finalizers.take_finalizers()
154    }
155}
156
157impl MetaDescriptive for SendMessageEntry {
158    fn get_metadata(&self) -> &RequestMetadata {
159        &self.metadata
160    }
161
162    fn metadata_mut(&mut self) -> &mut RequestMetadata {
163        &mut self.metadata
164    }
165}