vector/sinks/gcp/
pubsub.rs

1use base64::prelude::{Engine as _, BASE64_STANDARD};
2use bytes::{Bytes, BytesMut};
3use futures::{FutureExt, SinkExt};
4use http::{Request, Uri};
5use hyper::Body;
6use indoc::indoc;
7use serde_json::{json, Value};
8use snafu::{ResultExt, Snafu};
9use tokio_util::codec::Encoder as _;
10use vector_lib::configurable::configurable_component;
11
12use crate::{
13    codecs::{Encoder, EncodingConfig, Transformer},
14    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
15    event::Event,
16    gcp::{GcpAuthConfig, GcpAuthenticator, Scope, PUBSUB_URL},
17    http::HttpClient,
18    sinks::{
19        gcs_common::config::healthcheck_response,
20        util::{
21            http::{BatchedHttpSink, HttpEventEncoder, HttpSink},
22            BatchConfig, BoxedRawValue, JsonArrayBuffer, SinkBatchSettings, TowerRequestConfig,
23        },
24        Healthcheck, UriParseSnafu, VectorSink,
25    },
26    tls::{TlsConfig, TlsSettings},
27};
28
29#[derive(Debug, Snafu)]
30enum HealthcheckError {
31    #[snafu(display("Configured topic not found"))]
32    TopicNotFound,
33}
34
35// 10MB maximum message size: https://cloud.google.com/pubsub/quotas#resource_limits
36const MAX_BATCH_PAYLOAD_SIZE: usize = 10_000_000;
37
38#[derive(Clone, Copy, Debug, Default)]
39pub struct PubsubDefaultBatchSettings;
40
41impl SinkBatchSettings for PubsubDefaultBatchSettings {
42    const MAX_EVENTS: Option<usize> = Some(1000);
43    const MAX_BYTES: Option<usize> = Some(10_000_000);
44    const TIMEOUT_SECS: f64 = 1.0;
45}
46
/// Configuration for the `gcp_pubsub` sink.
#[configurable_component(sink(
    "gcp_pubsub",
    "Publish observability events to GCP's Pub/Sub messaging system."
))]
#[derive(Clone, Debug)]
pub struct PubsubConfig {
    /// The project name to which to publish events.
    #[configurable(metadata(docs::examples = "vector-123456"))]
    pub project: String,

    /// The topic within the project to which to publish events.
    #[configurable(metadata(docs::examples = "this-is-a-topic"))]
    pub topic: String,

    /// The endpoint to which to publish events.
    ///
    /// The scheme (`http` or `https`) must be specified. No path should be included since the paths defined
    /// by the [`GCP Pub/Sub`][pubsub_api] API are used.
    ///
    /// The trailing slash `/` must not be included.
    ///
    /// [pubsub_api]: https://cloud.google.com/pubsub/docs/reference/rest
    #[serde(default = "default_endpoint")]
    #[configurable(metadata(docs::examples = "https://us-central1-pubsub.googleapis.com"))]
    // Prefixed to `/v1/projects/{project}/topics/{topic}` when the sink builds its request URIs.
    pub endpoint: String,

    #[serde(default, flatten)]
    // Flattened: the GCP authentication options are read from the top level of this sink's
    // configuration rather than from a nested table.
    pub auth: GcpAuthConfig,

    #[configurable(derived)]
    #[serde(default)]
    // The byte limit is additionally capped at `MAX_BATCH_PAYLOAD_SIZE` when the sink is built.
    pub batch: BatchConfig<PubsubDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    #[configurable(derived)]
    // No `#[serde(default)]`: an encoding must be configured explicitly.
    encoding: EncodingConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub tls: Option<TlsConfig>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    // Presumably accepts either a bare boolean or the full struct (`bool_or_struct`); it is
    // omitted from serialized output while left at its default value.
    acknowledgements: AcknowledgementsConfig,
}
100
101fn default_endpoint() -> String {
102    PUBSUB_URL.to_string()
103}
104
105impl GenerateConfig for PubsubConfig {
106    fn generate_config() -> toml::Value {
107        toml::from_str(indoc! {r#"
108            project = "my-project"
109            topic = "my-topic"
110            encoding.codec = "json"
111        "#})
112        .unwrap()
113    }
114}
115
116#[async_trait::async_trait]
117#[typetag::serde(name = "gcp_pubsub")]
118impl SinkConfig for PubsubConfig {
119    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
120        let sink = PubsubSink::from_config(self).await?;
121        let batch_settings = self
122            .batch
123            .validate()?
124            .limit_max_bytes(MAX_BATCH_PAYLOAD_SIZE)?
125            .into_batch_settings()?;
126        let request_settings = self.request.into_settings();
127        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
128        let client = HttpClient::new(tls_settings, cx.proxy())?;
129
130        let healthcheck = healthcheck(client.clone(), sink.uri("")?, sink.auth.clone()).boxed();
131        sink.auth.spawn_regenerate_token();
132
133        let sink = BatchedHttpSink::new(
134            sink,
135            JsonArrayBuffer::new(batch_settings.size),
136            request_settings,
137            batch_settings.timeout,
138            client,
139        )
140        .sink_map_err(|error| error!(message = "Fatal gcp_pubsub sink error.", %error));
141
142        #[allow(deprecated)]
143        Ok((VectorSink::from_event_sink(sink), healthcheck))
144    }
145
146    fn input(&self) -> Input {
147        Input::new(self.encoding.config().input_type())
148    }
149
150    fn acknowledgements(&self) -> &AcknowledgementsConfig {
151        &self.acknowledgements
152    }
153}
154
155struct PubsubSink {
156    auth: GcpAuthenticator,
157    uri_base: String,
158    transformer: Transformer,
159    encoder: Encoder<()>,
160}
161
162impl PubsubSink {
163    async fn from_config(config: &PubsubConfig) -> crate::Result<Self> {
164        // We only need to load the credentials if we are not targeting an emulator.
165        let auth = config.auth.build(Scope::PubSub).await?;
166
167        let uri_base = format!(
168            "{}/v1/projects/{}/topics/{}",
169            config.endpoint, config.project, config.topic,
170        );
171
172        let transformer = config.encoding.transformer();
173        let serializer = config.encoding.build()?;
174        let encoder = Encoder::<()>::new(serializer);
175
176        Ok(Self {
177            auth,
178            uri_base,
179            transformer,
180            encoder,
181        })
182    }
183
184    fn uri(&self, suffix: &str) -> crate::Result<Uri> {
185        let uri = format!("{}{}", self.uri_base, suffix);
186        let mut uri = uri.parse::<Uri>().context(UriParseSnafu)?;
187        self.auth.apply_uri(&mut uri);
188        Ok(uri)
189    }
190}
191
192struct PubSubSinkEventEncoder {
193    transformer: Transformer,
194    encoder: Encoder<()>,
195}
196
197impl HttpEventEncoder<Value> for PubSubSinkEventEncoder {
198    fn encode_event(&mut self, mut event: Event) -> Option<Value> {
199        self.transformer.transform(&mut event);
200        let mut bytes = BytesMut::new();
201        // Errors are handled by `Encoder`.
202        self.encoder.encode(event, &mut bytes).ok()?;
203        // Each event needs to be base64 encoded, and put into a JSON object
204        // as the `data` item.
205        Some(json!({ "data": BASE64_STANDARD.encode(&bytes) }))
206    }
207}
208
209impl HttpSink for PubsubSink {
210    type Input = Value;
211    type Output = Vec<BoxedRawValue>;
212    type Encoder = PubSubSinkEventEncoder;
213
214    fn build_encoder(&self) -> Self::Encoder {
215        PubSubSinkEventEncoder {
216            transformer: self.transformer.clone(),
217            encoder: self.encoder.clone(),
218        }
219    }
220
221    async fn build_request(&self, events: Self::Output) -> crate::Result<Request<Bytes>> {
222        let body = json!({ "messages": events });
223        let body = crate::serde::json::to_bytes(&body).unwrap().freeze();
224
225        let uri = self.uri(":publish").unwrap();
226        let builder = Request::post(uri).header("Content-Type", "application/json");
227
228        let mut request = builder.body(body).unwrap();
229        self.auth.apply(&mut request);
230
231        Ok(request)
232    }
233}
234
235async fn healthcheck(client: HttpClient, uri: Uri, auth: GcpAuthenticator) -> crate::Result<()> {
236    let mut request = Request::get(uri).body(Body::empty()).unwrap();
237    auth.apply(&mut request);
238
239    let response = client.send(request).await?;
240    healthcheck_response(response, HealthcheckError::TopicNotFound.into())
241}
242
#[cfg(test)]
mod tests {
    use indoc::indoc;

    use super::*;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PubsubConfig>();
    }

    /// Without credentials available, building the sink must fail.
    #[tokio::test]
    async fn fails_missing_creds() {
        let raw = indoc! {r#"
            project = "project"
            topic = "topic"
            encoding.codec = "json"
        "#};
        let config: PubsubConfig = toml::from_str(raw).unwrap();

        let built = config.build(SinkContext::default()).await;
        assert!(built.is_err(), "config.build failed to error");
    }
}
267
#[cfg(all(test, feature = "gcp-integration-tests"))]
mod integration_tests {
    use reqwest::{Client, Method, Response};
    use serde::{Deserialize, Serialize};
    use serde_json::{json, Value};
    use vector_lib::codecs::JsonSerializerConfig;
    use vector_lib::event::{BatchNotifier, BatchStatus};

    use super::*;
    use crate::gcp;
    use crate::test_util::components::{run_and_assert_sink_error, COMPONENT_ERROR_TAGS};
    use crate::test_util::{
        components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
        random_events_with_stream, random_metrics_with_stream, random_string, trace_init,
    };

    /// Project under which every test topic and subscription is created.
    const PROJECT: &str = "testproject";

    /// Sink configuration targeting `gcp::PUBSUB_ADDRESS` with authentication skipped.
    fn config(topic: &str) -> PubsubConfig {
        let auth = GcpAuthConfig {
            skip_authentication: true,
            ..Default::default()
        };
        PubsubConfig {
            project: PROJECT.into(),
            topic: topic.into(),
            endpoint: gcp::PUBSUB_ADDRESS.clone(),
            auth,
            batch: Default::default(),
            request: Default::default(),
            encoding: JsonSerializerConfig::default().into(),
            tls: Default::default(),
            acknowledgements: Default::default(),
        }
    }

    /// Builds the sink and healthcheck for `topic`, panicking on failure.
    async fn config_build(topic: &str) -> (VectorSink, crate::sinks::Healthcheck) {
        config(topic)
            .build(SinkContext::default())
            .await
            .expect("Building sink failed")
    }

    #[tokio::test]
    async fn publish_metrics() {
        trace_init();

        let (topic, subscription) = create_topic_subscription().await;
        let (sink, healthcheck) = config_build(&topic).await;

        healthcheck.await.expect("Health check failed");

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let (input, events) = random_metrics_with_stream(100, Some(batch), None);
        run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        let messages = received_messages(&subscription).await;
        assert_eq!(input.len(), messages.len());
        for (sent, received) in input.iter().zip(messages.iter()) {
            let actual = serde_json::to_value(received.message.decode_data_as_value()).unwrap();
            let expected = serde_json::to_value(sent.as_metric()).unwrap();
            assert_eq!(actual, expected);
        }
    }

    #[tokio::test]
    async fn publish_events() {
        trace_init();

        let (topic, subscription) = create_topic_subscription().await;
        let (sink, healthcheck) = config_build(&topic).await;

        healthcheck.await.expect("Health check failed");

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let (input, events) = random_events_with_stream(100, 100, Some(batch));
        run_and_assert_sink_compliance(sink, events, &HTTP_SINK_TAGS).await;
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        let messages = received_messages(&subscription).await;
        assert_eq!(input.len(), messages.len());
        for (sent, received) in input.iter().zip(messages.iter()) {
            let actual = serde_json::to_value(received.message.decode_data()).unwrap();
            let expected =
                serde_json::to_value(sent.as_log().all_event_fields().unwrap()).unwrap();
            assert_eq!(actual, expected);
        }
    }

    #[tokio::test]
    async fn publish_events_broken_topic() {
        trace_init();

        let (topic, _subscription) = create_topic_subscription().await;
        let broken_topic = format!("BREAK{topic}BREAK");
        let (sink, _healthcheck) = config_build(&broken_topic).await;
        // Explicitly skip healthcheck

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let (_input, events) = random_events_with_stream(100, 100, Some(batch));
        run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await;
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));
    }

    #[tokio::test]
    async fn checks_for_valid_topic() {
        trace_init();

        let (topic, _subscription) = create_topic_subscription().await;
        let bad_topic = format!("BAD{topic}");
        let (_sink, healthcheck) = config_build(&bad_topic).await;
        healthcheck.await.expect_err("Health check did not fail");
    }

    /// Creates a randomly named topic plus a subscription attached to it.
    async fn create_topic_subscription() -> (String, String) {
        let topic = format!("topic-{}", random_string(10));
        let subscription = format!("subscription-{}", random_string(10));

        let topic_path = format!("topics/{topic}");
        request(Method::PUT, &topic_path, json!({}))
            .await
            .json::<Value>()
            .await
            .expect("Creating new topic failed");

        let subscription_path = format!("subscriptions/{subscription}");
        let subscription_body = json!({ "topic": format!("projects/{}/topics/{}", PROJECT, topic) });
        request(Method::PUT, &subscription_path, subscription_body)
            .await
            .json::<Value>()
            .await
            .expect("Creating new subscription failed");

        (topic, subscription)
    }

    /// Sends `json` with `method` to `path` under the test project, panicking on transport errors.
    async fn request(method: Method, path: &str, json: Value) -> Response {
        let url = format!("{}/v1/projects/{}/{}", *gcp::PUBSUB_ADDRESS, PROJECT, path);
        let builder = Client::new().request(method.clone(), &url).json(&json);
        builder
            .send()
            .await
            .unwrap_or_else(|_| panic!("Sending {method} request to {url} failed"))
    }

    /// Pulls up to `count` messages from `subscription` without waiting for new ones.
    async fn pull_messages(subscription: &str, count: usize) -> PullResponse {
        let path = format!("subscriptions/{subscription}:pull");
        let body = json!({
            "returnImmediately": true,
            "maxMessages": count
        });
        request(Method::POST, &path, body)
            .await
            .json::<PullResponse>()
            .await
            .expect("Extracting pull data failed")
    }

    /// Pulls up to 1000 messages, panicking if the response carries no `receivedMessages`.
    async fn received_messages(subscription: &str) -> Vec<PullMessageOuter> {
        pull_messages(subscription, 1000)
            .await
            .receivedMessages
            .expect("Response is missing messages")
    }

    #[derive(Debug, Deserialize)]
    #[allow(non_snake_case)] // field names mirror the Pub/Sub REST JSON keys
    struct PullResponse {
        receivedMessages: Option<Vec<PullMessageOuter>>,
    }

    #[derive(Debug, Deserialize)]
    #[allow(non_snake_case)] // field names mirror the Pub/Sub REST JSON keys
    #[allow(dead_code)] // deserialize all fields
    struct PullMessageOuter {
        ackId: String,
        message: PullMessage,
    }

    #[derive(Debug, Deserialize)]
    #[allow(non_snake_case)] // field names mirror the Pub/Sub REST JSON keys
    #[allow(dead_code)] // deserialize all fields
    struct PullMessage {
        data: String,
        messageId: String,
        publishTime: String,
    }

    impl PullMessage {
        /// Base64-decodes `data` and returns it as (lossily decoded) UTF-8 text.
        fn decoded_text(&self) -> String {
            let bytes = BASE64_STANDARD
                .decode(&self.data)
                .expect("Invalid base64 data");
            String::from_utf8_lossy(&bytes).into_owned()
        }

        /// Parses the payload as a log-shaped `TestMessage`.
        fn decode_data(&self) -> TestMessage {
            serde_json::from_str(&self.decoded_text()).expect("Invalid message structure")
        }

        /// Parses the payload as arbitrary JSON.
        fn decode_data_as_value(&self) -> Value {
            serde_json::from_str(&self.decoded_text()).expect("Invalid json")
        }
    }

    #[derive(Debug, Deserialize, Serialize)]
    struct TestMessage {
        timestamp: String,
        message: String,
    }
}