vector/sinks/aws_s3/
sink.rs1use std::io;
2
3use bytes::Bytes;
4use chrono::{FixedOffset, Utc};
5use uuid::Uuid;
6use vector_lib::codecs::encoding::Framer;
7use vector_lib::event::Finalizable;
8use vector_lib::request_metadata::RequestMetadata;
9
10use crate::{
11 codecs::{Encoder, Transformer},
12 event::Event,
13 sinks::{
14 s3_common::{
15 config::S3Options,
16 partitioner::S3PartitionKey,
17 service::{S3Metadata, S3Request},
18 },
19 util::{
20 metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression,
21 RequestBuilder,
22 },
23 },
24};
25
26#[derive(Clone)]
27pub struct S3RequestOptions {
28 pub bucket: String,
29 pub filename_time_format: String,
30 pub filename_append_uuid: bool,
31 pub filename_extension: Option<String>,
32 pub api_options: S3Options,
33 pub encoder: (Transformer, Encoder<Framer>),
34 pub compression: Compression,
35 pub filename_tz_offset: Option<FixedOffset>,
36}
37
38impl RequestBuilder<(S3PartitionKey, Vec<Event>)> for S3RequestOptions {
39 type Metadata = S3Metadata;
40 type Events = Vec<Event>;
41 type Encoder = (Transformer, Encoder<Framer>);
42 type Payload = Bytes;
43 type Request = S3Request;
44 type Error = io::Error; fn compression(&self) -> Compression {
47 self.compression
48 }
49
50 fn encoder(&self) -> &Self::Encoder {
51 &self.encoder
52 }
53
54 fn split_input(
55 &self,
56 input: (S3PartitionKey, Vec<Event>),
57 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
58 let (partition_key, mut events) = input;
59 let builder = RequestMetadataBuilder::from_events(&events);
60
61 let finalizers = events.take_finalizers();
62 let s3_key_prefix = partition_key.key_prefix.clone();
63
64 let metadata = S3Metadata {
65 partition_key,
66 s3_key: s3_key_prefix,
67 finalizers,
68 };
69
70 (metadata, builder, events)
71 }
72
73 fn build_request(
74 &self,
75 mut s3metadata: Self::Metadata,
76 request_metadata: RequestMetadata,
77 payload: EncodeResult<Self::Payload>,
78 ) -> Self::Request {
79 let filename = {
80 let formatted_ts = match self.filename_tz_offset {
81 Some(offset) => Utc::now()
82 .with_timezone(&offset)
83 .format(self.filename_time_format.as_str()),
84 None => Utc::now()
85 .with_timezone(&Utc)
86 .format(self.filename_time_format.as_str()),
87 };
88
89 if self.filename_append_uuid {
90 format!("{formatted_ts}-{}", Uuid::new_v4().hyphenated())
91 } else {
92 formatted_ts.to_string()
93 }
94 };
95
96 let ssekms_key_id = s3metadata.partition_key.ssekms_key_id.clone();
97 let mut s3_options = self.api_options.clone();
98 s3_options.ssekms_key_id = ssekms_key_id;
99
100 let extension = self
101 .filename_extension
102 .as_ref()
103 .cloned()
104 .unwrap_or_else(|| self.compression.extension().into());
105
106 s3metadata.s3_key = format_s3_key(&s3metadata.s3_key, &filename, &extension);
107
108 S3Request {
109 body: payload.into_payload(),
110 bucket: self.bucket.clone(),
111 metadata: s3metadata,
112 request_metadata,
113 content_encoding: self.compression.content_encoding(),
114 options: s3_options,
115 }
116 }
117}
118
/// Joins a key prefix, a filename and an optional extension into an object key.
///
/// The result is `s3_key` followed by `filename`; when `extension` is
/// non-empty, a `.` and the extension are appended as well.
fn format_s3_key(s3_key: &str, filename: &str, extension: &str) -> String {
    let mut key = [s3_key, filename].concat();
    if !extension.is_empty() {
        key.push('.');
        key.push_str(extension);
    }
    key
}
126
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks key formatting both with and without an extension.
    #[test]
    fn test_format_s3_key() {
        // (extension, expected key) for a fixed prefix and filename.
        let cases = [("txt", "s3_key_filename.txt"), ("", "s3_key_filename")];

        for (extension, expected) in cases {
            assert_eq!(expected, format_s3_key("s3_key_", "filename", extension));
        }
    }
}