1use aws_sdk_s3::Client as S3Client;
2use tower::ServiceBuilder;
3use vector_lib::{
4 TimeZone,
5 codecs::{
6 TextSerializerConfig,
7 encoding::{Framer, FramingConfig},
8 },
9 configurable::configurable_component,
10 sink::VectorSink,
11};
12
13use super::sink::S3RequestOptions;
14use crate::{
15 aws::{AwsAuthentication, RegionOrEndpoint},
16 codecs::{Encoder, EncodingConfigWithFraming, SinkType},
17 config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext},
18 sinks::{
19 Healthcheck,
20 s3_common::{
21 self,
22 config::{RetryStrategy, S3Options},
23 partitioner::S3KeyPartitioner,
24 service::S3Service,
25 sink::S3Sink,
26 },
27 util::{
28 BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
29 TowerRequestConfig, timezone_to_offset,
30 },
31 },
32 template::Template,
33 tls::TlsConfig,
34};
35
/// Configuration for the `aws_s3` sink.
#[configurable_component(sink(
    "aws_s3",
    "Store observability events in the AWS S3 object storage system."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct S3SinkConfig {
    /// The name of the S3 bucket this sink targets.
    ///
    /// The same value is used for the healthcheck and for the request options.
    #[configurable(metadata(docs::examples = "my-bucket"))]
    pub bucket: String,

    /// A prefix applied to object keys.
    ///
    /// The value is parsed as a template when the sink is built, so it may contain
    /// time specifiers and event field references such as those shown in the examples.
    ///
    /// Defaults to `date=%F`.
    #[serde(default = "default_key_prefix")]
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(docs::examples = "date=%F/hour=%H"))]
    #[configurable(metadata(docs::examples = "year=%Y/month=%m/day=%d"))]
    #[configurable(metadata(docs::examples = "application_id={{ application_id }}/date=%F"))]
    pub key_prefix: String,

    /// The format string for the time component of object filenames.
    ///
    /// Defaults to `%s`.
    #[serde(default = "default_filename_time_format")]
    pub filename_time_format: String,

    /// Whether a UUID is appended to object filenames.
    ///
    /// Enabled by default.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::human_name = "Append UUID to Filename"))]
    pub filename_append_uuid: bool,

    /// An optional filename extension for created objects.
    ///
    /// The value is forwarded unchanged to the request options.
    #[configurable(metadata(docs::examples = "json"))]
    pub filename_extension: Option<String>,

    // Flattened: the fields of `S3Options` appear at the top level of this config.
    // Supplies `ssekms_key_id` and is forwarded to requests as `api_options`.
    #[serde(flatten)]
    pub options: S3Options,

    // Flattened: region/endpoint selection, consumed when the S3 service is created.
    #[serde(flatten)]
    pub region: RegionOrEndpoint,

    // Flattened: framing and serialization settings; also determines the accepted
    // input type of the sink.
    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    /// Compression configuration handed to the request options.
    ///
    /// Defaults to `Compression::gzip_default()`.
    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,

    // Converted into batcher settings when the sink is built.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,

    // Converted into the request settings applied to the Tower service.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    // Optional TLS settings, passed by reference to service creation.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    // AWS authentication settings, passed to service creation.
    #[configurable(derived)]
    #[serde(default)]
    pub auth: AwsAuthentication,

    // Accepts either a bare boolean or a full struct (`bool_or_struct`) and is omitted
    // from serialized output when it equals the default.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    // Sink-level timezone; when unset, the global timezone from the sink context is
    // used instead. The resulting offset is applied to both the key prefix template
    // and the filename.
    #[configurable(derived)]
    #[serde(default)]
    pub timezone: Option<TimeZone>,

    /// Whether to force path-style addressing.
    ///
    /// The flag is forwarded to S3 service creation. Enabled by default.
    // NOTE(review): presumably selects path-style over virtual-hosted-style bucket
    // URLs; confirm in `s3_common::config::create_service`.
    #[serde(default = "crate::serde::default_true")]
    pub force_path_style: bool,

    /// The retry strategy applied, together with the request settings, to the S3
    /// service.
    ///
    /// Omitted from serialized output when it equals the default.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub retry_strategy: RetryStrategy,
}
159
160pub(super) fn default_key_prefix() -> String {
161 "date=%F".to_string()
162}
163
164pub(super) fn default_filename_time_format() -> String {
165 "%s".to_string()
166}
167
168impl GenerateConfig for S3SinkConfig {
169 fn generate_config() -> toml::Value {
170 toml::Value::try_from(Self {
171 bucket: "".to_owned(),
172 key_prefix: default_key_prefix(),
173 filename_time_format: default_filename_time_format(),
174 filename_append_uuid: true,
175 filename_extension: None,
176 options: S3Options::default(),
177 region: RegionOrEndpoint::default(),
178 encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
179 compression: Compression::gzip_default(),
180 batch: BatchConfig::default(),
181 request: TowerRequestConfig::default(),
182 tls: Some(TlsConfig::default()),
183 auth: AwsAuthentication::default(),
184 acknowledgements: Default::default(),
185 timezone: Default::default(),
186 force_path_style: Default::default(),
187 retry_strategy: Default::default(),
188 })
189 .unwrap()
190 }
191}
192
193#[async_trait::async_trait]
194#[typetag::serde(name = "aws_s3")]
195impl SinkConfig for S3SinkConfig {
196 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
197 let service = self.create_service(&cx.proxy).await?;
198 let healthcheck = self.build_healthcheck(service.client())?;
199 let sink = self.build_processor(service, cx)?;
200 Ok((sink, healthcheck))
201 }
202
203 fn input(&self) -> Input {
204 Input::new(self.encoding.config().1.input_type())
205 }
206
207 fn acknowledgements(&self) -> &AcknowledgementsConfig {
208 &self.acknowledgements
209 }
210}
211
212impl S3SinkConfig {
213 pub fn build_processor(
214 &self,
215 service: S3Service,
216 cx: SinkContext,
217 ) -> crate::Result<VectorSink> {
218 let request_limits = self.request.into_settings();
223 let retry_strategy = self.retry_strategy.clone();
224 let service = ServiceBuilder::new()
225 .settings(request_limits, retry_strategy)
226 .service(service);
227
228 let offset = self
229 .timezone
230 .or(cx.globals.timezone)
231 .and_then(timezone_to_offset);
232
233 let batch_settings = self.batch.into_batcher_settings()?;
235
236 let key_prefix = Template::try_from(self.key_prefix.clone())?.with_tz_offset(offset);
237
238 let ssekms_key_id = self
239 .options
240 .ssekms_key_id
241 .as_ref()
242 .cloned()
243 .map(|ssekms_key_id| Template::try_from(ssekms_key_id.as_str()))
244 .transpose()?;
245
246 let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id, None);
247
248 let transformer = self.encoding.transformer();
249 let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
250 let encoder = Encoder::<Framer>::new(framer, serializer);
251
252 let request_options = S3RequestOptions {
253 bucket: self.bucket.clone(),
254 api_options: self.options.clone(),
255 filename_extension: self.filename_extension.clone(),
256 filename_time_format: self.filename_time_format.clone(),
257 filename_append_uuid: self.filename_append_uuid,
258 encoder: (transformer, encoder),
259 compression: self.compression,
260 filename_tz_offset: offset,
261 };
262
263 let sink = S3Sink::new(service, request_options, partitioner, batch_settings);
264
265 Ok(VectorSink::from_event_streamsink(sink))
266 }
267
268 pub fn build_healthcheck(&self, client: S3Client) -> crate::Result<Healthcheck> {
269 s3_common::config::build_healthcheck(self.bucket.clone(), client)
270 }
271
272 pub async fn create_service(&self, proxy: &ProxyConfig) -> crate::Result<S3Service> {
273 s3_common::config::create_service(
274 &self.region,
275 &self.auth,
276 proxy,
277 self.tls.as_ref(),
278 self.force_path_style,
279 )
280 .await
281 }
282}
283
#[cfg(test)]
mod tests {
    use crate::test_util::test_generate_config;

    /// Runs the shared generate-config test helper against `S3SinkConfig`.
    #[test]
    fn generate_config() {
        test_generate_config::<super::S3SinkConfig>();
    }
}