vector/sources/
amqp.rs

1//! `AMQP` source.
2//! Handles version AMQP 0.9.1 which is used by RabbitMQ.
3use std::{io::Cursor, pin::Pin};
4
5use async_stream::stream;
6use bytes::Bytes;
7use chrono::{TimeZone, Utc};
8use futures::{FutureExt, StreamExt};
9use futures_util::Stream;
10use lapin::{Channel, acker::Acker, message::Delivery, options::BasicQosOptions};
11use snafu::Snafu;
12use tokio_util::codec::FramedRead;
13use vector_lib::{
14    EstimatedJsonEncodedSizeOf,
15    codecs::decoding::{DeserializerConfig, FramingConfig},
16    config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig, log_schema},
17    configurable::configurable_component,
18    event::{Event, LogEvent},
19    finalizer::UnorderedFinalizer,
20    internal_event::{CountByteSize, EventsReceived, InternalEventHandle as _},
21    lookup::{lookup_v2::OptionalValuePath, metadata_path, owned_value_path, path},
22};
23use vrl::value::Kind;
24
25use crate::{
26    SourceSender,
27    amqp::AmqpConfig,
28    codecs::{Decoder, DecodingConfig},
29    config::{SourceConfig, SourceContext, SourceOutput},
30    event::{BatchNotifier, BatchStatus},
31    internal_events::{
32        StreamClosedError,
33        source::{AmqpAckError, AmqpBytesReceived, AmqpEventError, AmqpRejectError},
34    },
35    serde::{bool_or_struct, default_decoding, default_framing_message_based},
36    shutdown::ShutdownSignal,
37};
38
39#[derive(Debug, Snafu)]
40enum BuildError {
41    #[snafu(display("Could not create AMQP consumer: {}", source))]
42    AmqpCreateError {
43        source: Box<dyn std::error::Error + Send + Sync>,
44    },
45}
46
47/// Configuration for the `amqp` source.
48///
49/// Supports AMQP version 0.9.1
50#[configurable_component(source(
51    "amqp",
52    "Collect events from AMQP 0.9.1 compatible brokers like RabbitMQ."
53))]
54#[derive(Clone, Debug, Derivative)]
55#[derivative(Default)]
56#[serde(deny_unknown_fields)]
57pub struct AmqpSourceConfig {
58    /// The name of the queue to consume.
59    #[serde(default = "default_queue")]
60    pub(crate) queue: String,
61
62    /// The identifier for the consumer.
63    #[serde(default = "default_consumer")]
64    #[configurable(metadata(docs::examples = "consumer-group-name"))]
65    pub(crate) consumer: String,
66
67    #[serde(flatten)]
68    pub(crate) connection: AmqpConfig,
69
70    /// The `AMQP` routing key.
71    #[serde(default = "default_routing_key_field")]
72    #[derivative(Default(value = "default_routing_key_field()"))]
73    pub(crate) routing_key_field: OptionalValuePath,
74
75    /// The `AMQP` exchange key.
76    #[serde(default = "default_exchange_key")]
77    #[derivative(Default(value = "default_exchange_key()"))]
78    pub(crate) exchange_key: OptionalValuePath,
79
80    /// The `AMQP` offset key.
81    #[serde(default = "default_offset_key")]
82    #[derivative(Default(value = "default_offset_key()"))]
83    pub(crate) offset_key: OptionalValuePath,
84
85    /// The namespace to use for logs. This overrides the global setting.
86    #[configurable(metadata(docs::hidden))]
87    #[serde(default)]
88    pub log_namespace: Option<bool>,
89
90    #[configurable(derived)]
91    #[serde(default = "default_framing_message_based")]
92    #[derivative(Default(value = "default_framing_message_based()"))]
93    pub(crate) framing: FramingConfig,
94
95    #[configurable(derived)]
96    #[serde(default = "default_decoding")]
97    #[derivative(Default(value = "default_decoding()"))]
98    pub(crate) decoding: DeserializerConfig,
99
100    #[configurable(derived)]
101    #[serde(default, deserialize_with = "bool_or_struct")]
102    pub(crate) acknowledgements: SourceAcknowledgementsConfig,
103
104    /// Maximum number of unacknowledged messages the broker will deliver to this consumer.
105    ///
106    /// This controls flow control via AMQP QoS prefetch. Lower values limit memory usage and
107    /// prevent overwhelming slow consumers, but may reduce throughput. Higher values increase
108    /// throughput but consume more memory.
109    ///
110    /// If not set, the broker/client default applies (often unlimited).
111    #[serde(default)]
112    #[configurable(metadata(docs::examples = 100))]
113    pub(crate) prefetch_count: Option<u16>,
114}
115
116fn default_queue() -> String {
117    "vector".into()
118}
119
120fn default_consumer() -> String {
121    "vector".into()
122}
123
124fn default_routing_key_field() -> OptionalValuePath {
125    OptionalValuePath::from(owned_value_path!("routing"))
126}
127
128fn default_exchange_key() -> OptionalValuePath {
129    OptionalValuePath::from(owned_value_path!("exchange"))
130}
131
132fn default_offset_key() -> OptionalValuePath {
133    OptionalValuePath::from(owned_value_path!("offset"))
134}
135
136impl_generate_config_from_default!(AmqpSourceConfig);
137
138impl AmqpSourceConfig {
139    fn decoder(&self, log_namespace: LogNamespace) -> vector_lib::Result<Decoder> {
140        DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build()
141    }
142}
143
144#[async_trait::async_trait]
145#[typetag::serde(name = "amqp")]
146impl SourceConfig for AmqpSourceConfig {
147    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
148        let log_namespace = cx.log_namespace(self.log_namespace);
149        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
150
151        amqp_source(self, cx.shutdown, cx.out, log_namespace, acknowledgements).await
152    }
153
154    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
155        let log_namespace = global_log_namespace.merge(self.log_namespace);
156        let schema_definition = self
157            .decoding
158            .schema_definition(log_namespace)
159            .with_standard_vector_source_metadata()
160            .with_source_metadata(
161                AmqpSourceConfig::NAME,
162                None,
163                &owned_value_path!("timestamp"),
164                Kind::timestamp(),
165                Some("timestamp"),
166            )
167            .with_source_metadata(
168                AmqpSourceConfig::NAME,
169                self.routing_key_field
170                    .path
171                    .clone()
172                    .map(LegacyKey::InsertIfEmpty),
173                &owned_value_path!("routing"),
174                Kind::bytes(),
175                None,
176            )
177            .with_source_metadata(
178                AmqpSourceConfig::NAME,
179                self.exchange_key.path.clone().map(LegacyKey::InsertIfEmpty),
180                &owned_value_path!("exchange"),
181                Kind::bytes(),
182                None,
183            )
184            .with_source_metadata(
185                AmqpSourceConfig::NAME,
186                self.offset_key.path.clone().map(LegacyKey::InsertIfEmpty),
187                &owned_value_path!("offset"),
188                Kind::integer(),
189                None,
190            );
191
192        vec![SourceOutput::new_maybe_logs(
193            self.decoding.output_type(),
194            schema_definition,
195        )]
196    }
197
198    fn can_acknowledge(&self) -> bool {
199        true
200    }
201}
202
203#[derive(Debug)]
204struct FinalizerEntry {
205    acker: Acker,
206}
207
208impl From<Delivery> for FinalizerEntry {
209    fn from(delivery: Delivery) -> Self {
210        Self {
211            acker: delivery.acker,
212        }
213    }
214}
215
216pub(crate) async fn amqp_source(
217    config: &AmqpSourceConfig,
218    shutdown: ShutdownSignal,
219    out: SourceSender,
220    log_namespace: LogNamespace,
221    acknowledgements: bool,
222) -> crate::Result<super::Source> {
223    let config = config.clone();
224    let (_conn, channel) = config
225        .connection
226        .connect()
227        .await
228        .map_err(|source| BuildError::AmqpCreateError { source })?;
229
230    Ok(Box::pin(run_amqp_source(
231        config,
232        shutdown,
233        out,
234        channel,
235        log_namespace,
236        acknowledgements,
237    )))
238}
239
240struct Keys<'a> {
241    routing_key_field: &'a OptionalValuePath,
242    routing: &'a str,
243    exchange_key: &'a OptionalValuePath,
244    exchange: &'a str,
245    offset_key: &'a OptionalValuePath,
246    delivery_tag: i64,
247}
248
249/// Populates the decoded event with extra metadata.
250fn populate_log_event(
251    log: &mut LogEvent,
252    timestamp: Option<chrono::DateTime<Utc>>,
253    keys: &Keys<'_>,
254    log_namespace: LogNamespace,
255) {
256    log_namespace.insert_source_metadata(
257        AmqpSourceConfig::NAME,
258        log,
259        keys.routing_key_field
260            .path
261            .as_ref()
262            .map(LegacyKey::InsertIfEmpty),
263        path!("routing"),
264        keys.routing.to_string(),
265    );
266
267    log_namespace.insert_source_metadata(
268        AmqpSourceConfig::NAME,
269        log,
270        keys.exchange_key
271            .path
272            .as_ref()
273            .map(LegacyKey::InsertIfEmpty),
274        path!("exchange"),
275        keys.exchange.to_string(),
276    );
277
278    log_namespace.insert_source_metadata(
279        AmqpSourceConfig::NAME,
280        log,
281        keys.offset_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
282        path!("offset"),
283        keys.delivery_tag,
284    );
285
286    log_namespace.insert_vector_metadata(
287        log,
288        log_schema().source_type_key(),
289        path!("source_type"),
290        Bytes::from_static(AmqpSourceConfig::NAME.as_bytes()),
291    );
292
293    // This handles the transition from the original timestamp logic. Originally the
294    // `timestamp_key` was populated by the `properties.timestamp()` time on the message, falling
295    // back to calling `now()`.
296    match log_namespace {
297        LogNamespace::Vector => {
298            if let Some(timestamp) = timestamp {
299                log.insert(
300                    metadata_path!(AmqpSourceConfig::NAME, "timestamp"),
301                    timestamp,
302                );
303            };
304
305            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
306        }
307        LogNamespace::Legacy => {
308            if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
309                log.try_insert(timestamp_key, timestamp.unwrap_or_else(Utc::now));
310            }
311        }
312    };
313}
314
315/// Receives an event from `AMQP` and pushes it along the pipeline.
316async fn receive_event(
317    config: &AmqpSourceConfig,
318    out: &mut SourceSender,
319    log_namespace: LogNamespace,
320    finalizer: Option<&UnorderedFinalizer<FinalizerEntry>>,
321    msg: Delivery,
322) -> Result<(), ()> {
323    let payload = Cursor::new(Bytes::copy_from_slice(&msg.data));
324    let decoder = config.decoder(log_namespace).map_err(|_e| ())?;
325    let mut stream = FramedRead::new(payload, decoder);
326
327    // Extract timestamp from AMQP message
328    let timestamp = msg
329        .properties
330        .timestamp()
331        .and_then(|millis| Utc.timestamp_millis_opt(millis as _).latest());
332
333    let routing = msg.routing_key.to_string();
334    let exchange = msg.exchange.to_string();
335    let keys = Keys {
336        routing_key_field: &config.routing_key_field,
337        exchange_key: &config.exchange_key,
338        offset_key: &config.offset_key,
339        routing: &routing,
340        exchange: &exchange,
341        delivery_tag: msg.delivery_tag as i64,
342    };
343    let events_received = register!(EventsReceived);
344
345    let stream = stream! {
346        while let Some(result) = stream.next().await {
347            match result {
348                Ok((events, byte_size)) => {
349                    emit!(AmqpBytesReceived {
350                        byte_size,
351                        protocol: "amqp_0_9_1",
352                    });
353
354                    events_received.emit(CountByteSize(
355                        events.len(),
356                        events.estimated_json_encoded_size_of(),
357                    ));
358
359                    for mut event in events {
360                        if let Event::Log(ref mut log) = event {
361                            populate_log_event(log,
362                                        timestamp,
363                                        &keys,
364                                        log_namespace);
365                        }
366
367                        yield event;
368                    }
369                }
370                Err(error) => {
371                    use vector_lib::codecs::StreamDecodingError as _;
372
373                    // Error is logged by `codecs::Decoder`, no further handling
374                    // is needed here.
375                    if !error.can_continue() {
376                        break;
377                    }
378                }
379            }
380        }
381    }
382    .boxed();
383
384    finalize_event_stream(finalizer, out, stream, msg).await;
385
386    Ok(())
387}
388
389/// Send the event stream created by the framed read to the `out` stream.
390async fn finalize_event_stream(
391    finalizer: Option<&UnorderedFinalizer<FinalizerEntry>>,
392    out: &mut SourceSender,
393    mut stream: Pin<Box<dyn Stream<Item = Event> + Send + '_>>,
394    msg: Delivery,
395) {
396    match finalizer {
397        Some(finalizer) => {
398            let (batch, receiver) = BatchNotifier::new_with_receiver();
399            let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
400
401            match out.send_event_stream(&mut stream).await {
402                Err(_) => {
403                    emit!(StreamClosedError { count: 1 });
404                }
405                Ok(_) => {
406                    finalizer.add(msg.into(), receiver);
407                }
408            }
409        }
410        None => match out.send_event_stream(&mut stream).await {
411            Err(_) => {
412                emit!(StreamClosedError { count: 1 });
413            }
414            Ok(_) => {
415                let ack_options = lapin::options::BasicAckOptions::default();
416                if let Err(error) = msg.acker.ack(ack_options).await {
417                    emit!(AmqpAckError { error });
418                }
419            }
420        },
421    }
422}
423
424/// Runs the `AMQP` source involving the main loop pulling data from the server.
425async fn run_amqp_source(
426    config: AmqpSourceConfig,
427    shutdown: ShutdownSignal,
428    mut out: SourceSender,
429    channel: Channel,
430    log_namespace: LogNamespace,
431    acknowledgements: bool,
432) -> Result<(), ()> {
433    let (finalizer, mut ack_stream) =
434        UnorderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));
435
436    // Apply AMQP QoS (prefetch) before starting consumption.
437    if let Some(count) = config.prefetch_count {
438        // per-consumer prefetch (global = false)
439        channel
440            .basic_qos(count, BasicQosOptions { global: false })
441            .await
442            .map_err(|error| {
443                error!(message = "Failed to apply basic_qos.", ?error);
444            })?;
445    }
446
447    debug!("Starting amqp source, listening to queue {}.", config.queue);
448    let mut consumer = channel
449        .basic_consume(
450            &config.queue,
451            &config.consumer,
452            lapin::options::BasicConsumeOptions::default(),
453            lapin::types::FieldTable::default(),
454        )
455        .await
456        .map_err(|error| {
457            error!(message = "Failed to consume.", ?error);
458        })?
459        .fuse();
460    let mut shutdown = shutdown.fuse();
461    loop {
462        tokio::select! {
463            _ = &mut shutdown => break,
464            entry = ack_stream.next() => {
465                if let Some((status, entry)) = entry {
466                    handle_ack(status, entry).await;
467                }
468            },
469            opt_m = consumer.next() => {
470                if let Some(try_m) = opt_m {
471                    match try_m {
472                        Err(error) => {
473                            emit!(AmqpEventError { error });
474                            return Err(());
475                        }
476                        Ok(msg) => {
477                            receive_event(&config, &mut out, log_namespace, finalizer.as_ref(), msg).await?
478                        }
479                    }
480                } else {
481                    break
482                }
483            }
484        };
485    }
486
487    Ok(())
488}
489
490async fn handle_ack(status: BatchStatus, entry: FinalizerEntry) {
491    match status {
492        BatchStatus::Delivered => {
493            let ack_options = lapin::options::BasicAckOptions::default();
494            if let Err(error) = entry.acker.ack(ack_options).await {
495                emit!(AmqpAckError { error });
496            }
497        }
498        BatchStatus::Errored => {
499            let ack_options = lapin::options::BasicRejectOptions::default();
500            if let Err(error) = entry.acker.reject(ack_options).await {
501                emit!(AmqpRejectError { error });
502            }
503        }
504        BatchStatus::Rejected => {
505            let ack_options = lapin::options::BasicRejectOptions::default();
506            if let Err(error) = entry.acker.reject(ack_options).await {
507                emit!(AmqpRejectError { error });
508            }
509        }
510    }
511}
512
513#[cfg(test)]
514pub mod test {
515    use vector_lib::{lookup::OwnedTargetPath, schema::Definition, tls::TlsConfig};
516    use vrl::value::kind::Collection;
517
518    use super::*;
519
520    #[test]
521    fn generate_config() {
522        crate::test_util::test_generate_config::<AmqpSourceConfig>();
523    }
524
525    pub fn make_config() -> AmqpSourceConfig {
526        let mut config = AmqpSourceConfig {
527            queue: "it".to_string(),
528            ..Default::default()
529        };
530        let user = std::env::var("AMQP_USER").unwrap_or_else(|_| "guest".to_string());
531        let pass = std::env::var("AMQP_PASSWORD").unwrap_or_else(|_| "guest".to_string());
532        let host = std::env::var("AMQP_HOST").unwrap_or_else(|_| "rabbitmq".to_string());
533        let vhost = std::env::var("AMQP_VHOST").unwrap_or_else(|_| "%2f".to_string());
534        config.connection.connection_string = format!("amqp://{user}:{pass}@{host}:5672/{vhost}");
535
536        config
537    }
538
539    pub fn make_tls_config() -> AmqpSourceConfig {
540        let mut config = AmqpSourceConfig {
541            queue: "it".to_string(),
542            ..Default::default()
543        };
544        let user = std::env::var("AMQP_USER").unwrap_or_else(|_| "guest".to_string());
545        let pass = std::env::var("AMQP_PASSWORD").unwrap_or_else(|_| "guest".to_string());
546        let vhost = std::env::var("AMQP_VHOST").unwrap_or_else(|_| "%2f".to_string());
547        let host = std::env::var("AMQP_HOST").unwrap_or_else(|_| "rabbitmq".to_string());
548        let ca_file =
549            std::env::var("AMQP_CA_FILE").unwrap_or_else(|_| "/certs/ca.cert.pem".to_string());
550        config.connection.connection_string = format!("amqps://{user}:{pass}@{host}/{vhost}");
551        let tls = TlsConfig {
552            ca_file: Some(ca_file.as_str().into()),
553            ..Default::default()
554        };
555        config.connection.tls = Some(tls);
556        config
557    }
558
559    #[test]
560    fn output_schema_definition_vector_namespace() {
561        let config = AmqpSourceConfig {
562            log_namespace: Some(true),
563            ..Default::default()
564        };
565
566        let definition = config
567            .outputs(LogNamespace::Vector)
568            .remove(0)
569            .schema_definition(true);
570
571        let expected_definition =
572            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
573                .with_meaning(OwnedTargetPath::event_root(), "message")
574                .with_metadata_field(
575                    &owned_value_path!("vector", "source_type"),
576                    Kind::bytes(),
577                    None,
578                )
579                .with_metadata_field(
580                    &owned_value_path!("vector", "ingest_timestamp"),
581                    Kind::timestamp(),
582                    None,
583                )
584                .with_metadata_field(
585                    &owned_value_path!("amqp", "timestamp"),
586                    Kind::timestamp(),
587                    Some("timestamp"),
588                )
589                .with_metadata_field(&owned_value_path!("amqp", "routing"), Kind::bytes(), None)
590                .with_metadata_field(&owned_value_path!("amqp", "exchange"), Kind::bytes(), None)
591                .with_metadata_field(&owned_value_path!("amqp", "offset"), Kind::integer(), None);
592
593        assert_eq!(definition, Some(expected_definition));
594    }
595
596    #[test]
597    fn output_schema_definition_legacy_namespace() {
598        let config = AmqpSourceConfig::default();
599
600        let definition = config
601            .outputs(LogNamespace::Legacy)
602            .remove(0)
603            .schema_definition(true);
604
605        let expected_definition = Definition::new_with_default_metadata(
606            Kind::object(Collection::empty()),
607            [LogNamespace::Legacy],
608        )
609        .with_event_field(
610            &owned_value_path!("message"),
611            Kind::bytes(),
612            Some("message"),
613        )
614        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
615        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
616        .with_event_field(&owned_value_path!("routing"), Kind::bytes(), None)
617        .with_event_field(&owned_value_path!("exchange"), Kind::bytes(), None)
618        .with_event_field(&owned_value_path!("offset"), Kind::integer(), None);
619
620        assert_eq!(definition, Some(expected_definition));
621    }
622}
623
624/// Integration tests use the docker compose files in `tests/integration/docker-compose.amqp.yml`.
625#[cfg(feature = "amqp-integration-tests")]
626#[cfg(test)]
627mod integration_test {
628    use chrono::Utc;
629    use lapin::{BasicProperties, options::*};
630    use tokio::time::Duration;
631    use vector_lib::config::log_schema;
632
633    use super::{test::*, *};
634    use crate::{
635        SourceSender,
636        amqp::await_connection,
637        shutdown::ShutdownSignal,
638        test_util::{
639            components::{SOURCE_TAGS, run_and_assert_source_compliance},
640            random_string,
641        },
642    };
643
644    #[tokio::test]
645    async fn amqp_source_create_ok() {
646        let config = make_config();
647        await_connection(&config.connection).await;
648        assert!(
649            amqp_source(
650                &config,
651                ShutdownSignal::noop(),
652                SourceSender::new_test().0,
653                LogNamespace::Legacy,
654                false,
655            )
656            .await
657            .is_ok()
658        );
659    }
660
661    #[tokio::test]
662    async fn amqp_tls_source_create_ok() {
663        let config = make_tls_config();
664        await_connection(&config.connection).await;
665
666        assert!(
667            amqp_source(
668                &config,
669                ShutdownSignal::noop(),
670                SourceSender::new_test().0,
671                LogNamespace::Legacy,
672                false,
673            )
674            .await
675            .is_ok()
676        );
677    }
678
679    async fn send_event(
680        channel: &lapin::Channel,
681        exchange: &str,
682        routing_key: &str,
683        text: &str,
684        _timestamp: i64,
685    ) {
686        let payload = text.as_bytes();
687        let payload_len = payload.len();
688        trace!("Sending message of length {} to {}.", payload_len, exchange,);
689
690        channel
691            .basic_publish(
692                exchange,
693                routing_key,
694                BasicPublishOptions::default(),
695                payload.as_ref(),
696                BasicProperties::default(),
697            )
698            .await
699            .unwrap()
700            .await
701            .unwrap();
702    }
703
704    async fn source_consume_event(mut config: AmqpSourceConfig) {
705        let exchange = format!("test-{}-exchange", random_string(10));
706        let queue = format!("test-{}-queue", random_string(10));
707        let routing_key = "my_key";
708        trace!("Test exchange name: {}.", exchange);
709        let consumer = format!("test-consumer-{}", random_string(10));
710
711        config.consumer = consumer;
712        config.queue = queue;
713
714        let (_conn, channel) = config.connection.connect().await.unwrap();
715        let exchange_opts = lapin::options::ExchangeDeclareOptions {
716            auto_delete: true,
717            ..Default::default()
718        };
719
720        channel
721            .exchange_declare(
722                &exchange,
723                lapin::ExchangeKind::Fanout,
724                exchange_opts,
725                lapin::types::FieldTable::default(),
726            )
727            .await
728            .unwrap();
729
730        let queue_opts = QueueDeclareOptions {
731            auto_delete: true,
732            ..Default::default()
733        };
734        channel
735            .queue_declare(
736                &config.queue,
737                queue_opts,
738                lapin::types::FieldTable::default(),
739            )
740            .await
741            .unwrap();
742
743        channel
744            .queue_bind(
745                &config.queue,
746                &exchange,
747                "",
748                lapin::options::QueueBindOptions::default(),
749                lapin::types::FieldTable::default(),
750            )
751            .await
752            .unwrap();
753
754        trace!("Sending event...");
755        let now = Utc::now();
756        send_event(
757            &channel,
758            &exchange,
759            routing_key,
760            "my message",
761            now.timestamp_millis(),
762        )
763        .await;
764
765        trace!("Receiving event...");
766        let events =
767            run_and_assert_source_compliance(config, Duration::from_secs(1), &SOURCE_TAGS).await;
768        assert!(!events.is_empty());
769
770        let log = events[0].as_log();
771        trace!("{:?}", log);
772        assert_eq!(*log.get_message().unwrap(), "my message".into());
773        assert_eq!(log["routing"], routing_key.into());
774        assert_eq!(*log.get_source_type().unwrap(), "amqp".into());
775        let log_ts = log[log_schema().timestamp_key().unwrap().to_string()]
776            .as_timestamp()
777            .unwrap();
778        assert!(log_ts.signed_duration_since(now) < chrono::Duration::seconds(1));
779        assert_eq!(log["exchange"], exchange.into());
780    }
781
782    #[tokio::test]
783    async fn amqp_source_consume_event() {
784        let config = make_config();
785        await_connection(&config.connection).await;
786        source_consume_event(config).await;
787    }
788
789    #[tokio::test]
790    async fn amqp_tls_source_consume_event() {
791        let config = make_tls_config();
792        await_connection(&config.connection).await;
793        source_consume_event(config).await;
794    }
795}