1use std::{
2 error::Error as _,
3 future::Future,
4 pin::Pin,
5 sync::{
6 Arc, LazyLock,
7 atomic::{AtomicBool, AtomicUsize, Ordering},
8 },
9 task::{Context, Poll},
10 time::Duration,
11};
12
13use chrono::DateTime;
14use derivative::Derivative;
15use futures::{FutureExt, Stream, StreamExt, TryFutureExt, stream, stream::FuturesUnordered};
16use http::uri::{InvalidUri, Scheme, Uri};
17use serde_with::serde_as;
18use snafu::{ResultExt, Snafu};
19use tokio::sync::{mpsc, watch};
20use tokio_stream::wrappers::ReceiverStream;
21use tonic::{
22 Code, Request, Status,
23 metadata::MetadataValue,
24 transport::{Certificate, ClientTlsConfig, Endpoint, Identity},
25};
26use vector_lib::{
27 byte_size_of::ByteSizeOf,
28 codecs::decoding::{DeserializerConfig, FramingConfig},
29 config::{LegacyKey, LogNamespace},
30 configurable::configurable_component,
31 finalizer::UnorderedFinalizer,
32 internal_event::{
33 ByteSize, BytesReceived, EventsReceived, InternalEventHandle as _, Protocol, Registered,
34 },
35 lookup::owned_value_path,
36};
37use vrl::{
38 path,
39 value::{Kind, kind::Collection},
40};
41
42use crate::{
43 SourceSender,
44 codecs::{Decoder, DecodingConfig},
45 config::{DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput},
46 event::{BatchNotifier, BatchStatus, Event, MaybeAsLogMut, Value},
47 gcp::{GcpAuthConfig, GcpAuthenticator, PUBSUB_URL, Scope},
48 internal_events::{
49 GcpPubsubConnectError, GcpPubsubReceiveError, GcpPubsubStreamingPullError,
50 StreamClosedError,
51 },
52 serde::{bool_or_struct, default_decoding, default_framing_message_based},
53 shutdown::ShutdownSignal,
54 sources::util,
55 tls::{TlsConfig, TlsSettings},
56};
57
/// Smallest stream acknowledgement deadline, in seconds, that `build` accepts.
const MIN_ACK_DEADLINE_SECS: u64 = 10;
/// Largest stream acknowledgement deadline, in seconds (ten minutes), that `build` accepts.
const MAX_ACK_DEADLINE_SECS: u64 = 10 * 60;

/// Capacity of the bounded channel carrying ack IDs from a stream's receive
/// loop into its request stream. It is also the maximum number of queued ID
/// batches merged into a single `StreamingPullRequest` (see `request_stream`).
const ACK_QUEUE_SIZE: usize = 8;
/// Finalizer that holds the Pub/Sub ack IDs of each batch sent downstream and
/// hands them back, together with the batch status, once that status is known
/// (consumed through `ack_stream` in `run_once`).
type Finalizer = UnorderedFinalizer<Vec<String>>;
70
// Generated Pub/Sub gRPC client and message types, included from the build output
// directory (`OUT_DIR`). The lint allowances apply to that generated code, which is
// presumably not written to satisfy this crate's lint configuration.
#[allow(clippy::clone_on_ref_ptr)]
#[allow(clippy::missing_const_for_fn)]
#[allow(warnings)]
mod proto {
    include!(concat!(env!("OUT_DIR"), "/google.pubsub.v1.rs"));

    use vector_lib::ByteSizeOf;

    // Size accounting for the generated types, used when emitting the
    // `BytesReceived` internal event for each streaming-pull response.

    /// A response's heap size is the size of the messages it carries.
    impl ByteSizeOf for StreamingPullResponse {
        fn allocated_bytes(&self) -> usize {
            self.received_messages.size_of()
        }
    }

    /// The ack ID plus the (optional) message payload.
    impl ByteSizeOf for ReceivedMessage {
        fn allocated_bytes(&self) -> usize {
            self.ack_id.size_of() + self.message.as_ref().map_or(0, ByteSizeOf::size_of)
        }
    }

    /// Data, ID, ordering key and attribute map of a single message.
    impl ByteSizeOf for PubsubMessage {
        fn allocated_bytes(&self) -> usize {
            self.data.len()
                + self.message_id.len()
                + self.ordering_key.len()
                + self.attributes.size_of()
        }
    }
}
104
105#[derive(Debug, Snafu)]
106pub(crate) enum PubsubError {
107 #[snafu(display("Invalid endpoint URI: {}", source))]
108 Uri { source: InvalidUri },
109 #[snafu(display("Could not create endpoint: {}", source))]
110 Endpoint { source: tonic::transport::Error },
111 #[snafu(display("Could not set up endpoint TLS settings: {}", source))]
112 EndpointTls { source: tonic::transport::Error },
113 #[snafu(display(
114 "`ack_deadline_secs` is outside the valid range of {} to {}",
115 MIN_ACK_DEADLINE_SECS,
116 MAX_ACK_DEADLINE_SECS
117 ))]
118 InvalidAckDeadline,
119}
120
/// Random UUID, generated lazily once per process, sent as the `client_id` of
/// every streaming-pull request opened by this process.
static CLIENT_ID: LazyLock<String> = LazyLock::new(|| uuid::Uuid::new_v4().to_string());
122
/// Configuration for the `gcp_pubsub` source.
#[serde_as]
#[configurable_component(source(
    "gcp_pubsub",
    "Fetch observability events from GCP's Pub/Sub messaging system."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct PubsubConfig {
    /// The project name from which to pull logs.
    #[configurable(metadata(docs::examples = "my-log-source-project"))]
    pub project: String,

    /// The subscription within the project to pull messages from.
    #[configurable(metadata(docs::examples = "my-vector-source-subscription"))]
    pub subscription: String,

    /// The endpoint from which to pull data.
    #[configurable(metadata(docs::examples = "https://us-central1-pubsub.googleapis.com"))]
    #[serde(default = "default_endpoint")]
    pub endpoint: String,

    #[serde(flatten)]
    pub auth: GcpAuthConfig,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// The maximum number of concurrent stream connections to open at once.
    #[serde(default = "default_max_concurrency")]
    pub max_concurrency: usize,

    /// The number of messages in a response to mark a stream as "busy".
    ///
    /// This is used to determine if more streams should be started: a new stream
    /// is only opened when every running stream is busy.
    #[serde(default = "default_full_response")]
    pub full_response_size: usize,

    /// How often to poll the currently active streams to see if they are all
    /// busy and so open a new stream.
    #[serde(default = "default_poll_time")]
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[configurable(metadata(docs::human_name = "Poll Time"))]
    pub poll_time_seconds: Duration,

    /// The acknowledgement deadline, in seconds, to use for this stream.
    ///
    /// Must be between 10 and 600 seconds.
    #[serde(default = "default_ack_deadline")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Acknowledgement Deadline"))]
    pub ack_deadline_secs: Duration,

    /// The acknowledgement deadline, in seconds, to use for this stream.
    ///
    /// When set, this takes precedence over `ack_deadline_secs`.
    #[configurable(
        deprecated = "This option has been deprecated, use `ack_deadline_secs` instead."
    )]
    pub ack_deadline_seconds: Option<u16>,

    /// The amount of time, in seconds, to wait between retry attempts after an error.
    #[serde(default = "default_retry_delay")]
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[configurable(metadata(docs::human_name = "Retry Delay"))]
    pub retry_delay_secs: Duration,

    /// The amount of time, in seconds, to wait between retry attempts after an error.
    ///
    /// When set, this takes precedence over `retry_delay_secs`.
    #[configurable(
        deprecated = "This option has been deprecated, use `retry_delay_secs` instead."
    )]
    pub retry_delay_seconds: Option<f64>,

    /// The amount of time, in seconds, with no received activity before sending
    /// a keepalive request.
    ///
    /// An idle stream may also be shut down after this interval when more than
    /// one stream is running.
    #[serde(default = "default_keepalive")]
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[configurable(metadata(docs::human_name = "Keepalive"))]
    pub keepalive_secs: Duration,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    #[derivative(Default(value = "default_framing_message_based()"))]
    pub framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    pub decoding: DeserializerConfig,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    pub acknowledgements: SourceAcknowledgementsConfig,
}
227
228fn default_endpoint() -> String {
229 PUBSUB_URL.to_string()
230}
231
/// Default `max_concurrency`: at most ten simultaneous streams.
const fn default_max_concurrency() -> usize {
    10
}

/// Default `full_response_size`: a response of 100+ messages marks a stream busy.
const fn default_full_response() -> usize {
    100
}

/// Default `ack_deadline_secs`: ten minutes, the upper bound of the valid range.
const fn default_ack_deadline() -> Duration {
    Duration::from_secs(10 * 60)
}

/// Default `retry_delay_secs`: one second.
const fn default_retry_delay() -> Duration {
    Duration::from_millis(1_000)
}

/// Default `keepalive_secs`: one minute.
const fn default_keepalive() -> Duration {
    Duration::from_millis(60_000)
}

/// Default `poll_time_seconds`: two seconds.
const fn default_poll_time() -> Duration {
    Duration::from_millis(2_000)
}
255
256#[async_trait::async_trait]
257#[typetag::serde(name = "gcp_pubsub")]
258impl SourceConfig for PubsubConfig {
259 async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
260 let log_namespace = cx.log_namespace(self.log_namespace);
261 let ack_deadline_secs = match self.ack_deadline_seconds {
262 None => self.ack_deadline_secs,
263 Some(ads) => {
264 warn!(
265 "The `ack_deadline_seconds` setting is deprecated, use `ack_deadline_secs` instead."
266 );
267 Duration::from_secs(ads as u64)
268 }
269 };
270 if !(MIN_ACK_DEADLINE_SECS..=MAX_ACK_DEADLINE_SECS).contains(&ack_deadline_secs.as_secs()) {
271 return Err(PubsubError::InvalidAckDeadline.into());
272 }
273
274 let retry_delay_secs = match self.retry_delay_seconds {
275 None => self.retry_delay_secs,
276 Some(rds) => {
277 warn!(
278 "The `retry_delay_seconds` setting is deprecated, use `retry_delay_secs` instead."
279 );
280 Duration::from_secs_f64(rds)
281 }
282 };
283
284 let auth = self.auth.build(Scope::PubSub).await?;
285
286 let mut uri: Uri = self.endpoint.parse().context(UriSnafu)?;
287 auth.apply_uri(&mut uri);
288
289 let tls = TlsSettings::from_options(self.tls.as_ref())?;
290 let host = uri.host().unwrap_or("pubsub.googleapis.com");
291 let mut tls_config = ClientTlsConfig::new().domain_name(host);
292 if let Some((cert, key)) = tls.identity_pem() {
293 tls_config = tls_config.identity(Identity::from_pem(cert, key));
294 }
295 for authority in tls.authorities_pem() {
296 tls_config = tls_config.ca_certificate(Certificate::from_pem(authority));
297 }
298
299 let mut endpoint: Endpoint = uri.to_string().parse().context(EndpointSnafu)?;
300 if uri.scheme() != Some(&Scheme::HTTP) {
301 endpoint = endpoint.tls_config(tls_config).context(EndpointTlsSnafu)?;
302 }
303
304 let token_generator = auth.spawn_regenerate_token();
305
306 let protocol = uri
307 .scheme()
308 .map(|scheme| Protocol(scheme.to_string().into()))
309 .unwrap_or(Protocol::HTTP);
310
311 let source = PubsubSource {
312 endpoint,
313 auth,
314 token_generator,
315 subscription: format!(
316 "projects/{}/subscriptions/{}",
317 self.project, self.subscription
318 ),
319 decoder: DecodingConfig::new(
320 self.framing.clone(),
321 self.decoding.clone(),
322 log_namespace,
323 )
324 .build()?,
325 acknowledgements: cx.do_acknowledgements(self.acknowledgements),
326 shutdown: cx.shutdown,
327 out: cx.out,
328 ack_deadline_secs,
329 retry_delay: retry_delay_secs,
330 keepalive: self.keepalive_secs,
331 concurrency: Default::default(),
332 full_response_size: self.full_response_size,
333 log_namespace,
334 bytes_received: register!(BytesReceived::from(protocol)),
335 events_received: register!(EventsReceived),
336 }
337 .run_all(self.max_concurrency, self.poll_time_seconds)
338 .map_err(|error| error!(message = "Source failed.", %error));
339 Ok(Box::pin(source))
340 }
341
342 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
343 let log_namespace = global_log_namespace.merge(self.log_namespace);
344 let schema_definition = self
345 .decoding
346 .schema_definition(log_namespace)
347 .with_standard_vector_source_metadata()
348 .with_source_metadata(
349 PubsubConfig::NAME,
350 Some(LegacyKey::Overwrite(owned_value_path!("timestamp"))),
351 &owned_value_path!("timestamp"),
352 Kind::timestamp().or_undefined(),
353 Some("timestamp"),
354 )
355 .with_source_metadata(
356 PubsubConfig::NAME,
357 Some(LegacyKey::Overwrite(owned_value_path!("attributes"))),
358 &owned_value_path!("attributes"),
359 Kind::object(Collection::empty().with_unknown(Kind::bytes())),
360 None,
361 )
362 .with_source_metadata(
363 PubsubConfig::NAME,
364 Some(LegacyKey::Overwrite(owned_value_path!("message_id"))),
365 &owned_value_path!("message_id"),
366 Kind::bytes(),
367 None,
368 );
369
370 vec![SourceOutput::new_maybe_logs(
371 DataType::Log,
372 schema_definition,
373 )]
374 }
375
376 fn can_acknowledge(&self) -> bool {
377 true
378 }
379}
380
// Example-config generation for this source (exercised by the `generate_config`
// unit test), based on `PubsubConfig`'s `Default` value as the macro name suggests.
impl_generate_config_from_default!(PubsubConfig);
382
/// Runtime state of the source. The supervisor (`run_all`) owns one copy and
/// each streaming-pull task it spawns gets a clone.
#[derive(Clone)]
struct PubsubSource {
    /// gRPC endpoint every stream connects to.
    endpoint: Endpoint,
    /// Supplies the `authorization` header, when a token is available, for each request.
    auth: GcpAuthenticator,
    /// Changes whenever a new auth token is generated; active streams restart on change.
    token_generator: watch::Receiver<()>,
    /// Full subscription path: `projects/{project}/subscriptions/{subscription}`.
    subscription: String,
    decoder: Decoder,
    /// Whether end-to-end acknowledgements are enabled for this source.
    acknowledgements: bool,
    /// Sent as `stream_ack_deadline_seconds` in each stream's initial request.
    ack_deadline_secs: Duration,
    shutdown: ShutdownSignal,
    out: SourceSender,
    /// Delay before reconnecting after a `State::RetryDelay` outcome.
    retry_delay: Duration,
    /// Idle interval after which a stream sends an empty (keepalive) request.
    keepalive: Duration,
    /// Number of running stream tasks. `run_all` stores it and streams read it
    /// to decide whether an idle stream may shut down. Shared by all clones via `Arc`.
    concurrency: Arc<AtomicUsize>,
    /// Responses carrying at least this many messages mark their stream busy.
    full_response_size: usize,
    log_namespace: LogNamespace,
    bytes_received: Registered<BytesReceived>,
    events_received: Registered<EventsReceived>,
}
405
/// Outcome of one `run_once` attempt: what the stream task does next.
enum State {
    /// Reconnect immediately (stream ended, server reset, or new auth token).
    RetryNow,
    /// Wait `retry_delay` before reconnecting (connect or stream errors).
    RetryDelay,
    /// Stop this stream task.
    Shutdown,
}
411
412impl PubsubSource {
413 async fn run_all(mut self, max_concurrency: usize, poll_time: Duration) -> crate::Result<()> {
414 let mut tasks = FuturesUnordered::new();
415
416 loop {
417 self.concurrency.store(tasks.len(), Ordering::Relaxed);
418 tokio::select! {
419 _ = &mut self.shutdown => break,
420 _ = tasks.next() => {
421 if tasks.is_empty() {
422 self.start_one(&tasks);
426 }
427
428 },
429 _ = tokio::time::sleep(poll_time) => {
430 if tasks.len() < max_concurrency
433 && tasks.iter().all(|task| task.busy_flag.load(Ordering::Relaxed))
434 {
435 self.start_one(&tasks);
436 }
437 }
438 }
439 }
440
441 while tasks.next().await.is_some() {}
443
444 Ok(())
445 }
446
447 fn start_one(&self, tasks: &FuturesUnordered<Task>) {
448 info!(message = "Starting stream.", concurrency = tasks.len() + 1);
449 let busy_flag = Arc::new(AtomicBool::new(false));
456 let task = tokio::spawn(self.clone().run(Arc::clone(&busy_flag)));
457 tasks.push(Task { task, busy_flag });
458 }
459
460 async fn run(mut self, busy_flag: Arc<AtomicBool>) {
461 loop {
462 match self.run_once(&busy_flag).await {
463 State::RetryNow => debug!("Retrying immediately."),
464 State::RetryDelay => {
465 info!(
466 timeout_secs = self.retry_delay.as_secs_f64(),
467 "Retrying after timeout."
468 );
469 tokio::time::sleep(self.retry_delay).await;
470 }
471 State::Shutdown => break,
472 }
473 }
474 }
475
476 async fn run_once(&mut self, busy_flag: &Arc<AtomicBool>) -> State {
477 let connection = match self.endpoint.connect().await {
478 Ok(connection) => connection,
479 Err(error) => {
480 emit!(GcpPubsubConnectError { error });
481 return State::RetryDelay;
482 }
483 };
484
485 let mut client = proto::subscriber_client::SubscriberClient::with_interceptor(
486 connection,
487 |mut req: Request<()>| {
488 if let Some(token) = self.auth.make_token() {
489 let authorization = MetadataValue::try_from(&token).map_err(|_| {
490 Status::new(
491 Code::FailedPrecondition,
492 "Invalid token text returned by GCP",
493 )
494 })?;
495 req.metadata_mut().insert("authorization", authorization);
496 }
497 Ok(req)
498 },
499 )
500 .max_decoding_message_size(usize::MAX);
502
503 let (ack_ids_sender, ack_ids_receiver) = mpsc::channel(ACK_QUEUE_SIZE);
504
505 let request_stream = self.request_stream(ack_ids_receiver);
508 debug!("Starting streaming pull.");
509 let stream = tokio::select! {
510 _ = &mut self.shutdown => return State::Shutdown,
511 result = client.streaming_pull(request_stream) => match result {
512 Ok(stream) => stream,
513 Err(error) => {
514 emit!(GcpPubsubStreamingPullError { error });
515 return State::RetryDelay;
516 }
517 }
518 };
519 let mut stream = stream.into_inner();
520
521 let (finalizer, mut ack_stream) =
522 Finalizer::maybe_new(self.acknowledgements, Some(self.shutdown.clone()));
523 let mut pending_acks = 0;
524
525 loop {
526 tokio::select! {
527 biased;
528 receipts = ack_stream.next() => if let Some((status, receipts)) = receipts {
529 pending_acks -= 1;
530 if status == BatchStatus::Delivered {
531 ack_ids_sender
532 .send(receipts)
533 .await
534 .unwrap_or_else(|_| unreachable!("request stream never closes"));
535 }
536 },
537 response = stream.next() => match response {
538 Some(Ok(response)) => {
539 self.handle_response(
540 response,
541 &finalizer,
542 &ack_ids_sender,
543 &mut pending_acks,
544 busy_flag,
545 ).await;
546 }
547 Some(Err(error)) => break translate_error(error),
548 None => break State::RetryNow,
549 },
550 _ = &mut self.shutdown, if pending_acks == 0 => return State::Shutdown,
551 _ = self.token_generator.changed() => {
552 debug!("New authentication token generated, restarting stream.");
553 break State::RetryNow;
554 },
555 _ = tokio::time::sleep(self.keepalive) => {
556 if pending_acks == 0 {
557 if self.concurrency.load(Ordering::Relaxed) > 1 {
561 info!("Shutting down inactive stream.");
562 break State::Shutdown;
563 }
564 busy_flag.store(false, Ordering::Relaxed);
566 }
567 ack_ids_sender
575 .send(Vec::new())
576 .await
577 .unwrap_or_else(|_| unreachable!("request stream never closes"));
578 }
579 }
580 }
581 }
582
583 fn request_stream(
584 &self,
585 ack_ids: mpsc::Receiver<Vec<String>>,
586 ) -> impl Stream<Item = proto::StreamingPullRequest> + 'static + use<> {
587 let subscription = self.subscription.clone();
588 let client_id = CLIENT_ID.clone();
589 let stream_ack_deadline_seconds = self.ack_deadline_secs.as_secs() as i32;
590 let ack_ids = ReceiverStream::new(ack_ids).ready_chunks(ACK_QUEUE_SIZE);
591
592 stream::once(async move {
593 proto::StreamingPullRequest {
596 subscription,
597 client_id,
598 stream_ack_deadline_seconds,
599 ..Default::default()
600 }
601 })
602 .chain(ack_ids.map(|chunks| {
603 proto::StreamingPullRequest {
609 ack_ids: chunks.into_iter().flatten().collect(),
610 ..Default::default()
611 }
612 }))
613 }
614
615 async fn handle_response(
616 &mut self,
617 response: proto::StreamingPullResponse,
618 finalizer: &Option<Finalizer>,
619 ack_ids: &mpsc::Sender<Vec<String>>,
620 pending_acks: &mut usize,
621 busy_flag: &Arc<AtomicBool>,
622 ) {
623 if response.received_messages.len() >= self.full_response_size {
624 busy_flag.store(true, Ordering::Relaxed);
625 }
626 self.bytes_received.emit(ByteSize(response.size_of()));
627
628 let (batch, notifier) = BatchNotifier::maybe_new_with_receiver(self.acknowledgements);
629 let (events, ids) = self.parse_messages(response.received_messages, batch).await;
630
631 let count = events.len();
632 match self.out.send_batch(events).await {
633 Err(_) => emit!(StreamClosedError { count }),
634 Ok(()) => match notifier {
635 None => ack_ids
636 .send(ids)
637 .await
638 .unwrap_or_else(|_| unreachable!("request stream never closes")),
639 Some(notifier) => {
640 finalizer
641 .as_ref()
642 .expect("Finalizer must have been set up for acknowledgements")
643 .add(ids, notifier);
644 *pending_acks += 1;
645 }
646 },
647 }
648 }
649
650 async fn parse_messages(
651 &self,
652 response: Vec<proto::ReceivedMessage>,
653 batch: Option<BatchNotifier>,
654 ) -> (Vec<Event>, Vec<String>) {
655 let mut ack_ids = Vec::with_capacity(response.len());
656 let events = response
657 .into_iter()
658 .flat_map(|received| {
659 ack_ids.push(received.ack_id);
660 received
661 .message
662 .map(|message| self.parse_message(message, &batch))
663 })
664 .flatten()
665 .collect();
666 (events, ack_ids)
667 }
668
669 fn parse_message<'a>(
670 &'a self,
671 message: proto::PubsubMessage,
672 batch: &'a Option<BatchNotifier>,
673 ) -> impl Iterator<Item = Event> + 'a {
674 let attributes = Value::Object(
675 message
676 .attributes
677 .into_iter()
678 .map(|(key, value)| (key.into(), Value::Bytes(value.into())))
679 .collect(),
680 );
681 let log_namespace = self.log_namespace;
682 util::decode_message(
683 self.decoder.clone(),
684 "gcp_pubsub",
685 &message.data,
686 message.publish_time.map(|dt| {
687 DateTime::from_timestamp(dt.seconds, dt.nanos as u32).expect("invalid timestamp")
688 }),
689 batch,
690 log_namespace,
691 &self.events_received,
692 )
693 .map(move |mut event| {
694 if let Some(log) = event.maybe_as_log_mut() {
695 log_namespace.insert_source_metadata(
696 PubsubConfig::NAME,
697 log,
698 Some(LegacyKey::Overwrite(path!("message_id"))),
699 path!("message_id"),
700 message.message_id.clone(),
701 );
702 log_namespace.insert_source_metadata(
703 PubsubConfig::NAME,
704 log,
705 Some(LegacyKey::Overwrite(path!("attributes"))),
706 path!("attributes"),
707 attributes.clone(),
708 )
709 }
710 event
711 })
712 }
713}
714
715fn translate_error(error: tonic::Status) -> State {
716 if is_reset(&error) {
723 debug!("Stream reset by server.");
724 State::RetryNow
725 } else {
726 emit!(GcpPubsubReceiveError { error });
727 State::RetryDelay
728 }
729}
730
731fn is_reset(error: &Status) -> bool {
732 error
733 .source()
734 .and_then(|source| source.downcast_ref::<hyper::Error>())
735 .and_then(|error| error.source())
736 .and_then(|source| source.downcast_ref::<h2::Error>())
737 .is_some_and(|error| error.is_remote() && error.is_reset())
738}
739
740#[pin_project::pin_project]
741struct Task {
742 task: tokio::task::JoinHandle<()>,
743 busy_flag: Arc<AtomicBool>,
744}
745
746impl Future for Task {
747 type Output = Result<(), tokio::task::JoinError>;
748
749 fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
750 self.task.poll_unpin(ctx)
751 }
752}
753
// Unit tests: example-config generation and the output schema in both log namespaces.
#[cfg(test)]
mod tests {
    use vector_lib::{lookup::OwnedTargetPath, schema::Definition};

    use super::*;

    // The generated example configuration must round-trip for this source.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PubsubConfig>();
    }

    // With the Vector namespace, the source's fields live under the
    // `gcp_pubsub` metadata prefix and the event root is the message.
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = PubsubConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("gcp_pubsub", "timestamp"),
                    Kind::timestamp().or_undefined(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!("gcp_pubsub", "attributes"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("gcp_pubsub", "message_id"),
                    Kind::bytes(),
                    None,
                );

        assert_eq!(definitions, Some(expected_definition));
    }

    // With the legacy namespace, the same fields are written as top-level event fields.
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = PubsubConfig::default();

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(
            &owned_value_path!("timestamp"),
            Kind::timestamp().or_undefined(),
            Some("timestamp"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(
            &owned_value_path!("attributes"),
            Kind::object(Collection::empty().with_unknown(Kind::bytes())),
            None,
        )
        .with_event_field(&owned_value_path!("message_id"), Kind::bytes(), None);

        assert_eq!(definitions, Some(expected_definition));
    }
}
843
// Integration tests against a Pub/Sub server reachable at `gcp::PUBSUB_ADDRESS`.
// Each `Tester` creates its own random topic and subscription through the REST API.
#[cfg(all(test, feature = "gcp-integration-tests"))]
mod integration_tests {
    use std::{
        collections::{BTreeMap, HashSet},
        sync::LazyLock,
    };

    use base64::prelude::{BASE64_STANDARD, Engine as _};
    use chrono::{DateTime, Utc};
    use futures::{Stream, StreamExt};
    use http::method::Method;
    use hyper::{Request, StatusCode};
    use serde_json::{Value, json};
    use tokio::time::{Duration, Instant};
    use vrl::btreemap;

    use super::*;
    use crate::{
        SourceSender,
        config::{ComponentKey, ProxyConfig},
        event::EventStatus,
        gcp,
        http::HttpClient,
        shutdown,
        test_util::{
            self, components,
            components::{SOURCE_TAGS, assert_source_compliance},
            random_string,
        },
    };

    const PROJECT: &str = "sourceproject";
    static PROJECT_URI: LazyLock<String> =
        LazyLock::new(|| format!("{}/v1/projects/{}", *gcp::PUBSUB_ADDRESS, PROJECT));
    // The smallest acknowledgement deadline the source accepts. It keeps the
    // tests that wait for redelivery (deadline + 1s) as short as possible.
    static ACK_DEADLINE: LazyLock<Duration> =
        LazyLock::new(|| Duration::from_secs(MIN_ACK_DEADLINE_SECS));

    #[ignore = "https://github.com/vectordotdev/vector/issues/24133"]
    #[tokio::test]
    async fn oneshot() {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;
            let test_data = tester.send_test_events(99, BTreeMap::new()).await;
            receive_events(&mut rx, test_data).await;
            tester.shutdown_check(shutdown).await;
        })
        .await;
    }

    #[ignore = "https://github.com/vectordotdev/vector/issues/24133"]
    #[tokio::test]
    async fn shuts_down_before_data_received() {
        let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;

        tester.shutdown(shutdown).await;

        assert!(rx.next().await.is_none());
        tester.send_test_events(1, BTreeMap::new()).await;
        assert!(rx.next().await.is_none());
        assert_eq!(tester.pull_count(1).await, 1);
    }

    #[ignore = "https://github.com/vectordotdev/vector/issues/24133"]
    #[tokio::test]
    async fn shuts_down_after_data_received() {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;

            let test_data = tester.send_test_events(1, BTreeMap::new()).await;
            receive_events(&mut rx, test_data).await;

            tester.shutdown_check(shutdown).await;

            assert!(rx.next().await.is_none());
            tester.send_test_events(1, BTreeMap::new()).await;
            assert!(rx.next().await.is_none());
        })
        .await;
    }

    #[ignore = "https://github.com/vectordotdev/vector/issues/24133"]
    #[tokio::test]
    async fn streams_data() {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;
            for _ in 0..10 {
                let test_data = tester.send_test_events(9, BTreeMap::new()).await;
                receive_events(&mut rx, test_data).await;
            }
            tester.shutdown_check(shutdown).await;
        })
        .await;
    }

    #[ignore = "https://github.com/vectordotdev/vector/issues/24133"]
    #[tokio::test]
    async fn sends_attributes() {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;
            let attributes = btreemap![
                random_string(8) => random_string(88),
                random_string(8) => random_string(88),
                random_string(8) => random_string(88),
            ];
            let test_data = tester.send_test_events(1, attributes).await;
            receive_events(&mut rx, test_data).await;
            tester.shutdown_check(shutdown).await;
        })
        .await;
    }

    // Delivered events are acknowledged, so nothing is redelivered after the deadline.
    #[ignore = "https://github.com/vectordotdev/vector/issues/24133"]
    #[tokio::test]
    async fn acks_received() {
        assert_source_compliance(&SOURCE_TAGS, async move {
            let (tester, mut rx, shutdown) = setup(EventStatus::Delivered).await;

            let test_data = tester.send_test_events(1, BTreeMap::new()).await;
            receive_events(&mut rx, test_data).await;

            tester.shutdown_check(shutdown).await;

            assert_eq!(tester.pull_count(10).await, 0);

            tokio::time::sleep(*ACK_DEADLINE + Duration::from_secs(1)).await;

            assert_eq!(tester.pull_count(10).await, 0);
        })
        .await;
    }

    // Rejected events are not acknowledged, so the message reappears after the deadline.
    #[tokio::test]
    #[ignore]
    async fn does_not_ack_rejected() {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tester, mut rx, shutdown) = setup(EventStatus::Rejected).await;

            let test_data = tester.send_test_events(1, BTreeMap::new()).await;
            receive_events(&mut rx, test_data).await;

            tester.shutdown(shutdown).await;

            assert_eq!(tester.pull_count(10).await, 0);

            tokio::time::sleep(*ACK_DEADLINE + Duration::from_secs(1)).await;

            assert_eq!(tester.pull_count(10).await, 1);
        })
        .await;
    }

    /// Creates a fresh topic/subscription and spawns a source finalizing events with `status`.
    async fn setup(
        status: EventStatus,
    ) -> (
        Tester,
        impl Stream<Item = Event> + Unpin,
        shutdown::SourceShutdownCoordinator,
    ) {
        components::init_test();

        let tls_settings = TlsSettings::from_options(None).unwrap();
        let client = HttpClient::new(tls_settings, &ProxyConfig::default()).unwrap();
        let tester = Tester::new(client).await;

        let (rx, shutdown) = tester.spawn_source(status).await;

        (tester, rx, shutdown)
    }

    /// Current time truncated to whole seconds, used as a lower bound for event timestamps.
    fn now_trunc() -> DateTime<Utc> {
        let start = Utc::now().timestamp();
        DateTime::from_timestamp(start, 0).expect("invalid timestamp")
    }

    struct Tester {
        client: HttpClient,
        topic: String,
        subscription: String,
        component: ComponentKey,
    }

    struct TestData {
        lines: Vec<String>,
        start: DateTime<Utc>,
        attributes: BTreeMap<String, String>,
    }

    impl Tester {
        /// Creates a random topic and a subscription on it with `ACK_DEADLINE`.
        async fn new(client: HttpClient) -> Self {
            let this = Self {
                client,
                topic: format!("topic-{}", random_string(10).to_lowercase()),
                subscription: format!("sub-{}", random_string(10).to_lowercase()),
                component: ComponentKey::from("gcp_pubsub"),
            };

            this.request(Method::PUT, "topics/{topic}", json!({})).await;

            let body = json!({
                "topic": format!("projects/{}/topics/{}", PROJECT, this.topic),
                // The REST API takes an integer number of seconds; serializing the
                // `Duration` itself would produce a `{"secs": .., "nanos": ..}` object.
                "ackDeadlineSeconds": ACK_DEADLINE.as_secs(),
            });
            this.request(Method::PUT, "subscriptions/{sub}", body).await;

            this
        }

        async fn spawn_source(
            &self,
            status: EventStatus,
        ) -> (
            impl Stream<Item = Event> + Unpin + use<>,
            shutdown::SourceShutdownCoordinator,
        ) {
            let (tx, rx) = SourceSender::new_test_finalize(status);
            let config = PubsubConfig {
                project: PROJECT.into(),
                subscription: self.subscription.clone(),
                endpoint: gcp::PUBSUB_ADDRESS.clone(),
                auth: GcpAuthConfig {
                    skip_authentication: true,
                    ..Default::default()
                },
                ack_deadline_secs: *ACK_DEADLINE,
                ..Default::default()
            };
            let (mut ctx, shutdown) = SourceContext::new_shutdown(&self.component, tx);
            ctx.acknowledgements = true;
            let source = config.build(ctx).await.expect("Failed to build source");
            tokio::spawn(async move { source.await.expect("Failed to run source") });

            (rx, shutdown)
        }

        async fn send_test_events(
            &self,
            count: usize,
            attributes: BTreeMap<String, String>,
        ) -> TestData {
            let start = now_trunc();
            let lines: Vec<_> = test_util::random_lines(44).take(count).collect();
            let messages: Vec<_> = lines
                .iter()
                .map(|input| BASE64_STANDARD.encode(input))
                .map(|data| json!({ "data": data, "attributes": attributes.clone() }))
                .collect();
            let body = json!({ "messages": messages });
            self.request(Method::POST, "topics/{topic}:publish", body)
                .await;

            TestData {
                lines,
                start,
                attributes,
            }
        }

        /// Pulls (without waiting) up to `count` messages and returns how many came back.
        async fn pull_count(&self, count: usize) -> usize {
            let response = self
                .request(
                    Method::POST,
                    "subscriptions/{sub}:pull",
                    json!({ "maxMessages": count, "returnImmediately": true }),
                )
                .await;
            response
                .get("receivedMessages")
                .map(|rm| rm.as_array().unwrap().len())
                .unwrap_or(0)
        }

        /// Sends a REST request under the project URI and returns the JSON response body.
        async fn request(&self, method: Method, base: &str, body: Value) -> Value {
            let path = base
                .replace("{topic}", &self.topic)
                .replace("{sub}", &self.subscription);
            let uri = [PROJECT_URI.as_str(), &path].join("/");
            let body = crate::serde::json::to_bytes(&body).unwrap().freeze();
            let request = Request::builder()
                .method(method)
                .uri(uri)
                .body(body.into())
                .unwrap();
            let response = self.client.send(request).await.unwrap();
            assert_eq!(response.status(), StatusCode::OK);
            let body = http_body::Body::collect(response.into_body())
                .await
                .unwrap()
                .to_bytes();
            serde_json::from_str(core::str::from_utf8(&body).unwrap()).unwrap()
        }

        async fn shutdown_check(&self, shutdown: shutdown::SourceShutdownCoordinator) {
            self.shutdown(shutdown).await;
            components::SOURCE_TESTS.assert(&components::HTTP_PULL_SOURCE_TAGS);
        }

        async fn shutdown(&self, mut shutdown: shutdown::SourceShutdownCoordinator) {
            let deadline = Instant::now() + Duration::from_secs(1);
            let shutdown = shutdown.shutdown_source(&self.component, deadline);
            assert!(shutdown.await);
        }
    }

    /// Collects one event per sent line and checks content, metadata and attributes.
    async fn receive_events(rx: &mut (impl Stream<Item = Event> + Unpin), test_data: TestData) {
        let TestData {
            start,
            lines,
            attributes,
        } = test_data;

        let events: Vec<Event> = tokio::time::timeout(
            Duration::from_secs(1),
            test_util::collect_n_stream(rx, lines.len()),
        )
        .await
        .unwrap();

        let end = Utc::now();
        let mut message_ids = HashSet::new();

        assert_eq!(events.len(), lines.len());
        for (message, event) in lines.into_iter().zip(events) {
            let log = event.into_log();
            assert_eq!(log.get("message"), Some(&message.into()));
            assert_eq!(log.get("source_type"), Some(&"gcp_pubsub".into()));
            assert!(log.get("timestamp").unwrap().as_timestamp().unwrap() >= &start);
            assert!(log.get("timestamp").unwrap().as_timestamp().unwrap() <= &end);
            assert!(
                message_ids.insert(log.get("message_id").unwrap().clone().to_string()),
                "Message contained duplicate message_id"
            );
            let logattr = log
                .get("attributes")
                .expect("missing attributes")
                .as_object()
                .unwrap()
                .clone();
            assert_eq!(logattr.len(), attributes.len());
            for (a, b) in logattr.into_iter().zip(&attributes) {
                assert_eq!(&a.0, b.0.as_str());
                assert_eq!(a.1, b.1.clone().into());
            }
        }
    }
}