1use std::collections::{BTreeMap, HashMap};
2
3use aws_sdk_s3::{
4 operation::put_object::PutObjectError,
5 types::{ObjectCannedAcl, ServerSideEncryption, StorageClass},
6 Client as S3Client,
7};
8use aws_smithy_runtime_api::{
9 client::{orchestrator::HttpResponse, result::SdkError},
10 http::StatusCode,
11};
12use futures::FutureExt;
13use snafu::Snafu;
14use vector_lib::configurable::configurable_component;
15
16use super::service::{S3Request, S3Response, S3Service};
17use crate::{
18 aws::{create_client, is_retriable_error, AwsAuthentication, RegionOrEndpoint},
19 common::s3::S3ClientBuilder,
20 config::ProxyConfig,
21 http::status,
22 sinks::{util::retries::RetryLogic, Healthcheck},
23 tls::TlsConfig,
24};
25
26#[configurable_component]
28#[derive(Clone, Debug, Default)]
29pub struct S3Options {
30 pub acl: Option<S3CannedAcl>,
36
37 #[configurable(metadata(
44 docs::examples = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
45 ))]
46 #[configurable(metadata(docs::examples = "person@email.com"))]
47 #[configurable(metadata(docs::examples = "http://acs.amazonaws.com/groups/global/AllUsers"))]
48 pub grant_full_control: Option<String>,
49
50 #[configurable(metadata(
56 docs::examples = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
57 ))]
58 #[configurable(metadata(docs::examples = "person@email.com"))]
59 #[configurable(metadata(docs::examples = "http://acs.amazonaws.com/groups/global/AllUsers"))]
60 pub grant_read: Option<String>,
61
62 #[configurable(metadata(
68 docs::examples = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
69 ))]
70 #[configurable(metadata(docs::examples = "person@email.com"))]
71 #[configurable(metadata(docs::examples = "http://acs.amazonaws.com/groups/global/AllUsers"))]
72 pub grant_read_acp: Option<String>,
73
74 #[configurable(metadata(
80 docs::examples = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
81 ))]
82 #[configurable(metadata(docs::examples = "person@email.com"))]
83 #[configurable(metadata(docs::examples = "http://acs.amazonaws.com/groups/global/AllUsers"))]
84 pub grant_write_acp: Option<String>,
85
86 pub server_side_encryption: Option<S3ServerSideEncryption>,
88
89 #[configurable(metadata(docs::examples = "abcd1234"))]
96 #[configurable(metadata(docs::templateable))]
97 pub ssekms_key_id: Option<String>,
98
99 #[serde(default)]
105 pub storage_class: S3StorageClass,
106
107 #[configurable(metadata(docs::additional_props_description = "A single tag."))]
109 #[configurable(metadata(docs::examples = "example_tags()"))]
110 pub tags: Option<BTreeMap<String, String>>,
111
112 #[configurable(metadata(docs::examples = "gzip"))]
118 pub content_encoding: Option<String>,
119
120 #[configurable(metadata(docs::examples = "application/gzip"))]
127 pub content_type: Option<String>,
128}
129
130fn example_tags() -> HashMap<String, String> {
131 HashMap::<_, _>::from_iter([
132 ("Project".to_string(), "Blue".to_string()),
133 ("Classification".to_string(), "confidential".to_string()),
134 ("PHI".to_string(), "True".to_string()),
135 ])
136}
137
138#[configurable_component]
144#[derive(Clone, Copy, Debug, Derivative, PartialEq, Eq)]
145#[derivative(Default)]
146#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
147pub enum S3StorageClass {
148 #[derivative(Default)]
150 Standard,
151
152 ReducedRedundancy,
154
155 IntelligentTiering,
157
158 StandardIa,
160
161 ExpressOnezone,
163
164 OnezoneIa,
166
167 Glacier,
169
170 GlacierIr,
172
173 DeepArchive,
175}
176
177impl From<S3StorageClass> for StorageClass {
178 fn from(x: S3StorageClass) -> Self {
179 match x {
180 S3StorageClass::Standard => Self::Standard,
181 S3StorageClass::ReducedRedundancy => Self::ReducedRedundancy,
182 S3StorageClass::IntelligentTiering => Self::IntelligentTiering,
183 S3StorageClass::StandardIa => Self::StandardIa,
184 S3StorageClass::ExpressOnezone => Self::ExpressOnezone,
185 S3StorageClass::OnezoneIa => Self::OnezoneIa,
186 S3StorageClass::Glacier => Self::Glacier,
187 S3StorageClass::GlacierIr => Self::GlacierIr,
188 S3StorageClass::DeepArchive => Self::DeepArchive,
189 }
190 }
191}
192
193#[configurable_component]
199#[derive(Clone, Copy, Debug)]
200pub enum S3ServerSideEncryption {
201 #[serde(rename = "AES256")]
205 Aes256,
206
207 #[serde(rename = "aws:kms")]
213 AwsKms,
214}
215
216impl From<S3ServerSideEncryption> for ServerSideEncryption {
217 fn from(x: S3ServerSideEncryption) -> Self {
218 match x {
219 S3ServerSideEncryption::Aes256 => Self::Aes256,
220 S3ServerSideEncryption::AwsKms => Self::AwsKms,
221 }
222 }
223}
224
225#[configurable_component]
231#[derive(Clone, Copy, Debug, Derivative)]
232#[derivative(Default)]
233#[serde(rename_all = "kebab-case")]
234pub enum S3CannedAcl {
235 #[derivative(Default)]
242 Private,
243
244 PublicRead,
249
250 PublicReadWrite,
257
258 AwsExecRead,
264
265 AuthenticatedRead,
270
271 BucketOwnerRead,
278
279 BucketOwnerFullControl,
286
287 LogDeliveryWrite,
298}
299
300impl From<S3CannedAcl> for ObjectCannedAcl {
301 fn from(x: S3CannedAcl) -> Self {
302 match x {
303 S3CannedAcl::Private => ObjectCannedAcl::Private,
304 S3CannedAcl::PublicRead => ObjectCannedAcl::PublicRead,
305 S3CannedAcl::PublicReadWrite => ObjectCannedAcl::PublicReadWrite,
306 S3CannedAcl::AwsExecRead => ObjectCannedAcl::AwsExecRead,
307 S3CannedAcl::AuthenticatedRead => ObjectCannedAcl::AuthenticatedRead,
308 S3CannedAcl::BucketOwnerRead => ObjectCannedAcl::BucketOwnerRead,
309 S3CannedAcl::BucketOwnerFullControl => ObjectCannedAcl::BucketOwnerFullControl,
310 S3CannedAcl::LogDeliveryWrite => ObjectCannedAcl::from("log-delivery-write"),
311 }
312 }
313}
314
315fn is_retriable_response(res: &HttpResponse, errors_to_retry: Option<Vec<u16>>) -> bool {
316 let status_code = res.status();
317
318 match errors_to_retry {
319 Some(error_codes) => error_codes.contains(&status_code.as_u16()),
320 None => false,
321 }
322}
323
324fn should_retry_error(
325 errors_to_retry: Option<Vec<u16>>,
326 error: &SdkError<PutObjectError, HttpResponse>,
327) -> bool {
328 match error {
329 SdkError::ResponseError(err) => is_retriable_response(err.raw(), errors_to_retry),
330 SdkError::ServiceError(err) => is_retriable_response(err.raw(), errors_to_retry),
331 _ => false,
332 }
333}
334
335#[configurable_component]
343#[derive(Debug, Clone, Default, PartialEq)]
344#[serde(tag = "type", rename_all = "snake_case")]
345#[configurable(metadata(docs::enum_tag_description = "The retry strategy enum."))]
346pub enum RetryStrategy {
347 #[default]
349 None,
350
351 All,
353
354 Custom {
356 status_codes: Vec<u16>,
358 },
359}
360
361impl RetryLogic for RetryStrategy {
362 type Error = SdkError<PutObjectError, HttpResponse>;
363 type Request = S3Request;
364 type Response = S3Response;
365
366 fn is_retriable_error(&self, error: &Self::Error) -> bool {
367 match self {
368 RetryStrategy::None => false,
369 RetryStrategy::All => true,
370 RetryStrategy::Custom { status_codes } => {
371 is_retriable_error(error) || should_retry_error(Some(status_codes.clone()), error)
372 }
373 }
374 }
375}
376
377#[derive(Debug, Snafu)]
378pub enum HealthcheckError {
379 #[snafu(display("Invalid credentials"))]
380 InvalidCredentials,
381 #[snafu(display("Unknown bucket: {:?}", bucket))]
382 UnknownBucket { bucket: String },
383 #[snafu(display("Unknown status code: {}", status))]
384 UnknownStatus { status: StatusCode },
385}
386
387pub fn build_healthcheck(bucket: String, client: S3Client) -> crate::Result<Healthcheck> {
388 let healthcheck = async move {
389 let req = client
390 .head_bucket()
391 .bucket(bucket.clone())
392 .set_expected_bucket_owner(None)
393 .send()
394 .await;
395
396 match req {
397 Ok(_) => Ok(()),
398 Err(error) => Err(match error {
399 SdkError::ServiceError(inner) => {
400 let status = inner.into_raw().status();
401 match status.as_u16() {
402 status::FORBIDDEN => HealthcheckError::InvalidCredentials.into(),
403 status::NOT_FOUND => HealthcheckError::UnknownBucket { bucket }.into(),
404 _ => HealthcheckError::UnknownStatus { status }.into(),
405 }
406 }
407 error => error.into(),
408 }),
409 }
410 };
411
412 Ok(healthcheck.boxed())
413}
414
415pub async fn create_service(
416 region: &RegionOrEndpoint,
417 auth: &AwsAuthentication,
418 proxy: &ProxyConfig,
419 tls_options: Option<&TlsConfig>,
420 force_path_style: impl Into<bool>,
421) -> crate::Result<S3Service> {
422 let endpoint = region.endpoint();
423 let region = region.region();
424 let force_path_style_value: bool = force_path_style.into();
425 let client = create_client::<S3ClientBuilder>(
426 &S3ClientBuilder {
427 force_path_style: Some(force_path_style_value),
428 },
429 auth,
430 region.clone(),
431 endpoint,
432 proxy,
433 tls_options,
434 None,
435 )
436 .await?;
437 Ok(S3Service::new(client))
438}
439
440#[cfg(test)]
441mod tests {
442 use super::S3StorageClass;
443 use crate::serde::json::to_string;
444
445 #[test]
446 fn storage_class_names() {
447 for &(name, storage_class) in &[
448 ("DEEP_ARCHIVE", S3StorageClass::DeepArchive),
449 ("GLACIER", S3StorageClass::Glacier),
450 ("GLACIER_IR", S3StorageClass::GlacierIr),
451 ("INTELLIGENT_TIERING", S3StorageClass::IntelligentTiering),
452 ("EXPRESS_ONEZONE", S3StorageClass::ExpressOnezone),
453 ("ONEZONE_IA", S3StorageClass::OnezoneIa),
454 ("REDUCED_REDUNDANCY", S3StorageClass::ReducedRedundancy),
455 ("STANDARD", S3StorageClass::Standard),
456 ("STANDARD_IA", S3StorageClass::StandardIa),
457 ] {
458 assert_eq!(name, to_string(storage_class));
459 let result: S3StorageClass = serde_json::from_str(&format!("{name:?}"))
460 .unwrap_or_else(|error| panic!("Unparsable storage class name {name:?}: {error}"));
461 assert_eq!(result, storage_class);
462 }
463 }
464}