vector/sinks/aws_s3/
sink.rs1use std::io;
2
3use bytes::Bytes;
4use chrono::{FixedOffset, Utc};
5use uuid::Uuid;
6use vector_lib::{codecs::encoding::Framer, event::Finalizable, request_metadata::RequestMetadata};
7
8use crate::{
9 codecs::{Encoder, Transformer},
10 event::Event,
11 sinks::{
12 s3_common::{
13 config::S3Options,
14 partitioner::S3PartitionKey,
15 service::{S3Metadata, S3Request},
16 },
17 util::{
18 Compression, RequestBuilder, metadata::RequestMetadataBuilder,
19 request_builder::EncodeResult,
20 },
21 },
22};
23
24#[derive(Clone)]
25pub struct S3RequestOptions {
26 pub bucket: String,
27 pub filename_time_format: String,
28 pub filename_append_uuid: bool,
29 pub filename_extension: Option<String>,
30 pub api_options: S3Options,
31 pub encoder: (Transformer, Encoder<Framer>),
32 pub compression: Compression,
33 pub filename_tz_offset: Option<FixedOffset>,
34}
35
36impl RequestBuilder<(S3PartitionKey, Vec<Event>)> for S3RequestOptions {
37 type Metadata = S3Metadata;
38 type Events = Vec<Event>;
39 type Encoder = (Transformer, Encoder<Framer>);
40 type Payload = Bytes;
41 type Request = S3Request;
42 type Error = io::Error; fn compression(&self) -> Compression {
45 self.compression
46 }
47
48 fn encoder(&self) -> &Self::Encoder {
49 &self.encoder
50 }
51
52 fn split_input(
53 &self,
54 input: (S3PartitionKey, Vec<Event>),
55 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
56 let (partition_key, mut events) = input;
57 let builder = RequestMetadataBuilder::from_events(&events);
58
59 let finalizers = events.take_finalizers();
60 let s3_key_prefix = partition_key.key_prefix.clone();
61
62 let metadata = S3Metadata {
63 partition_key,
64 s3_key: s3_key_prefix,
65 finalizers,
66 };
67
68 (metadata, builder, events)
69 }
70
71 fn build_request(
72 &self,
73 mut s3metadata: Self::Metadata,
74 request_metadata: RequestMetadata,
75 payload: EncodeResult<Self::Payload>,
76 ) -> Self::Request {
77 let filename = {
78 let formatted_ts = match self.filename_tz_offset {
79 Some(offset) => Utc::now()
80 .with_timezone(&offset)
81 .format(self.filename_time_format.as_str()),
82 None => Utc::now()
83 .with_timezone(&Utc)
84 .format(self.filename_time_format.as_str()),
85 };
86
87 if self.filename_append_uuid {
88 format!("{formatted_ts}-{}", Uuid::new_v4().hyphenated())
89 } else {
90 formatted_ts.to_string()
91 }
92 };
93
94 let ssekms_key_id = s3metadata.partition_key.ssekms_key_id.clone();
95 let mut s3_options = self.api_options.clone();
96 s3_options.ssekms_key_id = ssekms_key_id;
97
98 let extension = self
99 .filename_extension
100 .as_ref()
101 .cloned()
102 .unwrap_or_else(|| self.compression.extension().into());
103
104 s3metadata.s3_key = format_s3_key(&s3metadata.s3_key, &filename, &extension);
105
106 S3Request {
107 body: payload.into_payload(),
108 bucket: self.bucket.clone(),
109 metadata: s3metadata,
110 request_metadata,
111 content_encoding: self.compression.content_encoding(),
112 options: s3_options,
113 }
114 }
115}
116
117fn format_s3_key(s3_key: &str, filename: &str, extension: &str) -> String {
118 if extension.is_empty() {
119 format!("{s3_key}{filename}")
120 } else {
121 format!("{s3_key}{filename}.{extension}")
122 }
123}
124
125#[cfg(test)]
126mod tests {
127 use super::*;
128
129 #[test]
130 fn test_format_s3_key() {
131 assert_eq!(
132 "s3_key_filename.txt",
133 format_s3_key("s3_key_", "filename", "txt")
134 );
135 assert_eq!("s3_key_filename", format_s3_key("s3_key_", "filename", ""));
136 }
137}