1use std::{io::Cursor, pin::Pin};
4
5use async_stream::stream;
6use bytes::Bytes;
7use chrono::{TimeZone, Utc};
8use futures::{FutureExt, StreamExt};
9use futures_util::Stream;
10use lapin::{Acker, Channel, message::Delivery, options::BasicQosOptions};
11use snafu::Snafu;
12use vector_lib::{
13 EstimatedJsonEncodedSizeOf,
14 codecs::{
15 DecoderFramedRead,
16 decoding::{DeserializerConfig, FramingConfig},
17 },
18 config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig, log_schema},
19 configurable::configurable_component,
20 event::{Event, LogEvent},
21 finalizer::UnorderedFinalizer,
22 internal_event::{CountByteSize, EventsReceived, InternalEventHandle as _},
23 lookup::{lookup_v2::OptionalValuePath, metadata_path, owned_value_path, path},
24};
25use vrl::value::Kind;
26
27use crate::{
28 SourceSender,
29 amqp::AmqpConfig,
30 codecs::{Decoder, DecodingConfig},
31 config::{SourceConfig, SourceContext, SourceOutput},
32 event::{BatchNotifier, BatchStatus},
33 internal_events::{
34 StreamClosedError,
35 source::{AmqpAckError, AmqpBytesReceived, AmqpEventError, AmqpRejectError},
36 },
37 serde::{bool_or_struct, default_decoding, default_framing_message_based},
38 shutdown::ShutdownSignal,
39};
40
41#[derive(Debug, Snafu)]
42enum BuildError {
43 #[snafu(display("Could not create AMQP consumer: {}", source))]
44 AmqpCreateError {
45 source: Box<dyn std::error::Error + Send + Sync>,
46 },
47}
48
49#[configurable_component(source(
53 "amqp",
54 "Collect events from AMQP 0.9.1 compatible brokers like RabbitMQ."
55))]
56#[derive(Clone, Debug, Derivative)]
57#[derivative(Default)]
58#[serde(deny_unknown_fields)]
59pub struct AmqpSourceConfig {
60 #[serde(default = "default_queue")]
62 pub(crate) queue: String,
63
64 #[serde(default = "default_consumer")]
66 #[configurable(metadata(docs::examples = "consumer-group-name"))]
67 pub(crate) consumer: String,
68
69 #[serde(flatten)]
70 pub(crate) connection: AmqpConfig,
71
72 #[serde(default = "default_routing_key_field")]
74 #[derivative(Default(value = "default_routing_key_field()"))]
75 pub(crate) routing_key_field: OptionalValuePath,
76
77 #[serde(default = "default_exchange_key")]
79 #[derivative(Default(value = "default_exchange_key()"))]
80 pub(crate) exchange_key: OptionalValuePath,
81
82 #[serde(default = "default_offset_key")]
84 #[derivative(Default(value = "default_offset_key()"))]
85 pub(crate) offset_key: OptionalValuePath,
86
87 #[configurable(metadata(docs::hidden))]
89 #[serde(default)]
90 pub log_namespace: Option<bool>,
91
92 #[configurable(derived)]
93 #[serde(default = "default_framing_message_based")]
94 #[derivative(Default(value = "default_framing_message_based()"))]
95 pub(crate) framing: FramingConfig,
96
97 #[configurable(derived)]
98 #[serde(default = "default_decoding")]
99 #[derivative(Default(value = "default_decoding()"))]
100 pub(crate) decoding: DeserializerConfig,
101
102 #[configurable(derived)]
103 #[serde(default, deserialize_with = "bool_or_struct")]
104 pub(crate) acknowledgements: SourceAcknowledgementsConfig,
105
106 #[serde(default)]
114 #[configurable(metadata(docs::examples = 100))]
115 pub(crate) prefetch_count: Option<u16>,
116}
117
118fn default_queue() -> String {
119 "vector".into()
120}
121
122fn default_consumer() -> String {
123 "vector".into()
124}
125
126fn default_routing_key_field() -> OptionalValuePath {
127 OptionalValuePath::from(owned_value_path!("routing"))
128}
129
130fn default_exchange_key() -> OptionalValuePath {
131 OptionalValuePath::from(owned_value_path!("exchange"))
132}
133
134fn default_offset_key() -> OptionalValuePath {
135 OptionalValuePath::from(owned_value_path!("offset"))
136}
137
138impl_generate_config_from_default!(AmqpSourceConfig);
139
140impl AmqpSourceConfig {
141 fn decoder(&self, log_namespace: LogNamespace) -> vector_lib::Result<Decoder> {
142 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build()
143 }
144}
145
146#[async_trait::async_trait]
147#[typetag::serde(name = "amqp")]
148impl SourceConfig for AmqpSourceConfig {
149 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
150 let log_namespace = cx.log_namespace(self.log_namespace);
151 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
152
153 amqp_source(self, cx.shutdown, cx.out, log_namespace, acknowledgements).await
154 }
155
156 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
157 let log_namespace = global_log_namespace.merge(self.log_namespace);
158 let schema_definition = self
159 .decoding
160 .schema_definition(log_namespace)
161 .with_standard_vector_source_metadata()
162 .with_source_metadata(
163 AmqpSourceConfig::NAME,
164 None,
165 &owned_value_path!("timestamp"),
166 Kind::timestamp(),
167 Some("timestamp"),
168 )
169 .with_source_metadata(
170 AmqpSourceConfig::NAME,
171 self.routing_key_field
172 .path
173 .clone()
174 .map(LegacyKey::InsertIfEmpty),
175 &owned_value_path!("routing"),
176 Kind::bytes(),
177 None,
178 )
179 .with_source_metadata(
180 AmqpSourceConfig::NAME,
181 self.exchange_key.path.clone().map(LegacyKey::InsertIfEmpty),
182 &owned_value_path!("exchange"),
183 Kind::bytes(),
184 None,
185 )
186 .with_source_metadata(
187 AmqpSourceConfig::NAME,
188 self.offset_key.path.clone().map(LegacyKey::InsertIfEmpty),
189 &owned_value_path!("offset"),
190 Kind::integer(),
191 None,
192 );
193
194 vec![SourceOutput::new_maybe_logs(
195 self.decoding.output_type(),
196 schema_definition,
197 )]
198 }
199
200 fn can_acknowledge(&self) -> bool {
201 true
202 }
203}
204
205#[derive(Debug)]
206struct FinalizerEntry {
207 acker: Acker,
208}
209
210impl From<Delivery> for FinalizerEntry {
211 fn from(delivery: Delivery) -> Self {
212 Self {
213 acker: delivery.acker,
214 }
215 }
216}
217
218pub(crate) async fn amqp_source(
219 config: &AmqpSourceConfig,
220 shutdown: ShutdownSignal,
221 out: SourceSender,
222 log_namespace: LogNamespace,
223 acknowledgements: bool,
224) -> crate::Result<super::Source> {
225 let config = config.clone();
226 let (_conn, channel) = config
227 .connection
228 .connect()
229 .await
230 .map_err(|source| BuildError::AmqpCreateError { source })?;
231
232 Ok(Box::pin(run_amqp_source(
233 config,
234 shutdown,
235 out,
236 channel,
237 log_namespace,
238 acknowledgements,
239 )))
240}
241
242struct Keys<'a> {
243 routing_key_field: &'a OptionalValuePath,
244 routing: &'a str,
245 exchange_key: &'a OptionalValuePath,
246 exchange: &'a str,
247 offset_key: &'a OptionalValuePath,
248 delivery_tag: i64,
249}
250
251fn populate_log_event(
253 log: &mut LogEvent,
254 timestamp: Option<chrono::DateTime<Utc>>,
255 keys: &Keys<'_>,
256 log_namespace: LogNamespace,
257) {
258 log_namespace.insert_source_metadata(
259 AmqpSourceConfig::NAME,
260 log,
261 keys.routing_key_field
262 .path
263 .as_ref()
264 .map(LegacyKey::InsertIfEmpty),
265 path!("routing"),
266 keys.routing.to_string(),
267 );
268
269 log_namespace.insert_source_metadata(
270 AmqpSourceConfig::NAME,
271 log,
272 keys.exchange_key
273 .path
274 .as_ref()
275 .map(LegacyKey::InsertIfEmpty),
276 path!("exchange"),
277 keys.exchange.to_string(),
278 );
279
280 log_namespace.insert_source_metadata(
281 AmqpSourceConfig::NAME,
282 log,
283 keys.offset_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
284 path!("offset"),
285 keys.delivery_tag,
286 );
287
288 log_namespace.insert_vector_metadata(
289 log,
290 log_schema().source_type_key(),
291 path!("source_type"),
292 Bytes::from_static(AmqpSourceConfig::NAME.as_bytes()),
293 );
294
295 match log_namespace {
299 LogNamespace::Vector => {
300 if let Some(timestamp) = timestamp {
301 log.insert(
302 metadata_path!(AmqpSourceConfig::NAME, "timestamp"),
303 timestamp,
304 );
305 };
306
307 log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
308 }
309 LogNamespace::Legacy => {
310 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
311 log.try_insert(timestamp_key, timestamp.unwrap_or_else(Utc::now));
312 }
313 }
314 };
315}
316
317async fn receive_event(
319 config: &AmqpSourceConfig,
320 out: &mut SourceSender,
321 log_namespace: LogNamespace,
322 finalizer: Option<&UnorderedFinalizer<FinalizerEntry>>,
323 msg: Delivery,
324) -> Result<(), ()> {
325 let payload = Cursor::new(Bytes::copy_from_slice(&msg.data));
326 let decoder = config.decoder(log_namespace).map_err(|_e| ())?;
327 let mut stream = DecoderFramedRead::new(payload, decoder);
328
329 let timestamp = msg
331 .properties
332 .timestamp()
333 .and_then(|millis| Utc.timestamp_millis_opt(millis as _).latest());
334
335 let routing = msg.routing_key.to_string();
336 let exchange = msg.exchange.to_string();
337 let keys = Keys {
338 routing_key_field: &config.routing_key_field,
339 exchange_key: &config.exchange_key,
340 offset_key: &config.offset_key,
341 routing: &routing,
342 exchange: &exchange,
343 delivery_tag: msg.delivery_tag as i64,
344 };
345 let events_received = register!(EventsReceived);
346
347 let stream = stream! {
348 while let Some(result) = stream.next().await {
349 match result {
350 Ok((events, byte_size)) => {
351 emit!(AmqpBytesReceived {
352 byte_size,
353 protocol: "amqp_0_9_1",
354 });
355
356 events_received.emit(CountByteSize(
357 events.len(),
358 events.estimated_json_encoded_size_of(),
359 ));
360
361 for mut event in events {
362 if let Event::Log(ref mut log) = event {
363 populate_log_event(log,
364 timestamp,
365 &keys,
366 log_namespace);
367 }
368
369 yield event;
370 }
371 }
372 Err(error) => {
373 use vector_lib::codecs::StreamDecodingError as _;
374
375 if !error.can_continue() {
378 break;
379 }
380 }
381 }
382 }
383 }
384 .boxed();
385
386 finalize_event_stream(finalizer, out, stream, msg).await;
387
388 Ok(())
389}
390
391async fn finalize_event_stream(
393 finalizer: Option<&UnorderedFinalizer<FinalizerEntry>>,
394 out: &mut SourceSender,
395 mut stream: Pin<Box<dyn Stream<Item = Event> + Send + '_>>,
396 msg: Delivery,
397) {
398 match finalizer {
399 Some(finalizer) => {
400 let (batch, receiver) = BatchNotifier::new_with_receiver();
401 let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
402
403 match out.send_event_stream(&mut stream).await {
404 Err(_) => {
405 emit!(StreamClosedError { count: 1 });
406 }
407 Ok(_) => {
408 finalizer.add(msg.into(), receiver);
409 }
410 }
411 }
412 None => match out.send_event_stream(&mut stream).await {
413 Err(_) => {
414 emit!(StreamClosedError { count: 1 });
415 }
416 Ok(_) => {
417 let ack_options = lapin::options::BasicAckOptions::default();
418 if let Err(error) = msg.acker.ack(ack_options).await {
419 emit!(AmqpAckError { error });
420 }
421 }
422 },
423 }
424}
425
426async fn run_amqp_source(
428 config: AmqpSourceConfig,
429 shutdown: ShutdownSignal,
430 mut out: SourceSender,
431 channel: Channel,
432 log_namespace: LogNamespace,
433 acknowledgements: bool,
434) -> Result<(), ()> {
435 let (finalizer, mut ack_stream) =
436 UnorderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));
437
438 if let Some(count) = config.prefetch_count {
440 channel
442 .basic_qos(count, BasicQosOptions { global: false })
443 .await
444 .map_err(|error| {
445 error!(message = "Failed to apply basic_qos.", ?error);
446 })?;
447 }
448
449 debug!("Starting amqp source, listening to queue {}.", config.queue);
450 let mut consumer = channel
451 .basic_consume(
452 config.queue.clone().into(),
453 config.consumer.clone().into(),
454 lapin::options::BasicConsumeOptions::default(),
455 lapin::types::FieldTable::default(),
456 )
457 .await
458 .map_err(|error| {
459 error!(message = "Failed to consume.", ?error);
460 })?
461 .fuse();
462 let mut shutdown = shutdown.fuse();
463 loop {
464 tokio::select! {
465 _ = &mut shutdown => break,
466 entry = ack_stream.next() => {
467 if let Some((status, entry)) = entry {
468 handle_ack(status, entry).await;
469 }
470 },
471 opt_m = consumer.next() => {
472 if let Some(try_m) = opt_m {
473 match try_m {
474 Err(error) => {
475 emit!(AmqpEventError { error });
476 return Err(());
477 }
478 Ok(msg) => {
479 receive_event(&config, &mut out, log_namespace, finalizer.as_ref(), msg).await?
480 }
481 }
482 } else {
483 break
484 }
485 }
486 };
487 }
488
489 Ok(())
490}
491
492async fn handle_ack(status: BatchStatus, entry: FinalizerEntry) {
493 match status {
494 BatchStatus::Delivered => {
495 let ack_options = lapin::options::BasicAckOptions::default();
496 if let Err(error) = entry.acker.ack(ack_options).await {
497 emit!(AmqpAckError { error });
498 }
499 }
500 BatchStatus::Errored => {
501 let ack_options = lapin::options::BasicRejectOptions::default();
502 if let Err(error) = entry.acker.reject(ack_options).await {
503 emit!(AmqpRejectError { error });
504 }
505 }
506 BatchStatus::Rejected => {
507 let ack_options = lapin::options::BasicRejectOptions::default();
508 if let Err(error) = entry.acker.reject(ack_options).await {
509 emit!(AmqpRejectError { error });
510 }
511 }
512 }
513}
514
515#[cfg(test)]
516pub mod test {
517 use vector_lib::{lookup::OwnedTargetPath, schema::Definition, tls::TlsConfig};
518 use vrl::value::kind::Collection;
519
520 use super::*;
521
522 #[test]
523 fn generate_config() {
524 crate::test_util::test_generate_config::<AmqpSourceConfig>();
525 }
526
527 pub fn make_config() -> AmqpSourceConfig {
528 let mut config = AmqpSourceConfig {
529 queue: "it".to_string(),
530 ..Default::default()
531 };
532 let user = std::env::var("AMQP_USER").unwrap_or_else(|_| "guest".to_string());
533 let pass = std::env::var("AMQP_PASSWORD").unwrap_or_else(|_| "guest".to_string());
534 let host = std::env::var("AMQP_HOST").unwrap_or_else(|_| "rabbitmq".to_string());
535 let vhost = std::env::var("AMQP_VHOST").unwrap_or_else(|_| "%2f".to_string());
536 config.connection.connection_string = format!("amqp://{user}:{pass}@{host}:5672/{vhost}");
537
538 config
539 }
540
541 pub fn make_tls_config() -> AmqpSourceConfig {
542 let mut config = AmqpSourceConfig {
543 queue: "it".to_string(),
544 ..Default::default()
545 };
546 let user = std::env::var("AMQP_USER").unwrap_or_else(|_| "guest".to_string());
547 let pass = std::env::var("AMQP_PASSWORD").unwrap_or_else(|_| "guest".to_string());
548 let vhost = std::env::var("AMQP_VHOST").unwrap_or_else(|_| "%2f".to_string());
549 let host = std::env::var("AMQP_HOST").unwrap_or_else(|_| "rabbitmq".to_string());
550 let ca_file =
551 std::env::var("AMQP_CA_FILE").unwrap_or_else(|_| "/certs/ca.cert.pem".to_string());
552 config.connection.connection_string = format!("amqps://{user}:{pass}@{host}/{vhost}");
553 let tls = TlsConfig {
554 ca_file: Some(ca_file.as_str().into()),
555 ..Default::default()
556 };
557 config.connection.tls = Some(tls);
558 config
559 }
560
561 #[test]
562 fn output_schema_definition_vector_namespace() {
563 let config = AmqpSourceConfig {
564 log_namespace: Some(true),
565 ..Default::default()
566 };
567
568 let definition = config
569 .outputs(LogNamespace::Vector)
570 .remove(0)
571 .schema_definition(true);
572
573 let expected_definition =
574 Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
575 .with_meaning(OwnedTargetPath::event_root(), "message")
576 .with_metadata_field(
577 &owned_value_path!("vector", "source_type"),
578 Kind::bytes(),
579 None,
580 )
581 .with_metadata_field(
582 &owned_value_path!("vector", "ingest_timestamp"),
583 Kind::timestamp(),
584 None,
585 )
586 .with_metadata_field(
587 &owned_value_path!("amqp", "timestamp"),
588 Kind::timestamp(),
589 Some("timestamp"),
590 )
591 .with_metadata_field(&owned_value_path!("amqp", "routing"), Kind::bytes(), None)
592 .with_metadata_field(&owned_value_path!("amqp", "exchange"), Kind::bytes(), None)
593 .with_metadata_field(&owned_value_path!("amqp", "offset"), Kind::integer(), None);
594
595 assert_eq!(definition, Some(expected_definition));
596 }
597
598 #[test]
599 fn output_schema_definition_legacy_namespace() {
600 let config = AmqpSourceConfig::default();
601
602 let definition = config
603 .outputs(LogNamespace::Legacy)
604 .remove(0)
605 .schema_definition(true);
606
607 let expected_definition = Definition::new_with_default_metadata(
608 Kind::object(Collection::empty()),
609 [LogNamespace::Legacy],
610 )
611 .with_event_field(
612 &owned_value_path!("message"),
613 Kind::bytes(),
614 Some("message"),
615 )
616 .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
617 .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
618 .with_event_field(&owned_value_path!("routing"), Kind::bytes(), None)
619 .with_event_field(&owned_value_path!("exchange"), Kind::bytes(), None)
620 .with_event_field(&owned_value_path!("offset"), Kind::integer(), None);
621
622 assert_eq!(definition, Some(expected_definition));
623 }
624}
625
626#[cfg(feature = "amqp-integration-tests")]
628#[cfg(test)]
629mod integration_test {
630 use chrono::Utc;
631 use lapin::types::ShortString;
632 use lapin::{BasicProperties, options::*};
633 use tokio::time::Duration;
634 use vector_lib::config::log_schema;
635
636 use super::{test::*, *};
637 use crate::{
638 SourceSender,
639 amqp::await_connection,
640 shutdown::ShutdownSignal,
641 test_util::{
642 components::{SOURCE_TAGS, run_and_assert_source_compliance},
643 random_string,
644 },
645 };
646
647 #[tokio::test]
648 async fn amqp_source_create_ok() {
649 let config = make_config();
650 await_connection(&config.connection).await;
651 assert!(
652 amqp_source(
653 &config,
654 ShutdownSignal::noop(),
655 SourceSender::new_test().0,
656 LogNamespace::Legacy,
657 false,
658 )
659 .await
660 .is_ok()
661 );
662 }
663
664 #[tokio::test]
665 async fn amqp_tls_source_create_ok() {
666 let config = make_tls_config();
667 await_connection(&config.connection).await;
668
669 assert!(
670 amqp_source(
671 &config,
672 ShutdownSignal::noop(),
673 SourceSender::new_test().0,
674 LogNamespace::Legacy,
675 false,
676 )
677 .await
678 .is_ok()
679 );
680 }
681
682 async fn send_event(
683 channel: &lapin::Channel,
684 exchange: &str,
685 routing_key: &str,
686 text: &str,
687 _timestamp: i64,
688 ) {
689 let payload = text.as_bytes();
690 let payload_len = payload.len();
691 trace!("Sending message of length {} to {}.", payload_len, exchange,);
692
693 channel
694 .basic_publish(
695 exchange.into(),
696 routing_key.into(),
697 BasicPublishOptions::default(),
698 payload.as_ref(),
699 BasicProperties::default(),
700 )
701 .await
702 .unwrap()
703 .await
704 .unwrap();
705 }
706
707 async fn source_consume_event(mut config: AmqpSourceConfig) {
708 let exchange = format!("test-{}-exchange", random_string(10));
709 let queue = format!("test-{}-queue", random_string(10));
710 let routing_key = "my_key";
711 trace!("Test exchange name: {}.", exchange);
712 let exchange: ShortString = exchange.into();
713 let consumer = format!("test-consumer-{}", random_string(10));
714
715 config.consumer = consumer;
716 config.queue = queue;
717
718 let (_conn, channel) = config.connection.connect().await.unwrap();
719 let exchange_opts = lapin::options::ExchangeDeclareOptions {
720 auto_delete: true,
721 ..Default::default()
722 };
723
724 channel
725 .exchange_declare(
726 exchange.clone(),
727 lapin::ExchangeKind::Fanout,
728 exchange_opts,
729 lapin::types::FieldTable::default(),
730 )
731 .await
732 .unwrap();
733
734 let queue_opts = QueueDeclareOptions {
735 auto_delete: true,
736 ..Default::default()
737 };
738 let queue: ShortString = config.queue.clone().into();
739 channel
740 .queue_declare(
741 queue.clone(),
742 queue_opts,
743 lapin::types::FieldTable::default(),
744 )
745 .await
746 .unwrap();
747
748 channel
749 .queue_bind(
750 queue,
751 exchange.clone(),
752 "".into(),
753 lapin::options::QueueBindOptions::default(),
754 lapin::types::FieldTable::default(),
755 )
756 .await
757 .unwrap();
758
759 trace!("Sending event...");
760 let now = Utc::now();
761 send_event(
762 &channel,
763 exchange.as_str(),
764 routing_key,
765 "my message",
766 now.timestamp_millis(),
767 )
768 .await;
769
770 trace!("Receiving event...");
771 let events =
772 run_and_assert_source_compliance(config, Duration::from_secs(1), &SOURCE_TAGS).await;
773 assert!(!events.is_empty());
774
775 let log = events[0].as_log();
776 trace!("{:?}", log);
777 assert_eq!(*log.get_message().unwrap(), "my message".into());
778 assert_eq!(log["routing"], routing_key.into());
779 assert_eq!(*log.get_source_type().unwrap(), "amqp".into());
780 let log_ts = log[log_schema().timestamp_key().unwrap().to_string()]
781 .as_timestamp()
782 .unwrap();
783 assert!(log_ts.signed_duration_since(now) < chrono::Duration::seconds(1));
784 assert_eq!(log["exchange"], exchange.as_str().into());
785 }
786
787 #[tokio::test]
788 async fn amqp_source_consume_event() {
789 let config = make_config();
790 await_connection(&config.connection).await;
791 source_consume_event(config).await;
792 }
793
794 #[tokio::test]
795 async fn amqp_tls_source_consume_event() {
796 let config = make_tls_config();
797 await_connection(&config.connection).await;
798 source_consume_event(config).await;
799 }
800}