// vector/sinks/gcs_common/service.rs
use std::task::Poll;

use bytes::Bytes;
use futures::future::BoxFuture;
use http::{
    header::{HeaderName, HeaderValue},
    Request, Uri,
};
use hyper::Body;
use tower::Service;
use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
use vector_lib::stream::DriverResponse;

use crate::{
    event::{EventFinalizers, EventStatus, Finalizable},
    gcp::GcpAuthenticator,
    http::{HttpClient, HttpError},
};

20#[derive(Debug, Clone)]
21pub struct GcsService {
22 client: HttpClient,
23 base_url: String,
24 auth: GcpAuthenticator,
25}
26
27impl GcsService {
28 pub const fn new(client: HttpClient, base_url: String, auth: GcpAuthenticator) -> GcsService {
29 GcsService {
30 client,
31 base_url,
32 auth,
33 }
34 }
35}
36
37#[derive(Clone, Debug)]
38pub struct GcsRequest {
39 pub key: String,
40 pub body: Bytes,
41 pub settings: GcsRequestSettings,
42 pub finalizers: EventFinalizers,
43 pub metadata: RequestMetadata,
44}
45
46impl Finalizable for GcsRequest {
47 fn take_finalizers(&mut self) -> EventFinalizers {
48 std::mem::take(&mut self.finalizers)
49 }
50}
51
52impl MetaDescriptive for GcsRequest {
53 fn get_metadata(&self) -> &RequestMetadata {
54 &self.metadata
55 }
56
57 fn metadata_mut(&mut self) -> &mut RequestMetadata {
58 &mut self.metadata
59 }
60}
61
62#[derive(Clone, Debug)]
66pub struct GcsRequestSettings {
67 pub acl: Option<HeaderValue>,
68 pub content_type: HeaderValue,
69 pub content_encoding: Option<HeaderValue>,
70 pub storage_class: HeaderValue,
71 pub headers: Vec<(HeaderName, HeaderValue)>,
72}
73
74#[derive(Debug)]
75pub struct GcsResponse {
76 pub inner: http::Response<Body>,
77 pub metadata: RequestMetadata,
78}
79
80impl DriverResponse for GcsResponse {
81 fn event_status(&self) -> EventStatus {
82 if self.inner.status().is_success() {
83 EventStatus::Delivered
84 } else if self.inner.status().is_server_error() {
85 EventStatus::Errored
86 } else {
87 EventStatus::Rejected
88 }
89 }
90
91 fn events_sent(&self) -> &GroupedCountByteSize {
92 self.metadata.events_estimated_json_encoded_byte_size()
93 }
94
95 fn bytes_sent(&self) -> Option<usize> {
96 Some(self.metadata.request_encoded_size())
97 }
98}
99
100impl Service<GcsRequest> for GcsService {
101 type Response = GcsResponse;
102 type Error = HttpError;
103 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
104
105 fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
107 Poll::Ready(Ok(()))
108 }
109
110 fn call(&mut self, request: GcsRequest) -> Self::Future {
112 let settings = request.settings;
113 let metadata = request.metadata;
114
115 let uri = merge_url_and_key(&self.base_url, &request.key);
116
117 let uri = uri.parse::<Uri>().unwrap();
118
119 let mut builder = Request::put(uri);
120 let headers = builder.headers_mut().unwrap();
121 headers.insert("content-type", settings.content_type);
122 headers.insert(
123 "content-length",
124 HeaderValue::from_str(&request.body.len().to_string()).unwrap(),
125 );
126 settings
127 .content_encoding
128 .map(|ce| headers.insert("content-encoding", ce));
129 settings.acl.map(|acl| headers.insert("x-goog-acl", acl));
130 headers.insert("x-goog-storage-class", settings.storage_class);
131 for (p, v) in settings.headers {
132 headers.insert(p, v);
133 }
134
135 let mut http_request = builder.body(Body::from(request.body)).unwrap();
136 self.auth.apply(&mut http_request);
137
138 let mut client = self.client.clone();
139 Box::pin(async move {
140 let result = client.call(http_request).await;
141 result.map(|inner| GcsResponse { inner, metadata })
142 })
143 }
144}
145
/// Joins `base_url` and `key` with a single `/` between them.
///
/// At most one trailing `/` is removed from `base_url` and at most one leading
/// `/` from `key`; any further slashes on either side are kept as they are.
fn merge_url_and_key(base_url: &str, key: &str) -> String {
    let base_url = base_url.strip_suffix('/').unwrap_or(base_url);
    let key = key.strip_prefix('/').unwrap_or(key);
    format!("{base_url}/{key}")
}

#[cfg(test)]
mod tests {
    use super::merge_url_and_key;

    /// Every combination of a trailing slash on the base URL and a leading
    /// slash on the key must produce the same joined URL.
    #[test]
    fn merge_base_url_and_key() {
        let cases = [
            ("https://baseurl/", "/key"),
            ("https://baseurl/", "key"),
            ("https://baseurl", "/key"),
            ("https://baseurl", "key"),
        ];

        for (base_url, key) in cases {
            assert_eq!(
                "https://baseurl/key",
                merge_url_and_key(base_url, key),
                "base_url={base_url:?}, key={key:?}"
            );
        }
    }
}