use lapin::tcp::{OwnedIdentity, OwnedTLSConfig};
use vector_lib::configurable::configurable_component;
#[configurable_component]
#[derive(Clone, Debug)]
pub(crate) struct AmqpConfig {
#[configurable(metadata(
docs::examples = "amqp://user:password@127.0.0.1:5672/%2f?timeout=10",
))]
pub(crate) connection_string: String,
#[configurable(derived)]
pub(crate) tls: Option<crate::tls::TlsConfig>,
}
impl Default for AmqpConfig {
fn default() -> Self {
Self {
connection_string: "amqp://127.0.0.1/%2f".to_string(),
tls: None,
}
}
}
#[cfg(feature = "amqp-integration-tests")]
#[cfg(test)]
pub(crate) async fn await_connection(connection: &AmqpConfig) {
let mut pause = tokio::time::Duration::from_millis(1);
let mut attempts = 0;
loop {
let connection = connection.clone();
if connection.connect().await.is_ok() {
return;
}
attempts += 1;
if attempts == 5 {
return;
}
tokio::time::sleep(pause).await;
pause *= 2;
}
}
impl AmqpConfig {
pub(crate) async fn connect(
&self,
) -> Result<(lapin::Connection, lapin::Channel), Box<dyn std::error::Error + Send + Sync>> {
let addr = self.connection_string.clone();
let conn = match &self.tls {
Some(tls) => {
let cert_chain = if let Some(ca) = &tls.ca_file {
Some(tokio::fs::read_to_string(ca.to_owned()).await?)
} else {
None
};
let identity = if let Some(identity) = &tls.key_file {
let der = tokio::fs::read(identity.to_owned()).await?;
Some(OwnedIdentity {
der,
password: tls
.key_pass
.as_ref()
.map(|s| s.to_string())
.unwrap_or_else(String::default),
})
} else {
None
};
let tls_config = OwnedTLSConfig {
identity,
cert_chain,
};
lapin::Connection::connect_with_config(
&addr,
lapin::ConnectionProperties::default(),
tls_config,
)
.await
}
None => lapin::Connection::connect(&addr, lapin::ConnectionProperties::default()).await,
}?;
let channel = conn.create_channel().await?;
Ok((conn, channel))
}
}