vector/sources/
amqp.rs

1//! `AMQP` source.
2//! Handles version AMQP 0.9.1 which is used by RabbitMQ.
3use std::{io::Cursor, pin::Pin};
4
5use async_stream::stream;
6use bytes::Bytes;
7use chrono::{TimeZone, Utc};
8use futures::{FutureExt, StreamExt};
9use futures_util::Stream;
10use lapin::{Channel, acker::Acker, message::Delivery};
11use snafu::Snafu;
12use tokio_util::codec::FramedRead;
13use vector_lib::{
14    EstimatedJsonEncodedSizeOf,
15    codecs::decoding::{DeserializerConfig, FramingConfig},
16    config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig, log_schema},
17    configurable::configurable_component,
18    event::{Event, LogEvent},
19    finalizer::UnorderedFinalizer,
20    internal_event::{CountByteSize, EventsReceived, InternalEventHandle as _},
21    lookup::{lookup_v2::OptionalValuePath, metadata_path, owned_value_path, path},
22};
23use vrl::value::Kind;
24
25use crate::{
26    SourceSender,
27    amqp::AmqpConfig,
28    codecs::{Decoder, DecodingConfig},
29    config::{SourceConfig, SourceContext, SourceOutput},
30    event::{BatchNotifier, BatchStatus},
31    internal_events::{
32        StreamClosedError,
33        source::{AmqpAckError, AmqpBytesReceived, AmqpEventError, AmqpRejectError},
34    },
35    serde::{bool_or_struct, default_decoding, default_framing_message_based},
36    shutdown::ShutdownSignal,
37};
38
39#[derive(Debug, Snafu)]
40enum BuildError {
41    #[snafu(display("Could not create AMQP consumer: {}", source))]
42    AmqpCreateError {
43        source: Box<dyn std::error::Error + Send + Sync>,
44    },
45}
46
47/// Configuration for the `amqp` source.
48///
49/// Supports AMQP version 0.9.1
50#[configurable_component(source(
51    "amqp",
52    "Collect events from AMQP 0.9.1 compatible brokers like RabbitMQ."
53))]
54#[derive(Clone, Debug, Derivative)]
55#[derivative(Default)]
56#[serde(deny_unknown_fields)]
57pub struct AmqpSourceConfig {
58    /// The name of the queue to consume.
59    #[serde(default = "default_queue")]
60    pub(crate) queue: String,
61
62    /// The identifier for the consumer.
63    #[serde(default = "default_consumer")]
64    #[configurable(metadata(docs::examples = "consumer-group-name"))]
65    pub(crate) consumer: String,
66
67    #[serde(flatten)]
68    pub(crate) connection: AmqpConfig,
69
70    /// The `AMQP` routing key.
71    #[serde(default = "default_routing_key_field")]
72    #[derivative(Default(value = "default_routing_key_field()"))]
73    pub(crate) routing_key_field: OptionalValuePath,
74
75    /// The `AMQP` exchange key.
76    #[serde(default = "default_exchange_key")]
77    #[derivative(Default(value = "default_exchange_key()"))]
78    pub(crate) exchange_key: OptionalValuePath,
79
80    /// The `AMQP` offset key.
81    #[serde(default = "default_offset_key")]
82    #[derivative(Default(value = "default_offset_key()"))]
83    pub(crate) offset_key: OptionalValuePath,
84
85    /// The namespace to use for logs. This overrides the global setting.
86    #[configurable(metadata(docs::hidden))]
87    #[serde(default)]
88    pub log_namespace: Option<bool>,
89
90    #[configurable(derived)]
91    #[serde(default = "default_framing_message_based")]
92    #[derivative(Default(value = "default_framing_message_based()"))]
93    pub(crate) framing: FramingConfig,
94
95    #[configurable(derived)]
96    #[serde(default = "default_decoding")]
97    #[derivative(Default(value = "default_decoding()"))]
98    pub(crate) decoding: DeserializerConfig,
99
100    #[configurable(derived)]
101    #[serde(default, deserialize_with = "bool_or_struct")]
102    pub(crate) acknowledgements: SourceAcknowledgementsConfig,
103}
104
105fn default_queue() -> String {
106    "vector".into()
107}
108
109fn default_consumer() -> String {
110    "vector".into()
111}
112
113fn default_routing_key_field() -> OptionalValuePath {
114    OptionalValuePath::from(owned_value_path!("routing"))
115}
116
117fn default_exchange_key() -> OptionalValuePath {
118    OptionalValuePath::from(owned_value_path!("exchange"))
119}
120
121fn default_offset_key() -> OptionalValuePath {
122    OptionalValuePath::from(owned_value_path!("offset"))
123}
124
125impl_generate_config_from_default!(AmqpSourceConfig);
126
127impl AmqpSourceConfig {
128    fn decoder(&self, log_namespace: LogNamespace) -> vector_lib::Result<Decoder> {
129        DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build()
130    }
131}
132
133#[async_trait::async_trait]
134#[typetag::serde(name = "amqp")]
135impl SourceConfig for AmqpSourceConfig {
136    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
137        let log_namespace = cx.log_namespace(self.log_namespace);
138        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
139
140        amqp_source(self, cx.shutdown, cx.out, log_namespace, acknowledgements).await
141    }
142
143    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
144        let log_namespace = global_log_namespace.merge(self.log_namespace);
145        let schema_definition = self
146            .decoding
147            .schema_definition(log_namespace)
148            .with_standard_vector_source_metadata()
149            .with_source_metadata(
150                AmqpSourceConfig::NAME,
151                None,
152                &owned_value_path!("timestamp"),
153                Kind::timestamp(),
154                Some("timestamp"),
155            )
156            .with_source_metadata(
157                AmqpSourceConfig::NAME,
158                self.routing_key_field
159                    .path
160                    .clone()
161                    .map(LegacyKey::InsertIfEmpty),
162                &owned_value_path!("routing"),
163                Kind::bytes(),
164                None,
165            )
166            .with_source_metadata(
167                AmqpSourceConfig::NAME,
168                self.exchange_key.path.clone().map(LegacyKey::InsertIfEmpty),
169                &owned_value_path!("exchange"),
170                Kind::bytes(),
171                None,
172            )
173            .with_source_metadata(
174                AmqpSourceConfig::NAME,
175                self.offset_key.path.clone().map(LegacyKey::InsertIfEmpty),
176                &owned_value_path!("offset"),
177                Kind::integer(),
178                None,
179            );
180
181        vec![SourceOutput::new_maybe_logs(
182            self.decoding.output_type(),
183            schema_definition,
184        )]
185    }
186
187    fn can_acknowledge(&self) -> bool {
188        true
189    }
190}
191
192#[derive(Debug)]
193struct FinalizerEntry {
194    acker: Acker,
195}
196
197impl From<Delivery> for FinalizerEntry {
198    fn from(delivery: Delivery) -> Self {
199        Self {
200            acker: delivery.acker,
201        }
202    }
203}
204
205pub(crate) async fn amqp_source(
206    config: &AmqpSourceConfig,
207    shutdown: ShutdownSignal,
208    out: SourceSender,
209    log_namespace: LogNamespace,
210    acknowledgements: bool,
211) -> crate::Result<super::Source> {
212    let config = config.clone();
213    let (_conn, channel) = config
214        .connection
215        .connect()
216        .await
217        .map_err(|source| BuildError::AmqpCreateError { source })?;
218
219    Ok(Box::pin(run_amqp_source(
220        config,
221        shutdown,
222        out,
223        channel,
224        log_namespace,
225        acknowledgements,
226    )))
227}
228
229struct Keys<'a> {
230    routing_key_field: &'a OptionalValuePath,
231    routing: &'a str,
232    exchange_key: &'a OptionalValuePath,
233    exchange: &'a str,
234    offset_key: &'a OptionalValuePath,
235    delivery_tag: i64,
236}
237
238/// Populates the decoded event with extra metadata.
239fn populate_log_event(
240    log: &mut LogEvent,
241    timestamp: Option<chrono::DateTime<Utc>>,
242    keys: &Keys<'_>,
243    log_namespace: LogNamespace,
244) {
245    log_namespace.insert_source_metadata(
246        AmqpSourceConfig::NAME,
247        log,
248        keys.routing_key_field
249            .path
250            .as_ref()
251            .map(LegacyKey::InsertIfEmpty),
252        path!("routing"),
253        keys.routing.to_string(),
254    );
255
256    log_namespace.insert_source_metadata(
257        AmqpSourceConfig::NAME,
258        log,
259        keys.exchange_key
260            .path
261            .as_ref()
262            .map(LegacyKey::InsertIfEmpty),
263        path!("exchange"),
264        keys.exchange.to_string(),
265    );
266
267    log_namespace.insert_source_metadata(
268        AmqpSourceConfig::NAME,
269        log,
270        keys.offset_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
271        path!("offset"),
272        keys.delivery_tag,
273    );
274
275    log_namespace.insert_vector_metadata(
276        log,
277        log_schema().source_type_key(),
278        path!("source_type"),
279        Bytes::from_static(AmqpSourceConfig::NAME.as_bytes()),
280    );
281
282    // This handles the transition from the original timestamp logic. Originally the
283    // `timestamp_key` was populated by the `properties.timestamp()` time on the message, falling
284    // back to calling `now()`.
285    match log_namespace {
286        LogNamespace::Vector => {
287            if let Some(timestamp) = timestamp {
288                log.insert(
289                    metadata_path!(AmqpSourceConfig::NAME, "timestamp"),
290                    timestamp,
291                );
292            };
293
294            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
295        }
296        LogNamespace::Legacy => {
297            if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
298                log.try_insert(timestamp_key, timestamp.unwrap_or_else(Utc::now));
299            }
300        }
301    };
302}
303
304/// Receives an event from `AMQP` and pushes it along the pipeline.
305async fn receive_event(
306    config: &AmqpSourceConfig,
307    out: &mut SourceSender,
308    log_namespace: LogNamespace,
309    finalizer: Option<&UnorderedFinalizer<FinalizerEntry>>,
310    msg: Delivery,
311) -> Result<(), ()> {
312    let payload = Cursor::new(Bytes::copy_from_slice(&msg.data));
313    let decoder = config.decoder(log_namespace).map_err(|_e| ())?;
314    let mut stream = FramedRead::new(payload, decoder);
315
316    // Extract timestamp from AMQP message
317    let timestamp = msg
318        .properties
319        .timestamp()
320        .and_then(|millis| Utc.timestamp_millis_opt(millis as _).latest());
321
322    let routing = msg.routing_key.to_string();
323    let exchange = msg.exchange.to_string();
324    let keys = Keys {
325        routing_key_field: &config.routing_key_field,
326        exchange_key: &config.exchange_key,
327        offset_key: &config.offset_key,
328        routing: &routing,
329        exchange: &exchange,
330        delivery_tag: msg.delivery_tag as i64,
331    };
332    let events_received = register!(EventsReceived);
333
334    let stream = stream! {
335        while let Some(result) = stream.next().await {
336            match result {
337                Ok((events, byte_size)) => {
338                    emit!(AmqpBytesReceived {
339                        byte_size,
340                        protocol: "amqp_0_9_1",
341                    });
342
343                    events_received.emit(CountByteSize(
344                        events.len(),
345                        events.estimated_json_encoded_size_of(),
346                    ));
347
348                    for mut event in events {
349                        if let Event::Log(ref mut log) = event {
350                            populate_log_event(log,
351                                        timestamp,
352                                        &keys,
353                                        log_namespace);
354                        }
355
356                        yield event;
357                    }
358                }
359                Err(error) => {
360                    use vector_lib::codecs::StreamDecodingError as _;
361
362                    // Error is logged by `codecs::Decoder`, no further handling
363                    // is needed here.
364                    if !error.can_continue() {
365                        break;
366                    }
367                }
368            }
369        }
370    }
371    .boxed();
372
373    finalize_event_stream(finalizer, out, stream, msg).await;
374
375    Ok(())
376}
377
378/// Send the event stream created by the framed read to the `out` stream.
379async fn finalize_event_stream(
380    finalizer: Option<&UnorderedFinalizer<FinalizerEntry>>,
381    out: &mut SourceSender,
382    mut stream: Pin<Box<dyn Stream<Item = Event> + Send + '_>>,
383    msg: Delivery,
384) {
385    match finalizer {
386        Some(finalizer) => {
387            let (batch, receiver) = BatchNotifier::new_with_receiver();
388            let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
389
390            match out.send_event_stream(&mut stream).await {
391                Err(_) => {
392                    emit!(StreamClosedError { count: 1 });
393                }
394                Ok(_) => {
395                    finalizer.add(msg.into(), receiver);
396                }
397            }
398        }
399        None => match out.send_event_stream(&mut stream).await {
400            Err(_) => {
401                emit!(StreamClosedError { count: 1 });
402            }
403            Ok(_) => {
404                let ack_options = lapin::options::BasicAckOptions::default();
405                if let Err(error) = msg.acker.ack(ack_options).await {
406                    emit!(AmqpAckError { error });
407                }
408            }
409        },
410    }
411}
412
413/// Runs the `AMQP` source involving the main loop pulling data from the server.
414async fn run_amqp_source(
415    config: AmqpSourceConfig,
416    shutdown: ShutdownSignal,
417    mut out: SourceSender,
418    channel: Channel,
419    log_namespace: LogNamespace,
420    acknowledgements: bool,
421) -> Result<(), ()> {
422    let (finalizer, mut ack_stream) =
423        UnorderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));
424
425    debug!("Starting amqp source, listening to queue {}.", config.queue);
426    let mut consumer = channel
427        .basic_consume(
428            &config.queue,
429            &config.consumer,
430            lapin::options::BasicConsumeOptions::default(),
431            lapin::types::FieldTable::default(),
432        )
433        .await
434        .map_err(|error| {
435            error!(message = "Failed to consume.", error = ?error, internal_log_rate_limit = true);
436        })?
437        .fuse();
438    let mut shutdown = shutdown.fuse();
439    loop {
440        tokio::select! {
441            _ = &mut shutdown => break,
442            entry = ack_stream.next() => {
443                if let Some((status, entry)) = entry {
444                    handle_ack(status, entry).await;
445                }
446            },
447            opt_m = consumer.next() => {
448                if let Some(try_m) = opt_m {
449                    match try_m {
450                        Err(error) => {
451                            emit!(AmqpEventError { error });
452                            return Err(());
453                        }
454                        Ok(msg) => {
455                            receive_event(&config, &mut out, log_namespace, finalizer.as_ref(), msg).await?
456                        }
457                    }
458                } else {
459                    break
460                }
461            }
462        };
463    }
464
465    Ok(())
466}
467
468async fn handle_ack(status: BatchStatus, entry: FinalizerEntry) {
469    match status {
470        BatchStatus::Delivered => {
471            let ack_options = lapin::options::BasicAckOptions::default();
472            if let Err(error) = entry.acker.ack(ack_options).await {
473                emit!(AmqpAckError { error });
474            }
475        }
476        BatchStatus::Errored => {
477            let ack_options = lapin::options::BasicRejectOptions::default();
478            if let Err(error) = entry.acker.reject(ack_options).await {
479                emit!(AmqpRejectError { error });
480            }
481        }
482        BatchStatus::Rejected => {
483            let ack_options = lapin::options::BasicRejectOptions::default();
484            if let Err(error) = entry.acker.reject(ack_options).await {
485                emit!(AmqpRejectError { error });
486            }
487        }
488    }
489}
490
491#[cfg(test)]
492pub mod test {
493    use vector_lib::{lookup::OwnedTargetPath, schema::Definition, tls::TlsConfig};
494    use vrl::value::kind::Collection;
495
496    use super::*;
497
498    #[test]
499    fn generate_config() {
500        crate::test_util::test_generate_config::<AmqpSourceConfig>();
501    }
502
503    pub fn make_config() -> AmqpSourceConfig {
504        let mut config = AmqpSourceConfig {
505            queue: "it".to_string(),
506            ..Default::default()
507        };
508        let user = std::env::var("AMQP_USER").unwrap_or_else(|_| "guest".to_string());
509        let pass = std::env::var("AMQP_PASSWORD").unwrap_or_else(|_| "guest".to_string());
510        let host = std::env::var("AMQP_HOST").unwrap_or_else(|_| "rabbitmq".to_string());
511        let vhost = std::env::var("AMQP_VHOST").unwrap_or_else(|_| "%2f".to_string());
512        config.connection.connection_string = format!("amqp://{user}:{pass}@{host}:5672/{vhost}");
513
514        config
515    }
516
517    pub fn make_tls_config() -> AmqpSourceConfig {
518        let mut config = AmqpSourceConfig {
519            queue: "it".to_string(),
520            ..Default::default()
521        };
522        let user = std::env::var("AMQP_USER").unwrap_or_else(|_| "guest".to_string());
523        let pass = std::env::var("AMQP_PASSWORD").unwrap_or_else(|_| "guest".to_string());
524        let vhost = std::env::var("AMQP_VHOST").unwrap_or_else(|_| "%2f".to_string());
525        let host = std::env::var("AMQP_HOST").unwrap_or_else(|_| "rabbitmq".to_string());
526        let ca_file =
527            std::env::var("AMQP_CA_FILE").unwrap_or_else(|_| "/certs/ca.cert.pem".to_string());
528        config.connection.connection_string = format!("amqps://{user}:{pass}@{host}/{vhost}");
529        let tls = TlsConfig {
530            ca_file: Some(ca_file.as_str().into()),
531            ..Default::default()
532        };
533        config.connection.tls = Some(tls);
534        config
535    }
536
537    #[test]
538    fn output_schema_definition_vector_namespace() {
539        let config = AmqpSourceConfig {
540            log_namespace: Some(true),
541            ..Default::default()
542        };
543
544        let definition = config
545            .outputs(LogNamespace::Vector)
546            .remove(0)
547            .schema_definition(true);
548
549        let expected_definition =
550            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
551                .with_meaning(OwnedTargetPath::event_root(), "message")
552                .with_metadata_field(
553                    &owned_value_path!("vector", "source_type"),
554                    Kind::bytes(),
555                    None,
556                )
557                .with_metadata_field(
558                    &owned_value_path!("vector", "ingest_timestamp"),
559                    Kind::timestamp(),
560                    None,
561                )
562                .with_metadata_field(
563                    &owned_value_path!("amqp", "timestamp"),
564                    Kind::timestamp(),
565                    Some("timestamp"),
566                )
567                .with_metadata_field(&owned_value_path!("amqp", "routing"), Kind::bytes(), None)
568                .with_metadata_field(&owned_value_path!("amqp", "exchange"), Kind::bytes(), None)
569                .with_metadata_field(&owned_value_path!("amqp", "offset"), Kind::integer(), None);
570
571        assert_eq!(definition, Some(expected_definition));
572    }
573
574    #[test]
575    fn output_schema_definition_legacy_namespace() {
576        let config = AmqpSourceConfig::default();
577
578        let definition = config
579            .outputs(LogNamespace::Legacy)
580            .remove(0)
581            .schema_definition(true);
582
583        let expected_definition = Definition::new_with_default_metadata(
584            Kind::object(Collection::empty()),
585            [LogNamespace::Legacy],
586        )
587        .with_event_field(
588            &owned_value_path!("message"),
589            Kind::bytes(),
590            Some("message"),
591        )
592        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
593        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
594        .with_event_field(&owned_value_path!("routing"), Kind::bytes(), None)
595        .with_event_field(&owned_value_path!("exchange"), Kind::bytes(), None)
596        .with_event_field(&owned_value_path!("offset"), Kind::integer(), None);
597
598        assert_eq!(definition, Some(expected_definition));
599    }
600}
601
602/// Integration tests use the docker compose files in `scripts/integration/docker-compose.amqp.yml`.
603#[cfg(feature = "amqp-integration-tests")]
604#[cfg(test)]
605mod integration_test {
606    use chrono::Utc;
607    use lapin::{BasicProperties, options::*};
608    use tokio::time::Duration;
609    use vector_lib::config::log_schema;
610
611    use super::{test::*, *};
612    use crate::{
613        SourceSender,
614        amqp::await_connection,
615        shutdown::ShutdownSignal,
616        test_util::{
617            components::{SOURCE_TAGS, run_and_assert_source_compliance},
618            random_string,
619        },
620    };
621
622    #[tokio::test]
623    async fn amqp_source_create_ok() {
624        let config = make_config();
625        await_connection(&config.connection).await;
626        assert!(
627            amqp_source(
628                &config,
629                ShutdownSignal::noop(),
630                SourceSender::new_test().0,
631                LogNamespace::Legacy,
632                false,
633            )
634            .await
635            .is_ok()
636        );
637    }
638
639    #[tokio::test]
640    async fn amqp_tls_source_create_ok() {
641        let config = make_tls_config();
642        await_connection(&config.connection).await;
643
644        assert!(
645            amqp_source(
646                &config,
647                ShutdownSignal::noop(),
648                SourceSender::new_test().0,
649                LogNamespace::Legacy,
650                false,
651            )
652            .await
653            .is_ok()
654        );
655    }
656
657    async fn send_event(
658        channel: &lapin::Channel,
659        exchange: &str,
660        routing_key: &str,
661        text: &str,
662        _timestamp: i64,
663    ) {
664        let payload = text.as_bytes();
665        let payload_len = payload.len();
666        trace!("Sending message of length {} to {}.", payload_len, exchange,);
667
668        channel
669            .basic_publish(
670                exchange,
671                routing_key,
672                BasicPublishOptions::default(),
673                payload.as_ref(),
674                BasicProperties::default(),
675            )
676            .await
677            .unwrap()
678            .await
679            .unwrap();
680    }
681
682    async fn source_consume_event(mut config: AmqpSourceConfig) {
683        let exchange = format!("test-{}-exchange", random_string(10));
684        let queue = format!("test-{}-queue", random_string(10));
685        let routing_key = "my_key";
686        trace!("Test exchange name: {}.", exchange);
687        let consumer = format!("test-consumer-{}", random_string(10));
688
689        config.consumer = consumer;
690        config.queue = queue;
691
692        let (_conn, channel) = config.connection.connect().await.unwrap();
693        let exchange_opts = lapin::options::ExchangeDeclareOptions {
694            auto_delete: true,
695            ..Default::default()
696        };
697
698        channel
699            .exchange_declare(
700                &exchange,
701                lapin::ExchangeKind::Fanout,
702                exchange_opts,
703                lapin::types::FieldTable::default(),
704            )
705            .await
706            .unwrap();
707
708        let queue_opts = QueueDeclareOptions {
709            auto_delete: true,
710            ..Default::default()
711        };
712        channel
713            .queue_declare(
714                &config.queue,
715                queue_opts,
716                lapin::types::FieldTable::default(),
717            )
718            .await
719            .unwrap();
720
721        channel
722            .queue_bind(
723                &config.queue,
724                &exchange,
725                "",
726                lapin::options::QueueBindOptions::default(),
727                lapin::types::FieldTable::default(),
728            )
729            .await
730            .unwrap();
731
732        trace!("Sending event...");
733        let now = Utc::now();
734        send_event(
735            &channel,
736            &exchange,
737            routing_key,
738            "my message",
739            now.timestamp_millis(),
740        )
741        .await;
742
743        trace!("Receiving event...");
744        let events =
745            run_and_assert_source_compliance(config, Duration::from_secs(1), &SOURCE_TAGS).await;
746        assert!(!events.is_empty());
747
748        let log = events[0].as_log();
749        trace!("{:?}", log);
750        assert_eq!(*log.get_message().unwrap(), "my message".into());
751        assert_eq!(log["routing"], routing_key.into());
752        assert_eq!(*log.get_source_type().unwrap(), "amqp".into());
753        let log_ts = log[log_schema().timestamp_key().unwrap().to_string()]
754            .as_timestamp()
755            .unwrap();
756        assert!(log_ts.signed_duration_since(now) < chrono::Duration::seconds(1));
757        assert_eq!(log["exchange"], exchange.into());
758    }
759
760    #[tokio::test]
761    async fn amqp_source_consume_event() {
762        let config = make_config();
763        await_connection(&config.connection).await;
764        source_consume_event(config).await;
765    }
766
767    #[tokio::test]
768    async fn amqp_tls_source_consume_event() {
769        let config = make_tls_config();
770        await_connection(&config.connection).await;
771        source_consume_event(config).await;
772    }
773}