1use std::{
2 error::Error as _,
3 future::Future,
4 pin::Pin,
5 sync::{
6 Arc, LazyLock,
7 atomic::{AtomicBool, AtomicUsize, Ordering},
8 },
9 task::{Context, Poll},
10 time::Duration,
11};
12
13use chrono::DateTime;
14use derivative::Derivative;
15use futures::{FutureExt, Stream, StreamExt, TryFutureExt, stream, stream::FuturesUnordered};
16use http::uri::{InvalidUri, Scheme, Uri};
17use serde_with::serde_as;
18use snafu::{ResultExt, Snafu};
19use tokio::sync::{mpsc, watch};
20use tokio_stream::wrappers::ReceiverStream;
21use tonic::{
22 Code, Request, Status,
23 metadata::MetadataValue,
24 transport::{Certificate, ClientTlsConfig, Endpoint, Identity},
25};
26use vector_lib::{
27 byte_size_of::ByteSizeOf,
28 codecs::decoding::{DeserializerConfig, FramingConfig},
29 config::{LegacyKey, LogNamespace},
30 configurable::configurable_component,
31 finalizer::UnorderedFinalizer,
32 internal_event::{
33 ByteSize, BytesReceived, EventsReceived, InternalEventHandle as _, Protocol, Registered,
34 },
35 lookup::owned_value_path,
36};
37use vrl::{
38 path,
39 value::{Kind, kind::Collection},
40};
41
42use crate::{
43 SourceSender,
44 codecs::{Decoder, DecodingConfig},
45 config::{DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput},
46 event::{BatchNotifier, BatchStatus, Event, MaybeAsLogMut, Value},
47 gcp::{GcpAuthConfig, GcpAuthenticator, PUBSUB_URL, Scope},
48 internal_events::{
49 GcpPubsubConnectError, GcpPubsubReceiveError, GcpPubsubStreamingPullError,
50 StreamClosedError,
51 },
52 serde::{bool_or_struct, default_decoding, default_framing_message_based},
53 shutdown::ShutdownSignal,
54 sources::util,
55 tls::{TlsConfig, TlsSettings},
56};
57
/// Smallest acknowledgement deadline, in seconds, accepted when building the source.
const MIN_ACK_DEADLINE_SECS: u64 = 10;
/// Largest acknowledgement deadline, in seconds, accepted when building the source
/// (ten minutes).
const MAX_ACK_DEADLINE_SECS: u64 = 10 * 60;

/// Capacity of each stream's ack-ID channel; also the maximum number of queued
/// ack-ID batches merged into a single acknowledgement request.
const ACK_QUEUE_SIZE: usize = 8;
68
/// Finalizer for end-to-end acknowledgements. Each entry it tracks is the list of
/// Pub/Sub ack IDs belonging to one forwarded batch.
type Finalizer = UnorderedFinalizer<Vec<String>>;
70
71#[allow(clippy::clone_on_ref_ptr)]
75#[allow(clippy::missing_const_for_fn)]
77#[allow(warnings)]
78mod proto {
79 include!(concat!(env!("OUT_DIR"), "/google.pubsub.v1.rs"));
80
81 use vector_lib::ByteSizeOf;
82
83 impl ByteSizeOf for StreamingPullResponse {
84 fn allocated_bytes(&self) -> usize {
85 self.received_messages.size_of()
86 }
87 }
88
89 impl ByteSizeOf for ReceivedMessage {
90 fn allocated_bytes(&self) -> usize {
91 self.ack_id.size_of() + self.message.as_ref().map_or(0, ByteSizeOf::size_of)
92 }
93 }
94
95 impl ByteSizeOf for PubsubMessage {
96 fn allocated_bytes(&self) -> usize {
97 self.data.len()
98 + self.message_id.len()
99 + self.ordering_key.len()
100 + self.attributes.size_of()
101 }
102 }
103}
104
105#[derive(Debug, Snafu)]
106pub(crate) enum PubsubError {
107 #[snafu(display("Invalid endpoint URI: {}", source))]
108 Uri { source: InvalidUri },
109 #[snafu(display("Could not create endpoint: {}", source))]
110 Endpoint { source: tonic::transport::Error },
111 #[snafu(display("Could not set up endpoint TLS settings: {}", source))]
112 EndpointTls { source: tonic::transport::Error },
113 #[snafu(display(
114 "`ack_deadline_secs` is outside the valid range of {} to {}",
115 MIN_ACK_DEADLINE_SECS,
116 MAX_ACK_DEADLINE_SECS
117 ))]
118 InvalidAckDeadline,
119}
120
121static CLIENT_ID: LazyLock<String> = LazyLock::new(|| uuid::Uuid::new_v4().to_string());
122
/// Configuration for the `gcp_pubsub` source.
#[serde_as]
#[configurable_component(source(
    "gcp_pubsub",
    "Fetch observability events from GCP's Pub/Sub messaging system."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct PubsubConfig {
    /// The GCP project that owns the subscription.
    #[configurable(metadata(docs::examples = "my-log-source-project"))]
    pub project: String,

    /// The name of the subscription, within `project`, to pull messages from.
    #[configurable(metadata(docs::examples = "my-vector-source-subscription"))]
    pub subscription: String,

    /// The endpoint to pull data from.
    ///
    /// TLS is applied unless the endpoint's URI scheme is plain `http`.
    #[configurable(metadata(docs::examples = "https://us-central1-pubsub.googleapis.com"))]
    #[serde(default = "default_endpoint")]
    pub endpoint: String,

    // GCP authentication settings, flattened into this table.
    #[serde(flatten)]
    pub auth: GcpAuthConfig,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// The maximum number of concurrent streaming-pull connections to open.
    #[serde(default = "default_max_concurrency")]
    pub max_concurrency: usize,

    /// The number of messages in a single response at or above which a stream is
    /// considered busy.
    ///
    /// A new stream is only started, up to `max_concurrency`, when every running
    /// stream is busy.
    #[serde(default = "default_full_response")]
    pub full_response_size: usize,

    /// How long, in seconds (fractions allowed), to wait between checks for whether
    /// another stream should be started.
    #[serde(default = "default_poll_time")]
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[configurable(metadata(docs::human_name = "Poll Time"))]
    pub poll_time_seconds: Duration,

    /// The acknowledgement deadline, in seconds, requested when opening each stream.
    ///
    /// Must be between 10 and 600 seconds inclusive.
    #[serde(default = "default_ack_deadline")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Acknowledgement Deadline"))]
    pub ack_deadline_secs: Duration,

    /// Deprecated alias for `ack_deadline_secs`; when set, it takes precedence.
    #[configurable(
        deprecated = "This option has been deprecated, use `ack_deadline_secs` instead."
    )]
    pub ack_deadline_seconds: Option<u16>,

    /// The time to wait, in seconds (fractions allowed), before reconnecting after an
    /// error.
    #[serde(default = "default_retry_delay")]
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[configurable(metadata(docs::human_name = "Retry Delay"))]
    pub retry_delay_secs: Duration,

    /// Deprecated alias for `retry_delay_secs`; when set, it takes precedence.
    #[configurable(
        deprecated = "This option has been deprecated, use `retry_delay_secs` instead."
    )]
    pub retry_delay_seconds: Option<f64>,

    /// The idle interval, in seconds (fractions allowed), after which a stream sends
    /// an empty request.
    ///
    /// At that point an idle stream with no pending acknowledgements is also shut down
    /// if other streams are running.
    #[serde(default = "default_keepalive")]
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[configurable(metadata(docs::human_name = "Keepalive"))]
    pub keepalive_secs: Duration,

    /// Overrides the global log namespace for this source when set.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    #[derivative(Default(value = "default_framing_message_based()"))]
    pub framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    pub decoding: DeserializerConfig,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    pub acknowledgements: SourceAcknowledgementsConfig,
}
227
228fn default_endpoint() -> String {
229 PUBSUB_URL.to_string()
230}
231
/// Default `ack_deadline_secs`: ten minutes, the largest accepted deadline.
const fn default_ack_deadline() -> Duration {
    Duration::from_secs(10 * 60)
}

/// Default `retry_delay_secs`: one second.
const fn default_retry_delay() -> Duration {
    Duration::from_millis(1_000)
}

/// Default `keepalive_secs`: one minute.
const fn default_keepalive() -> Duration {
    Duration::from_millis(60_000)
}

/// Default `max_concurrency`: at most ten simultaneous streams.
const fn default_max_concurrency() -> usize {
    10_usize
}

/// Default `full_response_size`: a response of 100+ messages marks a stream busy.
const fn default_full_response() -> usize {
    100_usize
}

/// Default `poll_time_seconds`: two seconds.
const fn default_poll_time() -> Duration {
    Duration::from_millis(2_000)
}
255
256#[async_trait::async_trait]
257#[typetag::serde(name = "gcp_pubsub")]
258impl SourceConfig for PubsubConfig {
259 async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
260 let log_namespace = cx.log_namespace(self.log_namespace);
261 let ack_deadline_secs = match self.ack_deadline_seconds {
262 None => self.ack_deadline_secs,
263 Some(ads) => {
264 warn!(
265 "The `ack_deadline_seconds` setting is deprecated, use `ack_deadline_secs` instead."
266 );
267 Duration::from_secs(ads as u64)
268 }
269 };
270 if !(MIN_ACK_DEADLINE_SECS..=MAX_ACK_DEADLINE_SECS).contains(&ack_deadline_secs.as_secs()) {
271 return Err(PubsubError::InvalidAckDeadline.into());
272 }
273
274 let retry_delay_secs = match self.retry_delay_seconds {
275 None => self.retry_delay_secs,
276 Some(rds) => {
277 warn!(
278 "The `retry_delay_seconds` setting is deprecated, use `retry_delay_secs` instead."
279 );
280 Duration::from_secs_f64(rds)
281 }
282 };
283
284 let auth = self.auth.build(Scope::PubSub).await?;
285
286 let mut uri: Uri = self.endpoint.parse().context(UriSnafu)?;
287 auth.apply_uri(&mut uri);
288
289 let tls = TlsSettings::from_options(self.tls.as_ref())?;
290 let host = uri.host().unwrap_or("pubsub.googleapis.com");
291 let mut tls_config = ClientTlsConfig::new().domain_name(host);
292 if let Some((cert, key)) = tls.identity_pem() {
293 tls_config = tls_config.identity(Identity::from_pem(cert, key));
294 }
295 for authority in tls.authorities_pem() {
296 tls_config = tls_config.ca_certificate(Certificate::from_pem(authority));
297 }
298
299 let mut endpoint: Endpoint = uri.to_string().parse().context(EndpointSnafu)?;
300 if uri.scheme() != Some(&Scheme::HTTP) {
301 endpoint = endpoint.tls_config(tls_config).context(EndpointTlsSnafu)?;
302 }
303
304 let token_generator = auth.spawn_regenerate_token();
305
306 let protocol = uri
307 .scheme()
308 .map(|scheme| Protocol(scheme.to_string().into()))
309 .unwrap_or(Protocol::HTTP);
310
311 let source = PubsubSource {
312 endpoint,
313 auth,
314 token_generator,
315 subscription: format!(
316 "projects/{}/subscriptions/{}",
317 self.project, self.subscription
318 ),
319 decoder: DecodingConfig::new(
320 self.framing.clone(),
321 self.decoding.clone(),
322 log_namespace,
323 )
324 .build()?,
325 acknowledgements: cx.do_acknowledgements(self.acknowledgements),
326 shutdown: cx.shutdown,
327 out: cx.out,
328 ack_deadline_secs,
329 retry_delay: retry_delay_secs,
330 keepalive: self.keepalive_secs,
331 concurrency: Default::default(),
332 full_response_size: self.full_response_size,
333 log_namespace,
334 bytes_received: register!(BytesReceived::from(protocol)),
335 events_received: register!(EventsReceived),
336 }
337 .run_all(self.max_concurrency, self.poll_time_seconds)
338 .map_err(|error| error!(message = "Source failed.", %error));
339 Ok(Box::pin(source))
340 }
341
342 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
343 let log_namespace = global_log_namespace.merge(self.log_namespace);
344 let schema_definition = self
345 .decoding
346 .schema_definition(log_namespace)
347 .with_standard_vector_source_metadata()
348 .with_source_metadata(
349 PubsubConfig::NAME,
350 Some(LegacyKey::Overwrite(owned_value_path!("timestamp"))),
351 &owned_value_path!("timestamp"),
352 Kind::timestamp().or_undefined(),
353 Some("timestamp"),
354 )
355 .with_source_metadata(
356 PubsubConfig::NAME,
357 Some(LegacyKey::Overwrite(owned_value_path!("attributes"))),
358 &owned_value_path!("attributes"),
359 Kind::object(Collection::empty().with_unknown(Kind::bytes())),
360 None,
361 )
362 .with_source_metadata(
363 PubsubConfig::NAME,
364 Some(LegacyKey::Overwrite(owned_value_path!("message_id"))),
365 &owned_value_path!("message_id"),
366 Kind::bytes(),
367 None,
368 );
369
370 vec![SourceOutput::new_maybe_logs(
371 DataType::Log,
372 schema_definition,
373 )]
374 }
375
376 fn can_acknowledge(&self) -> bool {
377 true
378 }
379}
380
// Example-config generation for this source, based on `PubsubConfig::default()`
// (assumed from the macro's name; the macro is defined elsewhere).
impl_generate_config_from_default!(PubsubConfig);
382
/// Runtime state of the source; `start_one` clones it for every streaming task it
/// spawns.
#[derive(Clone)]
struct PubsubSource {
    endpoint: Endpoint,
    auth: GcpAuthenticator,
    /// Changes whenever `auth` generates a new token; a stream restarts when it fires.
    token_generator: watch::Receiver<()>,
    /// Fully qualified name: `projects/{project}/subscriptions/{subscription}`.
    subscription: String,
    decoder: Decoder,
    /// Whether end-to-end acknowledgements are enabled for this source.
    acknowledgements: bool,
    /// Validated in `build` to lie within `MIN_ACK_DEADLINE_SECS..=MAX_ACK_DEADLINE_SECS`.
    ack_deadline_secs: Duration,
    shutdown: ShutdownSignal,
    out: SourceSender,
    retry_delay: Duration,
    keepalive: Duration,
    /// Number of running streaming tasks, stored by `run_all`. The `Arc` is shared by
    /// every clone, so each stream can tell whether other streams are running.
    concurrency: Arc<AtomicUsize>,
    full_response_size: usize,
    log_namespace: LogNamespace,
    bytes_received: Registered<BytesReceived>,
    events_received: Registered<EventsReceived>,
}
405
/// What a streaming task does after one connection attempt (`run_once`) ends.
enum State {
    /// Stop this task.
    Shutdown,
    /// Reconnect immediately (server reset, end of stream, or token refresh).
    RetryNow,
    /// Sleep for `retry_delay` before reconnecting (connect or request failure).
    RetryDelay,
}
411
412impl PubsubSource {
413 async fn run_all(mut self, max_concurrency: usize, poll_time: Duration) -> crate::Result<()> {
414 let mut tasks = FuturesUnordered::new();
415
416 loop {
417 self.concurrency.store(tasks.len(), Ordering::Relaxed);
418 tokio::select! {
419 _ = &mut self.shutdown => break,
420 _ = tasks.next() => {
421 if tasks.is_empty() {
422 self.start_one(&tasks);
426 }
427
428 },
429 _ = tokio::time::sleep(poll_time) => {
430 if tasks.len() < max_concurrency
433 && tasks.iter().all(|task| task.busy_flag.load(Ordering::Relaxed))
434 {
435 self.start_one(&tasks);
436 }
437 }
438 }
439 }
440
441 while tasks.next().await.is_some() {}
443
444 Ok(())
445 }
446
447 fn start_one(&self, tasks: &FuturesUnordered<Task>) {
448 info!(message = "Starting stream.", concurrency = tasks.len() + 1);
449 let busy_flag = Arc::new(AtomicBool::new(false));
456 let task = tokio::spawn(self.clone().run(Arc::clone(&busy_flag)));
457 tasks.push(Task { task, busy_flag });
458 }
459
460 async fn run(mut self, busy_flag: Arc<AtomicBool>) {
461 loop {
462 match self.run_once(&busy_flag).await {
463 State::RetryNow => debug!("Retrying immediately."),
464 State::RetryDelay => {
465 info!(
466 timeout_secs = self.retry_delay.as_secs_f64(),
467 "Retrying after timeout."
468 );
469 tokio::time::sleep(self.retry_delay).await;
470 }
471 State::Shutdown => break,
472 }
473 }
474 }
475
476 async fn run_once(&mut self, busy_flag: &Arc<AtomicBool>) -> State {
477 let connection = match self.endpoint.connect().await {
478 Ok(connection) => connection,
479 Err(error) => {
480 emit!(GcpPubsubConnectError { error });
481 return State::RetryDelay;
482 }
483 };
484
485 let mut client = proto::subscriber_client::SubscriberClient::with_interceptor(
486 connection,
487 |mut req: Request<()>| {
488 if let Some(token) = self.auth.make_token() {
489 let authorization = MetadataValue::try_from(&token).map_err(|_| {
490 Status::new(
491 Code::FailedPrecondition,
492 "Invalid token text returned by GCP",
493 )
494 })?;
495 req.metadata_mut().insert("authorization", authorization);
496 }
497 Ok(req)
498 },
499 )
500 .max_decoding_message_size(usize::MAX);
502
503 let (ack_ids_sender, ack_ids_receiver) = mpsc::channel(ACK_QUEUE_SIZE);
504
505 let request_stream = self.request_stream(ack_ids_receiver);
508 debug!("Starting streaming pull.");
509 let stream = tokio::select! {
510 _ = &mut self.shutdown => return State::Shutdown,
511 result = client.streaming_pull(request_stream) => match result {
512 Ok(stream) => stream,
513 Err(error) => {
514 emit!(GcpPubsubStreamingPullError { error });
515 return State::RetryDelay;
516 }
517 }
518 };
519 let mut stream = stream.into_inner();
520
521 let (finalizer, mut ack_stream) =
522 Finalizer::maybe_new(self.acknowledgements, Some(self.shutdown.clone()));
523 let mut pending_acks = 0;
524
525 loop {
526 tokio::select! {
527 biased;
528 receipts = ack_stream.next() => if let Some((status, receipts)) = receipts {
529 pending_acks -= 1;
530 if status == BatchStatus::Delivered {
531 ack_ids_sender
532 .send(receipts)
533 .await
534 .unwrap_or_else(|_| unreachable!("request stream never closes"));
535 }
536 },
537 response = stream.next() => match response {
538 Some(Ok(response)) => {
539 self.handle_response(
540 response,
541 &finalizer,
542 &ack_ids_sender,
543 &mut pending_acks,
544 busy_flag,
545 ).await;
546 }
547 Some(Err(error)) => break translate_error(error),
548 None => break State::RetryNow,
549 },
550 _ = &mut self.shutdown, if pending_acks == 0 => return State::Shutdown,
551 _ = self.token_generator.changed() => {
552 debug!("New authentication token generated, restarting stream.");
553 break State::RetryNow;
554 },
555 _ = tokio::time::sleep(self.keepalive) => {
556 if pending_acks == 0 {
557 if self.concurrency.load(Ordering::Relaxed) > 1 {
561 info!("Shutting down inactive stream.");
562 break State::Shutdown;
563 }
564 busy_flag.store(false, Ordering::Relaxed);
566 }
567 ack_ids_sender
575 .send(Vec::new())
576 .await
577 .unwrap_or_else(|_| unreachable!("request stream never closes"));
578 }
579 }
580 }
581 }
582
583 fn request_stream(
584 &self,
585 ack_ids: mpsc::Receiver<Vec<String>>,
586 ) -> impl Stream<Item = proto::StreamingPullRequest> + 'static + use<> {
587 let subscription = self.subscription.clone();
588 let client_id = CLIENT_ID.clone();
589 let stream_ack_deadline_seconds = self.ack_deadline_secs.as_secs() as i32;
590 let ack_ids = ReceiverStream::new(ack_ids).ready_chunks(ACK_QUEUE_SIZE);
591
592 stream::once(async move {
593 proto::StreamingPullRequest {
596 subscription,
597 client_id,
598 stream_ack_deadline_seconds,
599 ..Default::default()
600 }
601 })
602 .chain(ack_ids.map(|chunks| {
603 proto::StreamingPullRequest {
609 ack_ids: chunks.into_iter().flatten().collect(),
610 ..Default::default()
611 }
612 }))
613 }
614
615 async fn handle_response(
616 &mut self,
617 response: proto::StreamingPullResponse,
618 finalizer: &Option<Finalizer>,
619 ack_ids: &mpsc::Sender<Vec<String>>,
620 pending_acks: &mut usize,
621 busy_flag: &Arc<AtomicBool>,
622 ) {
623 if response.received_messages.len() >= self.full_response_size {
624 busy_flag.store(true, Ordering::Relaxed);
625 }
626 self.bytes_received.emit(ByteSize(response.size_of()));
627
628 let (batch, notifier) = BatchNotifier::maybe_new_with_receiver(self.acknowledgements);
629 let (events, ids) = self.parse_messages(response.received_messages, batch).await;
630
631 let count = events.len();
632 match self.out.send_batch(events).await {
633 Err(_) => emit!(StreamClosedError { count }),
634 Ok(()) => match notifier {
635 None => ack_ids
636 .send(ids)
637 .await
638 .unwrap_or_else(|_| unreachable!("request stream never closes")),
639 Some(notifier) => {
640 finalizer
641 .as_ref()
642 .expect("Finalizer must have been set up for acknowledgements")
643 .add(ids, notifier);
644 *pending_acks += 1;
645 }
646 },
647 }
648 }
649
650 async fn parse_messages(
651 &self,
652 response: Vec<proto::ReceivedMessage>,
653 batch: Option<BatchNotifier>,
654 ) -> (Vec<Event>, Vec<String>) {
655 let mut ack_ids = Vec::with_capacity(response.len());
656 let events = response
657 .into_iter()
658 .flat_map(|received| {
659 ack_ids.push(received.ack_id);
660 received
661 .message
662 .map(|message| self.parse_message(message, &batch))
663 })
664 .flatten()
665 .collect();
666 (events, ack_ids)
667 }
668
669 fn parse_message<'a>(
670 &'a self,
671 message: proto::PubsubMessage,
672 batch: &'a Option<BatchNotifier>,
673 ) -> impl Iterator<Item = Event> + 'a {
674 let attributes = Value::Object(
675 message
676 .attributes
677 .into_iter()
678 .map(|(key, value)| (key.into(), Value::Bytes(value.into())))
679 .collect(),
680 );
681 let log_namespace = self.log_namespace;
682 util::decode_message(
683 self.decoder.clone(),
684 "gcp_pubsub",
685 &message.data,
686 message.publish_time.map(|dt| {
687 DateTime::from_timestamp(dt.seconds, dt.nanos as u32).expect("invalid timestamp")
688 }),
689 batch,
690 log_namespace,
691 &self.events_received,
692 )
693 .map(move |mut event| {
694 if let Some(log) = event.maybe_as_log_mut() {
695 log_namespace.insert_source_metadata(
696 PubsubConfig::NAME,
697 log,
698 Some(LegacyKey::Overwrite(path!("message_id"))),
699 path!("message_id"),
700 message.message_id.clone(),
701 );
702 log_namespace.insert_source_metadata(
703 PubsubConfig::NAME,
704 log,
705 Some(LegacyKey::Overwrite(path!("attributes"))),
706 path!("attributes"),
707 attributes.clone(),
708 )
709 }
710 event
711 })
712 }
713}
714
715fn translate_error(error: tonic::Status) -> State {
716 if is_reset(&error) {
723 debug!("Stream reset by server.");
724 State::RetryNow
725 } else {
726 emit!(GcpPubsubReceiveError { error });
727 State::RetryDelay
728 }
729}
730
731fn is_reset(error: &Status) -> bool {
732 error
733 .source()
734 .and_then(|source| source.downcast_ref::<hyper::Error>())
735 .and_then(|error| error.source())
736 .and_then(|source| source.downcast_ref::<h2::Error>())
737 .is_some_and(|error| error.is_remote() && error.is_reset())
738}
739
/// A spawned streaming task, plus the flag telling `run_all` whether it is busy.
#[pin_project::pin_project]
struct Task {
    task: tokio::task::JoinHandle<()>,
    /// Set to `true` when a response holds at least `full_response_size` messages.
    /// Reset to `false` when the stream's keepalive timer fires with no pending
    /// acknowledgements.
    busy_flag: Arc<AtomicBool>,
}
745
746impl Future for Task {
747 type Output = Result<(), tokio::task::JoinError>;
748
749 fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
750 self.task.poll_unpin(ctx)
751 }
752}
753
#[cfg(test)]
mod tests {
    use vector_lib::{lookup::OwnedTargetPath, schema::Definition};

    use super::*;

    /// Kind of the `attributes` metadata field: an object whose values are bytes.
    fn attributes_kind() -> Kind {
        Kind::object(Collection::empty().with_unknown(Kind::bytes()))
    }

    /// Schema definition of the single output `config` produces under `namespace`.
    fn sole_output_definition(config: &PubsubConfig, namespace: LogNamespace) -> Option<Definition> {
        config.outputs(namespace).remove(0).schema_definition(true)
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PubsubConfig>();
    }

    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = PubsubConfig {
            log_namespace: Some(true),
            ..Default::default()
        };
        let actual = sole_output_definition(&config, LogNamespace::Vector);

        let expected = Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
            .with_meaning(OwnedTargetPath::event_root(), "message")
            .with_metadata_field(&owned_value_path!("vector", "source_type"), Kind::bytes(), None)
            .with_metadata_field(
                &owned_value_path!("vector", "ingest_timestamp"),
                Kind::timestamp(),
                None,
            )
            .with_metadata_field(
                &owned_value_path!("gcp_pubsub", "timestamp"),
                Kind::timestamp().or_undefined(),
                Some("timestamp"),
            )
            .with_metadata_field(&owned_value_path!("gcp_pubsub", "attributes"), attributes_kind(), None)
            .with_metadata_field(&owned_value_path!("gcp_pubsub", "message_id"), Kind::bytes(), None);

        assert_eq!(actual, Some(expected));
    }

    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = PubsubConfig::default();
        let actual = sole_output_definition(&config, LogNamespace::Legacy);

        let expected =
            Definition::new_with_default_metadata(Kind::object(Collection::empty()), [LogNamespace::Legacy])
                .with_event_field(&owned_value_path!("message"), Kind::bytes(), Some("message"))
                .with_event_field(
                    &owned_value_path!("timestamp"),
                    Kind::timestamp().or_undefined(),
                    Some("timestamp"),
                )
                .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
                .with_event_field(&owned_value_path!("attributes"), attributes_kind(), None)
                .with_event_field(&owned_value_path!("message_id"), Kind::bytes(), None);

        assert_eq!(actual, Some(expected));
    }
}
843
#[cfg(all(test, feature = "gcp-integration-tests"))]
mod integration_tests {
    use std::{
        collections::{BTreeMap, HashSet},
        sync::LazyLock,
    };

    use base64::prelude::{BASE64_STANDARD, Engine as _};
    use chrono::{DateTime, Utc};
    use futures::{Stream, StreamExt};
    use http::method::Method;
    use hyper::{Request, StatusCode};
    use serde_json::{Value, json};
    use tokio::time::{Duration, Instant};
    use vrl::btreemap;

    use super::*;
    use crate::{
        SourceSender,
        config::{ComponentKey, ProxyConfig},
        event::EventStatus,
        gcp,
        http::HttpClient,
        shutdown,
        test_util::{
            self, components,
            components::{SOURCE_TAGS, assert_source_compliance},
            random_string,
        },
    };

    const PROJECT: &str = "sourceproject";
    static PROJECT_URI: LazyLock<String> =
        LazyLock::new(|| format!("{}/v1/projects/{}", *gcp::PUBSUB_ADDRESS, PROJECT));
    /// Acknowledgement deadline used for both the test subscription and the source.
    /// This equals `MIN_ACK_DEADLINE_SECS`, the smallest value the source accepts.
    static ACK_DEADLINE: LazyLock<Duration> = LazyLock::new(|| Duration::from_secs(10));

    #[tokio::test]
    async fn oneshot() {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;
            let test_data = tester.send_test_events(99, BTreeMap::new()).await;
            receive_events(&mut rx, test_data).await;
            tester.shutdown_check(shutdown).await;
        })
        .await;
    }

    #[tokio::test]
    async fn shuts_down_before_data_received() {
        let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;

        tester.shutdown(shutdown).await;
        assert!(rx.next().await.is_none());
        tester.send_test_events(1, BTreeMap::new()).await;
        assert!(rx.next().await.is_none());
        assert_eq!(tester.pull_count(1).await, 1);
    }

    #[tokio::test]
    async fn shuts_down_after_data_received() {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;

            let test_data = tester.send_test_events(1, BTreeMap::new()).await;
            receive_events(&mut rx, test_data).await;

            tester.shutdown_check(shutdown).await;

            assert!(rx.next().await.is_none());
            tester.send_test_events(1, BTreeMap::new()).await;
            assert!(rx.next().await.is_none());
        })
        .await;
    }

    #[tokio::test]
    async fn streams_data() {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;
            for _ in 0..10 {
                let test_data = tester.send_test_events(9, BTreeMap::new()).await;
                receive_events(&mut rx, test_data).await;
            }
            tester.shutdown_check(shutdown).await;
        })
        .await;
    }

    #[tokio::test]
    async fn sends_attributes() {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;
            let attributes = btreemap![
                random_string(8) => random_string(88),
                random_string(8) => random_string(88),
                random_string(8) => random_string(88),
            ];
            let test_data = tester.send_test_events(1, attributes).await;
            receive_events(&mut rx, test_data).await;
            tester.shutdown_check(shutdown).await;
        })
        .await;
    }

    #[tokio::test]
    async fn acks_received() {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;

            let test_data = tester.send_test_events(1, BTreeMap::new()).await;
            receive_events(&mut rx, test_data).await;

            tester.shutdown_check(shutdown).await;

            assert_eq!(tester.pull_count(10).await, 0);

            // Wait past the deadline: an acknowledged message must not be redelivered.
            tokio::time::sleep(*ACK_DEADLINE + Duration::from_secs(1)).await;

            assert_eq!(tester.pull_count(10).await, 0);
        })
        .await;
    }

    // NOTE(review): this test is ignored; the reason is not visible in this file.
    #[tokio::test]
    #[ignore]
    async fn does_not_ack_rejected() {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tester, mut rx, shutdown) = setup(EventStatus::Rejected).await;

            let test_data = tester.send_test_events(1, BTreeMap::new()).await;
            receive_events(&mut rx, test_data).await;

            tester.shutdown(shutdown).await;

            assert_eq!(tester.pull_count(10).await, 0);

            // Wait past the deadline: a rejected (unacknowledged) message is redelivered.
            tokio::time::sleep(*ACK_DEADLINE + Duration::from_secs(1)).await;

            assert_eq!(tester.pull_count(10).await, 1);
        })
        .await;
    }

    /// Creates a fresh topic/subscription pair and starts a source reading from it.
    /// The source is wired to a test sender that finalizes every event with `status`.
    async fn setup(
        status: EventStatus,
    ) -> (
        Tester,
        impl Stream<Item = Event> + Unpin,
        shutdown::SourceShutdownCoordinator,
    ) {
        components::init_test();

        let tls_settings = TlsSettings::from_options(None).unwrap();
        let client = HttpClient::new(tls_settings, &ProxyConfig::default()).unwrap();
        let tester = Tester::new(client).await;

        let (rx, shutdown) = tester.spawn_source(status).await;

        (tester, rx, shutdown)
    }

    /// Current time truncated to whole seconds.
    fn now_trunc() -> DateTime<Utc> {
        let start = Utc::now().timestamp();
        DateTime::from_timestamp(start, 0).expect("invalid timestamp")
    }

    /// Drives the Pub/Sub REST API for one randomly named topic and subscription.
    struct Tester {
        client: HttpClient,
        topic: String,
        subscription: String,
        component: ComponentKey,
    }

    /// What was published by `send_test_events`, for comparison with received events.
    struct TestData {
        lines: Vec<String>,
        start: DateTime<Utc>,
        attributes: BTreeMap<String, String>,
    }

    impl Tester {
        async fn new(client: HttpClient) -> Self {
            let this = Self {
                client,
                topic: format!("topic-{}", random_string(10).to_lowercase()),
                subscription: format!("sub-{}", random_string(10).to_lowercase()),
                component: ComponentKey::from("gcp_pubsub"),
            };

            this.request(Method::PUT, "topics/{topic}", json!({})).await;

            let body = json!({
                "topic": format!("projects/{}/topics/{}", PROJECT, this.topic),
                // The API expects an integer number of seconds; serializing the
                // `Duration` itself would produce a `{"secs":..,"nanos":..}` object.
                "ackDeadlineSeconds": ACK_DEADLINE.as_secs(),
            });
            this.request(Method::PUT, "subscriptions/{sub}", body).await;

            this
        }

        async fn spawn_source(
            &self,
            status: EventStatus,
        ) -> (
            impl Stream<Item = Event> + Unpin + use<>,
            shutdown::SourceShutdownCoordinator,
        ) {
            let (tx, rx) = SourceSender::new_test_finalize(status);
            let config = PubsubConfig {
                project: PROJECT.into(),
                subscription: self.subscription.clone(),
                endpoint: gcp::PUBSUB_ADDRESS.clone(),
                auth: GcpAuthConfig {
                    skip_authentication: true,
                    ..Default::default()
                },
                ack_deadline_secs: *ACK_DEADLINE,
                ..Default::default()
            };
            let (mut ctx, shutdown) = SourceContext::new_shutdown(&self.component, tx);
            ctx.acknowledgements = true;
            let source = config.build(ctx).await.expect("Failed to build source");
            tokio::spawn(async move { source.await.expect("Failed to run source") });

            (rx, shutdown)
        }

        async fn send_test_events(
            &self,
            count: usize,
            attributes: BTreeMap<String, String>,
        ) -> TestData {
            let start = now_trunc();
            let lines: Vec<_> = test_util::random_lines(44).take(count).collect();
            let messages: Vec<_> = lines
                .iter()
                .map(|input| BASE64_STANDARD.encode(input))
                .map(|data| json!({ "data": data, "attributes": attributes.clone() }))
                .collect();
            let body = json!({ "messages": messages });
            self.request(Method::POST, "topics/{topic}:publish", body)
                .await;

            TestData {
                lines,
                start,
                attributes,
            }
        }

        /// Pulls up to `count` messages directly (without waiting) and returns how
        /// many were available.
        async fn pull_count(&self, count: usize) -> usize {
            let response = self
                .request(
                    Method::POST,
                    "subscriptions/{sub}:pull",
                    json!({ "maxMessages": count, "returnImmediately": true }),
                )
                .await;
            response
                .get("receivedMessages")
                .map(|rm| rm.as_array().unwrap().len())
                .unwrap_or(0)
        }

        /// Sends a JSON request under the project URI, substituting `{topic}` and
        /// `{sub}`, asserts a 200 response, and returns the parsed JSON body.
        async fn request(&self, method: Method, base: &str, body: Value) -> Value {
            let path = base
                .replace("{topic}", &self.topic)
                .replace("{sub}", &self.subscription);
            let uri = [PROJECT_URI.as_str(), &path].join("/");
            let body = crate::serde::json::to_bytes(&body).unwrap().freeze();
            let request = Request::builder()
                .method(method)
                .uri(uri)
                .body(body.into())
                .unwrap();
            let response = self.client.send(request).await.unwrap();
            assert_eq!(response.status(), StatusCode::OK);
            let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
            serde_json::from_str(core::str::from_utf8(&body).unwrap()).unwrap()
        }

        async fn shutdown_check(&self, shutdown: shutdown::SourceShutdownCoordinator) {
            self.shutdown(shutdown).await;
            components::SOURCE_TESTS.assert(&components::HTTP_PULL_SOURCE_TAGS);
        }

        async fn shutdown(&self, mut shutdown: shutdown::SourceShutdownCoordinator) {
            let deadline = Instant::now() + Duration::from_secs(1);
            let shutdown = shutdown.shutdown_source(&self.component, deadline);
            assert!(shutdown.await);
        }
    }

    /// Receives exactly as many events as lines were published (within one second)
    /// and checks each one. The checks cover content, source type, timestamp range,
    /// unique message IDs, and attributes.
    async fn receive_events(rx: &mut (impl Stream<Item = Event> + Unpin), test_data: TestData) {
        let TestData {
            start,
            lines,
            attributes,
        } = test_data;

        let events: Vec<Event> = tokio::time::timeout(
            Duration::from_secs(1),
            test_util::collect_n_stream(rx, lines.len()),
        )
        .await
        .unwrap();

        let end = Utc::now();
        let mut message_ids = HashSet::new();

        assert_eq!(events.len(), lines.len());
        for (message, event) in lines.into_iter().zip(events) {
            let log = event.into_log();
            assert_eq!(log.get("message"), Some(&message.into()));
            assert_eq!(log.get("source_type"), Some(&"gcp_pubsub".into()));
            assert!(log.get("timestamp").unwrap().as_timestamp().unwrap() >= &start);
            assert!(log.get("timestamp").unwrap().as_timestamp().unwrap() <= &end);
            assert!(
                message_ids.insert(log.get("message_id").unwrap().clone().to_string()),
                "Message contained duplicate message_id"
            );
            let logattr = log
                .get("attributes")
                .expect("missing attributes")
                .as_object()
                .unwrap()
                .clone();
            assert_eq!(logattr.len(), attributes.len());
            for (a, b) in logattr.into_iter().zip(&attributes) {
                assert_eq!(&a.0, b.0.as_str());
                assert_eq!(a.1, b.1.clone().into());
            }
        }
    }
}