1use aws_sdk_s3::Client as S3Client;
2use tower::ServiceBuilder;
3use vector_lib::codecs::{
4 encoding::{Framer, FramingConfig},
5 TextSerializerConfig,
6};
7use vector_lib::{configurable::configurable_component, sink::VectorSink, TimeZone};
8
9use super::sink::S3RequestOptions;
10use crate::{
11 aws::{AwsAuthentication, RegionOrEndpoint},
12 codecs::{Encoder, EncodingConfigWithFraming, SinkType},
13 config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext},
14 sinks::{
15 s3_common::{
16 self,
17 config::{RetryStrategy, S3Options},
18 partitioner::S3KeyPartitioner,
19 service::S3Service,
20 sink::S3Sink,
21 },
22 util::{
23 timezone_to_offset, BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression,
24 ServiceBuilderExt, TowerRequestConfig,
25 },
26 Healthcheck,
27 },
28 template::Template,
29 tls::TlsConfig,
30};
31
/// Configuration for the `aws_s3` sink.
#[configurable_component(sink(
    "aws_s3",
    "Store observability events in the AWS S3 object storage system."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct S3SinkConfig {
    /// Name of the S3 bucket that objects are written to.
    ///
    /// The same bucket is the target of the sink's healthcheck.
    #[configurable(metadata(docs::examples = "my-bucket"))]
    pub bucket: String,

    /// Template for the prefix of every object key.
    ///
    /// The value is parsed as a template and rendered with the sink's timezone offset; as the
    /// examples show, it may contain strftime-style specifiers and `{{ field }}` references.
    /// Defaults to `date=%F`.
    #[serde(default = "default_key_prefix")]
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(docs::examples = "date=%F/hour=%H"))]
    #[configurable(metadata(docs::examples = "year=%Y/month=%m/day=%d"))]
    #[configurable(metadata(docs::examples = "application_id={{ application_id }}/date=%F"))]
    pub key_prefix: String,

    /// Format of the timestamp used in object filenames.
    ///
    /// Rendered with the same timezone offset as `key_prefix`. Defaults to `%s`.
    // NOTE(review): the actual formatting happens in the request builder (`S3RequestOptions`),
    // which is not visible here; presumably a chrono/strftime format string — confirm.
    #[serde(default = "default_filename_time_format")]
    pub filename_time_format: String,

    /// Whether to append a UUID to object filenames.
    ///
    /// Enabled by default.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::human_name = "Append UUID to Filename"))]
    pub filename_append_uuid: bool,

    /// Extension to use for object filenames.
    // NOTE(review): what happens when this is unset is decided by the request builder, which
    // is not visible here.
    #[configurable(metadata(docs::examples = "json"))]
    pub filename_extension: Option<String>,

    // S3 API options, flattened into the top level of the sink config. `build_processor` reads
    // the optional SSE-KMS key id from here and forwards the whole set as `api_options`.
    #[serde(flatten)]
    pub options: S3Options,

    // AWS region or custom endpoint, flattened into the top level; consumed by `create_service`.
    #[serde(flatten)]
    pub region: RegionOrEndpoint,

    // Framing and serializer settings, flattened into the top level. They determine the sink's
    // accepted input type and the encoder built in `build_processor`.
    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    /// Compression applied to object payloads.
    ///
    /// Defaults to gzip.
    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,

    // Request limits; applied together with `retry_strategy` via `ServiceBuilder::settings`
    // in `build_processor`.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    #[configurable(derived)]
    #[serde(default)]
    pub auth: AwsAuthentication,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    // Sink-specific timezone; when unset, `build_processor` falls back to the global timezone.
    #[configurable(derived)]
    #[serde(default)]
    pub timezone: Option<TimeZone>,

    /// Whether to force path-style addressing for S3 requests.
    ///
    /// Forwarded to the S3 client when the service is created. Enabled by default.
    #[serde(default = "crate::serde::default_true")]
    pub force_path_style: bool,

    /// Retry strategy applied to requests, alongside the `request` settings.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub retry_strategy: RetryStrategy,
}
155
156pub(super) fn default_key_prefix() -> String {
157 "date=%F".to_string()
158}
159
160pub(super) fn default_filename_time_format() -> String {
161 "%s".to_string()
162}
163
164impl GenerateConfig for S3SinkConfig {
165 fn generate_config() -> toml::Value {
166 toml::Value::try_from(Self {
167 bucket: "".to_owned(),
168 key_prefix: default_key_prefix(),
169 filename_time_format: default_filename_time_format(),
170 filename_append_uuid: true,
171 filename_extension: None,
172 options: S3Options::default(),
173 region: RegionOrEndpoint::default(),
174 encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
175 compression: Compression::gzip_default(),
176 batch: BatchConfig::default(),
177 request: TowerRequestConfig::default(),
178 tls: Some(TlsConfig::default()),
179 auth: AwsAuthentication::default(),
180 acknowledgements: Default::default(),
181 timezone: Default::default(),
182 force_path_style: Default::default(),
183 retry_strategy: Default::default(),
184 })
185 .unwrap()
186 }
187}
188
189#[async_trait::async_trait]
190#[typetag::serde(name = "aws_s3")]
191impl SinkConfig for S3SinkConfig {
192 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
193 let service = self.create_service(&cx.proxy).await?;
194 let healthcheck = self.build_healthcheck(service.client())?;
195 let sink = self.build_processor(service, cx)?;
196 Ok((sink, healthcheck))
197 }
198
199 fn input(&self) -> Input {
200 Input::new(self.encoding.config().1.input_type())
201 }
202
203 fn acknowledgements(&self) -> &AcknowledgementsConfig {
204 &self.acknowledgements
205 }
206}
207
208impl S3SinkConfig {
209 pub fn build_processor(
210 &self,
211 service: S3Service,
212 cx: SinkContext,
213 ) -> crate::Result<VectorSink> {
214 let request_limits = self.request.into_settings();
219 let retry_strategy = self.retry_strategy.clone();
220 let service = ServiceBuilder::new()
221 .settings(request_limits, retry_strategy)
222 .service(service);
223
224 let offset = self
225 .timezone
226 .or(cx.globals.timezone)
227 .and_then(timezone_to_offset);
228
229 let batch_settings = self.batch.into_batcher_settings()?;
231
232 let key_prefix = Template::try_from(self.key_prefix.clone())?.with_tz_offset(offset);
233
234 let ssekms_key_id = self
235 .options
236 .ssekms_key_id
237 .as_ref()
238 .cloned()
239 .map(|ssekms_key_id| Template::try_from(ssekms_key_id.as_str()))
240 .transpose()?;
241
242 let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id, None);
243
244 let transformer = self.encoding.transformer();
245 let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
246 let encoder = Encoder::<Framer>::new(framer, serializer);
247
248 let request_options = S3RequestOptions {
249 bucket: self.bucket.clone(),
250 api_options: self.options.clone(),
251 filename_extension: self.filename_extension.clone(),
252 filename_time_format: self.filename_time_format.clone(),
253 filename_append_uuid: self.filename_append_uuid,
254 encoder: (transformer, encoder),
255 compression: self.compression,
256 filename_tz_offset: offset,
257 };
258
259 let sink = S3Sink::new(service, request_options, partitioner, batch_settings);
260
261 Ok(VectorSink::from_event_streamsink(sink))
262 }
263
264 pub fn build_healthcheck(&self, client: S3Client) -> crate::Result<Healthcheck> {
265 s3_common::config::build_healthcheck(self.bucket.clone(), client)
266 }
267
268 pub async fn create_service(&self, proxy: &ProxyConfig) -> crate::Result<S3Service> {
269 s3_common::config::create_service(
270 &self.region,
271 &self.auth,
272 proxy,
273 self.tls.as_ref(),
274 self.force_path_style,
275 )
276 .await
277 }
278}
279
#[cfg(test)]
mod tests {
    /// Exercises `S3SinkConfig`'s `GenerateConfig` impl through the shared test helper.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<super::S3SinkConfig>();
    }
}