vector/sinks/amqp/
config.rs1use lapin::{BasicProperties, types::ShortString};
3use vector_lib::{
4 codecs::TextSerializerConfig,
5 internal_event::{error_stage, error_type},
6};
7
8use super::{channel::AmqpSinkChannels, sink::AmqpSink};
9use crate::{amqp::AmqpConfig, sinks::prelude::*};
10
#[configurable_component]
/// Optional AMQP message properties.
///
/// Each field that is set is copied onto the `BasicProperties` produced by
/// [`AmqpPropertiesConfig::build`]; unset fields leave the corresponding property at its default.
#[configurable(title = "Configure the AMQP message properties.")]
#[derive(Clone, Debug, Default)]
pub struct AmqpPropertiesConfig {
    /// Value for the message content-type property (applied via `with_content_type`).
    pub(crate) content_type: Option<String>,

    /// Value for the message content-encoding property (applied via `with_content_encoding`).
    pub(crate) content_encoding: Option<String>,

    /// Value for the message expiration property. The number is converted to its decimal
    /// string form before being applied via `with_expiration`.
    // NOTE(review): the field name indicates milliseconds; nothing in this file enforces the unit.
    pub(crate) expiration_ms: Option<u64>,

    /// Message priority: either a fixed number or a template rendered against each event.
    /// Rendered values are clamped to the `u8` range before being applied via `with_priority`.
    pub(crate) priority: Option<UnsignedIntTemplate>,
}
28
29impl AmqpPropertiesConfig {
30 pub(super) fn build(&self, event: &Event) -> Option<BasicProperties> {
31 let mut prop = BasicProperties::default();
32 if let Some(content_type) = &self.content_type {
33 prop = prop.with_content_type(ShortString::from(content_type.clone()));
34 }
35 if let Some(content_encoding) = &self.content_encoding {
36 prop = prop.with_content_encoding(ShortString::from(content_encoding.clone()));
37 }
38 if let Some(expiration_ms) = &self.expiration_ms {
39 prop = prop.with_expiration(ShortString::from(expiration_ms.to_string()));
40 }
41 if let Some(priority_template) = &self.priority {
42 let priority = priority_template.render(event).unwrap_or_else(|error| {
43 warn!(
44 message = "Failed to render numeric template for \"properties.priority\".",
45 error = %error,
46 error_type = error_type::TEMPLATE_FAILED,
47 stage = error_stage::PROCESSING,
48 internal_log_rate_limit = true,
49 );
50 Default::default()
51 });
52
53 let priority = priority.clamp(0, u8::MAX.into()) as u8;
55 prop = prop.with_priority(priority);
56 }
57 Some(prop)
58 }
59}
60
/// Configuration for the `amqp` sink.
#[configurable_component(sink(
    "amqp",
    "Send events to AMQP 0.9.1 compatible brokers like RabbitMQ."
))]
#[derive(Clone, Debug)]
pub struct AmqpSinkConfig {
    /// Template for the exchange name. The `Default` implementation uses the literal `vector`.
    pub(crate) exchange: Template,

    /// Optional template for the routing key. Unset by default.
    pub(crate) routing_key: Option<Template>,

    /// Optional AMQP message properties; see [`AmqpPropertiesConfig`]. Unset by default.
    pub(crate) properties: Option<AmqpPropertiesConfig>,

    // Connection settings. `flatten` means the `AmqpConfig` keys (for example
    // `connection_string`, as used by the example configs in this file) sit at the same
    // level as the other keys of this struct rather than under a nested table.
    #[serde(flatten)]
    pub(crate) connection: AmqpConfig,

    // Event encoding. The `Default` implementation uses the text serializer.
    #[configurable(derived)]
    pub(crate) encoding: EncodingConfig,

    // Falls back to `AcknowledgementsConfig::default()` when the key is absent.
    // NOTE(review): `bool_or_struct` presumably accepts either a bare boolean or a full
    // table, and `is_default` presumably omits the key on serialization when it equals the
    // default; both helpers live in `crate::serde`, so confirm there.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub(crate) acknowledgements: AcknowledgementsConfig,

    /// Maximum number of AMQP channels. Defaults to 4.
    // NOTE(review): how this limit is enforced is not visible in this file; presumably it
    // sizes the pool behind `AmqpSinkChannels` (verify in `channel.rs`).
    #[serde(default = "default_max_channels")]
    pub(crate) max_channels: u32,
}
97
/// Serde default for `AmqpSinkConfig::max_channels`; also used by its `Default` implementation.
const fn default_max_channels() -> u32 {
    const DEFAULT_MAX_CHANNELS: u32 = 4;
    DEFAULT_MAX_CHANNELS
}
101
102impl Default for AmqpSinkConfig {
103 fn default() -> Self {
104 Self {
105 exchange: Template::try_from("vector").unwrap(),
106 routing_key: None,
107 properties: None,
108 encoding: TextSerializerConfig::default().into(),
109 connection: AmqpConfig::default(),
110 acknowledgements: AcknowledgementsConfig::default(),
111 max_channels: default_max_channels(),
112 }
113 }
114}
115
116impl GenerateConfig for AmqpSinkConfig {
117 fn generate_config() -> toml::Value {
118 toml::from_str(
119 r#"connection_string = "amqp://localhost:5672/%2f"
120 routing_key = "user_id"
121 exchange = "test"
122 encoding.codec = "json"
123 max_channels = 4"#,
124 )
125 .unwrap()
126 }
127}
128
129#[async_trait::async_trait]
130#[typetag::serde(name = "amqp")]
131impl SinkConfig for AmqpSinkConfig {
132 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
133 let sink = AmqpSink::new(self.clone()).await?;
134 let hc = healthcheck(sink.channels.clone()).boxed();
135 Ok((VectorSink::from_event_streamsink(sink), hc))
136 }
137
138 fn input(&self) -> Input {
139 Input::new(DataType::Log)
140 }
141
142 fn acknowledgements(&self) -> &AcknowledgementsConfig {
143 &self.acknowledgements
144 }
145}
146
147pub(super) async fn healthcheck(channels: AmqpSinkChannels) -> crate::Result<()> {
148 trace!("Healthcheck started.");
149
150 let channel = channels.get().await?;
151
152 if !channel.status().connected() {
153 return Err(Box::new(std::io::Error::new(
154 std::io::ErrorKind::BrokenPipe,
155 "Not Connected",
156 )));
157 }
158
159 trace!("Healthcheck completed.");
160 Ok(())
161}
162
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::format::{Format, deserialize};

    /// Runs the shared `test_generate_config` check against `AmqpSinkConfig`.
    #[test]
    pub fn generate_config() {
        crate::test_util::test_generate_config::<AmqpSinkConfig>();
    }

    /// Renders the configured `properties.priority` template against `event` and asserts
    /// that the result equals `priority`.
    ///
    /// Panics (failing the test) when `properties` or `priority` is unset, or when
    /// rendering fails.
    fn assert_config_priority_eq(config: AmqpSinkConfig, event: &LogEvent, priority: u8) {
        assert_eq!(
            config
                .properties
                .unwrap()
                .priority
                .unwrap()
                .render(event)
                .unwrap(),
            priority as u64
        );
    }

    /// A literal numeric `priority` deserializes from YAML, TOML and JSON and renders to
    /// that same number.
    #[test]
    pub fn parse_config_priority_static() {
        for (format, config) in [
            (
                Format::Yaml,
                r#"
            exchange: "test"
            routing_key: "user_id"
            encoding:
                codec: "json"
            connection_string: "amqp://user:password@127.0.0.1:5672/"
            properties:
                priority: 1
            "#,
            ),
            (
                Format::Toml,
                r#"
            exchange = "test"
            routing_key = "user_id"
            encoding.codec = "json"
            connection_string = "amqp://user:password@127.0.0.1:5672/"
            properties = { priority = 1 }
            "#,
            ),
            (
                Format::Json,
                r#"
            {
                "exchange": "test",
                "routing_key": "user_id",
                "encoding": {
                    "codec": "json"
                },
                "connection_string": "amqp://user:password@127.0.0.1:5672/",
                "properties": {
                    "priority": 1
                }
            }
            "#,
            ),
        ] {
            let config: AmqpSinkConfig = deserialize(config, format).unwrap();
            let event = LogEvent::from_str_legacy("message");
            assert_config_priority_eq(config, &event, 1);
        }
    }

    /// A templated `priority` (`{{ .priority }}`) deserializes from YAML, TOML and JSON and
    /// renders to the value of the event's `priority` field.
    #[test]
    pub fn parse_config_priority_templated() {
        for (format, config) in [
            (
                Format::Yaml,
                r#"
            exchange: "test"
            routing_key: "user_id"
            encoding:
                codec: "json"
            connection_string: "amqp://user:password@127.0.0.1:5672/"
            properties:
                priority: "{{ .priority }}"
            "#,
            ),
            (
                Format::Toml,
                r#"
            exchange = "test"
            routing_key = "user_id"
            encoding.codec = "json"
            connection_string = "amqp://user:password@127.0.0.1:5672/"
            properties = { priority = "{{ .priority }}" }
            "#,
            ),
            (
                Format::Json,
                r#"
            {
                "exchange": "test",
                "routing_key": "user_id",
                "encoding": {
                    "codec": "json"
                },
                "connection_string": "amqp://user:password@127.0.0.1:5672/",
                "properties": {
                    "priority": "{{ .priority }}"
                }
            }
            "#,
            ),
        ] {
            let config: AmqpSinkConfig = deserialize(config, format).unwrap();
            // The event carries the field that the template refers to.
            let event = {
                let mut event = LogEvent::from_str_legacy("message");
                event.insert("priority", 2);
                event
            };
            assert_config_priority_eq(config, &event, 2);
        }
    }
}