vector/sinks/aws_s3/
sink.rs

1use std::io;
2
3use bytes::Bytes;
4use chrono::{FixedOffset, Utc};
5use uuid::Uuid;
6use vector_lib::codecs::encoding::Framer;
7use vector_lib::event::Finalizable;
8use vector_lib::request_metadata::RequestMetadata;
9
10use crate::{
11    codecs::{Encoder, Transformer},
12    event::Event,
13    sinks::{
14        s3_common::{
15            config::S3Options,
16            partitioner::S3PartitionKey,
17            service::{S3Metadata, S3Request},
18        },
19        util::{
20            metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression,
21            RequestBuilder,
22        },
23    },
24};
25
26#[derive(Clone)]
27pub struct S3RequestOptions {
28    pub bucket: String,
29    pub filename_time_format: String,
30    pub filename_append_uuid: bool,
31    pub filename_extension: Option<String>,
32    pub api_options: S3Options,
33    pub encoder: (Transformer, Encoder<Framer>),
34    pub compression: Compression,
35    pub filename_tz_offset: Option<FixedOffset>,
36}
37
38impl RequestBuilder<(S3PartitionKey, Vec<Event>)> for S3RequestOptions {
39    type Metadata = S3Metadata;
40    type Events = Vec<Event>;
41    type Encoder = (Transformer, Encoder<Framer>);
42    type Payload = Bytes;
43    type Request = S3Request;
44    type Error = io::Error; // TODO: this is ugly.
45
46    fn compression(&self) -> Compression {
47        self.compression
48    }
49
50    fn encoder(&self) -> &Self::Encoder {
51        &self.encoder
52    }
53
54    fn split_input(
55        &self,
56        input: (S3PartitionKey, Vec<Event>),
57    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
58        let (partition_key, mut events) = input;
59        let builder = RequestMetadataBuilder::from_events(&events);
60
61        let finalizers = events.take_finalizers();
62        let s3_key_prefix = partition_key.key_prefix.clone();
63
64        let metadata = S3Metadata {
65            partition_key,
66            s3_key: s3_key_prefix,
67            finalizers,
68        };
69
70        (metadata, builder, events)
71    }
72
73    fn build_request(
74        &self,
75        mut s3metadata: Self::Metadata,
76        request_metadata: RequestMetadata,
77        payload: EncodeResult<Self::Payload>,
78    ) -> Self::Request {
79        let filename = {
80            let formatted_ts = match self.filename_tz_offset {
81                Some(offset) => Utc::now()
82                    .with_timezone(&offset)
83                    .format(self.filename_time_format.as_str()),
84                None => Utc::now()
85                    .with_timezone(&Utc)
86                    .format(self.filename_time_format.as_str()),
87            };
88
89            if self.filename_append_uuid {
90                format!("{formatted_ts}-{}", Uuid::new_v4().hyphenated())
91            } else {
92                formatted_ts.to_string()
93            }
94        };
95
96        let ssekms_key_id = s3metadata.partition_key.ssekms_key_id.clone();
97        let mut s3_options = self.api_options.clone();
98        s3_options.ssekms_key_id = ssekms_key_id;
99
100        let extension = self
101            .filename_extension
102            .as_ref()
103            .cloned()
104            .unwrap_or_else(|| self.compression.extension().into());
105
106        s3metadata.s3_key = format_s3_key(&s3metadata.s3_key, &filename, &extension);
107
108        S3Request {
109            body: payload.into_payload(),
110            bucket: self.bucket.clone(),
111            metadata: s3metadata,
112            request_metadata,
113            content_encoding: self.compression.content_encoding(),
114            options: s3_options,
115        }
116    }
117}
118
119fn format_s3_key(s3_key: &str, filename: &str, extension: &str) -> String {
120    if extension.is_empty() {
121        format!("{s3_key}{filename}")
122    } else {
123        format!("{s3_key}{filename}.{extension}")
124    }
125}
126
127#[cfg(test)]
128mod tests {
129    use super::*;
130
131    #[test]
132    fn test_format_s3_key() {
133        assert_eq!(
134            "s3_key_filename.txt",
135            format_s3_key("s3_key_", "filename", "txt")
136        );
137        assert_eq!("s3_key_filename", format_s3_key("s3_key_", "filename", ""));
138    }
139}