vector/sinks/aws_s_s/
request_builder.rs

1use bytes::Bytes;
2use vector_lib::request_metadata::{MetaDescriptive, RequestMetadata};
3use vector_lib::ByteSizeOf;
4
5use crate::codecs::EncodingConfig;
6use crate::{
7    codecs::{Encoder, Transformer},
8    event::{Event, EventFinalizers, Finalizable},
9    internal_events::TemplateRenderingError,
10    sinks::util::{
11        metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression,
12        EncodedLength, RequestBuilder,
13    },
14    template::Template,
15};
16
17#[derive(Clone)]
18pub(super) struct SSMetadata {
19    pub(super) finalizers: EventFinalizers,
20    pub(super) message_group_id: Option<String>,
21    pub(super) message_deduplication_id: Option<String>,
22}
23
24#[derive(Clone)]
25pub(super) struct SSRequestBuilder {
26    encoder: (Transformer, Encoder<()>),
27    message_group_id: Option<Template>,
28    message_deduplication_id: Option<Template>,
29}
30
31impl SSRequestBuilder {
32    pub(super) fn new(
33        message_group_id: Option<Template>,
34        message_deduplication_id: Option<Template>,
35        encoding_config: EncodingConfig,
36    ) -> crate::Result<Self> {
37        let transformer = encoding_config.transformer();
38        let serializer = encoding_config.build()?;
39        let encoder = Encoder::<()>::new(serializer);
40
41        Ok(Self {
42            encoder: (transformer, encoder),
43            message_group_id,
44            message_deduplication_id,
45        })
46    }
47}
48
49impl RequestBuilder<Event> for SSRequestBuilder {
50    type Metadata = SSMetadata;
51    type Events = Event;
52    type Encoder = (Transformer, Encoder<()>);
53    type Payload = Bytes;
54    type Request = SendMessageEntry;
55    type Error = std::io::Error;
56
57    fn compression(&self) -> Compression {
58        Compression::None
59    }
60
61    fn encoder(&self) -> &Self::Encoder {
62        &self.encoder
63    }
64
65    fn split_input(
66        &self,
67        mut event: Event,
68    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
69        let message_group_id = match self.message_group_id {
70            Some(ref tpl) => match tpl.render_string(&event) {
71                Ok(value) => Some(value),
72                Err(error) => {
73                    emit!(TemplateRenderingError {
74                        error,
75                        field: Some("message_group_id"),
76                        drop_event: true,
77                    });
78                    None
79                }
80            },
81            None => None,
82        };
83        let message_deduplication_id = match self.message_deduplication_id {
84            Some(ref tpl) => match tpl.render_string(&event) {
85                Ok(value) => Some(value),
86                Err(error) => {
87                    emit!(TemplateRenderingError {
88                        error,
89                        field: Some("message_deduplication_id"),
90                        drop_event: true,
91                    });
92                    None
93                }
94            },
95            None => None,
96        };
97
98        let builder = RequestMetadataBuilder::from_event(&event);
99
100        let metadata = SSMetadata {
101            finalizers: event.take_finalizers(),
102            message_group_id,
103            message_deduplication_id,
104        };
105        (metadata, builder, event)
106    }
107
108    fn build_request(
109        &self,
110        client_metadata: Self::Metadata,
111        metadata: RequestMetadata,
112        payload: EncodeResult<Self::Payload>,
113    ) -> Self::Request {
114        let payload_bytes = payload.into_payload();
115        let message_body = String::from(std::str::from_utf8(&payload_bytes).unwrap());
116
117        SendMessageEntry {
118            message_body,
119            message_group_id: client_metadata.message_group_id,
120            message_deduplication_id: client_metadata.message_deduplication_id,
121            finalizers: client_metadata.finalizers,
122            metadata,
123        }
124    }
125}
126
127#[derive(Debug, Clone)]
128pub(super) struct SendMessageEntry {
129    pub(super) message_body: String,
130    pub(super) message_group_id: Option<String>,
131    pub(super) message_deduplication_id: Option<String>,
132    pub(super) finalizers: EventFinalizers,
133    pub(super) metadata: RequestMetadata,
134}
135
136impl ByteSizeOf for SendMessageEntry {
137    fn allocated_bytes(&self) -> usize {
138        self.message_body.size_of()
139            + self.message_group_id.size_of()
140            + self.message_deduplication_id.size_of()
141    }
142}
143
144impl EncodedLength for SendMessageEntry {
145    fn encoded_length(&self) -> usize {
146        self.message_body.len()
147    }
148}
149
150impl Finalizable for SendMessageEntry {
151    fn take_finalizers(&mut self) -> EventFinalizers {
152        self.finalizers.take_finalizers()
153    }
154}
155
156impl MetaDescriptive for SendMessageEntry {
157    fn get_metadata(&self) -> &RequestMetadata {
158        &self.metadata
159    }
160
161    fn metadata_mut(&mut self) -> &mut RequestMetadata {
162        &mut self.metadata
163    }
164}