use std::{collections::HashMap, convert::TryFrom, io};
use bytes::Bytes;
use chrono::{FixedOffset, Utc};
use http::header::{HeaderName, HeaderValue};
use http::Uri;
use indoc::indoc;
use snafu::ResultExt;
use snafu::Snafu;
use tower::ServiceBuilder;
use uuid::Uuid;
use vector_lib::codecs::encoding::Framer;
use vector_lib::configurable::configurable_component;
use vector_lib::event::{EventFinalizers, Finalizable};
use vector_lib::{request_metadata::RequestMetadata, TimeZone};
use crate::sinks::util::metadata::RequestMetadataBuilder;
use crate::sinks::util::service::TowerRequestConfigDefaults;
use crate::{
codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
event::Event,
gcp::{GcpAuthConfig, GcpAuthenticator, Scope},
http::{get_http_scheme_from_uri, HttpClient},
serde::json::to_string,
sinks::{
gcs_common::{
config::{
build_healthcheck, default_endpoint, GcsPredefinedAcl, GcsRetryLogic,
GcsStorageClass,
},
service::{GcsRequest, GcsRequestSettings, GcsService},
sink::GcsSink,
},
util::{
batch::BatchConfig, partitioner::KeyPartitioner, request_builder::EncodeResult,
timezone_to_offset, BulkSizeBasedDefaultBatchSettings, Compression, RequestBuilder,
ServiceBuilderExt, TowerRequestConfig,
},
Healthcheck, VectorSink,
},
template::{Template, TemplateParseError},
tls::{TlsConfig, TlsSettings},
};
/// Errors surfaced while preparing the GCS sink.
///
/// Despite the "healthcheck" in the name, the only variant defined here is the one produced by
/// `GcsSinkConfig::key_partitioner` when the key prefix template fails to parse.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum GcsHealthcheckError {
/// The `key_prefix` value (or its built-in fallback) could not be parsed as a template.
#[snafu(display("key_prefix template parse error: {}", source))]
KeyPrefixTemplate { source: TemplateParseError },
}
/// Marker type carrying the GCS-specific defaults for `TowerRequestConfig`.
///
/// It is used as the type parameter of the sink's `request` field.
#[derive(Clone, Copy, Debug)]
pub struct GcsTowerRequestConfigDefaults;
impl TowerRequestConfigDefaults for GcsTowerRequestConfigDefaults {
// Overrides the trait's rate-limit default for this sink.
// NOTE(review): presumably the number of requests allowed per rate-limit window; confirm
// against `TowerRequestConfigDefaults`.
const RATE_LIMIT_NUM: u64 = 1_000;
}
/// Configuration for the `gcp_cloud_storage` sink.
#[configurable_component(sink(
"gcp_cloud_storage",
"Store observability events in GCP Cloud Storage."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct GcsSinkConfig {
/// The GCS bucket name.
///
/// The bucket name is appended to `endpoint` to form the base URL used for uploads and for the
/// healthcheck.
#[configurable(metadata(docs::examples = "my-bucket"))]
bucket: String,
/// The predefined ACL to apply to created objects.
///
/// When unset, no ACL value is attached to upload requests.
acl: Option<GcsPredefinedAcl>,
/// The storage class for created objects.
///
/// When unset, the default storage class is used.
storage_class: Option<GcsStorageClass>,
/// The set of metadata `key:value` pairs for the created objects.
///
/// Each pair is converted into an HTTP header name and value and attached to every upload
/// request. The sink fails to build if a key or value is not a valid header name or value.
#[configurable(metadata(docs::additional_props_description = "A key/value pair."))]
#[configurable(metadata(docs::advanced))]
metadata: Option<HashMap<String, String>>,
/// A prefix to apply to all object keys.
///
/// The prefix is a template and is placed directly in front of the generated filename. No
/// separator is inserted, so a prefix meant to act as a directory path must end in `/`.
///
/// Defaults to `date=%F/` when unset.
#[configurable(metadata(docs::templateable))]
#[configurable(metadata(
docs::examples = "date=%F/",
docs::examples = "date=%F/hour=%H/",
docs::examples = "year=%Y/month=%m/day=%d/",
docs::examples = "application_id={{ application_id }}/date=%F/"
))]
#[configurable(metadata(docs::advanced))]
key_prefix: Option<String>,
/// The timestamp format for the time component of the object key.
///
/// The current time, taken when the upload request is built, is rendered with this
/// `strftime`-style format and used as the start of the filename. Defaults to `%s`.
#[serde(default = "default_time_format")]
#[configurable(metadata(docs::advanced))]
filename_time_format: String,
/// Whether or not to append a UUID v4 token to the end of the object key.
///
/// When enabled, the filename becomes `<timestamp>-<uuid>`, with the UUID in hyphenated form.
#[serde(default = "crate::serde::default_true")]
#[configurable(metadata(docs::advanced))]
filename_append_uuid: bool,
/// The filename extension to use in the object key.
///
/// If not specified, the extension is determined by the compression scheme used. The extension
/// is joined to the filename with a `.`.
#[configurable(metadata(docs::advanced))]
filename_extension: Option<String>,
// Framing and serialization options; flattened into the top level of the sink configuration.
#[serde(flatten)]
encoding: EncodingConfigWithFraming,
// Compression applied to request payloads; also supplies the default filename extension and the
// content-encoding value attached to requests.
#[configurable(derived)]
#[serde(default)]
compression: Compression,
// Batching behaviour; converted into batcher settings when the sink is built.
#[configurable(derived)]
#[serde(default)]
batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,
/// API endpoint for Google Cloud Storage.
///
/// The bucket name is appended to this value, so it should not include the bucket.
#[configurable(metadata(docs::examples = "http://localhost:9000"))]
#[configurable(validation(format = "uri"))]
#[serde(default = "default_endpoint")]
endpoint: String,
// Request handling options, using the GCS-specific defaults.
#[configurable(derived)]
#[serde(default)]
request: TowerRequestConfig<GcsTowerRequestConfigDefaults>,
// GCP authentication options; flattened into the top level of the sink configuration.
#[serde(flatten)]
auth: GcpAuthConfig,
// Optional TLS options used to build the HTTP client.
#[configurable(derived)]
tls: Option<TlsConfig>,
// End-to-end acknowledgement options; accepts either a bare boolean or a full struct.
#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::is_default"
)]
acknowledgements: AcknowledgementsConfig,
// Timezone used when rendering the filename timestamp. Falls back to the global timezone, and to
// UTC when neither yields a fixed offset.
#[configurable(derived)]
#[serde(default)]
pub timezone: Option<TimeZone>,
}
/// Returns the default value of `filename_time_format`: the `%s` format specifier.
fn default_time_format() -> String {
    String::from("%s")
}
/// Test helper: builds a `GcsSinkConfig` around the given encoding.
///
/// Compression is gzip, the filename uses the default time format with a UUID suffix, and every
/// other field takes its type's `Default` value. Note that this leaves `bucket` and `endpoint` as
/// empty strings rather than applying the serde defaults.
#[cfg(test)]
fn default_config(encoding: EncodingConfigWithFraming) -> GcsSinkConfig {
    GcsSinkConfig {
        // Explicitly chosen values.
        encoding,
        compression: Compression::gzip_default(),
        filename_time_format: default_time_format(),
        filename_append_uuid: true,

        // Empty strings and unset options.
        bucket: String::new(),
        endpoint: String::new(),
        acl: None,
        storage_class: None,
        metadata: None,
        key_prefix: None,
        filename_extension: None,
        tls: None,
        timezone: None,

        // Project types left at their defaults.
        batch: Default::default(),
        request: Default::default(),
        auth: Default::default(),
        acknowledgements: Default::default(),
    }
}
impl GenerateConfig for GcsSinkConfig {
    /// Returns a minimal example configuration for this sink as a TOML value.
    ///
    /// # Panics
    ///
    /// Panics if the embedded example fails to parse as TOML.
    fn generate_config() -> toml::Value {
        let example = indoc! {r#"
            bucket = "my-bucket"
            credentials_path = "/path/to/credentials.json"
            framing.method = "newline_delimited"
            encoding.codec = "json"
        "#};
        toml::from_str::<toml::Value>(example).unwrap()
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "gcp_cloud_storage")]
impl SinkConfig for GcsSinkConfig {
    /// Builds the sink and its healthcheck.
    ///
    /// The steps run in a fixed order: authenticate, derive the `<endpoint>/<bucket>/` base URL,
    /// create the HTTP client, build the healthcheck, call `spawn_regenerate_token` on the
    /// authenticator, and finally build the sink itself.
    ///
    /// # Errors
    ///
    /// Returns the first error from authentication, TLS setup, HTTP client creation, healthcheck
    /// construction, or sink construction.
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let authenticator = self.auth.build(Scope::DevStorageReadWrite).await?;
        let bucket_url = format!("{}/{}/", self.endpoint, self.bucket);

        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
        let http_client = HttpClient::new(tls_settings, cx.proxy())?;

        // The healthcheck gets its own clones; the originals are moved into the sink below.
        let healthcheck = build_healthcheck(
            self.bucket.clone(),
            http_client.clone(),
            bucket_url.clone(),
            authenticator.clone(),
        )?;
        authenticator.spawn_regenerate_token();

        self.build_sink(http_client, bucket_url, authenticator, cx)
            .map(|sink| (sink, healthcheck))
    }

    /// Accepts the serializer's input type, restricted to log events.
    fn input(&self) -> Input {
        let serializer = self.encoding.config().1;
        Input::new(serializer.input_type() & DataType::Log)
    }

    /// Exposes the configured acknowledgement settings.
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
impl GcsSinkConfig {
    /// Assembles the sink from an already-built HTTP client, base URL, and authenticator.
    ///
    /// `base_url` is expected to be the `<endpoint>/<bucket>/` URL. It is parsed here to determine
    /// the HTTP scheme, then handed to the GCS service.
    ///
    /// # Errors
    ///
    /// Returns an error if the batch settings are invalid, if the key prefix template cannot be
    /// parsed, if `base_url` is not a valid URI, or if the request settings cannot be built.
    fn build_sink(
        &self,
        client: HttpClient,
        base_url: String,
        auth: GcpAuthenticator,
        cx: SinkContext,
    ) -> crate::Result<VectorSink> {
        let request = self.request.into_settings();
        let batch_settings = self.batch.into_batcher_settings()?;
        let partitioner = self.key_partitioner()?;

        // `base_url` is derived from the user-supplied `endpoint` and `bucket`, so a malformed
        // value is a configuration error to report, not an invariant violation to panic on.
        let uri = base_url
            .parse::<Uri>()
            .map_err(|error| format!("invalid GCS base URL {:?}: {}", base_url, error))?;
        let protocol = get_http_scheme_from_uri(&uri);

        let svc = ServiceBuilder::new()
            .settings(request, GcsRetryLogic)
            .service(GcsService::new(client, base_url, auth));
        let request_settings = RequestSettings::new(self, cx)?;
        let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, protocol);

        Ok(VectorSink::from_event_streamsink(sink))
    }

    /// Builds the partitioner that derives the object key prefix for each event.
    ///
    /// Uses the configured `key_prefix` template, falling back to `date=%F/` when it is unset.
    ///
    /// # Errors
    ///
    /// Returns `GcsHealthcheckError::KeyPrefixTemplate` if the template fails to parse.
    fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
        Ok(KeyPartitioner::new(
            Template::try_from(self.key_prefix.as_deref().unwrap_or("date=%F/"))
                .context(KeyPrefixTemplateSnafu)?,
            None,
        ))
    }
}
/// Per-sink settings used to turn a batch of events into a `GcsRequest`.
///
/// Built once from `GcsSinkConfig` by `RequestSettings::new`; the header values are cloned into
/// every request.
#[derive(Clone, Debug)]
struct RequestSettings {
/// Serialized predefined ACL, if one was configured.
acl: Option<HeaderValue>,
/// Content type reported by the encoder.
content_type: HeaderValue,
/// Content encoding reported by the compression setting, if any.
content_encoding: Option<HeaderValue>,
/// Serialized storage class (the configured one, or the default).
storage_class: HeaderValue,
/// Headers built from the user-supplied `metadata` map.
headers: Vec<(HeaderName, HeaderValue)>,
/// Filename extension, appended to the object key after a `.`.
extension: String,
/// Format string used to render the timestamp part of the filename.
time_format: String,
/// Whether a `-<uuid>` suffix is added after the timestamp.
append_uuid: bool,
/// Transformer and framed encoder applied to the events.
encoder: (Transformer, Encoder<Framer>),
/// Compression applied to the request payload.
compression: Compression,
/// Fixed offset used to render the timestamp; `None` means UTC.
tz_offset: Option<FixedOffset>,
}
impl RequestBuilder<(String, Vec<Event>)> for RequestSettings {
    type Metadata = (String, EventFinalizers);
    type Events = Vec<Event>;
    type Encoder = (Transformer, Encoder<Framer>);
    type Payload = Bytes;
    type Request = GcsRequest;
    type Error = io::Error;

    /// Compression applied to the encoded payload.
    fn compression(&self) -> Compression {
        self.compression
    }

    /// Transformer and encoder used to serialize the events.
    fn encoder(&self) -> &Self::Encoder {
        &self.encoder
    }

    /// Splits a `(partition key, events)` pair into request metadata and the events to encode.
    ///
    /// The finalizers are taken out of the events and travel with the partition key; the
    /// request metadata builder is created from the events after that.
    fn split_input(
        &self,
        input: (String, Vec<Event>),
    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
        let (partition_key, mut batch) = input;
        let finalizers = batch.take_finalizers();
        let metadata_builder = RequestMetadataBuilder::from_events(&batch);

        ((partition_key, finalizers), metadata_builder, batch)
    }

    /// Builds the upload request for an encoded payload.
    ///
    /// The object key is `<partition key><timestamp>[-<uuid>].<extension>`. The timestamp is the
    /// current time when this method runs, rendered with `time_format` in the configured offset,
    /// or in UTC when there is none.
    fn build_request(
        &self,
        gcp_metadata: Self::Metadata,
        metadata: RequestMetadata,
        payload: EncodeResult<Self::Payload>,
    ) -> Self::Request {
        let (partition_key, finalizers) = gcp_metadata;

        let now = Utc::now();
        let timestamp = match self.tz_offset {
            Some(offset) => now
                .with_timezone(&offset)
                .format(&self.time_format)
                .to_string(),
            None => now.format(&self.time_format).to_string(),
        };

        let filename = if self.append_uuid {
            format!("{}-{}", timestamp, Uuid::new_v4().hyphenated())
        } else {
            timestamp
        };

        let settings = GcsRequestSettings {
            acl: self.acl.clone(),
            content_type: self.content_type.clone(),
            content_encoding: self.content_encoding.clone(),
            storage_class: self.storage_class.clone(),
            headers: self.headers.clone(),
        };

        GcsRequest {
            key: format!("{}{}.{}", partition_key, filename, self.extension),
            body: payload.into_payload(),
            finalizers,
            settings,
            metadata,
        }
    }
}
impl RequestSettings {
    /// Derives the request settings from the sink configuration.
    ///
    /// The timestamp offset comes from the sink's `timezone`, then the global timezone; if
    /// neither yields a fixed offset, none is stored.
    ///
    /// # Errors
    ///
    /// Returns an error if the encoder cannot be built or if a `metadata` entry is not a valid
    /// header name or value.
    ///
    /// # Panics
    ///
    /// Panics if the serialized ACL, content type, content encoding, or storage class is not a
    /// valid header value.
    fn new(config: &GcsSinkConfig, cx: SinkContext) -> crate::Result<Self> {
        let transformer = config.encoding.transformer();
        let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?;
        let encoder = Encoder::<Framer>::new(framer, serializer);

        // These values are produced by the sink itself, so a failed conversion is treated as a
        // bug rather than a recoverable error.
        let as_header = |serialized: String| HeaderValue::from_str(&serialized).unwrap();

        let acl = config.acl.map(|acl| as_header(to_string(acl)));
        let content_type = HeaderValue::from_str(encoder.content_type()).unwrap();
        let content_encoding = config
            .compression
            .content_encoding()
            .map(|ce| as_header(to_string(ce)));
        let storage_class = as_header(to_string(config.storage_class.unwrap_or_default()));

        // User-supplied metadata can be invalid, so those failures are propagated.
        let headers = match config.metadata.as_ref() {
            Some(entries) => entries
                .iter()
                .map(make_header)
                .collect::<Result<Vec<_>, _>>()?,
            None => Vec::new(),
        };

        let extension: String = match &config.filename_extension {
            Some(explicit) => explicit.clone(),
            None => config.compression.extension().into(),
        };

        let tz_offset = config
            .timezone
            .or(cx.globals.timezone)
            .and_then(timezone_to_offset);

        Ok(Self {
            acl,
            content_type,
            content_encoding,
            storage_class,
            headers,
            extension,
            time_format: config.filename_time_format.clone(),
            append_uuid: config.filename_append_uuid,
            encoder: (transformer, encoder),
            compression: config.compression,
            tz_offset,
        })
    }
}
/// Converts one `metadata` entry into an HTTP header name/value pair.
///
/// # Errors
///
/// Returns an error if `name` is not a valid header name or `value` is not a valid header value.
/// The name is checked first.
fn make_header((name, value): (&String, &String)) -> crate::Result<(HeaderName, HeaderValue)> {
    let header_name = HeaderName::from_bytes(name.as_bytes())?;
    let header_value = HeaderValue::from_str(value)?;
    Ok((header_name, header_value))
}
#[cfg(test)]
mod tests {
    use futures_util::{future::ready, stream};
    use vector_lib::codecs::encoding::FramingConfig;
    use vector_lib::codecs::{
        JsonSerializerConfig, NewlineDelimitedEncoderConfig, TextSerializerConfig,
    };
    use vector_lib::partition::Partitioner;
    use vector_lib::request_metadata::GroupedCountByteSize;
    use vector_lib::EstimatedJsonEncodedSizeOf;

    use crate::event::LogEvent;
    use crate::test_util::{
        components::{run_and_assert_sink_compliance, SINK_TAGS},
        http::{always_200_response, spawn_blackhole_http_server},
    };

    use super::*;

    /// The generated example configuration must deserialize into `GcsSinkConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<GcsSinkConfig>();
    }

    /// Sends one log event to a mock server that always answers 200 and checks sink compliance.
    #[tokio::test]
    async fn component_spec_compliance() {
        let mock_endpoint = spawn_blackhole_http_server(always_200_response).await;

        let cx = SinkContext::default();
        let http_client = HttpClient::new(TlsSettings::default(), cx.proxy())
            .expect("should not fail to create HTTP client");

        let sink = default_config((None::<FramingConfig>, JsonSerializerConfig::default()).into())
            .build_sink(
                http_client,
                mock_endpoint.to_string(),
                GcpAuthenticator::None,
                cx,
            )
            .expect("failed to build sink");

        let events = stream::once(ready(Event::Log(LogEvent::from("simple message"))));
        run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await;
    }

    /// The key prefix template is rendered from the event's fields.
    #[test]
    fn gcs_encode_event_apply_rules() {
        crate::test_util::trace_init();

        let mut log = LogEvent::from("hello world".to_string());
        log.insert("key", "value");

        let sink_config = GcsSinkConfig {
            key_prefix: Some("key: {{ key }}".into()),
            ..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
        };

        let partitioner = sink_config.key_partitioner().unwrap();
        let key = partitioner
            .partition(&Event::Log(log))
            .expect("key wasn't provided");

        assert_eq!(key, "key: value");
    }

    /// Builds `RequestSettings` from a config, panicking on failure.
    fn request_settings(sink_config: &GcsSinkConfig, context: SinkContext) -> RequestSettings {
        RequestSettings::new(sink_config, context).expect("Could not create request settings")
    }

    /// Builds a request for a single default log event.
    ///
    /// The key prefix is `key/` and the time format is the literal `date`, so the resulting
    /// object key is deterministic unless a UUID is appended.
    fn build_request(extension: Option<&str>, uuid: bool, compression: Compression) -> GcsRequest {
        let encoding = (
            Some(NewlineDelimitedEncoderConfig::new()),
            JsonSerializerConfig::default(),
        );
        let sink_config = GcsSinkConfig {
            key_prefix: Some("key/".into()),
            filename_time_format: "date".into(),
            filename_extension: extension.map(Into::into),
            filename_append_uuid: uuid,
            compression,
            ..default_config(encoding.into())
        };

        let log = Event::from(LogEvent::default());
        let partition_key = sink_config
            .key_partitioner()
            .unwrap()
            .partition(&log)
            .expect("key wasn't provided");

        let mut byte_size = GroupedCountByteSize::new_untagged();
        byte_size.add_event(&log, log.estimated_json_encoded_size_of());

        let settings = request_settings(&sink_config, SinkContext::default());
        let (gcp_metadata, metadata_builder, _events) =
            settings.split_input((partition_key, vec![log]));

        let payload = EncodeResult::uncompressed(Bytes::new(), byte_size);
        let request_metadata = metadata_builder.build(&payload);

        settings.build_request(gcp_metadata, request_metadata, payload)
    }

    /// Object keys combine the prefix, the formatted time, an optional UUID, and the extension.
    #[test]
    fn gcs_build_request() {
        // An explicit extension wins over the compression-derived one.
        let explicit_ext = build_request(Some("ext"), false, Compression::None);
        assert_eq!(explicit_ext.key, "key/date.ext");

        // Without an explicit extension, the compression setting chooses it.
        let uncompressed = build_request(None, false, Compression::None);
        assert_eq!(uncompressed.key, "key/date.log");

        let gzipped = build_request(None, false, Compression::gzip_default());
        assert_eq!(gzipped.key, "key/date.log.gz");

        // With a UUID appended the key can no longer equal the bare form.
        let with_uuid = build_request(None, true, Compression::gzip_default());
        assert_ne!(with_uuid.key, "key/date.log.gz");
    }
}