vector/sinks/amqp/
config.rs

1//! Configuration functionality for the `AMQP` sink.
2use lapin::{BasicProperties, types::ShortString};
3use vector_lib::{
4    codecs::TextSerializerConfig,
5    internal_event::{error_stage, error_type},
6};
7
8use super::{channel::AmqpSinkChannels, sink::AmqpSink};
9use crate::{amqp::AmqpConfig, sinks::prelude::*};
10
11/// AMQP properties configuration.
12#[configurable_component]
13#[configurable(title = "Configure the AMQP message properties.")]
14#[derive(Clone, Debug, Default)]
15pub struct AmqpPropertiesConfig {
16    /// Content-Type for the AMQP messages.
17    pub(crate) content_type: Option<String>,
18
19    /// Content-Encoding for the AMQP messages.
20    pub(crate) content_encoding: Option<String>,
21
22    /// Expiration for AMQP messages (in milliseconds).
23    pub(crate) expiration_ms: Option<u64>,
24
25    /// Priority for AMQP messages. It can be templated to an integer between 0 and 255 inclusive.
26    pub(crate) priority: Option<UnsignedIntTemplate>,
27}
28
29impl AmqpPropertiesConfig {
30    pub(super) fn build(&self, event: &Event) -> Option<BasicProperties> {
31        let mut prop = BasicProperties::default();
32        if let Some(content_type) = &self.content_type {
33            prop = prop.with_content_type(ShortString::from(content_type.clone()));
34        }
35        if let Some(content_encoding) = &self.content_encoding {
36            prop = prop.with_content_encoding(ShortString::from(content_encoding.clone()));
37        }
38        if let Some(expiration_ms) = &self.expiration_ms {
39            prop = prop.with_expiration(ShortString::from(expiration_ms.to_string()));
40        }
41        if let Some(priority_template) = &self.priority {
42            let priority = priority_template.render(event).unwrap_or_else(|error| {
43                warn!(
44                    message = "Failed to render numeric template for \"properties.priority\".",
45                    error = %error,
46                    error_type = error_type::TEMPLATE_FAILED,
47                    stage = error_stage::PROCESSING,
48                );
49                Default::default()
50            });
51
52            // Clamp the value to the range of 0-255, as AMQP priority is a u8.
53            let priority = priority.clamp(0, u8::MAX.into()) as u8;
54            prop = prop.with_priority(priority);
55        }
56        Some(prop)
57    }
58}
59
60/// Configuration for the `amqp` sink.
61///
62/// Supports AMQP version 0.9.1
63#[configurable_component(sink(
64    "amqp",
65    "Send events to AMQP 0.9.1 compatible brokers like RabbitMQ."
66))]
67#[derive(Clone, Debug)]
68pub struct AmqpSinkConfig {
69    /// The exchange to publish messages to.
70    pub(crate) exchange: Template,
71
72    /// Template used to generate a routing key which corresponds to a queue binding.
73    pub(crate) routing_key: Option<Template>,
74
75    /// AMQP message properties.
76    pub(crate) properties: Option<AmqpPropertiesConfig>,
77
78    #[serde(flatten)]
79    pub(crate) connection: AmqpConfig,
80
81    #[configurable(derived)]
82    pub(crate) encoding: EncodingConfig,
83
84    #[configurable(derived)]
85    #[serde(
86        default,
87        deserialize_with = "crate::serde::bool_or_struct",
88        skip_serializing_if = "crate::serde::is_default"
89    )]
90    pub(crate) acknowledgements: AcknowledgementsConfig,
91
92    /// Maximum number of AMQP channels to keep active (channels are created as needed).
93    #[serde(default = "default_max_channels")]
94    pub(crate) max_channels: u32,
95}
96
97const fn default_max_channels() -> u32 {
98    4
99}
100
101impl Default for AmqpSinkConfig {
102    fn default() -> Self {
103        Self {
104            exchange: Template::try_from("vector").unwrap(),
105            routing_key: None,
106            properties: None,
107            encoding: TextSerializerConfig::default().into(),
108            connection: AmqpConfig::default(),
109            acknowledgements: AcknowledgementsConfig::default(),
110            max_channels: default_max_channels(),
111        }
112    }
113}
114
115impl GenerateConfig for AmqpSinkConfig {
116    fn generate_config() -> toml::Value {
117        toml::from_str(
118            r#"connection_string = "amqp://localhost:5672/%2f"
119            routing_key = "user_id"
120            exchange = "test"
121            encoding.codec = "json"
122            max_channels = 4"#,
123        )
124        .unwrap()
125    }
126}
127
128#[async_trait::async_trait]
129#[typetag::serde(name = "amqp")]
130impl SinkConfig for AmqpSinkConfig {
131    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
132        let sink = AmqpSink::new(self.clone()).await?;
133        let hc = healthcheck(sink.channels.clone()).boxed();
134        Ok((VectorSink::from_event_streamsink(sink), hc))
135    }
136
137    fn input(&self) -> Input {
138        Input::new(DataType::Log)
139    }
140
141    fn acknowledgements(&self) -> &AcknowledgementsConfig {
142        &self.acknowledgements
143    }
144}
145
146pub(super) async fn healthcheck(channels: AmqpSinkChannels) -> crate::Result<()> {
147    trace!("Healthcheck started.");
148
149    let channel = channels.get().await?;
150
151    if !channel.status().connected() {
152        return Err(Box::new(std::io::Error::new(
153            std::io::ErrorKind::BrokenPipe,
154            "Not Connected",
155        )));
156    }
157
158    trace!("Healthcheck completed.");
159    Ok(())
160}
161
162#[cfg(test)]
163mod tests {
164    use super::*;
165    use crate::config::format::{Format, deserialize};
166
167    #[test]
168    pub fn generate_config() {
169        crate::test_util::test_generate_config::<AmqpSinkConfig>();
170    }
171
172    fn assert_config_priority_eq(config: AmqpSinkConfig, event: &LogEvent, priority: u8) {
173        assert_eq!(
174            config
175                .properties
176                .unwrap()
177                .priority
178                .unwrap()
179                .render(event)
180                .unwrap(),
181            priority as u64
182        );
183    }
184
185    #[test]
186    pub fn parse_config_priority_static() {
187        for (format, config) in [
188            (
189                Format::Yaml,
190                r#"
191            exchange: "test"
192            routing_key: "user_id"
193            encoding:
194                codec: "json"
195            connection_string: "amqp://user:password@127.0.0.1:5672/"
196            properties:
197                priority: 1
198            "#,
199            ),
200            (
201                Format::Toml,
202                r#"
203            exchange = "test"
204            routing_key = "user_id"
205            encoding.codec = "json"
206            connection_string = "amqp://user:password@127.0.0.1:5672/"
207            properties = { priority = 1 }
208            "#,
209            ),
210            (
211                Format::Json,
212                r#"
213            {
214                "exchange": "test",
215                "routing_key": "user_id",
216                "encoding": {
217                    "codec": "json"
218                },
219                "connection_string": "amqp://user:password@127.0.0.1:5672/",
220                "properties": {
221                    "priority": 1
222                }
223            }
224            "#,
225            ),
226        ] {
227            let config: AmqpSinkConfig = deserialize(config, format).unwrap();
228            let event = LogEvent::from_str_legacy("message");
229            assert_config_priority_eq(config, &event, 1);
230        }
231    }
232
233    #[test]
234    pub fn parse_config_priority_templated() {
235        for (format, config) in [
236            (
237                Format::Yaml,
238                r#"
239            exchange: "test"
240            routing_key: "user_id"
241            encoding:
242                codec: "json"
243            connection_string: "amqp://user:password@127.0.0.1:5672/"
244            properties:
245                priority: "{{ .priority }}"
246            "#,
247            ),
248            (
249                Format::Toml,
250                r#"
251            exchange = "test"
252            routing_key = "user_id"
253            encoding.codec = "json"
254            connection_string = "amqp://user:password@127.0.0.1:5672/"
255            properties = { priority = "{{ .priority }}" }
256            "#,
257            ),
258            (
259                Format::Json,
260                r#"
261            {
262                "exchange": "test",
263                "routing_key": "user_id",
264                "encoding": {
265                    "codec": "json"
266                },
267                "connection_string": "amqp://user:password@127.0.0.1:5672/",
268                "properties": {
269                    "priority": "{{ .priority }}"
270                }
271            }
272            "#,
273            ),
274        ] {
275            let config: AmqpSinkConfig = deserialize(config, format).unwrap();
276            let event = {
277                let mut event = LogEvent::from_str_legacy("message");
278                event.insert("priority", 2);
279                event
280            };
281            assert_config_priority_eq(config, &event, 2);
282        }
283    }
284}