1use std::collections::{BTreeMap, HashMap};
2
3use aws_sdk_s3::{
4 Client as S3Client,
5 operation::put_object::PutObjectError,
6 types::{ObjectCannedAcl, ServerSideEncryption, StorageClass},
7};
8use aws_smithy_runtime_api::{
9 client::{orchestrator::HttpResponse, result::SdkError},
10 http::StatusCode,
11};
12use futures::FutureExt;
13use snafu::Snafu;
14use vector_lib::configurable::configurable_component;
15
16use super::service::{S3Request, S3Response, S3Service};
17use crate::{
18 aws::{AwsAuthentication, RegionOrEndpoint, create_client, is_retriable_error},
19 common::s3::S3ClientBuilder,
20 config::ProxyConfig,
21 http::status,
22 sinks::{Healthcheck, util::retries::RetryLogic},
23 tls::TlsConfig,
24};
25
/// Optional S3 object settings exposed through configuration.
///
/// Groups the ACL, grant, server-side encryption, storage class, tag, and
/// content header options declared below.
// NOTE(review): this file only declares the options; how each one is applied
// to an upload request is decided by the code that consumes this struct.
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub struct S3Options {
    /// Canned ACL selection; see [`S3CannedAcl`] for the accepted values.
    pub acl: Option<S3CannedAcl>,

    /// Grantee for the "full control" grant.
    // NOTE(review): the documented examples are a 64-character hex ID, an
    // email address, and a group URI, presumably the grantee forms S3
    // accepts; confirm against the request builder.
    #[configurable(metadata(
        docs::examples = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
    ))]
    #[configurable(metadata(docs::examples = "person@email.com"))]
    #[configurable(metadata(docs::examples = "http://acs.amazonaws.com/groups/global/AllUsers"))]
    pub grant_full_control: Option<String>,

    /// Grantee for the "read" grant.
    // NOTE(review): same example grantee forms as `grant_full_control`;
    // confirm the exact permission semantics against the request builder.
    #[configurable(metadata(
        docs::examples = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
    ))]
    #[configurable(metadata(docs::examples = "person@email.com"))]
    #[configurable(metadata(docs::examples = "http://acs.amazonaws.com/groups/global/AllUsers"))]
    pub grant_read: Option<String>,

    /// Grantee for the "read ACP" grant.
    // NOTE(review): "ACP" presumably refers to the object's access control
    // policy (its ACL); confirm.
    #[configurable(metadata(
        docs::examples = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
    ))]
    #[configurable(metadata(docs::examples = "person@email.com"))]
    #[configurable(metadata(docs::examples = "http://acs.amazonaws.com/groups/global/AllUsers"))]
    pub grant_read_acp: Option<String>,

    /// Grantee for the "write ACP" grant.
    // NOTE(review): "ACP" presumably refers to the object's access control
    // policy (its ACL); confirm.
    #[configurable(metadata(
        docs::examples = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
    ))]
    #[configurable(metadata(docs::examples = "person@email.com"))]
    #[configurable(metadata(docs::examples = "http://acs.amazonaws.com/groups/global/AllUsers"))]
    pub grant_write_acp: Option<String>,

    /// Server-side encryption algorithm; see [`S3ServerSideEncryption`] for the accepted values.
    pub server_side_encryption: Option<S3ServerSideEncryption>,

    /// Identifier of the KMS key to use; flagged as templateable in the generated docs.
    // NOTE(review): presumably only meaningful together with the `aws:kms`
    // encryption setting; confirm.
    #[configurable(metadata(docs::examples = "abcd1234"))]
    #[configurable(metadata(docs::templateable))]
    pub ssekms_key_id: Option<String>,

    /// Storage class selection; falls back to [`S3StorageClass::Standard`] when omitted.
    #[serde(default)]
    pub storage_class: S3StorageClass,

    /// Optional set of key/value tags.
    #[configurable(metadata(docs::additional_props_description = "A single tag."))]
    #[configurable(metadata(docs::examples = "example_tags()"))]
    pub tags: Option<BTreeMap<String, String>>,

    /// Content encoding value, for example `gzip`.
    // NOTE(review): presumably overrides the `Content-Encoding` sent with the
    // object; confirm against the request builder.
    #[configurable(metadata(docs::examples = "gzip"))]
    pub content_encoding: Option<String>,

    /// Content type value, for example `application/gzip`.
    // NOTE(review): presumably overrides the `Content-Type` sent with the
    // object; confirm against the request builder.
    #[configurable(metadata(docs::examples = "application/gzip"))]
    pub content_type: Option<String>,
}
129
/// Builds the sample tag set referenced by the `tags` field's `docs::examples` metadata.
///
/// Returns three string pairs: `Project`/`Blue`, `Classification`/`confidential`,
/// and `PHI`/`True`.
fn example_tags() -> HashMap<String, String> {
    [
        ("Project", "Blue"),
        ("Classification", "confidential"),
        ("PHI", "True"),
    ]
    .into_iter()
    .map(|(key, value)| (key.to_string(), value.to_string()))
    .collect()
}
137
/// Storage class selectable in configuration.
///
/// Variant names are (de)serialized in `SCREAMING_SNAKE_CASE`, and each
/// variant converts into the same-named AWS SDK `StorageClass` variant.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum S3StorageClass {
    /// Serialized as `STANDARD`. This is the default.
    #[default]
    Standard,

    /// Serialized as `REDUCED_REDUNDANCY`.
    ReducedRedundancy,

    /// Serialized as `INTELLIGENT_TIERING`.
    IntelligentTiering,

    /// Serialized as `STANDARD_IA`.
    StandardIa,

    /// Serialized as `EXPRESS_ONEZONE`.
    ExpressOnezone,

    /// Serialized as `ONEZONE_IA`.
    OnezoneIa,

    /// Serialized as `GLACIER`.
    Glacier,

    /// Serialized as `GLACIER_IR`.
    GlacierIr,

    /// Serialized as `DEEP_ARCHIVE`.
    DeepArchive,
}
175
176impl From<S3StorageClass> for StorageClass {
177 fn from(x: S3StorageClass) -> Self {
178 match x {
179 S3StorageClass::Standard => Self::Standard,
180 S3StorageClass::ReducedRedundancy => Self::ReducedRedundancy,
181 S3StorageClass::IntelligentTiering => Self::IntelligentTiering,
182 S3StorageClass::StandardIa => Self::StandardIa,
183 S3StorageClass::ExpressOnezone => Self::ExpressOnezone,
184 S3StorageClass::OnezoneIa => Self::OnezoneIa,
185 S3StorageClass::Glacier => Self::Glacier,
186 S3StorageClass::GlacierIr => Self::GlacierIr,
187 S3StorageClass::DeepArchive => Self::DeepArchive,
188 }
189 }
190}
191
/// Server-side encryption algorithm selectable in configuration.
///
/// Each variant converts into the same-named AWS SDK `ServerSideEncryption`
/// variant.
#[configurable_component]
#[derive(Clone, Copy, Debug)]
pub enum S3ServerSideEncryption {
    /// Serialized as `AES256`.
    #[serde(rename = "AES256")]
    Aes256,

    /// Serialized as `aws:kms`.
    #[serde(rename = "aws:kms")]
    AwsKms,
}
214
215impl From<S3ServerSideEncryption> for ServerSideEncryption {
216 fn from(x: S3ServerSideEncryption) -> Self {
217 match x {
218 S3ServerSideEncryption::Aes256 => Self::Aes256,
219 S3ServerSideEncryption::AwsKms => Self::AwsKms,
220 }
221 }
222}
223
/// Canned ACL selectable in configuration.
///
/// Variant names are (de)serialized in `kebab-case`, and each variant
/// converts into an AWS SDK `ObjectCannedAcl` value.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
#[serde(rename_all = "kebab-case")]
pub enum S3CannedAcl {
    /// Serialized as `private`. This is the default.
    #[default]
    Private,

    /// Serialized as `public-read`.
    PublicRead,

    /// Serialized as `public-read-write`.
    PublicReadWrite,

    /// Serialized as `aws-exec-read`.
    AwsExecRead,

    /// Serialized as `authenticated-read`.
    AuthenticatedRead,

    /// Serialized as `bucket-owner-read`.
    BucketOwnerRead,

    /// Serialized as `bucket-owner-full-control`.
    BucketOwnerFullControl,

    /// Serialized as `log-delivery-write`.
    ///
    /// Unlike the other variants, this one is converted to the SDK type from
    /// the string `log-delivery-write` rather than from a named SDK variant.
    LogDeliveryWrite,
}
297
298impl From<S3CannedAcl> for ObjectCannedAcl {
299 fn from(x: S3CannedAcl) -> Self {
300 match x {
301 S3CannedAcl::Private => ObjectCannedAcl::Private,
302 S3CannedAcl::PublicRead => ObjectCannedAcl::PublicRead,
303 S3CannedAcl::PublicReadWrite => ObjectCannedAcl::PublicReadWrite,
304 S3CannedAcl::AwsExecRead => ObjectCannedAcl::AwsExecRead,
305 S3CannedAcl::AuthenticatedRead => ObjectCannedAcl::AuthenticatedRead,
306 S3CannedAcl::BucketOwnerRead => ObjectCannedAcl::BucketOwnerRead,
307 S3CannedAcl::BucketOwnerFullControl => ObjectCannedAcl::BucketOwnerFullControl,
308 S3CannedAcl::LogDeliveryWrite => ObjectCannedAcl::from("log-delivery-write"),
309 }
310 }
311}
312
313fn is_retriable_response(res: &HttpResponse, errors_to_retry: Option<Vec<u16>>) -> bool {
314 let status_code = res.status();
315
316 match errors_to_retry {
317 Some(error_codes) => error_codes.contains(&status_code.as_u16()),
318 None => false,
319 }
320}
321
322fn should_retry_error(
323 errors_to_retry: Option<Vec<u16>>,
324 error: &SdkError<PutObjectError, HttpResponse>,
325) -> bool {
326 match error {
327 SdkError::ResponseError(err) => is_retriable_response(err.raw(), errors_to_retry),
328 SdkError::ServiceError(err) => is_retriable_response(err.raw(), errors_to_retry),
329 _ => false,
330 }
331}
332
/// Selects which failed `PutObject` calls are retried.
///
/// Serialized as an internally tagged value whose `type` field holds the
/// snake_case variant name.
#[configurable_component]
#[derive(Debug, Clone, Default, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The retry strategy enum."))]
pub enum RetryStrategy {
    /// Never retry: every error is reported as not retriable.
    None,

    /// Defer to the shared AWS `is_retriable_error` check. This is the default.
    #[default]
    Default,

    /// Retry on every error.
    All,

    /// Retry whatever the shared AWS check retries, plus any response whose
    /// HTTP status code is listed in `status_codes`.
    Custom {
        /// HTTP status codes that are retried in addition to the default behavior.
        status_codes: Vec<u16>,
    },
}
372
373impl RetryLogic for RetryStrategy {
374 type Error = SdkError<PutObjectError, HttpResponse>;
375 type Request = S3Request;
376 type Response = S3Response;
377
378 fn is_retriable_error(&self, error: &Self::Error) -> bool {
379 match self {
380 RetryStrategy::None => false,
381 RetryStrategy::Default => is_retriable_error(error),
382 RetryStrategy::All => true,
383 RetryStrategy::Custom { status_codes } => {
384 is_retriable_error(error) || should_retry_error(Some(status_codes.clone()), error)
385 }
386 }
387 }
388}
389
/// Errors reported by the S3 healthcheck when `HeadBucket` fails with a service error.
#[derive(Debug, Snafu)]
pub enum HealthcheckError {
    /// The response status matched `status::FORBIDDEN`.
    #[snafu(display("Invalid credentials"))]
    InvalidCredentials,
    /// The response status matched `status::NOT_FOUND`; carries the bucket name that was checked.
    #[snafu(display("Unknown bucket: {:?}", bucket))]
    UnknownBucket { bucket: String },
    /// The response carried any other status; carries that status.
    #[snafu(display("Unknown status code: {}", status))]
    UnknownStatus { status: StatusCode },
}
399
400pub fn build_healthcheck(bucket: String, client: S3Client) -> crate::Result<Healthcheck> {
401 let healthcheck = async move {
402 let req = client
403 .head_bucket()
404 .bucket(bucket.clone())
405 .set_expected_bucket_owner(None)
406 .send()
407 .await;
408
409 match req {
410 Ok(_) => Ok(()),
411 Err(error) => Err(match error {
412 SdkError::ServiceError(inner) => {
413 let status = inner.into_raw().status();
414 match status.as_u16() {
415 status::FORBIDDEN => HealthcheckError::InvalidCredentials.into(),
416 status::NOT_FOUND => HealthcheckError::UnknownBucket { bucket }.into(),
417 _ => HealthcheckError::UnknownStatus { status }.into(),
418 }
419 }
420 error => error.into(),
421 }),
422 }
423 };
424
425 Ok(healthcheck.boxed())
426}
427
428pub async fn create_service(
429 region: &RegionOrEndpoint,
430 auth: &AwsAuthentication,
431 proxy: &ProxyConfig,
432 tls_options: Option<&TlsConfig>,
433 force_path_style: impl Into<bool>,
434) -> crate::Result<S3Service> {
435 let endpoint = region.endpoint();
436 let region = region.region();
437 let force_path_style_value: bool = force_path_style.into();
438 let client = create_client::<S3ClientBuilder>(
439 &S3ClientBuilder {
440 force_path_style: Some(force_path_style_value),
441 },
442 auth,
443 region.clone(),
444 endpoint,
445 proxy,
446 tls_options,
447 None,
448 )
449 .await?;
450 Ok(S3Service::new(client))
451}
452
#[cfg(test)]
mod tests {
    use super::S3StorageClass;
    use crate::serde::json::to_string;

    /// Every storage class paired with the name it is expected to serialize to.
    const EXPECTED_NAMES: [(&str, S3StorageClass); 9] = [
        ("DEEP_ARCHIVE", S3StorageClass::DeepArchive),
        ("GLACIER", S3StorageClass::Glacier),
        ("GLACIER_IR", S3StorageClass::GlacierIr),
        ("INTELLIGENT_TIERING", S3StorageClass::IntelligentTiering),
        ("EXPRESS_ONEZONE", S3StorageClass::ExpressOnezone),
        ("ONEZONE_IA", S3StorageClass::OnezoneIa),
        ("REDUCED_REDUNDANCY", S3StorageClass::ReducedRedundancy),
        ("STANDARD", S3StorageClass::Standard),
        ("STANDARD_IA", S3StorageClass::StandardIa),
    ];

    /// Checks that each storage class serializes to its expected name and
    /// that the name parses back to the same variant.
    #[test]
    fn storage_class_names() {
        for (name, storage_class) in EXPECTED_NAMES {
            assert_eq!(name, to_string(storage_class));

            // `{name:?}` wraps the name in double quotes, producing a JSON string literal.
            let quoted = format!("{name:?}");
            let result: S3StorageClass = serde_json::from_str(&quoted)
                .unwrap_or_else(|error| panic!("Unparsable storage class name {name:?}: {error}"));
            assert_eq!(result, storage_class);
        }
    }
}