1use std::{io::Cursor, pin::Pin};
4
5use async_stream::stream;
6use bytes::Bytes;
7use chrono::{TimeZone, Utc};
8use futures::{FutureExt, StreamExt};
9use futures_util::Stream;
10use lapin::{Channel, acker::Acker, message::Delivery, options::BasicQosOptions};
11use snafu::Snafu;
12use tokio_util::codec::FramedRead;
13use vector_lib::{
14 EstimatedJsonEncodedSizeOf,
15 codecs::decoding::{DeserializerConfig, FramingConfig},
16 config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig, log_schema},
17 configurable::configurable_component,
18 event::{Event, LogEvent},
19 finalizer::UnorderedFinalizer,
20 internal_event::{CountByteSize, EventsReceived, InternalEventHandle as _},
21 lookup::{lookup_v2::OptionalValuePath, metadata_path, owned_value_path, path},
22};
23use vrl::value::Kind;
24
25use crate::{
26 SourceSender,
27 amqp::AmqpConfig,
28 codecs::{Decoder, DecodingConfig},
29 config::{SourceConfig, SourceContext, SourceOutput},
30 event::{BatchNotifier, BatchStatus},
31 internal_events::{
32 StreamClosedError,
33 source::{AmqpAckError, AmqpBytesReceived, AmqpEventError, AmqpRejectError},
34 },
35 serde::{bool_or_struct, default_decoding, default_framing_message_based},
36 shutdown::ShutdownSignal,
37};
38
39#[derive(Debug, Snafu)]
40enum BuildError {
41 #[snafu(display("Could not create AMQP consumer: {}", source))]
42 AmqpCreateError {
43 source: Box<dyn std::error::Error + Send + Sync>,
44 },
45}
46
47#[configurable_component(source(
51 "amqp",
52 "Collect events from AMQP 0.9.1 compatible brokers like RabbitMQ."
53))]
54#[derive(Clone, Debug, Derivative)]
55#[derivative(Default)]
56#[serde(deny_unknown_fields)]
57pub struct AmqpSourceConfig {
58 #[serde(default = "default_queue")]
60 pub(crate) queue: String,
61
62 #[serde(default = "default_consumer")]
64 #[configurable(metadata(docs::examples = "consumer-group-name"))]
65 pub(crate) consumer: String,
66
67 #[serde(flatten)]
68 pub(crate) connection: AmqpConfig,
69
70 #[serde(default = "default_routing_key_field")]
72 #[derivative(Default(value = "default_routing_key_field()"))]
73 pub(crate) routing_key_field: OptionalValuePath,
74
75 #[serde(default = "default_exchange_key")]
77 #[derivative(Default(value = "default_exchange_key()"))]
78 pub(crate) exchange_key: OptionalValuePath,
79
80 #[serde(default = "default_offset_key")]
82 #[derivative(Default(value = "default_offset_key()"))]
83 pub(crate) offset_key: OptionalValuePath,
84
85 #[configurable(metadata(docs::hidden))]
87 #[serde(default)]
88 pub log_namespace: Option<bool>,
89
90 #[configurable(derived)]
91 #[serde(default = "default_framing_message_based")]
92 #[derivative(Default(value = "default_framing_message_based()"))]
93 pub(crate) framing: FramingConfig,
94
95 #[configurable(derived)]
96 #[serde(default = "default_decoding")]
97 #[derivative(Default(value = "default_decoding()"))]
98 pub(crate) decoding: DeserializerConfig,
99
100 #[configurable(derived)]
101 #[serde(default, deserialize_with = "bool_or_struct")]
102 pub(crate) acknowledgements: SourceAcknowledgementsConfig,
103
104 #[serde(default)]
112 #[configurable(metadata(docs::examples = 100))]
113 pub(crate) prefetch_count: Option<u16>,
114}
115
/// Queue name used when `queue` is omitted from the configuration.
fn default_queue() -> String {
    String::from("vector")
}
119
/// Consumer identifier used when `consumer` is omitted from the configuration.
fn default_consumer() -> String {
    String::from("vector")
}
123
124fn default_routing_key_field() -> OptionalValuePath {
125 OptionalValuePath::from(owned_value_path!("routing"))
126}
127
128fn default_exchange_key() -> OptionalValuePath {
129 OptionalValuePath::from(owned_value_path!("exchange"))
130}
131
132fn default_offset_key() -> OptionalValuePath {
133 OptionalValuePath::from(owned_value_path!("offset"))
134}
135
136impl_generate_config_from_default!(AmqpSourceConfig);
137
138impl AmqpSourceConfig {
139 fn decoder(&self, log_namespace: LogNamespace) -> vector_lib::Result<Decoder> {
140 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build()
141 }
142}
143
144#[async_trait::async_trait]
145#[typetag::serde(name = "amqp")]
146impl SourceConfig for AmqpSourceConfig {
147 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
148 let log_namespace = cx.log_namespace(self.log_namespace);
149 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
150
151 amqp_source(self, cx.shutdown, cx.out, log_namespace, acknowledgements).await
152 }
153
154 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
155 let log_namespace = global_log_namespace.merge(self.log_namespace);
156 let schema_definition = self
157 .decoding
158 .schema_definition(log_namespace)
159 .with_standard_vector_source_metadata()
160 .with_source_metadata(
161 AmqpSourceConfig::NAME,
162 None,
163 &owned_value_path!("timestamp"),
164 Kind::timestamp(),
165 Some("timestamp"),
166 )
167 .with_source_metadata(
168 AmqpSourceConfig::NAME,
169 self.routing_key_field
170 .path
171 .clone()
172 .map(LegacyKey::InsertIfEmpty),
173 &owned_value_path!("routing"),
174 Kind::bytes(),
175 None,
176 )
177 .with_source_metadata(
178 AmqpSourceConfig::NAME,
179 self.exchange_key.path.clone().map(LegacyKey::InsertIfEmpty),
180 &owned_value_path!("exchange"),
181 Kind::bytes(),
182 None,
183 )
184 .with_source_metadata(
185 AmqpSourceConfig::NAME,
186 self.offset_key.path.clone().map(LegacyKey::InsertIfEmpty),
187 &owned_value_path!("offset"),
188 Kind::integer(),
189 None,
190 );
191
192 vec![SourceOutput::new_maybe_logs(
193 self.decoding.output_type(),
194 schema_definition,
195 )]
196 }
197
198 fn can_acknowledge(&self) -> bool {
199 true
200 }
201}
202
203#[derive(Debug)]
204struct FinalizerEntry {
205 acker: Acker,
206}
207
208impl From<Delivery> for FinalizerEntry {
209 fn from(delivery: Delivery) -> Self {
210 Self {
211 acker: delivery.acker,
212 }
213 }
214}
215
216pub(crate) async fn amqp_source(
217 config: &AmqpSourceConfig,
218 shutdown: ShutdownSignal,
219 out: SourceSender,
220 log_namespace: LogNamespace,
221 acknowledgements: bool,
222) -> crate::Result<super::Source> {
223 let config = config.clone();
224 let (_conn, channel) = config
225 .connection
226 .connect()
227 .await
228 .map_err(|source| BuildError::AmqpCreateError { source })?;
229
230 Ok(Box::pin(run_amqp_source(
231 config,
232 shutdown,
233 out,
234 channel,
235 log_namespace,
236 acknowledgements,
237 )))
238}
239
240struct Keys<'a> {
241 routing_key_field: &'a OptionalValuePath,
242 routing: &'a str,
243 exchange_key: &'a OptionalValuePath,
244 exchange: &'a str,
245 offset_key: &'a OptionalValuePath,
246 delivery_tag: i64,
247}
248
249fn populate_log_event(
251 log: &mut LogEvent,
252 timestamp: Option<chrono::DateTime<Utc>>,
253 keys: &Keys<'_>,
254 log_namespace: LogNamespace,
255) {
256 log_namespace.insert_source_metadata(
257 AmqpSourceConfig::NAME,
258 log,
259 keys.routing_key_field
260 .path
261 .as_ref()
262 .map(LegacyKey::InsertIfEmpty),
263 path!("routing"),
264 keys.routing.to_string(),
265 );
266
267 log_namespace.insert_source_metadata(
268 AmqpSourceConfig::NAME,
269 log,
270 keys.exchange_key
271 .path
272 .as_ref()
273 .map(LegacyKey::InsertIfEmpty),
274 path!("exchange"),
275 keys.exchange.to_string(),
276 );
277
278 log_namespace.insert_source_metadata(
279 AmqpSourceConfig::NAME,
280 log,
281 keys.offset_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
282 path!("offset"),
283 keys.delivery_tag,
284 );
285
286 log_namespace.insert_vector_metadata(
287 log,
288 log_schema().source_type_key(),
289 path!("source_type"),
290 Bytes::from_static(AmqpSourceConfig::NAME.as_bytes()),
291 );
292
293 match log_namespace {
297 LogNamespace::Vector => {
298 if let Some(timestamp) = timestamp {
299 log.insert(
300 metadata_path!(AmqpSourceConfig::NAME, "timestamp"),
301 timestamp,
302 );
303 };
304
305 log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
306 }
307 LogNamespace::Legacy => {
308 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
309 log.try_insert(timestamp_key, timestamp.unwrap_or_else(Utc::now));
310 }
311 }
312 };
313}
314
315async fn receive_event(
317 config: &AmqpSourceConfig,
318 out: &mut SourceSender,
319 log_namespace: LogNamespace,
320 finalizer: Option<&UnorderedFinalizer<FinalizerEntry>>,
321 msg: Delivery,
322) -> Result<(), ()> {
323 let payload = Cursor::new(Bytes::copy_from_slice(&msg.data));
324 let decoder = config.decoder(log_namespace).map_err(|_e| ())?;
325 let mut stream = FramedRead::new(payload, decoder);
326
327 let timestamp = msg
329 .properties
330 .timestamp()
331 .and_then(|millis| Utc.timestamp_millis_opt(millis as _).latest());
332
333 let routing = msg.routing_key.to_string();
334 let exchange = msg.exchange.to_string();
335 let keys = Keys {
336 routing_key_field: &config.routing_key_field,
337 exchange_key: &config.exchange_key,
338 offset_key: &config.offset_key,
339 routing: &routing,
340 exchange: &exchange,
341 delivery_tag: msg.delivery_tag as i64,
342 };
343 let events_received = register!(EventsReceived);
344
345 let stream = stream! {
346 while let Some(result) = stream.next().await {
347 match result {
348 Ok((events, byte_size)) => {
349 emit!(AmqpBytesReceived {
350 byte_size,
351 protocol: "amqp_0_9_1",
352 });
353
354 events_received.emit(CountByteSize(
355 events.len(),
356 events.estimated_json_encoded_size_of(),
357 ));
358
359 for mut event in events {
360 if let Event::Log(ref mut log) = event {
361 populate_log_event(log,
362 timestamp,
363 &keys,
364 log_namespace);
365 }
366
367 yield event;
368 }
369 }
370 Err(error) => {
371 use vector_lib::codecs::StreamDecodingError as _;
372
373 if !error.can_continue() {
376 break;
377 }
378 }
379 }
380 }
381 }
382 .boxed();
383
384 finalize_event_stream(finalizer, out, stream, msg).await;
385
386 Ok(())
387}
388
389async fn finalize_event_stream(
391 finalizer: Option<&UnorderedFinalizer<FinalizerEntry>>,
392 out: &mut SourceSender,
393 mut stream: Pin<Box<dyn Stream<Item = Event> + Send + '_>>,
394 msg: Delivery,
395) {
396 match finalizer {
397 Some(finalizer) => {
398 let (batch, receiver) = BatchNotifier::new_with_receiver();
399 let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
400
401 match out.send_event_stream(&mut stream).await {
402 Err(_) => {
403 emit!(StreamClosedError { count: 1 });
404 }
405 Ok(_) => {
406 finalizer.add(msg.into(), receiver);
407 }
408 }
409 }
410 None => match out.send_event_stream(&mut stream).await {
411 Err(_) => {
412 emit!(StreamClosedError { count: 1 });
413 }
414 Ok(_) => {
415 let ack_options = lapin::options::BasicAckOptions::default();
416 if let Err(error) = msg.acker.ack(ack_options).await {
417 emit!(AmqpAckError { error });
418 }
419 }
420 },
421 }
422}
423
424async fn run_amqp_source(
426 config: AmqpSourceConfig,
427 shutdown: ShutdownSignal,
428 mut out: SourceSender,
429 channel: Channel,
430 log_namespace: LogNamespace,
431 acknowledgements: bool,
432) -> Result<(), ()> {
433 let (finalizer, mut ack_stream) =
434 UnorderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));
435
436 if let Some(count) = config.prefetch_count {
438 channel
440 .basic_qos(count, BasicQosOptions { global: false })
441 .await
442 .map_err(|error| {
443 error!(message = "Failed to apply basic_qos.", ?error);
444 })?;
445 }
446
447 debug!("Starting amqp source, listening to queue {}.", config.queue);
448 let mut consumer = channel
449 .basic_consume(
450 &config.queue,
451 &config.consumer,
452 lapin::options::BasicConsumeOptions::default(),
453 lapin::types::FieldTable::default(),
454 )
455 .await
456 .map_err(|error| {
457 error!(message = "Failed to consume.", ?error);
458 })?
459 .fuse();
460 let mut shutdown = shutdown.fuse();
461 loop {
462 tokio::select! {
463 _ = &mut shutdown => break,
464 entry = ack_stream.next() => {
465 if let Some((status, entry)) = entry {
466 handle_ack(status, entry).await;
467 }
468 },
469 opt_m = consumer.next() => {
470 if let Some(try_m) = opt_m {
471 match try_m {
472 Err(error) => {
473 emit!(AmqpEventError { error });
474 return Err(());
475 }
476 Ok(msg) => {
477 receive_event(&config, &mut out, log_namespace, finalizer.as_ref(), msg).await?
478 }
479 }
480 } else {
481 break
482 }
483 }
484 };
485 }
486
487 Ok(())
488}
489
490async fn handle_ack(status: BatchStatus, entry: FinalizerEntry) {
491 match status {
492 BatchStatus::Delivered => {
493 let ack_options = lapin::options::BasicAckOptions::default();
494 if let Err(error) = entry.acker.ack(ack_options).await {
495 emit!(AmqpAckError { error });
496 }
497 }
498 BatchStatus::Errored => {
499 let ack_options = lapin::options::BasicRejectOptions::default();
500 if let Err(error) = entry.acker.reject(ack_options).await {
501 emit!(AmqpRejectError { error });
502 }
503 }
504 BatchStatus::Rejected => {
505 let ack_options = lapin::options::BasicRejectOptions::default();
506 if let Err(error) = entry.acker.reject(ack_options).await {
507 emit!(AmqpRejectError { error });
508 }
509 }
510 }
511}
512
#[cfg(test)]
pub mod test {
    use vector_lib::{lookup::OwnedTargetPath, schema::Definition, tls::TlsConfig};
    use vrl::value::kind::Collection;

    use super::*;

    /// Reads environment variable `name`, falling back to `fallback` when it
    /// is unset or not valid Unicode.
    fn env_or(name: &str, fallback: &str) -> String {
        std::env::var(name).unwrap_or_else(|_| fallback.to_string())
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<AmqpSourceConfig>();
    }

    /// Plain-text test configuration for queue `it`, with the connection
    /// string assembled from `AMQP_*` environment variables (port 5672).
    pub fn make_config() -> AmqpSourceConfig {
        let user = env_or("AMQP_USER", "guest");
        let pass = env_or("AMQP_PASSWORD", "guest");
        let host = env_or("AMQP_HOST", "rabbitmq");
        let vhost = env_or("AMQP_VHOST", "%2f");

        let mut config = AmqpSourceConfig {
            queue: "it".to_string(),
            ..Default::default()
        };
        config.connection.connection_string = format!("amqp://{user}:{pass}@{host}:5672/{vhost}");

        config
    }

    /// TLS variant of [`make_config`]: uses the `amqps` scheme without an
    /// explicit port and trusts the CA file named by `AMQP_CA_FILE`.
    pub fn make_tls_config() -> AmqpSourceConfig {
        let user = env_or("AMQP_USER", "guest");
        let pass = env_or("AMQP_PASSWORD", "guest");
        let vhost = env_or("AMQP_VHOST", "%2f");
        let host = env_or("AMQP_HOST", "rabbitmq");
        let ca_file = env_or("AMQP_CA_FILE", "/certs/ca.cert.pem");

        let mut config = AmqpSourceConfig {
            queue: "it".to_string(),
            ..Default::default()
        };
        config.connection.connection_string = format!("amqps://{user}:{pass}@{host}/{vhost}");
        config.connection.tls = Some(TlsConfig {
            ca_file: Some(ca_file.as_str().into()),
            ..Default::default()
        });
        config
    }

    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = AmqpSourceConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let actual = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        // In the Vector namespace everything except the message itself is
        // expected under metadata.
        let expected =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("amqp", "timestamp"),
                    Kind::timestamp(),
                    Some("timestamp"),
                )
                .with_metadata_field(&owned_value_path!("amqp", "routing"), Kind::bytes(), None)
                .with_metadata_field(&owned_value_path!("amqp", "exchange"), Kind::bytes(), None)
                .with_metadata_field(&owned_value_path!("amqp", "offset"), Kind::integer(), None);

        assert_eq!(actual, Some(expected));
    }

    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = AmqpSourceConfig::default();

        let actual = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        // In the legacy namespace every value is expected as an event field.
        let expected = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("routing"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("exchange"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("offset"), Kind::integer(), None);

        assert_eq!(actual, Some(expected));
    }
}
623
#[cfg(feature = "amqp-integration-tests")]
#[cfg(test)]
mod integration_test {
    use chrono::Utc;
    use lapin::{BasicProperties, options::*};
    use tokio::time::Duration;
    use vector_lib::config::log_schema;

    use super::{test::*, *};
    use crate::{
        SourceSender,
        amqp::await_connection,
        shutdown::ShutdownSignal,
        test_util::{
            components::{SOURCE_TAGS, run_and_assert_source_compliance},
            random_string,
        },
    };

    /// Reports whether the source can be built (i.e. connected) with `config`,
    /// using a no-op shutdown signal, a test sender, the legacy namespace and
    /// acknowledgements disabled.
    async fn source_builds(config: &AmqpSourceConfig) -> bool {
        amqp_source(
            config,
            ShutdownSignal::noop(),
            SourceSender::new_test().0,
            LogNamespace::Legacy,
            false,
        )
        .await
        .is_ok()
    }

    #[tokio::test]
    async fn amqp_source_create_ok() {
        let config = make_config();
        await_connection(&config.connection).await;
        assert!(source_builds(&config).await);
    }

    #[tokio::test]
    async fn amqp_tls_source_create_ok() {
        let config = make_tls_config();
        await_connection(&config.connection).await;

        assert!(source_builds(&config).await);
    }

    /// Publishes `text` to `exchange` with `routing_key` and waits for the
    /// broker's confirmation. `_timestamp` is accepted but not used: the
    /// message is published with default properties.
    async fn send_event(
        channel: &lapin::Channel,
        exchange: &str,
        routing_key: &str,
        text: &str,
        _timestamp: i64,
    ) {
        let payload = text.as_bytes();
        let payload_len = payload.len();
        trace!("Sending message of length {} to {}.", payload_len, exchange,);

        let confirmation = channel
            .basic_publish(
                exchange,
                routing_key,
                BasicPublishOptions::default(),
                payload.as_ref(),
                BasicProperties::default(),
            )
            .await
            .unwrap();
        confirmation.await.unwrap();
    }

    /// End-to-end check: declares a fanout exchange and a queue with random
    /// names, publishes one message, runs the source against the queue and
    /// verifies the fields of the resulting log event.
    async fn source_consume_event(mut config: AmqpSourceConfig) {
        let exchange = format!("test-{}-exchange", random_string(10));
        let queue = format!("test-{}-queue", random_string(10));
        let routing_key = "my_key";
        trace!("Test exchange name: {}.", exchange);
        let consumer = format!("test-consumer-{}", random_string(10));

        config.consumer = consumer;
        config.queue = queue;

        let (_conn, channel) = config.connection.connect().await.unwrap();

        // Both the exchange and the queue are declared auto-delete.
        let exchange_opts = lapin::options::ExchangeDeclareOptions {
            auto_delete: true,
            ..Default::default()
        };
        channel
            .exchange_declare(
                &exchange,
                lapin::ExchangeKind::Fanout,
                exchange_opts,
                lapin::types::FieldTable::default(),
            )
            .await
            .unwrap();

        let queue_opts = QueueDeclareOptions {
            auto_delete: true,
            ..Default::default()
        };
        channel
            .queue_declare(
                &config.queue,
                queue_opts,
                lapin::types::FieldTable::default(),
            )
            .await
            .unwrap();

        // Bound with an empty binding key.
        channel
            .queue_bind(
                &config.queue,
                &exchange,
                "",
                lapin::options::QueueBindOptions::default(),
                lapin::types::FieldTable::default(),
            )
            .await
            .unwrap();

        trace!("Sending event...");
        let now = Utc::now();
        let sent_at_millis = now.timestamp_millis();
        send_event(&channel, &exchange, routing_key, "my message", sent_at_millis).await;

        trace!("Receiving event...");
        let events =
            run_and_assert_source_compliance(config, Duration::from_secs(1), &SOURCE_TAGS).await;
        assert!(!events.is_empty());

        let log = events[0].as_log();
        trace!("{:?}", log);
        assert_eq!(*log.get_message().unwrap(), "my message".into());
        assert_eq!(log["routing"], routing_key.into());
        assert_eq!(*log.get_source_type().unwrap(), "amqp".into());

        let timestamp_field = log_schema().timestamp_key().unwrap().to_string();
        let log_ts = log[timestamp_field].as_timestamp().unwrap();
        assert!(log_ts.signed_duration_since(now) < chrono::Duration::seconds(1));
        assert_eq!(log["exchange"], exchange.into());
    }

    #[tokio::test]
    async fn amqp_source_consume_event() {
        let config = make_config();
        await_connection(&config.connection).await;
        source_consume_event(config).await;
    }

    #[tokio::test]
    async fn amqp_tls_source_consume_event() {
        let config = make_tls_config();
        await_connection(&config.connection).await;
        source_consume_event(config).await;
    }
}