vector/sinks/gcs_common/
service.rs1use std::task::Poll;
2
3use bytes::Bytes;
4use futures::future::BoxFuture;
5use http::{
6 Request, Uri,
7 header::{HeaderName, HeaderValue},
8};
9use hyper::Body;
10use tower::Service;
11use vector_lib::{
12 request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
13 stream::DriverResponse,
14};
15
16use crate::{
17 event::{EventFinalizers, EventStatus, Finalizable},
18 gcp::GcpAuthenticator,
19 http::{HttpClient, HttpError},
20};
21
22#[derive(Debug, Clone)]
23pub struct GcsService {
24 client: HttpClient,
25 base_url: String,
26 auth: GcpAuthenticator,
27}
28
29impl GcsService {
30 pub const fn new(client: HttpClient, base_url: String, auth: GcpAuthenticator) -> GcsService {
31 GcsService {
32 client,
33 base_url,
34 auth,
35 }
36 }
37}
38
39#[derive(Clone, Debug)]
40pub struct GcsRequest {
41 pub key: String,
42 pub body: Bytes,
43 pub settings: GcsRequestSettings,
44 pub finalizers: EventFinalizers,
45 pub metadata: RequestMetadata,
46}
47
48impl Finalizable for GcsRequest {
49 fn take_finalizers(&mut self) -> EventFinalizers {
50 std::mem::take(&mut self.finalizers)
51 }
52}
53
54impl MetaDescriptive for GcsRequest {
55 fn get_metadata(&self) -> &RequestMetadata {
56 &self.metadata
57 }
58
59 fn metadata_mut(&mut self) -> &mut RequestMetadata {
60 &mut self.metadata
61 }
62}
63
64#[derive(Clone, Debug)]
68pub struct GcsRequestSettings {
69 pub acl: Option<HeaderValue>,
70 pub content_type: HeaderValue,
71 pub content_encoding: Option<HeaderValue>,
72 pub storage_class: HeaderValue,
73 pub cache_control: Option<HeaderValue>,
74 pub headers: Vec<(HeaderName, HeaderValue)>,
75}
76
77#[derive(Debug)]
78pub struct GcsResponse {
79 pub inner: http::Response<Body>,
80 pub metadata: RequestMetadata,
81}
82
83impl DriverResponse for GcsResponse {
84 fn event_status(&self) -> EventStatus {
85 if self.inner.status().is_success() {
86 EventStatus::Delivered
87 } else if self.inner.status().is_server_error() {
88 EventStatus::Errored
89 } else {
90 EventStatus::Rejected
91 }
92 }
93
94 fn events_sent(&self) -> &GroupedCountByteSize {
95 self.metadata.events_estimated_json_encoded_byte_size()
96 }
97
98 fn bytes_sent(&self) -> Option<usize> {
99 Some(self.metadata.request_encoded_size())
100 }
101}
102
103impl Service<GcsRequest> for GcsService {
104 type Response = GcsResponse;
105 type Error = HttpError;
106 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
107
108 fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
110 Poll::Ready(Ok(()))
111 }
112
113 fn call(&mut self, request: GcsRequest) -> Self::Future {
115 let settings = request.settings;
116 let metadata = request.metadata;
117
118 let uri = merge_url_and_key(&self.base_url, &request.key);
119
120 let uri = uri.parse::<Uri>().unwrap();
121
122 let mut builder = Request::put(uri);
123 let headers = builder.headers_mut().unwrap();
124 headers.insert("content-type", settings.content_type);
125 headers.insert(
126 "content-length",
127 HeaderValue::from_str(&request.body.len().to_string()).unwrap(),
128 );
129 settings
130 .content_encoding
131 .map(|ce| headers.insert("content-encoding", ce));
132 settings.acl.map(|acl| headers.insert("x-goog-acl", acl));
133 settings
134 .cache_control
135 .map(|cc| headers.insert("cache-control", cc));
136 headers.insert("x-goog-storage-class", settings.storage_class);
137 for (p, v) in settings.headers {
138 headers.insert(p, v);
139 }
140
141 let mut http_request = builder.body(Body::from(request.body)).unwrap();
142 self.auth.apply(&mut http_request);
143
144 let mut client = self.client.clone();
145 Box::pin(async move {
146 let result = client.call(http_request).await;
147 result.map(|inner| GcsResponse { inner, metadata })
148 })
149 }
150}
151
/// Joins `base_url` and `key` with a `/` separator.
///
/// At most one trailing `/` is removed from `base_url` and at most one leading `/` from `key`
/// before joining; any further slashes are kept unchanged.
fn merge_url_and_key(base_url: &str, key: &str) -> String {
    let prefix = base_url.strip_suffix('/').unwrap_or(base_url);
    let suffix = key.strip_prefix('/').unwrap_or(key);
    [prefix, suffix].join("/")
}
158
#[cfg(test)]
mod tests {
    use crate::sinks::gcs_common::service::merge_url_and_key;

    /// Every combination of a trailing slash on the base URL and a leading slash on the key
    /// must merge into the same URL.
    #[test]
    fn merge_base_url_and_key() {
        let cases = [
            ("https://baseurl/", "/key"),
            ("https://baseurl/", "key"),
            ("https://baseurl", "/key"),
            ("https://baseurl", "key"),
        ];

        for (base_url, key) in cases {
            assert_eq!("https://baseurl/key", merge_url_and_key(base_url, key));
        }
    }
}