vector/sources/
amqp.rs

1//! `AMQP` source.
//! Handles AMQP version 0.9.1, the protocol version used by RabbitMQ.
3use crate::{
4    amqp::AmqpConfig,
5    codecs::{Decoder, DecodingConfig},
6    config::{SourceConfig, SourceContext, SourceOutput},
7    event::{BatchNotifier, BatchStatus},
8    internal_events::{
9        source::{AmqpAckError, AmqpBytesReceived, AmqpEventError, AmqpRejectError},
10        StreamClosedError,
11    },
12    serde::{bool_or_struct, default_decoding, default_framing_message_based},
13    shutdown::ShutdownSignal,
14    SourceSender,
15};
16use async_stream::stream;
17use bytes::Bytes;
18use chrono::{TimeZone, Utc};
19use futures::{FutureExt, StreamExt};
20use futures_util::Stream;
21use lapin::{acker::Acker, message::Delivery, Channel};
22use snafu::Snafu;
23use std::{io::Cursor, pin::Pin};
24use tokio_util::codec::FramedRead;
25use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
26use vector_lib::configurable::configurable_component;
27use vector_lib::lookup::{lookup_v2::OptionalValuePath, metadata_path, owned_value_path, path};
28use vector_lib::{
29    config::{log_schema, LegacyKey, LogNamespace, SourceAcknowledgementsConfig},
30    event::{Event, LogEvent},
31    EstimatedJsonEncodedSizeOf,
32};
33use vector_lib::{
34    finalizer::UnorderedFinalizer,
35    internal_event::{CountByteSize, EventsReceived, InternalEventHandle as _},
36};
37use vrl::value::Kind;
38
/// Errors that can occur while building the `amqp` source.
#[derive(Debug, Snafu)]
enum BuildError {
    /// Establishing the connection failed; `source` carries the underlying error.
    ///
    /// Produced by `amqp_source` when `connection.connect()` returns an error.
    #[snafu(display("Could not create AMQP consumer: {}", source))]
    AmqpCreateError {
        source: Box<dyn std::error::Error + Send + Sync>,
    },
}
46
/// Configuration for the `amqp` source.
///
/// Supports AMQP version 0.9.1
// NOTE(review): `Default` is derived through `derivative`, while deserialization defaults come
// from the `#[serde(default = ...)]` attributes. Fields that only carry the serde attribute
// (`queue`, `consumer`) presumably fall back to their type's plain default in
// `Default::default()` rather than to "vector" — confirm this is intended.
#[configurable_component(source(
    "amqp",
    "Collect events from AMQP 0.9.1 compatible brokers like RabbitMQ."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct AmqpSourceConfig {
    /// The name of the queue to consume.
    #[serde(default = "default_queue")]
    pub(crate) queue: String,

    /// The identifier for the consumer.
    // Passed to `Channel::basic_consume` together with `queue` in `run_amqp_source`.
    #[serde(default = "default_consumer")]
    #[configurable(metadata(docs::examples = "consumer-group-name"))]
    pub(crate) consumer: String,

    // Connection settings; flattened by serde, so their keys sit at the same level as the
    // fields of this struct. `amqp_source` calls `connection.connect()` on it.
    #[serde(flatten)]
    pub(crate) connection: AmqpConfig,

    /// The `AMQP` routing key.
    // This is the event path used as the legacy key when the delivery's routing key is
    // inserted by `populate_log_event`; it is not the routing key value itself.
    #[serde(default = "default_routing_key_field")]
    #[derivative(Default(value = "default_routing_key_field()"))]
    pub(crate) routing_key_field: OptionalValuePath,

    /// The `AMQP` exchange key.
    // Event path used as the legacy key for the delivery's exchange name.
    #[serde(default = "default_exchange_key")]
    #[derivative(Default(value = "default_exchange_key()"))]
    pub(crate) exchange_key: OptionalValuePath,

    /// The `AMQP` offset key.
    // Event path used as the legacy key for the delivery tag, which is stored as `offset`.
    #[serde(default = "default_offset_key")]
    #[derivative(Default(value = "default_offset_key()"))]
    pub(crate) offset_key: OptionalValuePath,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,

    // `framing` and `decoding` are combined into a `Decoder` by `AmqpSourceConfig::decoder`.
    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    #[derivative(Default(value = "default_framing_message_based()"))]
    pub(crate) framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    pub(crate) decoding: DeserializerConfig,

    // Resolved to a plain bool through `SourceContext::do_acknowledgements` in `build`.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    pub(crate) acknowledgements: SourceAcknowledgementsConfig,
}
104
/// Serde default for `AmqpSourceConfig::queue`: the queue named `vector`.
fn default_queue() -> String {
    String::from("vector")
}
108
/// Serde default for `AmqpSourceConfig::consumer`: the identifier `vector`.
fn default_consumer() -> String {
    "vector".to_owned()
}
112
113fn default_routing_key_field() -> OptionalValuePath {
114    OptionalValuePath::from(owned_value_path!("routing"))
115}
116
117fn default_exchange_key() -> OptionalValuePath {
118    OptionalValuePath::from(owned_value_path!("exchange"))
119}
120
121fn default_offset_key() -> OptionalValuePath {
122    OptionalValuePath::from(owned_value_path!("offset"))
123}
124
// The macro is defined elsewhere in the crate; presumably it implements config generation for
// the source from `AmqpSourceConfig::default()` (exercised by the `generate_config` test) —
// TODO confirm against the macro definition.
impl_generate_config_from_default!(AmqpSourceConfig);
126
127impl AmqpSourceConfig {
128    fn decoder(&self, log_namespace: LogNamespace) -> vector_lib::Result<Decoder> {
129        DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build()
130    }
131}
132
133#[async_trait::async_trait]
134#[typetag::serde(name = "amqp")]
135impl SourceConfig for AmqpSourceConfig {
136    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
137        let log_namespace = cx.log_namespace(self.log_namespace);
138        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
139
140        amqp_source(self, cx.shutdown, cx.out, log_namespace, acknowledgements).await
141    }
142
143    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
144        let log_namespace = global_log_namespace.merge(self.log_namespace);
145        let schema_definition = self
146            .decoding
147            .schema_definition(log_namespace)
148            .with_standard_vector_source_metadata()
149            .with_source_metadata(
150                AmqpSourceConfig::NAME,
151                None,
152                &owned_value_path!("timestamp"),
153                Kind::timestamp(),
154                Some("timestamp"),
155            )
156            .with_source_metadata(
157                AmqpSourceConfig::NAME,
158                self.routing_key_field
159                    .path
160                    .clone()
161                    .map(LegacyKey::InsertIfEmpty),
162                &owned_value_path!("routing"),
163                Kind::bytes(),
164                None,
165            )
166            .with_source_metadata(
167                AmqpSourceConfig::NAME,
168                self.exchange_key.path.clone().map(LegacyKey::InsertIfEmpty),
169                &owned_value_path!("exchange"),
170                Kind::bytes(),
171                None,
172            )
173            .with_source_metadata(
174                AmqpSourceConfig::NAME,
175                self.offset_key.path.clone().map(LegacyKey::InsertIfEmpty),
176                &owned_value_path!("offset"),
177                Kind::integer(),
178                None,
179            );
180
181        vec![SourceOutput::new_maybe_logs(
182            self.decoding.output_type(),
183            schema_definition,
184        )]
185    }
186
187    fn can_acknowledge(&self) -> bool {
188        true
189    }
190}
191
/// Entry registered with the finalizer for one consumed delivery.
///
/// Only the delivery's `Acker` is kept; `handle_ack` uses it to ack or reject the message once
/// the batch status is known.
#[derive(Debug)]
struct FinalizerEntry {
    acker: Acker,
}
196
197impl From<Delivery> for FinalizerEntry {
198    fn from(delivery: Delivery) -> Self {
199        Self {
200            acker: delivery.acker,
201        }
202    }
203}
204
205pub(crate) async fn amqp_source(
206    config: &AmqpSourceConfig,
207    shutdown: ShutdownSignal,
208    out: SourceSender,
209    log_namespace: LogNamespace,
210    acknowledgements: bool,
211) -> crate::Result<super::Source> {
212    let config = config.clone();
213    let (_conn, channel) = config
214        .connection
215        .connect()
216        .await
217        .map_err(|source| BuildError::AmqpCreateError { source })?;
218
219    Ok(Box::pin(run_amqp_source(
220        config,
221        shutdown,
222        out,
223        channel,
224        log_namespace,
225        acknowledgements,
226    )))
227}
228
/// Borrowed per-delivery metadata handed to `populate_log_event`.
///
/// Pairs each configured legacy path (`*_field` / `*_key`) with the value taken from the
/// delivery that should be written for it.
struct Keys<'a> {
    // Configured path and value for the delivery's routing key.
    routing_key_field: &'a OptionalValuePath,
    routing: &'a str,
    // Configured path and value for the delivery's exchange name.
    exchange_key: &'a OptionalValuePath,
    exchange: &'a str,
    // Configured path for the offset; the value is the delivery tag cast to `i64`.
    offset_key: &'a OptionalValuePath,
    delivery_tag: i64,
}
237
238/// Populates the decoded event with extra metadata.
239fn populate_log_event(
240    log: &mut LogEvent,
241    timestamp: Option<chrono::DateTime<Utc>>,
242    keys: &Keys<'_>,
243    log_namespace: LogNamespace,
244) {
245    log_namespace.insert_source_metadata(
246        AmqpSourceConfig::NAME,
247        log,
248        keys.routing_key_field
249            .path
250            .as_ref()
251            .map(LegacyKey::InsertIfEmpty),
252        path!("routing"),
253        keys.routing.to_string(),
254    );
255
256    log_namespace.insert_source_metadata(
257        AmqpSourceConfig::NAME,
258        log,
259        keys.exchange_key
260            .path
261            .as_ref()
262            .map(LegacyKey::InsertIfEmpty),
263        path!("exchange"),
264        keys.exchange.to_string(),
265    );
266
267    log_namespace.insert_source_metadata(
268        AmqpSourceConfig::NAME,
269        log,
270        keys.offset_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
271        path!("offset"),
272        keys.delivery_tag,
273    );
274
275    log_namespace.insert_vector_metadata(
276        log,
277        log_schema().source_type_key(),
278        path!("source_type"),
279        Bytes::from_static(AmqpSourceConfig::NAME.as_bytes()),
280    );
281
282    // This handles the transition from the original timestamp logic. Originally the
283    // `timestamp_key` was populated by the `properties.timestamp()` time on the message, falling
284    // back to calling `now()`.
285    match log_namespace {
286        LogNamespace::Vector => {
287            if let Some(timestamp) = timestamp {
288                log.insert(
289                    metadata_path!(AmqpSourceConfig::NAME, "timestamp"),
290                    timestamp,
291                );
292            };
293
294            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
295        }
296        LogNamespace::Legacy => {
297            if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
298                log.try_insert(timestamp_key, timestamp.unwrap_or_else(Utc::now));
299            }
300        }
301    };
302}
303
304/// Receives an event from `AMQP` and pushes it along the pipeline.
305async fn receive_event(
306    config: &AmqpSourceConfig,
307    out: &mut SourceSender,
308    log_namespace: LogNamespace,
309    finalizer: Option<&UnorderedFinalizer<FinalizerEntry>>,
310    msg: Delivery,
311) -> Result<(), ()> {
312    let payload = Cursor::new(Bytes::copy_from_slice(&msg.data));
313    let decoder = config.decoder(log_namespace).map_err(|_e| ())?;
314    let mut stream = FramedRead::new(payload, decoder);
315
316    // Extract timestamp from AMQP message
317    let timestamp = msg
318        .properties
319        .timestamp()
320        .and_then(|millis| Utc.timestamp_millis_opt(millis as _).latest());
321
322    let routing = msg.routing_key.to_string();
323    let exchange = msg.exchange.to_string();
324    let keys = Keys {
325        routing_key_field: &config.routing_key_field,
326        exchange_key: &config.exchange_key,
327        offset_key: &config.offset_key,
328        routing: &routing,
329        exchange: &exchange,
330        delivery_tag: msg.delivery_tag as i64,
331    };
332    let events_received = register!(EventsReceived);
333
334    let stream = stream! {
335        while let Some(result) = stream.next().await {
336            match result {
337                Ok((events, byte_size)) => {
338                    emit!(AmqpBytesReceived {
339                        byte_size,
340                        protocol: "amqp_0_9_1",
341                    });
342
343                    events_received.emit(CountByteSize(
344                        events.len(),
345                        events.estimated_json_encoded_size_of(),
346                    ));
347
348                    for mut event in events {
349                        if let Event::Log(ref mut log) = event {
350                            populate_log_event(log,
351                                        timestamp,
352                                        &keys,
353                                        log_namespace);
354                        }
355
356                        yield event;
357                    }
358                }
359                Err(error) => {
360                    use vector_lib::codecs::StreamDecodingError as _;
361
362                    // Error is logged by `codecs::Decoder`, no further handling
363                    // is needed here.
364                    if !error.can_continue() {
365                        break;
366                    }
367                }
368            }
369        }
370    }
371    .boxed();
372
373    finalize_event_stream(finalizer, out, stream, msg).await;
374
375    Ok(())
376}
377
378/// Send the event stream created by the framed read to the `out` stream.
379async fn finalize_event_stream(
380    finalizer: Option<&UnorderedFinalizer<FinalizerEntry>>,
381    out: &mut SourceSender,
382    mut stream: Pin<Box<dyn Stream<Item = Event> + Send + '_>>,
383    msg: Delivery,
384) {
385    match finalizer {
386        Some(finalizer) => {
387            let (batch, receiver) = BatchNotifier::new_with_receiver();
388            let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
389
390            match out.send_event_stream(&mut stream).await {
391                Err(_) => {
392                    emit!(StreamClosedError { count: 1 });
393                }
394                Ok(_) => {
395                    finalizer.add(msg.into(), receiver);
396                }
397            }
398        }
399        None => match out.send_event_stream(&mut stream).await {
400            Err(_) => {
401                emit!(StreamClosedError { count: 1 });
402            }
403            Ok(_) => {
404                let ack_options = lapin::options::BasicAckOptions::default();
405                if let Err(error) = msg.acker.ack(ack_options).await {
406                    emit!(AmqpAckError { error });
407                }
408            }
409        },
410    }
411}
412
413/// Runs the `AMQP` source involving the main loop pulling data from the server.
414async fn run_amqp_source(
415    config: AmqpSourceConfig,
416    shutdown: ShutdownSignal,
417    mut out: SourceSender,
418    channel: Channel,
419    log_namespace: LogNamespace,
420    acknowledgements: bool,
421) -> Result<(), ()> {
422    let (finalizer, mut ack_stream) =
423        UnorderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));
424
425    debug!("Starting amqp source, listening to queue {}.", config.queue);
426    let mut consumer = channel
427        .basic_consume(
428            &config.queue,
429            &config.consumer,
430            lapin::options::BasicConsumeOptions::default(),
431            lapin::types::FieldTable::default(),
432        )
433        .await
434        .map_err(|error| {
435            error!(message = "Failed to consume.", error = ?error, internal_log_rate_limit = true);
436        })?
437        .fuse();
438    let mut shutdown = shutdown.fuse();
439    loop {
440        tokio::select! {
441            _ = &mut shutdown => break,
442            entry = ack_stream.next() => {
443                if let Some((status, entry)) = entry {
444                    handle_ack(status, entry).await;
445                }
446            },
447            opt_m = consumer.next() => {
448                if let Some(try_m) = opt_m {
449                    match try_m {
450                        Err(error) => {
451                            emit!(AmqpEventError { error });
452                            return Err(());
453                        }
454                        Ok(msg) => {
455                            receive_event(&config, &mut out, log_namespace, finalizer.as_ref(), msg).await?
456                        }
457                    }
458                } else {
459                    break
460                }
461            }
462        };
463    }
464
465    Ok(())
466}
467
468async fn handle_ack(status: BatchStatus, entry: FinalizerEntry) {
469    match status {
470        BatchStatus::Delivered => {
471            let ack_options = lapin::options::BasicAckOptions::default();
472            if let Err(error) = entry.acker.ack(ack_options).await {
473                emit!(AmqpAckError { error });
474            }
475        }
476        BatchStatus::Errored => {
477            let ack_options = lapin::options::BasicRejectOptions::default();
478            if let Err(error) = entry.acker.reject(ack_options).await {
479                emit!(AmqpRejectError { error });
480            }
481        }
482        BatchStatus::Rejected => {
483            let ack_options = lapin::options::BasicRejectOptions::default();
484            if let Err(error) = entry.acker.reject(ack_options).await {
485                emit!(AmqpRejectError { error });
486            }
487        }
488    }
489}
490
#[cfg(test)]
pub mod test {
    use vector_lib::lookup::OwnedTargetPath;
    use vector_lib::schema::Definition;
    use vector_lib::tls::TlsConfig;
    use vrl::value::kind::Collection;

    use super::*;

    /// Returns the value of environment variable `name`, or `fallback` when it cannot be read.
    fn env_or(name: &str, fallback: &str) -> String {
        std::env::var(name).unwrap_or_else(|_| fallback.to_string())
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<AmqpSourceConfig>();
    }

    /// Plain-text test configuration for queue `it`; credentials, host and vhost come from the
    /// `AMQP_*` environment variables with fallbacks.
    pub fn make_config() -> AmqpSourceConfig {
        let user = env_or("AMQP_USER", "guest");
        let pass = env_or("AMQP_PASSWORD", "guest");
        let host = env_or("AMQP_HOST", "rabbitmq");
        let vhost = env_or("AMQP_VHOST", "%2f");

        let mut config = AmqpSourceConfig {
            queue: "it".to_string(),
            ..Default::default()
        };
        config.connection.connection_string = format!("amqp://{user}:{pass}@{host}:5672/{vhost}");

        config
    }

    /// TLS variant of [`make_config`]: uses an `amqps://` URL and the CA file named by
    /// `AMQP_CA_FILE` (with a fallback path).
    pub fn make_tls_config() -> AmqpSourceConfig {
        let user = env_or("AMQP_USER", "guest");
        let pass = env_or("AMQP_PASSWORD", "guest");
        let vhost = env_or("AMQP_VHOST", "%2f");
        let host = env_or("AMQP_HOST", "rabbitmq");
        let ca_file = env_or("AMQP_CA_FILE", "/certs/ca.cert.pem");

        let mut config = AmqpSourceConfig {
            queue: "it".to_string(),
            ..Default::default()
        };
        config.connection.connection_string = format!("amqps://{user}:{pass}@{host}/{vhost}");
        config.connection.tls = Some(TlsConfig {
            ca_file: Some(ca_file.as_str().into()),
            ..Default::default()
        });

        config
    }

    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = AmqpSourceConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let mut outputs = config.outputs(LogNamespace::Vector);
        let actual = outputs.remove(0).schema_definition(true);

        let expected =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("amqp", "timestamp"),
                    Kind::timestamp(),
                    Some("timestamp"),
                )
                .with_metadata_field(&owned_value_path!("amqp", "routing"), Kind::bytes(), None)
                .with_metadata_field(&owned_value_path!("amqp", "exchange"), Kind::bytes(), None)
                .with_metadata_field(&owned_value_path!("amqp", "offset"), Kind::integer(), None);

        assert_eq!(actual, Some(expected));
    }

    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = AmqpSourceConfig::default();

        let mut outputs = config.outputs(LogNamespace::Legacy);
        let actual = outputs.remove(0).schema_definition(true);

        let base = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        );
        let expected = base
            .with_event_field(
                &owned_value_path!("message"),
                Kind::bytes(),
                Some("message"),
            )
            .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
            .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
            .with_event_field(&owned_value_path!("routing"), Kind::bytes(), None)
            .with_event_field(&owned_value_path!("exchange"), Kind::bytes(), None)
            .with_event_field(&owned_value_path!("offset"), Kind::integer(), None);

        assert_eq!(actual, Some(expected));
    }
}
603
/// Integration tests use the docker compose files in `scripts/integration/docker-compose.amqp.yml`.
#[cfg(feature = "amqp-integration-tests")]
#[cfg(test)]
mod integration_test {
    use super::test::*;
    use super::*;
    use crate::{
        amqp::await_connection,
        shutdown::ShutdownSignal,
        test_util::{
            components::{run_and_assert_source_compliance, SOURCE_TAGS},
            random_string,
        },
        SourceSender,
    };
    use chrono::Utc;
    use lapin::options::*;
    use lapin::BasicProperties;
    use tokio::time::Duration;
    use vector_lib::config::log_schema;

    // Building the source over a plain connection must succeed.
    #[tokio::test]
    async fn amqp_source_create_ok() {
        let config = make_config();
        await_connection(&config.connection).await;
        assert!(amqp_source(
            &config,
            ShutdownSignal::noop(),
            SourceSender::new_test().0,
            LogNamespace::Legacy,
            false,
        )
        .await
        .is_ok());
    }

    // Building the source over a TLS connection must succeed.
    #[tokio::test]
    async fn amqp_tls_source_create_ok() {
        let config = make_tls_config();
        await_connection(&config.connection).await;

        assert!(amqp_source(
            &config,
            ShutdownSignal::noop(),
            SourceSender::new_test().0,
            LogNamespace::Legacy,
            false,
        )
        .await
        .is_ok());
    }

    // Publishes `text` to `exchange` with `routing_key` and waits for the publish to be
    // confirmed (second `.await`).
    //
    // NOTE(review): `_timestamp` is unused; the message is published with
    // `BasicProperties::default()`, so no timestamp property is set by this helper.
    async fn send_event(
        channel: &lapin::Channel,
        exchange: &str,
        routing_key: &str,
        text: &str,
        _timestamp: i64,
    ) {
        let payload = text.as_bytes();
        let payload_len = payload.len();
        trace!("Sending message of length {} to {}.", payload_len, exchange,);

        channel
            .basic_publish(
                exchange,
                routing_key,
                BasicPublishOptions::default(),
                payload.as_ref(),
                BasicProperties::default(),
            )
            .await
            .unwrap()
            .await
            .unwrap();
    }

    // End-to-end check shared by the plain and TLS tests: declares a randomly named fanout
    // exchange and queue, binds them, publishes one message, runs the source for one second and
    // verifies the message body and the metadata fields on the first received event.
    async fn source_consume_event(mut config: AmqpSourceConfig) {
        let exchange = format!("test-{}-exchange", random_string(10));
        let queue = format!("test-{}-queue", random_string(10));
        let routing_key = "my_key";
        trace!("Test exchange name: {}.", exchange);
        let consumer = format!("test-consumer-{}", random_string(10));

        config.consumer = consumer;
        config.queue = queue;

        let (_conn, channel) = config.connection.connect().await.unwrap();
        let exchange_opts = lapin::options::ExchangeDeclareOptions {
            auto_delete: true,
            ..Default::default()
        };

        channel
            .exchange_declare(
                &exchange,
                lapin::ExchangeKind::Fanout,
                exchange_opts,
                lapin::types::FieldTable::default(),
            )
            .await
            .unwrap();

        let queue_opts = QueueDeclareOptions {
            auto_delete: true,
            ..Default::default()
        };
        channel
            .queue_declare(
                &config.queue,
                queue_opts,
                lapin::types::FieldTable::default(),
            )
            .await
            .unwrap();

        // The queue is bound with an empty binding key while the message is published with
        // `routing_key`; the exchange is declared as fanout above.
        channel
            .queue_bind(
                &config.queue,
                &exchange,
                "",
                lapin::options::QueueBindOptions::default(),
                lapin::types::FieldTable::default(),
            )
            .await
            .unwrap();

        // The message is published before the source starts consuming.
        trace!("Sending event...");
        let now = Utc::now();
        send_event(
            &channel,
            &exchange,
            routing_key,
            "my message",
            now.timestamp_millis(),
        )
        .await;

        trace!("Receiving event...");
        let events =
            run_and_assert_source_compliance(config, Duration::from_secs(1), &SOURCE_TAGS).await;
        assert!(!events.is_empty());

        let log = events[0].as_log();
        trace!("{:?}", log);
        assert_eq!(*log.get_message().unwrap(), "my message".into());
        assert_eq!(log["routing"], routing_key.into());
        assert_eq!(*log.get_source_type().unwrap(), "amqp".into());
        let log_ts = log[log_schema().timestamp_key().unwrap().to_string()]
            .as_timestamp()
            .unwrap();
        // Only an upper bound relative to `now` is asserted for the event timestamp.
        assert!(log_ts.signed_duration_since(now) < chrono::Duration::seconds(1));
        assert_eq!(log["exchange"], exchange.into());
    }

    #[tokio::test]
    async fn amqp_source_consume_event() {
        let config = make_config();
        await_connection(&config.connection).await;
        source_consume_event(config).await;
    }

    #[tokio::test]
    async fn amqp_tls_source_consume_event() {
        let config = make_tls_config();
        await_connection(&config.connection).await;
        source_consume_event(config).await;
    }
}