vector/sinks/gcs_common/
config.rs1use std::marker::PhantomData;
2
3use futures::FutureExt;
4use http::{StatusCode, Uri};
5use hyper::Body;
6use snafu::Snafu;
7use vector_lib::configurable::configurable_component;
8
9use crate::{
10 gcp::{GcpAuthenticator, GcpError},
11 http::HttpClient,
12 sinks::{
13 gcs_common::service::GcsResponse,
14 util::retries::{RetryAction, RetryLogic},
15 Healthcheck, HealthcheckError,
16 },
17};
18
19pub fn default_endpoint() -> String {
20 "https://storage.googleapis.com".to_string()
21}
22
23#[configurable_component]
29#[derive(Clone, Copy, Debug, Derivative)]
30#[derivative(Default)]
31#[serde(rename_all = "kebab-case")]
32pub enum GcsPredefinedAcl {
33 AuthenticatedRead,
38
39 BucketOwnerFullControl,
46
47 BucketOwnerRead,
55
56 Private,
61
62 #[derivative(Default)]
69 ProjectPrivate,
70
71 PublicRead,
76}
77
78#[configurable_component]
84#[derive(Clone, Copy, Debug, Derivative, PartialEq, Eq)]
85#[derivative(Default)]
86#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
87pub enum GcsStorageClass {
88 #[derivative(Default)]
92 Standard,
93
94 Nearline,
96
97 Coldline,
99
100 Archive,
102}
103
104#[derive(Debug, Snafu)]
105pub enum GcsError {
106 #[snafu(display("Bucket {:?} not found", bucket))]
107 BucketNotFound { bucket: String },
108}
109
110pub fn build_healthcheck(
111 bucket: String,
112 client: HttpClient,
113 base_url: String,
114 auth: GcpAuthenticator,
115) -> crate::Result<Healthcheck> {
116 let healthcheck = async move {
117 let uri = base_url.parse::<Uri>()?;
118 let mut request = http::Request::head(uri).body(Body::empty())?;
119
120 auth.apply(&mut request);
121
122 let not_found_error = GcsError::BucketNotFound { bucket }.into();
123
124 let response = client.send(request).await?;
125 healthcheck_response(response, not_found_error)
126 };
127
128 Ok(healthcheck.boxed())
129}
130
131pub fn healthcheck_response(
132 response: http::Response<hyper::Body>,
133 not_found_error: crate::Error,
134) -> crate::Result<()> {
135 match response.status() {
136 StatusCode::OK => Ok(()),
137 StatusCode::FORBIDDEN => Err(GcpError::HealthcheckForbidden.into()),
138 StatusCode::NOT_FOUND => Err(not_found_error),
139 status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
140 }
141}
142
143pub struct GcsRetryLogic<Request> {
144 request: PhantomData<Request>,
145}
146
147impl<Request> Default for GcsRetryLogic<Request> {
148 fn default() -> Self {
149 Self {
150 request: PhantomData,
151 }
152 }
153}
154
155impl<Request> Clone for GcsRetryLogic<Request> {
156 fn clone(&self) -> Self {
157 Self {
158 request: PhantomData,
159 }
160 }
161}
162
163impl<Request: Clone + Send + Sync + 'static> RetryLogic for GcsRetryLogic<Request> {
165 type Error = hyper::Error;
166 type Request = Request;
167 type Response = GcsResponse;
168
169 fn is_retriable_error(&self, _error: &Self::Error) -> bool {
170 true
171 }
172
173 fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
174 let status = response.inner.status();
175
176 match status {
177 StatusCode::UNAUTHORIZED => RetryAction::Retry("unauthorized".into()),
178 StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()),
179 StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
180 StatusCode::NOT_IMPLEMENTED => {
181 RetryAction::DontRetry("endpoint not implemented".into())
182 }
183 _ if status.is_server_error() => RetryAction::Retry(status.to_string().into()),
184 _ if status.is_success() => RetryAction::Successful,
185 _ => RetryAction::DontRetry(format!("response status: {status}").into()),
186 }
187 }
188}