vector/sinks/amqp/
config.rs

1//! Configuration functionality for the `AMQP` sink.
2use lapin::{BasicProperties, types::ShortString};
3use vector_lib::{
4    codecs::TextSerializerConfig,
5    internal_event::{error_stage, error_type},
6};
7
8use super::{channel::AmqpSinkChannels, sink::AmqpSink};
9use crate::{amqp::AmqpConfig, sinks::prelude::*};
10
11/// AMQP properties configuration.
12#[configurable_component]
13#[configurable(title = "Configure the AMQP message properties.")]
14#[derive(Clone, Debug, Default)]
15pub struct AmqpPropertiesConfig {
16    /// Content-Type for the AMQP messages.
17    pub(crate) content_type: Option<String>,
18
19    /// Content-Encoding for the AMQP messages.
20    pub(crate) content_encoding: Option<String>,
21
22    /// Expiration for AMQP messages (in milliseconds).
23    pub(crate) expiration_ms: Option<u64>,
24
25    /// Priority for AMQP messages. It can be templated to an integer between 0 and 255 inclusive.
26    pub(crate) priority: Option<UnsignedIntTemplate>,
27}
28
29impl AmqpPropertiesConfig {
30    pub(super) fn build(&self, event: &Event) -> Option<BasicProperties> {
31        let mut prop = BasicProperties::default();
32        if let Some(content_type) = &self.content_type {
33            prop = prop.with_content_type(ShortString::from(content_type.clone()));
34        }
35        if let Some(content_encoding) = &self.content_encoding {
36            prop = prop.with_content_encoding(ShortString::from(content_encoding.clone()));
37        }
38        if let Some(expiration_ms) = &self.expiration_ms {
39            prop = prop.with_expiration(ShortString::from(expiration_ms.to_string()));
40        }
41        if let Some(priority_template) = &self.priority {
42            let priority = priority_template.render(event).unwrap_or_else(|error| {
43                warn!(
44                    message = "Failed to render numeric template for \"properties.priority\".",
45                    error = %error,
46                    error_type = error_type::TEMPLATE_FAILED,
47                    stage = error_stage::PROCESSING,
48                    internal_log_rate_limit = true,
49                );
50                Default::default()
51            });
52
53            // Clamp the value to the range of 0-255, as AMQP priority is a u8.
54            let priority = priority.clamp(0, u8::MAX.into()) as u8;
55            prop = prop.with_priority(priority);
56        }
57        Some(prop)
58    }
59}
60
61/// Configuration for the `amqp` sink.
62///
63/// Supports AMQP version 0.9.1
64#[configurable_component(sink(
65    "amqp",
66    "Send events to AMQP 0.9.1 compatible brokers like RabbitMQ."
67))]
68#[derive(Clone, Debug)]
69pub struct AmqpSinkConfig {
70    /// The exchange to publish messages to.
71    pub(crate) exchange: Template,
72
73    /// Template used to generate a routing key which corresponds to a queue binding.
74    pub(crate) routing_key: Option<Template>,
75
76    /// AMQP message properties.
77    pub(crate) properties: Option<AmqpPropertiesConfig>,
78
79    #[serde(flatten)]
80    pub(crate) connection: AmqpConfig,
81
82    #[configurable(derived)]
83    pub(crate) encoding: EncodingConfig,
84
85    #[configurable(derived)]
86    #[serde(
87        default,
88        deserialize_with = "crate::serde::bool_or_struct",
89        skip_serializing_if = "crate::serde::is_default"
90    )]
91    pub(crate) acknowledgements: AcknowledgementsConfig,
92
93    /// Maximum number of AMQP channels to keep active (channels are created as needed).
94    #[serde(default = "default_max_channels")]
95    pub(crate) max_channels: u32,
96}
97
/// Value used for `max_channels` when the option is left out of the configuration.
const fn default_max_channels() -> u32 {
    const DEFAULT_MAX_CHANNELS: u32 = 4;
    DEFAULT_MAX_CHANNELS
}
101
102impl Default for AmqpSinkConfig {
103    fn default() -> Self {
104        Self {
105            exchange: Template::try_from("vector").unwrap(),
106            routing_key: None,
107            properties: None,
108            encoding: TextSerializerConfig::default().into(),
109            connection: AmqpConfig::default(),
110            acknowledgements: AcknowledgementsConfig::default(),
111            max_channels: default_max_channels(),
112        }
113    }
114}
115
116impl GenerateConfig for AmqpSinkConfig {
117    fn generate_config() -> toml::Value {
118        toml::from_str(
119            r#"connection_string = "amqp://localhost:5672/%2f"
120            routing_key = "user_id"
121            exchange = "test"
122            encoding.codec = "json"
123            max_channels = 4"#,
124        )
125        .unwrap()
126    }
127}
128
129#[async_trait::async_trait]
130#[typetag::serde(name = "amqp")]
131impl SinkConfig for AmqpSinkConfig {
132    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
133        let sink = AmqpSink::new(self.clone()).await?;
134        let hc = healthcheck(sink.channels.clone()).boxed();
135        Ok((VectorSink::from_event_streamsink(sink), hc))
136    }
137
138    fn input(&self) -> Input {
139        Input::new(DataType::Log)
140    }
141
142    fn acknowledgements(&self) -> &AcknowledgementsConfig {
143        &self.acknowledgements
144    }
145}
146
147pub(super) async fn healthcheck(channels: AmqpSinkChannels) -> crate::Result<()> {
148    trace!("Healthcheck started.");
149
150    let channel = channels.get().await?;
151
152    if !channel.status().connected() {
153        return Err(Box::new(std::io::Error::new(
154            std::io::ErrorKind::BrokenPipe,
155            "Not Connected",
156        )));
157    }
158
159    trace!("Healthcheck completed.");
160    Ok(())
161}
162
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::format::{Format, deserialize};

    /// The generated example configuration must be accepted by the sink's own config type.
    #[test]
    pub fn generate_config() {
        crate::test_util::test_generate_config::<AmqpSinkConfig>();
    }

    /// Renders the configured priority template against `event` and checks that it yields
    /// `priority`. Panics if the config has no properties or no priority template.
    fn assert_config_priority_eq(config: AmqpSinkConfig, event: &LogEvent, priority: u8) {
        let properties = config.properties.unwrap();
        let template = properties.priority.unwrap();
        let rendered = template.render(event).unwrap();

        assert_eq!(rendered, priority as u64);
    }

    /// A literal integer priority parses from YAML, TOML, and JSON and renders to that value.
    #[test]
    pub fn parse_config_priority_static() {
        let cases = [
            (
                Format::Yaml,
                r#"
            exchange: "test"
            routing_key: "user_id"
            encoding:
                codec: "json"
            connection_string: "amqp://user:password@127.0.0.1:5672/"
            properties:
                priority: 1
            "#,
            ),
            (
                Format::Toml,
                r#"
            exchange = "test"
            routing_key = "user_id"
            encoding.codec = "json"
            connection_string = "amqp://user:password@127.0.0.1:5672/"
            properties = { priority = 1 }
            "#,
            ),
            (
                Format::Json,
                r#"
            {
                "exchange": "test",
                "routing_key": "user_id",
                "encoding": {
                    "codec": "json"
                },
                "connection_string": "amqp://user:password@127.0.0.1:5672/",
                "properties": {
                    "priority": 1
                }
            }
            "#,
            ),
        ];

        // The event is only borrowed by the assertion, so one instance serves every case.
        let event = LogEvent::from_str_legacy("message");

        for (format, raw) in cases {
            let parsed: AmqpSinkConfig = deserialize(raw, format).unwrap();
            assert_config_priority_eq(parsed, &event, 1);
        }
    }

    /// A templated priority parses from YAML, TOML, and JSON and renders from the event's
    /// `priority` field.
    #[test]
    pub fn parse_config_priority_templated() {
        let cases = [
            (
                Format::Yaml,
                r#"
            exchange: "test"
            routing_key: "user_id"
            encoding:
                codec: "json"
            connection_string: "amqp://user:password@127.0.0.1:5672/"
            properties:
                priority: "{{ .priority }}"
            "#,
            ),
            (
                Format::Toml,
                r#"
            exchange = "test"
            routing_key = "user_id"
            encoding.codec = "json"
            connection_string = "amqp://user:password@127.0.0.1:5672/"
            properties = { priority = "{{ .priority }}" }
            "#,
            ),
            (
                Format::Json,
                r#"
            {
                "exchange": "test",
                "routing_key": "user_id",
                "encoding": {
                    "codec": "json"
                },
                "connection_string": "amqp://user:password@127.0.0.1:5672/",
                "properties": {
                    "priority": "{{ .priority }}"
                }
            }
            "#,
            ),
        ];

        // The event is only borrowed by the assertion, so one instance serves every case.
        let mut event = LogEvent::from_str_legacy("message");
        event.insert("priority", 2);

        for (format, raw) in cases {
            let parsed: AmqpSinkConfig = deserialize(raw, format).unwrap();
            assert_config_priority_eq(parsed, &event, 2);
        }
    }
}