// vector/sinks/s3_common/service.rs

1use std::task::{Context, Poll};
2
3use aws_sdk_s3::{Client as S3Client, operation::put_object::PutObjectError};
4use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
5use aws_smithy_types::byte_stream::ByteStream;
6use base64::prelude::{BASE64_STANDARD, Engine as _};
7use bytes::Bytes;
8use futures::future::BoxFuture;
9use md5::Digest;
10use tower::Service;
11use tracing::Instrument;
12use vector_lib::{
13    event::{EventFinalizers, EventStatus, Finalizable},
14    request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
15    stream::DriverResponse,
16};
17
18use super::{config::S3Options, partitioner::S3PartitionKey};
19
20#[derive(Debug, Clone)]
21pub struct S3Request {
22    pub body: Bytes,
23    pub bucket: String,
24    pub metadata: S3Metadata,
25    pub request_metadata: RequestMetadata,
26    pub content_encoding: Option<&'static str>,
27    pub options: S3Options,
28}
29
30impl Finalizable for S3Request {
31    fn take_finalizers(&mut self) -> EventFinalizers {
32        std::mem::take(&mut self.metadata.finalizers)
33    }
34}
35
36impl MetaDescriptive for S3Request {
37    fn get_metadata(&self) -> &RequestMetadata {
38        &self.request_metadata
39    }
40
41    fn metadata_mut(&mut self) -> &mut RequestMetadata {
42        &mut self.request_metadata
43    }
44}
45
46#[derive(Clone, Debug)]
47pub struct S3Metadata {
48    pub partition_key: S3PartitionKey,
49    pub s3_key: String,
50    pub finalizers: EventFinalizers,
51}
52
53#[derive(Debug)]
54pub struct S3Response {
55    events_byte_size: GroupedCountByteSize,
56}
57
58impl DriverResponse for S3Response {
59    fn event_status(&self) -> EventStatus {
60        EventStatus::Delivered
61    }
62
63    fn events_sent(&self) -> &GroupedCountByteSize {
64        &self.events_byte_size
65    }
66}
67
68/// Wrapper for the AWS SDK S3 client.
69///
70/// Provides a `tower::Service`-compatible wrapper around the native
71/// AWS SDK S3 Client, allowing it to be composed within a Tower "stack",
72/// such that we can easily and transparently provide retries, concurrency
73/// limits, rate limits, and more.
74#[derive(Clone)]
75pub struct S3Service {
76    client: S3Client,
77}
78
79impl S3Service {
80    pub const fn new(client: S3Client) -> S3Service {
81        S3Service { client }
82    }
83
84    pub fn client(&self) -> S3Client {
85        self.client.clone()
86    }
87}
88
89impl Service<S3Request> for S3Service {
90    type Response = S3Response;
91    type Error = SdkError<PutObjectError, HttpResponse>;
92    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
93
94    // Emission of an internal event in case of errors is handled upstream by the caller.
95    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
96        Poll::Ready(Ok(()))
97    }
98
99    // Emission of internal events for errors and dropped events is handled upstream by the caller.
100    fn call(&mut self, request: S3Request) -> Self::Future {
101        let options = request.options;
102
103        let content_encoding = request.content_encoding;
104        let content_encoding = options
105            .content_encoding
106            .or_else(|| content_encoding.map(|ce| ce.to_string()));
107        let content_type = options
108            .content_type
109            .or_else(|| Some("text/x-log".to_owned()));
110
111        let content_md5 = BASE64_STANDARD.encode(md5::Md5::digest(&request.body));
112
113        let tagging = options.tags.map(|tags| {
114            let mut tagging = url::form_urlencoded::Serializer::new(String::new());
115            for (p, v) in &tags {
116                tagging.append_pair(p, v);
117            }
118            tagging.finish()
119        });
120
121        let events_byte_size = request
122            .request_metadata
123            .into_events_estimated_json_encoded_byte_size();
124
125        let client = self.client.clone();
126
127        Box::pin(async move {
128            let put_request = client
129                .put_object()
130                .body(bytes_to_bytestream(request.body))
131                .bucket(request.bucket.clone())
132                .key(request.metadata.s3_key.clone())
133                .set_content_encoding(content_encoding)
134                .set_content_type(content_type)
135                .set_acl(options.acl.map(Into::into))
136                .set_grant_full_control(options.grant_full_control)
137                .set_grant_read(options.grant_read)
138                .set_grant_read_acp(options.grant_read_acp)
139                .set_grant_write_acp(options.grant_write_acp)
140                .set_server_side_encryption(options.server_side_encryption.map(Into::into))
141                .set_ssekms_key_id(options.ssekms_key_id)
142                .set_storage_class(Some(options.storage_class.into()))
143                .set_tagging(tagging)
144                .content_md5(content_md5);
145
146            let result = put_request.send().in_current_span().await;
147
148            result.map(|_| {
149                trace!(
150                    target: "vector::sinks::s3_common::service::put_object",
151                    message = "Put object to s3-compatible storage.",
152                    bucket = request.bucket,
153                    key = request.metadata.s3_key
154                );
155
156                S3Response { events_byte_size }
157            })
158        })
159    }
160}
161
162fn bytes_to_bytestream(buf: Bytes) -> ByteStream {
163    ByteStream::from(buf)
164}