vector/sinks/gcs_common/
service.rs1use std::task::Poll;
2
3use bytes::Bytes;
4use futures::future::BoxFuture;
5use http::{
6 Request, Uri,
7 header::{HeaderName, HeaderValue},
8};
9use hyper::Body;
10use tower::Service;
11use vector_lib::{
12 request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
13 stream::DriverResponse,
14};
15
16use crate::{
17 event::{EventFinalizers, EventStatus, Finalizable},
18 gcp::GcpAuthenticator,
19 http::{HttpClient, HttpError},
20};
21
22#[derive(Debug, Clone)]
23pub struct GcsService {
24 client: HttpClient,
25 base_url: String,
26 auth: GcpAuthenticator,
27}
28
29impl GcsService {
30 pub const fn new(client: HttpClient, base_url: String, auth: GcpAuthenticator) -> GcsService {
31 GcsService {
32 client,
33 base_url,
34 auth,
35 }
36 }
37}
38
39#[derive(Clone, Debug)]
40pub struct GcsRequest {
41 pub key: String,
42 pub body: Bytes,
43 pub settings: GcsRequestSettings,
44 pub finalizers: EventFinalizers,
45 pub metadata: RequestMetadata,
46}
47
48impl Finalizable for GcsRequest {
49 fn take_finalizers(&mut self) -> EventFinalizers {
50 std::mem::take(&mut self.finalizers)
51 }
52}
53
54impl MetaDescriptive for GcsRequest {
55 fn get_metadata(&self) -> &RequestMetadata {
56 &self.metadata
57 }
58
59 fn metadata_mut(&mut self) -> &mut RequestMetadata {
60 &mut self.metadata
61 }
62}
63
64#[derive(Clone, Debug)]
68pub struct GcsRequestSettings {
69 pub acl: Option<HeaderValue>,
70 pub content_type: HeaderValue,
71 pub content_encoding: Option<HeaderValue>,
72 pub storage_class: HeaderValue,
73 pub headers: Vec<(HeaderName, HeaderValue)>,
74}
75
76#[derive(Debug)]
77pub struct GcsResponse {
78 pub inner: http::Response<Body>,
79 pub metadata: RequestMetadata,
80}
81
82impl DriverResponse for GcsResponse {
83 fn event_status(&self) -> EventStatus {
84 if self.inner.status().is_success() {
85 EventStatus::Delivered
86 } else if self.inner.status().is_server_error() {
87 EventStatus::Errored
88 } else {
89 EventStatus::Rejected
90 }
91 }
92
93 fn events_sent(&self) -> &GroupedCountByteSize {
94 self.metadata.events_estimated_json_encoded_byte_size()
95 }
96
97 fn bytes_sent(&self) -> Option<usize> {
98 Some(self.metadata.request_encoded_size())
99 }
100}
101
102impl Service<GcsRequest> for GcsService {
103 type Response = GcsResponse;
104 type Error = HttpError;
105 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
106
107 fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
109 Poll::Ready(Ok(()))
110 }
111
112 fn call(&mut self, request: GcsRequest) -> Self::Future {
114 let settings = request.settings;
115 let metadata = request.metadata;
116
117 let uri = merge_url_and_key(&self.base_url, &request.key);
118
119 let uri = uri.parse::<Uri>().unwrap();
120
121 let mut builder = Request::put(uri);
122 let headers = builder.headers_mut().unwrap();
123 headers.insert("content-type", settings.content_type);
124 headers.insert(
125 "content-length",
126 HeaderValue::from_str(&request.body.len().to_string()).unwrap(),
127 );
128 settings
129 .content_encoding
130 .map(|ce| headers.insert("content-encoding", ce));
131 settings.acl.map(|acl| headers.insert("x-goog-acl", acl));
132 headers.insert("x-goog-storage-class", settings.storage_class);
133 for (p, v) in settings.headers {
134 headers.insert(p, v);
135 }
136
137 let mut http_request = builder.body(Body::from(request.body)).unwrap();
138 self.auth.apply(&mut http_request);
139
140 let mut client = self.client.clone();
141 Box::pin(async move {
142 let result = client.call(http_request).await;
143 result.map(|inner| GcsResponse { inner, metadata })
144 })
145 }
146}
147
/// Joins `base_url` and `key` with a single `/` placed between them.
///
/// At most one trailing `/` is removed from `base_url` and at most one leading
/// `/` from `key` before joining; any further slashes are kept unchanged.
fn merge_url_and_key(base_url: &str, key: &str) -> String {
    let prefix = base_url.strip_suffix('/').unwrap_or(base_url);
    let object = key.strip_prefix('/').unwrap_or(key);
    [prefix, object].join("/")
}
154
#[cfg(test)]
mod tests {
    use super::merge_url_and_key;

    /// Every combination of trailing and leading slash must give the same URL.
    #[test]
    fn merge_base_url_and_key() {
        let cases = [
            ("https://baseurl/", "/key"),
            ("https://baseurl/", "key"),
            ("https://baseurl", "/key"),
            ("https://baseurl", "key"),
        ];

        for (base_url, key) in cases {
            assert_eq!("https://baseurl/key", merge_url_and_key(base_url, key));
        }
    }
}