vector/sinks/gcp/
pubsub.rs

1use base64::prelude::{BASE64_STANDARD, Engine as _};
2use bytes::{Bytes, BytesMut};
3use futures::{FutureExt, SinkExt};
4use http::{Request, Uri};
5use hyper::Body;
6use indoc::indoc;
7use serde_json::{Value, json};
8use snafu::{ResultExt, Snafu};
9use tokio_util::codec::Encoder as _;
10use vector_lib::configurable::configurable_component;
11
12use crate::{
13    codecs::{Encoder, EncodingConfig, Transformer},
14    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
15    event::Event,
16    gcp::{GcpAuthConfig, GcpAuthenticator, PUBSUB_URL, Scope},
17    http::HttpClient,
18    sinks::{
19        Healthcheck, UriParseSnafu, VectorSink,
20        gcs_common::config::healthcheck_response,
21        util::{
22            BatchConfig, BoxedRawValue, JsonArrayBuffer, SinkBatchSettings, TowerRequestConfig,
23            http::{BatchedHttpSink, HttpEventEncoder, HttpSink},
24        },
25    },
26    tls::{TlsConfig, TlsSettings},
27};
28
/// Errors specific to the `gcp_pubsub` healthcheck.
#[derive(Debug, Snafu)]
enum HealthcheckError {
    /// Handed to `healthcheck_response` by `healthcheck` as the "not found" error.
    // NOTE(review): presumably surfaced when the topic lookup reports that the
    // topic does not exist — confirm against `healthcheck_response`.
    #[snafu(display("Configured topic not found"))]
    TopicNotFound,
}
34
35// 10MB maximum message size: https://cloud.google.com/pubsub/quotas#resource_limits
36const MAX_BATCH_PAYLOAD_SIZE: usize = 10_000_000;
37
38#[derive(Clone, Copy, Debug, Default)]
39pub struct PubsubDefaultBatchSettings;
40
41impl SinkBatchSettings for PubsubDefaultBatchSettings {
42    const MAX_EVENTS: Option<usize> = Some(1000);
43    const MAX_BYTES: Option<usize> = Some(10_000_000);
44    const TIMEOUT_SECS: f64 = 1.0;
45}
46
/// Configuration for the `gcp_pubsub` sink.
#[configurable_component(sink(
    "gcp_pubsub",
    "Publish observability events to GCP's Pub/Sub messaging system."
))]
#[derive(Clone, Debug)]
pub struct PubsubConfig {
    /// The project name to which to publish events.
    #[configurable(metadata(docs::examples = "vector-123456"))]
    pub project: String,

    /// The topic within the project to which to publish events.
    #[configurable(metadata(docs::examples = "this-is-a-topic"))]
    pub topic: String,

    /// The endpoint to which to publish events.
    ///
    /// The scheme (`http` or `https`) must be specified. No path should be included since the paths defined
    /// by the [`GCP Pub/Sub`][pubsub_api] API are used.
    ///
    /// The trailing slash `/` must not be included.
    ///
    /// [pubsub_api]: https://cloud.google.com/pubsub/docs/reference/rest
    // `PubsubSink::from_config` appends `/v1/projects/{project}/topics/{topic}` to this value.
    #[serde(default = "default_endpoint")]
    #[configurable(metadata(docs::examples = "https://us-central1-pubsub.googleapis.com"))]
    pub endpoint: String,

    // Flattened: the fields of `GcpAuthConfig` are read from the same level as
    // this struct's own fields rather than from a nested `auth` table.
    #[serde(default, flatten)]
    pub auth: GcpAuthConfig,

    // NOTE(review): `configurable(derived)` presumably takes the field's
    // documentation from its type, which would explain the missing doc comments
    // on this and the following fields — confirm before adding any.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<PubsubDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    // No serde default: the example configs in this file always set `encoding.codec`.
    #[configurable(derived)]
    encoding: EncodingConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub tls: Option<TlsConfig>,

    // Deserialized through `crate::serde::bool_or_struct` and left out of
    // serialized output when `crate::serde::is_default` holds.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
100
101fn default_endpoint() -> String {
102    PUBSUB_URL.to_string()
103}
104
105impl GenerateConfig for PubsubConfig {
106    fn generate_config() -> toml::Value {
107        toml::from_str(indoc! {r#"
108            project = "my-project"
109            topic = "my-topic"
110            encoding.codec = "json"
111        "#})
112        .unwrap()
113    }
114}
115
116#[async_trait::async_trait]
117#[typetag::serde(name = "gcp_pubsub")]
118impl SinkConfig for PubsubConfig {
119    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
120        let sink = PubsubSink::from_config(self).await?;
121        let batch_settings = self
122            .batch
123            .validate()?
124            .limit_max_bytes(MAX_BATCH_PAYLOAD_SIZE)?
125            .into_batch_settings()?;
126        let request_settings = self.request.into_settings();
127        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
128        let client = HttpClient::new(tls_settings, cx.proxy())?;
129
130        let healthcheck = healthcheck(client.clone(), sink.uri("")?, sink.auth.clone()).boxed();
131        sink.auth.spawn_regenerate_token();
132
133        let sink = BatchedHttpSink::new(
134            sink,
135            JsonArrayBuffer::new(batch_settings.size),
136            request_settings,
137            batch_settings.timeout,
138            client,
139        )
140        .sink_map_err(|error| error!(message = "Fatal gcp_pubsub sink error.", %error));
141
142        #[allow(deprecated)]
143        Ok((VectorSink::from_event_sink(sink), healthcheck))
144    }
145
146    fn input(&self) -> Input {
147        Input::new(self.encoding.config().input_type())
148    }
149
150    fn acknowledgements(&self) -> &AcknowledgementsConfig {
151        &self.acknowledgements
152    }
153}
154
155struct PubsubSink {
156    auth: GcpAuthenticator,
157    uri_base: String,
158    transformer: Transformer,
159    encoder: Encoder<()>,
160}
161
162impl PubsubSink {
163    async fn from_config(config: &PubsubConfig) -> crate::Result<Self> {
164        // We only need to load the credentials if we are not targeting an emulator.
165        let auth = config.auth.build(Scope::PubSub).await?;
166
167        let uri_base = format!(
168            "{}/v1/projects/{}/topics/{}",
169            config.endpoint, config.project, config.topic,
170        );
171
172        let transformer = config.encoding.transformer();
173        let serializer = config.encoding.build()?;
174        let encoder = Encoder::<()>::new(serializer);
175
176        Ok(Self {
177            auth,
178            uri_base,
179            transformer,
180            encoder,
181        })
182    }
183
184    fn uri(&self, suffix: &str) -> crate::Result<Uri> {
185        let uri = format!("{}{}", self.uri_base, suffix);
186        let mut uri = uri.parse::<Uri>().context(UriParseSnafu)?;
187        self.auth.apply_uri(&mut uri);
188        Ok(uri)
189    }
190}
191
192struct PubSubSinkEventEncoder {
193    transformer: Transformer,
194    encoder: Encoder<()>,
195}
196
197impl HttpEventEncoder<Value> for PubSubSinkEventEncoder {
198    fn encode_event(&mut self, mut event: Event) -> Option<Value> {
199        self.transformer.transform(&mut event);
200        let mut bytes = BytesMut::new();
201        // Errors are handled by `Encoder`.
202        self.encoder.encode(event, &mut bytes).ok()?;
203        // Each event needs to be base64 encoded, and put into a JSON object
204        // as the `data` item.
205        Some(json!({ "data": BASE64_STANDARD.encode(&bytes) }))
206    }
207}
208
209impl HttpSink for PubsubSink {
210    type Input = Value;
211    type Output = Vec<BoxedRawValue>;
212    type Encoder = PubSubSinkEventEncoder;
213
214    fn build_encoder(&self) -> Self::Encoder {
215        PubSubSinkEventEncoder {
216            transformer: self.transformer.clone(),
217            encoder: self.encoder.clone(),
218        }
219    }
220
221    async fn build_request(&self, events: Self::Output) -> crate::Result<Request<Bytes>> {
222        let body = json!({ "messages": events });
223        let body = crate::serde::json::to_bytes(&body).unwrap().freeze();
224
225        let uri = self.uri(":publish").unwrap();
226        let builder = Request::post(uri).header("Content-Type", "application/json");
227
228        let mut request = builder.body(body).unwrap();
229        self.auth.apply(&mut request);
230
231        Ok(request)
232    }
233}
234
235async fn healthcheck(client: HttpClient, uri: Uri, auth: GcpAuthenticator) -> crate::Result<()> {
236    let mut request = Request::get(uri).body(Body::empty()).unwrap();
237    auth.apply(&mut request);
238
239    let response = client.send(request).await?;
240    healthcheck_response(response, HealthcheckError::TopicNotFound.into())
241}
242
#[cfg(test)]
mod tests {
    use indoc::indoc;

    use super::*;

    /// Runs the shared generated-config check for this sink's config type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PubsubConfig>();
    }

    /// Building the sink from a config that specifies no credentials must fail.
    #[tokio::test]
    async fn fails_missing_creds() {
        let raw = indoc! {r#"
                project = "project"
                topic = "topic"
                encoding.codec = "json"
            "#};
        let config: PubsubConfig = toml::from_str(raw).unwrap();

        let outcome = config.build(SinkContext::default()).await;
        assert!(outcome.is_err(), "config.build failed to error");
    }
}
267
268#[cfg(all(test, feature = "gcp-integration-tests"))]
269mod integration_tests {
270    use reqwest::{Client, Method, Response};
271    use serde::{Deserialize, Serialize};
272    use serde_json::{Value, json};
273    use vector_lib::{
274        codecs::JsonSerializerConfig,
275        event::{BatchNotifier, BatchStatus},
276    };
277
278    use super::*;
279    use crate::{
280        gcp,
281        test_util::{
282            components::{
283                COMPONENT_ERROR_TAGS, HTTP_SINK_TAGS, run_and_assert_sink_compliance,
284                run_and_assert_sink_error,
285            },
286            random_events_with_stream, random_metrics_with_stream, random_string, trace_init,
287        },
288    };
289
290    const PROJECT: &str = "testproject";
291
292    fn config(topic: &str) -> PubsubConfig {
293        PubsubConfig {
294            project: PROJECT.into(),
295            topic: topic.into(),
296            endpoint: gcp::PUBSUB_ADDRESS.clone(),
297            auth: GcpAuthConfig {
298                skip_authentication: true,
299                ..Default::default()
300            },
301            batch: Default::default(),
302            request: Default::default(),
303            encoding: JsonSerializerConfig::default().into(),
304            tls: Default::default(),
305            acknowledgements: Default::default(),
306        }
307    }
308
309    async fn config_build(topic: &str) -> (VectorSink, crate::sinks::Healthcheck) {
310        let cx = SinkContext::default();
311        config(topic).build(cx).await.expect("Building sink failed")
312    }
313
314    #[tokio::test]
315    async fn publish_metrics() {
316        trace_init();
317
318        let (topic, subscription) = create_topic_subscription().await;
319        let (sink, healthcheck) = config_build(&topic).await;
320
321        healthcheck.await.expect("Health check failed");
322
323        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
324        let (input, events) = random_metrics_with_stream(100, Some(batch), None);
325        run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;
326        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
327
328        let response = pull_messages(&subscription, 1000).await;
329        let messages = response
330            .receivedMessages
331            .as_ref()
332            .expect("Response is missing messages");
333        assert_eq!(input.len(), messages.len());
334        for i in 0..input.len() {
335            let data = messages[i].message.decode_data_as_value();
336            let data = serde_json::to_value(data).unwrap();
337            let expected = serde_json::to_value(input[i].as_metric()).unwrap();
338            assert_eq!(data, expected);
339        }
340    }
341
342    #[tokio::test]
343    async fn publish_events() {
344        trace_init();
345
346        let (topic, subscription) = create_topic_subscription().await;
347        let (sink, healthcheck) = config_build(&topic).await;
348
349        healthcheck.await.expect("Health check failed");
350
351        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
352        let (input, events) = random_events_with_stream(100, 100, Some(batch));
353        run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;
354        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
355
356        let response = pull_messages(&subscription, 1000).await;
357        let messages = response
358            .receivedMessages
359            .as_ref()
360            .expect("Response is missing messages");
361        assert_eq!(input.len(), messages.len());
362        for i in 0..input.len() {
363            let data = messages[i].message.decode_data();
364            let data = serde_json::to_value(data).unwrap();
365            let expected =
366                serde_json::to_value(input[i].as_log().all_event_fields().unwrap()).unwrap();
367            assert_eq!(data, expected);
368        }
369    }
370
371    #[tokio::test]
372    async fn publish_events_broken_topic() {
373        trace_init();
374
375        let (topic, _subscription) = create_topic_subscription().await;
376        let (sink, _healthcheck) = config_build(&format!("BREAK{topic}BREAK")).await;
377        // Explicitly skip healthcheck
378
379        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
380        let (_input, events) = random_events_with_stream(100, 100, Some(batch));
381        run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await;
382        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));
383    }
384
385    #[tokio::test]
386    async fn checks_for_valid_topic() {
387        trace_init();
388
389        let (topic, _subscription) = create_topic_subscription().await;
390        let topic = format!("BAD{topic}");
391        let (_sink, healthcheck) = config_build(&topic).await;
392        healthcheck.await.expect_err("Health check did not fail");
393    }
394
395    async fn create_topic_subscription() -> (String, String) {
396        let topic = format!("topic-{}", random_string(10));
397        let subscription = format!("subscription-{}", random_string(10));
398        request(Method::PUT, &format!("topics/{topic}"), json!({}))
399            .await
400            .json::<Value>()
401            .await
402            .expect("Creating new topic failed");
403        request(
404            Method::PUT,
405            &format!("subscriptions/{subscription}"),
406            json!({ "topic": format!("projects/{}/topics/{}", PROJECT, topic) }),
407        )
408        .await
409        .json::<Value>()
410        .await
411        .expect("Creating new subscription failed");
412        (topic, subscription)
413    }
414
415    async fn request(method: Method, path: &str, json: Value) -> Response {
416        let url = format!("{}/v1/projects/{}/{}", *gcp::PUBSUB_ADDRESS, PROJECT, path);
417        Client::new()
418            .request(method.clone(), &url)
419            .json(&json)
420            .send()
421            .await
422            .unwrap_or_else(|_| panic!("Sending {method} request to {url} failed"))
423    }
424
425    async fn pull_messages(subscription: &str, count: usize) -> PullResponse {
426        request(
427            Method::POST,
428            &format!("subscriptions/{subscription}:pull"),
429            json!({
430                "returnImmediately": true,
431                "maxMessages": count
432            }),
433        )
434        .await
435        .json::<PullResponse>()
436        .await
437        .expect("Extracting pull data failed")
438    }
439
    /// Body returned by the subscription `:pull` request in `pull_messages`.
    #[derive(Debug, Deserialize)]
    // Field names double as the JSON keys (no serde rename is applied), hence
    // the non-snake-case identifier.
    #[allow(non_snake_case)]
    struct PullResponse {
        /// `None` when the key is absent from the response.
        receivedMessages: Option<Vec<PullMessageOuter>>,
    }
445
    /// One entry of `PullResponse::receivedMessages`: the message plus its ack id.
    #[derive(Debug, Deserialize)]
    // Field names double as the JSON keys (no serde rename is applied).
    #[allow(non_snake_case)]
    #[allow(dead_code)] // deserialize all fields
    struct PullMessageOuter {
        ackId: String,
        message: PullMessage,
    }
453
454    #[derive(Debug, Deserialize)]
455    #[allow(non_snake_case)]
456    #[allow(dead_code)] // deserialize all fields
457    struct PullMessage {
458        data: String,
459        messageId: String,
460        publishTime: String,
461    }
462
463    impl PullMessage {
464        fn decode_data(&self) -> TestMessage {
465            let data = BASE64_STANDARD
466                .decode(&self.data)
467                .expect("Invalid base64 data");
468            let data = String::from_utf8_lossy(&data);
469            serde_json::from_str(&data).expect("Invalid message structure")
470        }
471
472        fn decode_data_as_value(&self) -> Value {
473            let data = BASE64_STANDARD
474                .decode(&self.data)
475                .expect("Invalid base64 data");
476            let data = String::from_utf8_lossy(&data);
477            serde_json::from_str(&data).expect("Invalid json")
478        }
479    }
480
    /// Shape that `PullMessage::decode_data` parses a log payload into; it is
    /// re-serialized in `publish_events` for comparison with the sent event.
    #[derive(Debug, Deserialize, Serialize)]
    struct TestMessage {
        timestamp: String,
        message: String,
    }
486}