1use std::{io::Cursor, pin::Pin};
4
5use async_stream::stream;
6use bytes::Bytes;
7use chrono::{TimeZone, Utc};
8use futures::{FutureExt, StreamExt};
9use futures_util::Stream;
10use lapin::{Channel, acker::Acker, message::Delivery, options::BasicQosOptions};
11use snafu::Snafu;
12use vector_lib::{
13 EstimatedJsonEncodedSizeOf,
14 codecs::{
15 DecoderFramedRead,
16 decoding::{DeserializerConfig, FramingConfig},
17 },
18 config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig, log_schema},
19 configurable::configurable_component,
20 event::{Event, LogEvent},
21 finalizer::UnorderedFinalizer,
22 internal_event::{CountByteSize, EventsReceived, InternalEventHandle as _},
23 lookup::{lookup_v2::OptionalValuePath, metadata_path, owned_value_path, path},
24};
25use vrl::value::Kind;
26
27use crate::{
28 SourceSender,
29 amqp::AmqpConfig,
30 codecs::{Decoder, DecodingConfig},
31 config::{SourceConfig, SourceContext, SourceOutput},
32 event::{BatchNotifier, BatchStatus},
33 internal_events::{
34 StreamClosedError,
35 source::{AmqpAckError, AmqpBytesReceived, AmqpEventError, AmqpRejectError},
36 },
37 serde::{bool_or_struct, default_decoding, default_framing_message_based},
38 shutdown::ShutdownSignal,
39};
40
41#[derive(Debug, Snafu)]
42enum BuildError {
43 #[snafu(display("Could not create AMQP consumer: {}", source))]
44 AmqpCreateError {
45 source: Box<dyn std::error::Error + Send + Sync>,
46 },
47}
48
49#[configurable_component(source(
53 "amqp",
54 "Collect events from AMQP 0.9.1 compatible brokers like RabbitMQ."
55))]
56#[derive(Clone, Debug, Derivative)]
57#[derivative(Default)]
58#[serde(deny_unknown_fields)]
59pub struct AmqpSourceConfig {
60 #[serde(default = "default_queue")]
62 pub(crate) queue: String,
63
64 #[serde(default = "default_consumer")]
66 #[configurable(metadata(docs::examples = "consumer-group-name"))]
67 pub(crate) consumer: String,
68
69 #[serde(flatten)]
70 pub(crate) connection: AmqpConfig,
71
72 #[serde(default = "default_routing_key_field")]
74 #[derivative(Default(value = "default_routing_key_field()"))]
75 pub(crate) routing_key_field: OptionalValuePath,
76
77 #[serde(default = "default_exchange_key")]
79 #[derivative(Default(value = "default_exchange_key()"))]
80 pub(crate) exchange_key: OptionalValuePath,
81
82 #[serde(default = "default_offset_key")]
84 #[derivative(Default(value = "default_offset_key()"))]
85 pub(crate) offset_key: OptionalValuePath,
86
87 #[configurable(metadata(docs::hidden))]
89 #[serde(default)]
90 pub log_namespace: Option<bool>,
91
92 #[configurable(derived)]
93 #[serde(default = "default_framing_message_based")]
94 #[derivative(Default(value = "default_framing_message_based()"))]
95 pub(crate) framing: FramingConfig,
96
97 #[configurable(derived)]
98 #[serde(default = "default_decoding")]
99 #[derivative(Default(value = "default_decoding()"))]
100 pub(crate) decoding: DeserializerConfig,
101
102 #[configurable(derived)]
103 #[serde(default, deserialize_with = "bool_or_struct")]
104 pub(crate) acknowledgements: SourceAcknowledgementsConfig,
105
106 #[serde(default)]
114 #[configurable(metadata(docs::examples = 100))]
115 pub(crate) prefetch_count: Option<u16>,
116}
117
118fn default_queue() -> String {
119 "vector".into()
120}
121
122fn default_consumer() -> String {
123 "vector".into()
124}
125
126fn default_routing_key_field() -> OptionalValuePath {
127 OptionalValuePath::from(owned_value_path!("routing"))
128}
129
130fn default_exchange_key() -> OptionalValuePath {
131 OptionalValuePath::from(owned_value_path!("exchange"))
132}
133
134fn default_offset_key() -> OptionalValuePath {
135 OptionalValuePath::from(owned_value_path!("offset"))
136}
137
138impl_generate_config_from_default!(AmqpSourceConfig);
139
140impl AmqpSourceConfig {
141 fn decoder(&self, log_namespace: LogNamespace) -> vector_lib::Result<Decoder> {
142 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build()
143 }
144}
145
146#[async_trait::async_trait]
147#[typetag::serde(name = "amqp")]
148impl SourceConfig for AmqpSourceConfig {
149 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
150 let log_namespace = cx.log_namespace(self.log_namespace);
151 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
152
153 amqp_source(self, cx.shutdown, cx.out, log_namespace, acknowledgements).await
154 }
155
156 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
157 let log_namespace = global_log_namespace.merge(self.log_namespace);
158 let schema_definition = self
159 .decoding
160 .schema_definition(log_namespace)
161 .with_standard_vector_source_metadata()
162 .with_source_metadata(
163 AmqpSourceConfig::NAME,
164 None,
165 &owned_value_path!("timestamp"),
166 Kind::timestamp(),
167 Some("timestamp"),
168 )
169 .with_source_metadata(
170 AmqpSourceConfig::NAME,
171 self.routing_key_field
172 .path
173 .clone()
174 .map(LegacyKey::InsertIfEmpty),
175 &owned_value_path!("routing"),
176 Kind::bytes(),
177 None,
178 )
179 .with_source_metadata(
180 AmqpSourceConfig::NAME,
181 self.exchange_key.path.clone().map(LegacyKey::InsertIfEmpty),
182 &owned_value_path!("exchange"),
183 Kind::bytes(),
184 None,
185 )
186 .with_source_metadata(
187 AmqpSourceConfig::NAME,
188 self.offset_key.path.clone().map(LegacyKey::InsertIfEmpty),
189 &owned_value_path!("offset"),
190 Kind::integer(),
191 None,
192 );
193
194 vec![SourceOutput::new_maybe_logs(
195 self.decoding.output_type(),
196 schema_definition,
197 )]
198 }
199
200 fn can_acknowledge(&self) -> bool {
201 true
202 }
203}
204
205#[derive(Debug)]
206struct FinalizerEntry {
207 acker: Acker,
208}
209
210impl From<Delivery> for FinalizerEntry {
211 fn from(delivery: Delivery) -> Self {
212 Self {
213 acker: delivery.acker,
214 }
215 }
216}
217
218pub(crate) async fn amqp_source(
219 config: &AmqpSourceConfig,
220 shutdown: ShutdownSignal,
221 out: SourceSender,
222 log_namespace: LogNamespace,
223 acknowledgements: bool,
224) -> crate::Result<super::Source> {
225 let config = config.clone();
226 let (_conn, channel) = config
227 .connection
228 .connect()
229 .await
230 .map_err(|source| BuildError::AmqpCreateError { source })?;
231
232 Ok(Box::pin(run_amqp_source(
233 config,
234 shutdown,
235 out,
236 channel,
237 log_namespace,
238 acknowledgements,
239 )))
240}
241
242struct Keys<'a> {
243 routing_key_field: &'a OptionalValuePath,
244 routing: &'a str,
245 exchange_key: &'a OptionalValuePath,
246 exchange: &'a str,
247 offset_key: &'a OptionalValuePath,
248 delivery_tag: i64,
249}
250
251fn populate_log_event(
253 log: &mut LogEvent,
254 timestamp: Option<chrono::DateTime<Utc>>,
255 keys: &Keys<'_>,
256 log_namespace: LogNamespace,
257) {
258 log_namespace.insert_source_metadata(
259 AmqpSourceConfig::NAME,
260 log,
261 keys.routing_key_field
262 .path
263 .as_ref()
264 .map(LegacyKey::InsertIfEmpty),
265 path!("routing"),
266 keys.routing.to_string(),
267 );
268
269 log_namespace.insert_source_metadata(
270 AmqpSourceConfig::NAME,
271 log,
272 keys.exchange_key
273 .path
274 .as_ref()
275 .map(LegacyKey::InsertIfEmpty),
276 path!("exchange"),
277 keys.exchange.to_string(),
278 );
279
280 log_namespace.insert_source_metadata(
281 AmqpSourceConfig::NAME,
282 log,
283 keys.offset_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
284 path!("offset"),
285 keys.delivery_tag,
286 );
287
288 log_namespace.insert_vector_metadata(
289 log,
290 log_schema().source_type_key(),
291 path!("source_type"),
292 Bytes::from_static(AmqpSourceConfig::NAME.as_bytes()),
293 );
294
295 match log_namespace {
299 LogNamespace::Vector => {
300 if let Some(timestamp) = timestamp {
301 log.insert(
302 metadata_path!(AmqpSourceConfig::NAME, "timestamp"),
303 timestamp,
304 );
305 };
306
307 log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
308 }
309 LogNamespace::Legacy => {
310 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
311 log.try_insert(timestamp_key, timestamp.unwrap_or_else(Utc::now));
312 }
313 }
314 };
315}
316
317async fn receive_event(
319 config: &AmqpSourceConfig,
320 out: &mut SourceSender,
321 log_namespace: LogNamespace,
322 finalizer: Option<&UnorderedFinalizer<FinalizerEntry>>,
323 msg: Delivery,
324) -> Result<(), ()> {
325 let payload = Cursor::new(Bytes::copy_from_slice(&msg.data));
326 let decoder = config.decoder(log_namespace).map_err(|_e| ())?;
327 let mut stream = DecoderFramedRead::new(payload, decoder);
328
329 let timestamp = msg
331 .properties
332 .timestamp()
333 .and_then(|millis| Utc.timestamp_millis_opt(millis as _).latest());
334
335 let routing = msg.routing_key.to_string();
336 let exchange = msg.exchange.to_string();
337 let keys = Keys {
338 routing_key_field: &config.routing_key_field,
339 exchange_key: &config.exchange_key,
340 offset_key: &config.offset_key,
341 routing: &routing,
342 exchange: &exchange,
343 delivery_tag: msg.delivery_tag as i64,
344 };
345 let events_received = register!(EventsReceived);
346
347 let stream = stream! {
348 while let Some(result) = stream.next().await {
349 match result {
350 Ok((events, byte_size)) => {
351 emit!(AmqpBytesReceived {
352 byte_size,
353 protocol: "amqp_0_9_1",
354 });
355
356 events_received.emit(CountByteSize(
357 events.len(),
358 events.estimated_json_encoded_size_of(),
359 ));
360
361 for mut event in events {
362 if let Event::Log(ref mut log) = event {
363 populate_log_event(log,
364 timestamp,
365 &keys,
366 log_namespace);
367 }
368
369 yield event;
370 }
371 }
372 Err(error) => {
373 use vector_lib::codecs::StreamDecodingError as _;
374
375 if !error.can_continue() {
378 break;
379 }
380 }
381 }
382 }
383 }
384 .boxed();
385
386 finalize_event_stream(finalizer, out, stream, msg).await;
387
388 Ok(())
389}
390
391async fn finalize_event_stream(
393 finalizer: Option<&UnorderedFinalizer<FinalizerEntry>>,
394 out: &mut SourceSender,
395 mut stream: Pin<Box<dyn Stream<Item = Event> + Send + '_>>,
396 msg: Delivery,
397) {
398 match finalizer {
399 Some(finalizer) => {
400 let (batch, receiver) = BatchNotifier::new_with_receiver();
401 let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
402
403 match out.send_event_stream(&mut stream).await {
404 Err(_) => {
405 emit!(StreamClosedError { count: 1 });
406 }
407 Ok(_) => {
408 finalizer.add(msg.into(), receiver);
409 }
410 }
411 }
412 None => match out.send_event_stream(&mut stream).await {
413 Err(_) => {
414 emit!(StreamClosedError { count: 1 });
415 }
416 Ok(_) => {
417 let ack_options = lapin::options::BasicAckOptions::default();
418 if let Err(error) = msg.acker.ack(ack_options).await {
419 emit!(AmqpAckError { error });
420 }
421 }
422 },
423 }
424}
425
426async fn run_amqp_source(
428 config: AmqpSourceConfig,
429 shutdown: ShutdownSignal,
430 mut out: SourceSender,
431 channel: Channel,
432 log_namespace: LogNamespace,
433 acknowledgements: bool,
434) -> Result<(), ()> {
435 let (finalizer, mut ack_stream) =
436 UnorderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));
437
438 if let Some(count) = config.prefetch_count {
440 channel
442 .basic_qos(count, BasicQosOptions { global: false })
443 .await
444 .map_err(|error| {
445 error!(message = "Failed to apply basic_qos.", ?error);
446 })?;
447 }
448
449 debug!("Starting amqp source, listening to queue {}.", config.queue);
450 let mut consumer = channel
451 .basic_consume(
452 &config.queue,
453 &config.consumer,
454 lapin::options::BasicConsumeOptions::default(),
455 lapin::types::FieldTable::default(),
456 )
457 .await
458 .map_err(|error| {
459 error!(message = "Failed to consume.", ?error);
460 })?
461 .fuse();
462 let mut shutdown = shutdown.fuse();
463 loop {
464 tokio::select! {
465 _ = &mut shutdown => break,
466 entry = ack_stream.next() => {
467 if let Some((status, entry)) = entry {
468 handle_ack(status, entry).await;
469 }
470 },
471 opt_m = consumer.next() => {
472 if let Some(try_m) = opt_m {
473 match try_m {
474 Err(error) => {
475 emit!(AmqpEventError { error });
476 return Err(());
477 }
478 Ok(msg) => {
479 receive_event(&config, &mut out, log_namespace, finalizer.as_ref(), msg).await?
480 }
481 }
482 } else {
483 break
484 }
485 }
486 };
487 }
488
489 Ok(())
490}
491
492async fn handle_ack(status: BatchStatus, entry: FinalizerEntry) {
493 match status {
494 BatchStatus::Delivered => {
495 let ack_options = lapin::options::BasicAckOptions::default();
496 if let Err(error) = entry.acker.ack(ack_options).await {
497 emit!(AmqpAckError { error });
498 }
499 }
500 BatchStatus::Errored => {
501 let ack_options = lapin::options::BasicRejectOptions::default();
502 if let Err(error) = entry.acker.reject(ack_options).await {
503 emit!(AmqpRejectError { error });
504 }
505 }
506 BatchStatus::Rejected => {
507 let ack_options = lapin::options::BasicRejectOptions::default();
508 if let Err(error) = entry.acker.reject(ack_options).await {
509 emit!(AmqpRejectError { error });
510 }
511 }
512 }
513}
514
515#[cfg(test)]
516pub mod test {
517 use vector_lib::{lookup::OwnedTargetPath, schema::Definition, tls::TlsConfig};
518 use vrl::value::kind::Collection;
519
520 use super::*;
521
522 #[test]
523 fn generate_config() {
524 crate::test_util::test_generate_config::<AmqpSourceConfig>();
525 }
526
527 pub fn make_config() -> AmqpSourceConfig {
528 let mut config = AmqpSourceConfig {
529 queue: "it".to_string(),
530 ..Default::default()
531 };
532 let user = std::env::var("AMQP_USER").unwrap_or_else(|_| "guest".to_string());
533 let pass = std::env::var("AMQP_PASSWORD").unwrap_or_else(|_| "guest".to_string());
534 let host = std::env::var("AMQP_HOST").unwrap_or_else(|_| "rabbitmq".to_string());
535 let vhost = std::env::var("AMQP_VHOST").unwrap_or_else(|_| "%2f".to_string());
536 config.connection.connection_string = format!("amqp://{user}:{pass}@{host}:5672/{vhost}");
537
538 config
539 }
540
541 pub fn make_tls_config() -> AmqpSourceConfig {
542 let mut config = AmqpSourceConfig {
543 queue: "it".to_string(),
544 ..Default::default()
545 };
546 let user = std::env::var("AMQP_USER").unwrap_or_else(|_| "guest".to_string());
547 let pass = std::env::var("AMQP_PASSWORD").unwrap_or_else(|_| "guest".to_string());
548 let vhost = std::env::var("AMQP_VHOST").unwrap_or_else(|_| "%2f".to_string());
549 let host = std::env::var("AMQP_HOST").unwrap_or_else(|_| "rabbitmq".to_string());
550 let ca_file =
551 std::env::var("AMQP_CA_FILE").unwrap_or_else(|_| "/certs/ca.cert.pem".to_string());
552 config.connection.connection_string = format!("amqps://{user}:{pass}@{host}/{vhost}");
553 let tls = TlsConfig {
554 ca_file: Some(ca_file.as_str().into()),
555 ..Default::default()
556 };
557 config.connection.tls = Some(tls);
558 config
559 }
560
561 #[test]
562 fn output_schema_definition_vector_namespace() {
563 let config = AmqpSourceConfig {
564 log_namespace: Some(true),
565 ..Default::default()
566 };
567
568 let definition = config
569 .outputs(LogNamespace::Vector)
570 .remove(0)
571 .schema_definition(true);
572
573 let expected_definition =
574 Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
575 .with_meaning(OwnedTargetPath::event_root(), "message")
576 .with_metadata_field(
577 &owned_value_path!("vector", "source_type"),
578 Kind::bytes(),
579 None,
580 )
581 .with_metadata_field(
582 &owned_value_path!("vector", "ingest_timestamp"),
583 Kind::timestamp(),
584 None,
585 )
586 .with_metadata_field(
587 &owned_value_path!("amqp", "timestamp"),
588 Kind::timestamp(),
589 Some("timestamp"),
590 )
591 .with_metadata_field(&owned_value_path!("amqp", "routing"), Kind::bytes(), None)
592 .with_metadata_field(&owned_value_path!("amqp", "exchange"), Kind::bytes(), None)
593 .with_metadata_field(&owned_value_path!("amqp", "offset"), Kind::integer(), None);
594
595 assert_eq!(definition, Some(expected_definition));
596 }
597
598 #[test]
599 fn output_schema_definition_legacy_namespace() {
600 let config = AmqpSourceConfig::default();
601
602 let definition = config
603 .outputs(LogNamespace::Legacy)
604 .remove(0)
605 .schema_definition(true);
606
607 let expected_definition = Definition::new_with_default_metadata(
608 Kind::object(Collection::empty()),
609 [LogNamespace::Legacy],
610 )
611 .with_event_field(
612 &owned_value_path!("message"),
613 Kind::bytes(),
614 Some("message"),
615 )
616 .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
617 .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
618 .with_event_field(&owned_value_path!("routing"), Kind::bytes(), None)
619 .with_event_field(&owned_value_path!("exchange"), Kind::bytes(), None)
620 .with_event_field(&owned_value_path!("offset"), Kind::integer(), None);
621
622 assert_eq!(definition, Some(expected_definition));
623 }
624}
625
626#[cfg(feature = "amqp-integration-tests")]
628#[cfg(test)]
629mod integration_test {
630 use chrono::Utc;
631 use lapin::{BasicProperties, options::*};
632 use tokio::time::Duration;
633 use vector_lib::config::log_schema;
634
635 use super::{test::*, *};
636 use crate::{
637 SourceSender,
638 amqp::await_connection,
639 shutdown::ShutdownSignal,
640 test_util::{
641 components::{SOURCE_TAGS, run_and_assert_source_compliance},
642 random_string,
643 },
644 };
645
646 #[tokio::test]
647 async fn amqp_source_create_ok() {
648 let config = make_config();
649 await_connection(&config.connection).await;
650 assert!(
651 amqp_source(
652 &config,
653 ShutdownSignal::noop(),
654 SourceSender::new_test().0,
655 LogNamespace::Legacy,
656 false,
657 )
658 .await
659 .is_ok()
660 );
661 }
662
663 #[tokio::test]
664 async fn amqp_tls_source_create_ok() {
665 let config = make_tls_config();
666 await_connection(&config.connection).await;
667
668 assert!(
669 amqp_source(
670 &config,
671 ShutdownSignal::noop(),
672 SourceSender::new_test().0,
673 LogNamespace::Legacy,
674 false,
675 )
676 .await
677 .is_ok()
678 );
679 }
680
681 async fn send_event(
682 channel: &lapin::Channel,
683 exchange: &str,
684 routing_key: &str,
685 text: &str,
686 _timestamp: i64,
687 ) {
688 let payload = text.as_bytes();
689 let payload_len = payload.len();
690 trace!("Sending message of length {} to {}.", payload_len, exchange,);
691
692 channel
693 .basic_publish(
694 exchange,
695 routing_key,
696 BasicPublishOptions::default(),
697 payload.as_ref(),
698 BasicProperties::default(),
699 )
700 .await
701 .unwrap()
702 .await
703 .unwrap();
704 }
705
706 async fn source_consume_event(mut config: AmqpSourceConfig) {
707 let exchange = format!("test-{}-exchange", random_string(10));
708 let queue = format!("test-{}-queue", random_string(10));
709 let routing_key = "my_key";
710 trace!("Test exchange name: {}.", exchange);
711 let consumer = format!("test-consumer-{}", random_string(10));
712
713 config.consumer = consumer;
714 config.queue = queue;
715
716 let (_conn, channel) = config.connection.connect().await.unwrap();
717 let exchange_opts = lapin::options::ExchangeDeclareOptions {
718 auto_delete: true,
719 ..Default::default()
720 };
721
722 channel
723 .exchange_declare(
724 &exchange,
725 lapin::ExchangeKind::Fanout,
726 exchange_opts,
727 lapin::types::FieldTable::default(),
728 )
729 .await
730 .unwrap();
731
732 let queue_opts = QueueDeclareOptions {
733 auto_delete: true,
734 ..Default::default()
735 };
736 channel
737 .queue_declare(
738 &config.queue,
739 queue_opts,
740 lapin::types::FieldTable::default(),
741 )
742 .await
743 .unwrap();
744
745 channel
746 .queue_bind(
747 &config.queue,
748 &exchange,
749 "",
750 lapin::options::QueueBindOptions::default(),
751 lapin::types::FieldTable::default(),
752 )
753 .await
754 .unwrap();
755
756 trace!("Sending event...");
757 let now = Utc::now();
758 send_event(
759 &channel,
760 &exchange,
761 routing_key,
762 "my message",
763 now.timestamp_millis(),
764 )
765 .await;
766
767 trace!("Receiving event...");
768 let events =
769 run_and_assert_source_compliance(config, Duration::from_secs(1), &SOURCE_TAGS).await;
770 assert!(!events.is_empty());
771
772 let log = events[0].as_log();
773 trace!("{:?}", log);
774 assert_eq!(*log.get_message().unwrap(), "my message".into());
775 assert_eq!(log["routing"], routing_key.into());
776 assert_eq!(*log.get_source_type().unwrap(), "amqp".into());
777 let log_ts = log[log_schema().timestamp_key().unwrap().to_string()]
778 .as_timestamp()
779 .unwrap();
780 assert!(log_ts.signed_duration_since(now) < chrono::Duration::seconds(1));
781 assert_eq!(log["exchange"], exchange.into());
782 }
783
784 #[tokio::test]
785 async fn amqp_source_consume_event() {
786 let config = make_config();
787 await_connection(&config.connection).await;
788 source_consume_event(config).await;
789 }
790
791 #[tokio::test]
792 async fn amqp_tls_source_consume_event() {
793 let config = make_tls_config();
794 await_connection(&config.connection).await;
795 source_consume_event(config).await;
796 }
797}