vector/sources/gcp_pubsub.rs

use std::{
    error::Error as _,
    future::Future,
    pin::Pin,
    sync::{
        Arc, LazyLock,
        atomic::{AtomicBool, AtomicUsize, Ordering},
    },
    task::{Context, Poll},
    time::Duration,
};

use chrono::DateTime;
use derivative::Derivative;
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, stream, stream::FuturesUnordered};
use http::uri::{InvalidUri, Scheme, Uri};
use serde_with::serde_as;
use snafu::{ResultExt, Snafu};
use tokio::sync::{mpsc, watch};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{
    Code, Request, Status,
    metadata::MetadataValue,
    transport::{Certificate, ClientTlsConfig, Endpoint, Identity},
};
use vector_lib::{
    byte_size_of::ByteSizeOf,
    codecs::decoding::{DeserializerConfig, FramingConfig},
    config::{LegacyKey, LogNamespace},
    configurable::configurable_component,
    finalizer::UnorderedFinalizer,
    internal_event::{
        ByteSize, BytesReceived, EventsReceived, InternalEventHandle as _, Protocol, Registered,
    },
    lookup::owned_value_path,
};
use vrl::{
    path,
    value::{Kind, kind::Collection},
};

use crate::{
    SourceSender,
    codecs::{Decoder, DecodingConfig},
    config::{DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput},
    event::{BatchNotifier, BatchStatus, Event, MaybeAsLogMut, Value},
    gcp::{GcpAuthConfig, GcpAuthenticator, PUBSUB_URL, Scope},
    internal_events::{
        GcpPubsubConnectError, GcpPubsubReceiveError, GcpPubsubStreamingPullError,
        StreamClosedError,
    },
    serde::{bool_or_struct, default_decoding, default_framing_message_based},
    shutdown::ShutdownSignal,
    sources::util,
    tls::{TlsConfig, TlsSettings},
};

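// The Pub/Sub API's valid range for the stream's acknowledgement deadline,
// enforced when building the source below.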
const MIN_ACK_DEADLINE_SECS: u64 = 10;
const MAX_ACK_DEADLINE_SECS: u64 = 600;

// We use a bounded channel for the acknowledgement ID communication
// between the request stream and receiver. During benchmark runs,
// this channel held only a single element over 80% of the time and
// rarely went over 8 elements. Sizing it too small does not introduce
// deadlocks; the worst case is slightly less efficient ack
// processing.
const ACK_QUEUE_SIZE: usize = 8;

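// Tracks finalization of event batches and yields each batch's
// acknowledgement IDs once its delivery status is known.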
type Finalizer = UnorderedFinalizer<Vec<String>>;

// prost emits some generated code that includes clones on `Arc`
// objects, which causes a clippy ding on this block. We don't
// directly control the generated code, so allow this lint here.
#[allow(clippy::clone_on_ref_ptr)]
// https://github.com/hyperium/tonic/issues/1350
#[allow(clippy::missing_const_for_fn)]
#[allow(warnings)]
mod proto {
    include!(concat!(env!("OUT_DIR"), "/google.pubsub.v1.rs"));

    use vector_lib::ByteSizeOf;

    impl ByteSizeOf for StreamingPullResponse {
        fn allocated_bytes(&self) -> usize {
            self.received_messages.size_of()
        }
    }

    impl ByteSizeOf for ReceivedMessage {
        fn allocated_bytes(&self) -> usize {
            self.ack_id.size_of() + self.message.as_ref().map_or(0, ByteSizeOf::size_of)
        }
    }

    impl ByteSizeOf for PubsubMessage {
        fn allocated_bytes(&self) -> usize {
            self.data.len()
                + self.message_id.len()
                + self.ordering_key.len()
                + self.attributes.size_of()
        }
    }
}

#[derive(Debug, Snafu)]
pub(crate) enum PubsubError {
    #[snafu(display("Invalid endpoint URI: {}", source))]
    Uri { source: InvalidUri },
    #[snafu(display("Could not create endpoint: {}", source))]
    Endpoint { source: tonic::transport::Error },
    #[snafu(display("Could not set up endpoint TLS settings: {}", source))]
    EndpointTls { source: tonic::transport::Error },
    #[snafu(display(
        "`ack_deadline_secs` is outside the valid range of {} to {}",
        MIN_ACK_DEADLINE_SECS,
        MAX_ACK_DEADLINE_SECS
    ))]
    InvalidAckDeadline,
}

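// A process-wide unique client ID, sent in the first request of each
// streaming pull (see `request_stream` below) to identify this client to
// the server.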
static CLIENT_ID: LazyLock<String> = LazyLock::new(|| uuid::Uuid::new_v4().to_string());

/// Configuration for the `gcp_pubsub` source.
#[serde_as]
#[configurable_component(source(
    "gcp_pubsub",
    "Fetch observability events from GCP's Pub/Sub messaging system."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct PubsubConfig {
    /// The project name from which to pull logs.
    #[configurable(metadata(docs::examples = "my-log-source-project"))]
    pub project: String,

    /// The subscription within the project which is configured to receive logs.
    #[configurable(metadata(docs::examples = "my-vector-source-subscription"))]
    pub subscription: String,

    /// The endpoint from which to pull data.
    #[configurable(metadata(docs::examples = "https://us-central1-pubsub.googleapis.com"))]
    #[serde(default = "default_endpoint")]
    pub endpoint: String,

    #[serde(flatten)]
    pub auth: GcpAuthConfig,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// The maximum number of concurrent stream connections to open at once.
    #[serde(default = "default_max_concurrency")]
    pub max_concurrency: usize,

    /// The number of messages in a response that marks a stream as
    /// "busy". This is used to determine whether more streams should be
    /// started.
    ///
    /// The GCP Pub/Sub servers send responses with 100 or more messages when
    /// the subscription is busy.
    #[serde(default = "default_full_response")]
    pub full_response_size: usize,

    /// How often to poll the currently active streams to see if they are
    /// all busy, and so whether a new stream should be opened.
    #[serde(default = "default_poll_time")]
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[configurable(metadata(docs::human_name = "Poll Time"))]
    pub poll_time_seconds: Duration,

    /// The acknowledgement deadline, in seconds, to use for this stream.
    ///
    /// Messages that are not acknowledged when this deadline expires may be retransmitted.
    #[serde(default = "default_ack_deadline")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Acknowledgement Deadline"))]
    pub ack_deadline_secs: Duration,

    /// The acknowledgement deadline, in seconds, to use for this stream.
    ///
    /// Messages that are not acknowledged when this deadline expires may be retransmitted.
    #[configurable(
        deprecated = "This option has been deprecated, use `ack_deadline_secs` instead."
    )]
    pub ack_deadline_seconds: Option<u16>,

    /// The amount of time, in seconds, to wait between retry attempts after an error.
    #[serde(default = "default_retry_delay")]
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[configurable(metadata(docs::human_name = "Retry Delay"))]
    pub retry_delay_secs: Duration,

    /// The amount of time, in seconds, to wait between retry attempts after an error.
    #[configurable(
        deprecated = "This option has been deprecated, use `retry_delay_secs` instead."
    )]
    pub retry_delay_seconds: Option<f64>,

    /// The amount of time, in seconds, with no received activity
    /// before sending a keepalive request. If this is set larger than
    /// `60`, you may see periodic errors sent from the server.
    #[serde(default = "default_keepalive")]
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[configurable(metadata(docs::human_name = "Keepalive"))]
    pub keepalive_secs: Duration,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    #[derivative(Default(value = "default_framing_message_based()"))]
    pub framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    pub decoding: DeserializerConfig,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    pub acknowledgements: SourceAcknowledgementsConfig,
}

fn default_endpoint() -> String {
    PUBSUB_URL.to_string()
}

const fn default_ack_deadline() -> Duration {
    Duration::from_secs(600)
}

const fn default_retry_delay() -> Duration {
    Duration::from_secs(1)
}

const fn default_keepalive() -> Duration {
    Duration::from_secs(60)
}

const fn default_max_concurrency() -> usize {
    10
}

const fn default_full_response() -> usize {
    100
}

const fn default_poll_time() -> Duration {
    Duration::from_secs(2)
}

#[async_trait::async_trait]
#[typetag::serde(name = "gcp_pubsub")]
impl SourceConfig for PubsubConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);
        let ack_deadline_secs = match self.ack_deadline_seconds {
            None => self.ack_deadline_secs,
            Some(ads) => {
                warn!(
                    "The `ack_deadline_seconds` setting is deprecated, use `ack_deadline_secs` instead."
                );
                Duration::from_secs(ads as u64)
            }
        };
        if !(MIN_ACK_DEADLINE_SECS..=MAX_ACK_DEADLINE_SECS).contains(&ack_deadline_secs.as_secs()) {
            return Err(PubsubError::InvalidAckDeadline.into());
        }

        let retry_delay_secs = match self.retry_delay_seconds {
            None => self.retry_delay_secs,
            Some(rds) => {
                warn!(
                    "The `retry_delay_seconds` setting is deprecated, use `retry_delay_secs` instead."
                );
                Duration::from_secs_f64(rds)
            }
        };

        let auth = self.auth.build(Scope::PubSub).await?;

        let mut uri: Uri = self.endpoint.parse().context(UriSnafu)?;
        auth.apply_uri(&mut uri);

        let tls = TlsSettings::from_options(self.tls.as_ref())?;
        let host = uri.host().unwrap_or("pubsub.googleapis.com");
        let mut tls_config = ClientTlsConfig::new().domain_name(host);
        if let Some((cert, key)) = tls.identity_pem() {
            tls_config = tls_config.identity(Identity::from_pem(cert, key));
        }
        for authority in tls.authorities_pem() {
            tls_config = tls_config.ca_certificate(Certificate::from_pem(authority));
        }

        let mut endpoint: Endpoint = uri.to_string().parse().context(EndpointSnafu)?;
        if uri.scheme() != Some(&Scheme::HTTP) {
            endpoint = endpoint.tls_config(tls_config).context(EndpointTlsSnafu)?;
        }

        let token_generator = auth.spawn_regenerate_token();

        let protocol = uri
            .scheme()
            .map(|scheme| Protocol(scheme.to_string().into()))
            .unwrap_or(Protocol::HTTP);

        let source = PubsubSource {
            endpoint,
            auth,
            token_generator,
            subscription: format!(
                "projects/{}/subscriptions/{}",
                self.project, self.subscription
            ),
            decoder: DecodingConfig::new(
                self.framing.clone(),
                self.decoding.clone(),
                log_namespace,
            )
            .build()?,
            acknowledgements: cx.do_acknowledgements(self.acknowledgements),
            shutdown: cx.shutdown,
            out: cx.out,
            ack_deadline_secs,
            retry_delay: retry_delay_secs,
            keepalive: self.keepalive_secs,
            concurrency: Default::default(),
            full_response_size: self.full_response_size,
            log_namespace,
            bytes_received: register!(BytesReceived::from(protocol)),
            events_received: register!(EventsReceived),
        }
        .run_all(self.max_concurrency, self.poll_time_seconds)
        .map_err(|error| error!(message = "Source failed.", %error));
        Ok(Box::pin(source))
    }

    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let log_namespace = global_log_namespace.merge(self.log_namespace);
        let schema_definition = self
            .decoding
            .schema_definition(log_namespace)
            .with_standard_vector_source_metadata()
            .with_source_metadata(
                PubsubConfig::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("timestamp"))),
                &owned_value_path!("timestamp"),
                Kind::timestamp().or_undefined(),
                Some("timestamp"),
            )
            .with_source_metadata(
                PubsubConfig::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("attributes"))),
                &owned_value_path!("attributes"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())),
                None,
            )
            .with_source_metadata(
                PubsubConfig::NAME,
                Some(LegacyKey::Overwrite(owned_value_path!("message_id"))),
                &owned_value_path!("message_id"),
                Kind::bytes(),
                None,
            );

        vec![SourceOutput::new_maybe_logs(
            DataType::Log,
            schema_definition,
        )]
    }

    fn can_acknowledge(&self) -> bool {
        true
    }
}

impl_generate_config_from_default!(PubsubConfig);
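
// A minimal example configuration for this source, as it might appear in a
// Vector config file. This is a sketch: the component and resource names are
// illustrative, and all other fields take their defaults.
//
//     [sources.pubsub_in]
//     type = "gcp_pubsub"
//     project = "my-log-source-project"
//     subscription = "my-vector-source-subscription"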

#[derive(Clone)]
struct PubsubSource {
    endpoint: Endpoint,
    auth: GcpAuthenticator,
    token_generator: watch::Receiver<()>,
    subscription: String,
    decoder: Decoder,
    acknowledgements: bool,
    ack_deadline_secs: Duration,
    shutdown: ShutdownSignal,
    out: SourceSender,
    retry_delay: Duration,
    keepalive: Duration,
    // The current concurrency is shared across all tasks. It is used
    // by the streams to avoid shutting down the last stream, which
    // would result in repeatedly re-opening the stream on idle.
    concurrency: Arc<AtomicUsize>,
    full_response_size: usize,
    log_namespace: LogNamespace,
    bytes_received: Registered<BytesReceived>,
    events_received: Registered<EventsReceived>,
}

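// The outcome of a single streaming pull session, used by the retry loop in
// `PubsubSource::run` to decide whether to reconnect immediately, reconnect
// after a delay, or exit.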
enum State {
    RetryNow,
    RetryDelay,
    Shutdown,
}

impl PubsubSource {
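    /// Supervise the set of active streams: keep at least one stream
    /// running, and open additional streams (up to `max_concurrency`)
    /// whenever every currently active stream reports itself as busy.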
    async fn run_all(mut self, max_concurrency: usize, poll_time: Duration) -> crate::Result<()> {
        let mut tasks = FuturesUnordered::new();

        loop {
            self.concurrency.store(tasks.len(), Ordering::Relaxed);
            tokio::select! {
                _ = &mut self.shutdown => break,
                _ = tasks.next() => {
                    if tasks.is_empty() {
                        // Either no tasks were started or a race
                        // condition resulted in the last task
                        // exiting. Start up a new stream immediately.
                        self.start_one(&tasks);
                    }
                },
                _ = tokio::time::sleep(poll_time) => {
                    // If all of the tasks are marked as busy, start
                    // up a new one.
                    if tasks.len() < max_concurrency
                        && tasks.iter().all(|task| task.busy_flag.load(Ordering::Relaxed))
                    {
                        self.start_one(&tasks);
                    }
                }
            }
        }

        // Wait for all active streams to exit on shutdown.
        while tasks.next().await.is_some() {}

        Ok(())
    }

    fn start_one(&self, tasks: &FuturesUnordered<Task>) {
        info!(message = "Starting stream.", concurrency = tasks.len() + 1);
        // The `busy_flag` is used to monitor the status of a
        // stream. It will start marked as idle to prevent the above
        // scan from spinning up too many at once. When a stream
        // receives "full" batches, it will mark itself as busy, and
        // when it has an idle interval it will mark itself as not
        // busy.
        let busy_flag = Arc::new(AtomicBool::new(false));
        let task = tokio::spawn(self.clone().run(Arc::clone(&busy_flag)));
        tasks.push(Task { task, busy_flag });
    }

    async fn run(mut self, busy_flag: Arc<AtomicBool>) {
        loop {
            match self.run_once(&busy_flag).await {
                State::RetryNow => debug!("Retrying immediately."),
                State::RetryDelay => {
                    info!(
                        timeout_secs = self.retry_delay.as_secs_f64(),
                        "Retrying after timeout."
                    );
                    tokio::time::sleep(self.retry_delay).await;
                }
                State::Shutdown => break,
            }
        }
    }

    async fn run_once(&mut self, busy_flag: &Arc<AtomicBool>) -> State {
        let connection = match self.endpoint.connect().await {
            Ok(connection) => connection,
            Err(error) => {
                emit!(GcpPubsubConnectError { error });
                return State::RetryDelay;
            }
        };

        let mut client = proto::subscriber_client::SubscriberClient::with_interceptor(
            connection,
            |mut req: Request<()>| {
                if let Some(token) = self.auth.make_token() {
                    let authorization = MetadataValue::try_from(&token).map_err(|_| {
                        Status::new(
                            Code::FailedPrecondition,
                            "Invalid token text returned by GCP",
                        )
                    })?;
                    req.metadata_mut().insert("authorization", authorization);
                }
                Ok(req)
            },
        )
        // Tonic added a default of 4MB in 0.9. This replaces the old behavior.
        .max_decoding_message_size(usize::MAX);

        let (ack_ids_sender, ack_ids_receiver) = mpsc::channel(ACK_QUEUE_SIZE);

        // Handle shutdown during startup; the streaming pull doesn't
        // start if there is no data in the subscription.
        let request_stream = self.request_stream(ack_ids_receiver);
        debug!("Starting streaming pull.");
        let stream = tokio::select! {
            _ = &mut self.shutdown => return State::Shutdown,
            result = client.streaming_pull(request_stream) => match result {
                Ok(stream) => stream,
                Err(error) => {
                    emit!(GcpPubsubStreamingPullError { error });
                    return State::RetryDelay;
                }
            }
        };
        let mut stream = stream.into_inner();

        let (finalizer, mut ack_stream) =
            Finalizer::maybe_new(self.acknowledgements, Some(self.shutdown.clone()));
        let mut pending_acks = 0;

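        // `pending_acks` counts finalizer batches that have been handed off
        // but whose status has not yet come back through `ack_stream`.
        // Shutdown is deferred below until it reaches zero so that delivered
        // events are acknowledged before the stream exits.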
        loop {
            tokio::select! {
                biased;
                receipts = ack_stream.next() => if let Some((status, receipts)) = receipts {
                    pending_acks -= 1;
                    if status == BatchStatus::Delivered {
                        ack_ids_sender
                            .send(receipts)
                            .await
                            .unwrap_or_else(|_| unreachable!("request stream never closes"));
                    }
                },
                response = stream.next() => match response {
                    Some(Ok(response)) => {
                        self.handle_response(
                            response,
                            &finalizer,
                            &ack_ids_sender,
                            &mut pending_acks,
                            busy_flag,
                        ).await;
                    }
                    Some(Err(error)) => break translate_error(error),
                    None => break State::RetryNow,
                },
                _ = &mut self.shutdown, if pending_acks == 0 => return State::Shutdown,
                _ = self.token_generator.changed() => {
                    debug!("New authentication token generated, restarting stream.");
                    break State::RetryNow;
                },
                _ = tokio::time::sleep(self.keepalive) => {
                    if pending_acks == 0 {
                        // No pending acks, and no new data, so drop
                        // this stream if we aren't the only active
                        // one.
                        if self.concurrency.load(Ordering::Relaxed) > 1 {
                            info!("Shutting down inactive stream.");
                            break State::Shutdown;
                        }
                        // Otherwise, mark this stream as idle.
                        busy_flag.store(false, Ordering::Relaxed);
                    }
                    // GCP Pub/Sub likes to time out connections after
                    // about 75 seconds of inactivity. To forestall
                    // the resulting error, send an empty array of
                    // acknowledgement IDs to the request stream if no
                    // other activity has happened. This will result
                    // in a new request with empty fields, effectively
                    // a keepalive.
                    ack_ids_sender
                        .send(Vec::new())
                        .await
                        .unwrap_or_else(|_| unreachable!("request stream never closes"));
                }
            }
        }
    }

    fn request_stream(
        &self,
        ack_ids: mpsc::Receiver<Vec<String>>,
    ) -> impl Stream<Item = proto::StreamingPullRequest> + 'static + use<> {
        let subscription = self.subscription.clone();
        let client_id = CLIENT_ID.clone();
        let stream_ack_deadline_seconds = self.ack_deadline_secs.as_secs() as i32;
        let ack_ids = ReceiverStream::new(ack_ids).ready_chunks(ACK_QUEUE_SIZE);

        stream::once(async move {
            // These fields are only valid on the first request in the
            // stream, and so must not be repeated below.
            proto::StreamingPullRequest {
                subscription,
                client_id,
                stream_ack_deadline_seconds,
                ..Default::default()
            }
        })
        .chain(ack_ids.map(|chunks| {
            // These "requests" serve only to send updates about
            // acknowledgements to the server. None of the above
            // fields need to be repeated and, in fact, will cause
            // a stream error and cancellation if they are
            // present.
            proto::StreamingPullRequest {
                ack_ids: chunks.into_iter().flatten().collect(),
                ..Default::default()
            }
        }))
    }
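
    // The stream built above thus yields requests of two shapes: one initial
    //     { subscription, client_id, stream_ack_deadline_seconds, .. }
    // followed by any number of
    //     { ack_ids: [..], .. }
    // updates, where an empty `ack_ids` list acts as a keepalive.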

    async fn handle_response(
        &mut self,
        response: proto::StreamingPullResponse,
        finalizer: &Option<Finalizer>,
        ack_ids: &mpsc::Sender<Vec<String>>,
        pending_acks: &mut usize,
        busy_flag: &Arc<AtomicBool>,
    ) {
        if response.received_messages.len() >= self.full_response_size {
            busy_flag.store(true, Ordering::Relaxed);
        }
        self.bytes_received.emit(ByteSize(response.size_of()));

        let (batch, notifier) = BatchNotifier::maybe_new_with_receiver(self.acknowledgements);
        let (events, ids) = self.parse_messages(response.received_messages, batch).await;

        let count = events.len();
        match self.out.send_batch(events).await {
            Err(_) => emit!(StreamClosedError { count }),
            Ok(()) => match notifier {
                None => ack_ids
                    .send(ids)
                    .await
                    .unwrap_or_else(|_| unreachable!("request stream never closes")),
                Some(notifier) => {
                    finalizer
                        .as_ref()
                        .expect("Finalizer must have been set up for acknowledgements")
                        .add(ids, notifier);
                    *pending_acks += 1;
                }
            },
        }
    }

    async fn parse_messages(
        &self,
        response: Vec<proto::ReceivedMessage>,
        batch: Option<BatchNotifier>,
    ) -> (Vec<Event>, Vec<String>) {
        let mut ack_ids = Vec::with_capacity(response.len());
        let events = response
            .into_iter()
            .flat_map(|received| {
                ack_ids.push(received.ack_id);
                received
                    .message
                    .map(|message| self.parse_message(message, &batch))
            })
            .flatten()
            .collect();
        (events, ack_ids)
    }

    fn parse_message<'a>(
        &'a self,
        message: proto::PubsubMessage,
        batch: &'a Option<BatchNotifier>,
    ) -> impl Iterator<Item = Event> + 'a {
        let attributes = Value::Object(
            message
                .attributes
                .into_iter()
                .map(|(key, value)| (key.into(), Value::Bytes(value.into())))
                .collect(),
        );
        let log_namespace = self.log_namespace;
        util::decode_message(
            self.decoder.clone(),
            "gcp_pubsub",
            &message.data,
            message.publish_time.map(|dt| {
                DateTime::from_timestamp(dt.seconds, dt.nanos as u32).expect("invalid timestamp")
            }),
            batch,
            log_namespace,
            &self.events_received,
        )
        .map(move |mut event| {
            if let Some(log) = event.maybe_as_log_mut() {
                log_namespace.insert_source_metadata(
                    PubsubConfig::NAME,
                    log,
                    Some(LegacyKey::Overwrite(path!("message_id"))),
                    path!("message_id"),
                    message.message_id.clone(),
                );
                log_namespace.insert_source_metadata(
                    PubsubConfig::NAME,
                    log,
                    Some(LegacyKey::Overwrite(path!("attributes"))),
                    path!("attributes"),
                    attributes.clone(),
                )
            }
            event
        })
    }
}

fn translate_error(error: tonic::Status) -> State {
    // GCP occasionally issues a connection reset in the middle of the
    // streaming pull. This reset is not technically an error, so we want
    // to retry immediately, but it is reported to us as an error from the
    // underlying library (`tonic`).
    if is_reset(&error) {
        debug!("Stream reset by server.");
        State::RetryNow
    } else {
        emit!(GcpPubsubReceiveError { error });
        State::RetryDelay
    }
}

fn is_reset(error: &Status) -> bool {
    error
        .source()
        .and_then(|source| source.downcast_ref::<hyper::Error>())
        .and_then(|error| error.source())
        .and_then(|source| source.downcast_ref::<h2::Error>())
        .is_some_and(|error| error.is_remote() && error.is_reset())
}

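// A handle to one spawned stream task, pairing the task's join handle with
// the shared flag the stream uses to report whether it is receiving full
// responses.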
#[pin_project::pin_project]
struct Task {
    task: tokio::task::JoinHandle<()>,
    busy_flag: Arc<AtomicBool>,
}

impl Future for Task {
    type Output = Result<(), tokio::task::JoinError>;

    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        self.task.poll_unpin(ctx)
    }
}

#[cfg(test)]
mod tests {
    use vector_lib::{lookup::OwnedTargetPath, schema::Definition};

    use super::*;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PubsubConfig>();
    }

    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = PubsubConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("gcp_pubsub", "timestamp"),
                    Kind::timestamp().or_undefined(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!("gcp_pubsub", "attributes"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("gcp_pubsub", "message_id"),
                    Kind::bytes(),
                    None,
                );

        assert_eq!(definitions, Some(expected_definition));
    }

    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = PubsubConfig::default();

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(
            &owned_value_path!("timestamp"),
            Kind::timestamp().or_undefined(),
            Some("timestamp"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(
            &owned_value_path!("attributes"),
            Kind::object(Collection::empty().with_unknown(Kind::bytes())),
            None,
        )
        .with_event_field(&owned_value_path!("message_id"), Kind::bytes(), None);

        assert_eq!(definitions, Some(expected_definition));
    }
}

#[cfg(all(test, feature = "gcp-integration-tests"))]
mod integration_tests {
    use std::{
        collections::{BTreeMap, HashSet},
        sync::LazyLock,
    };

    use base64::prelude::{BASE64_STANDARD, Engine as _};
    use chrono::{DateTime, Utc};
    use futures::{Stream, StreamExt};
    use http::method::Method;
    use hyper::{Request, StatusCode};
    use serde_json::{Value, json};
    use tokio::time::{Duration, Instant};
    use vrl::btreemap;

    use super::*;
    use crate::{
        SourceSender,
        config::{ComponentKey, ProxyConfig},
        event::EventStatus,
        gcp,
        http::HttpClient,
        shutdown,
        test_util::{
            self, components,
            components::{SOURCE_TAGS, assert_source_compliance},
            random_string,
        },
    };

    const PROJECT: &str = "sourceproject";
    static PROJECT_URI: LazyLock<String> =
        LazyLock::new(|| format!("{}/v1/projects/{}", *gcp::PUBSUB_ADDRESS, PROJECT));
    static ACK_DEADLINE: LazyLock<Duration> = LazyLock::new(|| Duration::from_secs(10)); // Minimum custom deadline allowed by Pub/Sub

    #[tokio::test]
    async fn oneshot() {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;
            let test_data = tester.send_test_events(99, BTreeMap::new()).await;
            receive_events(&mut rx, test_data).await;
            tester.shutdown_check(shutdown).await;
        })
        .await;
    }

    #[tokio::test]
    async fn shuts_down_before_data_received() {
        let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;

        tester.shutdown(shutdown).await; // Not shutdown_check because this emits nothing

        assert!(rx.next().await.is_none());
        tester.send_test_events(1, BTreeMap::new()).await;
        assert!(rx.next().await.is_none());
        assert_eq!(tester.pull_count(1).await, 1);
    }

    #[tokio::test]
    async fn shuts_down_after_data_received() {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;

            let test_data = tester.send_test_events(1, BTreeMap::new()).await;
            receive_events(&mut rx, test_data).await;

            tester.shutdown_check(shutdown).await;

            assert!(rx.next().await.is_none());
            tester.send_test_events(1, BTreeMap::new()).await;
            assert!(rx.next().await.is_none());
            // The following assert is there to test that the source isn't
            // pulling anything out of the subscription after it reports
            // shutdown. It works when there wasn't anything previously in
            // the topic, but does not work here despite evidence that the
            // entire tokio task has exited.
            // assert_eq!(tester.pull_count(1).await, 1);
        })
        .await;
    }

    #[tokio::test]
    async fn streams_data() {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;
            for _ in 0..10 {
                let test_data = tester.send_test_events(9, BTreeMap::new()).await;
                receive_events(&mut rx, test_data).await;
            }
            tester.shutdown_check(shutdown).await;
        })
        .await;
    }

    #[tokio::test]
    async fn sends_attributes() {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;
            let attributes = btreemap![
                random_string(8) => random_string(88),
                random_string(8) => random_string(88),
                random_string(8) => random_string(88),
            ];
            let test_data = tester.send_test_events(1, attributes).await;
            receive_events(&mut rx, test_data).await;
            tester.shutdown_check(shutdown).await;
        })
        .await;
    }

    #[tokio::test]
    async fn acks_received() {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;

            let test_data = tester.send_test_events(1, BTreeMap::new()).await;
            receive_events(&mut rx, test_data).await;

            tester.shutdown_check(shutdown).await;

            // Make sure there are no messages left in the queue
            assert_eq!(tester.pull_count(10).await, 0);

            // Wait for the acknowledgement deadline to expire
            tokio::time::sleep(*ACK_DEADLINE + Duration::from_secs(1)).await;

            // All messages are still acknowledged
            assert_eq!(tester.pull_count(10).await, 0);
        })
        .await;
    }

    #[tokio::test]
    // I have verified manually that the streaming code above omits the
    // acknowledgements when events are rejected, but have been unable to
    // verify through the emulator that the events are not acknowledged.
    #[ignore]
    async fn does_not_ack_rejected() {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tester, mut rx, shutdown) = setup(EventStatus::Rejected).await;

            let test_data = tester.send_test_events(1, BTreeMap::new()).await;
            receive_events(&mut rx, test_data).await;

            tester.shutdown(shutdown).await;

            // Make sure there are no messages left in the queue
            assert_eq!(tester.pull_count(10).await, 0);

            // Wait for the acknowledgement deadline to expire
            tokio::time::sleep(*ACK_DEADLINE + Duration::from_secs(1)).await;

            // All messages are still in the queue
            assert_eq!(tester.pull_count(10).await, 1);
        })
        .await;
    }

    async fn setup(
        status: EventStatus,
    ) -> (
        Tester,
        impl Stream<Item = Event> + Unpin,
        shutdown::SourceShutdownCoordinator,
    ) {
        components::init_test();

        let tls_settings = TlsSettings::from_options(None).unwrap();
        let client = HttpClient::new(tls_settings, &ProxyConfig::default()).unwrap();
        let tester = Tester::new(client).await;

        let (rx, shutdown) = tester.spawn_source(status).await;

        (tester, rx, shutdown)
    }

    fn now_trunc() -> DateTime<Utc> {
        let start = Utc::now().timestamp();
        // Truncate the milliseconds portion, the hard way.
        DateTime::from_timestamp(start, 0).expect("invalid timestamp")
    }

    struct Tester {
        client: HttpClient,
        topic: String,
        subscription: String,
        component: ComponentKey,
    }

    struct TestData {
        lines: Vec<String>,
        start: DateTime<Utc>,
        attributes: BTreeMap<String, String>,
    }

    impl Tester {
        async fn new(client: HttpClient) -> Self {
            let this = Self {
                client,
                topic: format!("topic-{}", random_string(10).to_lowercase()),
                subscription: format!("sub-{}", random_string(10).to_lowercase()),
                component: ComponentKey::from("gcp_pubsub"),
            };

            this.request(Method::PUT, "topics/{topic}", json!({})).await;

            let body = json!({
                "topic": format!("projects/{}/topics/{}", PROJECT, this.topic),
                "ackDeadlineSeconds": *ACK_DEADLINE,
            });
            this.request(Method::PUT, "subscriptions/{sub}", body).await;

            this
        }

        async fn spawn_source(
            &self,
            status: EventStatus,
        ) -> (
            impl Stream<Item = Event> + Unpin + use<>,
            shutdown::SourceShutdownCoordinator,
        ) {
            let (tx, rx) = SourceSender::new_test_finalize(status);
            let config = PubsubConfig {
                project: PROJECT.into(),
                subscription: self.subscription.clone(),
                endpoint: gcp::PUBSUB_ADDRESS.clone(),
                auth: GcpAuthConfig {
                    skip_authentication: true,
                    ..Default::default()
                },
                ack_deadline_secs: *ACK_DEADLINE,
                ..Default::default()
            };
            let (mut ctx, shutdown) = SourceContext::new_shutdown(&self.component, tx);
            ctx.acknowledgements = true;
            let source = config.build(ctx).await.expect("Failed to build source");
            tokio::spawn(async move { source.await.expect("Failed to run source") });

            (rx, shutdown)
        }

        async fn send_test_events(
            &self,
            count: usize,
            attributes: BTreeMap<String, String>,
        ) -> TestData {
            let start = now_trunc();
            let lines: Vec<_> = test_util::random_lines(44).take(count).collect();
            let messages: Vec<_> = lines
                .iter()
                .map(|input| BASE64_STANDARD.encode(input))
                .map(|data| json!({ "data": data, "attributes": attributes.clone() }))
                .collect();
            let body = json!({ "messages": messages });
            self.request(Method::POST, "topics/{topic}:publish", body)
                .await;

            TestData {
                lines,
                start,
                attributes,
            }
        }

        async fn pull_count(&self, count: usize) -> usize {
            let response = self
                .request(
                    Method::POST,
                    "subscriptions/{sub}:pull",
                    json!({ "maxMessages": count, "returnImmediately": true }),
                )
                .await;
            response
                .get("receivedMessages")
                .map(|rm| rm.as_array().unwrap().len())
                .unwrap_or(0)
        }

        async fn request(&self, method: Method, base: &str, body: Value) -> Value {
            let path = base
                .replace("{topic}", &self.topic)
                .replace("{sub}", &self.subscription);
            let uri = [PROJECT_URI.as_str(), &path].join("/");
            let body = crate::serde::json::to_bytes(&body).unwrap().freeze();
            let request = Request::builder()
                .method(method)
                .uri(uri)
                .body(body.into())
                .unwrap();
            let response = self.client.send(request).await.unwrap();
            assert_eq!(response.status(), StatusCode::OK);
            let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
            serde_json::from_str(core::str::from_utf8(&body).unwrap()).unwrap()
        }

        async fn shutdown_check(&self, shutdown: shutdown::SourceShutdownCoordinator) {
            self.shutdown(shutdown).await;
            components::SOURCE_TESTS.assert(&components::HTTP_PULL_SOURCE_TAGS);
        }

        async fn shutdown(&self, mut shutdown: shutdown::SourceShutdownCoordinator) {
            let deadline = Instant::now() + Duration::from_secs(1);
            let shutdown = shutdown.shutdown_source(&self.component, deadline);
            assert!(shutdown.await);
        }
    }

    async fn receive_events(rx: &mut (impl Stream<Item = Event> + Unpin), test_data: TestData) {
        let TestData {
            start,
            lines,
            attributes,
        } = test_data;

        let events: Vec<Event> = tokio::time::timeout(
            Duration::from_secs(1),
            test_util::collect_n_stream(rx, lines.len()),
        )
        .await
        .unwrap();

        let end = Utc::now();
        let mut message_ids = HashSet::new();

        assert_eq!(events.len(), lines.len());
        for (message, event) in lines.into_iter().zip(events) {
            let log = event.into_log();
            assert_eq!(log.get("message"), Some(&message.into()));
            assert_eq!(log.get("source_type"), Some(&"gcp_pubsub".into()));
            assert!(log.get("timestamp").unwrap().as_timestamp().unwrap() >= &start);
            assert!(log.get("timestamp").unwrap().as_timestamp().unwrap() <= &end);
            assert!(
                message_ids.insert(log.get("message_id").unwrap().clone().to_string()),
                "Message contained duplicate message_id"
            );
            let logattr = log
                .get("attributes")
                .expect("missing attributes")
                .as_object()
                .unwrap()
                .clone();
            assert_eq!(logattr.len(), attributes.len());
            for (a, b) in logattr.into_iter().zip(&attributes) {
                assert_eq!(&a.0, b.0.as_str());
                assert_eq!(a.1, b.1.clone().into());
            }
        }
    }
}