1use lapin::tcp::{OwnedIdentity, OwnedTLSConfig};
3use vector_lib::configurable::configurable_component;
4
5#[configurable_component]
7#[derive(Clone, Debug)]
8pub(crate) struct AmqpConfig {
9 #[configurable(metadata(
20 docs::examples = "amqp://user:password@127.0.0.1:5672/%2f?timeout=10",
21 ))]
22 pub(crate) connection_string: String,
23
24 #[configurable(derived)]
25 pub(crate) tls: Option<crate::tls::TlsConfig>,
26}
27
28impl Default for AmqpConfig {
29 fn default() -> Self {
30 Self {
31 connection_string: "amqp://127.0.0.1/%2f".to_string(),
32 tls: None,
33 }
34 }
35}
36
37#[cfg(feature = "amqp-integration-tests")]
40#[cfg(test)]
41pub(crate) async fn await_connection(connection: &AmqpConfig) {
42 let mut pause = tokio::time::Duration::from_millis(1);
43 let mut attempts = 0;
44
45 loop {
46 let connection = connection.clone();
47 if connection.connect().await.is_ok() {
48 return;
49 }
50 attempts += 1;
51
52 if attempts == 5 {
53 return;
54 }
55
56 tokio::time::sleep(pause).await;
57 pause *= 2;
58 }
59}
60
61impl AmqpConfig {
62 pub(crate) async fn connect(
63 &self,
64 ) -> Result<(lapin::Connection, lapin::Channel), Box<dyn std::error::Error + Send + Sync>> {
65 let addr = self.connection_string.clone();
66 let conn = match &self.tls {
67 Some(tls) => {
68 let cert_chain = if let Some(ca) = &tls.ca_file {
69 Some(tokio::fs::read_to_string(ca.to_owned()).await?)
70 } else {
71 None
72 };
73 let identity = if let Some(identity) = &tls.key_file {
74 let der = tokio::fs::read(identity.to_owned()).await?;
75 Some(OwnedIdentity {
76 der,
77 password: tls
78 .key_pass
79 .as_ref()
80 .map(|s| s.to_string())
81 .unwrap_or_else(String::default),
82 })
83 } else {
84 None
85 };
86 let tls_config = OwnedTLSConfig {
87 identity,
88 cert_chain,
89 };
90 lapin::Connection::connect_with_config(
91 &addr,
92 lapin::ConnectionProperties::default(),
93 tls_config,
94 )
95 .await
96 }
97 None => lapin::Connection::connect(&addr, lapin::ConnectionProperties::default()).await,
98 }?;
99 let channel = conn.create_channel().await?;
100 Ok((conn, channel))
101 }
102}