vector/sinks/aws_s3/
sink.rs

1use std::io;
2
3use bytes::Bytes;
4use chrono::{FixedOffset, Utc};
5use uuid::Uuid;
6use vector_lib::{codecs::encoding::Framer, event::Finalizable, request_metadata::RequestMetadata};
7
8use crate::{
9    codecs::{Encoder, Transformer},
10    event::Event,
11    sinks::{
12        s3_common::{
13            config::S3Options,
14            partitioner::S3PartitionKey,
15            service::{S3Metadata, S3Request},
16        },
17        util::{
18            Compression, RequestBuilder, metadata::RequestMetadataBuilder,
19            request_builder::EncodeResult,
20        },
21    },
22};
23
24#[derive(Clone)]
25pub struct S3RequestOptions {
26    pub bucket: String,
27    pub filename_time_format: String,
28    pub filename_append_uuid: bool,
29    pub filename_extension: Option<String>,
30    pub api_options: S3Options,
31    pub encoder: (Transformer, Encoder<Framer>),
32    pub compression: Compression,
33    pub filename_tz_offset: Option<FixedOffset>,
34}
35
36impl RequestBuilder<(S3PartitionKey, Vec<Event>)> for S3RequestOptions {
37    type Metadata = S3Metadata;
38    type Events = Vec<Event>;
39    type Encoder = (Transformer, Encoder<Framer>);
40    type Payload = Bytes;
41    type Request = S3Request;
42    type Error = io::Error; // TODO: this is ugly.
43
44    fn compression(&self) -> Compression {
45        self.compression
46    }
47
48    fn encoder(&self) -> &Self::Encoder {
49        &self.encoder
50    }
51
52    fn split_input(
53        &self,
54        input: (S3PartitionKey, Vec<Event>),
55    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
56        let (partition_key, mut events) = input;
57        let builder = RequestMetadataBuilder::from_events(&events);
58
59        let finalizers = events.take_finalizers();
60        let s3_key_prefix = partition_key.key_prefix.clone();
61
62        let metadata = S3Metadata {
63            partition_key,
64            s3_key: s3_key_prefix,
65            finalizers,
66        };
67
68        (metadata, builder, events)
69    }
70
71    fn build_request(
72        &self,
73        mut s3metadata: Self::Metadata,
74        request_metadata: RequestMetadata,
75        payload: EncodeResult<Self::Payload>,
76    ) -> Self::Request {
77        let filename = {
78            let formatted_ts = match self.filename_tz_offset {
79                Some(offset) => Utc::now()
80                    .with_timezone(&offset)
81                    .format(self.filename_time_format.as_str()),
82                None => Utc::now()
83                    .with_timezone(&Utc)
84                    .format(self.filename_time_format.as_str()),
85            };
86
87            if self.filename_append_uuid {
88                format!("{formatted_ts}-{}", Uuid::new_v4().hyphenated())
89            } else {
90                formatted_ts.to_string()
91            }
92        };
93
94        let ssekms_key_id = s3metadata.partition_key.ssekms_key_id.clone();
95        let mut s3_options = self.api_options.clone();
96        s3_options.ssekms_key_id = ssekms_key_id;
97
98        let extension = self
99            .filename_extension
100            .as_ref()
101            .cloned()
102            .unwrap_or_else(|| self.compression.extension().into());
103
104        s3metadata.s3_key = format_s3_key(&s3metadata.s3_key, &filename, &extension);
105
106        S3Request {
107            body: payload.into_payload(),
108            bucket: self.bucket.clone(),
109            metadata: s3metadata,
110            request_metadata,
111            content_encoding: self.compression.content_encoding(),
112            options: s3_options,
113        }
114    }
115}
116
117fn format_s3_key(s3_key: &str, filename: &str, extension: &str) -> String {
118    if extension.is_empty() {
119        format!("{s3_key}{filename}")
120    } else {
121        format!("{s3_key}{filename}.{extension}")
122    }
123}
124
125#[cfg(test)]
126mod tests {
127    use super::*;
128
129    #[test]
130    fn test_format_s3_key() {
131        assert_eq!(
132            "s3_key_filename.txt",
133            format_s3_key("s3_key_", "filename", "txt")
134        );
135        assert_eq!("s3_key_filename", format_s3_key("s3_key_", "filename", ""));
136    }
137}