vector/sinks/amqp/
config.rs1use super::channel::AmqpSinkChannels;
3use crate::{amqp::AmqpConfig, sinks::prelude::*};
4use lapin::{types::ShortString, BasicProperties};
5use vector_lib::{
6 codecs::TextSerializerConfig,
7 internal_event::{error_stage, error_type},
8};
9
10use super::sink::AmqpSink;
11
12#[configurable_component]
14#[configurable(title = "Configure the AMQP message properties.")]
15#[derive(Clone, Debug, Default)]
16pub struct AmqpPropertiesConfig {
17 pub(crate) content_type: Option<String>,
19
20 pub(crate) content_encoding: Option<String>,
22
23 pub(crate) expiration_ms: Option<u64>,
25
26 pub(crate) priority: Option<UnsignedIntTemplate>,
28}
29
30impl AmqpPropertiesConfig {
31 pub(super) fn build(&self, event: &Event) -> Option<BasicProperties> {
32 let mut prop = BasicProperties::default();
33 if let Some(content_type) = &self.content_type {
34 prop = prop.with_content_type(ShortString::from(content_type.clone()));
35 }
36 if let Some(content_encoding) = &self.content_encoding {
37 prop = prop.with_content_encoding(ShortString::from(content_encoding.clone()));
38 }
39 if let Some(expiration_ms) = &self.expiration_ms {
40 prop = prop.with_expiration(ShortString::from(expiration_ms.to_string()));
41 }
42 if let Some(priority_template) = &self.priority {
43 let priority = priority_template.render(event).unwrap_or_else(|error| {
44 warn!(
45 message = "Failed to render numeric template for \"properties.priority\".",
46 error = %error,
47 error_type = error_type::TEMPLATE_FAILED,
48 stage = error_stage::PROCESSING,
49 internal_log_rate_limit = true,
50 );
51 Default::default()
52 });
53
54 let priority = priority.clamp(0, u8::MAX.into()) as u8;
56 prop = prop.with_priority(priority);
57 }
58 Some(prop)
59 }
60}
61
62#[configurable_component(sink(
66 "amqp",
67 "Send events to AMQP 0.9.1 compatible brokers like RabbitMQ."
68))]
69#[derive(Clone, Debug)]
70pub struct AmqpSinkConfig {
71 pub(crate) exchange: Template,
73
74 pub(crate) routing_key: Option<Template>,
76
77 pub(crate) properties: Option<AmqpPropertiesConfig>,
79
80 #[serde(flatten)]
81 pub(crate) connection: AmqpConfig,
82
83 #[configurable(derived)]
84 pub(crate) encoding: EncodingConfig,
85
86 #[configurable(derived)]
87 #[serde(
88 default,
89 deserialize_with = "crate::serde::bool_or_struct",
90 skip_serializing_if = "crate::serde::is_default"
91 )]
92 pub(crate) acknowledgements: AcknowledgementsConfig,
93
94 #[serde(default = "default_max_channels")]
96 pub(crate) max_channels: u32,
97}
98
/// Serde default for `AmqpSinkConfig::max_channels`.
const fn default_max_channels() -> u32 {
    const DEFAULT_MAX_CHANNELS: u32 = 4;
    DEFAULT_MAX_CHANNELS
}
102
103impl Default for AmqpSinkConfig {
104 fn default() -> Self {
105 Self {
106 exchange: Template::try_from("vector").unwrap(),
107 routing_key: None,
108 properties: None,
109 encoding: TextSerializerConfig::default().into(),
110 connection: AmqpConfig::default(),
111 acknowledgements: AcknowledgementsConfig::default(),
112 max_channels: default_max_channels(),
113 }
114 }
115}
116
117impl GenerateConfig for AmqpSinkConfig {
118 fn generate_config() -> toml::Value {
119 toml::from_str(
120 r#"connection_string = "amqp://localhost:5672/%2f"
121 routing_key = "user_id"
122 exchange = "test"
123 encoding.codec = "json"
124 max_channels = 4"#,
125 )
126 .unwrap()
127 }
128}
129
130#[async_trait::async_trait]
131#[typetag::serde(name = "amqp")]
132impl SinkConfig for AmqpSinkConfig {
133 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
134 let sink = AmqpSink::new(self.clone()).await?;
135 let hc = healthcheck(sink.channels.clone()).boxed();
136 Ok((VectorSink::from_event_streamsink(sink), hc))
137 }
138
139 fn input(&self) -> Input {
140 Input::new(DataType::Log)
141 }
142
143 fn acknowledgements(&self) -> &AcknowledgementsConfig {
144 &self.acknowledgements
145 }
146}
147
148pub(super) async fn healthcheck(channels: AmqpSinkChannels) -> crate::Result<()> {
149 trace!("Healthcheck started.");
150
151 let channel = channels.get().await?;
152
153 if !channel.status().connected() {
154 return Err(Box::new(std::io::Error::new(
155 std::io::ErrorKind::BrokenPipe,
156 "Not Connected",
157 )));
158 }
159
160 trace!("Healthcheck completed.");
161 Ok(())
162}
163
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::format::{deserialize, Format};

    /// The generated example configuration must pass the shared generate-config check.
    #[test]
    pub fn generate_config() {
        crate::test_util::test_generate_config::<AmqpSinkConfig>();
    }

    /// Renders the `properties.priority` template of `config` against `event` and asserts
    /// that the result equals `priority`. Panics if the properties or priority are absent.
    fn assert_config_priority_eq(config: AmqpSinkConfig, event: &LogEvent, priority: u8) {
        let properties = config.properties.unwrap();
        let template = properties.priority.unwrap();
        let rendered = template.render(event).unwrap();

        assert_eq!(rendered, priority as u64);
    }

    /// A literal integer priority parses in every supported format and renders unchanged.
    #[test]
    pub fn parse_config_priority_static() {
        let cases = [
            (
                Format::Yaml,
                r#"
            exchange: "test"
            routing_key: "user_id"
            encoding:
                codec: "json"
            connection_string: "amqp://user:password@127.0.0.1:5672/"
            properties:
                priority: 1
            "#,
            ),
            (
                Format::Toml,
                r#"
            exchange = "test"
            routing_key = "user_id"
            encoding.codec = "json"
            connection_string = "amqp://user:password@127.0.0.1:5672/"
            properties = { priority = 1 }
            "#,
            ),
            (
                Format::Json,
                r#"
            {
                "exchange": "test",
                "routing_key": "user_id",
                "encoding": {
                    "codec": "json"
                },
                "connection_string": "amqp://user:password@127.0.0.1:5672/",
                "properties": {
                    "priority": 1
                }
            }
            "#,
            ),
        ];

        for (format, raw) in cases {
            let parsed: AmqpSinkConfig = deserialize(raw, format).unwrap();
            let event = LogEvent::from_str_legacy("message");
            assert_config_priority_eq(parsed, &event, 1);
        }
    }

    /// A templated priority parses in every supported format and renders the event's
    /// `priority` field.
    #[test]
    pub fn parse_config_priority_templated() {
        let cases = [
            (
                Format::Yaml,
                r#"
            exchange: "test"
            routing_key: "user_id"
            encoding:
                codec: "json"
            connection_string: "amqp://user:password@127.0.0.1:5672/"
            properties:
                priority: "{{ .priority }}"
            "#,
            ),
            (
                Format::Toml,
                r#"
            exchange = "test"
            routing_key = "user_id"
            encoding.codec = "json"
            connection_string = "amqp://user:password@127.0.0.1:5672/"
            properties = { priority = "{{ .priority }}" }
            "#,
            ),
            (
                Format::Json,
                r#"
            {
                "exchange": "test",
                "routing_key": "user_id",
                "encoding": {
                    "codec": "json"
                },
                "connection_string": "amqp://user:password@127.0.0.1:5672/",
                "properties": {
                    "priority": "{{ .priority }}"
                }
            }
            "#,
            ),
        ];

        // The same event serves every case; it is only ever borrowed.
        let mut event = LogEvent::from_str_legacy("message");
        event.insert("priority", 2);

        for (format, raw) in cases {
            let parsed: AmqpSinkConfig = deserialize(raw, format).unwrap();
            assert_config_priority_eq(parsed, &event, 2);
        }
    }
}