vector/sources/
gcp_pubsub.rs

1use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2use std::sync::{Arc, LazyLock};
3use std::{error::Error as _, future::Future, pin::Pin, task::Context, task::Poll, time::Duration};
4
5use chrono::DateTime;
6use derivative::Derivative;
7use futures::{stream, stream::FuturesUnordered, FutureExt, Stream, StreamExt, TryFutureExt};
8use http::uri::{InvalidUri, Scheme, Uri};
9use serde_with::serde_as;
10use snafu::{ResultExt, Snafu};
11use tokio::sync::{mpsc, watch};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::{
14    metadata::MetadataValue,
15    transport::{Certificate, ClientTlsConfig, Endpoint, Identity},
16    Code, Request, Status,
17};
18use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
19use vector_lib::config::{LegacyKey, LogNamespace};
20use vector_lib::configurable::configurable_component;
21use vector_lib::internal_event::{
22    ByteSize, BytesReceived, EventsReceived, InternalEventHandle as _, Protocol, Registered,
23};
24use vector_lib::lookup::owned_value_path;
25use vector_lib::{byte_size_of::ByteSizeOf, finalizer::UnorderedFinalizer};
26use vrl::path;
27use vrl::value::{kind::Collection, Kind};
28
29use crate::{
30    codecs::{Decoder, DecodingConfig},
31    config::{DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput},
32    event::{BatchNotifier, BatchStatus, Event, MaybeAsLogMut, Value},
33    gcp::{GcpAuthConfig, GcpAuthenticator, Scope, PUBSUB_URL},
34    internal_events::{
35        GcpPubsubConnectError, GcpPubsubReceiveError, GcpPubsubStreamingPullError,
36        StreamClosedError,
37    },
38    serde::{bool_or_struct, default_decoding, default_framing_message_based},
39    shutdown::ShutdownSignal,
40    sources::util,
41    tls::{TlsConfig, TlsSettings},
42    SourceSender,
43};
44
/// Smallest acknowledgement deadline, in seconds, that the source accepts.
const MIN_ACK_DEADLINE_SECS: u64 = 10;
/// Largest acknowledgement deadline, in seconds, that the source accepts (ten minutes).
const MAX_ACK_DEADLINE_SECS: u64 = 10 * 60;

// Capacity of the bounded channel carrying acknowledgement IDs from the
// receive loop to the request stream; also the maximum number of queued
// ID batches merged into a single request. Benchmark runs showed the
// channel holding one element more than 80% of the time and rarely more
// than 8. An undersized channel cannot deadlock; at worst acknowledgements
// are processed slightly less efficiently.
const ACK_QUEUE_SIZE: usize = 8;
55
/// Finalizer that holds each batch's acknowledgement IDs until the batch's
/// delivery status is known, so delivered IDs can then be acknowledged.
type Finalizer = UnorderedFinalizer<Vec<String>>;
57
// prost emits some generated code that includes clones on `Arc`
// objects, which causes a clippy ding on this block. We don't
// directly control the generated code, so allow this lint here.
#[allow(clippy::clone_on_ref_ptr)]
// https://github.com/hyperium/tonic/issues/1350
#[allow(clippy::missing_const_for_fn)]
// The module body is build-generated code, so all other warnings are silenced too.
#[allow(warnings)]
/// Generated bindings for the `google.pubsub.v1` gRPC API, plus `ByteSizeOf`
/// impls used when reporting the number of bytes received.
mod proto {
    include!(concat!(env!("OUT_DIR"), "/google.pubsub.v1.rs"));

    use vector_lib::ByteSizeOf;

    // A response's heap usage is that of the messages it carries.
    impl ByteSizeOf for StreamingPullResponse {
        fn allocated_bytes(&self) -> usize {
            self.received_messages.size_of()
        }
    }

    // A received message's heap usage is its ack ID plus the full size of the
    // wrapped message, if one is present.
    impl ByteSizeOf for ReceivedMessage {
        fn allocated_bytes(&self) -> usize {
            self.ack_id.size_of() + self.message.as_ref().map_or(0, ByteSizeOf::size_of)
        }
    }

    // Counts the payload, message ID, ordering key and attribute map; other
    // fields (such as the publish time) are not included.
    impl ByteSizeOf for PubsubMessage {
        fn allocated_bytes(&self) -> usize {
            self.data.len()
                + self.message_id.len()
                + self.ordering_key.len()
                + self.attributes.size_of()
        }
    }
}
91
/// Errors raised while validating the configuration and setting up the
/// Pub/Sub gRPC endpoint in `build`.
#[derive(Debug, Snafu)]
pub(crate) enum PubsubError {
    /// The configured `endpoint` string could not be parsed as a URI.
    #[snafu(display("Invalid endpoint URI: {}", source))]
    Uri { source: InvalidUri },
    /// The URI could not be turned into a tonic `Endpoint`.
    #[snafu(display("Could not create endpoint: {}", source))]
    Endpoint { source: tonic::transport::Error },
    /// Applying the TLS configuration to the endpoint failed.
    #[snafu(display("Could not set up endpoint TLS settings: {}", source))]
    EndpointTls { source: tonic::transport::Error },
    /// The effective acknowledgement deadline lies outside
    /// `MIN_ACK_DEADLINE_SECS..=MAX_ACK_DEADLINE_SECS`.
    #[snafu(display(
        "`ack_deadline_secs` is outside the valid range of {} to {}",
        MIN_ACK_DEADLINE_SECS,
        MAX_ACK_DEADLINE_SECS
    ))]
    InvalidAckDeadline,
}
107
// Random UUID generated once per process. It is sent as `client_id` in the
// initial request of every streaming pull that this process opens.
static CLIENT_ID: LazyLock<String> = LazyLock::new(|| uuid::Uuid::new_v4().to_string());
109
110/// Configuration for the `gcp_pubsub` source.
111#[serde_as]
112#[configurable_component(source(
113    "gcp_pubsub",
114    "Fetch observability events from GCP's Pub/Sub messaging system."
115))]
116#[derive(Clone, Debug, Derivative)]
117#[derivative(Default)]
118#[serde(deny_unknown_fields)]
119pub struct PubsubConfig {
120    /// The project name from which to pull logs.
121    #[configurable(metadata(docs::examples = "my-log-source-project"))]
122    pub project: String,
123
124    /// The subscription within the project which is configured to receive logs.
125    #[configurable(metadata(docs::examples = "my-vector-source-subscription"))]
126    pub subscription: String,
127
128    /// The endpoint from which to pull data.
129    #[configurable(metadata(docs::examples = "https://us-central1-pubsub.googleapis.com"))]
130    #[serde(default = "default_endpoint")]
131    pub endpoint: String,
132
133    #[serde(flatten)]
134    pub auth: GcpAuthConfig,
135
136    #[configurable(derived)]
137    pub tls: Option<TlsConfig>,
138
139    /// The maximum number of concurrent stream connections to open at once.
140    #[serde(default = "default_max_concurrency")]
141    pub max_concurrency: usize,
142
143    /// The number of messages in a response to mark a stream as
144    /// "busy". This is used to determine if more streams should be
145    /// started.
146    ///
147    /// The GCP Pub/Sub servers send responses with 100 or more messages when
148    /// the subscription is busy.
149    #[serde(default = "default_full_response")]
150    pub full_response_size: usize,
151
152    /// How often to poll the currently active streams to see if they
153    /// are all busy and so open a new stream.
154    #[serde(default = "default_poll_time")]
155    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
156    #[configurable(metadata(docs::human_name = "Poll Time"))]
157    pub poll_time_seconds: Duration,
158
159    /// The acknowledgement deadline, in seconds, to use for this stream.
160    ///
161    /// Messages that are not acknowledged when this deadline expires may be retransmitted.
162    #[serde(default = "default_ack_deadline")]
163    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
164    #[configurable(metadata(docs::human_name = "Acknowledgement Deadline"))]
165    pub ack_deadline_secs: Duration,
166
167    /// The acknowledgement deadline, in seconds, to use for this stream.
168    ///
169    /// Messages that are not acknowledged when this deadline expires may be retransmitted.
170    #[configurable(
171        deprecated = "This option has been deprecated, use `ack_deadline_secs` instead."
172    )]
173    pub ack_deadline_seconds: Option<u16>,
174
175    /// The amount of time, in seconds, to wait between retry attempts after an error.
176    #[serde(default = "default_retry_delay")]
177    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
178    #[configurable(metadata(docs::human_name = "Retry Delay"))]
179    pub retry_delay_secs: Duration,
180
181    /// The amount of time, in seconds, to wait between retry attempts after an error.
182    #[configurable(
183        deprecated = "This option has been deprecated, use `retry_delay_secs` instead."
184    )]
185    pub retry_delay_seconds: Option<f64>,
186
187    /// The amount of time, in seconds, with no received activity
188    /// before sending a keepalive request. If this is set larger than
189    /// `60`, you may see periodic errors sent from the server.
190    #[serde(default = "default_keepalive")]
191    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
192    #[configurable(metadata(docs::human_name = "Keepalive"))]
193    pub keepalive_secs: Duration,
194
195    /// The namespace to use for logs. This overrides the global setting.
196    #[configurable(metadata(docs::hidden))]
197    #[serde(default)]
198    pub log_namespace: Option<bool>,
199
200    #[configurable(derived)]
201    #[serde(default = "default_framing_message_based")]
202    #[derivative(Default(value = "default_framing_message_based()"))]
203    pub framing: FramingConfig,
204
205    #[configurable(derived)]
206    #[serde(default = "default_decoding")]
207    #[derivative(Default(value = "default_decoding()"))]
208    pub decoding: DeserializerConfig,
209
210    #[configurable(derived)]
211    #[serde(default, deserialize_with = "bool_or_struct")]
212    pub acknowledgements: SourceAcknowledgementsConfig,
213}
214
215fn default_endpoint() -> String {
216    PUBSUB_URL.to_string()
217}
218
/// Serde default for `ack_deadline_secs`: ten minutes.
const fn default_ack_deadline() -> Duration {
    Duration::from_secs(10 * 60)
}

/// Serde default for `retry_delay_secs`: one second.
const fn default_retry_delay() -> Duration {
    Duration::from_millis(1_000)
}

/// Serde default for `keepalive_secs`: one minute.
const fn default_keepalive() -> Duration {
    Duration::from_secs(60)
}

/// Serde default for `max_concurrency`: at most ten concurrent streams.
const fn default_max_concurrency() -> usize {
    10
}

/// Serde default for `full_response_size`: 100 messages in one response mark a stream busy.
const fn default_full_response() -> usize {
    100
}

/// Serde default for `poll_time_seconds`: re-check stream busyness every two seconds.
const fn default_poll_time() -> Duration {
    Duration::from_millis(2_000)
}
242
243#[async_trait::async_trait]
244#[typetag::serde(name = "gcp_pubsub")]
245impl SourceConfig for PubsubConfig {
246    async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
247        let log_namespace = cx.log_namespace(self.log_namespace);
248        let ack_deadline_secs = match self.ack_deadline_seconds {
249            None => self.ack_deadline_secs,
250            Some(ads) => {
251                warn!("The `ack_deadline_seconds` setting is deprecated, use `ack_deadline_secs` instead.");
252                Duration::from_secs(ads as u64)
253            }
254        };
255        if !(MIN_ACK_DEADLINE_SECS..=MAX_ACK_DEADLINE_SECS).contains(&ack_deadline_secs.as_secs()) {
256            return Err(PubsubError::InvalidAckDeadline.into());
257        }
258
259        let retry_delay_secs = match self.retry_delay_seconds {
260            None => self.retry_delay_secs,
261            Some(rds) => {
262                warn!("The `retry_delay_seconds` setting is deprecated, use `retry_delay_secs` instead.");
263                Duration::from_secs_f64(rds)
264            }
265        };
266
267        let auth = self.auth.build(Scope::PubSub).await?;
268
269        let mut uri: Uri = self.endpoint.parse().context(UriSnafu)?;
270        auth.apply_uri(&mut uri);
271
272        let tls = TlsSettings::from_options(self.tls.as_ref())?;
273        let host = uri.host().unwrap_or("pubsub.googleapis.com");
274        let mut tls_config = ClientTlsConfig::new().domain_name(host);
275        if let Some((cert, key)) = tls.identity_pem() {
276            tls_config = tls_config.identity(Identity::from_pem(cert, key));
277        }
278        for authority in tls.authorities_pem() {
279            tls_config = tls_config.ca_certificate(Certificate::from_pem(authority));
280        }
281
282        let mut endpoint: Endpoint = uri.to_string().parse().context(EndpointSnafu)?;
283        if uri.scheme() != Some(&Scheme::HTTP) {
284            endpoint = endpoint.tls_config(tls_config).context(EndpointTlsSnafu)?;
285        }
286
287        let token_generator = auth.spawn_regenerate_token();
288
289        let protocol = uri
290            .scheme()
291            .map(|scheme| Protocol(scheme.to_string().into()))
292            .unwrap_or(Protocol::HTTP);
293
294        let source = PubsubSource {
295            endpoint,
296            auth,
297            token_generator,
298            subscription: format!(
299                "projects/{}/subscriptions/{}",
300                self.project, self.subscription
301            ),
302            decoder: DecodingConfig::new(
303                self.framing.clone(),
304                self.decoding.clone(),
305                log_namespace,
306            )
307            .build()?,
308            acknowledgements: cx.do_acknowledgements(self.acknowledgements),
309            shutdown: cx.shutdown,
310            out: cx.out,
311            ack_deadline_secs,
312            retry_delay: retry_delay_secs,
313            keepalive: self.keepalive_secs,
314            concurrency: Default::default(),
315            full_response_size: self.full_response_size,
316            log_namespace,
317            bytes_received: register!(BytesReceived::from(protocol)),
318            events_received: register!(EventsReceived),
319        }
320        .run_all(self.max_concurrency, self.poll_time_seconds)
321        .map_err(|error| error!(message = "Source failed.", %error));
322        Ok(Box::pin(source))
323    }
324
325    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
326        let log_namespace = global_log_namespace.merge(self.log_namespace);
327        let schema_definition = self
328            .decoding
329            .schema_definition(log_namespace)
330            .with_standard_vector_source_metadata()
331            .with_source_metadata(
332                PubsubConfig::NAME,
333                Some(LegacyKey::Overwrite(owned_value_path!("timestamp"))),
334                &owned_value_path!("timestamp"),
335                Kind::timestamp().or_undefined(),
336                Some("timestamp"),
337            )
338            .with_source_metadata(
339                PubsubConfig::NAME,
340                Some(LegacyKey::Overwrite(owned_value_path!("attributes"))),
341                &owned_value_path!("attributes"),
342                Kind::object(Collection::empty().with_unknown(Kind::bytes())),
343                None,
344            )
345            .with_source_metadata(
346                PubsubConfig::NAME,
347                Some(LegacyKey::Overwrite(owned_value_path!("message_id"))),
348                &owned_value_path!("message_id"),
349                Kind::bytes(),
350                None,
351            );
352
353        vec![SourceOutput::new_maybe_logs(
354            DataType::Log,
355            schema_definition,
356        )]
357    }
358
359    fn can_acknowledge(&self) -> bool {
360        true
361    }
362}
363
// Example configuration generation for this source; as the macro name suggests,
// presumably derived from `PubsubConfig::default()` — confirm against the macro.
impl_generate_config_from_default!(PubsubConfig);
365
/// Runtime state of the source: shared by the stream supervisor (`run_all`)
/// and cloned into every streaming-pull task (`start_one`).
#[derive(Clone)]
struct PubsubSource {
    endpoint: Endpoint,
    auth: GcpAuthenticator,
    // Changes whenever a new auth token is generated; active streams restart on change.
    token_generator: watch::Receiver<()>,
    // Fully qualified name: `projects/{project}/subscriptions/{subscription}`.
    subscription: String,
    decoder: Decoder,
    // Whether end-to-end acknowledgements are enabled for this source.
    acknowledgements: bool,
    ack_deadline_secs: Duration,
    shutdown: ShutdownSignal,
    out: SourceSender,
    // Delay before reconnecting after a `State::RetryDelay` outcome.
    retry_delay: Duration,
    // Inactivity interval after which a keepalive is sent (or an idle stream closed).
    keepalive: Duration,
    // The current concurrency is shared across all tasks. It is used
    // by the streams to avoid shutting down the last stream, which
    // would result in repeatedly re-opening the stream on idle.
    concurrency: Arc<AtomicUsize>,
    // A response with at least this many messages marks its stream as busy.
    full_response_size: usize,
    log_namespace: LogNamespace,
    bytes_received: Registered<BytesReceived>,
    events_received: Registered<EventsReceived>,
}
388
/// Outcome of one streaming-pull attempt; tells `PubsubSource::run` what to do next.
enum State {
    /// Reconnect immediately.
    RetryNow,
    /// Sleep for `retry_delay`, then reconnect.
    RetryDelay,
    /// Stop this stream's task.
    Shutdown,
}
394
395impl PubsubSource {
396    async fn run_all(mut self, max_concurrency: usize, poll_time: Duration) -> crate::Result<()> {
397        let mut tasks = FuturesUnordered::new();
398
399        loop {
400            self.concurrency.store(tasks.len(), Ordering::Relaxed);
401            tokio::select! {
402                _ = &mut self.shutdown => break,
403                _ = tasks.next() => {
404                    if tasks.is_empty() {
405                        // Either no tasks were started or a race
406                        // condition resulted in the last task
407                        // exiting. Start up a new stream immediately.
408                        self.start_one(&tasks);
409                    }
410
411                },
412                _ = tokio::time::sleep(poll_time) => {
413                    // If all of the tasks are marked as busy, start
414                    // up a new one.
415                    if tasks.len() < max_concurrency
416                        && tasks.iter().all(|task| task.busy_flag.load(Ordering::Relaxed))
417                    {
418                        self.start_one(&tasks);
419                    }
420                }
421            }
422        }
423
424        // Wait for all active streams to exit on shutdown
425        while tasks.next().await.is_some() {}
426
427        Ok(())
428    }
429
430    fn start_one(&self, tasks: &FuturesUnordered<Task>) {
431        info!(message = "Starting stream.", concurrency = tasks.len() + 1);
432        // The `busy_flag` is used to monitor the status of a
433        // stream. It will start marked as idle to prevent the above
434        // scan from spinning up too many at once. When a stream
435        // receives "full" batches, it will mark itself as busy, and
436        // when it has an idle interval it will mark itself as not
437        // busy.
438        let busy_flag = Arc::new(AtomicBool::new(false));
439        let task = tokio::spawn(self.clone().run(Arc::clone(&busy_flag)));
440        tasks.push(Task { task, busy_flag });
441    }
442
443    async fn run(mut self, busy_flag: Arc<AtomicBool>) {
444        loop {
445            match self.run_once(&busy_flag).await {
446                State::RetryNow => debug!("Retrying immediately."),
447                State::RetryDelay => {
448                    info!(
449                        timeout_secs = self.retry_delay.as_secs_f64(),
450                        "Retrying after timeout."
451                    );
452                    tokio::time::sleep(self.retry_delay).await;
453                }
454                State::Shutdown => break,
455            }
456        }
457    }
458
459    async fn run_once(&mut self, busy_flag: &Arc<AtomicBool>) -> State {
460        let connection = match self.endpoint.connect().await {
461            Ok(connection) => connection,
462            Err(error) => {
463                emit!(GcpPubsubConnectError { error });
464                return State::RetryDelay;
465            }
466        };
467
468        let mut client = proto::subscriber_client::SubscriberClient::with_interceptor(
469            connection,
470            |mut req: Request<()>| {
471                if let Some(token) = self.auth.make_token() {
472                    let authorization = MetadataValue::try_from(&token).map_err(|_| {
473                        Status::new(
474                            Code::FailedPrecondition,
475                            "Invalid token text returned by GCP",
476                        )
477                    })?;
478                    req.metadata_mut().insert("authorization", authorization);
479                }
480                Ok(req)
481            },
482        )
483        // Tonic added a default of 4MB in 0.9. This replaces the old behavior.
484        .max_decoding_message_size(usize::MAX);
485
486        let (ack_ids_sender, ack_ids_receiver) = mpsc::channel(ACK_QUEUE_SIZE);
487
488        // Handle shutdown during startup, the streaming pull doesn't
489        // start if there is no data in the subscription.
490        let request_stream = self.request_stream(ack_ids_receiver);
491        debug!("Starting streaming pull.");
492        let stream = tokio::select! {
493            _ = &mut self.shutdown => return State::Shutdown,
494            result = client.streaming_pull(request_stream) => match result {
495                Ok(stream) => stream,
496                Err(error) => {
497                    emit!(GcpPubsubStreamingPullError { error });
498                    return State::RetryDelay;
499                }
500            }
501        };
502        let mut stream = stream.into_inner();
503
504        let (finalizer, mut ack_stream) =
505            Finalizer::maybe_new(self.acknowledgements, Some(self.shutdown.clone()));
506        let mut pending_acks = 0;
507
508        loop {
509            tokio::select! {
510                biased;
511                receipts = ack_stream.next() => if let Some((status, receipts)) = receipts {
512                    pending_acks -= 1;
513                    if status == BatchStatus::Delivered {
514                        ack_ids_sender
515                            .send(receipts)
516                            .await
517                            .unwrap_or_else(|_| unreachable!("request stream never closes"));
518                    }
519                },
520                response = stream.next() => match response {
521                    Some(Ok(response)) => {
522                        self.handle_response(
523                            response,
524                            &finalizer,
525                            &ack_ids_sender,
526                            &mut pending_acks,
527                            busy_flag,
528                        ).await;
529                    }
530                    Some(Err(error)) => break translate_error(error),
531                    None => break State::RetryNow,
532                },
533                _ = &mut self.shutdown, if pending_acks == 0 => return State::Shutdown,
534                _ = self.token_generator.changed() => {
535                    debug!("New authentication token generated, restarting stream.");
536                    break State::RetryNow;
537                },
538                _ = tokio::time::sleep(self.keepalive) => {
539                    if pending_acks == 0 {
540                        // No pending acks, and no new data, so drop
541                        // this stream if we aren't the only active
542                        // one.
543                        if self.concurrency.load(Ordering::Relaxed) > 1 {
544                            info!("Shutting down inactive stream.");
545                            break State::Shutdown;
546                        }
547                        // Otherwise, mark this stream as idle.
548                        busy_flag.store(false, Ordering::Relaxed);
549                    }
550                    // GCP Pub/Sub likes to time out connections after
551                    // about 75 seconds of inactivity. To forestall
552                    // the resulting error, send an empty array of
553                    // acknowledgement IDs to the request stream if no
554                    // other activity has happened. This will result
555                    // in a new request with empty fields, effectively
556                    // a keepalive.
557                    ack_ids_sender
558                        .send(Vec::new())
559                        .await
560                        .unwrap_or_else(|_| unreachable!("request stream never closes"));
561                }
562            }
563        }
564    }
565
566    fn request_stream(
567        &self,
568        ack_ids: mpsc::Receiver<Vec<String>>,
569    ) -> impl Stream<Item = proto::StreamingPullRequest> + 'static + use<> {
570        let subscription = self.subscription.clone();
571        let client_id = CLIENT_ID.clone();
572        let stream_ack_deadline_seconds = self.ack_deadline_secs.as_secs() as i32;
573        let ack_ids = ReceiverStream::new(ack_ids).ready_chunks(ACK_QUEUE_SIZE);
574
575        stream::once(async move {
576            // These fields are only valid on the first request in the
577            // stream, and so must not be repeated below.
578            proto::StreamingPullRequest {
579                subscription,
580                client_id,
581                stream_ack_deadline_seconds,
582                ..Default::default()
583            }
584        })
585        .chain(ack_ids.map(|chunks| {
586            // These "requests" serve only to send updates about
587            // acknowledgements to the server. None of the above
588            // fields need to be repeated and, in fact, will cause
589            // an stream error and cancellation if they are
590            // present.
591            proto::StreamingPullRequest {
592                ack_ids: chunks.into_iter().flatten().collect(),
593                ..Default::default()
594            }
595        }))
596    }
597
598    async fn handle_response(
599        &mut self,
600        response: proto::StreamingPullResponse,
601        finalizer: &Option<Finalizer>,
602        ack_ids: &mpsc::Sender<Vec<String>>,
603        pending_acks: &mut usize,
604        busy_flag: &Arc<AtomicBool>,
605    ) {
606        if response.received_messages.len() >= self.full_response_size {
607            busy_flag.store(true, Ordering::Relaxed);
608        }
609        self.bytes_received.emit(ByteSize(response.size_of()));
610
611        let (batch, notifier) = BatchNotifier::maybe_new_with_receiver(self.acknowledgements);
612        let (events, ids) = self.parse_messages(response.received_messages, batch).await;
613
614        let count = events.len();
615        match self.out.send_batch(events).await {
616            Err(_) => emit!(StreamClosedError { count }),
617            Ok(()) => match notifier {
618                None => ack_ids
619                    .send(ids)
620                    .await
621                    .unwrap_or_else(|_| unreachable!("request stream never closes")),
622                Some(notifier) => {
623                    finalizer
624                        .as_ref()
625                        .expect("Finalizer must have been set up for acknowledgements")
626                        .add(ids, notifier);
627                    *pending_acks += 1;
628                }
629            },
630        }
631    }
632
633    async fn parse_messages(
634        &self,
635        response: Vec<proto::ReceivedMessage>,
636        batch: Option<BatchNotifier>,
637    ) -> (Vec<Event>, Vec<String>) {
638        let mut ack_ids = Vec::with_capacity(response.len());
639        let events = response
640            .into_iter()
641            .flat_map(|received| {
642                ack_ids.push(received.ack_id);
643                received
644                    .message
645                    .map(|message| self.parse_message(message, &batch))
646            })
647            .flatten()
648            .collect();
649        (events, ack_ids)
650    }
651
652    fn parse_message<'a>(
653        &'a self,
654        message: proto::PubsubMessage,
655        batch: &'a Option<BatchNotifier>,
656    ) -> impl Iterator<Item = Event> + 'a {
657        let attributes = Value::Object(
658            message
659                .attributes
660                .into_iter()
661                .map(|(key, value)| (key.into(), Value::Bytes(value.into())))
662                .collect(),
663        );
664        let log_namespace = self.log_namespace;
665        util::decode_message(
666            self.decoder.clone(),
667            "gcp_pubsub",
668            &message.data,
669            message.publish_time.map(|dt| {
670                DateTime::from_timestamp(dt.seconds, dt.nanos as u32).expect("invalid timestamp")
671            }),
672            batch,
673            log_namespace,
674            &self.events_received,
675        )
676        .map(move |mut event| {
677            if let Some(log) = event.maybe_as_log_mut() {
678                log_namespace.insert_source_metadata(
679                    PubsubConfig::NAME,
680                    log,
681                    Some(LegacyKey::Overwrite(path!("message_id"))),
682                    path!("message_id"),
683                    message.message_id.clone(),
684                );
685                log_namespace.insert_source_metadata(
686                    PubsubConfig::NAME,
687                    log,
688                    Some(LegacyKey::Overwrite(path!("attributes"))),
689                    path!("attributes"),
690                    attributes.clone(),
691                )
692            }
693            event
694        })
695    }
696}
697
698fn translate_error(error: tonic::Status) -> State {
699    // GCP occasionally issues a connection reset
700    // in the middle of the streaming pull. This
701    // reset is not technically an error, so we
702    // want to retry immediately, but it is
703    // reported to us as an error from the
704    // underlying library (`tonic`).
705    if is_reset(&error) {
706        debug!("Stream reset by server.");
707        State::RetryNow
708    } else {
709        emit!(GcpPubsubReceiveError { error });
710        State::RetryDelay
711    }
712}
713
714fn is_reset(error: &Status) -> bool {
715    error
716        .source()
717        .and_then(|source| source.downcast_ref::<hyper::Error>())
718        .and_then(|error| error.source())
719        .and_then(|source| source.downcast_ref::<h2::Error>())
720        .is_some_and(|error| error.is_remote() && error.is_reset())
721}
722
723#[pin_project::pin_project]
724struct Task {
725    task: tokio::task::JoinHandle<()>,
726    busy_flag: Arc<AtomicBool>,
727}
728
729impl Future for Task {
730    type Output = Result<(), tokio::task::JoinError>;
731
732    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
733        self.task.poll_unpin(ctx)
734    }
735}
736
#[cfg(test)]
mod tests {
    use vector_lib::lookup::OwnedTargetPath;
    use vector_lib::schema::Definition;

    use super::*;

    // Runs the shared example-configuration check for `PubsubConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PubsubConfig>();
    }

    // With the Vector log namespace, the event root is the decoded message and
    // the Pub/Sub fields live under `gcp_pubsub` metadata.
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = PubsubConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("gcp_pubsub", "timestamp"),
                    Kind::timestamp().or_undefined(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!("gcp_pubsub", "attributes"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("gcp_pubsub", "message_id"),
                    Kind::bytes(),
                    None,
                );

        assert_eq!(definitions, Some(expected_definition));
    }

    // With the legacy log namespace, the message and all Pub/Sub fields are
    // top-level event fields.
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = PubsubConfig::default();

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(
            &owned_value_path!("timestamp"),
            Kind::timestamp().or_undefined(),
            Some("timestamp"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(
            &owned_value_path!("attributes"),
            Kind::object(Collection::empty().with_unknown(Kind::bytes())),
            None,
        )
        .with_event_field(&owned_value_path!("message_id"), Kind::bytes(), None);

        assert_eq!(definitions, Some(expected_definition));
    }
}
827
828#[cfg(all(test, feature = "gcp-integration-tests"))]
829mod integration_tests {
830    use std::collections::{BTreeMap, HashSet};
831    use std::sync::LazyLock;
832
833    use base64::prelude::{Engine as _, BASE64_STANDARD};
834    use chrono::{DateTime, Utc};
835    use futures::{Stream, StreamExt};
836    use http::method::Method;
837    use hyper::{Request, StatusCode};
838    use serde_json::{json, Value};
839    use tokio::time::{Duration, Instant};
840    use vrl::btreemap;
841
842    use super::*;
843    use crate::config::{ComponentKey, ProxyConfig};
844    use crate::test_util::components::{assert_source_compliance, SOURCE_TAGS};
845    use crate::test_util::{self, components, random_string};
846    use crate::{event::EventStatus, gcp, http::HttpClient, shutdown, SourceSender};
847
848    const PROJECT: &str = "sourceproject";
849    static PROJECT_URI: LazyLock<String> =
850        LazyLock::new(|| format!("{}/v1/projects/{}", *gcp::PUBSUB_ADDRESS, PROJECT));
851    static ACK_DEADLINE: LazyLock<Duration> = LazyLock::new(|| Duration::from_secs(10)); // Minimum custom deadline allowed by Pub/Sub
852
853    #[tokio::test]
854    async fn oneshot() {
855        assert_source_compliance(&SOURCE_TAGS, async move {
856            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;
857            let test_data = tester.send_test_events(99, BTreeMap::new()).await;
858            receive_events(&mut rx, test_data).await;
859            tester.shutdown_check(shutdown).await;
860        })
861        .await;
862    }
863
864    #[tokio::test]
865    async fn shuts_down_before_data_received() {
866        let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;
867
868        tester.shutdown(shutdown).await; // Not shutdown_check because this emits nothing
869
870        assert!(rx.next().await.is_none());
871        tester.send_test_events(1, BTreeMap::new()).await;
872        assert!(rx.next().await.is_none());
873        assert_eq!(tester.pull_count(1).await, 1);
874    }
875
876    #[tokio::test]
877    async fn shuts_down_after_data_received() {
878        assert_source_compliance(&SOURCE_TAGS, async move {
879            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;
880
881            let test_data = tester.send_test_events(1, BTreeMap::new()).await;
882            receive_events(&mut rx, test_data).await;
883
884            tester.shutdown_check(shutdown).await;
885
886            assert!(rx.next().await.is_none());
887            tester.send_test_events(1, BTreeMap::new()).await;
888            assert!(rx.next().await.is_none());
889            // The following assert is there to test that the source isn't
890            // pulling anything out of the subscription after it reports
891            // shutdown. It works when there wasn't anything previously in
892            // the topic, but does not work here despite evidence that the
893            // entire tokio task has exited.
894            // assert_eq!(tester.pull_count(1).await, 1);
895        })
896        .await;
897    }
898
899    #[tokio::test]
900    async fn streams_data() {
901        assert_source_compliance(&SOURCE_TAGS, async move {
902            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;
903            for _ in 0..10 {
904                let test_data = tester.send_test_events(9, BTreeMap::new()).await;
905                receive_events(&mut rx, test_data).await;
906            }
907            tester.shutdown_check(shutdown).await;
908        })
909        .await;
910    }
911
912    #[tokio::test]
913    async fn sends_attributes() {
914        assert_source_compliance(&SOURCE_TAGS, async move {
915            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;
916            let attributes = btreemap![
917                random_string(8) => random_string(88),
918                random_string(8) => random_string(88),
919                random_string(8) => random_string(88),
920            ];
921            let test_data = tester.send_test_events(1, attributes).await;
922            receive_events(&mut rx, test_data).await;
923            tester.shutdown_check(shutdown).await;
924        })
925        .await;
926    }
927
928    #[tokio::test]
929    async fn acks_received() {
930        assert_source_compliance(&SOURCE_TAGS, async move {
931            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;
932
933            let test_data = tester.send_test_events(1, BTreeMap::new()).await;
934            receive_events(&mut rx, test_data).await;
935
936            tester.shutdown_check(shutdown).await;
937
938            // Make sure there are no messages left in the queue
939            assert_eq!(tester.pull_count(10).await, 0);
940
941            // Wait for the acknowledgement deadline to expire
942            tokio::time::sleep(*ACK_DEADLINE + Duration::from_secs(1)).await;
943
944            // All messages are still acknowledged
945            assert_eq!(tester.pull_count(10).await, 0);
946        })
947        .await;
948    }
949
950    #[tokio::test]
951    // I have verified manually that the streaming code above omits the
952    // acknowledgements when events are rejected, but have been unable
953    // to verify the events are not acknowledged through the emulator.
954    #[ignore]
955    async fn does_not_ack_rejected() {
956        assert_source_compliance(&SOURCE_TAGS, async {
957            let (tester, mut rx, shutdown) = setup(EventStatus::Rejected).await;
958
959            let test_data = tester.send_test_events(1, BTreeMap::new()).await;
960            receive_events(&mut rx, test_data).await;
961
962            tester.shutdown(shutdown).await;
963
964            // Make sure there are no messages left in the queue
965            assert_eq!(tester.pull_count(10).await, 0);
966
967            // Wait for the acknowledgement deadline to expire
968            tokio::time::sleep(*ACK_DEADLINE + Duration::from_secs(1)).await;
969
970            // All messages are still in the queue
971            assert_eq!(tester.pull_count(10).await, 1);
972        })
973        .await;
974    }
975
976    async fn setup(
977        status: EventStatus,
978    ) -> (
979        Tester,
980        impl Stream<Item = Event> + Unpin,
981        shutdown::SourceShutdownCoordinator,
982    ) {
983        components::init_test();
984
985        let tls_settings = TlsSettings::from_options(None).unwrap();
986        let client = HttpClient::new(tls_settings, &ProxyConfig::default()).unwrap();
987        let tester = Tester::new(client).await;
988
989        let (rx, shutdown) = tester.spawn_source(status).await;
990
991        (tester, rx, shutdown)
992    }
993
994    fn now_trunc() -> DateTime<Utc> {
995        let start = Utc::now().timestamp();
996        // Truncate the milliseconds portion, the hard way.
997        DateTime::from_timestamp(start, 0).expect("invalid timestamp")
998    }
999
    /// Test harness bound to one randomly named topic/subscription pair in the
    /// Pub/Sub emulator.
    struct Tester {
        /// HTTP client used for the emulator's REST API.
        client: HttpClient,
        /// Topic name, generated as `topic-<random>`.
        topic: String,
        /// Subscription name, generated as `sub-<random>`.
        subscription: String,
        /// Component key the source is built and shut down under.
        component: ComponentKey,
    }

    /// What `Tester::send_test_events` published, kept for checking received events.
    struct TestData {
        /// Message bodies, in the order they were published.
        lines: Vec<String>,
        /// Time taken just before publishing, truncated to whole seconds.
        start: DateTime<Utc>,
        /// Attributes attached to every published message.
        attributes: BTreeMap<String, String>,
    }
1012
1013    impl Tester {
1014        async fn new(client: HttpClient) -> Self {
1015            let this = Self {
1016                client,
1017                topic: format!("topic-{}", random_string(10).to_lowercase()),
1018                subscription: format!("sub-{}", random_string(10).to_lowercase()),
1019                component: ComponentKey::from("gcp_pubsub"),
1020            };
1021
1022            this.request(Method::PUT, "topics/{topic}", json!({})).await;
1023
1024            let body = json!({
1025                "topic": format!("projects/{}/topics/{}", PROJECT, this.topic),
1026                "ackDeadlineSeconds": *ACK_DEADLINE,
1027            });
1028            this.request(Method::PUT, "subscriptions/{sub}", body).await;
1029
1030            this
1031        }
1032
1033        async fn spawn_source(
1034            &self,
1035            status: EventStatus,
1036        ) -> (
1037            impl Stream<Item = Event> + Unpin,
1038            shutdown::SourceShutdownCoordinator,
1039        ) {
1040            let (tx, rx) = SourceSender::new_test_finalize(status);
1041            let config = PubsubConfig {
1042                project: PROJECT.into(),
1043                subscription: self.subscription.clone(),
1044                endpoint: gcp::PUBSUB_ADDRESS.clone(),
1045                auth: GcpAuthConfig {
1046                    skip_authentication: true,
1047                    ..Default::default()
1048                },
1049                ack_deadline_secs: *ACK_DEADLINE,
1050                ..Default::default()
1051            };
1052            let (mut ctx, shutdown) = SourceContext::new_shutdown(&self.component, tx);
1053            ctx.acknowledgements = true;
1054            let source = config.build(ctx).await.expect("Failed to build source");
1055            tokio::spawn(async move { source.await.expect("Failed to run source") });
1056
1057            (rx, shutdown)
1058        }
1059
1060        async fn send_test_events(
1061            &self,
1062            count: usize,
1063            attributes: BTreeMap<String, String>,
1064        ) -> TestData {
1065            let start = now_trunc();
1066            let lines: Vec<_> = test_util::random_lines(44).take(count).collect();
1067            let messages: Vec<_> = lines
1068                .iter()
1069                .map(|input| BASE64_STANDARD.encode(input))
1070                .map(|data| json!({ "data": data, "attributes": attributes.clone() }))
1071                .collect();
1072            let body = json!({ "messages": messages });
1073            self.request(Method::POST, "topics/{topic}:publish", body)
1074                .await;
1075
1076            TestData {
1077                lines,
1078                start,
1079                attributes,
1080            }
1081        }
1082
1083        async fn pull_count(&self, count: usize) -> usize {
1084            let response = self
1085                .request(
1086                    Method::POST,
1087                    "subscriptions/{sub}:pull",
1088                    json!({ "maxMessages": count, "returnImmediately": true }),
1089                )
1090                .await;
1091            response
1092                .get("receivedMessages")
1093                .map(|rm| rm.as_array().unwrap().len())
1094                .unwrap_or(0)
1095        }
1096
1097        async fn request(&self, method: Method, base: &str, body: Value) -> Value {
1098            let path = base
1099                .replace("{topic}", &self.topic)
1100                .replace("{sub}", &self.subscription);
1101            let uri = [PROJECT_URI.as_str(), &path].join("/");
1102            let body = crate::serde::json::to_bytes(&body).unwrap().freeze();
1103            let request = Request::builder()
1104                .method(method)
1105                .uri(uri)
1106                .body(body.into())
1107                .unwrap();
1108            let response = self.client.send(request).await.unwrap();
1109            assert_eq!(response.status(), StatusCode::OK);
1110            let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
1111            serde_json::from_str(core::str::from_utf8(&body).unwrap()).unwrap()
1112        }
1113
1114        async fn shutdown_check(&self, shutdown: shutdown::SourceShutdownCoordinator) {
1115            self.shutdown(shutdown).await;
1116            components::SOURCE_TESTS.assert(&components::HTTP_PULL_SOURCE_TAGS);
1117        }
1118
1119        async fn shutdown(&self, mut shutdown: shutdown::SourceShutdownCoordinator) {
1120            let deadline = Instant::now() + Duration::from_secs(1);
1121            let shutdown = shutdown.shutdown_source(&self.component, deadline);
1122            assert!(shutdown.await);
1123        }
1124    }
1125
1126    async fn receive_events(rx: &mut (impl Stream<Item = Event> + Unpin), test_data: TestData) {
1127        let TestData {
1128            start,
1129            lines,
1130            attributes,
1131        } = test_data;
1132
1133        let events: Vec<Event> = tokio::time::timeout(
1134            Duration::from_secs(1),
1135            test_util::collect_n_stream(rx, lines.len()),
1136        )
1137        .await
1138        .unwrap();
1139
1140        let end = Utc::now();
1141        let mut message_ids = HashSet::new();
1142
1143        assert_eq!(events.len(), lines.len());
1144        for (message, event) in lines.into_iter().zip(events) {
1145            let log = event.into_log();
1146            assert_eq!(log.get("message"), Some(&message.into()));
1147            assert_eq!(log.get("source_type"), Some(&"gcp_pubsub".into()));
1148            assert!(log.get("timestamp").unwrap().as_timestamp().unwrap() >= &start);
1149            assert!(log.get("timestamp").unwrap().as_timestamp().unwrap() <= &end);
1150            assert!(
1151                message_ids.insert(log.get("message_id").unwrap().clone().to_string()),
1152                "Message contained duplicate message_id"
1153            );
1154            let logattr = log
1155                .get("attributes")
1156                .expect("missing attributes")
1157                .as_object()
1158                .unwrap()
1159                .clone();
1160            assert_eq!(logattr.len(), attributes.len());
1161            for (a, b) in logattr.into_iter().zip(&attributes) {
1162                assert_eq!(&a.0, b.0.as_str());
1163                assert_eq!(a.1, b.1.clone().into());
1164            }
1165        }
1166    }
1167}