use base64::prelude::{Engine as _, BASE64_STANDARD};
use bytes::{Bytes, BytesMut};
use futures::{FutureExt, SinkExt};
use http::{Request, Uri};
use hyper::Body;
use indoc::indoc;
use serde_json::{json, Value};
use snafu::{ResultExt, Snafu};
use tokio_util::codec::Encoder as _;
use vector_lib::configurable::configurable_component;
use crate::{
codecs::{Encoder, EncodingConfig, Transformer},
config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
event::Event,
gcp::{GcpAuthConfig, GcpAuthenticator, Scope, PUBSUB_URL},
http::HttpClient,
sinks::{
gcs_common::config::healthcheck_response,
util::{
http::{BatchedHttpSink, HttpEventEncoder, HttpSink},
BatchConfig, BoxedRawValue, JsonArrayBuffer, SinkBatchSettings, TowerRequestConfig,
},
Healthcheck, UriParseSnafu, VectorSink,
},
tls::{TlsConfig, TlsSettings},
};
/// Errors that the `gcp_pubsub` healthcheck can surface.
#[derive(Debug, Snafu)]
enum HealthcheckError {
/// Handed to `healthcheck_response` by `healthcheck` as the error describing
/// a missing topic.
// NOTE(review): presumably reported when the topic lookup answers "not found";
// confirm against `gcs_common::config::healthcheck_response`.
#[snafu(display("Configured topic not found"))]
TopicNotFound,
}
// Ceiling passed to `limit_max_bytes` when the sink is built, capping whatever
// batch byte size the user configured.
// NOTE(review): presumably mirrors the Pub/Sub publish request size limit;
// confirm against the GCP quota documentation.
const MAX_BATCH_PAYLOAD_SIZE: usize = 10_000_000;
/// Batch settings marker type for the `gcp_pubsub` sink.
///
/// Used as the type parameter of `BatchConfig` in [`PubsubConfig`].
#[derive(Clone, Copy, Debug, Default)]
pub struct PubsubDefaultBatchSettings;
impl SinkBatchSettings for PubsubDefaultBatchSettings {
// At most 1000 events per batch.
const MAX_EVENTS: Option<usize> = Some(1000);
// At most 10_000_000 bytes per batch (same value as `MAX_BATCH_PAYLOAD_SIZE`).
const MAX_BYTES: Option<usize> = Some(10_000_000);
// Batch timeout of one second.
const TIMEOUT_SECS: f64 = 1.0;
}
/// Configuration for the `gcp_pubsub` sink.
#[configurable_component(sink(
"gcp_pubsub",
"Publish observability events to GCP's Pub/Sub messaging system."
))]
#[derive(Clone, Debug)]
pub struct PubsubConfig {
/// The project name to which to publish events.
#[configurable(metadata(docs::examples = "vector-123456"))]
pub project: String,
/// The topic within the project to which to publish events.
#[configurable(metadata(docs::examples = "this-is-a-topic"))]
pub topic: String,
/// The endpoint to which to publish events.
///
/// The value is joined directly with `/v1/projects/<project>/topics/<topic>`,
/// so it should include the scheme and must not end with a trailing slash.
#[serde(default = "default_endpoint")]
#[configurable(metadata(docs::examples = "https://us-central1-pubsub.googleapis.com"))]
pub endpoint: String,
// GCP authentication options, flattened into this table; built with
// `Scope::PubSub` when the sink is constructed.
#[serde(default, flatten)]
pub auth: GcpAuthConfig,
// Batching options, parameterized by `PubsubDefaultBatchSettings`.
#[configurable(derived)]
#[serde(default)]
pub batch: BatchConfig<PubsubDefaultBatchSettings>,
// Request options; converted with `into_settings` and handed to the
// batched HTTP sink.
#[configurable(derived)]
#[serde(default)]
pub request: TowerRequestConfig,
// Encoding options; supply the transformer and serializer, and determine the
// accepted input type.
#[configurable(derived)]
encoding: EncodingConfig,
// Optional TLS options used to build the HTTP client.
#[configurable(derived)]
#[serde(default)]
pub tls: Option<TlsConfig>,
// Acknowledgement options; deserialized through `crate::serde::bool_or_struct`
// and omitted from serialized output when equal to the default.
#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::is_default"
)]
acknowledgements: AcknowledgementsConfig,
}
/// Serde default for `PubsubConfig::endpoint`: the shared `PUBSUB_URL` constant
/// rendered as an owned `String`.
fn default_endpoint() -> String {
PUBSUB_URL.to_string()
}
impl GenerateConfig for PubsubConfig {
    /// Produces a minimal example configuration (project, topic and JSON
    /// encoding) parsed into a TOML value.
    ///
    /// # Panics
    ///
    /// Panics if the embedded example is not valid TOML.
    fn generate_config() -> toml::Value {
        let example = indoc! {r#"
            project = "my-project"
            topic = "my-topic"
            encoding.codec = "json"
        "#};
        toml::from_str(example).unwrap()
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "gcp_pubsub")]
impl SinkConfig for PubsubConfig {
    /// Builds the batched HTTP sink together with its healthcheck future.
    ///
    /// # Errors
    ///
    /// Fails if authentication cannot be built, the batch settings are
    /// invalid or exceed `MAX_BATCH_PAYLOAD_SIZE`, the TLS settings or HTTP
    /// client cannot be created, or the topic URI does not parse.
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let pubsub = PubsubSink::from_config(self).await?;

        let validated = self.batch.validate()?;
        let batch = validated
            .limit_max_bytes(MAX_BATCH_PAYLOAD_SIZE)?
            .into_batch_settings()?;
        let request = self.request.into_settings();

        let tls = TlsSettings::from_options(&self.tls)?;
        let client = HttpClient::new(tls, cx.proxy())?;

        // The healthcheck queries the bare topic URI (no method suffix).
        let health_client = client.clone();
        let topic_uri = pubsub.uri("")?;
        let check = healthcheck(health_client, topic_uri, pubsub.auth.clone()).boxed();

        // Start token regeneration only once everything fallible has succeeded.
        pubsub.auth.spawn_regenerate_token();

        let buffer = JsonArrayBuffer::new(batch.size);
        let http_sink = BatchedHttpSink::new(pubsub, buffer, request, batch.timeout, client)
            .sink_map_err(|error| error!(message = "Fatal gcp_pubsub sink error.", %error));

        // `VectorSink::from_event_sink` is deprecated, but this sink still uses it.
        #[allow(deprecated)]
        Ok((VectorSink::from_event_sink(http_sink), check))
    }

    /// Reports the event types accepted by the configured encoding.
    fn input(&self) -> Input {
        let accepted = self.encoding.config().input_type();
        Input::new(accepted)
    }

    /// Exposes the acknowledgement settings of this sink.
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
/// Runtime state of the `gcp_pubsub` sink, built from a `PubsubConfig`.
struct PubsubSink {
// Authenticator applied to both URIs and outgoing requests.
auth: GcpAuthenticator,
// `<endpoint>/v1/projects/<project>/topics/<topic>`; method suffixes such as
// `:publish` are appended by `uri`.
uri_base: String,
// Event transformer taken from the encoding configuration.
transformer: Transformer,
// Encoder wrapping the serializer built from the encoding configuration.
encoder: Encoder<()>,
}
impl PubsubSink {
async fn from_config(config: &PubsubConfig) -> crate::Result<Self> {
let auth = config.auth.build(Scope::PubSub).await?;
let uri_base = format!(
"{}/v1/projects/{}/topics/{}",
config.endpoint, config.project, config.topic,
);
let transformer = config.encoding.transformer();
let serializer = config.encoding.build()?;
let encoder = Encoder::<()>::new(serializer);
Ok(Self {
auth,
uri_base,
transformer,
encoder,
})
}
fn uri(&self, suffix: &str) -> crate::Result<Uri> {
let uri = format!("{}{}", self.uri_base, suffix);
let mut uri = uri.parse::<Uri>().context(UriParseSnafu)?;
self.auth.apply_uri(&mut uri);
Ok(uri)
}
}
/// Per-event encoder handed out by `PubsubSink::build_encoder`.
struct PubSubSinkEventEncoder {
// Transformer applied to each event before it is serialized.
transformer: Transformer,
// Encoder that serializes the transformed event into bytes.
encoder: Encoder<()>,
}
impl HttpEventEncoder<Value> for PubSubSinkEventEncoder {
fn encode_event(&mut self, mut event: Event) -> Option<Value> {
self.transformer.transform(&mut event);
let mut bytes = BytesMut::new();
self.encoder.encode(event, &mut bytes).ok()?;
Some(json!({ "data": BASE64_STANDARD.encode(&bytes) }))
}
}
impl HttpSink for PubsubSink {
    type Input = Value;
    type Output = Vec<BoxedRawValue>;
    type Encoder = PubSubSinkEventEncoder;

    /// Creates an event encoder that shares this sink's transformer and
    /// encoder configuration.
    fn build_encoder(&self) -> Self::Encoder {
        PubSubSinkEventEncoder {
            transformer: self.transformer.clone(),
            encoder: self.encoder.clone(),
        }
    }

    /// Builds the `:publish` POST request carrying `events` as the `messages`
    /// array of a JSON body, with authentication applied.
    ///
    /// # Errors
    ///
    /// Fails if the body cannot be serialized, the publish URI does not
    /// parse, or the HTTP request cannot be assembled. These failures are
    /// returned to the caller rather than panicking.
    async fn build_request(&self, events: Self::Output) -> crate::Result<Request<Bytes>> {
        let body = json!({ "messages": events });
        let body = crate::serde::json::to_bytes(&body)?.freeze();

        let uri = self.uri(":publish")?;
        let builder = Request::post(uri).header("Content-Type", "application/json");

        let mut request = builder.body(body)?;
        self.auth.apply(&mut request);

        Ok(request)
    }
}
/// Sends an authenticated GET to `uri` and lets `healthcheck_response` judge
/// the reply, supplying `HealthcheckError::TopicNotFound` as the error value
/// for the topic.
///
/// # Errors
///
/// Fails if the request cannot be built or sent, or if `healthcheck_response`
/// rejects the response.
async fn healthcheck(client: HttpClient, uri: Uri, auth: GcpAuthenticator) -> crate::Result<()> {
    // Propagate a builder failure instead of panicking; this function already
    // reports every other failure through its `Result`.
    let mut request = Request::get(uri).body(Body::empty())?;
    auth.apply(&mut request);

    let response = client.send(request).await?;
    healthcheck_response(response, HealthcheckError::TopicNotFound.into())
}
#[cfg(test)]
mod tests {
    use indoc::indoc;

    use super::*;

    /// Runs the shared generated-configuration check for this sink.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PubsubConfig>();
    }

    /// Building the sink from a configuration that names no credentials is
    /// expected to return an error.
    #[tokio::test]
    async fn fails_missing_creds() {
        let raw = indoc! {r#"
            project = "project"
            topic = "topic"
            encoding.codec = "json"
        "#};
        let config: PubsubConfig = toml::from_str(raw).unwrap();

        let outcome = config.build(SinkContext::default()).await;
        assert!(outcome.is_err(), "config.build failed to error");
    }
}
#[cfg(all(test, feature = "gcp-integration-tests"))]
mod integration_tests {
use reqwest::{Client, Method, Response};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use vector_lib::codecs::JsonSerializerConfig;
use vector_lib::event::{BatchNotifier, BatchStatus};
use super::*;
use crate::gcp;
use crate::test_util::components::{run_and_assert_sink_error, COMPONENT_ERROR_TAGS};
use crate::test_util::{
components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
random_events_with_stream, random_metrics_with_stream, random_string, trace_init,
};
// Project name used by every integration test, both in the sink configuration
// and in the URLs of the helper requests.
const PROJECT: &str = "testproject";
/// Builds a sink configuration for `topic` in the test project, pointed at
/// `gcp::PUBSUB_ADDRESS` with authentication skipped and JSON encoding.
fn config(topic: &str) -> PubsubConfig {
    let auth = GcpAuthConfig {
        skip_authentication: true,
        ..Default::default()
    };
    let encoding = JsonSerializerConfig::default().into();

    PubsubConfig {
        project: String::from(PROJECT),
        topic: topic.to_owned(),
        endpoint: gcp::PUBSUB_ADDRESS.clone(),
        auth,
        batch: Default::default(),
        request: Default::default(),
        encoding,
        tls: Default::default(),
        acknowledgements: Default::default(),
    }
}
/// Builds the sink and healthcheck for `topic` with a default sink context.
///
/// # Panics
///
/// Panics if building the sink fails.
async fn config_build(topic: &str) -> (VectorSink, crate::sinks::Healthcheck) {
    let sink_config = config(topic);
    sink_config
        .build(SinkContext::default())
        .await
        .expect("Building sink failed")
}
/// Publishes 100 random metrics and checks that every pulled message decodes
/// to the JSON form of the corresponding input metric, in order.
#[tokio::test]
async fn publish_metrics() {
    trace_init();

    let (topic, subscription) = create_topic_subscription().await;
    let (sink, healthcheck) = config_build(&topic).await;
    healthcheck.await.expect("Health check failed");

    let (batch, mut receiver) = BatchNotifier::new_with_receiver();
    let (input, events) = random_metrics_with_stream(100, Some(batch), None);
    run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;
    assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

    let response = pull_messages(&subscription, 1000).await;
    let messages = response
        .receivedMessages
        .as_ref()
        .expect("Response is missing messages");
    assert_eq!(input.len(), messages.len());

    // The lengths were just asserted equal, so zipping visits every pair.
    for (sent, received) in input.iter().zip(messages) {
        let actual = serde_json::to_value(received.message.decode_data_as_value()).unwrap();
        let expected = serde_json::to_value(sent.as_metric()).unwrap();
        assert_eq!(actual, expected);
    }
}
/// Publishes random log events and checks that every pulled message decodes
/// to the fields of the corresponding input log, in order.
#[tokio::test]
async fn publish_events() {
    trace_init();

    let (topic, subscription) = create_topic_subscription().await;
    let (sink, healthcheck) = config_build(&topic).await;
    healthcheck.await.expect("Health check failed");

    let (batch, mut receiver) = BatchNotifier::new_with_receiver();
    let (input, events) = random_events_with_stream(100, 100, Some(batch));
    run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;
    assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

    let response = pull_messages(&subscription, 1000).await;
    let messages = response
        .receivedMessages
        .as_ref()
        .expect("Response is missing messages");
    assert_eq!(input.len(), messages.len());

    // The lengths were just asserted equal, so zipping visits every pair.
    for (sent, received) in input.iter().zip(messages) {
        let actual = serde_json::to_value(received.message.decode_data()).unwrap();
        let fields = sent.as_log().all_event_fields().unwrap();
        let expected = serde_json::to_value(fields).unwrap();
        assert_eq!(actual, expected);
    }
}
/// Publishing to a topic name that was never created must make the sink
/// report an error and reject the batch.
#[tokio::test]
async fn publish_events_broken_topic() {
    trace_init();

    let (topic, _subscription) = create_topic_subscription().await;
    let broken_topic = format!("BREAK{}BREAK", topic);
    let (sink, _healthcheck) = config_build(&broken_topic).await;

    let (batch, mut receiver) = BatchNotifier::new_with_receiver();
    let (_input, events) = random_events_with_stream(100, 100, Some(batch));
    run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await;

    let status = receiver.try_recv();
    assert_eq!(status, Ok(BatchStatus::Rejected));
}
/// The healthcheck must fail when the configured topic was never created.
#[tokio::test]
async fn checks_for_valid_topic() {
    trace_init();

    let (topic, _subscription) = create_topic_subscription().await;
    let missing_topic = format!("BAD{}", topic);
    let (_sink, healthcheck) = config_build(&missing_topic).await;

    let outcome = healthcheck.await;
    outcome.expect_err("Health check did not fail");
}
async fn create_topic_subscription() -> (String, String) {
let topic = format!("topic-{}", random_string(10));
let subscription = format!("subscription-{}", random_string(10));
request(Method::PUT, &format!("topics/{}", topic), json!({}))
.await
.json::<Value>()
.await
.expect("Creating new topic failed");
request(
Method::PUT,
&format!("subscriptions/{}", subscription),
json!({ "topic": format!("projects/{}/topics/{}", PROJECT, topic) }),
)
.await
.json::<Value>()
.await
.expect("Creating new subscription failed");
(topic, subscription)
}
/// Sends a JSON request for `path` under the test project at
/// `gcp::PUBSUB_ADDRESS` and returns the raw response.
///
/// # Panics
///
/// Panics if the request cannot be sent.
async fn request(method: Method, path: &str, json: Value) -> Response {
    let url = format!("{}/v1/projects/{}/{}", *gcp::PUBSUB_ADDRESS, PROJECT, path);

    let outcome = Client::new()
        .request(method.clone(), &url)
        .json(&json)
        .send()
        .await;

    match outcome {
        Ok(response) => response,
        Err(_) => panic!("Sending {} request to {} failed", method, url),
    }
}
/// Pulls up to `count` messages from `subscription` without waiting for new
/// ones to arrive.
///
/// # Panics
///
/// Panics if the response cannot be deserialized into a `PullResponse`.
async fn pull_messages(subscription: &str, count: usize) -> PullResponse {
    let path = format!("subscriptions/{}:pull", subscription);
    let body = json!({
        "returnImmediately": true,
        "maxMessages": count
    });

    let response = request(Method::POST, &path, body).await;
    response
        .json::<PullResponse>()
        .await
        .expect("Extracting pull data failed")
}
/// Body of a subscription `:pull` response as deserialized by the tests.
#[derive(Debug, Deserialize)]
// Field names follow the camelCase keys of the JSON payload.
#[allow(non_snake_case)]
struct PullResponse {
// Optional in the response; the tests treat `None` as a failure.
receivedMessages: Option<Vec<PullMessageOuter>>,
}
/// One entry of `PullResponse::receivedMessages`: an acknowledgement id plus
/// the message itself.
#[derive(Debug, Deserialize)]
// Field names follow the camelCase keys of the JSON payload.
#[allow(non_snake_case)]
// `ackId` is deserialized but never read by the tests.
#[allow(dead_code)] struct PullMessageOuter {
ackId: String,
message: PullMessage,
}
/// A pulled message as deserialized by the tests.
#[derive(Debug, Deserialize)]
// Field names follow the camelCase keys of the JSON payload.
#[allow(non_snake_case)]
// `messageId` and `publishTime` are deserialized but never read by the tests.
#[allow(dead_code)] struct PullMessage {
// Base64-encoded payload; decoded by `decode_data` / `decode_data_as_value`.
data: String,
messageId: String,
publishTime: String,
}
impl PullMessage {
    /// Base64-decodes `data` and renders it as text, replacing any invalid
    /// UTF-8 sequences.
    ///
    /// # Panics
    ///
    /// Panics if `data` is not valid base64.
    fn decoded_text(&self) -> String {
        let raw = BASE64_STANDARD
            .decode(&self.data)
            .expect("Invalid base64 data");
        String::from_utf8_lossy(&raw).into_owned()
    }

    /// Decodes the payload into a `TestMessage`.
    ///
    /// # Panics
    ///
    /// Panics if the payload is not valid base64 or does not match the
    /// `TestMessage` structure.
    fn decode_data(&self) -> TestMessage {
        let text = self.decoded_text();
        serde_json::from_str(&text).expect("Invalid message structure")
    }

    /// Decodes the payload into an arbitrary JSON value.
    ///
    /// # Panics
    ///
    /// Panics if the payload is not valid base64 or not valid JSON.
    fn decode_data_as_value(&self) -> Value {
        let text = self.decoded_text();
        serde_json::from_str(&text).expect("Invalid json")
    }
}
/// Shape the log-event test expects a decoded payload to have; it is
/// re-serialized to JSON for comparison against the input event's fields.
#[derive(Debug, Deserialize, Serialize)]
struct TestMessage {
timestamp: String,
message: String,
}
}