use futures::FutureExt;
use http::{StatusCode, Uri};
use hyper::Body;
use snafu::Snafu;
use vector_lib::configurable::configurable_component;
use crate::{
    gcp::{GcpAuthenticator, GcpError},
    http::HttpClient,
    sinks::{
        gcs_common::service::GcsResponse,
        util::retries::{RetryAction, RetryLogic},
        Healthcheck, HealthcheckError,
    },
};
/// Returns the default GCS API endpoint.
pub fn default_endpoint() -> String {
    "https://storage.googleapis.com".to_string()
}
/// GCS Predefined ACLs.
///
/// For more information, see [Predefined ACLs][predefined_acls].
///
/// [predefined_acls]: https://cloud.google.com/storage/docs/access-control/lists#predefined-acl
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative)]
#[derivative(Default)]
#[serde(rename_all = "kebab-case")]
pub enum GcsPredefinedAcl {
    /// Bucket/object can be read by authenticated users.
    ///
    /// The bucket/object owner is granted the `OWNER` permission, and any authenticated Google
    /// account holder is granted the `READER` permission.
    AuthenticatedRead,

    /// Object is semi-private.
    ///
    /// Both the object owner and bucket owner are granted the `OWNER` permission.
    ///
    /// Only relevant when specified for an object: this predefined ACL is otherwise ignored when
    /// specified for a bucket.
    BucketOwnerFullControl,

    /// Object is private, except to the bucket owner.
    ///
    /// The object owner is granted the `OWNER` permission, and the bucket owner is granted the
    /// `READER` permission.
    ///
    /// Only relevant when specified for an object: this predefined ACL is otherwise ignored when
    /// specified for a bucket.
    BucketOwnerRead,

    /// Bucket/object are private.
    ///
    /// The bucket/object owner is granted the `OWNER` permission, and no one else has
    /// access.
    Private,

    /// Bucket/object are private within the project.
    ///
    /// Project owners and project editors are granted the `OWNER` permission, and anyone who is
    /// part of the project team is granted the `READER` permission.
    ///
    /// This is the default.
    #[derivative(Default)]
    ProjectPrivate,

    /// Bucket/object can be read publicly.
    ///
    /// The bucket/object owner is granted the `OWNER` permission, and all other users, whether
    /// authenticated or anonymous, are granted the `READER` permission.
    PublicRead,
}
/// GCS storage classes.
///
/// For more information, see [Storage classes][storage_classes].
///
/// [storage_classes]: https://cloud.google.com/storage/docs/storage-classes
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative, PartialEq, Eq)]
#[derivative(Default)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum GcsStorageClass {
    /// Standard storage.
    ///
    /// This is the default.
    #[derivative(Default)]
    Standard,

    /// Nearline storage.
    Nearline,

    /// Coldline storage.
    Coldline,

    /// Archive storage.
    Archive,
}
#[derive(Debug, Snafu)]
pub enum GcsError {
    #[snafu(display("Bucket {:?} not found", bucket))]
    BucketNotFound { bucket: String },
}
/// Builds a healthcheck that sends an authenticated `HEAD` request to `base_url`
/// and maps the response status to a result via [`healthcheck_response`].
pub fn build_healthcheck(
    bucket: String,
    client: HttpClient,
    base_url: String,
    auth: GcpAuthenticator,
) -> crate::Result<Healthcheck> {
    let healthcheck = async move {
        let uri = base_url.parse::<Uri>()?;
        let mut request = http::Request::head(uri).body(Body::empty())?;
        auth.apply(&mut request);

        // Built up front so the bucket name is available if the server answers 404.
        let not_found_error = GcsError::BucketNotFound { bucket }.into();

        let response = client.send(request).await?;
        healthcheck_response(response, not_found_error)
    };

    Ok(healthcheck.boxed())
}
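/// Maps a healthcheck response status to a result.
///
/// `200 OK` succeeds, `403 Forbidden` yields `GcpError::HealthcheckForbidden`,
/// `404 Not Found` yields the caller-supplied `not_found_error`, and any other
/// status is reported as `HealthcheckError::UnexpectedStatus`.
pub fn healthcheck_response(
    response: http::Response<hyper::Body>,
    not_found_error: crate::Error,
) -> crate::Result<()> {
    match response.status() {
        StatusCode::OK => Ok(()),
        StatusCode::FORBIDDEN => Err(GcpError::HealthcheckForbidden.into()),
        StatusCode::NOT_FOUND => Err(not_found_error),
        status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
    }
}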
#[derive(Clone)]
pub struct GcsRetryLogic;

// This is a clone of `HttpRetryLogic` for the `Body` type; the two should be merged.
impl RetryLogic for GcsRetryLogic {
    type Error = hyper::Error;
    type Response = GcsResponse;

    // Every transport-level error is treated as retriable.
    fn is_retriable_error(&self, _error: &Self::Error) -> bool {
        true
    }

    fn should_retry_response(&self, response: &Self::Response) -> RetryAction {
        let status = response.inner.status();

        match status {
            StatusCode::UNAUTHORIZED => RetryAction::Retry("unauthorized".into()),
            StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()),
            StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
            // Must precede the generic 5xx arm: 501 is a server error but is not retried.
            StatusCode::NOT_IMPLEMENTED => {
                RetryAction::DontRetry("endpoint not implemented".into())
            }
            _ if status.is_server_error() => RetryAction::Retry(status.to_string().into()),
            _ if status.is_success() => RetryAction::Successful,
            _ => RetryAction::DontRetry(format!("response status: {}", status).into()),
        }
    }
}