vector/sinks/s3_common/
service.rs1use std::task::{Context, Poll};
2
3use aws_sdk_s3::operation::put_object::PutObjectError;
4use aws_sdk_s3::Client as S3Client;
5use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
6use aws_smithy_runtime_api::client::result::SdkError;
7use aws_smithy_types::byte_stream::ByteStream;
8use base64::prelude::{Engine as _, BASE64_STANDARD};
9use bytes::Bytes;
10use futures::future::BoxFuture;
11use md5::Digest;
12use tower::Service;
13use tracing::Instrument;
14use vector_lib::event::{EventFinalizers, EventStatus, Finalizable};
15use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
16use vector_lib::stream::DriverResponse;
17
18use super::config::S3Options;
19use super::partitioner::S3PartitionKey;
20
21#[derive(Debug, Clone)]
22pub struct S3Request {
23 pub body: Bytes,
24 pub bucket: String,
25 pub metadata: S3Metadata,
26 pub request_metadata: RequestMetadata,
27 pub content_encoding: Option<&'static str>,
28 pub options: S3Options,
29}
30
31impl Finalizable for S3Request {
32 fn take_finalizers(&mut self) -> EventFinalizers {
33 std::mem::take(&mut self.metadata.finalizers)
34 }
35}
36
37impl MetaDescriptive for S3Request {
38 fn get_metadata(&self) -> &RequestMetadata {
39 &self.request_metadata
40 }
41
42 fn metadata_mut(&mut self) -> &mut RequestMetadata {
43 &mut self.request_metadata
44 }
45}
46
47#[derive(Clone, Debug)]
48pub struct S3Metadata {
49 pub partition_key: S3PartitionKey,
50 pub s3_key: String,
51 pub finalizers: EventFinalizers,
52}
53
54#[derive(Debug)]
55pub struct S3Response {
56 events_byte_size: GroupedCountByteSize,
57}
58
59impl DriverResponse for S3Response {
60 fn event_status(&self) -> EventStatus {
61 EventStatus::Delivered
62 }
63
64 fn events_sent(&self) -> &GroupedCountByteSize {
65 &self.events_byte_size
66 }
67}
68
69#[derive(Clone)]
76pub struct S3Service {
77 client: S3Client,
78}
79
80impl S3Service {
81 pub const fn new(client: S3Client) -> S3Service {
82 S3Service { client }
83 }
84
85 pub fn client(&self) -> S3Client {
86 self.client.clone()
87 }
88}
89
90impl Service<S3Request> for S3Service {
91 type Response = S3Response;
92 type Error = SdkError<PutObjectError, HttpResponse>;
93 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
94
95 fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
97 Poll::Ready(Ok(()))
98 }
99
100 fn call(&mut self, request: S3Request) -> Self::Future {
102 let options = request.options;
103
104 let content_encoding = request.content_encoding;
105 let content_encoding = options
106 .content_encoding
107 .or_else(|| content_encoding.map(|ce| ce.to_string()));
108 let content_type = options
109 .content_type
110 .or_else(|| Some("text/x-log".to_owned()));
111
112 let content_md5 = BASE64_STANDARD.encode(md5::Md5::digest(&request.body));
113
114 let tagging = options.tags.map(|tags| {
115 let mut tagging = url::form_urlencoded::Serializer::new(String::new());
116 for (p, v) in &tags {
117 tagging.append_pair(p, v);
118 }
119 tagging.finish()
120 });
121
122 let events_byte_size = request
123 .request_metadata
124 .into_events_estimated_json_encoded_byte_size();
125
126 let client = self.client.clone();
127
128 Box::pin(async move {
129 let put_request = client
130 .put_object()
131 .body(bytes_to_bytestream(request.body))
132 .bucket(request.bucket.clone())
133 .key(request.metadata.s3_key.clone())
134 .set_content_encoding(content_encoding)
135 .set_content_type(content_type)
136 .set_acl(options.acl.map(Into::into))
137 .set_grant_full_control(options.grant_full_control)
138 .set_grant_read(options.grant_read)
139 .set_grant_read_acp(options.grant_read_acp)
140 .set_grant_write_acp(options.grant_write_acp)
141 .set_server_side_encryption(options.server_side_encryption.map(Into::into))
142 .set_ssekms_key_id(options.ssekms_key_id)
143 .set_storage_class(Some(options.storage_class.into()))
144 .set_tagging(tagging)
145 .content_md5(content_md5);
146
147 let result = put_request.send().in_current_span().await;
148
149 result.map(|_| {
150 trace!(
151 target: "vector::sinks::s3_common::service::put_object",
152 message = "Put object to s3-compatible storage.",
153 bucket = request.bucket,
154 key = request.metadata.s3_key
155 );
156
157 S3Response { events_byte_size }
158 })
159 })
160 }
161}
162
163fn bytes_to_bytestream(buf: Bytes) -> ByteStream {
164 ByteStream::from(buf)
165}