1use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
2use std::sync::{Arc, LazyLock};
3use std::{error::Error as _, future::Future, pin::Pin, task::Context, task::Poll, time::Duration};
4
5use chrono::DateTime;
6use derivative::Derivative;
7use futures::{stream, stream::FuturesUnordered, FutureExt, Stream, StreamExt, TryFutureExt};
8use http::uri::{InvalidUri, Scheme, Uri};
9use serde_with::serde_as;
10use snafu::{ResultExt, Snafu};
11use tokio::sync::{mpsc, watch};
12use tokio_stream::wrappers::ReceiverStream;
13use tonic::{
14 metadata::MetadataValue,
15 transport::{Certificate, ClientTlsConfig, Endpoint, Identity},
16 Code, Request, Status,
17};
18use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
19use vector_lib::config::{LegacyKey, LogNamespace};
20use vector_lib::configurable::configurable_component;
21use vector_lib::internal_event::{
22 ByteSize, BytesReceived, EventsReceived, InternalEventHandle as _, Protocol, Registered,
23};
24use vector_lib::lookup::owned_value_path;
25use vector_lib::{byte_size_of::ByteSizeOf, finalizer::UnorderedFinalizer};
26use vrl::path;
27use vrl::value::{kind::Collection, Kind};
28
29use crate::{
30 codecs::{Decoder, DecodingConfig},
31 config::{DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput},
32 event::{BatchNotifier, BatchStatus, Event, MaybeAsLogMut, Value},
33 gcp::{GcpAuthConfig, GcpAuthenticator, Scope, PUBSUB_URL},
34 internal_events::{
35 GcpPubsubConnectError, GcpPubsubReceiveError, GcpPubsubStreamingPullError,
36 StreamClosedError,
37 },
38 serde::{bool_or_struct, default_decoding, default_framing_message_based},
39 shutdown::ShutdownSignal,
40 sources::util,
41 tls::{TlsConfig, TlsSettings},
42 SourceSender,
43};
44
/// Smallest `ack_deadline_secs` value, in seconds, that `build` accepts.
const MIN_ACK_DEADLINE_SECS: u64 = 10;
/// Largest `ack_deadline_secs` value, in seconds, that `build` accepts.
const MAX_ACK_DEADLINE_SECS: u64 = 600;

/// Capacity of the channel feeding ack IDs into the request stream; it is also the maximum
/// number of queued ack-ID batches merged into a single streaming-pull request.
const ACK_QUEUE_SIZE: usize = 8;
55
/// Tracks in-flight event batches; each entry carries the Pub/Sub ack IDs that are forwarded to
/// the request stream once the batch reports `BatchStatus::Delivered`.
type Finalizer = UnorderedFinalizer<Vec<String>>;
57
58#[allow(clippy::clone_on_ref_ptr)]
62#[allow(clippy::missing_const_for_fn)]
64#[allow(warnings)]
65mod proto {
66 include!(concat!(env!("OUT_DIR"), "/google.pubsub.v1.rs"));
67
68 use vector_lib::ByteSizeOf;
69
70 impl ByteSizeOf for StreamingPullResponse {
71 fn allocated_bytes(&self) -> usize {
72 self.received_messages.size_of()
73 }
74 }
75
76 impl ByteSizeOf for ReceivedMessage {
77 fn allocated_bytes(&self) -> usize {
78 self.ack_id.size_of() + self.message.as_ref().map_or(0, ByteSizeOf::size_of)
79 }
80 }
81
82 impl ByteSizeOf for PubsubMessage {
83 fn allocated_bytes(&self) -> usize {
84 self.data.len()
85 + self.message_id.len()
86 + self.ordering_key.len()
87 + self.attributes.size_of()
88 }
89 }
90}
91
92#[derive(Debug, Snafu)]
93pub(crate) enum PubsubError {
94 #[snafu(display("Invalid endpoint URI: {}", source))]
95 Uri { source: InvalidUri },
96 #[snafu(display("Could not create endpoint: {}", source))]
97 Endpoint { source: tonic::transport::Error },
98 #[snafu(display("Could not set up endpoint TLS settings: {}", source))]
99 EndpointTls { source: tonic::transport::Error },
100 #[snafu(display(
101 "`ack_deadline_secs` is outside the valid range of {} to {}",
102 MIN_ACK_DEADLINE_SECS,
103 MAX_ACK_DEADLINE_SECS
104 ))]
105 InvalidAckDeadline,
106}
107
108static CLIENT_ID: LazyLock<String> = LazyLock::new(|| uuid::Uuid::new_v4().to_string());
109
110#[serde_as]
112#[configurable_component(source(
113 "gcp_pubsub",
114 "Fetch observability events from GCP's Pub/Sub messaging system."
115))]
116#[derive(Clone, Debug, Derivative)]
117#[derivative(Default)]
118#[serde(deny_unknown_fields)]
119pub struct PubsubConfig {
120 #[configurable(metadata(docs::examples = "my-log-source-project"))]
122 pub project: String,
123
124 #[configurable(metadata(docs::examples = "my-vector-source-subscription"))]
126 pub subscription: String,
127
128 #[configurable(metadata(docs::examples = "https://us-central1-pubsub.googleapis.com"))]
130 #[serde(default = "default_endpoint")]
131 pub endpoint: String,
132
133 #[serde(flatten)]
134 pub auth: GcpAuthConfig,
135
136 #[configurable(derived)]
137 pub tls: Option<TlsConfig>,
138
139 #[serde(default = "default_max_concurrency")]
141 pub max_concurrency: usize,
142
143 #[serde(default = "default_full_response")]
150 pub full_response_size: usize,
151
152 #[serde(default = "default_poll_time")]
155 #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
156 #[configurable(metadata(docs::human_name = "Poll Time"))]
157 pub poll_time_seconds: Duration,
158
159 #[serde(default = "default_ack_deadline")]
163 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
164 #[configurable(metadata(docs::human_name = "Acknowledgement Deadline"))]
165 pub ack_deadline_secs: Duration,
166
167 #[configurable(
171 deprecated = "This option has been deprecated, use `ack_deadline_secs` instead."
172 )]
173 pub ack_deadline_seconds: Option<u16>,
174
175 #[serde(default = "default_retry_delay")]
177 #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
178 #[configurable(metadata(docs::human_name = "Retry Delay"))]
179 pub retry_delay_secs: Duration,
180
181 #[configurable(
183 deprecated = "This option has been deprecated, use `retry_delay_secs` instead."
184 )]
185 pub retry_delay_seconds: Option<f64>,
186
187 #[serde(default = "default_keepalive")]
191 #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
192 #[configurable(metadata(docs::human_name = "Keepalive"))]
193 pub keepalive_secs: Duration,
194
195 #[configurable(metadata(docs::hidden))]
197 #[serde(default)]
198 pub log_namespace: Option<bool>,
199
200 #[configurable(derived)]
201 #[serde(default = "default_framing_message_based")]
202 #[derivative(Default(value = "default_framing_message_based()"))]
203 pub framing: FramingConfig,
204
205 #[configurable(derived)]
206 #[serde(default = "default_decoding")]
207 #[derivative(Default(value = "default_decoding()"))]
208 pub decoding: DeserializerConfig,
209
210 #[configurable(derived)]
211 #[serde(default, deserialize_with = "bool_or_struct")]
212 pub acknowledgements: SourceAcknowledgementsConfig,
213}
214
/// Default Pub/Sub endpoint, used when `endpoint` is not set in the configuration.
fn default_endpoint() -> String {
    PUBSUB_URL.to_string()
}
218
/// Default upper bound on the number of concurrently running streams.
const fn default_max_concurrency() -> usize {
    10
}

/// Default response size (in messages) at which a stream counts as busy.
const fn default_full_response() -> usize {
    100
}

/// Default acknowledgement deadline: ten minutes, the largest value `build` accepts.
const fn default_ack_deadline() -> Duration {
    Duration::from_secs(10 * 60)
}

/// Default pause before reconnecting after an error.
const fn default_retry_delay() -> Duration {
    Duration::from_secs(1)
}

/// Default idle period before a keepalive request is sent.
const fn default_keepalive() -> Duration {
    Duration::from_secs(60)
}

/// Default interval between checks for whether another stream should be started.
const fn default_poll_time() -> Duration {
    Duration::from_secs(2)
}
242
243#[async_trait::async_trait]
244#[typetag::serde(name = "gcp_pubsub")]
245impl SourceConfig for PubsubConfig {
246 async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
247 let log_namespace = cx.log_namespace(self.log_namespace);
248 let ack_deadline_secs = match self.ack_deadline_seconds {
249 None => self.ack_deadline_secs,
250 Some(ads) => {
251 warn!("The `ack_deadline_seconds` setting is deprecated, use `ack_deadline_secs` instead.");
252 Duration::from_secs(ads as u64)
253 }
254 };
255 if !(MIN_ACK_DEADLINE_SECS..=MAX_ACK_DEADLINE_SECS).contains(&ack_deadline_secs.as_secs()) {
256 return Err(PubsubError::InvalidAckDeadline.into());
257 }
258
259 let retry_delay_secs = match self.retry_delay_seconds {
260 None => self.retry_delay_secs,
261 Some(rds) => {
262 warn!("The `retry_delay_seconds` setting is deprecated, use `retry_delay_secs` instead.");
263 Duration::from_secs_f64(rds)
264 }
265 };
266
267 let auth = self.auth.build(Scope::PubSub).await?;
268
269 let mut uri: Uri = self.endpoint.parse().context(UriSnafu)?;
270 auth.apply_uri(&mut uri);
271
272 let tls = TlsSettings::from_options(self.tls.as_ref())?;
273 let host = uri.host().unwrap_or("pubsub.googleapis.com");
274 let mut tls_config = ClientTlsConfig::new().domain_name(host);
275 if let Some((cert, key)) = tls.identity_pem() {
276 tls_config = tls_config.identity(Identity::from_pem(cert, key));
277 }
278 for authority in tls.authorities_pem() {
279 tls_config = tls_config.ca_certificate(Certificate::from_pem(authority));
280 }
281
282 let mut endpoint: Endpoint = uri.to_string().parse().context(EndpointSnafu)?;
283 if uri.scheme() != Some(&Scheme::HTTP) {
284 endpoint = endpoint.tls_config(tls_config).context(EndpointTlsSnafu)?;
285 }
286
287 let token_generator = auth.spawn_regenerate_token();
288
289 let protocol = uri
290 .scheme()
291 .map(|scheme| Protocol(scheme.to_string().into()))
292 .unwrap_or(Protocol::HTTP);
293
294 let source = PubsubSource {
295 endpoint,
296 auth,
297 token_generator,
298 subscription: format!(
299 "projects/{}/subscriptions/{}",
300 self.project, self.subscription
301 ),
302 decoder: DecodingConfig::new(
303 self.framing.clone(),
304 self.decoding.clone(),
305 log_namespace,
306 )
307 .build()?,
308 acknowledgements: cx.do_acknowledgements(self.acknowledgements),
309 shutdown: cx.shutdown,
310 out: cx.out,
311 ack_deadline_secs,
312 retry_delay: retry_delay_secs,
313 keepalive: self.keepalive_secs,
314 concurrency: Default::default(),
315 full_response_size: self.full_response_size,
316 log_namespace,
317 bytes_received: register!(BytesReceived::from(protocol)),
318 events_received: register!(EventsReceived),
319 }
320 .run_all(self.max_concurrency, self.poll_time_seconds)
321 .map_err(|error| error!(message = "Source failed.", %error));
322 Ok(Box::pin(source))
323 }
324
325 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
326 let log_namespace = global_log_namespace.merge(self.log_namespace);
327 let schema_definition = self
328 .decoding
329 .schema_definition(log_namespace)
330 .with_standard_vector_source_metadata()
331 .with_source_metadata(
332 PubsubConfig::NAME,
333 Some(LegacyKey::Overwrite(owned_value_path!("timestamp"))),
334 &owned_value_path!("timestamp"),
335 Kind::timestamp().or_undefined(),
336 Some("timestamp"),
337 )
338 .with_source_metadata(
339 PubsubConfig::NAME,
340 Some(LegacyKey::Overwrite(owned_value_path!("attributes"))),
341 &owned_value_path!("attributes"),
342 Kind::object(Collection::empty().with_unknown(Kind::bytes())),
343 None,
344 )
345 .with_source_metadata(
346 PubsubConfig::NAME,
347 Some(LegacyKey::Overwrite(owned_value_path!("message_id"))),
348 &owned_value_path!("message_id"),
349 Kind::bytes(),
350 None,
351 );
352
353 vec![SourceOutput::new_maybe_logs(
354 DataType::Log,
355 schema_definition,
356 )]
357 }
358
359 fn can_acknowledge(&self) -> bool {
360 true
361 }
362}
363
// Generates the example configuration from `PubsubConfig::default()` (as the macro name
// suggests; confirm against the macro definition).
impl_generate_config_from_default!(PubsubConfig);
365
366#[derive(Clone)]
367struct PubsubSource {
368 endpoint: Endpoint,
369 auth: GcpAuthenticator,
370 token_generator: watch::Receiver<()>,
371 subscription: String,
372 decoder: Decoder,
373 acknowledgements: bool,
374 ack_deadline_secs: Duration,
375 shutdown: ShutdownSignal,
376 out: SourceSender,
377 retry_delay: Duration,
378 keepalive: Duration,
379 concurrency: Arc<AtomicUsize>,
383 full_response_size: usize,
384 log_namespace: LogNamespace,
385 bytes_received: Registered<BytesReceived>,
386 events_received: Registered<EventsReceived>,
387}
388
/// What a streaming-pull task should do once a session ends.
enum State {
    /// Reconnect right away (the server reset the stream, it ended, or a new token was issued).
    RetryNow,
    /// Reconnect after waiting `retry_delay`.
    RetryDelay,
    /// Stop the task.
    Shutdown,
}
394
395impl PubsubSource {
396 async fn run_all(mut self, max_concurrency: usize, poll_time: Duration) -> crate::Result<()> {
397 let mut tasks = FuturesUnordered::new();
398
399 loop {
400 self.concurrency.store(tasks.len(), Ordering::Relaxed);
401 tokio::select! {
402 _ = &mut self.shutdown => break,
403 _ = tasks.next() => {
404 if tasks.is_empty() {
405 self.start_one(&tasks);
409 }
410
411 },
412 _ = tokio::time::sleep(poll_time) => {
413 if tasks.len() < max_concurrency
416 && tasks.iter().all(|task| task.busy_flag.load(Ordering::Relaxed))
417 {
418 self.start_one(&tasks);
419 }
420 }
421 }
422 }
423
424 while tasks.next().await.is_some() {}
426
427 Ok(())
428 }
429
430 fn start_one(&self, tasks: &FuturesUnordered<Task>) {
431 info!(message = "Starting stream.", concurrency = tasks.len() + 1);
432 let busy_flag = Arc::new(AtomicBool::new(false));
439 let task = tokio::spawn(self.clone().run(Arc::clone(&busy_flag)));
440 tasks.push(Task { task, busy_flag });
441 }
442
443 async fn run(mut self, busy_flag: Arc<AtomicBool>) {
444 loop {
445 match self.run_once(&busy_flag).await {
446 State::RetryNow => debug!("Retrying immediately."),
447 State::RetryDelay => {
448 info!(
449 timeout_secs = self.retry_delay.as_secs_f64(),
450 "Retrying after timeout."
451 );
452 tokio::time::sleep(self.retry_delay).await;
453 }
454 State::Shutdown => break,
455 }
456 }
457 }
458
459 async fn run_once(&mut self, busy_flag: &Arc<AtomicBool>) -> State {
460 let connection = match self.endpoint.connect().await {
461 Ok(connection) => connection,
462 Err(error) => {
463 emit!(GcpPubsubConnectError { error });
464 return State::RetryDelay;
465 }
466 };
467
468 let mut client = proto::subscriber_client::SubscriberClient::with_interceptor(
469 connection,
470 |mut req: Request<()>| {
471 if let Some(token) = self.auth.make_token() {
472 let authorization = MetadataValue::try_from(&token).map_err(|_| {
473 Status::new(
474 Code::FailedPrecondition,
475 "Invalid token text returned by GCP",
476 )
477 })?;
478 req.metadata_mut().insert("authorization", authorization);
479 }
480 Ok(req)
481 },
482 )
483 .max_decoding_message_size(usize::MAX);
485
486 let (ack_ids_sender, ack_ids_receiver) = mpsc::channel(ACK_QUEUE_SIZE);
487
488 let request_stream = self.request_stream(ack_ids_receiver);
491 debug!("Starting streaming pull.");
492 let stream = tokio::select! {
493 _ = &mut self.shutdown => return State::Shutdown,
494 result = client.streaming_pull(request_stream) => match result {
495 Ok(stream) => stream,
496 Err(error) => {
497 emit!(GcpPubsubStreamingPullError { error });
498 return State::RetryDelay;
499 }
500 }
501 };
502 let mut stream = stream.into_inner();
503
504 let (finalizer, mut ack_stream) =
505 Finalizer::maybe_new(self.acknowledgements, Some(self.shutdown.clone()));
506 let mut pending_acks = 0;
507
508 loop {
509 tokio::select! {
510 biased;
511 receipts = ack_stream.next() => if let Some((status, receipts)) = receipts {
512 pending_acks -= 1;
513 if status == BatchStatus::Delivered {
514 ack_ids_sender
515 .send(receipts)
516 .await
517 .unwrap_or_else(|_| unreachable!("request stream never closes"));
518 }
519 },
520 response = stream.next() => match response {
521 Some(Ok(response)) => {
522 self.handle_response(
523 response,
524 &finalizer,
525 &ack_ids_sender,
526 &mut pending_acks,
527 busy_flag,
528 ).await;
529 }
530 Some(Err(error)) => break translate_error(error),
531 None => break State::RetryNow,
532 },
533 _ = &mut self.shutdown, if pending_acks == 0 => return State::Shutdown,
534 _ = self.token_generator.changed() => {
535 debug!("New authentication token generated, restarting stream.");
536 break State::RetryNow;
537 },
538 _ = tokio::time::sleep(self.keepalive) => {
539 if pending_acks == 0 {
540 if self.concurrency.load(Ordering::Relaxed) > 1 {
544 info!("Shutting down inactive stream.");
545 break State::Shutdown;
546 }
547 busy_flag.store(false, Ordering::Relaxed);
549 }
550 ack_ids_sender
558 .send(Vec::new())
559 .await
560 .unwrap_or_else(|_| unreachable!("request stream never closes"));
561 }
562 }
563 }
564 }
565
566 fn request_stream(
567 &self,
568 ack_ids: mpsc::Receiver<Vec<String>>,
569 ) -> impl Stream<Item = proto::StreamingPullRequest> + 'static + use<> {
570 let subscription = self.subscription.clone();
571 let client_id = CLIENT_ID.clone();
572 let stream_ack_deadline_seconds = self.ack_deadline_secs.as_secs() as i32;
573 let ack_ids = ReceiverStream::new(ack_ids).ready_chunks(ACK_QUEUE_SIZE);
574
575 stream::once(async move {
576 proto::StreamingPullRequest {
579 subscription,
580 client_id,
581 stream_ack_deadline_seconds,
582 ..Default::default()
583 }
584 })
585 .chain(ack_ids.map(|chunks| {
586 proto::StreamingPullRequest {
592 ack_ids: chunks.into_iter().flatten().collect(),
593 ..Default::default()
594 }
595 }))
596 }
597
598 async fn handle_response(
599 &mut self,
600 response: proto::StreamingPullResponse,
601 finalizer: &Option<Finalizer>,
602 ack_ids: &mpsc::Sender<Vec<String>>,
603 pending_acks: &mut usize,
604 busy_flag: &Arc<AtomicBool>,
605 ) {
606 if response.received_messages.len() >= self.full_response_size {
607 busy_flag.store(true, Ordering::Relaxed);
608 }
609 self.bytes_received.emit(ByteSize(response.size_of()));
610
611 let (batch, notifier) = BatchNotifier::maybe_new_with_receiver(self.acknowledgements);
612 let (events, ids) = self.parse_messages(response.received_messages, batch).await;
613
614 let count = events.len();
615 match self.out.send_batch(events).await {
616 Err(_) => emit!(StreamClosedError { count }),
617 Ok(()) => match notifier {
618 None => ack_ids
619 .send(ids)
620 .await
621 .unwrap_or_else(|_| unreachable!("request stream never closes")),
622 Some(notifier) => {
623 finalizer
624 .as_ref()
625 .expect("Finalizer must have been set up for acknowledgements")
626 .add(ids, notifier);
627 *pending_acks += 1;
628 }
629 },
630 }
631 }
632
633 async fn parse_messages(
634 &self,
635 response: Vec<proto::ReceivedMessage>,
636 batch: Option<BatchNotifier>,
637 ) -> (Vec<Event>, Vec<String>) {
638 let mut ack_ids = Vec::with_capacity(response.len());
639 let events = response
640 .into_iter()
641 .flat_map(|received| {
642 ack_ids.push(received.ack_id);
643 received
644 .message
645 .map(|message| self.parse_message(message, &batch))
646 })
647 .flatten()
648 .collect();
649 (events, ack_ids)
650 }
651
652 fn parse_message<'a>(
653 &'a self,
654 message: proto::PubsubMessage,
655 batch: &'a Option<BatchNotifier>,
656 ) -> impl Iterator<Item = Event> + 'a {
657 let attributes = Value::Object(
658 message
659 .attributes
660 .into_iter()
661 .map(|(key, value)| (key.into(), Value::Bytes(value.into())))
662 .collect(),
663 );
664 let log_namespace = self.log_namespace;
665 util::decode_message(
666 self.decoder.clone(),
667 "gcp_pubsub",
668 &message.data,
669 message.publish_time.map(|dt| {
670 DateTime::from_timestamp(dt.seconds, dt.nanos as u32).expect("invalid timestamp")
671 }),
672 batch,
673 log_namespace,
674 &self.events_received,
675 )
676 .map(move |mut event| {
677 if let Some(log) = event.maybe_as_log_mut() {
678 log_namespace.insert_source_metadata(
679 PubsubConfig::NAME,
680 log,
681 Some(LegacyKey::Overwrite(path!("message_id"))),
682 path!("message_id"),
683 message.message_id.clone(),
684 );
685 log_namespace.insert_source_metadata(
686 PubsubConfig::NAME,
687 log,
688 Some(LegacyKey::Overwrite(path!("attributes"))),
689 path!("attributes"),
690 attributes.clone(),
691 )
692 }
693 event
694 })
695 }
696}
697
698fn translate_error(error: tonic::Status) -> State {
699 if is_reset(&error) {
706 debug!("Stream reset by server.");
707 State::RetryNow
708 } else {
709 emit!(GcpPubsubReceiveError { error });
710 State::RetryDelay
711 }
712}
713
714fn is_reset(error: &Status) -> bool {
715 error
716 .source()
717 .and_then(|source| source.downcast_ref::<hyper::Error>())
718 .and_then(|error| error.source())
719 .and_then(|source| source.downcast_ref::<h2::Error>())
720 .is_some_and(|error| error.is_remote() && error.is_reset())
721}
722
723#[pin_project::pin_project]
724struct Task {
725 task: tokio::task::JoinHandle<()>,
726 busy_flag: Arc<AtomicBool>,
727}
728
729impl Future for Task {
730 type Output = Result<(), tokio::task::JoinError>;
731
732 fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
733 self.task.poll_unpin(ctx)
734 }
735}
736
#[cfg(test)]
mod tests {
    use vector_lib::lookup::OwnedTargetPath;
    use vector_lib::schema::Definition;

    use super::*;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PubsubConfig>();
    }

    /// Under the Vector namespace, the decoded message is the event root and all source data
    /// lives in metadata.
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = PubsubConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let mut outputs = config.outputs(LogNamespace::Vector);
        let actual = outputs.remove(0).schema_definition(true);

        let attributes_kind = Kind::object(Collection::empty().with_unknown(Kind::bytes()));
        let expected =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("gcp_pubsub", "timestamp"),
                    Kind::timestamp().or_undefined(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!("gcp_pubsub", "attributes"),
                    attributes_kind,
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("gcp_pubsub", "message_id"),
                    Kind::bytes(),
                    None,
                );

        assert_eq!(actual, Some(expected));
    }

    /// Under the legacy namespace, every source field is written into the event itself.
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = PubsubConfig::default();

        let mut outputs = config.outputs(LogNamespace::Legacy);
        let actual = outputs.remove(0).schema_definition(true);

        let attributes_kind = Kind::object(Collection::empty().with_unknown(Kind::bytes()));
        let expected = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(&owned_value_path!("message"), Kind::bytes(), Some("message"))
        .with_event_field(
            &owned_value_path!("timestamp"),
            Kind::timestamp().or_undefined(),
            Some("timestamp"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("attributes"), attributes_kind, None)
        .with_event_field(&owned_value_path!("message_id"), Kind::bytes(), None);

        assert_eq!(actual, Some(expected));
    }
}
827
#[cfg(all(test, feature = "gcp-integration-tests"))]
mod integration_tests {
    use std::collections::{BTreeMap, HashSet};
    use std::sync::LazyLock;

    use base64::prelude::{Engine as _, BASE64_STANDARD};
    use chrono::{DateTime, Utc};
    use futures::{Stream, StreamExt};
    use http::method::Method;
    use hyper::{Request, StatusCode};
    use serde_json::{json, Value};
    use tokio::time::{Duration, Instant};
    use vrl::btreemap;

    use super::*;
    use crate::config::{ComponentKey, ProxyConfig};
    use crate::test_util::components::{assert_source_compliance, SOURCE_TAGS};
    use crate::test_util::{self, components, random_string};
    use crate::{event::EventStatus, gcp, http::HttpClient, shutdown, SourceSender};

    const PROJECT: &str = "sourceproject";
    static PROJECT_URI: LazyLock<String> =
        LazyLock::new(|| format!("{}/v1/projects/{}", *gcp::PUBSUB_ADDRESS, PROJECT));
    /// Ack deadline used for both the test subscription and the source under test; 10 seconds is
    /// the smallest value the source accepts.
    static ACK_DEADLINE: LazyLock<Duration> = LazyLock::new(|| Duration::from_secs(10));

    #[tokio::test]
    async fn oneshot() {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;
            let test_data = tester.send_test_events(99, BTreeMap::new()).await;
            receive_events(&mut rx, test_data).await;
            tester.shutdown_check(shutdown).await;
        })
        .await;
    }

    #[tokio::test]
    async fn shuts_down_before_data_received() {
        let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;

        tester.shutdown(shutdown).await;

        assert!(rx.next().await.is_none());
        tester.send_test_events(1, BTreeMap::new()).await;
        assert!(rx.next().await.is_none());
        assert_eq!(tester.pull_count(1).await, 1);
    }

    #[tokio::test]
    async fn shuts_down_after_data_received() {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;

            let test_data = tester.send_test_events(1, BTreeMap::new()).await;
            receive_events(&mut rx, test_data).await;

            tester.shutdown_check(shutdown).await;

            assert!(rx.next().await.is_none());
            tester.send_test_events(1, BTreeMap::new()).await;
            assert!(rx.next().await.is_none());
        })
        .await;
    }

    #[tokio::test]
    async fn streams_data() {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;
            for _ in 0..10 {
                let test_data = tester.send_test_events(9, BTreeMap::new()).await;
                receive_events(&mut rx, test_data).await;
            }
            tester.shutdown_check(shutdown).await;
        })
        .await;
    }

    #[tokio::test]
    async fn sends_attributes() {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;
            let attributes = btreemap![
                random_string(8) => random_string(88),
                random_string(8) => random_string(88),
                random_string(8) => random_string(88),
            ];
            let test_data = tester.send_test_events(1, attributes).await;
            receive_events(&mut rx, test_data).await;
            tester.shutdown_check(shutdown).await;
        })
        .await;
    }

    #[tokio::test]
    async fn acks_received() {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;

            let test_data = tester.send_test_events(1, BTreeMap::new()).await;
            receive_events(&mut rx, test_data).await;

            tester.shutdown_check(shutdown).await;

            assert_eq!(tester.pull_count(10).await, 0);

            // Wait past the ack deadline: an acknowledged message must not be redelivered.
            tokio::time::sleep(*ACK_DEADLINE + Duration::from_secs(1)).await;

            assert_eq!(tester.pull_count(10).await, 0);
        })
        .await;
    }

    // NOTE(review): ignored upstream; the reason is not visible in this file.
    #[tokio::test]
    #[ignore]
    async fn does_not_ack_rejected() {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tester, mut rx, shutdown) = setup(EventStatus::Rejected).await;

            let test_data = tester.send_test_events(1, BTreeMap::new()).await;
            receive_events(&mut rx, test_data).await;

            tester.shutdown(shutdown).await;

            assert_eq!(tester.pull_count(10).await, 0);

            // After the ack deadline, the rejected (unacknowledged) message is redelivered.
            tokio::time::sleep(*ACK_DEADLINE + Duration::from_secs(1)).await;

            assert_eq!(tester.pull_count(10).await, 1);
        })
        .await;
    }

    /// Creates a fresh topic/subscription pair and starts a source reading from it.
    async fn setup(
        status: EventStatus,
    ) -> (
        Tester,
        impl Stream<Item = Event> + Unpin,
        shutdown::SourceShutdownCoordinator,
    ) {
        components::init_test();

        let tls_settings = TlsSettings::from_options(None).unwrap();
        let client = HttpClient::new(tls_settings, &ProxyConfig::default()).unwrap();
        let tester = Tester::new(client).await;

        let (rx, shutdown) = tester.spawn_source(status).await;

        (tester, rx, shutdown)
    }

    /// Current time truncated to whole seconds.
    fn now_trunc() -> DateTime<Utc> {
        let start = Utc::now().timestamp();
        DateTime::from_timestamp(start, 0).expect("invalid timestamp")
    }

    struct Tester {
        client: HttpClient,
        topic: String,
        subscription: String,
        component: ComponentKey,
    }

    struct TestData {
        lines: Vec<String>,
        start: DateTime<Utc>,
        attributes: BTreeMap<String, String>,
    }

    impl Tester {
        /// Creates a randomly named topic and subscription through the REST API.
        async fn new(client: HttpClient) -> Self {
            let this = Self {
                client,
                topic: format!("topic-{}", random_string(10).to_lowercase()),
                subscription: format!("sub-{}", random_string(10).to_lowercase()),
                component: ComponentKey::from("gcp_pubsub"),
            };

            this.request(Method::PUT, "topics/{topic}", json!({})).await;

            let body = json!({
                "topic": format!("projects/{}/topics/{}", PROJECT, this.topic),
                // The REST field is an integer number of seconds; serializing the `Duration`
                // itself would produce `{"secs": .., "nanos": ..}` instead.
                "ackDeadlineSeconds": ACK_DEADLINE.as_secs(),
            });
            this.request(Method::PUT, "subscriptions/{sub}", body).await;

            this
        }

        /// Builds and spawns the source with acknowledgements enabled, finalizing every event
        /// with `status`.
        async fn spawn_source(
            &self,
            status: EventStatus,
        ) -> (
            impl Stream<Item = Event> + Unpin,
            shutdown::SourceShutdownCoordinator,
        ) {
            let (tx, rx) = SourceSender::new_test_finalize(status);
            let config = PubsubConfig {
                project: PROJECT.into(),
                subscription: self.subscription.clone(),
                endpoint: gcp::PUBSUB_ADDRESS.clone(),
                auth: GcpAuthConfig {
                    skip_authentication: true,
                    ..Default::default()
                },
                ack_deadline_secs: *ACK_DEADLINE,
                ..Default::default()
            };
            let (mut ctx, shutdown) = SourceContext::new_shutdown(&self.component, tx);
            ctx.acknowledgements = true;
            let source = config.build(ctx).await.expect("Failed to build source");
            tokio::spawn(async move { source.await.expect("Failed to run source") });

            (rx, shutdown)
        }

        /// Publishes `count` random lines, each carrying `attributes`.
        async fn send_test_events(
            &self,
            count: usize,
            attributes: BTreeMap<String, String>,
        ) -> TestData {
            let start = now_trunc();
            let lines: Vec<_> = test_util::random_lines(44).take(count).collect();
            let messages: Vec<_> = lines
                .iter()
                .map(|input| BASE64_STANDARD.encode(input))
                .map(|data| json!({ "data": data, "attributes": attributes.clone() }))
                .collect();
            let body = json!({ "messages": messages });
            self.request(Method::POST, "topics/{topic}:publish", body)
                .await;

            TestData {
                lines,
                start,
                attributes,
            }
        }

        /// Pulls up to `count` messages without waiting and returns how many were received.
        async fn pull_count(&self, count: usize) -> usize {
            let response = self
                .request(
                    Method::POST,
                    "subscriptions/{sub}:pull",
                    json!({ "maxMessages": count, "returnImmediately": true }),
                )
                .await;
            response
                .get("receivedMessages")
                .map(|rm| rm.as_array().unwrap().len())
                .unwrap_or(0)
        }

        /// Sends a JSON request to the project's REST API, asserting a 200 response.
        async fn request(&self, method: Method, base: &str, body: Value) -> Value {
            let path = base
                .replace("{topic}", &self.topic)
                .replace("{sub}", &self.subscription);
            let uri = [PROJECT_URI.as_str(), &path].join("/");
            let body = crate::serde::json::to_bytes(&body).unwrap().freeze();
            let request = Request::builder()
                .method(method)
                .uri(uri)
                .body(body.into())
                .unwrap();
            let response = self.client.send(request).await.unwrap();
            assert_eq!(response.status(), StatusCode::OK);
            let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
            serde_json::from_str(core::str::from_utf8(&body).unwrap()).unwrap()
        }

        async fn shutdown_check(&self, shutdown: shutdown::SourceShutdownCoordinator) {
            self.shutdown(shutdown).await;
            components::SOURCE_TESTS.assert(&components::HTTP_PULL_SOURCE_TAGS);
        }

        async fn shutdown(&self, mut shutdown: shutdown::SourceShutdownCoordinator) {
            let deadline = Instant::now() + Duration::from_secs(1);
            let shutdown = shutdown.shutdown_source(&self.component, deadline);
            assert!(shutdown.await);
        }
    }

    /// Collects the expected number of events and checks their payload and metadata.
    async fn receive_events(rx: &mut (impl Stream<Item = Event> + Unpin), test_data: TestData) {
        let TestData {
            start,
            lines,
            attributes,
        } = test_data;

        let events: Vec<Event> = tokio::time::timeout(
            Duration::from_secs(1),
            test_util::collect_n_stream(rx, lines.len()),
        )
        .await
        .unwrap();

        let end = Utc::now();
        let mut message_ids = HashSet::new();

        assert_eq!(events.len(), lines.len());
        for (message, event) in lines.into_iter().zip(events) {
            let log = event.into_log();
            assert_eq!(log.get("message"), Some(&message.into()));
            assert_eq!(log.get("source_type"), Some(&"gcp_pubsub".into()));
            assert!(log.get("timestamp").unwrap().as_timestamp().unwrap() >= &start);
            assert!(log.get("timestamp").unwrap().as_timestamp().unwrap() <= &end);
            assert!(
                message_ids.insert(log.get("message_id").unwrap().clone().to_string()),
                "Message contained duplicate message_id"
            );
            let logattr = log
                .get("attributes")
                .expect("missing attributes")
                .as_object()
                .unwrap()
                .clone();
            assert_eq!(logattr.len(), attributes.len());
            for (a, b) in logattr.into_iter().zip(&attributes) {
                assert_eq!(&a.0, b.0.as_str());
                assert_eq!(a.1, b.1.clone().into());
            }
        }
    }
}