vector/sinks/gcs_common/
config.rs

1use std::marker::PhantomData;
2
3use futures::FutureExt;
4use http::{StatusCode, Uri};
5use hyper::Body;
6use snafu::Snafu;
7use vector_lib::configurable::configurable_component;
8
9use crate::{
10    gcp::{GcpAuthenticator, GcpError},
11    http::HttpClient,
12    sinks::{
13        Healthcheck, HealthcheckError,
14        gcs_common::service::GcsResponse,
15        util::retries::{RetryAction, RetryLogic},
16    },
17};
18
/// Returns the default GCS endpoint, `https://storage.googleapis.com`, as an owned string.
pub fn default_endpoint() -> String {
    String::from("https://storage.googleapis.com")
}
22
/// GCS Predefined ACLs.
///
/// For more information, see [Predefined ACLs][predefined_acls].
///
/// [predefined_acls]: https://cloud.google.com/storage/docs/access-control/lists#predefined-acl
// Per the `rename_all` attribute below, variant names are spelled in kebab-case when
// (de)serialized, e.g. `authenticated-read` or `project-private`.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative)]
#[derivative(Default)]
#[serde(rename_all = "kebab-case")]
pub enum GcsPredefinedAcl {
    /// Bucket/object can be read by authenticated users.
    ///
    /// The bucket/object owner is granted the `OWNER` permission, and any authenticated Google
    /// account holder is granted the `READER` permission.
    AuthenticatedRead,

    /// Object is semi-private.
    ///
    /// Both the object owner and bucket owner are granted the `OWNER` permission.
    ///
    /// Only relevant when specified for an object: this predefined ACL is otherwise ignored when
    /// specified for a bucket.
    BucketOwnerFullControl,

    /// Object is private, except to the bucket owner.
    ///
    /// The object owner is granted the `OWNER` permission, and the bucket owner is granted the
    /// `READER` permission.
    ///
    /// Only relevant when specified for an object: this predefined ACL is otherwise ignored when
    /// specified for a bucket.
    BucketOwnerRead,

    /// Bucket/object are private.
    ///
    /// The bucket/object owner is granted the `OWNER` permission, and no one else has
    /// access.
    Private,

    /// Bucket/object are private within the project.
    ///
    /// Project owners and project editors are granted the `OWNER` permission, and anyone who is
    /// part of the project team is granted the `READER` permission.
    ///
    /// This is the default.
    #[derivative(Default)]
    ProjectPrivate,

    /// Bucket/object can be read publicly.
    ///
    /// The bucket/object owner is granted the `OWNER` permission, and all other users, whether
    /// authenticated or anonymous, are granted the `READER` permission.
    PublicRead,
}
77
/// GCS storage classes.
///
/// For more information, see [Storage classes][storage_classes].
///
/// [storage_classes]: https://cloud.google.com/storage/docs/storage-classes
// Per the `rename_all` attribute below, variant names are spelled in upper case when
// (de)serialized: `STANDARD`, `NEARLINE`, `COLDLINE`, `ARCHIVE`.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative, PartialEq, Eq)]
#[derivative(Default)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum GcsStorageClass {
    /// Standard storage.
    ///
    /// This is the default.
    #[derivative(Default)]
    Standard,

    /// Nearline storage.
    Nearline,

    /// Coldline storage.
    Coldline,

    /// Archive storage.
    Archive,
}
103
/// Errors specific to Google Cloud Storage (GCS) interactions.
#[derive(Debug, Snafu)]
pub enum GcsError {
    /// The named bucket could not be found.
    ///
    /// `build_healthcheck` in this file constructs this variant as the error to report when the
    /// healthcheck request is answered with `404 Not Found`.
    #[snafu(display("Bucket {:?} not found", bucket))]
    BucketNotFound { bucket: String },
}
109
110pub fn build_healthcheck(
111    bucket: String,
112    client: HttpClient,
113    base_url: String,
114    auth: GcpAuthenticator,
115) -> crate::Result<Healthcheck> {
116    let healthcheck = async move {
117        let uri = base_url.parse::<Uri>()?;
118        let mut request = http::Request::head(uri).body(Body::empty())?;
119
120        auth.apply(&mut request);
121
122        let not_found_error = GcsError::BucketNotFound { bucket }.into();
123
124        let response = client.send(request).await?;
125        healthcheck_response(response, not_found_error)
126    };
127
128    Ok(healthcheck.boxed())
129}
130
131pub fn healthcheck_response(
132    response: http::Response<hyper::Body>,
133    not_found_error: crate::Error,
134) -> crate::Result<()> {
135    match response.status() {
136        StatusCode::OK => Ok(()),
137        StatusCode::FORBIDDEN => Err(GcpError::HealthcheckForbidden.into()),
138        StatusCode::NOT_FOUND => Err(not_found_error),
139        status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
140    }
141}
142
/// Retry logic for GCS requests, generic over the request type.
///
/// `Request` is tracked only at the type level through [`PhantomData`]; no request value is
/// stored.
pub struct GcsRetryLogic<Request> {
    request: PhantomData<Request>,
}

// `Default` and `Clone` are written by hand: deriving them would add `Request: Default` and
// `Request: Clone` bounds, which the marker field does not need.
impl<Request> Default for GcsRetryLogic<Request> {
    fn default() -> Self {
        let request = PhantomData;
        GcsRetryLogic { request }
    }
}

impl<Request> Clone for GcsRetryLogic<Request> {
    fn clone(&self) -> Self {
        // The only field is a marker, so a clone is indistinguishable from a fresh value.
        Self::default()
    }
}
162
163// This is a clone of HttpRetryLogic for the Body type, should get merged
164impl<Request: Clone + Send + Sync + 'static> RetryLogic for GcsRetryLogic<Request> {
165    type Error = hyper::Error;
166    type Request = Request;
167    type Response = GcsResponse;
168
169    fn is_retriable_error(&self, _error: &Self::Error) -> bool {
170        true
171    }
172
173    fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
174        let status = response.inner.status();
175
176        match status {
177            StatusCode::UNAUTHORIZED => RetryAction::Retry("unauthorized".into()),
178            StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()),
179            StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
180            StatusCode::NOT_IMPLEMENTED => {
181                RetryAction::DontRetry("endpoint not implemented".into())
182            }
183            _ if status.is_server_error() => RetryAction::Retry(status.to_string().into()),
184            _ if status.is_success() => RetryAction::Successful,
185            _ => RetryAction::DontRetry(format!("response status: {status}").into()),
186        }
187    }
188}