vector/sinks/aws_s3/
config.rs

1use aws_sdk_s3::Client as S3Client;
2use tower::ServiceBuilder;
3use vector_lib::codecs::{
4    encoding::{Framer, FramingConfig},
5    TextSerializerConfig,
6};
7use vector_lib::{configurable::configurable_component, sink::VectorSink, TimeZone};
8
9use super::sink::S3RequestOptions;
10use crate::{
11    aws::{AwsAuthentication, RegionOrEndpoint},
12    codecs::{Encoder, EncodingConfigWithFraming, SinkType},
13    config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext},
14    sinks::{
15        s3_common::{
16            self,
17            config::{RetryStrategy, S3Options},
18            partitioner::S3KeyPartitioner,
19            service::S3Service,
20            sink::S3Sink,
21        },
22        util::{
23            timezone_to_offset, BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression,
24            ServiceBuilderExt, TowerRequestConfig,
25        },
26        Healthcheck,
27    },
28    template::Template,
29    tls::TlsConfig,
30};
31
/// Configuration for the `aws_s3` sink.
#[configurable_component(sink(
    "aws_s3",
    "Store observability events in the AWS S3 object storage system."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct S3SinkConfig {
    /// The S3 bucket name.
    ///
    /// This must not include a leading `s3://` or a trailing `/`.
    #[configurable(metadata(docs::examples = "my-bucket"))]
    pub bucket: String,

    // NOTE(review): the default supplied by `default_key_prefix` is `date=%F`, which has no
    // trailing `/`. By the rule documented below it therefore does not act as a directory
    // path on its own -- confirm this is intended.
    /// A prefix to apply to all object keys.
    ///
    /// Prefixes are useful for partitioning objects, such as by creating an object key that
    /// stores objects under a particular directory. If using a prefix for this purpose, it must end
    /// in `/` to act as a directory path. A trailing `/` is **not** automatically added.
    #[serde(default = "default_key_prefix")]
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(docs::examples = "date=%F/hour=%H"))]
    #[configurable(metadata(docs::examples = "year=%Y/month=%m/day=%d"))]
    #[configurable(metadata(docs::examples = "application_id={{ application_id }}/date=%F"))]
    pub key_prefix: String,

    /// The timestamp format for the time component of the object key.
    ///
    /// By default, object keys are appended with a timestamp that reflects when the objects are
    /// sent to S3, such that the resulting object key is functionally equivalent to joining the key
    /// prefix with the formatted timestamp, such as `date=2022-07-18/1658176486`.
    ///
    /// This would represent a `key_prefix` set to `date=%F/` and the timestamp of Mon Jul 18 2022
    /// 20:34:46 GMT+0000, with the `filename_time_format` being set to `%s`, which renders
    /// timestamps in seconds since the Unix epoch.
    ///
    /// Supports the common [`strftime`][chrono_strftime_specifiers] specifiers found in most
    /// languages.
    ///
    /// When set to an empty string, no timestamp is appended to the key prefix.
    ///
    /// [chrono_strftime_specifiers]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers
    #[serde(default = "default_filename_time_format")]
    pub filename_time_format: String,

    /// Whether or not to append a UUID v4 token to the end of the object key.
    ///
    /// The UUID is appended to the timestamp portion of the object key, such that if the object key
    /// generated is `date=2022-07-18/1658176486`, setting this field to `true` results
    /// in an object key that looks like `date=2022-07-18/1658176486-30f6652c-71da-4f9f-800d-a1189c47c547`.
    ///
    /// This ensures there are no name collisions, and can be useful in high-volume workloads where
    /// object keys must be unique.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::human_name = "Append UUID to Filename"))]
    pub filename_append_uuid: bool,

    /// The filename extension to use in the object key.
    ///
    /// This overrides setting the extension based on the configured `compression`.
    #[configurable(metadata(docs::examples = "json"))]
    pub filename_extension: Option<String>,

    // Flattened: the fields of `S3Options` are read from (and written to) the same level as
    // this struct's own keys rather than a nested `options` table.
    #[serde(flatten)]
    pub options: S3Options,

    // Flattened: the fields of `RegionOrEndpoint` sit alongside this struct's own keys.
    #[serde(flatten)]
    pub region: RegionOrEndpoint,

    // Flattened: the framing/encoding settings sit alongside this struct's own keys.
    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    /// Compression configuration.
    ///
    /// All compression algorithms use the default compression level unless otherwise specified.
    ///
    /// Some cloud storage API clients and browsers handle decompression transparently, so
    /// depending on how they are accessed, files may not always appear to be compressed.
    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    #[configurable(derived)]
    #[serde(default)]
    pub auth: AwsAuthentication,

    // Accepts either a bare boolean or a full struct when deserializing (`bool_or_struct`),
    // and is left out of serialized output while it still equals its default.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    // When unset, `build_processor` falls back to the globally configured timezone.
    #[configurable(derived)]
    #[serde(default)]
    pub timezone: Option<TimeZone>,

    /// Specifies which addressing style to use.
    ///
    /// This controls if the bucket name is in the hostname or part of the URL.
    #[serde(default = "crate::serde::default_true")]
    pub force_path_style: bool,

    /// Specifies errors to retry.
    ///
    /// By default, the sink only retries attempts it deems possible to retry.
    /// These settings extend the default behavior.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub retry_strategy: RetryStrategy,
}
155
156pub(super) fn default_key_prefix() -> String {
157    "date=%F".to_string()
158}
159
160pub(super) fn default_filename_time_format() -> String {
161    "%s".to_string()
162}
163
164impl GenerateConfig for S3SinkConfig {
165    fn generate_config() -> toml::Value {
166        toml::Value::try_from(Self {
167            bucket: "".to_owned(),
168            key_prefix: default_key_prefix(),
169            filename_time_format: default_filename_time_format(),
170            filename_append_uuid: true,
171            filename_extension: None,
172            options: S3Options::default(),
173            region: RegionOrEndpoint::default(),
174            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
175            compression: Compression::gzip_default(),
176            batch: BatchConfig::default(),
177            request: TowerRequestConfig::default(),
178            tls: Some(TlsConfig::default()),
179            auth: AwsAuthentication::default(),
180            acknowledgements: Default::default(),
181            timezone: Default::default(),
182            force_path_style: Default::default(),
183            retry_strategy: Default::default(),
184        })
185        .unwrap()
186    }
187}
188
189#[async_trait::async_trait]
190#[typetag::serde(name = "aws_s3")]
191impl SinkConfig for S3SinkConfig {
192    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
193        let service = self.create_service(&cx.proxy).await?;
194        let healthcheck = self.build_healthcheck(service.client())?;
195        let sink = self.build_processor(service, cx)?;
196        Ok((sink, healthcheck))
197    }
198
199    fn input(&self) -> Input {
200        Input::new(self.encoding.config().1.input_type())
201    }
202
203    fn acknowledgements(&self) -> &AcknowledgementsConfig {
204        &self.acknowledgements
205    }
206}
207
208impl S3SinkConfig {
209    pub fn build_processor(
210        &self,
211        service: S3Service,
212        cx: SinkContext,
213    ) -> crate::Result<VectorSink> {
214        // Build our S3 client/service, which is what we'll ultimately feed
215        // requests into in order to ship files to S3.  We build this here in
216        // order to configure the client/service with retries, concurrency
217        // limits, rate limits, and whatever else the client should have.
218        let request_limits = self.request.into_settings();
219        let retry_strategy = self.retry_strategy.clone();
220        let service = ServiceBuilder::new()
221            .settings(request_limits, retry_strategy)
222            .service(service);
223
224        let offset = self
225            .timezone
226            .or(cx.globals.timezone)
227            .and_then(timezone_to_offset);
228
229        // Configure our partitioning/batching.
230        let batch_settings = self.batch.into_batcher_settings()?;
231
232        let key_prefix = Template::try_from(self.key_prefix.clone())?.with_tz_offset(offset);
233
234        let ssekms_key_id = self
235            .options
236            .ssekms_key_id
237            .as_ref()
238            .cloned()
239            .map(|ssekms_key_id| Template::try_from(ssekms_key_id.as_str()))
240            .transpose()?;
241
242        let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id, None);
243
244        let transformer = self.encoding.transformer();
245        let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
246        let encoder = Encoder::<Framer>::new(framer, serializer);
247
248        let request_options = S3RequestOptions {
249            bucket: self.bucket.clone(),
250            api_options: self.options.clone(),
251            filename_extension: self.filename_extension.clone(),
252            filename_time_format: self.filename_time_format.clone(),
253            filename_append_uuid: self.filename_append_uuid,
254            encoder: (transformer, encoder),
255            compression: self.compression,
256            filename_tz_offset: offset,
257        };
258
259        let sink = S3Sink::new(service, request_options, partitioner, batch_settings);
260
261        Ok(VectorSink::from_event_streamsink(sink))
262    }
263
264    pub fn build_healthcheck(&self, client: S3Client) -> crate::Result<Healthcheck> {
265        s3_common::config::build_healthcheck(self.bucket.clone(), client)
266    }
267
268    pub async fn create_service(&self, proxy: &ProxyConfig) -> crate::Result<S3Service> {
269        s3_common::config::create_service(
270            &self.region,
271            &self.auth,
272            proxy,
273            self.tls.as_ref(),
274            self.force_path_style,
275        )
276        .await
277    }
278}
279
#[cfg(test)]
mod tests {
    use super::S3SinkConfig;
    use crate::test_util::test_generate_config;

    /// Runs the shared `test_generate_config` helper against `S3SinkConfig`.
    #[test]
    fn generate_config() {
        test_generate_config::<S3SinkConfig>();
    }
}
288}