vector/sinks/gcs_common/
service.rs

1use std::task::Poll;
2
3use bytes::Bytes;
4use futures::future::BoxFuture;
5use http::{
6    Request, Uri,
7    header::{HeaderName, HeaderValue},
8};
9use hyper::Body;
10use tower::Service;
11use vector_lib::{
12    request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
13    stream::DriverResponse,
14};
15
16use crate::{
17    event::{EventFinalizers, EventStatus, Finalizable},
18    gcp::GcpAuthenticator,
19    http::{HttpClient, HttpError},
20};
21
22#[derive(Debug, Clone)]
23pub struct GcsService {
24    client: HttpClient,
25    base_url: String,
26    auth: GcpAuthenticator,
27}
28
29impl GcsService {
30    pub const fn new(client: HttpClient, base_url: String, auth: GcpAuthenticator) -> GcsService {
31        GcsService {
32            client,
33            base_url,
34            auth,
35        }
36    }
37}
38
39#[derive(Clone, Debug)]
40pub struct GcsRequest {
41    pub key: String,
42    pub body: Bytes,
43    pub settings: GcsRequestSettings,
44    pub finalizers: EventFinalizers,
45    pub metadata: RequestMetadata,
46}
47
48impl Finalizable for GcsRequest {
49    fn take_finalizers(&mut self) -> EventFinalizers {
50        std::mem::take(&mut self.finalizers)
51    }
52}
53
54impl MetaDescriptive for GcsRequest {
55    fn get_metadata(&self) -> &RequestMetadata {
56        &self.metadata
57    }
58
59    fn metadata_mut(&mut self) -> &mut RequestMetadata {
60        &mut self.metadata
61    }
62}
63
64// Settings required to produce a request that do not change per
65// request. All possible values are pre-computed for direct use in
66// producing a request.
67#[derive(Clone, Debug)]
68pub struct GcsRequestSettings {
69    pub acl: Option<HeaderValue>,
70    pub content_type: HeaderValue,
71    pub content_encoding: Option<HeaderValue>,
72    pub storage_class: HeaderValue,
73    pub headers: Vec<(HeaderName, HeaderValue)>,
74}
75
76#[derive(Debug)]
77pub struct GcsResponse {
78    pub inner: http::Response<Body>,
79    pub metadata: RequestMetadata,
80}
81
82impl DriverResponse for GcsResponse {
83    fn event_status(&self) -> EventStatus {
84        if self.inner.status().is_success() {
85            EventStatus::Delivered
86        } else if self.inner.status().is_server_error() {
87            EventStatus::Errored
88        } else {
89            EventStatus::Rejected
90        }
91    }
92
93    fn events_sent(&self) -> &GroupedCountByteSize {
94        self.metadata.events_estimated_json_encoded_byte_size()
95    }
96
97    fn bytes_sent(&self) -> Option<usize> {
98        Some(self.metadata.request_encoded_size())
99    }
100}
101
102impl Service<GcsRequest> for GcsService {
103    type Response = GcsResponse;
104    type Error = HttpError;
105    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
106
107    // Emission of an internal event in case of errors is handled upstream by the caller.
108    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
109        Poll::Ready(Ok(()))
110    }
111
112    // Emission of internal events for errors and dropped events is handled upstream by the caller.
113    fn call(&mut self, request: GcsRequest) -> Self::Future {
114        let settings = request.settings;
115        let metadata = request.metadata;
116
117        let uri = merge_url_and_key(&self.base_url, &request.key);
118
119        let uri = uri.parse::<Uri>().unwrap();
120
121        let mut builder = Request::put(uri);
122        let headers = builder.headers_mut().unwrap();
123        headers.insert("content-type", settings.content_type);
124        headers.insert(
125            "content-length",
126            HeaderValue::from_str(&request.body.len().to_string()).unwrap(),
127        );
128        settings
129            .content_encoding
130            .map(|ce| headers.insert("content-encoding", ce));
131        settings.acl.map(|acl| headers.insert("x-goog-acl", acl));
132        headers.insert("x-goog-storage-class", settings.storage_class);
133        for (p, v) in settings.headers {
134            headers.insert(p, v);
135        }
136
137        let mut http_request = builder.body(Body::from(request.body)).unwrap();
138        self.auth.apply(&mut http_request);
139
140        let mut client = self.client.clone();
141        Box::pin(async move {
142            let result = client.call(http_request).await;
143            result.map(|inner| GcsResponse { inner, metadata })
144        })
145    }
146}
147
148/// converts // to / between the base url and the key if necessary
149fn merge_url_and_key(base_url: &str, key: &str) -> String {
150    let base_url = base_url.strip_suffix('/').unwrap_or(base_url);
151    let key = key.strip_prefix('/').unwrap_or(key);
152    format!("{base_url}/{key}")
153}
154
155#[cfg(test)]
156mod tests {
157    use crate::sinks::gcs_common::service::merge_url_and_key;
158
159    #[test]
160    fn merge_base_url_and_key() {
161        assert_eq!(
162            "https://baseurl/key",
163            merge_url_and_key("https://baseurl/", "/key")
164        );
165        assert_eq!(
166            "https://baseurl/key",
167            merge_url_and_key("https://baseurl/", "key")
168        );
169        assert_eq!(
170            "https://baseurl/key",
171            merge_url_and_key("https://baseurl", "/key")
172        );
173        assert_eq!(
174            "https://baseurl/key",
175            merge_url_and_key("https://baseurl", "key")
176        );
177    }
178}