1use std::{io::Cursor, pin::Pin};
4
5use async_stream::stream;
6use bytes::Bytes;
7use chrono::{TimeZone, Utc};
8use futures::{FutureExt, StreamExt};
9use futures_util::Stream;
10use lapin::{Channel, acker::Acker, message::Delivery};
11use snafu::Snafu;
12use tokio_util::codec::FramedRead;
13use vector_lib::{
14 EstimatedJsonEncodedSizeOf,
15 codecs::decoding::{DeserializerConfig, FramingConfig},
16 config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig, log_schema},
17 configurable::configurable_component,
18 event::{Event, LogEvent},
19 finalizer::UnorderedFinalizer,
20 internal_event::{CountByteSize, EventsReceived, InternalEventHandle as _},
21 lookup::{lookup_v2::OptionalValuePath, metadata_path, owned_value_path, path},
22};
23use vrl::value::Kind;
24
25use crate::{
26 SourceSender,
27 amqp::AmqpConfig,
28 codecs::{Decoder, DecodingConfig},
29 config::{SourceConfig, SourceContext, SourceOutput},
30 event::{BatchNotifier, BatchStatus},
31 internal_events::{
32 StreamClosedError,
33 source::{AmqpAckError, AmqpBytesReceived, AmqpEventError, AmqpRejectError},
34 },
35 serde::{bool_or_struct, default_decoding, default_framing_message_based},
36 shutdown::ShutdownSignal,
37};
38
39#[derive(Debug, Snafu)]
40enum BuildError {
41 #[snafu(display("Could not create AMQP consumer: {}", source))]
42 AmqpCreateError {
43 source: Box<dyn std::error::Error + Send + Sync>,
44 },
45}
46
47#[configurable_component(source(
51 "amqp",
52 "Collect events from AMQP 0.9.1 compatible brokers like RabbitMQ."
53))]
54#[derive(Clone, Debug, Derivative)]
55#[derivative(Default)]
56#[serde(deny_unknown_fields)]
57pub struct AmqpSourceConfig {
58 #[serde(default = "default_queue")]
60 pub(crate) queue: String,
61
62 #[serde(default = "default_consumer")]
64 #[configurable(metadata(docs::examples = "consumer-group-name"))]
65 pub(crate) consumer: String,
66
67 #[serde(flatten)]
68 pub(crate) connection: AmqpConfig,
69
70 #[serde(default = "default_routing_key_field")]
72 #[derivative(Default(value = "default_routing_key_field()"))]
73 pub(crate) routing_key_field: OptionalValuePath,
74
75 #[serde(default = "default_exchange_key")]
77 #[derivative(Default(value = "default_exchange_key()"))]
78 pub(crate) exchange_key: OptionalValuePath,
79
80 #[serde(default = "default_offset_key")]
82 #[derivative(Default(value = "default_offset_key()"))]
83 pub(crate) offset_key: OptionalValuePath,
84
85 #[configurable(metadata(docs::hidden))]
87 #[serde(default)]
88 pub log_namespace: Option<bool>,
89
90 #[configurable(derived)]
91 #[serde(default = "default_framing_message_based")]
92 #[derivative(Default(value = "default_framing_message_based()"))]
93 pub(crate) framing: FramingConfig,
94
95 #[configurable(derived)]
96 #[serde(default = "default_decoding")]
97 #[derivative(Default(value = "default_decoding()"))]
98 pub(crate) decoding: DeserializerConfig,
99
100 #[configurable(derived)]
101 #[serde(default, deserialize_with = "bool_or_struct")]
102 pub(crate) acknowledgements: SourceAcknowledgementsConfig,
103}
104
/// Queue name used when `queue` is not present in the configuration.
fn default_queue() -> String {
    String::from("vector")
}
108
/// Consumer identifier used when `consumer` is not present in the configuration.
fn default_consumer() -> String {
    "vector".to_owned()
}
112
113fn default_routing_key_field() -> OptionalValuePath {
114 OptionalValuePath::from(owned_value_path!("routing"))
115}
116
117fn default_exchange_key() -> OptionalValuePath {
118 OptionalValuePath::from(owned_value_path!("exchange"))
119}
120
121fn default_offset_key() -> OptionalValuePath {
122 OptionalValuePath::from(owned_value_path!("offset"))
123}
124
125impl_generate_config_from_default!(AmqpSourceConfig);
126
127impl AmqpSourceConfig {
128 fn decoder(&self, log_namespace: LogNamespace) -> vector_lib::Result<Decoder> {
129 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build()
130 }
131}
132
133#[async_trait::async_trait]
134#[typetag::serde(name = "amqp")]
135impl SourceConfig for AmqpSourceConfig {
136 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
137 let log_namespace = cx.log_namespace(self.log_namespace);
138 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
139
140 amqp_source(self, cx.shutdown, cx.out, log_namespace, acknowledgements).await
141 }
142
143 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
144 let log_namespace = global_log_namespace.merge(self.log_namespace);
145 let schema_definition = self
146 .decoding
147 .schema_definition(log_namespace)
148 .with_standard_vector_source_metadata()
149 .with_source_metadata(
150 AmqpSourceConfig::NAME,
151 None,
152 &owned_value_path!("timestamp"),
153 Kind::timestamp(),
154 Some("timestamp"),
155 )
156 .with_source_metadata(
157 AmqpSourceConfig::NAME,
158 self.routing_key_field
159 .path
160 .clone()
161 .map(LegacyKey::InsertIfEmpty),
162 &owned_value_path!("routing"),
163 Kind::bytes(),
164 None,
165 )
166 .with_source_metadata(
167 AmqpSourceConfig::NAME,
168 self.exchange_key.path.clone().map(LegacyKey::InsertIfEmpty),
169 &owned_value_path!("exchange"),
170 Kind::bytes(),
171 None,
172 )
173 .with_source_metadata(
174 AmqpSourceConfig::NAME,
175 self.offset_key.path.clone().map(LegacyKey::InsertIfEmpty),
176 &owned_value_path!("offset"),
177 Kind::integer(),
178 None,
179 );
180
181 vec![SourceOutput::new_maybe_logs(
182 self.decoding.output_type(),
183 schema_definition,
184 )]
185 }
186
187 fn can_acknowledge(&self) -> bool {
188 true
189 }
190}
191
192#[derive(Debug)]
193struct FinalizerEntry {
194 acker: Acker,
195}
196
197impl From<Delivery> for FinalizerEntry {
198 fn from(delivery: Delivery) -> Self {
199 Self {
200 acker: delivery.acker,
201 }
202 }
203}
204
205pub(crate) async fn amqp_source(
206 config: &AmqpSourceConfig,
207 shutdown: ShutdownSignal,
208 out: SourceSender,
209 log_namespace: LogNamespace,
210 acknowledgements: bool,
211) -> crate::Result<super::Source> {
212 let config = config.clone();
213 let (_conn, channel) = config
214 .connection
215 .connect()
216 .await
217 .map_err(|source| BuildError::AmqpCreateError { source })?;
218
219 Ok(Box::pin(run_amqp_source(
220 config,
221 shutdown,
222 out,
223 channel,
224 log_namespace,
225 acknowledgements,
226 )))
227}
228
229struct Keys<'a> {
230 routing_key_field: &'a OptionalValuePath,
231 routing: &'a str,
232 exchange_key: &'a OptionalValuePath,
233 exchange: &'a str,
234 offset_key: &'a OptionalValuePath,
235 delivery_tag: i64,
236}
237
238fn populate_log_event(
240 log: &mut LogEvent,
241 timestamp: Option<chrono::DateTime<Utc>>,
242 keys: &Keys<'_>,
243 log_namespace: LogNamespace,
244) {
245 log_namespace.insert_source_metadata(
246 AmqpSourceConfig::NAME,
247 log,
248 keys.routing_key_field
249 .path
250 .as_ref()
251 .map(LegacyKey::InsertIfEmpty),
252 path!("routing"),
253 keys.routing.to_string(),
254 );
255
256 log_namespace.insert_source_metadata(
257 AmqpSourceConfig::NAME,
258 log,
259 keys.exchange_key
260 .path
261 .as_ref()
262 .map(LegacyKey::InsertIfEmpty),
263 path!("exchange"),
264 keys.exchange.to_string(),
265 );
266
267 log_namespace.insert_source_metadata(
268 AmqpSourceConfig::NAME,
269 log,
270 keys.offset_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
271 path!("offset"),
272 keys.delivery_tag,
273 );
274
275 log_namespace.insert_vector_metadata(
276 log,
277 log_schema().source_type_key(),
278 path!("source_type"),
279 Bytes::from_static(AmqpSourceConfig::NAME.as_bytes()),
280 );
281
282 match log_namespace {
286 LogNamespace::Vector => {
287 if let Some(timestamp) = timestamp {
288 log.insert(
289 metadata_path!(AmqpSourceConfig::NAME, "timestamp"),
290 timestamp,
291 );
292 };
293
294 log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
295 }
296 LogNamespace::Legacy => {
297 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
298 log.try_insert(timestamp_key, timestamp.unwrap_or_else(Utc::now));
299 }
300 }
301 };
302}
303
304async fn receive_event(
306 config: &AmqpSourceConfig,
307 out: &mut SourceSender,
308 log_namespace: LogNamespace,
309 finalizer: Option<&UnorderedFinalizer<FinalizerEntry>>,
310 msg: Delivery,
311) -> Result<(), ()> {
312 let payload = Cursor::new(Bytes::copy_from_slice(&msg.data));
313 let decoder = config.decoder(log_namespace).map_err(|_e| ())?;
314 let mut stream = FramedRead::new(payload, decoder);
315
316 let timestamp = msg
318 .properties
319 .timestamp()
320 .and_then(|millis| Utc.timestamp_millis_opt(millis as _).latest());
321
322 let routing = msg.routing_key.to_string();
323 let exchange = msg.exchange.to_string();
324 let keys = Keys {
325 routing_key_field: &config.routing_key_field,
326 exchange_key: &config.exchange_key,
327 offset_key: &config.offset_key,
328 routing: &routing,
329 exchange: &exchange,
330 delivery_tag: msg.delivery_tag as i64,
331 };
332 let events_received = register!(EventsReceived);
333
334 let stream = stream! {
335 while let Some(result) = stream.next().await {
336 match result {
337 Ok((events, byte_size)) => {
338 emit!(AmqpBytesReceived {
339 byte_size,
340 protocol: "amqp_0_9_1",
341 });
342
343 events_received.emit(CountByteSize(
344 events.len(),
345 events.estimated_json_encoded_size_of(),
346 ));
347
348 for mut event in events {
349 if let Event::Log(ref mut log) = event {
350 populate_log_event(log,
351 timestamp,
352 &keys,
353 log_namespace);
354 }
355
356 yield event;
357 }
358 }
359 Err(error) => {
360 use vector_lib::codecs::StreamDecodingError as _;
361
362 if !error.can_continue() {
365 break;
366 }
367 }
368 }
369 }
370 }
371 .boxed();
372
373 finalize_event_stream(finalizer, out, stream, msg).await;
374
375 Ok(())
376}
377
378async fn finalize_event_stream(
380 finalizer: Option<&UnorderedFinalizer<FinalizerEntry>>,
381 out: &mut SourceSender,
382 mut stream: Pin<Box<dyn Stream<Item = Event> + Send + '_>>,
383 msg: Delivery,
384) {
385 match finalizer {
386 Some(finalizer) => {
387 let (batch, receiver) = BatchNotifier::new_with_receiver();
388 let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
389
390 match out.send_event_stream(&mut stream).await {
391 Err(_) => {
392 emit!(StreamClosedError { count: 1 });
393 }
394 Ok(_) => {
395 finalizer.add(msg.into(), receiver);
396 }
397 }
398 }
399 None => match out.send_event_stream(&mut stream).await {
400 Err(_) => {
401 emit!(StreamClosedError { count: 1 });
402 }
403 Ok(_) => {
404 let ack_options = lapin::options::BasicAckOptions::default();
405 if let Err(error) = msg.acker.ack(ack_options).await {
406 emit!(AmqpAckError { error });
407 }
408 }
409 },
410 }
411}
412
413async fn run_amqp_source(
415 config: AmqpSourceConfig,
416 shutdown: ShutdownSignal,
417 mut out: SourceSender,
418 channel: Channel,
419 log_namespace: LogNamespace,
420 acknowledgements: bool,
421) -> Result<(), ()> {
422 let (finalizer, mut ack_stream) =
423 UnorderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));
424
425 debug!("Starting amqp source, listening to queue {}.", config.queue);
426 let mut consumer = channel
427 .basic_consume(
428 &config.queue,
429 &config.consumer,
430 lapin::options::BasicConsumeOptions::default(),
431 lapin::types::FieldTable::default(),
432 )
433 .await
434 .map_err(|error| {
435 error!(message = "Failed to consume.", error = ?error, internal_log_rate_limit = true);
436 })?
437 .fuse();
438 let mut shutdown = shutdown.fuse();
439 loop {
440 tokio::select! {
441 _ = &mut shutdown => break,
442 entry = ack_stream.next() => {
443 if let Some((status, entry)) = entry {
444 handle_ack(status, entry).await;
445 }
446 },
447 opt_m = consumer.next() => {
448 if let Some(try_m) = opt_m {
449 match try_m {
450 Err(error) => {
451 emit!(AmqpEventError { error });
452 return Err(());
453 }
454 Ok(msg) => {
455 receive_event(&config, &mut out, log_namespace, finalizer.as_ref(), msg).await?
456 }
457 }
458 } else {
459 break
460 }
461 }
462 };
463 }
464
465 Ok(())
466}
467
468async fn handle_ack(status: BatchStatus, entry: FinalizerEntry) {
469 match status {
470 BatchStatus::Delivered => {
471 let ack_options = lapin::options::BasicAckOptions::default();
472 if let Err(error) = entry.acker.ack(ack_options).await {
473 emit!(AmqpAckError { error });
474 }
475 }
476 BatchStatus::Errored => {
477 let ack_options = lapin::options::BasicRejectOptions::default();
478 if let Err(error) = entry.acker.reject(ack_options).await {
479 emit!(AmqpRejectError { error });
480 }
481 }
482 BatchStatus::Rejected => {
483 let ack_options = lapin::options::BasicRejectOptions::default();
484 if let Err(error) = entry.acker.reject(ack_options).await {
485 emit!(AmqpRejectError { error });
486 }
487 }
488 }
489}
490
#[cfg(test)]
pub mod test {
    use vector_lib::{lookup::OwnedTargetPath, schema::Definition, tls::TlsConfig};
    use vrl::value::kind::Collection;

    use super::*;

    /// Reads `name` from the environment, using `fallback` when the variable
    /// cannot be read.
    fn env_or(name: &str, fallback: &str) -> String {
        std::env::var(name).unwrap_or_else(|_| fallback.to_string())
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<AmqpSourceConfig>();
    }

    /// Plain-text test configuration for the `it` queue, built from the
    /// `AMQP_*` environment variables.
    pub fn make_config() -> AmqpSourceConfig {
        let user = env_or("AMQP_USER", "guest");
        let pass = env_or("AMQP_PASSWORD", "guest");
        let host = env_or("AMQP_HOST", "rabbitmq");
        let vhost = env_or("AMQP_VHOST", "%2f");

        let mut config = AmqpSourceConfig {
            queue: "it".to_string(),
            ..Default::default()
        };
        config.connection.connection_string = format!("amqp://{user}:{pass}@{host}:5672/{vhost}");

        config
    }

    /// TLS variant of `make_config`; the CA certificate path comes from
    /// `AMQP_CA_FILE`.
    pub fn make_tls_config() -> AmqpSourceConfig {
        let user = env_or("AMQP_USER", "guest");
        let pass = env_or("AMQP_PASSWORD", "guest");
        let vhost = env_or("AMQP_VHOST", "%2f");
        let host = env_or("AMQP_HOST", "rabbitmq");
        let ca_file = env_or("AMQP_CA_FILE", "/certs/ca.cert.pem");

        let mut config = AmqpSourceConfig {
            queue: "it".to_string(),
            ..Default::default()
        };
        config.connection.connection_string = format!("amqps://{user}:{pass}@{host}/{vhost}");
        config.connection.tls = Some(TlsConfig {
            ca_file: Some(ca_file.as_str().into()),
            ..Default::default()
        });

        config
    }

    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = AmqpSourceConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let mut outputs = config.outputs(LogNamespace::Vector);
        let definition = outputs.remove(0).schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("amqp", "timestamp"),
                    Kind::timestamp(),
                    Some("timestamp"),
                )
                .with_metadata_field(&owned_value_path!("amqp", "routing"), Kind::bytes(), None)
                .with_metadata_field(&owned_value_path!("amqp", "exchange"), Kind::bytes(), None)
                .with_metadata_field(&owned_value_path!("amqp", "offset"), Kind::integer(), None);

        assert_eq!(definition, Some(expected_definition));
    }

    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = AmqpSourceConfig::default();

        let mut outputs = config.outputs(LogNamespace::Legacy);
        let definition = outputs.remove(0).schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("routing"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("exchange"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("offset"), Kind::integer(), None);

        assert_eq!(definition, Some(expected_definition));
    }
}
601
/// Integration tests; these need a reachable AMQP broker (see the `AMQP_*`
/// environment variables read by `make_config` / `make_tls_config`).
#[cfg(feature = "amqp-integration-tests")]
#[cfg(test)]
mod integration_test {
    use chrono::Utc;
    use lapin::{BasicProperties, options::*};
    use tokio::time::Duration;
    use vector_lib::config::log_schema;

    use super::{test::*, *};
    use crate::{
        SourceSender,
        amqp::await_connection,
        shutdown::ShutdownSignal,
        test_util::{
            components::{SOURCE_TAGS, run_and_assert_source_compliance},
            random_string,
        },
    };

    /// Waits for the broker to accept connections, then checks that the
    /// source can be built from `config`.
    async fn assert_source_builds(config: AmqpSourceConfig) {
        await_connection(&config.connection).await;

        let built = amqp_source(
            &config,
            ShutdownSignal::noop(),
            SourceSender::new_test().0,
            LogNamespace::Legacy,
            false,
        )
        .await;

        assert!(built.is_ok());
    }

    #[tokio::test]
    async fn amqp_source_create_ok() {
        assert_source_builds(make_config()).await;
    }

    #[tokio::test]
    async fn amqp_tls_source_create_ok() {
        assert_source_builds(make_tls_config()).await;
    }

    /// Publishes `text` to `exchange` with default properties and waits for
    /// the publish to be confirmed. `_timestamp` is currently unused.
    async fn send_event(
        channel: &lapin::Channel,
        exchange: &str,
        routing_key: &str,
        text: &str,
        _timestamp: i64,
    ) {
        let body = text.as_bytes();
        trace!("Sending message of length {} to {}.", body.len(), exchange,);

        let confirmation = channel
            .basic_publish(
                exchange,
                routing_key,
                BasicPublishOptions::default(),
                body,
                BasicProperties::default(),
            )
            .await
            .unwrap();
        confirmation.await.unwrap();
    }

    /// Declares a fresh fanout exchange and queue, publishes one message and
    /// checks that the source turns it into the expected log event.
    async fn source_consume_event(mut config: AmqpSourceConfig) {
        let exchange = format!("test-{}-exchange", random_string(10));
        let queue = format!("test-{}-queue", random_string(10));
        let routing_key = "my_key";
        trace!("Test exchange name: {}.", exchange);
        let consumer = format!("test-consumer-{}", random_string(10));

        config.consumer = consumer;
        config.queue = queue;

        let (_conn, channel) = config.connection.connect().await.unwrap();

        // Both the exchange and the queue are auto-deleted.
        channel
            .exchange_declare(
                &exchange,
                lapin::ExchangeKind::Fanout,
                ExchangeDeclareOptions {
                    auto_delete: true,
                    ..Default::default()
                },
                lapin::types::FieldTable::default(),
            )
            .await
            .unwrap();

        channel
            .queue_declare(
                &config.queue,
                QueueDeclareOptions {
                    auto_delete: true,
                    ..Default::default()
                },
                lapin::types::FieldTable::default(),
            )
            .await
            .unwrap();

        channel
            .queue_bind(
                &config.queue,
                &exchange,
                "",
                QueueBindOptions::default(),
                lapin::types::FieldTable::default(),
            )
            .await
            .unwrap();

        trace!("Sending event...");
        let sent_at = Utc::now();
        send_event(
            &channel,
            &exchange,
            routing_key,
            "my message",
            sent_at.timestamp_millis(),
        )
        .await;

        trace!("Receiving event...");
        let events =
            run_and_assert_source_compliance(config, Duration::from_secs(1), &SOURCE_TAGS).await;
        assert!(!events.is_empty());

        let log = events[0].as_log();
        trace!("{:?}", log);
        assert_eq!(*log.get_message().unwrap(), "my message".into());
        assert_eq!(log["routing"], routing_key.into());
        assert_eq!(*log.get_source_type().unwrap(), "amqp".into());

        let timestamp_key = log_schema().timestamp_key().unwrap().to_string();
        let log_ts = log[timestamp_key].as_timestamp().unwrap();
        assert!(log_ts.signed_duration_since(sent_at) < chrono::Duration::seconds(1));
        assert_eq!(log["exchange"], exchange.into());
    }

    #[tokio::test]
    async fn amqp_source_consume_event() {
        let config = make_config();
        await_connection(&config.connection).await;
        source_consume_event(config).await;
    }

    #[tokio::test]
    async fn amqp_tls_source_consume_event() {
        let config = make_tls_config();
        await_connection(&config.connection).await;
        source_consume_event(config).await;
    }
}