vector/
amqp.rs

1//! Functionality supporting both the `[crate::sources::amqp]` source and `[crate::sinks::amqp]` sink.
2use lapin::tcp::{OwnedIdentity, OwnedTLSConfig};
3use vector_lib::configurable::configurable_component;
4
5/// AMQP connection options.
6#[configurable_component]
7#[derive(Clone, Debug)]
8pub(crate) struct AmqpConfig {
9    /// URI for the AMQP server.
10    ///
11    /// The URI has the format of
12    /// `amqp://<user>:<password>@<host>:<port>/<vhost>?timeout=<seconds>`.
13    ///
14    /// The default vhost can be specified by using a value of `%2f`.
15    ///
16    /// To connect over TLS, a scheme of `amqps` can be specified instead. For example,
17    /// `amqps://...`. Additional TLS settings, such as client certificate verification, can be
18    /// configured under the `tls` section.
19    #[configurable(metadata(
20        docs::examples = "amqp://user:password@127.0.0.1:5672/%2f?timeout=10",
21    ))]
22    pub(crate) connection_string: String,
23
24    #[configurable(derived)]
25    pub(crate) tls: Option<crate::tls::TlsConfig>,
26}
27
28impl Default for AmqpConfig {
29    fn default() -> Self {
30        Self {
31            connection_string: "amqp://127.0.0.1/%2f".to_string(),
32            tls: None,
33        }
34    }
35}
36
/// Polls the AMQP server until a connection can be established.
///
/// Makes up to five connection attempts, sleeping between failed attempts with a delay that
/// starts at 1 ms and doubles after each sleep. Returns as soon as an attempt succeeds (the
/// connection opened by the probe is dropped immediately). If every attempt fails, the function
/// returns silently without reporting the failure.
#[cfg(feature = "amqp-integration-tests")]
#[cfg(test)]
pub(crate) async fn await_connection(connection: &AmqpConfig) {
    const MAX_ATTEMPTS: u32 = 5;

    let mut pause = tokio::time::Duration::from_millis(1);

    for attempt in 1..=MAX_ATTEMPTS {
        // `connect` takes `&self`, so the borrowed config is used directly.
        if connection.connect().await.is_ok() {
            return;
        }

        // Only back off when another attempt will follow.
        if attempt < MAX_ATTEMPTS {
            tokio::time::sleep(pause).await;
            pause *= 2;
        }
    }
}
60
61impl AmqpConfig {
62    pub(crate) async fn connect(
63        &self,
64    ) -> Result<(lapin::Connection, lapin::Channel), Box<dyn std::error::Error + Send + Sync>> {
65        let addr = self.connection_string.clone();
66        let conn = match &self.tls {
67            Some(tls) => {
68                let cert_chain = if let Some(ca) = &tls.ca_file {
69                    Some(tokio::fs::read_to_string(ca.to_owned()).await?)
70                } else {
71                    None
72                };
73                let identity = if let Some(identity) = &tls.key_file {
74                    let der = tokio::fs::read(identity.to_owned()).await?;
75                    Some(OwnedIdentity {
76                        der,
77                        password: tls
78                            .key_pass
79                            .as_ref()
80                            .map(|s| s.to_string())
81                            .unwrap_or_else(String::default),
82                    })
83                } else {
84                    None
85                };
86                let tls_config = OwnedTLSConfig {
87                    identity,
88                    cert_chain,
89                };
90                lapin::Connection::connect_with_config(
91                    &addr,
92                    lapin::ConnectionProperties::default(),
93                    tls_config,
94                )
95                .await
96            }
97            None => lapin::Connection::connect(&addr, lapin::ConnectionProperties::default()).await,
98        }?;
99        let channel = conn.create_channel().await?;
100        Ok((conn, channel))
101    }
102}