// vector/sinks/aws_s3/config.rs

1use aws_sdk_s3::Client as S3Client;
2use tower::ServiceBuilder;
3#[cfg(feature = "codecs-parquet")]
4use vector_lib::codecs::BatchEncoder;
5#[cfg(feature = "codecs-parquet")]
6use vector_lib::codecs::encoding::BatchSerializerConfig;
7use vector_lib::{
8    TimeZone,
9    codecs::{
10        EncoderKind, TextSerializerConfig,
11        encoding::{Framer, FramingConfig},
12    },
13    configurable::configurable_component,
14    sink::VectorSink,
15};
16
17use super::sink::S3RequestOptions;
18use crate::{
19    aws::{AwsAuthentication, RegionOrEndpoint},
20    codecs::{Encoder, EncodingConfigWithFraming, SinkType},
21    config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext},
22    sinks::{
23        Healthcheck,
24        s3_common::{
25            self,
26            config::{RetryStrategy, S3Options},
27            partitioner::S3KeyPartitioner,
28            service::S3Service,
29            sink::S3Sink,
30        },
31        util::{
32            BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
33            TowerRequestConfig, timezone_to_offset,
34        },
35    },
36    template::Template,
37    tls::TlsConfig,
38};
39
/// Configuration for the `aws_s3` sink.
#[configurable_component(sink(
    "aws_s3",
    "Store observability events in the AWS S3 object storage system."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct S3SinkConfig {
    /// The S3 bucket name.
    ///
    /// This must not include a leading `s3://` or a trailing `/`.
    #[configurable(metadata(docs::examples = "my-bucket"))]
    pub bucket: String,

    /// A prefix to apply to all object keys.
    ///
    /// Prefixes are useful for partitioning objects, such as by creating an object key that
    /// stores objects under a particular directory. If using a prefix for this purpose, it must end
    /// in `/` to act as a directory path. A trailing `/` is **not** automatically added.
    #[serde(default = "default_key_prefix")]
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(docs::examples = "date=%F/hour=%H"))]
    #[configurable(metadata(docs::examples = "year=%Y/month=%m/day=%d"))]
    #[configurable(metadata(docs::examples = "application_id={{ application_id }}/date=%F"))]
    pub key_prefix: String,

    /// The timestamp format for the time component of the object key.
    ///
    /// By default, object keys are appended with a timestamp that reflects when the objects are
    /// sent to S3, such that the resulting object key is functionally equivalent to joining the key
    /// prefix with the formatted timestamp, such as `date=2022-07-18/1658176486`.
    ///
    /// This would represent a `key_prefix` set to `date=%F/` and the timestamp of Mon Jul 18 2022
    /// 20:34:44 GMT+0000, with the `filename_time_format` being set to `%s`, which renders
    /// timestamps in seconds since the Unix epoch.
    ///
    /// Supports the common [`strftime`][chrono_strftime_specifiers] specifiers found in most
    /// languages.
    ///
    /// When set to an empty string, no timestamp is appended to the key prefix.
    ///
    /// [chrono_strftime_specifiers]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers
    #[serde(default = "default_filename_time_format")]
    pub filename_time_format: String,

    /// Whether or not to append a UUID v4 token to the end of the object key.
    ///
    /// The UUID is appended to the timestamp portion of the object key, such that if the object key
    /// generated is `date=2022-07-18/1658176486`, setting this field to `true` results
    /// in an object key that looks like `date=2022-07-18/1658176486-30f6652c-71da-4f9f-800d-a1189c47c547`.
    ///
    /// This ensures there are no name collisions, and can be useful in high-volume workloads where
    /// object keys must be unique.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::human_name = "Append UUID to Filename"))]
    pub filename_append_uuid: bool,

    /// The filename extension to use in the object key.
    ///
    /// This overrides setting the extension based on the configured `compression`.
    #[configurable(metadata(docs::examples = "json"))]
    pub filename_extension: Option<String>,

    // S3 API request options (e.g. `content_type`, `ssekms_key_id`), flattened so their
    // keys appear at the top level of the sink config rather than under `options`.
    #[serde(flatten)]
    pub options: S3Options,

    // AWS region and/or custom endpoint, flattened; accepts `region`/`endpoint` keys at
    // the top level of the sink config.
    #[serde(flatten)]
    pub region: RegionOrEndpoint,

    // Per-event encoding (codec plus optional framing), flattened under `encoding.*`.
    // Ignored in favor of `batch_encoding` when that is set (see below).
    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    /// Batch encoding configuration for columnar formats.
    ///
    /// When set, events are encoded together as a batch in a columnar format (e.g., Parquet)
    /// instead of the standard per-event framing-based encoding. The columnar format handles
    /// its own internal compression, so the top-level `compression` setting is bypassed.
    #[cfg(feature = "codecs-parquet")]
    #[configurable(derived)]
    #[serde(default)]
    pub batch_encoding: Option<BatchSerializerConfig>,

    /// Compression configuration.
    ///
    /// All compression algorithms use the default compression level unless otherwise specified.
    ///
    /// Some cloud storage API clients and browsers handle decompression transparently, so
    /// depending on how they are accessed, files may not always appear to be compressed.
    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,

    // Batching behavior (size/count/timeout limits); defaults come from
    // `BulkSizeBasedDefaultBatchSettings`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,

    // Tower middleware settings (concurrency, rate limits, retries, timeouts).
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    // Optional TLS settings for the S3 client connection.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    // AWS credential configuration; defaults to the standard credential chain.
    #[configurable(derived)]
    #[serde(default)]
    pub auth: AwsAuthentication,

    // End-to-end acknowledgements toggle; accepts either a bare bool or a struct.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    // Timezone used when rendering time-based key templates; falls back to the
    // global timezone when unset (see `build_processor`).
    #[configurable(derived)]
    #[serde(default)]
    pub timezone: Option<TimeZone>,

    /// Specifies which addressing style to use.
    ///
    /// This controls if the bucket name is in the hostname or part of the URL.
    #[serde(default = "crate::serde::default_true")]
    pub force_path_style: bool,

    /// Specifies retry strategy for failed requests.
    ///
    /// By default, the sink only retries attempts it deems possible to retry.
    /// These settings extend the default behavior.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub retry_strategy: RetryStrategy,
}
173
174pub(super) fn default_key_prefix() -> String {
175    "date=%F".to_string()
176}
177
178pub(super) fn default_filename_time_format() -> String {
179    "%s".to_string()
180}
181
182impl GenerateConfig for S3SinkConfig {
183    fn generate_config() -> toml::Value {
184        toml::Value::try_from(Self {
185            bucket: "".to_owned(),
186            key_prefix: default_key_prefix(),
187            filename_time_format: default_filename_time_format(),
188            filename_append_uuid: true,
189            filename_extension: None,
190            options: S3Options::default(),
191            region: RegionOrEndpoint::default(),
192            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
193            #[cfg(feature = "codecs-parquet")]
194            batch_encoding: None,
195            compression: Compression::gzip_default(),
196            batch: BatchConfig::default(),
197            request: TowerRequestConfig::default(),
198            tls: Some(TlsConfig::default()),
199            auth: AwsAuthentication::default(),
200            acknowledgements: Default::default(),
201            timezone: Default::default(),
202            force_path_style: Default::default(),
203            retry_strategy: Default::default(),
204        })
205        .unwrap()
206    }
207}
208
209#[async_trait::async_trait]
210#[typetag::serde(name = "aws_s3")]
211impl SinkConfig for S3SinkConfig {
212    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
213        let service = self.create_service(&cx.proxy).await?;
214        let healthcheck = self.build_healthcheck(service.client())?;
215        let sink = self.build_processor(service, cx)?;
216        Ok((sink, healthcheck))
217    }
218
219    fn input(&self) -> Input {
220        #[cfg(feature = "codecs-parquet")]
221        if let Some(batch_config) = &self.batch_encoding {
222            return Input::new(batch_config.input_type());
223        }
224        Input::new(self.encoding.config().1.input_type())
225    }
226
227    fn acknowledgements(&self) -> &AcknowledgementsConfig {
228        &self.acknowledgements
229    }
230}
231
232impl S3SinkConfig {
233    pub fn build_processor(
234        &self,
235        service: S3Service,
236        cx: SinkContext,
237    ) -> crate::Result<VectorSink> {
238        // Build our S3 client/service, which is what we'll ultimately feed
239        // requests into in order to ship files to S3.  We build this here in
240        // order to configure the client/service with retries, concurrency
241        // limits, rate limits, and whatever else the client should have.
242        let request_limits = self.request.into_settings();
243        let retry_strategy = self.retry_strategy.clone();
244        let service = ServiceBuilder::new()
245            .settings(request_limits, retry_strategy)
246            .service(service);
247
248        let offset = self
249            .timezone
250            .or(cx.globals.timezone)
251            .and_then(timezone_to_offset);
252
253        // Configure our partitioning/batching.
254        let batch_settings = self.batch.into_batcher_settings()?;
255
256        let key_prefix = Template::try_from(self.key_prefix.clone())?.with_tz_offset(offset);
257
258        let ssekms_key_id = self
259            .options
260            .ssekms_key_id
261            .as_ref()
262            .cloned()
263            .map(|ssekms_key_id| Template::try_from(ssekms_key_id.as_str()))
264            .transpose()?;
265
266        let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id, None);
267
268        let transformer = self.encoding.transformer();
269
270        // When batch_encoding is configured (e.g., Parquet), use batch mode
271        // with internal compression and appropriate file extension.
272        #[cfg(feature = "codecs-parquet")]
273        if let Some(batch_config) = &self.batch_encoding {
274            if !matches!(batch_config, BatchSerializerConfig::Parquet(_)) {
275                return Err(
276                    "batch_encoding only supports encoding with parquet format for amazon s3 sink"
277                        .into(),
278                );
279            }
280
281            let batch_serializer = batch_config.build_batch_serializer()?;
282            let batch_encoder = BatchEncoder::new(batch_serializer);
283
284            // Auto-detect Content-Type from batch format. Users can still
285            // override via `options.content_type`; we only set it when unset.
286            let mut api_options = self.options.clone();
287            if api_options.content_type.is_none() {
288                api_options.content_type = Some(batch_encoder.content_type().to_string());
289            }
290
291            let encoder = EncoderKind::Batch(batch_encoder);
292
293            // Auto-detect file extension from batch format
294            let filename_extension =
295                self.filename_extension
296                    .clone()
297                    .or_else(|| match batch_config {
298                        BatchSerializerConfig::Parquet(_) => Some("parquet".to_string()),
299                        #[allow(unreachable_patterns)]
300                        _ => None,
301                    });
302
303            if self.compression != Compression::None {
304                warn!("Top level compression setting ignored when batch_encoding set to parquet.")
305            }
306
307            let request_options = S3RequestOptions {
308                bucket: self.bucket.clone(),
309                api_options,
310                filename_extension,
311                filename_time_format: self.filename_time_format.clone(),
312                filename_append_uuid: self.filename_append_uuid,
313                encoder: (transformer, encoder),
314                // Batch formats handle their own compression internally
315                compression: Compression::None,
316                filename_tz_offset: offset,
317            };
318
319            let sink = S3Sink::new(service, request_options, partitioner, batch_settings);
320            return Ok(VectorSink::from_event_streamsink(sink));
321        }
322
323        let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
324        let encoder = EncoderKind::Framed(Box::new(Encoder::<Framer>::new(framer, serializer)));
325
326        let request_options = S3RequestOptions {
327            bucket: self.bucket.clone(),
328            api_options: self.options.clone(),
329            filename_extension: self.filename_extension.clone(),
330            filename_time_format: self.filename_time_format.clone(),
331            filename_append_uuid: self.filename_append_uuid,
332            encoder: (transformer, encoder),
333            compression: self.compression,
334            filename_tz_offset: offset,
335        };
336
337        let sink = S3Sink::new(service, request_options, partitioner, batch_settings);
338
339        Ok(VectorSink::from_event_streamsink(sink))
340    }
341
342    pub fn build_healthcheck(&self, client: S3Client) -> crate::Result<Healthcheck> {
343        s3_common::config::build_healthcheck(self.bucket.clone(), client)
344    }
345
346    pub async fn create_service(&self, proxy: &ProxyConfig) -> crate::Result<S3Service> {
347        s3_common::config::create_service(
348            &self.region,
349            &self.auth,
350            proxy,
351            self.tls.as_ref(),
352            self.force_path_style,
353        )
354        .await
355    }
356}
357
// Unit tests for the `aws_s3` sink configuration, covering TOML parsing of the
// `batch_encoding` (Parquet) options and the content-type / filename-extension
// auto-detection behavior mirrored from `build_processor`.
#[cfg(test)]
mod tests {
    use super::S3SinkConfig;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<S3SinkConfig>();
    }

    /// Correct TOML shape: `batch_encoding.codec = "parquet"` with `schema_mode = "auto_infer"`.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_batch_encoding_correct_toml_shape() {
        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"

            [encoding]
            codec = "text"

            [batch_encoding]
            schema_mode = "auto_infer"
            codec = "parquet"

            [batch_encoding.compression]
            algorithm = "snappy"

            "#,
        )
        .expect("correct batch_encoding shape should parse");

        let batch_enc = config
            .batch_encoding
            .expect("batch_encoding should be Some");
        match batch_enc {
            vector_lib::codecs::encoding::BatchSerializerConfig::Parquet(ref p) => {
                use vector_lib::codecs::encoding::format::{ParquetCompression, ParquetSchemaMode};
                assert_eq!(p.schema_mode, ParquetSchemaMode::AutoInfer);
                assert_eq!(p.compression, ParquetCompression::Snappy);
            }
            // Keeps the match exhaustive if non-Parquet variants are added later.
            #[allow(unreachable_patterns)]
            _ => panic!("expected Parquet variant"),
        }
    }

    /// Content-Type must be auto-detected as `application/vnd.apache.parquet`
    /// when `batch_encoding` is set and `content_type` is not explicitly provided.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_content_type_auto_detected() {
        use vector_lib::codecs::encoding::format::{
            ParquetCompression, ParquetSchemaMode, ParquetSerializerConfig,
        };

        use crate::sinks::s3_common::config::S3Options;
        use crate::sinks::util::{BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression};
        use vector_lib::codecs::TextSerializerConfig;
        use vector_lib::codecs::encoding::{BatchSerializerConfig, FramingConfig};

        let parquet_config = ParquetSerializerConfig {
            schema_mode: ParquetSchemaMode::AutoInfer,
            compression: ParquetCompression::Snappy,
            ..Default::default()
        };

        let config = S3SinkConfig {
            bucket: "test".to_string(),
            key_prefix: super::default_key_prefix(),
            filename_time_format: super::default_filename_time_format(),
            filename_append_uuid: true,
            filename_extension: None,
            options: S3Options::default(),
            region: crate::aws::RegionOrEndpoint::with_both("us-east-1", "http://localhost:4566"),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            batch_encoding: Some(BatchSerializerConfig::Parquet(parquet_config)),
            compression: Compression::None,
            batch: BatchConfig::<BulkSizeBasedDefaultBatchSettings>::default(),
            request: Default::default(),
            tls: Default::default(),
            auth: Default::default(),
            acknowledgements: Default::default(),
            timezone: Default::default(),
            force_path_style: true,
            retry_strategy: Default::default(),
        };

        let batch_config = config.batch_encoding.as_ref().unwrap();
        let batch_serializer = batch_config.build_batch_serializer().unwrap();
        let batch_encoder = vector_lib::codecs::BatchEncoder::new(batch_serializer);

        // Re-run the same auto-detection logic that `build_processor` applies.
        let mut api_options = config.options.clone();
        if api_options.content_type.is_none() {
            api_options.content_type = Some(batch_encoder.content_type().to_string());
        }

        assert_eq!(
            api_options.content_type.as_deref(),
            Some("application/vnd.apache.parquet"),
            "Content-Type must be auto-detected for Parquet"
        );
    }

    /// When user explicitly sets `content_type`, the auto-detection must not override it.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_content_type_user_override_preserved() {
        // `content_type` lands in `options` via `#[serde(flatten)]` on the config.
        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"
            content_type = "application/octet-stream"

            [encoding]
            codec = "text"

            [batch_encoding]
            codec = "parquet"
            schema_mode = "auto_infer"

            [batch_encoding.compression]
            algorithm = "gzip"
            level = 9
            "#,
        )
        .unwrap();

        let batch_config = config.batch_encoding.as_ref().unwrap();
        let batch_serializer = batch_config.build_batch_serializer().unwrap();
        let batch_encoder = vector_lib::codecs::BatchEncoder::new(batch_serializer);

        // Re-run the same auto-detection logic that `build_processor` applies;
        // the user-supplied value must survive untouched.
        let mut api_options = config.options.clone();
        if api_options.content_type.is_none() {
            api_options.content_type = Some(batch_encoder.content_type().to_string());
        }

        assert_eq!(
            api_options.content_type.as_deref(),
            Some("application/octet-stream"),
            "User-specified Content-Type must not be overridden"
        );
    }

    /// Parquet filename extension defaults to `.parquet` when not explicitly set.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_filename_extension_defaults_to_parquet() {
        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"

            [encoding]
            codec = "text"

            [batch_encoding]
            codec = "parquet"
            schema_mode = "auto_infer"
            "#,
        )
        .unwrap();

        assert!(
            config.filename_extension.is_none(),
            "fixture must not set filename_extension"
        );

        // Re-run the extension auto-detection logic from `build_processor`.
        let batch_config = config.batch_encoding.as_ref().unwrap();
        let extension = config
            .filename_extension
            .clone()
            .or_else(|| match batch_config {
                vector_lib::codecs::encoding::BatchSerializerConfig::Parquet(_) => {
                    Some("parquet".to_string())
                }
                #[allow(unreachable_patterns)]
                _ => None,
            });

        assert_eq!(extension.as_deref(), Some("parquet"));
    }

    /// Explicit filename_extension overrides the `.parquet` default.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_filename_extension_user_override() {
        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"
            filename_extension = "pq"

            [encoding]
            codec = "text"

            [batch_encoding]
            codec = "parquet"
            schema_mode = "auto_infer"
            "#,
        )
        .unwrap();

        assert_eq!(config.filename_extension.as_deref(), Some("pq"));
    }

    /// `schema_mode` defaults to `relaxed` when not specified.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_schema_mode_defaults_to_relaxed() {
        use vector_lib::codecs::encoding::format::ParquetSchemaMode;

        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"

            [encoding]
            codec = "text"

            [batch_encoding]
            codec = "parquet"
            "#,
        )
        .unwrap();

        match config.batch_encoding.unwrap() {
            vector_lib::codecs::encoding::BatchSerializerConfig::Parquet(p) => {
                assert_eq!(p.schema_mode, ParquetSchemaMode::Relaxed);
            }
            #[allow(unreachable_patterns)]
            _ => panic!("expected Parquet variant"),
        }
    }

    /// Explicit `schema_mode = "strict"` is correctly parsed.
    #[cfg(feature = "codecs-parquet")]
    #[test]
    fn parquet_schema_mode_strict_parsed() {
        use vector_lib::codecs::encoding::format::ParquetSchemaMode;

        let config: S3SinkConfig = toml::from_str(
            r#"
            bucket = "test-bucket"
            compression = "none"

            [encoding]
            codec = "text"

            [batch_encoding]
            codec = "parquet"
            schema_mode = "strict"
            schema_file = "tmp/something.schema"
            "#,
        )
        .unwrap();

        match config.batch_encoding.unwrap() {
            vector_lib::codecs::encoding::BatchSerializerConfig::Parquet(p) => {
                assert_eq!(p.schema_mode, ParquetSchemaMode::Strict);
            }
            #[allow(unreachable_patterns)]
            _ => panic!("expected Parquet variant"),
        }
    }
}