1use std::collections::{BTreeMap, HashMap};
2
3use aws_sdk_s3::{
4 Client as S3Client,
5 operation::put_object::PutObjectError,
6 types::{ObjectCannedAcl, ServerSideEncryption, StorageClass},
7};
8use aws_smithy_runtime_api::{
9 client::{orchestrator::HttpResponse, result::SdkError},
10 http::StatusCode,
11};
12use futures::FutureExt;
13use snafu::Snafu;
14use vector_lib::configurable::configurable_component;
15
16use super::service::{S3Request, S3Response, S3Service};
17use crate::{
18 aws::{AwsAuthentication, RegionOrEndpoint, create_client, is_retriable_error},
19 common::s3::S3ClientBuilder,
20 config::ProxyConfig,
21 http::status,
22 sinks::{Healthcheck, util::retries::RetryLogic},
23 tls::TlsConfig,
24};
25
26#[configurable_component]
28#[derive(Clone, Debug, Default)]
29pub struct S3Options {
30 pub acl: Option<S3CannedAcl>,
36
37 #[configurable(metadata(
44 docs::examples = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
45 ))]
46 #[configurable(metadata(docs::examples = "person@email.com"))]
47 #[configurable(metadata(docs::examples = "http://acs.amazonaws.com/groups/global/AllUsers"))]
48 pub grant_full_control: Option<String>,
49
50 #[configurable(metadata(
56 docs::examples = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
57 ))]
58 #[configurable(metadata(docs::examples = "person@email.com"))]
59 #[configurable(metadata(docs::examples = "http://acs.amazonaws.com/groups/global/AllUsers"))]
60 pub grant_read: Option<String>,
61
62 #[configurable(metadata(
68 docs::examples = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
69 ))]
70 #[configurable(metadata(docs::examples = "person@email.com"))]
71 #[configurable(metadata(docs::examples = "http://acs.amazonaws.com/groups/global/AllUsers"))]
72 pub grant_read_acp: Option<String>,
73
74 #[configurable(metadata(
80 docs::examples = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
81 ))]
82 #[configurable(metadata(docs::examples = "person@email.com"))]
83 #[configurable(metadata(docs::examples = "http://acs.amazonaws.com/groups/global/AllUsers"))]
84 pub grant_write_acp: Option<String>,
85
86 pub server_side_encryption: Option<S3ServerSideEncryption>,
88
89 #[configurable(metadata(docs::examples = "abcd1234"))]
96 #[configurable(metadata(docs::templateable))]
97 pub ssekms_key_id: Option<String>,
98
99 #[serde(default)]
105 pub storage_class: S3StorageClass,
106
107 #[configurable(metadata(docs::additional_props_description = "A single tag."))]
109 #[configurable(metadata(docs::examples = "example_tags()"))]
110 pub tags: Option<BTreeMap<String, String>>,
111
112 #[configurable(metadata(docs::examples = "gzip"))]
118 pub content_encoding: Option<String>,
119
120 #[configurable(metadata(docs::examples = "application/gzip"))]
127 pub content_type: Option<String>,
128}
129
/// Builds the sample tag set named by the `docs::examples` metadata of `S3Options::tags`.
///
/// Returns three fixed name/value pairs; the result does not depend on any input.
fn example_tags() -> HashMap<String, String> {
    [
        ("Project", "Blue"),
        ("Classification", "confidential"),
        ("PHI", "True"),
    ]
    .into_iter()
    .map(|(tag, value)| (tag.to_owned(), value.to_owned()))
    .collect()
}
137
138#[configurable_component]
144#[derive(Clone, Copy, Debug, Derivative, PartialEq, Eq)]
145#[derivative(Default)]
146#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
147pub enum S3StorageClass {
148 #[derivative(Default)]
150 Standard,
151
152 ReducedRedundancy,
154
155 IntelligentTiering,
157
158 StandardIa,
160
161 ExpressOnezone,
163
164 OnezoneIa,
166
167 Glacier,
169
170 GlacierIr,
172
173 DeepArchive,
175}
176
177impl From<S3StorageClass> for StorageClass {
178 fn from(x: S3StorageClass) -> Self {
179 match x {
180 S3StorageClass::Standard => Self::Standard,
181 S3StorageClass::ReducedRedundancy => Self::ReducedRedundancy,
182 S3StorageClass::IntelligentTiering => Self::IntelligentTiering,
183 S3StorageClass::StandardIa => Self::StandardIa,
184 S3StorageClass::ExpressOnezone => Self::ExpressOnezone,
185 S3StorageClass::OnezoneIa => Self::OnezoneIa,
186 S3StorageClass::Glacier => Self::Glacier,
187 S3StorageClass::GlacierIr => Self::GlacierIr,
188 S3StorageClass::DeepArchive => Self::DeepArchive,
189 }
190 }
191}
192
193#[configurable_component]
199#[derive(Clone, Copy, Debug)]
200pub enum S3ServerSideEncryption {
201 #[serde(rename = "AES256")]
205 Aes256,
206
207 #[serde(rename = "aws:kms")]
213 AwsKms,
214}
215
216impl From<S3ServerSideEncryption> for ServerSideEncryption {
217 fn from(x: S3ServerSideEncryption) -> Self {
218 match x {
219 S3ServerSideEncryption::Aes256 => Self::Aes256,
220 S3ServerSideEncryption::AwsKms => Self::AwsKms,
221 }
222 }
223}
224
225#[configurable_component]
231#[derive(Clone, Copy, Debug, Derivative)]
232#[derivative(Default)]
233#[serde(rename_all = "kebab-case")]
234pub enum S3CannedAcl {
235 #[derivative(Default)]
242 Private,
243
244 PublicRead,
249
250 PublicReadWrite,
257
258 AwsExecRead,
264
265 AuthenticatedRead,
270
271 BucketOwnerRead,
278
279 BucketOwnerFullControl,
286
287 LogDeliveryWrite,
298}
299
300impl From<S3CannedAcl> for ObjectCannedAcl {
301 fn from(x: S3CannedAcl) -> Self {
302 match x {
303 S3CannedAcl::Private => ObjectCannedAcl::Private,
304 S3CannedAcl::PublicRead => ObjectCannedAcl::PublicRead,
305 S3CannedAcl::PublicReadWrite => ObjectCannedAcl::PublicReadWrite,
306 S3CannedAcl::AwsExecRead => ObjectCannedAcl::AwsExecRead,
307 S3CannedAcl::AuthenticatedRead => ObjectCannedAcl::AuthenticatedRead,
308 S3CannedAcl::BucketOwnerRead => ObjectCannedAcl::BucketOwnerRead,
309 S3CannedAcl::BucketOwnerFullControl => ObjectCannedAcl::BucketOwnerFullControl,
310 S3CannedAcl::LogDeliveryWrite => ObjectCannedAcl::from("log-delivery-write"),
311 }
312 }
313}
314
315fn is_retriable_response(res: &HttpResponse, errors_to_retry: Option<Vec<u16>>) -> bool {
316 let status_code = res.status();
317
318 match errors_to_retry {
319 Some(error_codes) => error_codes.contains(&status_code.as_u16()),
320 None => false,
321 }
322}
323
324fn should_retry_error(
325 errors_to_retry: Option<Vec<u16>>,
326 error: &SdkError<PutObjectError, HttpResponse>,
327) -> bool {
328 match error {
329 SdkError::ResponseError(err) => is_retriable_response(err.raw(), errors_to_retry),
330 SdkError::ServiceError(err) => is_retriable_response(err.raw(), errors_to_retry),
331 _ => false,
332 }
333}
334
335#[configurable_component]
343#[derive(Debug, Clone, Default, PartialEq)]
344#[serde(tag = "type", rename_all = "snake_case")]
345#[configurable(metadata(docs::enum_tag_description = "The retry strategy enum."))]
346pub enum RetryStrategy {
347 None,
349
350 #[default]
363 Default,
364
365 All,
367
368 Custom {
370 status_codes: Vec<u16>,
372 },
373}
374
375impl RetryLogic for RetryStrategy {
376 type Error = SdkError<PutObjectError, HttpResponse>;
377 type Request = S3Request;
378 type Response = S3Response;
379
380 fn is_retriable_error(&self, error: &Self::Error) -> bool {
381 match self {
382 RetryStrategy::None => false,
383 RetryStrategy::Default => is_retriable_error(error),
384 RetryStrategy::All => true,
385 RetryStrategy::Custom { status_codes } => {
386 is_retriable_error(error) || should_retry_error(Some(status_codes.clone()), error)
387 }
388 }
389 }
390}
391
392#[derive(Debug, Snafu)]
393pub enum HealthcheckError {
394 #[snafu(display("Invalid credentials"))]
395 InvalidCredentials,
396 #[snafu(display("Unknown bucket: {:?}", bucket))]
397 UnknownBucket { bucket: String },
398 #[snafu(display("Unknown status code: {}", status))]
399 UnknownStatus { status: StatusCode },
400}
401
402pub fn build_healthcheck(bucket: String, client: S3Client) -> crate::Result<Healthcheck> {
403 let healthcheck = async move {
404 let req = client
405 .head_bucket()
406 .bucket(bucket.clone())
407 .set_expected_bucket_owner(None)
408 .send()
409 .await;
410
411 match req {
412 Ok(_) => Ok(()),
413 Err(error) => Err(match error {
414 SdkError::ServiceError(inner) => {
415 let status = inner.into_raw().status();
416 match status.as_u16() {
417 status::FORBIDDEN => HealthcheckError::InvalidCredentials.into(),
418 status::NOT_FOUND => HealthcheckError::UnknownBucket { bucket }.into(),
419 _ => HealthcheckError::UnknownStatus { status }.into(),
420 }
421 }
422 error => error.into(),
423 }),
424 }
425 };
426
427 Ok(healthcheck.boxed())
428}
429
430pub async fn create_service(
431 region: &RegionOrEndpoint,
432 auth: &AwsAuthentication,
433 proxy: &ProxyConfig,
434 tls_options: Option<&TlsConfig>,
435 force_path_style: impl Into<bool>,
436) -> crate::Result<S3Service> {
437 let endpoint = region.endpoint();
438 let region = region.region();
439 let force_path_style_value: bool = force_path_style.into();
440 let client = create_client::<S3ClientBuilder>(
441 &S3ClientBuilder {
442 force_path_style: Some(force_path_style_value),
443 },
444 auth,
445 region.clone(),
446 endpoint,
447 proxy,
448 tls_options,
449 None,
450 )
451 .await?;
452 Ok(S3Service::new(client))
453}
454
#[cfg(test)]
mod tests {
    use super::S3StorageClass;
    use crate::serde::json::to_string;

    /// Every storage class must serialize to its `SCREAMING_SNAKE_CASE` name and parse
    /// back from that name to the same variant.
    #[test]
    fn storage_class_names() {
        let cases = [
            ("DEEP_ARCHIVE", S3StorageClass::DeepArchive),
            ("GLACIER", S3StorageClass::Glacier),
            ("GLACIER_IR", S3StorageClass::GlacierIr),
            ("INTELLIGENT_TIERING", S3StorageClass::IntelligentTiering),
            ("EXPRESS_ONEZONE", S3StorageClass::ExpressOnezone),
            ("ONEZONE_IA", S3StorageClass::OnezoneIa),
            ("REDUCED_REDUNDANCY", S3StorageClass::ReducedRedundancy),
            ("STANDARD", S3StorageClass::Standard),
            ("STANDARD_IA", S3StorageClass::StandardIa),
        ];

        for (name, storage_class) in cases {
            assert_eq!(name, to_string(storage_class));

            // `{name:?}` wraps the name in double quotes, producing a JSON string literal.
            let json = format!("{name:?}");
            let result: S3StorageClass = serde_json::from_str(&json)
                .unwrap_or_else(|error| panic!("Unparsable storage class name {name:?}: {error}"));
            assert_eq!(result, storage_class);
        }
    }
}