use crate::{
amqp::AmqpConfig,
codecs::{Decoder, DecodingConfig},
config::{SourceConfig, SourceContext, SourceOutput},
event::{BatchNotifier, BatchStatus},
internal_events::{
source::{AmqpAckError, AmqpBytesReceived, AmqpEventError, AmqpRejectError},
StreamClosedError,
},
serde::{bool_or_struct, default_decoding, default_framing_message_based},
shutdown::ShutdownSignal,
SourceSender,
};
use async_stream::stream;
use bytes::Bytes;
use chrono::{TimeZone, Utc};
use futures::{FutureExt, StreamExt};
use futures_util::Stream;
use lapin::{acker::Acker, message::Delivery, Channel};
use snafu::Snafu;
use std::{io::Cursor, pin::Pin};
use tokio_util::codec::FramedRead;
use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::{lookup_v2::OptionalValuePath, metadata_path, owned_value_path, path};
use vector_lib::{
config::{log_schema, LegacyKey, LogNamespace, SourceAcknowledgementsConfig},
event::{Event, LogEvent},
EstimatedJsonEncodedSizeOf,
};
use vector_lib::{
finalizer::UnorderedFinalizer,
internal_event::{CountByteSize, EventsReceived, InternalEventHandle as _},
};
use vrl::value::Kind;
#[derive(Debug, Snafu)]
enum BuildError {
#[snafu(display("Could not create AMQP consumer: {}", source))]
AmqpCreateError {
source: Box<dyn std::error::Error + Send + Sync>,
},
}
#[configurable_component(source(
"amqp",
"Collect events from AMQP 0.9.1 compatible brokers like RabbitMQ."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct AmqpSourceConfig {
#[serde(default = "default_queue")]
pub(crate) queue: String,
#[serde(default = "default_consumer")]
#[configurable(metadata(docs::examples = "consumer-group-name"))]
pub(crate) consumer: String,
#[serde(flatten)]
pub(crate) connection: AmqpConfig,
#[serde(default = "default_routing_key_field")]
#[derivative(Default(value = "default_routing_key_field()"))]
pub(crate) routing_key_field: OptionalValuePath,
#[serde(default = "default_exchange_key")]
#[derivative(Default(value = "default_exchange_key()"))]
pub(crate) exchange_key: OptionalValuePath,
#[serde(default = "default_offset_key")]
#[derivative(Default(value = "default_offset_key()"))]
pub(crate) offset_key: OptionalValuePath,
#[configurable(metadata(docs::hidden))]
#[serde(default)]
pub log_namespace: Option<bool>,
#[configurable(derived)]
#[serde(default = "default_framing_message_based")]
#[derivative(Default(value = "default_framing_message_based()"))]
pub(crate) framing: FramingConfig,
#[configurable(derived)]
#[serde(default = "default_decoding")]
#[derivative(Default(value = "default_decoding()"))]
pub(crate) decoding: DeserializerConfig,
#[configurable(derived)]
#[serde(default, deserialize_with = "bool_or_struct")]
pub(crate) acknowledgements: SourceAcknowledgementsConfig,
}
fn default_queue() -> String {
"vector".into()
}
fn default_consumer() -> String {
"vector".into()
}
fn default_routing_key_field() -> OptionalValuePath {
OptionalValuePath::from(owned_value_path!("routing"))
}
fn default_exchange_key() -> OptionalValuePath {
OptionalValuePath::from(owned_value_path!("exchange"))
}
fn default_offset_key() -> OptionalValuePath {
OptionalValuePath::from(owned_value_path!("offset"))
}
impl_generate_config_from_default!(AmqpSourceConfig);
impl AmqpSourceConfig {
fn decoder(&self, log_namespace: LogNamespace) -> vector_lib::Result<Decoder> {
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace).build()
}
}
#[async_trait::async_trait]
#[typetag::serde(name = "amqp")]
impl SourceConfig for AmqpSourceConfig {
async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
let log_namespace = cx.log_namespace(self.log_namespace);
let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
amqp_source(self, cx.shutdown, cx.out, log_namespace, acknowledgements).await
}
fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
let log_namespace = global_log_namespace.merge(self.log_namespace);
let schema_definition = self
.decoding
.schema_definition(log_namespace)
.with_standard_vector_source_metadata()
.with_source_metadata(
AmqpSourceConfig::NAME,
None,
&owned_value_path!("timestamp"),
Kind::timestamp(),
Some("timestamp"),
)
.with_source_metadata(
AmqpSourceConfig::NAME,
self.routing_key_field
.path
.clone()
.map(LegacyKey::InsertIfEmpty),
&owned_value_path!("routing"),
Kind::bytes(),
None,
)
.with_source_metadata(
AmqpSourceConfig::NAME,
self.exchange_key.path.clone().map(LegacyKey::InsertIfEmpty),
&owned_value_path!("exchange"),
Kind::bytes(),
None,
)
.with_source_metadata(
AmqpSourceConfig::NAME,
self.offset_key.path.clone().map(LegacyKey::InsertIfEmpty),
&owned_value_path!("offset"),
Kind::integer(),
None,
);
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
}
fn can_acknowledge(&self) -> bool {
true
}
}
#[derive(Debug)]
struct FinalizerEntry {
acker: Acker,
}
impl From<Delivery> for FinalizerEntry {
fn from(delivery: Delivery) -> Self {
Self {
acker: delivery.acker,
}
}
}
pub(crate) async fn amqp_source(
config: &AmqpSourceConfig,
shutdown: ShutdownSignal,
out: SourceSender,
log_namespace: LogNamespace,
acknowledgements: bool,
) -> crate::Result<super::Source> {
let config = config.clone();
let (_conn, channel) = config
.connection
.connect()
.await
.map_err(|source| BuildError::AmqpCreateError { source })?;
Ok(Box::pin(run_amqp_source(
config,
shutdown,
out,
channel,
log_namespace,
acknowledgements,
)))
}
struct Keys<'a> {
routing_key_field: &'a OptionalValuePath,
routing: &'a str,
exchange_key: &'a OptionalValuePath,
exchange: &'a str,
offset_key: &'a OptionalValuePath,
delivery_tag: i64,
}
fn populate_log_event(
log: &mut LogEvent,
timestamp: Option<chrono::DateTime<Utc>>,
keys: &Keys<'_>,
log_namespace: LogNamespace,
) {
log_namespace.insert_source_metadata(
AmqpSourceConfig::NAME,
log,
keys.routing_key_field
.path
.as_ref()
.map(LegacyKey::InsertIfEmpty),
path!("routing"),
keys.routing.to_string(),
);
log_namespace.insert_source_metadata(
AmqpSourceConfig::NAME,
log,
keys.exchange_key
.path
.as_ref()
.map(LegacyKey::InsertIfEmpty),
path!("exchange"),
keys.exchange.to_string(),
);
log_namespace.insert_source_metadata(
AmqpSourceConfig::NAME,
log,
keys.offset_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
path!("offset"),
keys.delivery_tag,
);
log_namespace.insert_vector_metadata(
log,
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(AmqpSourceConfig::NAME.as_bytes()),
);
match log_namespace {
LogNamespace::Vector => {
if let Some(timestamp) = timestamp {
log.insert(
metadata_path!(AmqpSourceConfig::NAME, "timestamp"),
timestamp,
);
};
log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
}
LogNamespace::Legacy => {
if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
log.try_insert(timestamp_key, timestamp.unwrap_or_else(Utc::now));
}
}
};
}
async fn receive_event(
config: &AmqpSourceConfig,
out: &mut SourceSender,
log_namespace: LogNamespace,
finalizer: Option<&UnorderedFinalizer<FinalizerEntry>>,
msg: Delivery,
) -> Result<(), ()> {
let payload = Cursor::new(Bytes::copy_from_slice(&msg.data));
let decoder = config.decoder(log_namespace).map_err(|_e| ())?;
let mut stream = FramedRead::new(payload, decoder);
let timestamp = msg
.properties
.timestamp()
.and_then(|millis| Utc.timestamp_millis_opt(millis as _).latest());
let routing = msg.routing_key.to_string();
let exchange = msg.exchange.to_string();
let keys = Keys {
routing_key_field: &config.routing_key_field,
exchange_key: &config.exchange_key,
offset_key: &config.offset_key,
routing: &routing,
exchange: &exchange,
delivery_tag: msg.delivery_tag as i64,
};
let events_received = register!(EventsReceived);
let stream = stream! {
while let Some(result) = stream.next().await {
match result {
Ok((events, byte_size)) => {
emit!(AmqpBytesReceived {
byte_size,
protocol: "amqp_0_9_1",
});
events_received.emit(CountByteSize(
events.len(),
events.estimated_json_encoded_size_of(),
));
for mut event in events {
if let Event::Log(ref mut log) = event {
populate_log_event(log,
timestamp,
&keys,
log_namespace);
}
yield event;
}
}
Err(error) => {
use vector_lib::codecs::StreamDecodingError as _;
if !error.can_continue() {
break;
}
}
}
}
}
.boxed();
finalize_event_stream(finalizer, out, stream, msg).await;
Ok(())
}
async fn finalize_event_stream(
finalizer: Option<&UnorderedFinalizer<FinalizerEntry>>,
out: &mut SourceSender,
mut stream: Pin<Box<dyn Stream<Item = Event> + Send + '_>>,
msg: Delivery,
) {
match finalizer {
Some(finalizer) => {
let (batch, receiver) = BatchNotifier::new_with_receiver();
let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
match out.send_event_stream(&mut stream).await {
Err(_) => {
emit!(StreamClosedError { count: 1 });
}
Ok(_) => {
finalizer.add(msg.into(), receiver);
}
}
}
None => match out.send_event_stream(&mut stream).await {
Err(_) => {
emit!(StreamClosedError { count: 1 });
}
Ok(_) => {
let ack_options = lapin::options::BasicAckOptions::default();
if let Err(error) = msg.acker.ack(ack_options).await {
emit!(AmqpAckError { error });
}
}
},
}
}
async fn run_amqp_source(
config: AmqpSourceConfig,
shutdown: ShutdownSignal,
mut out: SourceSender,
channel: Channel,
log_namespace: LogNamespace,
acknowledgements: bool,
) -> Result<(), ()> {
let (finalizer, mut ack_stream) =
UnorderedFinalizer::<FinalizerEntry>::maybe_new(acknowledgements, Some(shutdown.clone()));
debug!("Starting amqp source, listening to queue {}.", config.queue);
let mut consumer = channel
.basic_consume(
&config.queue,
&config.consumer,
lapin::options::BasicConsumeOptions::default(),
lapin::types::FieldTable::default(),
)
.await
.map_err(|error| {
error!(message = "Failed to consume.", error = ?error, internal_log_rate_limit = true);
})?
.fuse();
let mut shutdown = shutdown.fuse();
loop {
tokio::select! {
_ = &mut shutdown => break,
entry = ack_stream.next() => {
if let Some((status, entry)) = entry {
handle_ack(status, entry).await;
}
},
opt_m = consumer.next() => {
if let Some(try_m) = opt_m {
match try_m {
Err(error) => {
emit!(AmqpEventError { error });
return Err(());
}
Ok(msg) => {
receive_event(&config, &mut out, log_namespace, finalizer.as_ref(), msg).await?
}
}
} else {
break
}
}
};
}
Ok(())
}
async fn handle_ack(status: BatchStatus, entry: FinalizerEntry) {
match status {
BatchStatus::Delivered => {
let ack_options = lapin::options::BasicAckOptions::default();
if let Err(error) = entry.acker.ack(ack_options).await {
emit!(AmqpAckError { error });
}
}
BatchStatus::Errored => {
let ack_options = lapin::options::BasicRejectOptions::default();
if let Err(error) = entry.acker.reject(ack_options).await {
emit!(AmqpRejectError { error });
}
}
BatchStatus::Rejected => {
let ack_options = lapin::options::BasicRejectOptions::default();
if let Err(error) = entry.acker.reject(ack_options).await {
emit!(AmqpRejectError { error });
}
}
}
}
#[cfg(test)]
pub mod test {
use vector_lib::lookup::OwnedTargetPath;
use vector_lib::schema::Definition;
use vector_lib::tls::TlsConfig;
use vrl::value::kind::Collection;
use super::*;
#[test]
fn generate_config() {
crate::test_util::test_generate_config::<AmqpSourceConfig>();
}
pub fn make_config() -> AmqpSourceConfig {
let mut config = AmqpSourceConfig {
queue: "it".to_string(),
..Default::default()
};
let user = std::env::var("AMQP_USER").unwrap_or_else(|_| "guest".to_string());
let pass = std::env::var("AMQP_PASSWORD").unwrap_or_else(|_| "guest".to_string());
let host = std::env::var("AMQP_HOST").unwrap_or_else(|_| "rabbitmq".to_string());
let vhost = std::env::var("AMQP_VHOST").unwrap_or_else(|_| "%2f".to_string());
config.connection.connection_string =
format!("amqp://{}:{}@{}:5672/{}", user, pass, host, vhost);
config
}
pub fn make_tls_config() -> AmqpSourceConfig {
let mut config = AmqpSourceConfig {
queue: "it".to_string(),
..Default::default()
};
let user = std::env::var("AMQP_USER").unwrap_or_else(|_| "guest".to_string());
let pass = std::env::var("AMQP_PASSWORD").unwrap_or_else(|_| "guest".to_string());
let vhost = std::env::var("AMQP_VHOST").unwrap_or_else(|_| "%2f".to_string());
let host = std::env::var("AMQP_HOST").unwrap_or_else(|_| "rabbitmq".to_string());
let ca_file =
std::env::var("AMQP_CA_FILE").unwrap_or_else(|_| "/certs/ca.cert.pem".to_string());
config.connection.connection_string =
format!("amqps://{}:{}@{}/{}", user, pass, host, vhost);
let tls = TlsConfig {
ca_file: Some(ca_file.as_str().into()),
..Default::default()
};
config.connection.tls = Some(tls);
config
}
#[test]
fn output_schema_definition_vector_namespace() {
let config = AmqpSourceConfig {
log_namespace: Some(true),
..Default::default()
};
let definition = config
.outputs(LogNamespace::Vector)
.remove(0)
.schema_definition(true);
let expected_definition =
Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
.with_meaning(OwnedTargetPath::event_root(), "message")
.with_metadata_field(
&owned_value_path!("vector", "source_type"),
Kind::bytes(),
None,
)
.with_metadata_field(
&owned_value_path!("vector", "ingest_timestamp"),
Kind::timestamp(),
None,
)
.with_metadata_field(
&owned_value_path!("amqp", "timestamp"),
Kind::timestamp(),
Some("timestamp"),
)
.with_metadata_field(&owned_value_path!("amqp", "routing"), Kind::bytes(), None)
.with_metadata_field(&owned_value_path!("amqp", "exchange"), Kind::bytes(), None)
.with_metadata_field(&owned_value_path!("amqp", "offset"), Kind::integer(), None);
assert_eq!(definition, Some(expected_definition));
}
#[test]
fn output_schema_definition_legacy_namespace() {
let config = AmqpSourceConfig::default();
let definition = config
.outputs(LogNamespace::Legacy)
.remove(0)
.schema_definition(true);
let expected_definition = Definition::new_with_default_metadata(
Kind::object(Collection::empty()),
[LogNamespace::Legacy],
)
.with_event_field(
&owned_value_path!("message"),
Kind::bytes(),
Some("message"),
)
.with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
.with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
.with_event_field(&owned_value_path!("routing"), Kind::bytes(), None)
.with_event_field(&owned_value_path!("exchange"), Kind::bytes(), None)
.with_event_field(&owned_value_path!("offset"), Kind::integer(), None);
assert_eq!(definition, Some(expected_definition));
}
}
#[cfg(feature = "amqp-integration-tests")]
#[cfg(test)]
mod integration_test {
use super::test::*;
use super::*;
use crate::{
amqp::await_connection,
shutdown::ShutdownSignal,
test_util::{
components::{run_and_assert_source_compliance, SOURCE_TAGS},
random_string,
},
SourceSender,
};
use chrono::Utc;
use lapin::options::*;
use lapin::BasicProperties;
use tokio::time::Duration;
use vector_lib::config::log_schema;
#[tokio::test]
async fn amqp_source_create_ok() {
let config = make_config();
await_connection(&config.connection).await;
assert!(amqp_source(
&config,
ShutdownSignal::noop(),
SourceSender::new_test().0,
LogNamespace::Legacy,
false,
)
.await
.is_ok());
}
#[tokio::test]
async fn amqp_tls_source_create_ok() {
let config = make_tls_config();
await_connection(&config.connection).await;
assert!(amqp_source(
&config,
ShutdownSignal::noop(),
SourceSender::new_test().0,
LogNamespace::Legacy,
false,
)
.await
.is_ok());
}
async fn send_event(
channel: &lapin::Channel,
exchange: &str,
routing_key: &str,
text: &str,
_timestamp: i64,
) {
let payload = text.as_bytes();
let payload_len = payload.len();
trace!("Sending message of length {} to {}.", payload_len, exchange,);
channel
.basic_publish(
exchange,
routing_key,
BasicPublishOptions::default(),
payload.as_ref(),
BasicProperties::default(),
)
.await
.unwrap()
.await
.unwrap();
}
async fn source_consume_event(mut config: AmqpSourceConfig) {
let exchange = format!("test-{}-exchange", random_string(10));
let queue = format!("test-{}-queue", random_string(10));
let routing_key = "my_key";
trace!("Test exchange name: {}.", exchange);
let consumer = format!("test-consumer-{}", random_string(10));
config.consumer = consumer;
config.queue = queue;
let (_conn, channel) = config.connection.connect().await.unwrap();
let exchange_opts = lapin::options::ExchangeDeclareOptions {
auto_delete: true,
..Default::default()
};
channel
.exchange_declare(
&exchange,
lapin::ExchangeKind::Fanout,
exchange_opts,
lapin::types::FieldTable::default(),
)
.await
.unwrap();
let queue_opts = QueueDeclareOptions {
auto_delete: true,
..Default::default()
};
channel
.queue_declare(
&config.queue,
queue_opts,
lapin::types::FieldTable::default(),
)
.await
.unwrap();
channel
.queue_bind(
&config.queue,
&exchange,
"",
lapin::options::QueueBindOptions::default(),
lapin::types::FieldTable::default(),
)
.await
.unwrap();
trace!("Sending event...");
let now = Utc::now();
send_event(
&channel,
&exchange,
routing_key,
"my message",
now.timestamp_millis(),
)
.await;
trace!("Receiving event...");
let events =
run_and_assert_source_compliance(config, Duration::from_secs(1), &SOURCE_TAGS).await;
assert!(!events.is_empty());
let log = events[0].as_log();
trace!("{:?}", log);
assert_eq!(*log.get_message().unwrap(), "my message".into());
assert_eq!(log["routing"], routing_key.into());
assert_eq!(*log.get_source_type().unwrap(), "amqp".into());
let log_ts = log[log_schema().timestamp_key().unwrap().to_string()]
.as_timestamp()
.unwrap();
assert!(log_ts.signed_duration_since(now) < chrono::Duration::seconds(1));
assert_eq!(log["exchange"], exchange.into());
}
#[tokio::test]
async fn amqp_source_consume_event() {
let config = make_config();
await_connection(&config.connection).await;
source_consume_event(config).await;
}
#[tokio::test]
async fn amqp_tls_source_consume_event() {
let config = make_tls_config();
await_connection(&config.connection).await;
source_consume_event(config).await;
}
}