vector/sinks/s3_common/
service.rs1use std::task::{Context, Poll};
2
3use aws_sdk_s3::{Client as S3Client, operation::put_object::PutObjectError};
4use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
5use aws_smithy_types::byte_stream::ByteStream;
6use base64::prelude::{BASE64_STANDARD, Engine as _};
7use bytes::Bytes;
8use futures::future::BoxFuture;
9use md5::Digest;
10use tower::Service;
11use tracing::Instrument;
12use vector_lib::{
13 event::{EventFinalizers, EventStatus, Finalizable},
14 request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
15 stream::DriverResponse,
16};
17
18use super::{config::S3Options, partitioner::S3PartitionKey};
19
20#[derive(Debug, Clone)]
21pub struct S3Request {
22 pub body: Bytes,
23 pub bucket: String,
24 pub metadata: S3Metadata,
25 pub request_metadata: RequestMetadata,
26 pub content_encoding: Option<&'static str>,
27 pub options: S3Options,
28}
29
30impl Finalizable for S3Request {
31 fn take_finalizers(&mut self) -> EventFinalizers {
32 std::mem::take(&mut self.metadata.finalizers)
33 }
34}
35
36impl MetaDescriptive for S3Request {
37 fn get_metadata(&self) -> &RequestMetadata {
38 &self.request_metadata
39 }
40
41 fn metadata_mut(&mut self) -> &mut RequestMetadata {
42 &mut self.request_metadata
43 }
44}
45
46#[derive(Clone, Debug)]
47pub struct S3Metadata {
48 pub partition_key: S3PartitionKey,
49 pub s3_key: String,
50 pub finalizers: EventFinalizers,
51}
52
53#[derive(Debug)]
54pub struct S3Response {
55 events_byte_size: GroupedCountByteSize,
56}
57
58impl DriverResponse for S3Response {
59 fn event_status(&self) -> EventStatus {
60 EventStatus::Delivered
61 }
62
63 fn events_sent(&self) -> &GroupedCountByteSize {
64 &self.events_byte_size
65 }
66}
67
68#[derive(Clone)]
75pub struct S3Service {
76 client: S3Client,
77}
78
79impl S3Service {
80 pub const fn new(client: S3Client) -> S3Service {
81 S3Service { client }
82 }
83
84 pub fn client(&self) -> S3Client {
85 self.client.clone()
86 }
87}
88
89impl Service<S3Request> for S3Service {
90 type Response = S3Response;
91 type Error = SdkError<PutObjectError, HttpResponse>;
92 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
93
94 fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
96 Poll::Ready(Ok(()))
97 }
98
99 fn call(&mut self, request: S3Request) -> Self::Future {
101 let options = request.options;
102
103 let content_encoding = request.content_encoding;
104 let content_encoding = options
105 .content_encoding
106 .or_else(|| content_encoding.map(|ce| ce.to_string()));
107 let content_type = options
108 .content_type
109 .or_else(|| Some("text/x-log".to_owned()));
110
111 let content_md5 = BASE64_STANDARD.encode(md5::Md5::digest(&request.body));
112
113 let tagging = options.tags.map(|tags| {
114 let mut tagging = url::form_urlencoded::Serializer::new(String::new());
115 for (p, v) in &tags {
116 tagging.append_pair(p, v);
117 }
118 tagging.finish()
119 });
120
121 let events_byte_size = request
122 .request_metadata
123 .into_events_estimated_json_encoded_byte_size();
124
125 let client = self.client.clone();
126
127 Box::pin(async move {
128 let put_request = client
129 .put_object()
130 .body(bytes_to_bytestream(request.body))
131 .bucket(request.bucket.clone())
132 .key(request.metadata.s3_key.clone())
133 .set_content_encoding(content_encoding)
134 .set_content_type(content_type)
135 .set_acl(options.acl.map(Into::into))
136 .set_grant_full_control(options.grant_full_control)
137 .set_grant_read(options.grant_read)
138 .set_grant_read_acp(options.grant_read_acp)
139 .set_grant_write_acp(options.grant_write_acp)
140 .set_server_side_encryption(options.server_side_encryption.map(Into::into))
141 .set_ssekms_key_id(options.ssekms_key_id)
142 .set_storage_class(Some(options.storage_class.into()))
143 .set_tagging(tagging)
144 .content_md5(content_md5);
145
146 let result = put_request.send().in_current_span().await;
147
148 result.map(|_| {
149 trace!(
150 target: "vector::sinks::s3_common::service::put_object",
151 message = "Put object to s3-compatible storage.",
152 bucket = request.bucket,
153 key = request.metadata.s3_key
154 );
155
156 S3Response { events_byte_size }
157 })
158 })
159 }
160}
161
162fn bytes_to_bytestream(buf: Bytes) -> ByteStream {
163 ByteStream::from(buf)
164}