vector/sinks/gcs_common/
service.rs

1use std::task::Poll;
2
3use bytes::Bytes;
4use futures::future::BoxFuture;
5use http::{
6    Request, Uri,
7    header::{HeaderName, HeaderValue},
8};
9use hyper::Body;
10use tower::Service;
11use vector_lib::{
12    request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
13    stream::DriverResponse,
14};
15
16use crate::{
17    event::{EventFinalizers, EventStatus, Finalizable},
18    gcp::GcpAuthenticator,
19    http::{HttpClient, HttpError},
20};
21
22#[derive(Debug, Clone)]
23pub struct GcsService {
24    client: HttpClient,
25    base_url: String,
26    auth: GcpAuthenticator,
27}
28
29impl GcsService {
30    pub const fn new(client: HttpClient, base_url: String, auth: GcpAuthenticator) -> GcsService {
31        GcsService {
32            client,
33            base_url,
34            auth,
35        }
36    }
37}
38
39#[derive(Clone, Debug)]
40pub struct GcsRequest {
41    pub key: String,
42    pub body: Bytes,
43    pub settings: GcsRequestSettings,
44    pub finalizers: EventFinalizers,
45    pub metadata: RequestMetadata,
46}
47
48impl Finalizable for GcsRequest {
49    fn take_finalizers(&mut self) -> EventFinalizers {
50        std::mem::take(&mut self.finalizers)
51    }
52}
53
54impl MetaDescriptive for GcsRequest {
55    fn get_metadata(&self) -> &RequestMetadata {
56        &self.metadata
57    }
58
59    fn metadata_mut(&mut self) -> &mut RequestMetadata {
60        &mut self.metadata
61    }
62}
63
64// Settings required to produce a request that do not change per
65// request. All possible values are pre-computed for direct use in
66// producing a request.
67#[derive(Clone, Debug)]
68pub struct GcsRequestSettings {
69    pub acl: Option<HeaderValue>,
70    pub content_type: HeaderValue,
71    pub content_encoding: Option<HeaderValue>,
72    pub storage_class: HeaderValue,
73    pub cache_control: Option<HeaderValue>,
74    pub headers: Vec<(HeaderName, HeaderValue)>,
75}
76
77#[derive(Debug)]
78pub struct GcsResponse {
79    pub inner: http::Response<Body>,
80    pub metadata: RequestMetadata,
81}
82
83impl DriverResponse for GcsResponse {
84    fn event_status(&self) -> EventStatus {
85        if self.inner.status().is_success() {
86            EventStatus::Delivered
87        } else if self.inner.status().is_server_error() {
88            EventStatus::Errored
89        } else {
90            EventStatus::Rejected
91        }
92    }
93
94    fn events_sent(&self) -> &GroupedCountByteSize {
95        self.metadata.events_estimated_json_encoded_byte_size()
96    }
97
98    fn bytes_sent(&self) -> Option<usize> {
99        Some(self.metadata.request_encoded_size())
100    }
101}
102
103impl Service<GcsRequest> for GcsService {
104    type Response = GcsResponse;
105    type Error = HttpError;
106    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
107
108    // Emission of an internal event in case of errors is handled upstream by the caller.
109    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
110        Poll::Ready(Ok(()))
111    }
112
113    // Emission of internal events for errors and dropped events is handled upstream by the caller.
114    fn call(&mut self, request: GcsRequest) -> Self::Future {
115        let settings = request.settings;
116        let metadata = request.metadata;
117
118        let uri = merge_url_and_key(&self.base_url, &request.key);
119
120        let uri = uri.parse::<Uri>().unwrap();
121
122        let mut builder = Request::put(uri);
123        let headers = builder.headers_mut().unwrap();
124        headers.insert("content-type", settings.content_type);
125        headers.insert(
126            "content-length",
127            HeaderValue::from_str(&request.body.len().to_string()).unwrap(),
128        );
129        settings
130            .content_encoding
131            .map(|ce| headers.insert("content-encoding", ce));
132        settings.acl.map(|acl| headers.insert("x-goog-acl", acl));
133        settings
134            .cache_control
135            .map(|cc| headers.insert("cache-control", cc));
136        headers.insert("x-goog-storage-class", settings.storage_class);
137        for (p, v) in settings.headers {
138            headers.insert(p, v);
139        }
140
141        let mut http_request = builder.body(Body::from(request.body)).unwrap();
142        self.auth.apply(&mut http_request);
143
144        let mut client = self.client.clone();
145        Box::pin(async move {
146            let result = client.call(http_request).await;
147            result.map(|inner| GcsResponse { inner, metadata })
148        })
149    }
150}
151
152/// converts // to / between the base url and the key if necessary
153fn merge_url_and_key(base_url: &str, key: &str) -> String {
154    let base_url = base_url.strip_suffix('/').unwrap_or(base_url);
155    let key = key.strip_prefix('/').unwrap_or(key);
156    format!("{base_url}/{key}")
157}
158
159#[cfg(test)]
160mod tests {
161    use crate::sinks::gcs_common::service::merge_url_and_key;
162
163    #[test]
164    fn merge_base_url_and_key() {
165        assert_eq!(
166            "https://baseurl/key",
167            merge_url_and_key("https://baseurl/", "/key")
168        );
169        assert_eq!(
170            "https://baseurl/key",
171            merge_url_and_key("https://baseurl/", "key")
172        );
173        assert_eq!(
174            "https://baseurl/key",
175            merge_url_and_key("https://baseurl", "/key")
176        );
177        assert_eq!(
178            "https://baseurl/key",
179            merge_url_and_key("https://baseurl", "key")
180        );
181    }
182}