vector/sinks/amqp/
config.rs1use lapin::{BasicProperties, types::ShortString};
3use vector_lib::{
4 codecs::TextSerializerConfig,
5 internal_event::{error_stage, error_type},
6};
7
8use super::{channel::AmqpSinkChannels, sink::AmqpSink};
9use crate::{amqp::AmqpConfig, sinks::prelude::*};
10
11#[configurable_component]
13#[configurable(title = "Configure the AMQP message properties.")]
14#[derive(Clone, Debug, Default)]
15pub struct AmqpPropertiesConfig {
16 pub(crate) content_type: Option<String>,
18
19 pub(crate) content_encoding: Option<String>,
21
22 pub(crate) expiration_ms: Option<u64>,
24
25 pub(crate) priority: Option<UnsignedIntTemplate>,
27}
28
29impl AmqpPropertiesConfig {
30 pub(super) fn build(&self, event: &Event) -> Option<BasicProperties> {
31 let mut prop = BasicProperties::default();
32 if let Some(content_type) = &self.content_type {
33 prop = prop.with_content_type(ShortString::from(content_type.clone()));
34 }
35 if let Some(content_encoding) = &self.content_encoding {
36 prop = prop.with_content_encoding(ShortString::from(content_encoding.clone()));
37 }
38 if let Some(expiration_ms) = &self.expiration_ms {
39 prop = prop.with_expiration(ShortString::from(expiration_ms.to_string()));
40 }
41 if let Some(priority_template) = &self.priority {
42 let priority = priority_template.render(event).unwrap_or_else(|error| {
43 warn!(
44 message = "Failed to render numeric template for \"properties.priority\".",
45 error = %error,
46 error_type = error_type::TEMPLATE_FAILED,
47 stage = error_stage::PROCESSING,
48 );
49 Default::default()
50 });
51
52 let priority = priority.clamp(0, u8::MAX.into()) as u8;
54 prop = prop.with_priority(priority);
55 }
56 Some(prop)
57 }
58}
59
60#[configurable_component(sink(
64 "amqp",
65 "Send events to AMQP 0.9.1 compatible brokers like RabbitMQ."
66))]
67#[derive(Clone, Debug)]
68pub struct AmqpSinkConfig {
69 pub(crate) exchange: Template,
71
72 pub(crate) routing_key: Option<Template>,
74
75 pub(crate) properties: Option<AmqpPropertiesConfig>,
77
78 #[serde(flatten)]
79 pub(crate) connection: AmqpConfig,
80
81 #[configurable(derived)]
82 pub(crate) encoding: EncodingConfig,
83
84 #[configurable(derived)]
85 #[serde(
86 default,
87 deserialize_with = "crate::serde::bool_or_struct",
88 skip_serializing_if = "crate::serde::is_default"
89 )]
90 pub(crate) acknowledgements: AcknowledgementsConfig,
91
92 #[serde(default = "default_max_channels")]
94 pub(crate) max_channels: u32,
95}
96
97const fn default_max_channels() -> u32 {
98 4
99}
100
101impl Default for AmqpSinkConfig {
102 fn default() -> Self {
103 Self {
104 exchange: Template::try_from("vector").unwrap(),
105 routing_key: None,
106 properties: None,
107 encoding: TextSerializerConfig::default().into(),
108 connection: AmqpConfig::default(),
109 acknowledgements: AcknowledgementsConfig::default(),
110 max_channels: default_max_channels(),
111 }
112 }
113}
114
115impl GenerateConfig for AmqpSinkConfig {
116 fn generate_config() -> toml::Value {
117 toml::from_str(
118 r#"connection_string = "amqp://localhost:5672/%2f"
119 routing_key = "user_id"
120 exchange = "test"
121 encoding.codec = "json"
122 max_channels = 4"#,
123 )
124 .unwrap()
125 }
126}
127
128#[async_trait::async_trait]
129#[typetag::serde(name = "amqp")]
130impl SinkConfig for AmqpSinkConfig {
131 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
132 let sink = AmqpSink::new(self.clone()).await?;
133 let hc = healthcheck(sink.channels.clone()).boxed();
134 Ok((VectorSink::from_event_streamsink(sink), hc))
135 }
136
137 fn input(&self) -> Input {
138 Input::new(DataType::Log)
139 }
140
141 fn acknowledgements(&self) -> &AcknowledgementsConfig {
142 &self.acknowledgements
143 }
144}
145
146pub(super) async fn healthcheck(channels: AmqpSinkChannels) -> crate::Result<()> {
147 trace!("Healthcheck started.");
148
149 let channel = channels.get().await?;
150
151 if !channel.status().connected() {
152 return Err(Box::new(std::io::Error::new(
153 std::io::ErrorKind::BrokenPipe,
154 "Not Connected",
155 )));
156 }
157
158 trace!("Healthcheck completed.");
159 Ok(())
160}
161
162#[cfg(test)]
163mod tests {
164 use super::*;
165 use crate::config::format::{Format, deserialize};
166
167 #[test]
168 pub fn generate_config() {
169 crate::test_util::test_generate_config::<AmqpSinkConfig>();
170 }
171
172 fn assert_config_priority_eq(config: AmqpSinkConfig, event: &LogEvent, priority: u8) {
173 assert_eq!(
174 config
175 .properties
176 .unwrap()
177 .priority
178 .unwrap()
179 .render(event)
180 .unwrap(),
181 priority as u64
182 );
183 }
184
185 #[test]
186 pub fn parse_config_priority_static() {
187 for (format, config) in [
188 (
189 Format::Yaml,
190 r#"
191 exchange: "test"
192 routing_key: "user_id"
193 encoding:
194 codec: "json"
195 connection_string: "amqp://user:password@127.0.0.1:5672/"
196 properties:
197 priority: 1
198 "#,
199 ),
200 (
201 Format::Toml,
202 r#"
203 exchange = "test"
204 routing_key = "user_id"
205 encoding.codec = "json"
206 connection_string = "amqp://user:password@127.0.0.1:5672/"
207 properties = { priority = 1 }
208 "#,
209 ),
210 (
211 Format::Json,
212 r#"
213 {
214 "exchange": "test",
215 "routing_key": "user_id",
216 "encoding": {
217 "codec": "json"
218 },
219 "connection_string": "amqp://user:password@127.0.0.1:5672/",
220 "properties": {
221 "priority": 1
222 }
223 }
224 "#,
225 ),
226 ] {
227 let config: AmqpSinkConfig = deserialize(config, format).unwrap();
228 let event = LogEvent::from_str_legacy("message");
229 assert_config_priority_eq(config, &event, 1);
230 }
231 }
232
233 #[test]
234 pub fn parse_config_priority_templated() {
235 for (format, config) in [
236 (
237 Format::Yaml,
238 r#"
239 exchange: "test"
240 routing_key: "user_id"
241 encoding:
242 codec: "json"
243 connection_string: "amqp://user:password@127.0.0.1:5672/"
244 properties:
245 priority: "{{ .priority }}"
246 "#,
247 ),
248 (
249 Format::Toml,
250 r#"
251 exchange = "test"
252 routing_key = "user_id"
253 encoding.codec = "json"
254 connection_string = "amqp://user:password@127.0.0.1:5672/"
255 properties = { priority = "{{ .priority }}" }
256 "#,
257 ),
258 (
259 Format::Json,
260 r#"
261 {
262 "exchange": "test",
263 "routing_key": "user_id",
264 "encoding": {
265 "codec": "json"
266 },
267 "connection_string": "amqp://user:password@127.0.0.1:5672/",
268 "properties": {
269 "priority": "{{ .priority }}"
270 }
271 }
272 "#,
273 ),
274 ] {
275 let config: AmqpSinkConfig = deserialize(config, format).unwrap();
276 let event = {
277 let mut event = LogEvent::from_str_legacy("message");
278 event.insert("priority", 2);
279 event
280 };
281 assert_config_priority_eq(config, &event, 2);
282 }
283 }
284}