1use aws_sdk_s3::Client as S3Client;
2use tower::ServiceBuilder;
3#[cfg(feature = "codecs-parquet")]
4use vector_lib::codecs::BatchEncoder;
5#[cfg(feature = "codecs-parquet")]
6use vector_lib::codecs::encoding::BatchSerializerConfig;
7use vector_lib::{
8 TimeZone,
9 codecs::{
10 EncoderKind, TextSerializerConfig,
11 encoding::{Framer, FramingConfig},
12 },
13 configurable::configurable_component,
14 sink::VectorSink,
15};
16
17use super::sink::S3RequestOptions;
18use crate::{
19 aws::{AwsAuthentication, RegionOrEndpoint},
20 codecs::{Encoder, EncodingConfigWithFraming, SinkType},
21 config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext},
22 sinks::{
23 Healthcheck,
24 s3_common::{
25 self,
26 config::{RetryStrategy, S3Options},
27 partitioner::S3KeyPartitioner,
28 service::S3Service,
29 sink::S3Sink,
30 },
31 util::{
32 BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
33 TowerRequestConfig, timezone_to_offset,
34 },
35 },
36 template::Template,
37 tls::TlsConfig,
38};
39
/// Configuration for the `aws_s3` sink.
#[configurable_component(sink(
    "aws_s3",
    "Store observability events in the AWS S3 object storage system."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct S3SinkConfig {
    /// The S3 bucket to write objects to.
    #[configurable(metadata(docs::examples = "my-bucket"))]
    pub bucket: String,

    /// A prefix applied to every object key.
    ///
    /// Rendered as a template per event, so it may reference event fields and
    /// `strftime` time specifiers (see the examples). Defaults to `date=%F`.
    #[serde(default = "default_key_prefix")]
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(docs::examples = "date=%F/hour=%H"))]
    #[configurable(metadata(docs::examples = "year=%Y/month=%m/day=%d"))]
    #[configurable(metadata(docs::examples = "application_id={{ application_id }}/date=%F"))]
    pub key_prefix: String,

    /// The `strftime` format used for the timestamp portion of object
    /// filenames. Defaults to `%s` (seconds since the Unix epoch).
    #[serde(default = "default_filename_time_format")]
    pub filename_time_format: String,

    /// Whether to append a UUID to object filenames. Enabled by default.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::human_name = "Append UUID to Filename"))]
    pub filename_append_uuid: bool,

    /// The filename extension for created objects.
    ///
    /// When unset and `batch_encoding` is configured for parquet, the
    /// extension defaults to `parquet`.
    #[configurable(metadata(docs::examples = "json"))]
    pub filename_extension: Option<String>,

    /// Additional S3 request options (flattened; see `S3Options`).
    #[serde(flatten)]
    pub options: S3Options,

    /// The AWS region or custom endpoint to use (flattened).
    #[serde(flatten)]
    pub region: RegionOrEndpoint,

    /// Per-event encoding (codec + framing) configuration (flattened).
    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    /// Batch-level encoder configuration.
    ///
    /// Only the `parquet` codec is supported for this sink. When set, it
    /// replaces the per-event `encoding`, and the top-level `compression`
    /// setting is ignored (the batch serializer carries its own compression
    /// configuration).
    #[cfg(feature = "codecs-parquet")]
    #[configurable(derived)]
    #[serde(default)]
    pub batch_encoding: Option<BatchSerializerConfig>,

    /// The compression mechanism to apply to objects. Defaults to gzip;
    /// ignored when `batch_encoding` is set.
    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,

    /// Event batching behavior.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,

    /// Outbound request settings (limits and retry behavior applied via the
    /// Tower service stack).
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    /// TLS configuration.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// AWS authentication options.
    #[configurable(derived)]
    #[serde(default)]
    pub auth: AwsAuthentication,

    /// End-to-end acknowledgements configuration.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    /// Timezone used when rendering time-based templates; falls back to the
    /// globally configured timezone when unset.
    #[configurable(derived)]
    #[serde(default)]
    pub timezone: Option<TimeZone>,

    /// Whether to use path-style bucket addressing when connecting to S3.
    /// Defaults to `true`.
    #[serde(default = "crate::serde::default_true")]
    pub force_path_style: bool,

    /// Retry behavior for failed requests (see `RetryStrategy`).
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub retry_strategy: RetryStrategy,
}
173
174pub(super) fn default_key_prefix() -> String {
175 "date=%F".to_string()
176}
177
178pub(super) fn default_filename_time_format() -> String {
179 "%s".to_string()
180}
181
182impl GenerateConfig for S3SinkConfig {
183 fn generate_config() -> toml::Value {
184 toml::Value::try_from(Self {
185 bucket: "".to_owned(),
186 key_prefix: default_key_prefix(),
187 filename_time_format: default_filename_time_format(),
188 filename_append_uuid: true,
189 filename_extension: None,
190 options: S3Options::default(),
191 region: RegionOrEndpoint::default(),
192 encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
193 #[cfg(feature = "codecs-parquet")]
194 batch_encoding: None,
195 compression: Compression::gzip_default(),
196 batch: BatchConfig::default(),
197 request: TowerRequestConfig::default(),
198 tls: Some(TlsConfig::default()),
199 auth: AwsAuthentication::default(),
200 acknowledgements: Default::default(),
201 timezone: Default::default(),
202 force_path_style: Default::default(),
203 retry_strategy: Default::default(),
204 })
205 .unwrap()
206 }
207}
208
#[async_trait::async_trait]
#[typetag::serde(name = "aws_s3")]
impl SinkConfig for S3SinkConfig {
    /// Builds the sink and its healthcheck from this configuration.
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        // Create the S3 service first so the healthcheck and the sink share
        // the same client configuration.
        let service = self.create_service(&cx.proxy).await?;
        let healthcheck = self.build_healthcheck(service.client())?;
        let sink = self.build_processor(service, cx)?;
        Ok((sink, healthcheck))
    }

    /// The input type accepted by this sink.
    fn input(&self) -> Input {
        // When batch encoding is configured, it determines the accepted input
        // type; otherwise the per-event serializer does.
        #[cfg(feature = "codecs-parquet")]
        if let Some(batch_config) = &self.batch_encoding {
            return Input::new(batch_config.input_type());
        }
        Input::new(self.encoding.config().1.input_type())
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
231
impl S3SinkConfig {
    /// Builds the event-stream sink: wraps the S3 service with request
    /// limits/retries, constructs the key partitioner and batch settings, and
    /// selects either the batch (parquet) or the per-event framed encoder.
    pub fn build_processor(
        &self,
        service: S3Service,
        cx: SinkContext,
    ) -> crate::Result<VectorSink> {
        // Apply the configured request limits and retry strategy around the
        // raw S3 service via the Tower service stack.
        let request_limits = self.request.into_settings();
        let retry_strategy = self.retry_strategy.clone();
        let service = ServiceBuilder::new()
            .settings(request_limits, retry_strategy)
            .service(service);

        // Sink-level timezone takes precedence over the global setting.
        let offset = self
            .timezone
            .or(cx.globals.timezone)
            .and_then(timezone_to_offset);

        let batch_settings = self.batch.into_batcher_settings()?;

        // Key prefix is a per-event template; the timezone offset applies to
        // any time specifiers it contains.
        let key_prefix = Template::try_from(self.key_prefix.clone())?.with_tz_offset(offset);

        // The SSE-KMS key id may itself be a template.
        let ssekms_key_id = self
            .options
            .ssekms_key_id
            .as_ref()
            .cloned()
            .map(|ssekms_key_id| Template::try_from(ssekms_key_id.as_str()))
            .transpose()?;

        let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id, None);

        let transformer = self.encoding.transformer();

        // Batch-encoding (parquet) path: replaces the framed per-event
        // encoder entirely and returns early.
        #[cfg(feature = "codecs-parquet")]
        if let Some(batch_config) = &self.batch_encoding {
            // Only parquet batch encoding is supported by this sink.
            if !matches!(batch_config, BatchSerializerConfig::Parquet(_)) {
                return Err(
                    "batch_encoding only supports encoding with parquet format for amazon s3 sink"
                        .into(),
                );
            }

            let batch_serializer = batch_config.build_batch_serializer()?;
            let batch_encoder = BatchEncoder::new(batch_serializer);

            // Auto-fill the Content-Type from the encoder unless the user set
            // one explicitly.
            let mut api_options = self.options.clone();
            if api_options.content_type.is_none() {
                api_options.content_type = Some(batch_encoder.content_type().to_string());
            }

            let encoder = EncoderKind::Batch(batch_encoder);

            // Default the filename extension to "parquet" when not set.
            let filename_extension =
                self.filename_extension
                    .clone()
                    .or_else(|| match batch_config {
                        BatchSerializerConfig::Parquet(_) => Some("parquet".to_string()),
                        #[allow(unreachable_patterns)]
                        _ => None,
                    });

            // Top-level compression is ignored here: compression is driven by
            // the batch serializer's own configuration.
            if self.compression != Compression::None {
                warn!("Top level compression setting ignored when batch_encoding set to parquet.")
            }

            let request_options = S3RequestOptions {
                bucket: self.bucket.clone(),
                api_options,
                filename_extension,
                filename_time_format: self.filename_time_format.clone(),
                filename_append_uuid: self.filename_append_uuid,
                encoder: (transformer, encoder),
                // Forced to None: see the warning above.
                compression: Compression::None,
                filename_tz_offset: offset,
            };

            let sink = S3Sink::new(service, request_options, partitioner, batch_settings);
            return Ok(VectorSink::from_event_streamsink(sink));
        }

        // Default path: framed per-event encoder built from `encoding`.
        let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
        let encoder = EncoderKind::Framed(Box::new(Encoder::<Framer>::new(framer, serializer)));

        let request_options = S3RequestOptions {
            bucket: self.bucket.clone(),
            api_options: self.options.clone(),
            filename_extension: self.filename_extension.clone(),
            filename_time_format: self.filename_time_format.clone(),
            filename_append_uuid: self.filename_append_uuid,
            encoder: (transformer, encoder),
            compression: self.compression,
            filename_tz_offset: offset,
        };

        let sink = S3Sink::new(service, request_options, partitioner, batch_settings);

        Ok(VectorSink::from_event_streamsink(sink))
    }

    /// Builds a healthcheck that verifies the configured bucket is reachable.
    pub fn build_healthcheck(&self, client: S3Client) -> crate::Result<Healthcheck> {
        s3_common::config::build_healthcheck(self.bucket.clone(), client)
    }

    /// Creates the S3 service from region/endpoint, auth, proxy, TLS, and
    /// addressing-style settings.
    pub async fn create_service(&self, proxy: &ProxyConfig) -> crate::Result<S3Service> {
        s3_common::config::create_service(
            &self.region,
            &self.auth,
            proxy,
            self.tls.as_ref(),
            self.force_path_style,
        )
        .await
    }
}
357
#[cfg(test)]
mod tests {
    use super::S3SinkConfig;

    /// The generated example config must serialize and round-trip cleanly.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<S3SinkConfig>();
    }

    /// A well-formed `[batch_encoding]` table with nested compression parses
    /// into the Parquet variant with the expected fields.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_batch_encoding_correct_toml_shape() {
        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"

            [encoding]
            codec = "text"

            [batch_encoding]
            schema_mode = "auto_infer"
            codec = "parquet"

            [batch_encoding.compression]
            algorithm = "snappy"

            "#,
        )
        .expect("correct batch_encoding shape should parse");

        let batch_enc = config
            .batch_encoding
            .expect("batch_encoding should be Some");
        match batch_enc {
            vector_lib::codecs::encoding::BatchSerializerConfig::Parquet(ref p) => {
                use vector_lib::codecs::encoding::format::{ParquetCompression, ParquetSchemaMode};
                assert_eq!(p.schema_mode, ParquetSchemaMode::AutoInfer);
                assert_eq!(p.compression, ParquetCompression::Snappy);
            }
            #[allow(unreachable_patterns)]
            _ => panic!("expected Parquet variant"),
        }
    }

    /// When the user does not set `content_type`, the parquet encoder's
    /// Content-Type is auto-filled (mirrors the logic in `build_processor`).
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_content_type_auto_detected() {
        use vector_lib::codecs::encoding::format::{
            ParquetCompression, ParquetSchemaMode, ParquetSerializerConfig,
        };

        use crate::sinks::s3_common::config::S3Options;
        use crate::sinks::util::{BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression};
        use vector_lib::codecs::TextSerializerConfig;
        use vector_lib::codecs::encoding::{BatchSerializerConfig, FramingConfig};

        let parquet_config = ParquetSerializerConfig {
            schema_mode: ParquetSchemaMode::AutoInfer,
            compression: ParquetCompression::Snappy,
            ..Default::default()
        };

        let config = S3SinkConfig {
            bucket: "test".to_string(),
            key_prefix: super::default_key_prefix(),
            filename_time_format: super::default_filename_time_format(),
            filename_append_uuid: true,
            filename_extension: None,
            options: S3Options::default(),
            region: crate::aws::RegionOrEndpoint::with_both("us-east-1", "http://localhost:4566"),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            batch_encoding: Some(BatchSerializerConfig::Parquet(parquet_config)),
            compression: Compression::None,
            batch: BatchConfig::<BulkSizeBasedDefaultBatchSettings>::default(),
            request: Default::default(),
            tls: Default::default(),
            auth: Default::default(),
            acknowledgements: Default::default(),
            timezone: Default::default(),
            force_path_style: true,
            retry_strategy: Default::default(),
        };

        let batch_config = config.batch_encoding.as_ref().unwrap();
        let batch_serializer = batch_config.build_batch_serializer().unwrap();
        let batch_encoder = vector_lib::codecs::BatchEncoder::new(batch_serializer);

        // Same auto-fill logic as build_processor's parquet branch.
        let mut api_options = config.options.clone();
        if api_options.content_type.is_none() {
            api_options.content_type = Some(batch_encoder.content_type().to_string());
        }

        assert_eq!(
            api_options.content_type.as_deref(),
            Some("application/vnd.apache.parquet"),
            "Content-Type must be auto-detected for Parquet"
        );
    }

    /// A user-supplied `content_type` must win over the encoder's default.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_content_type_user_override_preserved() {
        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"
            content_type = "application/octet-stream"

            [encoding]
            codec = "text"

            [batch_encoding]
            codec = "parquet"
            schema_mode = "auto_infer"

            [batch_encoding.compression]
            algorithm = "gzip"
            level = 9
            "#,
        )
        .unwrap();

        let batch_config = config.batch_encoding.as_ref().unwrap();
        let batch_serializer = batch_config.build_batch_serializer().unwrap();
        let batch_encoder = vector_lib::codecs::BatchEncoder::new(batch_serializer);

        // The auto-fill only applies when content_type is unset.
        let mut api_options = config.options.clone();
        if api_options.content_type.is_none() {
            api_options.content_type = Some(batch_encoder.content_type().to_string());
        }

        assert_eq!(
            api_options.content_type.as_deref(),
            Some("application/octet-stream"),
            "User-specified Content-Type must not be overridden"
        );
    }

    /// With parquet batch encoding and no explicit extension, the filename
    /// extension defaults to "parquet".
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_filename_extension_defaults_to_parquet() {
        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"

            [encoding]
            codec = "text"

            [batch_encoding]
            codec = "parquet"
            schema_mode = "auto_infer"
            "#,
        )
        .unwrap();

        assert!(
            config.filename_extension.is_none(),
            "fixture must not set filename_extension"
        );

        // Same fallback logic as build_processor's parquet branch.
        let batch_config = config.batch_encoding.as_ref().unwrap();
        let extension = config
            .filename_extension
            .clone()
            .or_else(|| match batch_config {
                vector_lib::codecs::encoding::BatchSerializerConfig::Parquet(_) => {
                    Some("parquet".to_string())
                }
                #[allow(unreachable_patterns)]
                _ => None,
            });

        assert_eq!(extension.as_deref(), Some("parquet"));
    }

    /// An explicit `filename_extension` survives parsing alongside parquet
    /// batch encoding.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_filename_extension_user_override() {
        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"
            filename_extension = "pq"

            [encoding]
            codec = "text"

            [batch_encoding]
            codec = "parquet"
            schema_mode = "auto_infer"
            "#,
        )
        .unwrap();

        assert_eq!(config.filename_extension.as_deref(), Some("pq"));
    }

    /// Omitting `schema_mode` yields the Relaxed default.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_schema_mode_defaults_to_relaxed() {
        use vector_lib::codecs::encoding::format::ParquetSchemaMode;

        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"

            [encoding]
            codec = "text"

            [batch_encoding]
            codec = "parquet"
            "#,
        )
        .unwrap();

        match config.batch_encoding.unwrap() {
            vector_lib::codecs::encoding::BatchSerializerConfig::Parquet(p) => {
                assert_eq!(p.schema_mode, ParquetSchemaMode::Relaxed);
            }
            #[allow(unreachable_patterns)]
            _ => panic!("expected Parquet variant"),
        }
    }

    /// `schema_mode = "strict"` (with a schema file) parses to Strict.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_schema_mode_strict_parsed() {
        use vector_lib::codecs::encoding::format::ParquetSchemaMode;

        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"

            [encoding]
            codec = "text"

            [batch_encoding]
            codec = "parquet"
            schema_mode = "strict"
            schema_file = "tmp/something.schema"
            "#,
        )
        .unwrap();

        match config.batch_encoding.unwrap() {
            vector_lib::codecs::encoding::BatchSerializerConfig::Parquet(p) => {
                assert_eq!(p.schema_mode, ParquetSchemaMode::Strict);
            }
            #[allow(unreachable_patterns)]
            _ => panic!("expected Parquet variant"),
        }
    }
}