1use base64::prelude::{Engine as _, BASE64_STANDARD};
2use bytes::{Bytes, BytesMut};
3use futures::{FutureExt, SinkExt};
4use http::{Request, Uri};
5use hyper::Body;
6use indoc::indoc;
7use serde_json::{json, Value};
8use snafu::{ResultExt, Snafu};
9use tokio_util::codec::Encoder as _;
10use vector_lib::configurable::configurable_component;
11
12use crate::{
13 codecs::{Encoder, EncodingConfig, Transformer},
14 config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
15 event::Event,
16 gcp::{GcpAuthConfig, GcpAuthenticator, Scope, PUBSUB_URL},
17 http::HttpClient,
18 sinks::{
19 gcs_common::config::healthcheck_response,
20 util::{
21 http::{BatchedHttpSink, HttpEventEncoder, HttpSink},
22 BatchConfig, BoxedRawValue, JsonArrayBuffer, SinkBatchSettings, TowerRequestConfig,
23 },
24 Healthcheck, UriParseSnafu, VectorSink,
25 },
26 tls::{TlsConfig, TlsSettings},
27};
28
29#[derive(Debug, Snafu)]
30enum HealthcheckError {
31 #[snafu(display("Configured topic not found"))]
32 TopicNotFound,
33}
34
35const MAX_BATCH_PAYLOAD_SIZE: usize = 10_000_000;
37
38#[derive(Clone, Copy, Debug, Default)]
39pub struct PubsubDefaultBatchSettings;
40
41impl SinkBatchSettings for PubsubDefaultBatchSettings {
42 const MAX_EVENTS: Option<usize> = Some(1000);
43 const MAX_BYTES: Option<usize> = Some(10_000_000);
44 const TIMEOUT_SECS: f64 = 1.0;
45}
46
47#[configurable_component(sink(
49 "gcp_pubsub",
50 "Publish observability events to GCP's Pub/Sub messaging system."
51))]
52#[derive(Clone, Debug)]
53pub struct PubsubConfig {
54 #[configurable(metadata(docs::examples = "vector-123456"))]
56 pub project: String,
57
58 #[configurable(metadata(docs::examples = "this-is-a-topic"))]
60 pub topic: String,
61
62 #[serde(default = "default_endpoint")]
71 #[configurable(metadata(docs::examples = "https://us-central1-pubsub.googleapis.com"))]
72 pub endpoint: String,
73
74 #[serde(default, flatten)]
75 pub auth: GcpAuthConfig,
76
77 #[configurable(derived)]
78 #[serde(default)]
79 pub batch: BatchConfig<PubsubDefaultBatchSettings>,
80
81 #[configurable(derived)]
82 #[serde(default)]
83 pub request: TowerRequestConfig,
84
85 #[configurable(derived)]
86 encoding: EncodingConfig,
87
88 #[configurable(derived)]
89 #[serde(default)]
90 pub tls: Option<TlsConfig>,
91
92 #[configurable(derived)]
93 #[serde(
94 default,
95 deserialize_with = "crate::serde::bool_or_struct",
96 skip_serializing_if = "crate::serde::is_default"
97 )]
98 acknowledgements: AcknowledgementsConfig,
99}
100
101fn default_endpoint() -> String {
102 PUBSUB_URL.to_string()
103}
104
105impl GenerateConfig for PubsubConfig {
106 fn generate_config() -> toml::Value {
107 toml::from_str(indoc! {r#"
108 project = "my-project"
109 topic = "my-topic"
110 encoding.codec = "json"
111 "#})
112 .unwrap()
113 }
114}
115
116#[async_trait::async_trait]
117#[typetag::serde(name = "gcp_pubsub")]
118impl SinkConfig for PubsubConfig {
119 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
120 let sink = PubsubSink::from_config(self).await?;
121 let batch_settings = self
122 .batch
123 .validate()?
124 .limit_max_bytes(MAX_BATCH_PAYLOAD_SIZE)?
125 .into_batch_settings()?;
126 let request_settings = self.request.into_settings();
127 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
128 let client = HttpClient::new(tls_settings, cx.proxy())?;
129
130 let healthcheck = healthcheck(client.clone(), sink.uri("")?, sink.auth.clone()).boxed();
131 sink.auth.spawn_regenerate_token();
132
133 let sink = BatchedHttpSink::new(
134 sink,
135 JsonArrayBuffer::new(batch_settings.size),
136 request_settings,
137 batch_settings.timeout,
138 client,
139 )
140 .sink_map_err(|error| error!(message = "Fatal gcp_pubsub sink error.", %error));
141
142 #[allow(deprecated)]
143 Ok((VectorSink::from_event_sink(sink), healthcheck))
144 }
145
146 fn input(&self) -> Input {
147 Input::new(self.encoding.config().input_type())
148 }
149
150 fn acknowledgements(&self) -> &AcknowledgementsConfig {
151 &self.acknowledgements
152 }
153}
154
155struct PubsubSink {
156 auth: GcpAuthenticator,
157 uri_base: String,
158 transformer: Transformer,
159 encoder: Encoder<()>,
160}
161
162impl PubsubSink {
163 async fn from_config(config: &PubsubConfig) -> crate::Result<Self> {
164 let auth = config.auth.build(Scope::PubSub).await?;
166
167 let uri_base = format!(
168 "{}/v1/projects/{}/topics/{}",
169 config.endpoint, config.project, config.topic,
170 );
171
172 let transformer = config.encoding.transformer();
173 let serializer = config.encoding.build()?;
174 let encoder = Encoder::<()>::new(serializer);
175
176 Ok(Self {
177 auth,
178 uri_base,
179 transformer,
180 encoder,
181 })
182 }
183
184 fn uri(&self, suffix: &str) -> crate::Result<Uri> {
185 let uri = format!("{}{}", self.uri_base, suffix);
186 let mut uri = uri.parse::<Uri>().context(UriParseSnafu)?;
187 self.auth.apply_uri(&mut uri);
188 Ok(uri)
189 }
190}
191
192struct PubSubSinkEventEncoder {
193 transformer: Transformer,
194 encoder: Encoder<()>,
195}
196
197impl HttpEventEncoder<Value> for PubSubSinkEventEncoder {
198 fn encode_event(&mut self, mut event: Event) -> Option<Value> {
199 self.transformer.transform(&mut event);
200 let mut bytes = BytesMut::new();
201 self.encoder.encode(event, &mut bytes).ok()?;
203 Some(json!({ "data": BASE64_STANDARD.encode(&bytes) }))
206 }
207}
208
209impl HttpSink for PubsubSink {
210 type Input = Value;
211 type Output = Vec<BoxedRawValue>;
212 type Encoder = PubSubSinkEventEncoder;
213
214 fn build_encoder(&self) -> Self::Encoder {
215 PubSubSinkEventEncoder {
216 transformer: self.transformer.clone(),
217 encoder: self.encoder.clone(),
218 }
219 }
220
221 async fn build_request(&self, events: Self::Output) -> crate::Result<Request<Bytes>> {
222 let body = json!({ "messages": events });
223 let body = crate::serde::json::to_bytes(&body).unwrap().freeze();
224
225 let uri = self.uri(":publish").unwrap();
226 let builder = Request::post(uri).header("Content-Type", "application/json");
227
228 let mut request = builder.body(body).unwrap();
229 self.auth.apply(&mut request);
230
231 Ok(request)
232 }
233}
234
235async fn healthcheck(client: HttpClient, uri: Uri, auth: GcpAuthenticator) -> crate::Result<()> {
236 let mut request = Request::get(uri).body(Body::empty()).unwrap();
237 auth.apply(&mut request);
238
239 let response = client.send(request).await?;
240 healthcheck_response(response, HealthcheckError::TopicNotFound.into())
241}
242
#[cfg(test)]
mod tests {
    use indoc::indoc;

    use super::*;

    /// Exercises `GenerateConfig` through the shared `test_generate_config` helper.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PubsubConfig>();
    }

    /// A config with no auth settings must fail to build (presumably because no GCP
    /// credentials are available in the unit-test environment).
    #[tokio::test]
    async fn fails_missing_creds() {
        let raw = indoc! {r#"
            project = "project"
            topic = "topic"
            encoding.codec = "json"
        "#};
        let config: PubsubConfig = toml::from_str(raw).unwrap();

        let outcome = config.build(SinkContext::default()).await;
        assert!(outcome.is_err(), "config.build failed to error");
    }
}
267
#[cfg(all(test, feature = "gcp-integration-tests"))]
mod integration_tests {
    use reqwest::{Client, Method, Response};
    use serde::{Deserialize, Serialize};
    use serde_json::{json, Value};
    use vector_lib::codecs::JsonSerializerConfig;
    use vector_lib::event::{BatchNotifier, BatchStatus};

    use super::*;
    use crate::gcp;
    use crate::test_util::components::{run_and_assert_sink_error, COMPONENT_ERROR_TAGS};
    use crate::test_util::{
        components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
        random_events_with_stream, random_metrics_with_stream, random_string, trace_init,
    };

    /// Project under which every test topic and subscription is created.
    const PROJECT: &str = "testproject";

    /// Sink config for `topic` against `gcp::PUBSUB_ADDRESS`.
    ///
    /// Authentication is skipped and events are JSON-encoded.
    fn config(topic: &str) -> PubsubConfig {
        PubsubConfig {
            project: PROJECT.into(),
            topic: topic.into(),
            endpoint: gcp::PUBSUB_ADDRESS.clone(),
            auth: GcpAuthConfig {
                skip_authentication: true,
                ..Default::default()
            },
            batch: Default::default(),
            request: Default::default(),
            encoding: JsonSerializerConfig::default().into(),
            tls: Default::default(),
            acknowledgements: Default::default(),
        }
    }

    /// Builds the sink and healthcheck for `topic`, panicking if the build fails.
    async fn config_build(topic: &str) -> (VectorSink, crate::sinks::Healthcheck) {
        let cx = SinkContext::default();
        config(topic).build(cx).await.expect("Building sink failed")
    }

    #[tokio::test]
    async fn publish_metrics() {
        trace_init();

        let (topic, subscription) = create_topic_subscription().await;
        let (sink, healthcheck) = config_build(&topic).await;

        healthcheck.await.expect("Health check failed");

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let (input, events) = random_metrics_with_stream(100, Some(batch), None);
        run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        let response = pull_messages(&subscription, 1000).await;
        let messages = response
            .received_messages
            .as_ref()
            .expect("Response is missing messages");
        assert_eq!(input.len(), messages.len());
        // Lengths were just asserted equal, so `zip` pairs every message with its input.
        for (received, sent) in messages.iter().zip(&input) {
            let data = received.message.decode_data_as_value();
            let expected = serde_json::to_value(sent.as_metric()).unwrap();
            assert_eq!(data, expected);
        }
    }

    #[tokio::test]
    async fn publish_events() {
        trace_init();

        let (topic, subscription) = create_topic_subscription().await;
        let (sink, healthcheck) = config_build(&topic).await;

        healthcheck.await.expect("Health check failed");

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let (input, events) = random_events_with_stream(100, 100, Some(batch));
        run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        let response = pull_messages(&subscription, 1000).await;
        let messages = response
            .received_messages
            .as_ref()
            .expect("Response is missing messages");
        assert_eq!(input.len(), messages.len());
        // Lengths were just asserted equal, so `zip` pairs every message with its input.
        for (received, sent) in messages.iter().zip(&input) {
            let data = serde_json::to_value(received.message.decode_data()).unwrap();
            let expected =
                serde_json::to_value(sent.as_log().all_event_fields().unwrap()).unwrap();
            assert_eq!(data, expected);
        }
    }

    #[tokio::test]
    async fn publish_events_broken_topic() {
        trace_init();

        let (topic, _subscription) = create_topic_subscription().await;
        let (sink, _healthcheck) = config_build(&format!("BREAK{topic}BREAK")).await;
        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let (_input, events) = random_events_with_stream(100, 100, Some(batch));
        run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await;
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));
    }

    #[tokio::test]
    async fn checks_for_valid_topic() {
        trace_init();

        let (topic, _subscription) = create_topic_subscription().await;
        let topic = format!("BAD{topic}");
        let (_sink, healthcheck) = config_build(&topic).await;
        healthcheck.await.expect_err("Health check did not fail");
    }

    /// Creates a randomly named topic plus a subscription on it.
    ///
    /// Returns `(topic, subscription)`.
    async fn create_topic_subscription() -> (String, String) {
        let topic = format!("topic-{}", random_string(10));
        let subscription = format!("subscription-{}", random_string(10));
        request(Method::PUT, &format!("topics/{topic}"), json!({}))
            .await
            .json::<Value>()
            .await
            .expect("Creating new topic failed");
        request(
            Method::PUT,
            &format!("subscriptions/{subscription}"),
            json!({ "topic": format!("projects/{}/topics/{}", PROJECT, topic) }),
        )
        .await
        .json::<Value>()
        .await
        .expect("Creating new subscription failed");
        (topic, subscription)
    }

    /// Sends `json` with `method` to `{PUBSUB_ADDRESS}/v1/projects/{PROJECT}/{path}`.
    ///
    /// Panics if the request cannot be sent.
    async fn request(method: Method, path: &str, json: Value) -> Response {
        let url = format!("{}/v1/projects/{}/{}", *gcp::PUBSUB_ADDRESS, PROJECT, path);
        Client::new()
            .request(method.clone(), &url)
            .json(&json)
            .send()
            .await
            .unwrap_or_else(|_| panic!("Sending {method} request to {url} failed"))
    }

    /// Pulls up to `count` messages from `subscription` without waiting for more to arrive.
    async fn pull_messages(subscription: &str, count: usize) -> PullResponse {
        request(
            Method::POST,
            &format!("subscriptions/{subscription}:pull"),
            json!({
                "returnImmediately": true,
                "maxMessages": count
            }),
        )
        .await
        .json::<PullResponse>()
        .await
        .expect("Extracting pull data failed")
    }

    /// The subset of the `subscriptions/...:pull` response body these tests read.
    #[derive(Debug, Deserialize)]
    #[serde(rename_all = "camelCase")]
    struct PullResponse {
        received_messages: Option<Vec<PullMessageOuter>>,
    }

    #[derive(Debug, Deserialize)]
    #[serde(rename_all = "camelCase")]
    // `ack_id` is never read; it stays so deserialization still requires it to be present.
    #[allow(dead_code)]
    struct PullMessageOuter {
        ack_id: String,
        message: PullMessage,
    }

    #[derive(Debug, Deserialize)]
    #[serde(rename_all = "camelCase")]
    // Only `data` is read; the id and publish time stay so deserialization still requires
    // them to be present.
    #[allow(dead_code)]
    struct PullMessage {
        data: String,
        message_id: String,
        publish_time: String,
    }

    impl PullMessage {
        /// Base64-decodes `data` and converts the bytes to text, replacing invalid UTF-8.
        fn decode_text(&self) -> String {
            let raw = BASE64_STANDARD
                .decode(&self.data)
                .expect("Invalid base64 data");
            String::from_utf8_lossy(&raw).into_owned()
        }

        /// Decodes `data` as a JSON-encoded `TestMessage`.
        fn decode_data(&self) -> TestMessage {
            serde_json::from_str(&self.decode_text()).expect("Invalid message structure")
        }

        /// Decodes `data` as an arbitrary JSON value.
        fn decode_data_as_value(&self) -> Value {
            serde_json::from_str(&self.decode_text()).expect("Invalid json")
        }
    }

    /// Shape of the log events produced by `random_events_with_stream` once JSON-encoded.
    #[derive(Debug, Deserialize, Serialize)]
    struct TestMessage {
        timestamp: String,
        message: String,
    }
}
480}