vector/sinks/gcs_common/
service.rs

1use std::task::Poll;
2
3use bytes::Bytes;
4use futures::future::BoxFuture;
5use http::{
6    header::{HeaderName, HeaderValue},
7    Request, Uri,
8};
9use hyper::Body;
10use tower::Service;
11use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
12use vector_lib::stream::DriverResponse;
13
14use crate::{
15    event::{EventFinalizers, EventStatus, Finalizable},
16    gcp::GcpAuthenticator,
17    http::{HttpClient, HttpError},
18};
19
20#[derive(Debug, Clone)]
21pub struct GcsService {
22    client: HttpClient,
23    base_url: String,
24    auth: GcpAuthenticator,
25}
26
27impl GcsService {
28    pub const fn new(client: HttpClient, base_url: String, auth: GcpAuthenticator) -> GcsService {
29        GcsService {
30            client,
31            base_url,
32            auth,
33        }
34    }
35}
36
37#[derive(Clone, Debug)]
38pub struct GcsRequest {
39    pub key: String,
40    pub body: Bytes,
41    pub settings: GcsRequestSettings,
42    pub finalizers: EventFinalizers,
43    pub metadata: RequestMetadata,
44}
45
46impl Finalizable for GcsRequest {
47    fn take_finalizers(&mut self) -> EventFinalizers {
48        std::mem::take(&mut self.finalizers)
49    }
50}
51
52impl MetaDescriptive for GcsRequest {
53    fn get_metadata(&self) -> &RequestMetadata {
54        &self.metadata
55    }
56
57    fn metadata_mut(&mut self) -> &mut RequestMetadata {
58        &mut self.metadata
59    }
60}
61
62// Settings required to produce a request that do not change per
63// request. All possible values are pre-computed for direct use in
64// producing a request.
65#[derive(Clone, Debug)]
66pub struct GcsRequestSettings {
67    pub acl: Option<HeaderValue>,
68    pub content_type: HeaderValue,
69    pub content_encoding: Option<HeaderValue>,
70    pub storage_class: HeaderValue,
71    pub headers: Vec<(HeaderName, HeaderValue)>,
72}
73
74#[derive(Debug)]
75pub struct GcsResponse {
76    pub inner: http::Response<Body>,
77    pub metadata: RequestMetadata,
78}
79
80impl DriverResponse for GcsResponse {
81    fn event_status(&self) -> EventStatus {
82        if self.inner.status().is_success() {
83            EventStatus::Delivered
84        } else if self.inner.status().is_server_error() {
85            EventStatus::Errored
86        } else {
87            EventStatus::Rejected
88        }
89    }
90
91    fn events_sent(&self) -> &GroupedCountByteSize {
92        self.metadata.events_estimated_json_encoded_byte_size()
93    }
94
95    fn bytes_sent(&self) -> Option<usize> {
96        Some(self.metadata.request_encoded_size())
97    }
98}
99
100impl Service<GcsRequest> for GcsService {
101    type Response = GcsResponse;
102    type Error = HttpError;
103    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
104
105    // Emission of an internal event in case of errors is handled upstream by the caller.
106    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
107        Poll::Ready(Ok(()))
108    }
109
110    // Emission of internal events for errors and dropped events is handled upstream by the caller.
111    fn call(&mut self, request: GcsRequest) -> Self::Future {
112        let settings = request.settings;
113        let metadata = request.metadata;
114
115        let uri = merge_url_and_key(&self.base_url, &request.key);
116
117        let uri = uri.parse::<Uri>().unwrap();
118
119        let mut builder = Request::put(uri);
120        let headers = builder.headers_mut().unwrap();
121        headers.insert("content-type", settings.content_type);
122        headers.insert(
123            "content-length",
124            HeaderValue::from_str(&request.body.len().to_string()).unwrap(),
125        );
126        settings
127            .content_encoding
128            .map(|ce| headers.insert("content-encoding", ce));
129        settings.acl.map(|acl| headers.insert("x-goog-acl", acl));
130        headers.insert("x-goog-storage-class", settings.storage_class);
131        for (p, v) in settings.headers {
132            headers.insert(p, v);
133        }
134
135        let mut http_request = builder.body(Body::from(request.body)).unwrap();
136        self.auth.apply(&mut http_request);
137
138        let mut client = self.client.clone();
139        Box::pin(async move {
140            let result = client.call(http_request).await;
141            result.map(|inner| GcsResponse { inner, metadata })
142        })
143    }
144}
145
146/// converts // to / between the base url and the key if necessary
147fn merge_url_and_key(base_url: &str, key: &str) -> String {
148    let base_url = base_url.strip_suffix('/').unwrap_or(base_url);
149    let key = key.strip_prefix('/').unwrap_or(key);
150    format!("{base_url}/{key}")
151}
152
153#[cfg(test)]
154mod tests {
155    use crate::sinks::gcs_common::service::merge_url_and_key;
156
157    #[test]
158    fn merge_base_url_and_key() {
159        assert_eq!(
160            "https://baseurl/key",
161            merge_url_and_key("https://baseurl/", "/key")
162        );
163        assert_eq!(
164            "https://baseurl/key",
165            merge_url_and_key("https://baseurl/", "key")
166        );
167        assert_eq!(
168            "https://baseurl/key",
169            merge_url_and_key("https://baseurl", "/key")
170        );
171        assert_eq!(
172            "https://baseurl/key",
173            merge_url_and_key("https://baseurl", "key")
174        );
175    }
176}