vector/sinks/s3_common/
config.rs

1use std::collections::{BTreeMap, HashMap};
2
3use aws_sdk_s3::{
4    operation::put_object::PutObjectError,
5    types::{ObjectCannedAcl, ServerSideEncryption, StorageClass},
6    Client as S3Client,
7};
8use aws_smithy_runtime_api::{
9    client::{orchestrator::HttpResponse, result::SdkError},
10    http::StatusCode,
11};
12use futures::FutureExt;
13use snafu::Snafu;
14use vector_lib::configurable::configurable_component;
15
16use super::service::{S3Request, S3Response, S3Service};
17use crate::{
18    aws::{create_client, is_retriable_error, AwsAuthentication, RegionOrEndpoint},
19    common::s3::S3ClientBuilder,
20    config::ProxyConfig,
21    http::status,
22    sinks::{util::retries::RetryLogic, Healthcheck},
23    tls::TlsConfig,
24};
25
26/// Per-operation configuration when writing objects to S3.
27#[configurable_component]
28#[derive(Clone, Debug, Default)]
29pub struct S3Options {
30    /// Canned ACL to apply to the created objects.
31    ///
32    /// For more information, see [Canned ACL][canned_acl].
33    ///
34    /// [canned_acl]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#canned-acl
35    pub acl: Option<S3CannedAcl>,
36
37    /// Grants `READ`, `READ_ACP`, and `WRITE_ACP` permissions on the created objects to the named [grantee].
38    ///
39    /// This allows the grantee to read the created objects and their metadata, as well as read and
40    /// modify the ACL on the created objects.
41    ///
42    /// [grantee]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#specifying-grantee
43    #[configurable(metadata(
44        docs::examples = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
45    ))]
46    #[configurable(metadata(docs::examples = "person@email.com"))]
47    #[configurable(metadata(docs::examples = "http://acs.amazonaws.com/groups/global/AllUsers"))]
48    pub grant_full_control: Option<String>,
49
50    /// Grants `READ` permissions on the created objects to the named [grantee].
51    ///
52    /// This allows the grantee to read the created objects and their metadata.
53    ///
54    /// [grantee]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#specifying-grantee
55    #[configurable(metadata(
56        docs::examples = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
57    ))]
58    #[configurable(metadata(docs::examples = "person@email.com"))]
59    #[configurable(metadata(docs::examples = "http://acs.amazonaws.com/groups/global/AllUsers"))]
60    pub grant_read: Option<String>,
61
62    /// Grants `READ_ACP` permissions on the created objects to the named [grantee].
63    ///
64    /// This allows the grantee to read the ACL on the created objects.
65    ///
66    /// [grantee]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#specifying-grantee
67    #[configurable(metadata(
68        docs::examples = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
69    ))]
70    #[configurable(metadata(docs::examples = "person@email.com"))]
71    #[configurable(metadata(docs::examples = "http://acs.amazonaws.com/groups/global/AllUsers"))]
72    pub grant_read_acp: Option<String>,
73
74    /// Grants `WRITE_ACP` permissions on the created objects to the named [grantee].
75    ///
76    /// This allows the grantee to modify the ACL on the created objects.
77    ///
78    /// [grantee]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#specifying-grantee
79    #[configurable(metadata(
80        docs::examples = "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
81    ))]
82    #[configurable(metadata(docs::examples = "person@email.com"))]
83    #[configurable(metadata(docs::examples = "http://acs.amazonaws.com/groups/global/AllUsers"))]
84    pub grant_write_acp: Option<String>,
85
86    /// The Server-side Encryption algorithm used when storing these objects.
87    pub server_side_encryption: Option<S3ServerSideEncryption>,
88
89    /// Specifies the ID of the AWS Key Management Service (AWS KMS) symmetrical customer managed
90    /// customer master key (CMK) that is used for the created objects.
91    ///
92    /// Only applies when `server_side_encryption` is configured to use KMS.
93    ///
94    /// If not specified, Amazon S3 uses the AWS managed CMK in AWS to protect the data.
95    #[configurable(metadata(docs::examples = "abcd1234"))]
96    #[configurable(metadata(docs::templateable))]
97    pub ssekms_key_id: Option<String>,
98
99    /// The storage class for the created objects.
100    ///
101    /// See the [S3 Storage Classes][s3_storage_classes] for more details.
102    ///
103    /// [s3_storage_classes]: https://docs.aws.amazon.com/AmazonS3/latest/dev/storage-class-intro.html
104    #[serde(default)]
105    pub storage_class: S3StorageClass,
106
107    /// The tag-set for the object.
108    #[configurable(metadata(docs::additional_props_description = "A single tag."))]
109    #[configurable(metadata(docs::examples = "example_tags()"))]
110    pub tags: Option<BTreeMap<String, String>>,
111
112    /// Overrides what content encoding has been applied to the object.
113    ///
114    /// Directly comparable to the `Content-Encoding` HTTP header.
115    ///
116    /// If not specified, the compression scheme used dictates this value.
117    #[configurable(metadata(docs::examples = "gzip"))]
118    pub content_encoding: Option<String>,
119
120    /// Overrides the MIME type of the object.
121    ///
122    /// Directly comparable to the `Content-Type` HTTP header.
123    ///
124    /// If not specified, the compression scheme used dictates this value.
125    /// When `compression` is set to `none`, the value `text/x-log` is used.
126    #[configurable(metadata(docs::examples = "application/gzip"))]
127    pub content_type: Option<String>,
128}
129
/// Builds the sample tag set used as a documentation example.
///
/// The function is referenced by name from the `docs::examples` metadata of the `tags` option.
fn example_tags() -> HashMap<String, String> {
    [
        ("Project", "Blue"),
        ("Classification", "confidential"),
        ("PHI", "True"),
    ]
    .into_iter()
    .map(|(key, value)| (key.to_string(), value.to_string()))
    .collect()
}
137
138/// S3 storage classes.
139///
140/// More information on each storage class can be found in the [AWS documentation][aws_docs].
141///
142/// [aws_docs]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/storage-class-intro.html
143#[configurable_component]
144#[derive(Clone, Copy, Debug, Derivative, PartialEq, Eq)]
145#[derivative(Default)]
146#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
147pub enum S3StorageClass {
148    /// Standard Redundancy.
149    #[derivative(Default)]
150    Standard,
151
152    /// Reduced Redundancy.
153    ReducedRedundancy,
154
155    /// Intelligent Tiering.
156    IntelligentTiering,
157
158    /// Infrequently Accessed.
159    StandardIa,
160
161    /// High Performance (single Availability zone).
162    ExpressOnezone,
163
164    /// Infrequently Accessed (single Availability zone).
165    OnezoneIa,
166
167    /// Glacier Flexible Retrieval.
168    Glacier,
169
170    /// Glacier Instant Retrieval.
171    GlacierIr,
172
173    /// Glacier Deep Archive.
174    DeepArchive,
175}
176
177impl From<S3StorageClass> for StorageClass {
178    fn from(x: S3StorageClass) -> Self {
179        match x {
180            S3StorageClass::Standard => Self::Standard,
181            S3StorageClass::ReducedRedundancy => Self::ReducedRedundancy,
182            S3StorageClass::IntelligentTiering => Self::IntelligentTiering,
183            S3StorageClass::StandardIa => Self::StandardIa,
184            S3StorageClass::ExpressOnezone => Self::ExpressOnezone,
185            S3StorageClass::OnezoneIa => Self::OnezoneIa,
186            S3StorageClass::Glacier => Self::Glacier,
187            S3StorageClass::GlacierIr => Self::GlacierIr,
188            S3StorageClass::DeepArchive => Self::DeepArchive,
189        }
190    }
191}
192
193/// AWS S3 Server-Side Encryption algorithms.
194///
195/// More information on each algorithm can be found in the [AWS documentation][aws_docs].
196///
197/// [aws_docs]: https://docs.aws.amazon.com/AmazonS3/latest/userguide/serv-side-encryption.html
198#[configurable_component]
199#[derive(Clone, Copy, Debug)]
200pub enum S3ServerSideEncryption {
201    /// Each object is encrypted with AES-256 using a unique key.
202    ///
203    /// This corresponds to the `SSE-S3` option.
204    #[serde(rename = "AES256")]
205    Aes256,
206
207    /// Each object is encrypted with AES-256 using keys managed by AWS KMS.
208    ///
209    /// Depending on whether or not a KMS key ID is specified, this corresponds either to the
210    /// `SSE-KMS` option (keys generated/managed by KMS) or the `SSE-C` option (keys generated by
211    /// the customer, managed by KMS).
212    #[serde(rename = "aws:kms")]
213    AwsKms,
214}
215
216impl From<S3ServerSideEncryption> for ServerSideEncryption {
217    fn from(x: S3ServerSideEncryption) -> Self {
218        match x {
219            S3ServerSideEncryption::Aes256 => Self::Aes256,
220            S3ServerSideEncryption::AwsKms => Self::AwsKms,
221        }
222    }
223}
224
225/// S3 Canned ACLs.
226///
227/// For more information, see [Canned ACL][canned_acl].
228///
229/// [canned_acl]: https://docs.aws.amazon.com/AmazonS3/latest/dev/acl-overview.html#canned-acl
230#[configurable_component]
231#[derive(Clone, Copy, Debug, Derivative)]
232#[derivative(Default)]
233#[serde(rename_all = "kebab-case")]
234pub enum S3CannedAcl {
235    /// Bucket/object are private.
236    ///
237    /// The bucket/object owner is granted the `FULL_CONTROL` permission, and no one else has
238    /// access.
239    ///
240    /// This is the default.
241    #[derivative(Default)]
242    Private,
243
244    /// Bucket/object can be read publicly.
245    ///
246    /// The bucket/object owner is granted the `FULL_CONTROL` permission, and anyone in the
247    /// `AllUsers` grantee group is granted the `READ` permission.
248    PublicRead,
249
250    /// Bucket/object can be read and written publicly.
251    ///
252    /// The bucket/object owner is granted the `FULL_CONTROL` permission, and anyone in the
253    /// `AllUsers` grantee group is granted the `READ` and `WRITE` permissions.
254    ///
255    /// This is generally not recommended.
256    PublicReadWrite,
257
258    /// Bucket/object are private, and readable by EC2.
259    ///
260    /// The bucket/object owner is granted the `FULL_CONTROL` permission, and the AWS EC2 service is
261    /// granted the `READ` permission for the purpose of reading Amazon Machine Image (AMI) bundles
262    /// from the given bucket.
263    AwsExecRead,
264
265    /// Bucket/object can be read by authenticated users.
266    ///
267    /// The bucket/object owner is granted the `FULL_CONTROL` permission, and anyone in the
268    /// `AuthenticatedUsers` grantee group is granted the `READ` permission.
269    AuthenticatedRead,
270
271    /// Object is private, except to the bucket owner.
272    ///
273    /// The object owner is granted the `FULL_CONTROL` permission, and the bucket owner is granted the `READ` permission.
274    ///
275    /// Only relevant when specified for an object: this canned ACL is otherwise ignored when
276    /// specified for a bucket.
277    BucketOwnerRead,
278
279    /// Object is semi-private.
280    ///
281    /// Both the object owner and bucket owner are granted the `FULL_CONTROL` permission.
282    ///
283    /// Only relevant when specified for an object: this canned ACL is otherwise ignored when
284    /// specified for a bucket.
285    BucketOwnerFullControl,
286
287    /// Bucket can have logs written.
288    ///
289    /// The `LogDelivery` grantee group is granted `WRITE` and `READ_ACP` permissions.
290    ///
291    /// Only relevant when specified for a bucket: this canned ACL is otherwise ignored when
292    /// specified for an object.
293    ///
294    /// For more information about logs, see [Amazon S3 Server Access Logging][serverlogs].
295    ///
296    /// [serverlogs]: https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerLogs.html
297    LogDeliveryWrite,
298}
299
300impl From<S3CannedAcl> for ObjectCannedAcl {
301    fn from(x: S3CannedAcl) -> Self {
302        match x {
303            S3CannedAcl::Private => ObjectCannedAcl::Private,
304            S3CannedAcl::PublicRead => ObjectCannedAcl::PublicRead,
305            S3CannedAcl::PublicReadWrite => ObjectCannedAcl::PublicReadWrite,
306            S3CannedAcl::AwsExecRead => ObjectCannedAcl::AwsExecRead,
307            S3CannedAcl::AuthenticatedRead => ObjectCannedAcl::AuthenticatedRead,
308            S3CannedAcl::BucketOwnerRead => ObjectCannedAcl::BucketOwnerRead,
309            S3CannedAcl::BucketOwnerFullControl => ObjectCannedAcl::BucketOwnerFullControl,
310            S3CannedAcl::LogDeliveryWrite => ObjectCannedAcl::from("log-delivery-write"),
311        }
312    }
313}
314
315fn is_retriable_response(res: &HttpResponse, errors_to_retry: Option<Vec<u16>>) -> bool {
316    let status_code = res.status();
317
318    match errors_to_retry {
319        Some(error_codes) => error_codes.contains(&status_code.as_u16()),
320        None => false,
321    }
322}
323
324fn should_retry_error(
325    errors_to_retry: Option<Vec<u16>>,
326    error: &SdkError<PutObjectError, HttpResponse>,
327) -> bool {
328    match error {
329        SdkError::ResponseError(err) => is_retriable_response(err.raw(), errors_to_retry),
330        SdkError::ServiceError(err) => is_retriable_response(err.raw(), errors_to_retry),
331        _ => false,
332    }
333}
334
335/// Retry strategy for S3 service calls.
336///
337/// Specifies a retry policy for S3 service calls.
338///
339/// For more information about error responses, see [Client Error Responses][error_responses].
340///
341/// [error_responses]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Status#client_error_responses
342#[configurable_component]
343#[derive(Debug, Clone, Default, PartialEq)]
344#[serde(tag = "type", rename_all = "snake_case")]
345#[configurable(metadata(docs::enum_tag_description = "The retry strategy enum."))]
346pub enum RetryStrategy {
347    /// Don't retry any errors
348    #[default]
349    None,
350
351    /// Retry on *all* errors
352    All,
353
354    /// Custom retry strategy
355    Custom {
356        /// Retry on these specific HTTP status codes
357        status_codes: Vec<u16>,
358    },
359}
360
361impl RetryLogic for RetryStrategy {
362    type Error = SdkError<PutObjectError, HttpResponse>;
363    type Request = S3Request;
364    type Response = S3Response;
365
366    fn is_retriable_error(&self, error: &Self::Error) -> bool {
367        match self {
368            RetryStrategy::None => false,
369            RetryStrategy::All => true,
370            RetryStrategy::Custom { status_codes } => {
371                is_retriable_error(error) || should_retry_error(Some(status_codes.clone()), error)
372            }
373        }
374    }
375}
376
377#[derive(Debug, Snafu)]
378pub enum HealthcheckError {
379    #[snafu(display("Invalid credentials"))]
380    InvalidCredentials,
381    #[snafu(display("Unknown bucket: {:?}", bucket))]
382    UnknownBucket { bucket: String },
383    #[snafu(display("Unknown status code: {}", status))]
384    UnknownStatus { status: StatusCode },
385}
386
387pub fn build_healthcheck(bucket: String, client: S3Client) -> crate::Result<Healthcheck> {
388    let healthcheck = async move {
389        let req = client
390            .head_bucket()
391            .bucket(bucket.clone())
392            .set_expected_bucket_owner(None)
393            .send()
394            .await;
395
396        match req {
397            Ok(_) => Ok(()),
398            Err(error) => Err(match error {
399                SdkError::ServiceError(inner) => {
400                    let status = inner.into_raw().status();
401                    match status.as_u16() {
402                        status::FORBIDDEN => HealthcheckError::InvalidCredentials.into(),
403                        status::NOT_FOUND => HealthcheckError::UnknownBucket { bucket }.into(),
404                        _ => HealthcheckError::UnknownStatus { status }.into(),
405                    }
406                }
407                error => error.into(),
408            }),
409        }
410    };
411
412    Ok(healthcheck.boxed())
413}
414
415pub async fn create_service(
416    region: &RegionOrEndpoint,
417    auth: &AwsAuthentication,
418    proxy: &ProxyConfig,
419    tls_options: Option<&TlsConfig>,
420    force_path_style: impl Into<bool>,
421) -> crate::Result<S3Service> {
422    let endpoint = region.endpoint();
423    let region = region.region();
424    let force_path_style_value: bool = force_path_style.into();
425    let client = create_client::<S3ClientBuilder>(
426        &S3ClientBuilder {
427            force_path_style: Some(force_path_style_value),
428        },
429        auth,
430        region.clone(),
431        endpoint,
432        proxy,
433        tls_options,
434        None,
435    )
436    .await?;
437    Ok(S3Service::new(client))
438}
439
#[cfg(test)]
mod tests {
    use super::S3StorageClass;
    use crate::serde::json::to_string;

    /// Every storage class serializes to its expected name and parses back from that name.
    #[test]
    fn storage_class_names() {
        let cases = [
            ("DEEP_ARCHIVE", S3StorageClass::DeepArchive),
            ("GLACIER", S3StorageClass::Glacier),
            ("GLACIER_IR", S3StorageClass::GlacierIr),
            ("INTELLIGENT_TIERING", S3StorageClass::IntelligentTiering),
            ("EXPRESS_ONEZONE", S3StorageClass::ExpressOnezone),
            ("ONEZONE_IA", S3StorageClass::OnezoneIa),
            ("REDUCED_REDUNDANCY", S3StorageClass::ReducedRedundancy),
            ("STANDARD", S3StorageClass::Standard),
            ("STANDARD_IA", S3StorageClass::StandardIa),
        ];

        for (name, storage_class) in cases {
            assert_eq!(name, to_string(storage_class));

            // Debug-formatting the name wraps it in quotes, giving a JSON string literal.
            let quoted = format!("{name:?}");
            let result: S3StorageClass = serde_json::from_str(&quoted)
                .unwrap_or_else(|error| panic!("Unparsable storage class name {name:?}: {error}"));
            assert_eq!(result, storage_class);
        }
    }
}