vector/sinks/aws_s3/
config.rs

1use aws_sdk_s3::Client as S3Client;
2use tower::ServiceBuilder;
3use vector_lib::{
4    TimeZone,
5    codecs::{
6        TextSerializerConfig,
7        encoding::{Framer, FramingConfig},
8    },
9    configurable::configurable_component,
10    sink::VectorSink,
11};
12
13use super::sink::S3RequestOptions;
14use crate::{
15    aws::{AwsAuthentication, RegionOrEndpoint},
16    codecs::{Encoder, EncodingConfigWithFraming, SinkType},
17    config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext},
18    sinks::{
19        Healthcheck,
20        s3_common::{
21            self,
22            config::{RetryStrategy, S3Options},
23            partitioner::S3KeyPartitioner,
24            service::S3Service,
25            sink::S3Sink,
26        },
27        util::{
28            BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
29            TowerRequestConfig, timezone_to_offset,
30        },
31    },
32    template::Template,
33    tls::TlsConfig,
34};
35
36/// Configuration for the `aws_s3` sink.
// The fields below are read by `build_processor` (key layout, encoding, batching, request
// settings), by `create_service` (region, auth, TLS, addressing style) and by
// `build_healthcheck` (bucket).
37#[configurable_component(sink(
38    "aws_s3",
39    "Store observability events in the AWS S3 object storage system."
40))]
41#[derive(Clone, Debug)]
// NOTE(review): serde documents `deny_unknown_fields` as unsupported in combination with
// `flatten`, and this struct flattens `options`, `region` and `encoding` — confirm that
// unknown keys are still rejected the way this attribute intends.
42#[serde(deny_unknown_fields)]
43pub struct S3SinkConfig {
44    /// The S3 bucket name.
45    ///
46    /// This must not include a leading `s3://` or a trailing `/`.
47    #[configurable(metadata(docs::examples = "my-bucket"))]
48    pub bucket: String,
49
    // When omitted, the value comes from `default_key_prefix`. The string is parsed as a
    // `Template` in `build_processor`.
50    /// A prefix to apply to all object keys.
51    ///
52    /// Prefixes are useful for partitioning objects, such as by creating an object key that
53    /// stores objects under a particular directory. If using a prefix for this purpose, it must end
54    /// in `/` to act as a directory path. A trailing `/` is **not** automatically added.
55    #[serde(default = "default_key_prefix")]
56    #[configurable(metadata(docs::templateable))]
57    #[configurable(metadata(docs::examples = "date=%F/hour=%H"))]
58    #[configurable(metadata(docs::examples = "year=%Y/month=%m/day=%d"))]
59    #[configurable(metadata(docs::examples = "application_id={{ application_id }}/date=%F"))]
60    pub key_prefix: String,
61
    // When omitted, the value comes from `default_filename_time_format`.
62    /// The timestamp format for the time component of the object key.
63    ///
64    /// By default, object keys are appended with a timestamp that reflects when the objects are
65    /// sent to S3, such that the resulting object key is functionally equivalent to joining the key
66    /// prefix with the formatted timestamp, such as `date=2022-07-18/1658176486`.
67    ///
68    /// This would represent a `key_prefix` set to `date=%F/` and the timestamp of Mon Jul 18 2022
69    /// 20:34:44 GMT+0000, with the `filename_time_format` being set to `%s`, which renders
70    /// timestamps in seconds since the Unix epoch.
71    ///
72    /// Supports the common [`strftime`][chrono_strftime_specifiers] specifiers found in most
73    /// languages.
74    ///
75    /// When set to an empty string, no timestamp is appended to the key prefix.
76    ///
77    /// [chrono_strftime_specifiers]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers
78    #[serde(default = "default_filename_time_format")]
79    pub filename_time_format: String,
80
    // When omitted, the value comes from `crate::serde::default_true`.
81    /// Whether or not to append a UUID v4 token to the end of the object key.
82    ///
83    /// The UUID is appended to the timestamp portion of the object key, such that if the object key
84    /// generated is `date=2022-07-18/1658176486`, setting this field to `true` results
85    /// in an object key that looks like `date=2022-07-18/1658176486-30f6652c-71da-4f9f-800d-a1189c47c547`.
86    ///
87    /// This ensures there are no name collisions, and can be useful in high-volume workloads where
88    /// object keys must be unique.
89    #[serde(default = "crate::serde::default_true")]
90    #[configurable(metadata(docs::human_name = "Append UUID to Filename"))]
91    pub filename_append_uuid: bool,
92
93    /// The filename extension to use in the object key.
94    ///
95    /// This overrides setting the extension based on the configured `compression`.
96    #[configurable(metadata(docs::examples = "json"))]
97    pub filename_extension: Option<String>,
98
    // Flattened: the keys of `S3Options` are read and written at the same level as the fields
    // of this struct rather than under an `options` table.
99    #[serde(flatten)]
100    pub options: S3Options,
101
    // Flattened in the same way as `options`.
102    #[serde(flatten)]
103    pub region: RegionOrEndpoint,
104
    // Flattened in the same way as `options`. Supplies the transformer, framer and serializer
    // used in `build_processor`, and the accepted input type in `SinkConfig::input`.
105    #[serde(flatten)]
106    pub encoding: EncodingConfigWithFraming,
107
    // When omitted, the value comes from `Compression::gzip_default`.
108    /// Compression configuration.
109    ///
110    /// All compression algorithms use the default compression level unless otherwise specified.
111    ///
112    /// Some cloud storage API clients and browsers handle decompression transparently, so
113    /// depending on how they are accessed, files may not always appear to be compressed.
114    #[configurable(derived)]
115    #[serde(default = "Compression::gzip_default")]
116    pub compression: Compression,
117
118    #[configurable(derived)]
119    #[serde(default)]
120    pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,
121
122    #[configurable(derived)]
123    #[serde(default)]
124    pub request: TowerRequestConfig,
125
    // Handed to `s3_common::config::create_service` as `self.tls.as_ref()`.
126    #[configurable(derived)]
127    pub tls: Option<TlsConfig>,
128
129    #[configurable(derived)]
130    #[serde(default)]
131    pub auth: AwsAuthentication,
132
    // NOTE(review): `bool_or_struct` presumably accepts either a bare boolean or a full table
    // for this key — confirm against `crate::serde`.
133    #[configurable(derived)]
134    #[serde(
135        default,
136        deserialize_with = "crate::serde::bool_or_struct",
137        skip_serializing_if = "crate::serde::is_default"
138    )]
139    pub acknowledgements: AcknowledgementsConfig,
140
    // When unset, `build_processor` falls back to the global timezone (`cx.globals.timezone`).
141    #[configurable(derived)]
142    #[serde(default)]
143    pub timezone: Option<TimeZone>,
144
    // When omitted, the value comes from `crate::serde::default_true`. Forwarded unchanged to
    // `s3_common::config::create_service`.
145    /// Specifies which addressing style to use.
146    ///
147    /// This controls if the bucket name is in the hostname or part of the URL.
148    #[serde(default = "crate::serde::default_true")]
149    pub force_path_style: bool,
150
151    /// Specifies retry strategy for failed requests.
152    ///
153    /// By default, the sink only retries attempts it deems possible to retry.
154    /// These settings extend the default behavior.
155    #[configurable(derived)]
156    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
157    pub retry_strategy: RetryStrategy,
158}
159
160pub(super) fn default_key_prefix() -> String {
161    "date=%F".to_string()
162}
163
164pub(super) fn default_filename_time_format() -> String {
165    "%s".to_string()
166}
167
168impl GenerateConfig for S3SinkConfig {
169    fn generate_config() -> toml::Value {
170        toml::Value::try_from(Self {
171            bucket: "".to_owned(),
172            key_prefix: default_key_prefix(),
173            filename_time_format: default_filename_time_format(),
174            filename_append_uuid: true,
175            filename_extension: None,
176            options: S3Options::default(),
177            region: RegionOrEndpoint::default(),
178            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
179            compression: Compression::gzip_default(),
180            batch: BatchConfig::default(),
181            request: TowerRequestConfig::default(),
182            tls: Some(TlsConfig::default()),
183            auth: AwsAuthentication::default(),
184            acknowledgements: Default::default(),
185            timezone: Default::default(),
186            force_path_style: Default::default(),
187            retry_strategy: Default::default(),
188        })
189        .unwrap()
190    }
191}
192
193#[async_trait::async_trait]
194#[typetag::serde(name = "aws_s3")]
195impl SinkConfig for S3SinkConfig {
196    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
197        let service = self.create_service(&cx.proxy).await?;
198        let healthcheck = self.build_healthcheck(service.client())?;
199        let sink = self.build_processor(service, cx)?;
200        Ok((sink, healthcheck))
201    }
202
203    fn input(&self) -> Input {
204        Input::new(self.encoding.config().1.input_type())
205    }
206
207    fn acknowledgements(&self) -> &AcknowledgementsConfig {
208        &self.acknowledgements
209    }
210}
211
212impl S3SinkConfig {
213    pub fn build_processor(
214        &self,
215        service: S3Service,
216        cx: SinkContext,
217    ) -> crate::Result<VectorSink> {
218        // Build our S3 client/service, which is what we'll ultimately feed
219        // requests into in order to ship files to S3.  We build this here in
220        // order to configure the client/service with retries, concurrency
221        // limits, rate limits, and whatever else the client should have.
222        let request_limits = self.request.into_settings();
223        let retry_strategy = self.retry_strategy.clone();
224        let service = ServiceBuilder::new()
225            .settings(request_limits, retry_strategy)
226            .service(service);
227
228        let offset = self
229            .timezone
230            .or(cx.globals.timezone)
231            .and_then(timezone_to_offset);
232
233        // Configure our partitioning/batching.
234        let batch_settings = self.batch.into_batcher_settings()?;
235
236        let key_prefix = Template::try_from(self.key_prefix.clone())?.with_tz_offset(offset);
237
238        let ssekms_key_id = self
239            .options
240            .ssekms_key_id
241            .as_ref()
242            .cloned()
243            .map(|ssekms_key_id| Template::try_from(ssekms_key_id.as_str()))
244            .transpose()?;
245
246        let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id, None);
247
248        let transformer = self.encoding.transformer();
249        let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
250        let encoder = Encoder::<Framer>::new(framer, serializer);
251
252        let request_options = S3RequestOptions {
253            bucket: self.bucket.clone(),
254            api_options: self.options.clone(),
255            filename_extension: self.filename_extension.clone(),
256            filename_time_format: self.filename_time_format.clone(),
257            filename_append_uuid: self.filename_append_uuid,
258            encoder: (transformer, encoder),
259            compression: self.compression,
260            filename_tz_offset: offset,
261        };
262
263        let sink = S3Sink::new(service, request_options, partitioner, batch_settings);
264
265        Ok(VectorSink::from_event_streamsink(sink))
266    }
267
268    pub fn build_healthcheck(&self, client: S3Client) -> crate::Result<Healthcheck> {
269        s3_common::config::build_healthcheck(self.bucket.clone(), client)
270    }
271
272    pub async fn create_service(&self, proxy: &ProxyConfig) -> crate::Result<S3Service> {
273        s3_common::config::create_service(
274            &self.region,
275            &self.auth,
276            proxy,
277            self.tls.as_ref(),
278            self.force_path_style,
279        )
280        .await
281    }
282}
283
#[cfg(test)]
mod tests {
    /// Runs the shared `test_generate_config` helper against the `aws_s3` sink configuration.
    #[test]
    fn generate_config() {
        use super::S3SinkConfig;

        crate::test_util::test_generate_config::<S3SinkConfig>();
    }
}