vector/sinks/s3_common/service.rs

1use std::task::{Context, Poll};
2
3use aws_sdk_s3::operation::put_object::PutObjectError;
4use aws_sdk_s3::Client as S3Client;
5use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
6use aws_smithy_runtime_api::client::result::SdkError;
7use aws_smithy_types::byte_stream::ByteStream;
8use base64::prelude::{Engine as _, BASE64_STANDARD};
9use bytes::Bytes;
10use futures::future::BoxFuture;
11use md5::Digest;
12use tower::Service;
13use tracing::Instrument;
14use vector_lib::event::{EventFinalizers, EventStatus, Finalizable};
15use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
16use vector_lib::stream::DriverResponse;
17
18use super::config::S3Options;
19use super::partitioner::S3PartitionKey;
20
21#[derive(Debug, Clone)]
22pub struct S3Request {
23    pub body: Bytes,
24    pub bucket: String,
25    pub metadata: S3Metadata,
26    pub request_metadata: RequestMetadata,
27    pub content_encoding: Option<&'static str>,
28    pub options: S3Options,
29}
30
31impl Finalizable for S3Request {
32    fn take_finalizers(&mut self) -> EventFinalizers {
33        std::mem::take(&mut self.metadata.finalizers)
34    }
35}
36
37impl MetaDescriptive for S3Request {
38    fn get_metadata(&self) -> &RequestMetadata {
39        &self.request_metadata
40    }
41
42    fn metadata_mut(&mut self) -> &mut RequestMetadata {
43        &mut self.request_metadata
44    }
45}
46
47#[derive(Clone, Debug)]
48pub struct S3Metadata {
49    pub partition_key: S3PartitionKey,
50    pub s3_key: String,
51    pub finalizers: EventFinalizers,
52}
53
54#[derive(Debug)]
55pub struct S3Response {
56    events_byte_size: GroupedCountByteSize,
57}
58
59impl DriverResponse for S3Response {
60    fn event_status(&self) -> EventStatus {
61        EventStatus::Delivered
62    }
63
64    fn events_sent(&self) -> &GroupedCountByteSize {
65        &self.events_byte_size
66    }
67}
68
69/// Wrapper for the AWS SDK S3 client.
70///
71/// Provides a `tower::Service`-compatible wrapper around the native
72/// AWS SDK S3 Client, allowing it to be composed within a Tower "stack",
73/// such that we can easily and transparently provide retries, concurrency
74/// limits, rate limits, and more.
75#[derive(Clone)]
76pub struct S3Service {
77    client: S3Client,
78}
79
80impl S3Service {
81    pub const fn new(client: S3Client) -> S3Service {
82        S3Service { client }
83    }
84
85    pub fn client(&self) -> S3Client {
86        self.client.clone()
87    }
88}
89
90impl Service<S3Request> for S3Service {
91    type Response = S3Response;
92    type Error = SdkError<PutObjectError, HttpResponse>;
93    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
94
95    // Emission of an internal event in case of errors is handled upstream by the caller.
96    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
97        Poll::Ready(Ok(()))
98    }
99
100    // Emission of internal events for errors and dropped events is handled upstream by the caller.
101    fn call(&mut self, request: S3Request) -> Self::Future {
102        let options = request.options;
103
104        let content_encoding = request.content_encoding;
105        let content_encoding = options
106            .content_encoding
107            .or_else(|| content_encoding.map(|ce| ce.to_string()));
108        let content_type = options
109            .content_type
110            .or_else(|| Some("text/x-log".to_owned()));
111
112        let content_md5 = BASE64_STANDARD.encode(md5::Md5::digest(&request.body));
113
114        let tagging = options.tags.map(|tags| {
115            let mut tagging = url::form_urlencoded::Serializer::new(String::new());
116            for (p, v) in &tags {
117                tagging.append_pair(p, v);
118            }
119            tagging.finish()
120        });
121
122        let events_byte_size = request
123            .request_metadata
124            .into_events_estimated_json_encoded_byte_size();
125
126        let client = self.client.clone();
127
128        Box::pin(async move {
129            let put_request = client
130                .put_object()
131                .body(bytes_to_bytestream(request.body))
132                .bucket(request.bucket.clone())
133                .key(request.metadata.s3_key.clone())
134                .set_content_encoding(content_encoding)
135                .set_content_type(content_type)
136                .set_acl(options.acl.map(Into::into))
137                .set_grant_full_control(options.grant_full_control)
138                .set_grant_read(options.grant_read)
139                .set_grant_read_acp(options.grant_read_acp)
140                .set_grant_write_acp(options.grant_write_acp)
141                .set_server_side_encryption(options.server_side_encryption.map(Into::into))
142                .set_ssekms_key_id(options.ssekms_key_id)
143                .set_storage_class(Some(options.storage_class.into()))
144                .set_tagging(tagging)
145                .content_md5(content_md5);
146
147            let result = put_request.send().in_current_span().await;
148
149            result.map(|_| {
150                trace!(
151                    target: "vector::sinks::s3_common::service::put_object",
152                    message = "Put object to s3-compatible storage.",
153                    bucket = request.bucket,
154                    key = request.metadata.s3_key
155                );
156
157                S3Response { events_byte_size }
158            })
159        })
160    }
161}
162
163fn bytes_to_bytestream(buf: Bytes) -> ByteStream {
164    ByteStream::from(buf)
165}