vector/sinks/amqp/
config.rs

1//! Configuration functionality for the `AMQP` sink.
2use super::channel::AmqpSinkChannels;
3use crate::{amqp::AmqpConfig, sinks::prelude::*};
4use lapin::{types::ShortString, BasicProperties};
5use vector_lib::{
6    codecs::TextSerializerConfig,
7    internal_event::{error_stage, error_type},
8};
9
10use super::sink::AmqpSink;
11
/// AMQP properties configuration.
// Every field is optional: `build` only sets a property on the outgoing
// `BasicProperties` when the corresponding field is `Some`.
#[configurable_component]
#[configurable(title = "Configure the AMQP message properties.")]
#[derive(Clone, Debug, Default)]
pub struct AmqpPropertiesConfig {
    /// Content-Type for the AMQP messages.
    pub(crate) content_type: Option<String>,

    /// Content-Encoding for the AMQP messages.
    pub(crate) content_encoding: Option<String>,

    /// Expiration for AMQP messages (in milliseconds).
    // Passed to the broker as the decimal string form of this number (see `build`).
    pub(crate) expiration_ms: Option<u64>,

    /// Priority for AMQP messages. It can be templated to an integer between 0 and 255 inclusive.
    // Rendered per event in `build`: the rendered value is clamped into 0..=255,
    // and a failed render logs a warning and falls back to the type's default value.
    pub(crate) priority: Option<UnsignedIntTemplate>,
}
29
30impl AmqpPropertiesConfig {
31    pub(super) fn build(&self, event: &Event) -> Option<BasicProperties> {
32        let mut prop = BasicProperties::default();
33        if let Some(content_type) = &self.content_type {
34            prop = prop.with_content_type(ShortString::from(content_type.clone()));
35        }
36        if let Some(content_encoding) = &self.content_encoding {
37            prop = prop.with_content_encoding(ShortString::from(content_encoding.clone()));
38        }
39        if let Some(expiration_ms) = &self.expiration_ms {
40            prop = prop.with_expiration(ShortString::from(expiration_ms.to_string()));
41        }
42        if let Some(priority_template) = &self.priority {
43            let priority = priority_template.render(event).unwrap_or_else(|error| {
44                warn!(
45                    message = "Failed to render numeric template for \"properties.priority\".",
46                    error = %error,
47                    error_type = error_type::TEMPLATE_FAILED,
48                    stage = error_stage::PROCESSING,
49                    internal_log_rate_limit = true,
50                );
51                Default::default()
52            });
53
54            // Clamp the value to the range of 0-255, as AMQP priority is a u8.
55            let priority = priority.clamp(0, u8::MAX.into()) as u8;
56            prop = prop.with_priority(priority);
57        }
58        Some(prop)
59    }
60}
61
/// Configuration for the `amqp` sink.
///
/// Supports AMQP version 0.9.1
#[configurable_component(sink(
    "amqp",
    "Send events to AMQP 0.9.1 compatible brokers like RabbitMQ."
))]
#[derive(Clone, Debug)]
pub struct AmqpSinkConfig {
    /// The exchange to publish messages to.
    pub(crate) exchange: Template,

    /// Template used to generate a routing key which corresponds to a queue binding.
    pub(crate) routing_key: Option<Template>,

    /// AMQP message properties.
    pub(crate) properties: Option<AmqpPropertiesConfig>,

    // Flattened: the connection options are read from the sink's own top-level
    // keys (for example `connection_string`) rather than from a nested table.
    #[serde(flatten)]
    pub(crate) connection: AmqpConfig,

    #[configurable(derived)]
    pub(crate) encoding: EncodingConfig,

    // NOTE(review): judging by the helper names, this presumably accepts either a
    // bare boolean or a full struct, and is left out of serialized output when it
    // equals the default — confirm against `crate::serde`.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub(crate) acknowledgements: AcknowledgementsConfig,

    /// Maximum number of AMQP channels to keep active (channels are created as needed).
    // Falls back to `default_max_channels()` when the key is omitted.
    #[serde(default = "default_max_channels")]
    pub(crate) max_channels: u32,
}
98
/// Default for `AmqpSinkConfig::max_channels`, used by serde when the key is
/// omitted and by the `Default` implementation.
const fn default_max_channels() -> u32 {
    const DEFAULT_MAX_CHANNELS: u32 = 4;
    DEFAULT_MAX_CHANNELS
}
102
103impl Default for AmqpSinkConfig {
104    fn default() -> Self {
105        Self {
106            exchange: Template::try_from("vector").unwrap(),
107            routing_key: None,
108            properties: None,
109            encoding: TextSerializerConfig::default().into(),
110            connection: AmqpConfig::default(),
111            acknowledgements: AcknowledgementsConfig::default(),
112            max_channels: default_max_channels(),
113        }
114    }
115}
116
117impl GenerateConfig for AmqpSinkConfig {
118    fn generate_config() -> toml::Value {
119        toml::from_str(
120            r#"connection_string = "amqp://localhost:5672/%2f"
121            routing_key = "user_id"
122            exchange = "test"
123            encoding.codec = "json"
124            max_channels = 4"#,
125        )
126        .unwrap()
127    }
128}
129
130#[async_trait::async_trait]
131#[typetag::serde(name = "amqp")]
132impl SinkConfig for AmqpSinkConfig {
133    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
134        let sink = AmqpSink::new(self.clone()).await?;
135        let hc = healthcheck(sink.channels.clone()).boxed();
136        Ok((VectorSink::from_event_streamsink(sink), hc))
137    }
138
139    fn input(&self) -> Input {
140        Input::new(DataType::Log)
141    }
142
143    fn acknowledgements(&self) -> &AcknowledgementsConfig {
144        &self.acknowledgements
145    }
146}
147
148pub(super) async fn healthcheck(channels: AmqpSinkChannels) -> crate::Result<()> {
149    trace!("Healthcheck started.");
150
151    let channel = channels.get().await?;
152
153    if !channel.status().connected() {
154        return Err(Box::new(std::io::Error::new(
155            std::io::ErrorKind::BrokenPipe,
156            "Not Connected",
157        )));
158    }
159
160    trace!("Healthcheck completed.");
161    Ok(())
162}
163
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::format::{deserialize, Format};

    /// The generated example configuration must be accepted by the sink config.
    #[test]
    pub fn generate_config() {
        crate::test_util::test_generate_config::<AmqpSinkConfig>();
    }

    /// Renders the configured `properties.priority` template against `event` and
    /// asserts that it yields `priority`.
    ///
    /// Panics if the config has no properties, no priority, or the render fails.
    fn assert_config_priority_eq(config: AmqpSinkConfig, event: &LogEvent, priority: u8) {
        let properties = config.properties.unwrap();
        let template = properties.priority.unwrap();
        let rendered = template.render(event).unwrap();

        assert_eq!(rendered, priority as u64);
    }

    /// A literal integer priority parses in YAML, TOML and JSON and renders to
    /// that same integer.
    #[test]
    pub fn parse_config_priority_static() {
        let cases = [
            (
                Format::Yaml,
                r#"
            exchange: "test"
            routing_key: "user_id"
            encoding:
                codec: "json"
            connection_string: "amqp://user:password@127.0.0.1:5672/"
            properties:
                priority: 1
            "#,
            ),
            (
                Format::Toml,
                r#"
            exchange = "test"
            routing_key = "user_id"
            encoding.codec = "json"
            connection_string = "amqp://user:password@127.0.0.1:5672/"
            properties = { priority = 1 }
            "#,
            ),
            (
                Format::Json,
                r#"
            {
                "exchange": "test",
                "routing_key": "user_id",
                "encoding": {
                    "codec": "json"
                },
                "connection_string": "amqp://user:password@127.0.0.1:5672/",
                "properties": {
                    "priority": 1
                }
            }
            "#,
            ),
        ];

        for (format, raw) in cases {
            let parsed: AmqpSinkConfig = deserialize(raw, format).unwrap();
            let event = LogEvent::from_str_legacy("message");
            assert_config_priority_eq(parsed, &event, 1);
        }
    }

    /// A templated priority parses in YAML, TOML and JSON and renders to the
    /// value of the event's `priority` field.
    #[test]
    pub fn parse_config_priority_templated() {
        let cases = [
            (
                Format::Yaml,
                r#"
            exchange: "test"
            routing_key: "user_id"
            encoding:
                codec: "json"
            connection_string: "amqp://user:password@127.0.0.1:5672/"
            properties:
                priority: "{{ .priority }}"
            "#,
            ),
            (
                Format::Toml,
                r#"
            exchange = "test"
            routing_key = "user_id"
            encoding.codec = "json"
            connection_string = "amqp://user:password@127.0.0.1:5672/"
            properties = { priority = "{{ .priority }}" }
            "#,
            ),
            (
                Format::Json,
                r#"
            {
                "exchange": "test",
                "routing_key": "user_id",
                "encoding": {
                    "codec": "json"
                },
                "connection_string": "amqp://user:password@127.0.0.1:5672/",
                "properties": {
                    "priority": "{{ .priority }}"
                }
            }
            "#,
            ),
        ];

        for (format, raw) in cases {
            let parsed: AmqpSinkConfig = deserialize(raw, format).unwrap();

            let mut event = LogEvent::from_str_legacy("message");
            event.insert("priority", 2);

            assert_config_priority_eq(parsed, &event, 2);
        }
    }
}
286}