vector/sources/
amqp.rs

1//! `AMQP` source.
2//! Handles version AMQP 0.9.1 which is used by RabbitMQ.
3use std::{io::Cursor, pin::Pin};
4
5use async_stream::stream;
6use bytes::Bytes;
7use chrono::{TimeZone, Utc};
8use futures::{FutureExt, StreamExt};
9use futures_util::Stream;
10use lapin::{Channel, acker::Acker, message::Delivery, options::BasicQosOptions};
11use snafu::Snafu;
12use vector_lib::{
13    EstimatedJsonEncodedSizeOf,
14    codecs::{
15        DecoderFramedRead,
16        decoding::{DeserializerConfig, FramingConfig},
17    },
18    config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig, log_schema},
19    configurable::configurable_component,
20    event::{Event, LogEvent},
21    finalizer::UnorderedFinalizer,
22    internal_event::{CountByteSize, EventsReceived, InternalEventHandle as _},
23    lookup::{lookup_v2::OptionalValuePath, metadata_path, owned_value_path, path},
24};
25use vrl::value::Kind;
26
27use crate::{
28    SourceSender,
29    amqp::AmqpConfig,
30    codecs::{Decoder, DecodingConfig},
31    config::{SourceConfig, SourceContext, SourceOutput},
32    event::{BatchNotifier, BatchStatus},
33    internal_events::{
34        StreamClosedError,
35        source::{AmqpAckError, AmqpBytesReceived, AmqpEventError, AmqpRejectError},
36    },
37    serde::{bool_or_struct, default_decoding, default_framing_message_based},
38    shutdown::ShutdownSignal,
39};
40
41#[derive(Debug, Snafu)]
42enum BuildError {
43    #[snafu(display("Could not create AMQP consumer: {}", source))]
44    AmqpCreateError {
45        source: Box<dyn std::error::Error + Send + Sync>,
46    },
47}
48
49/// Configuration for the `amqp` source.
50///
51/// Supports AMQP version 0.9.1
52#[configurable_component(source(
53    "amqp",
54    "Collect events from AMQP 0.9.1 compatible brokers like RabbitMQ."
55))]
56#[derive(Clone, Debug, Derivative)]
57#[derivative(Default)]
58#[serde(deny_unknown_fields)]
59pub struct AmqpSourceConfig {
60    /// The name of the queue to consume.
61    #[serde(default = "default_queue")]
62    pub(crate) queue: String,
63
64    /// The identifier for the consumer.
65    #[serde(default = "default_consumer")]
66    #[configurable(metadata(docs::examples = "consumer-group-name"))]
67    pub(crate) consumer: String,
68
69    #[serde(flatten)]
70    pub(crate) connection: AmqpConfig,
71
72    /// The `AMQP` routing key.
73    #[serde(default = "default_routing_key_field")]
74    #[derivative(Default(value = "default_routing_key_field()"))]
75    pub(crate) routing_key_field: OptionalValuePath,
76
77    /// The `AMQP` exchange key.
78    #[serde(default = "default_exchange_key")]
79    #[derivative(Default(value = "default_exchange_key()"))]
80    pub(crate) exchange_key: OptionalValuePath,
81
82    /// The `AMQP` offset key.
83    #[serde(default = "default_offset_key")]
84    #[derivative(Default(value = "default_offset_key()"))]
85    pub(crate) offset_key: OptionalValuePath,
86
87    /// The namespace to use for logs. This overrides the global setting.
88    #[configurable(metadata(docs::hidden))]
89    #[serde(default)]
90    pub log_namespace: Option<bool>,
91
92    #[configurable(derived)]
93    #[serde(default = "default_framing_message_based")]
94    #[derivative(Default(value = "default_framing_message_based()"))]
95    pub(crate) framing: FramingConfig,
96
97    #[configurable(derived)]
98    #[serde(default = "default_decoding")]
99    #[derivative(Default(value = "default_decoding()"))]
100    pub(crate) decoding: DeserializerConfig,
101
102    #[configurable(derived)]
103    #[serde(default, deserialize_with = "bool_or_struct")]
104    pub(crate) acknowledgements: SourceAcknowledgementsConfig,
105
106    /// Maximum number of unacknowledged messages the broker will deliver to this consumer.
107    ///
108    /// This controls flow control via AMQP QoS prefetch. Lower values limit memory usage and
109    /// prevent overwhelming slow consumers, but may reduce throughput. Higher values increase
110    /// throughput but consume more memory.
111    ///
112    /// If not set, the broker/client default applies (often unlimited).
113    #[serde(default)]
114    #[configurable(metadata(docs::examples = 100))]
115    pub(crate) prefetch_count: Option<u16>,
116}
117
118fn default_queue() -> String {
119    "vector".into()
120}
121
122fn default_consumer() -> String {
123    "vector".into()
124}
125
126fn default_routing_key_field() -> OptionalValuePath {
127    OptionalValuePath::from(owned_value_path!("routing"))
128}
129
130fn default_exchange_key() -> OptionalValuePath {
131    OptionalValuePath::from(owned_value_path!("exchange"))
132}
133
134fn default_offset_key() -> OptionalValuePath {
135    OptionalValuePath::from(owned_value_path!("offset"))
136}
137
138impl_generate_config_from_default!(AmqpSourceConfig);
139
140impl AmqpSourceConfig {
141    fn decoder(&self, log_namespace: LogNamespace) -> vector_lib::Result<Decoder> {
142        DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build()
143    }
144}
145
146#[async_trait::async_trait]
147#[typetag::serde(name = "amqp")]
148impl SourceConfig for AmqpSourceConfig {
149    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
150        let log_namespace = cx.log_namespace(self.log_namespace);
151        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
152
153        amqp_source(self, cx.shutdown, cx.out, log_namespace, acknowledgements).await
154    }
155
156    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
157        let log_namespace = global_log_namespace.merge(self.log_namespace);
158        let schema_definition = self
159            .decoding
160            .schema_definition(log_namespace)
161            .with_standard_vector_source_metadata()
162            .with_source_metadata(
163                AmqpSourceConfig::NAME,
164                None,
165                &owned_value_path!("timestamp"),
166                Kind::timestamp(),
167                Some("timestamp"),
168            )
169            .with_source_metadata(
170                AmqpSourceConfig::NAME,
171                self.routing_key_field
172                    .path
173                    .clone()
174                    .map(LegacyKey::InsertIfEmpty),
175                &owned_value_path!("routing"),
176                Kind::bytes(),
177                None,
178            )
179            .with_source_metadata(
180                AmqpSourceConfig::NAME,
181                self.exchange_key.path.clone().map(LegacyKey::InsertIfEmpty),
182                &owned_value_path!("exchange"),
183                Kind::bytes(),
184                None,
185            )
186            .with_source_metadata(
187                AmqpSourceConfig::NAME,
188                self.offset_key.path.clone().map(LegacyKey::InsertIfEmpty),
189                &owned_value_path!("offset"),
190                Kind::integer(),
191                None,
192            );
193
194        vec![SourceOutput::new_maybe_logs(
195            self.decoding.output_type(),
196            schema_definition,
197        )]
198    }
199
200    fn can_acknowledge(&self) -> bool {
201        true
202    }
203}
204
205#[derive(Debug)]
206struct FinalizerEntry {
207    acker: Acker,
208}
209
210impl From<Delivery> for FinalizerEntry {
211    fn from(delivery: Delivery) -> Self {
212        Self {
213            acker: delivery.acker,
214        }
215    }
216}
217
218pub(crate) async fn amqp_source(
219    config: &AmqpSourceConfig,
220    shutdown: ShutdownSignal,
221    out: SourceSender,
222    log_namespace: LogNamespace,
223    acknowledgements: bool,
224) -> crate::Result<super::Source> {
225    let config = config.clone();
226    let (_conn, channel) = config
227        .connection
228        .connect()
229        .await
230        .map_err(|source| BuildError::AmqpCreateError { source })?;
231
232    Ok(Box::pin(run_amqp_source(
233        config,
234        shutdown,
235        out,
236        channel,
237        log_namespace,
238        acknowledgements,
239    )))
240}
241
242struct Keys<'a> {
243    routing_key_field: &'a OptionalValuePath,
244    routing: &'a str,
245    exchange_key: &'a OptionalValuePath,
246    exchange: &'a str,
247    offset_key: &'a OptionalValuePath,
248    delivery_tag: i64,
249}
250
251/// Populates the decoded event with extra metadata.
252fn populate_log_event(
253    log: &mut LogEvent,
254    timestamp: Option<chrono::DateTime<Utc>>,
255    keys: &Keys<'_>,
256    log_namespace: LogNamespace,
257) {
258    log_namespace.insert_source_metadata(
259        AmqpSourceConfig::NAME,
260        log,
261        keys.routing_key_field
262            .path
263            .as_ref()
264            .map(LegacyKey::InsertIfEmpty),
265        path!("routing"),
266        keys.routing.to_string(),
267    );
268
269    log_namespace.insert_source_metadata(
270        AmqpSourceConfig::NAME,
271        log,
272        keys.exchange_key
273            .path
274            .as_ref()
275            .map(LegacyKey::InsertIfEmpty),
276        path!("exchange"),
277        keys.exchange.to_string(),
278    );
279
280    log_namespace.insert_source_metadata(
281        AmqpSourceConfig::NAME,
282        log,
283        keys.offset_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
284        path!("offset"),
285        keys.delivery_tag,
286    );
287
288    log_namespace.insert_vector_metadata(
289        log,
290        log_schema().source_type_key(),
291        path!("source_type"),
292        Bytes::from_static(AmqpSourceConfig::NAME.as_bytes()),
293    );
294
295    // This handles the transition from the original timestamp logic. Originally the
296    // `timestamp_key` was populated by the `properties.timestamp()` time on the message, falling
297    // back to calling `now()`.
298    match log_namespace {
299        LogNamespace::Vector => {
300            if let Some(timestamp) = timestamp {
301                log.insert(
302                    metadata_path!(AmqpSourceConfig::NAME, "timestamp"),
303                    timestamp,
304                );
305            };
306
307            log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
308        }
309        LogNamespace::Legacy => {
310            if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
311                log.try_insert(timestamp_key, timestamp.unwrap_or_else(Utc::now));
312            }
313        }
314    };
315}
316
317/// Receives an event from `AMQP` and pushes it along the pipeline.
318async fn receive_event(
319    config: &AmqpSourceConfig,
320    out: &mut SourceSender,
321    log_namespace: LogNamespace,
322    finalizer: Option<&UnorderedFinalizer<FinalizerEntry>>,
323    msg: Delivery,
324) -> Result<(), ()> {
325    let payload = Cursor::new(Bytes::copy_from_slice(&msg.data));
326    let decoder = config.decoder(log_namespace).map_err(|_e| ())?;
327    let mut stream = DecoderFramedRead::new(payload, decoder);
328
329    // Extract timestamp from AMQP message
330    let timestamp = msg
331        .properties
332        .timestamp()
333        .and_then(|millis| Utc.timestamp_millis_opt(millis as _).latest());
334
335    let routing = msg.routing_key.to_string();
336    let exchange = msg.exchange.to_string();
337    let keys = Keys {
338        routing_key_field: &config.routing_key_field,
339        exchange_key: &config.exchange_key,
340        offset_key: &config.offset_key,
341        routing: &routing,
342        exchange: &exchange,
343        delivery_tag: msg.delivery_tag as i64,
344    };
345    let events_received = register!(EventsReceived);
346
347    let stream = stream! {
348        while let Some(result) = stream.next().await {
349            match result {
350                Ok((events, byte_size)) => {
351                    emit!(AmqpBytesReceived {
352                        byte_size,
353                        protocol: "amqp_0_9_1",
354                    });
355
356                    events_received.emit(CountByteSize(
357                        events.len(),
358                        events.estimated_json_encoded_size_of(),
359                    ));
360
361                    for mut event in events {
362                        if let Event::Log(ref mut log) = event {
363                            populate_log_event(log,
364                                        timestamp,
365                                        &keys,
366                                        log_namespace);
367                        }
368
369                        yield event;
370                    }
371                }
372                Err(error) => {
373                    use vector_lib::codecs::StreamDecodingError as _;
374
375                    // Error is logged by `codecs::Decoder`, no further handling
376                    // is needed here.
377                    if !error.can_continue() {
378                        break;
379                    }
380                }
381            }
382        }
383    }
384    .boxed();
385
386    finalize_event_stream(finalizer, out, stream, msg).await;
387
388    Ok(())
389}
390
391/// Send the event stream created by the framed read to the `out` stream.
392async fn finalize_event_stream(
393    finalizer: Option<&UnorderedFinalizer<FinalizerEntry>>,
394    out: &mut SourceSender,
395    mut stream: Pin<Box<dyn Stream<Item = Event> + Send + '_>>,
396    msg: Delivery,
397) {
398    match finalizer {
399        Some(finalizer) => {
400            let (batch, receiver) = BatchNotifier::new_with_receiver();
401            let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
402
403            match out.send_event_stream(&mut stream).await {
404                Err(_) => {
405                    emit!(StreamClosedError { count: 1 });
406                }
407                Ok(_) => {
408                    finalizer.add(msg.into(), receiver);
409                }
410            }
411        }
412        None => match out.send_event_stream(&mut stream).await {
413            Err(_) => {
414                emit!(StreamClosedError { count: 1 });
415            }
416            Ok(_) => {
417                let ack_options = lapin::options::BasicAckOptions::default();
418                if let Err(error) = msg.acker.ack(ack_options).await {
419                    emit!(AmqpAckError { error });
420                }
421            }
422        },
423    }
424}
425
426/// Runs the `AMQP` source involving the main loop pulling data from the server.
427async fn run_amqp_source(
428    config: AmqpSourceConfig,
429    shutdown: ShutdownSignal,
430    mut out: SourceSender,
431    channel: Channel,
432    log_namespace: LogNamespace,
433    acknowledgements: bool,
434) -> Result<(), ()> {
435    let (finalizer, mut ack_stream) =
436        UnorderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));
437
438    // Apply AMQP QoS (prefetch) before starting consumption.
439    if let Some(count) = config.prefetch_count {
440        // per-consumer prefetch (global = false)
441        channel
442            .basic_qos(count, BasicQosOptions { global: false })
443            .await
444            .map_err(|error| {
445                error!(message = "Failed to apply basic_qos.", ?error);
446            })?;
447    }
448
449    debug!("Starting amqp source, listening to queue {}.", config.queue);
450    let mut consumer = channel
451        .basic_consume(
452            &config.queue,
453            &config.consumer,
454            lapin::options::BasicConsumeOptions::default(),
455            lapin::types::FieldTable::default(),
456        )
457        .await
458        .map_err(|error| {
459            error!(message = "Failed to consume.", ?error);
460        })?
461        .fuse();
462    let mut shutdown = shutdown.fuse();
463    loop {
464        tokio::select! {
465            _ = &mut shutdown => break,
466            entry = ack_stream.next() => {
467                if let Some((status, entry)) = entry {
468                    handle_ack(status, entry).await;
469                }
470            },
471            opt_m = consumer.next() => {
472                if let Some(try_m) = opt_m {
473                    match try_m {
474                        Err(error) => {
475                            emit!(AmqpEventError { error });
476                            return Err(());
477                        }
478                        Ok(msg) => {
479                            receive_event(&config, &mut out, log_namespace, finalizer.as_ref(), msg).await?
480                        }
481                    }
482                } else {
483                    break
484                }
485            }
486        };
487    }
488
489    Ok(())
490}
491
492async fn handle_ack(status: BatchStatus, entry: FinalizerEntry) {
493    match status {
494        BatchStatus::Delivered => {
495            let ack_options = lapin::options::BasicAckOptions::default();
496            if let Err(error) = entry.acker.ack(ack_options).await {
497                emit!(AmqpAckError { error });
498            }
499        }
500        BatchStatus::Errored => {
501            let ack_options = lapin::options::BasicRejectOptions::default();
502            if let Err(error) = entry.acker.reject(ack_options).await {
503                emit!(AmqpRejectError { error });
504            }
505        }
506        BatchStatus::Rejected => {
507            let ack_options = lapin::options::BasicRejectOptions::default();
508            if let Err(error) = entry.acker.reject(ack_options).await {
509                emit!(AmqpRejectError { error });
510            }
511        }
512    }
513}
514
515#[cfg(test)]
516pub mod test {
517    use vector_lib::{lookup::OwnedTargetPath, schema::Definition, tls::TlsConfig};
518    use vrl::value::kind::Collection;
519
520    use super::*;
521
522    #[test]
523    fn generate_config() {
524        crate::test_util::test_generate_config::<AmqpSourceConfig>();
525    }
526
527    pub fn make_config() -> AmqpSourceConfig {
528        let mut config = AmqpSourceConfig {
529            queue: "it".to_string(),
530            ..Default::default()
531        };
532        let user = std::env::var("AMQP_USER").unwrap_or_else(|_| "guest".to_string());
533        let pass = std::env::var("AMQP_PASSWORD").unwrap_or_else(|_| "guest".to_string());
534        let host = std::env::var("AMQP_HOST").unwrap_or_else(|_| "rabbitmq".to_string());
535        let vhost = std::env::var("AMQP_VHOST").unwrap_or_else(|_| "%2f".to_string());
536        config.connection.connection_string = format!("amqp://{user}:{pass}@{host}:5672/{vhost}");
537
538        config
539    }
540
541    pub fn make_tls_config() -> AmqpSourceConfig {
542        let mut config = AmqpSourceConfig {
543            queue: "it".to_string(),
544            ..Default::default()
545        };
546        let user = std::env::var("AMQP_USER").unwrap_or_else(|_| "guest".to_string());
547        let pass = std::env::var("AMQP_PASSWORD").unwrap_or_else(|_| "guest".to_string());
548        let vhost = std::env::var("AMQP_VHOST").unwrap_or_else(|_| "%2f".to_string());
549        let host = std::env::var("AMQP_HOST").unwrap_or_else(|_| "rabbitmq".to_string());
550        let ca_file =
551            std::env::var("AMQP_CA_FILE").unwrap_or_else(|_| "/certs/ca.cert.pem".to_string());
552        config.connection.connection_string = format!("amqps://{user}:{pass}@{host}/{vhost}");
553        let tls = TlsConfig {
554            ca_file: Some(ca_file.as_str().into()),
555            ..Default::default()
556        };
557        config.connection.tls = Some(tls);
558        config
559    }
560
561    #[test]
562    fn output_schema_definition_vector_namespace() {
563        let config = AmqpSourceConfig {
564            log_namespace: Some(true),
565            ..Default::default()
566        };
567
568        let definition = config
569            .outputs(LogNamespace::Vector)
570            .remove(0)
571            .schema_definition(true);
572
573        let expected_definition =
574            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
575                .with_meaning(OwnedTargetPath::event_root(), "message")
576                .with_metadata_field(
577                    &owned_value_path!("vector", "source_type"),
578                    Kind::bytes(),
579                    None,
580                )
581                .with_metadata_field(
582                    &owned_value_path!("vector", "ingest_timestamp"),
583                    Kind::timestamp(),
584                    None,
585                )
586                .with_metadata_field(
587                    &owned_value_path!("amqp", "timestamp"),
588                    Kind::timestamp(),
589                    Some("timestamp"),
590                )
591                .with_metadata_field(&owned_value_path!("amqp", "routing"), Kind::bytes(), None)
592                .with_metadata_field(&owned_value_path!("amqp", "exchange"), Kind::bytes(), None)
593                .with_metadata_field(&owned_value_path!("amqp", "offset"), Kind::integer(), None);
594
595        assert_eq!(definition, Some(expected_definition));
596    }
597
598    #[test]
599    fn output_schema_definition_legacy_namespace() {
600        let config = AmqpSourceConfig::default();
601
602        let definition = config
603            .outputs(LogNamespace::Legacy)
604            .remove(0)
605            .schema_definition(true);
606
607        let expected_definition = Definition::new_with_default_metadata(
608            Kind::object(Collection::empty()),
609            [LogNamespace::Legacy],
610        )
611        .with_event_field(
612            &owned_value_path!("message"),
613            Kind::bytes(),
614            Some("message"),
615        )
616        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
617        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
618        .with_event_field(&owned_value_path!("routing"), Kind::bytes(), None)
619        .with_event_field(&owned_value_path!("exchange"), Kind::bytes(), None)
620        .with_event_field(&owned_value_path!("offset"), Kind::integer(), None);
621
622        assert_eq!(definition, Some(expected_definition));
623    }
624}
625
626/// Integration tests use the docker compose files in `tests/integration/docker-compose.amqp.yml`.
627#[cfg(feature = "amqp-integration-tests")]
628#[cfg(test)]
629mod integration_test {
630    use chrono::Utc;
631    use lapin::{BasicProperties, options::*};
632    use tokio::time::Duration;
633    use vector_lib::config::log_schema;
634
635    use super::{test::*, *};
636    use crate::{
637        SourceSender,
638        amqp::await_connection,
639        shutdown::ShutdownSignal,
640        test_util::{
641            components::{SOURCE_TAGS, run_and_assert_source_compliance},
642            random_string,
643        },
644    };
645
646    #[tokio::test]
647    async fn amqp_source_create_ok() {
648        let config = make_config();
649        await_connection(&config.connection).await;
650        assert!(
651            amqp_source(
652                &config,
653                ShutdownSignal::noop(),
654                SourceSender::new_test().0,
655                LogNamespace::Legacy,
656                false,
657            )
658            .await
659            .is_ok()
660        );
661    }
662
663    #[tokio::test]
664    async fn amqp_tls_source_create_ok() {
665        let config = make_tls_config();
666        await_connection(&config.connection).await;
667
668        assert!(
669            amqp_source(
670                &config,
671                ShutdownSignal::noop(),
672                SourceSender::new_test().0,
673                LogNamespace::Legacy,
674                false,
675            )
676            .await
677            .is_ok()
678        );
679    }
680
681    async fn send_event(
682        channel: &lapin::Channel,
683        exchange: &str,
684        routing_key: &str,
685        text: &str,
686        _timestamp: i64,
687    ) {
688        let payload = text.as_bytes();
689        let payload_len = payload.len();
690        trace!("Sending message of length {} to {}.", payload_len, exchange,);
691
692        channel
693            .basic_publish(
694                exchange,
695                routing_key,
696                BasicPublishOptions::default(),
697                payload.as_ref(),
698                BasicProperties::default(),
699            )
700            .await
701            .unwrap()
702            .await
703            .unwrap();
704    }
705
706    async fn source_consume_event(mut config: AmqpSourceConfig) {
707        let exchange = format!("test-{}-exchange", random_string(10));
708        let queue = format!("test-{}-queue", random_string(10));
709        let routing_key = "my_key";
710        trace!("Test exchange name: {}.", exchange);
711        let consumer = format!("test-consumer-{}", random_string(10));
712
713        config.consumer = consumer;
714        config.queue = queue;
715
716        let (_conn, channel) = config.connection.connect().await.unwrap();
717        let exchange_opts = lapin::options::ExchangeDeclareOptions {
718            auto_delete: true,
719            ..Default::default()
720        };
721
722        channel
723            .exchange_declare(
724                &exchange,
725                lapin::ExchangeKind::Fanout,
726                exchange_opts,
727                lapin::types::FieldTable::default(),
728            )
729            .await
730            .unwrap();
731
732        let queue_opts = QueueDeclareOptions {
733            auto_delete: true,
734            ..Default::default()
735        };
736        channel
737            .queue_declare(
738                &config.queue,
739                queue_opts,
740                lapin::types::FieldTable::default(),
741            )
742            .await
743            .unwrap();
744
745        channel
746            .queue_bind(
747                &config.queue,
748                &exchange,
749                "",
750                lapin::options::QueueBindOptions::default(),
751                lapin::types::FieldTable::default(),
752            )
753            .await
754            .unwrap();
755
756        trace!("Sending event...");
757        let now = Utc::now();
758        send_event(
759            &channel,
760            &exchange,
761            routing_key,
762            "my message",
763            now.timestamp_millis(),
764        )
765        .await;
766
767        trace!("Receiving event...");
768        let events =
769            run_and_assert_source_compliance(config, Duration::from_secs(1), &SOURCE_TAGS).await;
770        assert!(!events.is_empty());
771
772        let log = events[0].as_log();
773        trace!("{:?}", log);
774        assert_eq!(*log.get_message().unwrap(), "my message".into());
775        assert_eq!(log["routing"], routing_key.into());
776        assert_eq!(*log.get_source_type().unwrap(), "amqp".into());
777        let log_ts = log[log_schema().timestamp_key().unwrap().to_string()]
778            .as_timestamp()
779            .unwrap();
780        assert!(log_ts.signed_duration_since(now) < chrono::Duration::seconds(1));
781        assert_eq!(log["exchange"], exchange.into());
782    }
783
784    #[tokio::test]
785    async fn amqp_source_consume_event() {
786        let config = make_config();
787        await_connection(&config.connection).await;
788        source_consume_event(config).await;
789    }
790
791    #[tokio::test]
792    async fn amqp_tls_source_consume_event() {
793        let config = make_tls_config();
794        await_connection(&config.connection).await;
795        source_consume_event(config).await;
796    }
797}