1use crate::{
4 amqp::AmqpConfig,
5 codecs::{Decoder, DecodingConfig},
6 config::{SourceConfig, SourceContext, SourceOutput},
7 event::{BatchNotifier, BatchStatus},
8 internal_events::{
9 source::{AmqpAckError, AmqpBytesReceived, AmqpEventError, AmqpRejectError},
10 StreamClosedError,
11 },
12 serde::{bool_or_struct, default_decoding, default_framing_message_based},
13 shutdown::ShutdownSignal,
14 SourceSender,
15};
16use async_stream::stream;
17use bytes::Bytes;
18use chrono::{TimeZone, Utc};
19use futures::{FutureExt, StreamExt};
20use futures_util::Stream;
21use lapin::{acker::Acker, message::Delivery, Channel};
22use snafu::Snafu;
23use std::{io::Cursor, pin::Pin};
24use tokio_util::codec::FramedRead;
25use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
26use vector_lib::configurable::configurable_component;
27use vector_lib::lookup::{lookup_v2::OptionalValuePath, metadata_path, owned_value_path, path};
28use vector_lib::{
29 config::{log_schema, LegacyKey, LogNamespace, SourceAcknowledgementsConfig},
30 event::{Event, LogEvent},
31 EstimatedJsonEncodedSizeOf,
32};
33use vector_lib::{
34 finalizer::UnorderedFinalizer,
35 internal_event::{CountByteSize, EventsReceived, InternalEventHandle as _},
36};
37use vrl::value::Kind;
38
39#[derive(Debug, Snafu)]
40enum BuildError {
41 #[snafu(display("Could not create AMQP consumer: {}", source))]
42 AmqpCreateError {
43 source: Box<dyn std::error::Error + Send + Sync>,
44 },
45}
46
47#[configurable_component(source(
51 "amqp",
52 "Collect events from AMQP 0.9.1 compatible brokers like RabbitMQ."
53))]
54#[derive(Clone, Debug, Derivative)]
55#[derivative(Default)]
56#[serde(deny_unknown_fields)]
57pub struct AmqpSourceConfig {
58 #[serde(default = "default_queue")]
60 pub(crate) queue: String,
61
62 #[serde(default = "default_consumer")]
64 #[configurable(metadata(docs::examples = "consumer-group-name"))]
65 pub(crate) consumer: String,
66
67 #[serde(flatten)]
68 pub(crate) connection: AmqpConfig,
69
70 #[serde(default = "default_routing_key_field")]
72 #[derivative(Default(value = "default_routing_key_field()"))]
73 pub(crate) routing_key_field: OptionalValuePath,
74
75 #[serde(default = "default_exchange_key")]
77 #[derivative(Default(value = "default_exchange_key()"))]
78 pub(crate) exchange_key: OptionalValuePath,
79
80 #[serde(default = "default_offset_key")]
82 #[derivative(Default(value = "default_offset_key()"))]
83 pub(crate) offset_key: OptionalValuePath,
84
85 #[configurable(metadata(docs::hidden))]
87 #[serde(default)]
88 pub log_namespace: Option<bool>,
89
90 #[configurable(derived)]
91 #[serde(default = "default_framing_message_based")]
92 #[derivative(Default(value = "default_framing_message_based()"))]
93 pub(crate) framing: FramingConfig,
94
95 #[configurable(derived)]
96 #[serde(default = "default_decoding")]
97 #[derivative(Default(value = "default_decoding()"))]
98 pub(crate) decoding: DeserializerConfig,
99
100 #[configurable(derived)]
101 #[serde(default, deserialize_with = "bool_or_struct")]
102 pub(crate) acknowledgements: SourceAcknowledgementsConfig,
103}
104
105fn default_queue() -> String {
106 "vector".into()
107}
108
109fn default_consumer() -> String {
110 "vector".into()
111}
112
113fn default_routing_key_field() -> OptionalValuePath {
114 OptionalValuePath::from(owned_value_path!("routing"))
115}
116
117fn default_exchange_key() -> OptionalValuePath {
118 OptionalValuePath::from(owned_value_path!("exchange"))
119}
120
121fn default_offset_key() -> OptionalValuePath {
122 OptionalValuePath::from(owned_value_path!("offset"))
123}
124
125impl_generate_config_from_default!(AmqpSourceConfig);
126
127impl AmqpSourceConfig {
128 fn decoder(&self, log_namespace: LogNamespace) -> vector_lib::Result<Decoder> {
129 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build()
130 }
131}
132
133#[async_trait::async_trait]
134#[typetag::serde(name = "amqp")]
135impl SourceConfig for AmqpSourceConfig {
136 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
137 let log_namespace = cx.log_namespace(self.log_namespace);
138 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
139
140 amqp_source(self, cx.shutdown, cx.out, log_namespace, acknowledgements).await
141 }
142
143 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
144 let log_namespace = global_log_namespace.merge(self.log_namespace);
145 let schema_definition = self
146 .decoding
147 .schema_definition(log_namespace)
148 .with_standard_vector_source_metadata()
149 .with_source_metadata(
150 AmqpSourceConfig::NAME,
151 None,
152 &owned_value_path!("timestamp"),
153 Kind::timestamp(),
154 Some("timestamp"),
155 )
156 .with_source_metadata(
157 AmqpSourceConfig::NAME,
158 self.routing_key_field
159 .path
160 .clone()
161 .map(LegacyKey::InsertIfEmpty),
162 &owned_value_path!("routing"),
163 Kind::bytes(),
164 None,
165 )
166 .with_source_metadata(
167 AmqpSourceConfig::NAME,
168 self.exchange_key.path.clone().map(LegacyKey::InsertIfEmpty),
169 &owned_value_path!("exchange"),
170 Kind::bytes(),
171 None,
172 )
173 .with_source_metadata(
174 AmqpSourceConfig::NAME,
175 self.offset_key.path.clone().map(LegacyKey::InsertIfEmpty),
176 &owned_value_path!("offset"),
177 Kind::integer(),
178 None,
179 );
180
181 vec![SourceOutput::new_maybe_logs(
182 self.decoding.output_type(),
183 schema_definition,
184 )]
185 }
186
187 fn can_acknowledge(&self) -> bool {
188 true
189 }
190}
191
192#[derive(Debug)]
193struct FinalizerEntry {
194 acker: Acker,
195}
196
197impl From<Delivery> for FinalizerEntry {
198 fn from(delivery: Delivery) -> Self {
199 Self {
200 acker: delivery.acker,
201 }
202 }
203}
204
205pub(crate) async fn amqp_source(
206 config: &AmqpSourceConfig,
207 shutdown: ShutdownSignal,
208 out: SourceSender,
209 log_namespace: LogNamespace,
210 acknowledgements: bool,
211) -> crate::Result<super::Source> {
212 let config = config.clone();
213 let (_conn, channel) = config
214 .connection
215 .connect()
216 .await
217 .map_err(|source| BuildError::AmqpCreateError { source })?;
218
219 Ok(Box::pin(run_amqp_source(
220 config,
221 shutdown,
222 out,
223 channel,
224 log_namespace,
225 acknowledgements,
226 )))
227}
228
229struct Keys<'a> {
230 routing_key_field: &'a OptionalValuePath,
231 routing: &'a str,
232 exchange_key: &'a OptionalValuePath,
233 exchange: &'a str,
234 offset_key: &'a OptionalValuePath,
235 delivery_tag: i64,
236}
237
238fn populate_log_event(
240 log: &mut LogEvent,
241 timestamp: Option<chrono::DateTime<Utc>>,
242 keys: &Keys<'_>,
243 log_namespace: LogNamespace,
244) {
245 log_namespace.insert_source_metadata(
246 AmqpSourceConfig::NAME,
247 log,
248 keys.routing_key_field
249 .path
250 .as_ref()
251 .map(LegacyKey::InsertIfEmpty),
252 path!("routing"),
253 keys.routing.to_string(),
254 );
255
256 log_namespace.insert_source_metadata(
257 AmqpSourceConfig::NAME,
258 log,
259 keys.exchange_key
260 .path
261 .as_ref()
262 .map(LegacyKey::InsertIfEmpty),
263 path!("exchange"),
264 keys.exchange.to_string(),
265 );
266
267 log_namespace.insert_source_metadata(
268 AmqpSourceConfig::NAME,
269 log,
270 keys.offset_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
271 path!("offset"),
272 keys.delivery_tag,
273 );
274
275 log_namespace.insert_vector_metadata(
276 log,
277 log_schema().source_type_key(),
278 path!("source_type"),
279 Bytes::from_static(AmqpSourceConfig::NAME.as_bytes()),
280 );
281
282 match log_namespace {
286 LogNamespace::Vector => {
287 if let Some(timestamp) = timestamp {
288 log.insert(
289 metadata_path!(AmqpSourceConfig::NAME, "timestamp"),
290 timestamp,
291 );
292 };
293
294 log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
295 }
296 LogNamespace::Legacy => {
297 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
298 log.try_insert(timestamp_key, timestamp.unwrap_or_else(Utc::now));
299 }
300 }
301 };
302}
303
304async fn receive_event(
306 config: &AmqpSourceConfig,
307 out: &mut SourceSender,
308 log_namespace: LogNamespace,
309 finalizer: Option<&UnorderedFinalizer<FinalizerEntry>>,
310 msg: Delivery,
311) -> Result<(), ()> {
312 let payload = Cursor::new(Bytes::copy_from_slice(&msg.data));
313 let decoder = config.decoder(log_namespace).map_err(|_e| ())?;
314 let mut stream = FramedRead::new(payload, decoder);
315
316 let timestamp = msg
318 .properties
319 .timestamp()
320 .and_then(|millis| Utc.timestamp_millis_opt(millis as _).latest());
321
322 let routing = msg.routing_key.to_string();
323 let exchange = msg.exchange.to_string();
324 let keys = Keys {
325 routing_key_field: &config.routing_key_field,
326 exchange_key: &config.exchange_key,
327 offset_key: &config.offset_key,
328 routing: &routing,
329 exchange: &exchange,
330 delivery_tag: msg.delivery_tag as i64,
331 };
332 let events_received = register!(EventsReceived);
333
334 let stream = stream! {
335 while let Some(result) = stream.next().await {
336 match result {
337 Ok((events, byte_size)) => {
338 emit!(AmqpBytesReceived {
339 byte_size,
340 protocol: "amqp_0_9_1",
341 });
342
343 events_received.emit(CountByteSize(
344 events.len(),
345 events.estimated_json_encoded_size_of(),
346 ));
347
348 for mut event in events {
349 if let Event::Log(ref mut log) = event {
350 populate_log_event(log,
351 timestamp,
352 &keys,
353 log_namespace);
354 }
355
356 yield event;
357 }
358 }
359 Err(error) => {
360 use vector_lib::codecs::StreamDecodingError as _;
361
362 if !error.can_continue() {
365 break;
366 }
367 }
368 }
369 }
370 }
371 .boxed();
372
373 finalize_event_stream(finalizer, out, stream, msg).await;
374
375 Ok(())
376}
377
378async fn finalize_event_stream(
380 finalizer: Option<&UnorderedFinalizer<FinalizerEntry>>,
381 out: &mut SourceSender,
382 mut stream: Pin<Box<dyn Stream<Item = Event> + Send + '_>>,
383 msg: Delivery,
384) {
385 match finalizer {
386 Some(finalizer) => {
387 let (batch, receiver) = BatchNotifier::new_with_receiver();
388 let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
389
390 match out.send_event_stream(&mut stream).await {
391 Err(_) => {
392 emit!(StreamClosedError { count: 1 });
393 }
394 Ok(_) => {
395 finalizer.add(msg.into(), receiver);
396 }
397 }
398 }
399 None => match out.send_event_stream(&mut stream).await {
400 Err(_) => {
401 emit!(StreamClosedError { count: 1 });
402 }
403 Ok(_) => {
404 let ack_options = lapin::options::BasicAckOptions::default();
405 if let Err(error) = msg.acker.ack(ack_options).await {
406 emit!(AmqpAckError { error });
407 }
408 }
409 },
410 }
411}
412
413async fn run_amqp_source(
415 config: AmqpSourceConfig,
416 shutdown: ShutdownSignal,
417 mut out: SourceSender,
418 channel: Channel,
419 log_namespace: LogNamespace,
420 acknowledgements: bool,
421) -> Result<(), ()> {
422 let (finalizer, mut ack_stream) =
423 UnorderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));
424
425 debug!("Starting amqp source, listening to queue {}.", config.queue);
426 let mut consumer = channel
427 .basic_consume(
428 &config.queue,
429 &config.consumer,
430 lapin::options::BasicConsumeOptions::default(),
431 lapin::types::FieldTable::default(),
432 )
433 .await
434 .map_err(|error| {
435 error!(message = "Failed to consume.", error = ?error, internal_log_rate_limit = true);
436 })?
437 .fuse();
438 let mut shutdown = shutdown.fuse();
439 loop {
440 tokio::select! {
441 _ = &mut shutdown => break,
442 entry = ack_stream.next() => {
443 if let Some((status, entry)) = entry {
444 handle_ack(status, entry).await;
445 }
446 },
447 opt_m = consumer.next() => {
448 if let Some(try_m) = opt_m {
449 match try_m {
450 Err(error) => {
451 emit!(AmqpEventError { error });
452 return Err(());
453 }
454 Ok(msg) => {
455 receive_event(&config, &mut out, log_namespace, finalizer.as_ref(), msg).await?
456 }
457 }
458 } else {
459 break
460 }
461 }
462 };
463 }
464
465 Ok(())
466}
467
468async fn handle_ack(status: BatchStatus, entry: FinalizerEntry) {
469 match status {
470 BatchStatus::Delivered => {
471 let ack_options = lapin::options::BasicAckOptions::default();
472 if let Err(error) = entry.acker.ack(ack_options).await {
473 emit!(AmqpAckError { error });
474 }
475 }
476 BatchStatus::Errored => {
477 let ack_options = lapin::options::BasicRejectOptions::default();
478 if let Err(error) = entry.acker.reject(ack_options).await {
479 emit!(AmqpRejectError { error });
480 }
481 }
482 BatchStatus::Rejected => {
483 let ack_options = lapin::options::BasicRejectOptions::default();
484 if let Err(error) = entry.acker.reject(ack_options).await {
485 emit!(AmqpRejectError { error });
486 }
487 }
488 }
489}
490
491#[cfg(test)]
492pub mod test {
493 use vector_lib::lookup::OwnedTargetPath;
494 use vector_lib::schema::Definition;
495 use vector_lib::tls::TlsConfig;
496 use vrl::value::kind::Collection;
497
498 use super::*;
499
500 #[test]
501 fn generate_config() {
502 crate::test_util::test_generate_config::<AmqpSourceConfig>();
503 }
504
505 pub fn make_config() -> AmqpSourceConfig {
506 let mut config = AmqpSourceConfig {
507 queue: "it".to_string(),
508 ..Default::default()
509 };
510 let user = std::env::var("AMQP_USER").unwrap_or_else(|_| "guest".to_string());
511 let pass = std::env::var("AMQP_PASSWORD").unwrap_or_else(|_| "guest".to_string());
512 let host = std::env::var("AMQP_HOST").unwrap_or_else(|_| "rabbitmq".to_string());
513 let vhost = std::env::var("AMQP_VHOST").unwrap_or_else(|_| "%2f".to_string());
514 config.connection.connection_string = format!("amqp://{user}:{pass}@{host}:5672/{vhost}");
515
516 config
517 }
518
519 pub fn make_tls_config() -> AmqpSourceConfig {
520 let mut config = AmqpSourceConfig {
521 queue: "it".to_string(),
522 ..Default::default()
523 };
524 let user = std::env::var("AMQP_USER").unwrap_or_else(|_| "guest".to_string());
525 let pass = std::env::var("AMQP_PASSWORD").unwrap_or_else(|_| "guest".to_string());
526 let vhost = std::env::var("AMQP_VHOST").unwrap_or_else(|_| "%2f".to_string());
527 let host = std::env::var("AMQP_HOST").unwrap_or_else(|_| "rabbitmq".to_string());
528 let ca_file =
529 std::env::var("AMQP_CA_FILE").unwrap_or_else(|_| "/certs/ca.cert.pem".to_string());
530 config.connection.connection_string = format!("amqps://{user}:{pass}@{host}/{vhost}");
531 let tls = TlsConfig {
532 ca_file: Some(ca_file.as_str().into()),
533 ..Default::default()
534 };
535 config.connection.tls = Some(tls);
536 config
537 }
538
539 #[test]
540 fn output_schema_definition_vector_namespace() {
541 let config = AmqpSourceConfig {
542 log_namespace: Some(true),
543 ..Default::default()
544 };
545
546 let definition = config
547 .outputs(LogNamespace::Vector)
548 .remove(0)
549 .schema_definition(true);
550
551 let expected_definition =
552 Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
553 .with_meaning(OwnedTargetPath::event_root(), "message")
554 .with_metadata_field(
555 &owned_value_path!("vector", "source_type"),
556 Kind::bytes(),
557 None,
558 )
559 .with_metadata_field(
560 &owned_value_path!("vector", "ingest_timestamp"),
561 Kind::timestamp(),
562 None,
563 )
564 .with_metadata_field(
565 &owned_value_path!("amqp", "timestamp"),
566 Kind::timestamp(),
567 Some("timestamp"),
568 )
569 .with_metadata_field(&owned_value_path!("amqp", "routing"), Kind::bytes(), None)
570 .with_metadata_field(&owned_value_path!("amqp", "exchange"), Kind::bytes(), None)
571 .with_metadata_field(&owned_value_path!("amqp", "offset"), Kind::integer(), None);
572
573 assert_eq!(definition, Some(expected_definition));
574 }
575
576 #[test]
577 fn output_schema_definition_legacy_namespace() {
578 let config = AmqpSourceConfig::default();
579
580 let definition = config
581 .outputs(LogNamespace::Legacy)
582 .remove(0)
583 .schema_definition(true);
584
585 let expected_definition = Definition::new_with_default_metadata(
586 Kind::object(Collection::empty()),
587 [LogNamespace::Legacy],
588 )
589 .with_event_field(
590 &owned_value_path!("message"),
591 Kind::bytes(),
592 Some("message"),
593 )
594 .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
595 .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
596 .with_event_field(&owned_value_path!("routing"), Kind::bytes(), None)
597 .with_event_field(&owned_value_path!("exchange"), Kind::bytes(), None)
598 .with_event_field(&owned_value_path!("offset"), Kind::integer(), None);
599
600 assert_eq!(definition, Some(expected_definition));
601 }
602}
603
604#[cfg(feature = "amqp-integration-tests")]
606#[cfg(test)]
607mod integration_test {
608 use super::test::*;
609 use super::*;
610 use crate::{
611 amqp::await_connection,
612 shutdown::ShutdownSignal,
613 test_util::{
614 components::{run_and_assert_source_compliance, SOURCE_TAGS},
615 random_string,
616 },
617 SourceSender,
618 };
619 use chrono::Utc;
620 use lapin::options::*;
621 use lapin::BasicProperties;
622 use tokio::time::Duration;
623 use vector_lib::config::log_schema;
624
625 #[tokio::test]
626 async fn amqp_source_create_ok() {
627 let config = make_config();
628 await_connection(&config.connection).await;
629 assert!(amqp_source(
630 &config,
631 ShutdownSignal::noop(),
632 SourceSender::new_test().0,
633 LogNamespace::Legacy,
634 false,
635 )
636 .await
637 .is_ok());
638 }
639
640 #[tokio::test]
641 async fn amqp_tls_source_create_ok() {
642 let config = make_tls_config();
643 await_connection(&config.connection).await;
644
645 assert!(amqp_source(
646 &config,
647 ShutdownSignal::noop(),
648 SourceSender::new_test().0,
649 LogNamespace::Legacy,
650 false,
651 )
652 .await
653 .is_ok());
654 }
655
656 async fn send_event(
657 channel: &lapin::Channel,
658 exchange: &str,
659 routing_key: &str,
660 text: &str,
661 _timestamp: i64,
662 ) {
663 let payload = text.as_bytes();
664 let payload_len = payload.len();
665 trace!("Sending message of length {} to {}.", payload_len, exchange,);
666
667 channel
668 .basic_publish(
669 exchange,
670 routing_key,
671 BasicPublishOptions::default(),
672 payload.as_ref(),
673 BasicProperties::default(),
674 )
675 .await
676 .unwrap()
677 .await
678 .unwrap();
679 }
680
681 async fn source_consume_event(mut config: AmqpSourceConfig) {
682 let exchange = format!("test-{}-exchange", random_string(10));
683 let queue = format!("test-{}-queue", random_string(10));
684 let routing_key = "my_key";
685 trace!("Test exchange name: {}.", exchange);
686 let consumer = format!("test-consumer-{}", random_string(10));
687
688 config.consumer = consumer;
689 config.queue = queue;
690
691 let (_conn, channel) = config.connection.connect().await.unwrap();
692 let exchange_opts = lapin::options::ExchangeDeclareOptions {
693 auto_delete: true,
694 ..Default::default()
695 };
696
697 channel
698 .exchange_declare(
699 &exchange,
700 lapin::ExchangeKind::Fanout,
701 exchange_opts,
702 lapin::types::FieldTable::default(),
703 )
704 .await
705 .unwrap();
706
707 let queue_opts = QueueDeclareOptions {
708 auto_delete: true,
709 ..Default::default()
710 };
711 channel
712 .queue_declare(
713 &config.queue,
714 queue_opts,
715 lapin::types::FieldTable::default(),
716 )
717 .await
718 .unwrap();
719
720 channel
721 .queue_bind(
722 &config.queue,
723 &exchange,
724 "",
725 lapin::options::QueueBindOptions::default(),
726 lapin::types::FieldTable::default(),
727 )
728 .await
729 .unwrap();
730
731 trace!("Sending event...");
732 let now = Utc::now();
733 send_event(
734 &channel,
735 &exchange,
736 routing_key,
737 "my message",
738 now.timestamp_millis(),
739 )
740 .await;
741
742 trace!("Receiving event...");
743 let events =
744 run_and_assert_source_compliance(config, Duration::from_secs(1), &SOURCE_TAGS).await;
745 assert!(!events.is_empty());
746
747 let log = events[0].as_log();
748 trace!("{:?}", log);
749 assert_eq!(*log.get_message().unwrap(), "my message".into());
750 assert_eq!(log["routing"], routing_key.into());
751 assert_eq!(*log.get_source_type().unwrap(), "amqp".into());
752 let log_ts = log[log_schema().timestamp_key().unwrap().to_string()]
753 .as_timestamp()
754 .unwrap();
755 assert!(log_ts.signed_duration_since(now) < chrono::Duration::seconds(1));
756 assert_eq!(log["exchange"], exchange.into());
757 }
758
759 #[tokio::test]
760 async fn amqp_source_consume_event() {
761 let config = make_config();
762 await_connection(&config.connection).await;
763 source_consume_event(config).await;
764 }
765
766 #[tokio::test]
767 async fn amqp_tls_source_consume_event() {
768 let config = make_tls_config();
769 await_connection(&config.connection).await;
770 source_consume_event(config).await;
771 }
772}