vector/sinks/s3_common/
config.rs

1use std::collections::{BTreeMap, HashMap};
2
3use aws_sdk_s3::{
4    Client as S3Client,
5    operation::put_object::PutObjectError,
6    types::{ObjectCannedAcl, ServerSideEncryption, StorageClass},
7};
8use aws_smithy_runtime_api::{
9    client::{orchestrator::HttpResponse, result::SdkError},
10    http::StatusCode,
11};
12use futures::FutureExt;
13use snafu::Snafu;
14use vector_lib::configurable::configurable_component;
15
16use super::service::{S3Request, S3Response, S3Service};
17use crate::{
18    aws::{AwsAuthentication, RegionOrEndpoint, create_client, is_retriable_error},
19    common::s3::S3ClientBuilder,
20    config::ProxyConfig,
21    http::status,
22    sinks::{Healthcheck, util::retries::RetryLogic},
23    tls::TlsConfig,
24};
25
/// Per-operation configuration when writing objects to S3.
// Maintainer note: `#[configurable_component]` appears to turn the `///` comments in this type
// into user-facing configuration docs, so review notes in this struct use plain `//` comments.
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub struct S3Options {
    /// Canned ACL to apply to the created objects.
    ///
    /// For more information, see [Canned ACL][canned_acl].
    ///
    /// [canned_acl]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#canned-acl
    pub acl: Option<S3CannedAcl>,

    // The four `grant_*` fields below share the same grantee examples: a canonical user ID,
    // an email address, or a group URI. Each is stored as a raw string.

    /// Grants `READ`, `READ_ACP`, and `WRITE_ACP` permissions on the created objects to the named [grantee].
    ///
    /// This allows the grantee to read the created objects and their metadata, as well as read and
    /// modify the ACL on the created objects.
    ///
    /// [grantee]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#specifying-grantee
    #[configurable(metadata(
        docs::examples = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
    ))]
    #[configurable(metadata(docs::examples = "person@email.com"))]
    #[configurable(metadata(docs::examples = "http://acs.amazonaws.com/groups/global/AllUsers"))]
    pub grant_full_control: Option<String>,

    /// Grants `READ` permissions on the created objects to the named [grantee].
    ///
    /// This allows the grantee to read the created objects and their metadata.
    ///
    /// [grantee]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#specifying-grantee
    #[configurable(metadata(
        docs::examples = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
    ))]
    #[configurable(metadata(docs::examples = "person@email.com"))]
    #[configurable(metadata(docs::examples = "http://acs.amazonaws.com/groups/global/AllUsers"))]
    pub grant_read: Option<String>,

    /// Grants `READ_ACP` permissions on the created objects to the named [grantee].
    ///
    /// This allows the grantee to read the ACL on the created objects.
    ///
    /// [grantee]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#specifying-grantee
    #[configurable(metadata(
        docs::examples = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
    ))]
    #[configurable(metadata(docs::examples = "person@email.com"))]
    #[configurable(metadata(docs::examples = "http://acs.amazonaws.com/groups/global/AllUsers"))]
    pub grant_read_acp: Option<String>,

    /// Grants `WRITE_ACP` permissions on the created objects to the named [grantee].
    ///
    /// This allows the grantee to modify the ACL on the created objects.
    ///
    /// [grantee]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#specifying-grantee
    #[configurable(metadata(
        docs::examples = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
    ))]
    #[configurable(metadata(docs::examples = "person@email.com"))]
    #[configurable(metadata(docs::examples = "http://acs.amazonaws.com/groups/global/AllUsers"))]
    pub grant_write_acp: Option<String>,

    /// The Server-side Encryption algorithm used when storing these objects.
    pub server_side_encryption: Option<S3ServerSideEncryption>,

    /// Specifies the ID of the AWS Key Management Service (AWS KMS) symmetrical customer managed
    /// customer master key (CMK) that is used for the created objects.
    ///
    /// Only applies when `server_side_encryption` is configured to use KMS.
    ///
    /// If not specified, Amazon S3 uses the AWS managed CMK in AWS to protect the data.
    // Marked `docs::templateable`; presumably rendered per event by the request builder
    // (not visible here). This struct only stores the raw template string.
    #[configurable(metadata(docs::examples = "abcd1234"))]
    #[configurable(metadata(docs::templateable))]
    pub ssekms_key_id: Option<String>,

    /// The storage class for the created objects.
    ///
    /// See the [S3 Storage Classes][s3_storage_classes] for more details.
    ///
    /// [s3_storage_classes]: https://docs.aws.amazon.com/AmazonS3/latest/dev/storage-class-intro.html
    // Not an `Option`: when omitted, `#[serde(default)]` falls back to
    // `S3StorageClass::default()`, which is `Standard`.
    #[serde(default)]
    pub storage_class: S3StorageClass,

    /// The tag-set for the object.
    // `"example_tags()"` names the helper defined in this module; presumably the configurable
    // macro evaluates it to produce the docs example. TODO confirm.
    #[configurable(metadata(docs::additional_props_description = "A single tag."))]
    #[configurable(metadata(docs::examples = "example_tags()"))]
    pub tags: Option<BTreeMap<String, String>>,

    /// Overrides what content encoding has been applied to the object.
    ///
    /// Directly comparable to the `Content-Encoding` HTTP header.
    ///
    /// If not specified, the compression scheme used dictates this value.
    #[configurable(metadata(docs::examples = "gzip"))]
    pub content_encoding: Option<String>,

    /// Overrides the MIME type of the object.
    ///
    /// Directly comparable to the `Content-Type` HTTP header.
    ///
    /// If not specified, the compression scheme used dictates this value.
    /// When `compression` is set to `none`, the value `text/x-log` is used.
    #[configurable(metadata(docs::examples = "application/gzip"))]
    pub content_type: Option<String>,
}
129
/// Sample tag-set used as the documentation example for `S3Options::tags`.
///
/// Returns three fixed key/value pairs: `Project=Blue`, `Classification=confidential`,
/// and `PHI=True`.
fn example_tags() -> HashMap<String, String> {
    [
        ("Project", "Blue"),
        ("Classification", "confidential"),
        ("PHI", "True"),
    ]
    .into_iter()
    .map(|(key, value)| (key.to_owned(), value.to_owned()))
    .collect()
}
137
/// S3 storage classes.
///
/// More information on each storage class can be found in the [AWS documentation][aws_docs].
///
/// [aws_docs]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/storage-class-intro.html
// Config/wire names are the SCREAMING_SNAKE_CASE form of each variant (e.g. `StandardIa` ->
// `STANDARD_IA`). The `storage_class_names` test in this file pins every mapping, so extend it
// when adding a variant. Plain `//` comments are used here because the `///` text appears to be
// emitted into the generated configuration docs.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum S3StorageClass {
    /// Standard Redundancy.
    // The `Default`; `S3Options::storage_class` falls back to it via `#[serde(default)]`.
    #[default]
    Standard,

    /// Reduced Redundancy.
    ReducedRedundancy,

    /// Intelligent Tiering.
    IntelligentTiering,

    /// Infrequently Accessed.
    StandardIa,

    /// High Performance (single Availability zone).
    ExpressOnezone,

    /// Infrequently Accessed (single Availability zone).
    OnezoneIa,

    /// Glacier Flexible Retrieval.
    Glacier,

    /// Glacier Instant Retrieval.
    GlacierIr,

    /// Glacier Deep Archive.
    DeepArchive,
}
175
176impl From<S3StorageClass> for StorageClass {
177    fn from(x: S3StorageClass) -> Self {
178        match x {
179            S3StorageClass::Standard => Self::Standard,
180            S3StorageClass::ReducedRedundancy => Self::ReducedRedundancy,
181            S3StorageClass::IntelligentTiering => Self::IntelligentTiering,
182            S3StorageClass::StandardIa => Self::StandardIa,
183            S3StorageClass::ExpressOnezone => Self::ExpressOnezone,
184            S3StorageClass::OnezoneIa => Self::OnezoneIa,
185            S3StorageClass::Glacier => Self::Glacier,
186            S3StorageClass::GlacierIr => Self::GlacierIr,
187            S3StorageClass::DeepArchive => Self::DeepArchive,
188        }
189    }
190}
191
192/// AWS S3 Server-Side Encryption algorithms.
193///
194/// More information on each algorithm can be found in the [AWS documentation][aws_docs].
195///
196/// [aws_docs]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/serv-side-encryption.html
197#[configurable_component]
198#[derive(Clone, Copy, Debug)]
199pub enum S3ServerSideEncryption {
200    /// Each object is encrypted with AES-256 using a unique key.
201    ///
202    /// This corresponds to the `SSE-S3` option.
203    #[serde(rename = "AES256")]
204    Aes256,
205
206    /// Each object is encrypted with AES-256 using keys managed by AWS KMS.
207    ///
208    /// Depending on whether or not a KMS key ID is specified, this corresponds either to the
209    /// `SSE-KMS` option (keys generated/managed by KMS) or the `SSE-C` option (keys generated by
210    /// the customer, managed by KMS).
211    #[serde(rename = "aws:kms")]
212    AwsKms,
213}
214
215impl From<S3ServerSideEncryption> for ServerSideEncryption {
216    fn from(x: S3ServerSideEncryption) -> Self {
217        match x {
218            S3ServerSideEncryption::Aes256 => Self::Aes256,
219            S3ServerSideEncryption::AwsKms => Self::AwsKms,
220        }
221    }
222}
223
/// S3 Canned ACLs.
///
/// For more information, see [Canned ACL][canned_acl].
///
/// [canned_acl]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#canned-acl
// Config names are the kebab-case form of each variant (e.g. `BucketOwnerFullControl` ->
// `bucket-owner-full-control`). Plain `//` comments are used for maintainer notes because the
// `///` text appears to be emitted into the generated configuration docs.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default)]
#[serde(rename_all = "kebab-case")]
pub enum S3CannedAcl {
    /// Bucket/object are private.
    ///
    /// The bucket/object owner is granted the `FULL_CONTROL` permission, and no one else has
    /// access.
    ///
    /// This is the default.
    #[default]
    Private,

    /// Bucket/object can be read publicly.
    ///
    /// The bucket/object owner is granted the `FULL_CONTROL` permission, and anyone in the
    /// `AllUsers` grantee group is granted the `READ` permission.
    PublicRead,

    /// Bucket/object can be read and written publicly.
    ///
    /// The bucket/object owner is granted the `FULL_CONTROL` permission, and anyone in the
    /// `AllUsers` grantee group is granted the `READ` and `WRITE` permissions.
    ///
    /// This is generally not recommended.
    PublicReadWrite,

    /// Bucket/object are private, and readable by EC2.
    ///
    /// The bucket/object owner is granted the `FULL_CONTROL` permission, and the AWS EC2 service is
    /// granted the `READ` permission for the purpose of reading Amazon Machine Image (AMI) bundles
    /// from the given bucket.
    AwsExecRead,

    /// Bucket/object can be read by authenticated users.
    ///
    /// The bucket/object owner is granted the `FULL_CONTROL` permission, and anyone in the
    /// `AuthenticatedUsers` grantee group is granted the `READ` permission.
    AuthenticatedRead,

    /// Object is private, except to the bucket owner.
    ///
    /// The object owner is granted the `FULL_CONTROL` permission, and the bucket owner is granted the `READ` permission.
    ///
    /// Only relevant when specified for an object: this canned ACL is otherwise ignored when
    /// specified for a bucket.
    BucketOwnerRead,

    /// Object is semi-private.
    ///
    /// Both the object owner and bucket owner are granted the `FULL_CONTROL` permission.
    ///
    /// Only relevant when specified for an object: this canned ACL is otherwise ignored when
    /// specified for a bucket.
    BucketOwnerFullControl,

    /// Bucket can have logs written.
    ///
    /// The `LogDelivery` grantee group is granted `WRITE` and `READ_ACP` permissions.
    ///
    /// Only relevant when specified for a bucket: this canned ACL is otherwise ignored when
    /// specified for an object.
    ///
    /// For more information about logs, see [Amazon S3 Server Access Logging][serverlogs].
    ///
    /// [serverlogs]: https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerLogs.html
    // The conversion to `ObjectCannedAcl` builds this value from the raw string
    // "log-delivery-write" rather than a named SDK variant.
    LogDeliveryWrite,
}
297
298impl From<S3CannedAcl> for ObjectCannedAcl {
299    fn from(x: S3CannedAcl) -> Self {
300        match x {
301            S3CannedAcl::Private => ObjectCannedAcl::Private,
302            S3CannedAcl::PublicRead => ObjectCannedAcl::PublicRead,
303            S3CannedAcl::PublicReadWrite => ObjectCannedAcl::PublicReadWrite,
304            S3CannedAcl::AwsExecRead => ObjectCannedAcl::AwsExecRead,
305            S3CannedAcl::AuthenticatedRead => ObjectCannedAcl::AuthenticatedRead,
306            S3CannedAcl::BucketOwnerRead => ObjectCannedAcl::BucketOwnerRead,
307            S3CannedAcl::BucketOwnerFullControl => ObjectCannedAcl::BucketOwnerFullControl,
308            S3CannedAcl::LogDeliveryWrite => ObjectCannedAcl::from("log-delivery-write"),
309        }
310    }
311}
312
313fn is_retriable_response(res: &HttpResponse, errors_to_retry: Option<Vec<u16>>) -> bool {
314    let status_code = res.status();
315
316    match errors_to_retry {
317        Some(error_codes) => error_codes.contains(&status_code.as_u16()),
318        None => false,
319    }
320}
321
322fn should_retry_error(
323    errors_to_retry: Option<Vec<u16>>,
324    error: &SdkError<PutObjectError, HttpResponse>,
325) -> bool {
326    match error {
327        SdkError::ResponseError(err) => is_retriable_response(err.raw(), errors_to_retry),
328        SdkError::ServiceError(err) => is_retriable_response(err.raw(), errors_to_retry),
329        _ => false,
330    }
331}
332
333/// Retry strategy for S3 service calls.
334///
335/// Specifies a retry policy for S3 service calls.
336///
337/// For more information about error responses, see [Client Error Responses][error_responses].
338///
339/// [error_responses]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Status#client_error_responses
340#[configurable_component]
341#[derive(Debug, Clone, Default, PartialEq)]
342#[serde(tag = "type", rename_all = "snake_case")]
343#[configurable(metadata(docs::enum_tag_description = "The retry strategy enum."))]
344pub enum RetryStrategy {
345    /// Don't retry any errors
346    None,
347
348    /// Default strategy. The following error types will be retried:
349    /// - `TimeoutError`
350    /// - `DispatchFailure`
351    /// - `ResponseError` or `ServiceError` when:
352    ///   - HTTP status is 5xx
353    ///   - Status is 429 (Too Many Requests)
354    ///   - `x-amz-retry-after` header is present
355    ///   - HTTP status is 4xx and response body contains one of:
356    ///     - `"RequestTimeout"`
357    ///     - `"RequestExpired"`
358    ///     - `"ThrottlingException"`
359    /// - Fallback: Any unknown error variant
360    #[default]
361    Default,
362
363    /// Retry on *all* errors
364    All,
365
366    /// Custom retry strategy
367    Custom {
368        /// Retry on these specific HTTP status codes
369        status_codes: Vec<u16>,
370    },
371}
372
373impl RetryLogic for RetryStrategy {
374    type Error = SdkError<PutObjectError, HttpResponse>;
375    type Request = S3Request;
376    type Response = S3Response;
377
378    fn is_retriable_error(&self, error: &Self::Error) -> bool {
379        match self {
380            RetryStrategy::None => false,
381            RetryStrategy::Default => is_retriable_error(error),
382            RetryStrategy::All => true,
383            RetryStrategy::Custom { status_codes } => {
384                is_retriable_error(error) || should_retry_error(Some(status_codes.clone()), error)
385            }
386        }
387    }
388}
389
/// Failures reported by the S3 healthcheck built in `build_healthcheck`.
///
/// Each variant classifies the HTTP status of a failed `HeadBucket` service error.
#[derive(Debug, Snafu)]
pub enum HealthcheckError {
    /// `HeadBucket` was rejected with `status::FORBIDDEN` (HTTP 403).
    #[snafu(display("Invalid credentials"))]
    InvalidCredentials,
    /// `HeadBucket` returned `status::NOT_FOUND` (HTTP 404) for `bucket`.
    #[snafu(display("Unknown bucket: {:?}", bucket))]
    UnknownBucket { bucket: String },
    /// `HeadBucket` failed with any other HTTP status.
    #[snafu(display("Unknown status code: {}", status))]
    UnknownStatus { status: StatusCode },
}
399
400pub fn build_healthcheck(bucket: String, client: S3Client) -> crate::Result<Healthcheck> {
401    let healthcheck = async move {
402        let req = client
403            .head_bucket()
404            .bucket(bucket.clone())
405            .set_expected_bucket_owner(None)
406            .send()
407            .await;
408
409        match req {
410            Ok(_) => Ok(()),
411            Err(error) => Err(match error {
412                SdkError::ServiceError(inner) => {
413                    let status = inner.into_raw().status();
414                    match status.as_u16() {
415                        status::FORBIDDEN => HealthcheckError::InvalidCredentials.into(),
416                        status::NOT_FOUND => HealthcheckError::UnknownBucket { bucket }.into(),
417                        _ => HealthcheckError::UnknownStatus { status }.into(),
418                    }
419                }
420                error => error.into(),
421            }),
422        }
423    };
424
425    Ok(healthcheck.boxed())
426}
427
428pub async fn create_service(
429    region: &RegionOrEndpoint,
430    auth: &AwsAuthentication,
431    proxy: &ProxyConfig,
432    tls_options: Option<&TlsConfig>,
433    force_path_style: impl Into<bool>,
434) -> crate::Result<S3Service> {
435    let endpoint = region.endpoint();
436    let region = region.region();
437    let force_path_style_value: bool = force_path_style.into();
438    let client = create_client::<S3ClientBuilder>(
439        &S3ClientBuilder {
440            force_path_style: Some(force_path_style_value),
441        },
442        auth,
443        region.clone(),
444        endpoint,
445        proxy,
446        tls_options,
447        None,
448    )
449    .await?;
450    Ok(S3Service::new(client))
451}
452
#[cfg(test)]
mod tests {
    use super::S3StorageClass;
    use crate::serde::json::to_string;

    /// Every storage class must serialize to, and parse back from, its SCREAMING_SNAKE_CASE
    /// wire name.
    #[test]
    fn storage_class_names() {
        let cases = [
            ("DEEP_ARCHIVE", S3StorageClass::DeepArchive),
            ("GLACIER", S3StorageClass::Glacier),
            ("GLACIER_IR", S3StorageClass::GlacierIr),
            ("INTELLIGENT_TIERING", S3StorageClass::IntelligentTiering),
            ("EXPRESS_ONEZONE", S3StorageClass::ExpressOnezone),
            ("ONEZONE_IA", S3StorageClass::OnezoneIa),
            ("REDUCED_REDUNDANCY", S3StorageClass::ReducedRedundancy),
            ("STANDARD", S3StorageClass::Standard),
            ("STANDARD_IA", S3StorageClass::StandardIa),
        ];

        for (name, storage_class) in cases {
            // Serialization direction.
            assert_eq!(name, to_string(storage_class));

            // Deserialization direction: `{name:?}` wraps the name in JSON string quotes.
            let json = format!("{name:?}");
            let parsed: S3StorageClass = serde_json::from_str(&json).unwrap_or_else(|error| {
                panic!("Unparsable storage class name {name:?}: {error}")
            });
            assert_eq!(parsed, storage_class);
        }
    }
}