1use indexmap::IndexMap;
2use vector_lib::config::{clone_input_definitions, LogNamespace};
3use vector_lib::configurable::configurable_component;
4use vector_lib::transform::SyncTransform;
5
6use crate::{
7 conditions::{AnyCondition, Condition, ConditionConfig, VrlConfig},
8 config::{
9 DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
10 TransformOutput,
11 },
12 event::Event,
13 schema,
14 transforms::Transform,
15};
16
17pub(crate) const UNMATCHED_ROUTE: &str = "_unmatched";
18
19#[derive(Clone)]
20pub struct Route {
21 conditions: Vec<(String, Condition)>,
22 reroute_unmatched: bool,
23}
24
25impl Route {
26 pub fn new(config: &RouteConfig, context: &TransformContext) -> crate::Result<Self> {
27 let mut conditions = Vec::with_capacity(config.route.len());
28 for (output_name, condition) in config.route.iter() {
29 let condition = condition.build(&context.enrichment_tables)?;
30 conditions.push((output_name.clone(), condition));
31 }
32 Ok(Self {
33 conditions,
34 reroute_unmatched: config.reroute_unmatched,
35 })
36 }
37}
38
39impl SyncTransform for Route {
40 fn transform(&mut self, event: Event, output: &mut vector_lib::transform::TransformOutputsBuf) {
41 let mut check_failed: usize = 0;
42 for (output_name, condition) in &self.conditions {
43 let (result, event) = condition.check(event.clone());
44 if result {
45 output.push(Some(output_name), event);
46 } else {
47 check_failed += 1;
48 }
49 }
50 if self.reroute_unmatched && check_failed == self.conditions.len() {
51 output.push(Some(UNMATCHED_ROUTE), event);
52 }
53 }
54}
55
56#[configurable_component(transform(
58 "route",
59 "Split a stream of events into multiple sub-streams based on user-supplied conditions."
60))]
61#[derive(Clone, Debug)]
62#[serde(deny_unknown_fields)]
63pub struct RouteConfig {
64 #[serde(default = "crate::serde::default_true")]
73 #[configurable(metadata(docs::human_name = "Reroute Unmatched Events"))]
74 reroute_unmatched: bool,
75
76 #[configurable(metadata(docs::additional_props_description = "An individual route."))]
88 #[configurable(metadata(docs::examples = "route_examples()"))]
89 route: IndexMap<String, AnyCondition>,
90}
91
92fn route_examples() -> IndexMap<String, AnyCondition> {
93 IndexMap::from([
94 (
95 "foo-exists".to_owned(),
96 AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
97 source: "exists(.foo)".to_owned(),
98 ..Default::default()
99 })),
100 ),
101 (
102 "foo-does-not-exist".to_owned(),
103 AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
104 source: "!exists(.foo)".to_owned(),
105 ..Default::default()
106 })),
107 ),
108 ])
109}
110
111impl GenerateConfig for RouteConfig {
112 fn generate_config() -> toml::Value {
113 toml::Value::try_from(Self {
114 reroute_unmatched: true,
115 route: route_examples(),
116 })
117 .unwrap()
118 }
119}
120
121#[async_trait::async_trait]
122#[typetag::serde(name = "route")]
123impl TransformConfig for RouteConfig {
124 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
125 let route = Route::new(self, context)?;
126 Ok(Transform::synchronous(route))
127 }
128
129 fn input(&self) -> Input {
130 Input::all()
131 }
132
133 fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
134 if self.route.contains_key(UNMATCHED_ROUTE) {
135 Err(vec![format!(
136 "cannot have a named output with reserved name: `{UNMATCHED_ROUTE}`"
137 )])
138 } else {
139 Ok(())
140 }
141 }
142
143 fn outputs(
144 &self,
145 _: vector_lib::enrichment::TableRegistry,
146 input_definitions: &[(OutputId, schema::Definition)],
147 _: LogNamespace,
148 ) -> Vec<TransformOutput> {
149 let mut result: Vec<TransformOutput> = self
150 .route
151 .keys()
152 .map(|output_name| {
153 TransformOutput::new(
154 DataType::all_bits(),
155 clone_input_definitions(input_definitions),
156 )
157 .with_port(output_name)
158 })
159 .collect();
160 if self.reroute_unmatched {
161 result.push(
162 TransformOutput::new(
163 DataType::all_bits(),
164 clone_input_definitions(input_definitions),
165 )
166 .with_port(UNMATCHED_ROUTE),
167 );
168 }
169 result
170 }
171
172 fn enable_concurrency(&self) -> bool {
173 true
174 }
175}
176
177#[cfg(test)]
178mod test {
179 use std::collections::HashMap;
180
181 use indoc::indoc;
182 use vector_lib::transform::TransformOutputsBuf;
183
184 use super::*;
185 use crate::{
186 config::{build_unit_tests, ConfigBuilder},
187 test_util::components::{init_test, COMPONENT_MULTIPLE_OUTPUTS_TESTS},
188 };
189
190 #[test]
191 fn generate_config() {
192 crate::test_util::test_generate_config::<super::RouteConfig>();
193 }
194
195 #[test]
196 fn can_serialize_remap() {
197 let config = toml::from_str::<RouteConfig>(
200 r#"
201 route.first.type = "vrl"
202 route.first.source = '.message == "hello world"'
203 "#,
204 )
205 .unwrap();
206
207 assert_eq!(
208 serde_json::to_string(&config).unwrap(),
209 r#"{"reroute_unmatched":true,"route":{"first":{"type":"vrl","source":".message == \"hello world\""}}}"#
210 );
211 }
212
213 #[test]
214 fn route_pass_all_route_conditions() {
215 let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE];
216 let event = Event::from_json_value(
217 serde_json::json!({"message": "hello world", "second": "second", "third": "third"}),
218 LogNamespace::Legacy,
219 )
220 .unwrap();
221 let config = toml::from_str::<RouteConfig>(
222 r#"
223 route.first.type = "vrl"
224 route.first.source = '.message == "hello world"'
225
226 route.second.type = "vrl"
227 route.second.source = '.second == "second"'
228
229 route.third.type = "vrl"
230 route.third.source = '.third == "third"'
231 "#,
232 )
233 .unwrap();
234
235 let mut transform = Route::new(&config, &Default::default()).unwrap();
236 let mut outputs = TransformOutputsBuf::new_with_capacity(
237 output_names
238 .iter()
239 .map(|output_name| {
240 TransformOutput::new(DataType::all_bits(), HashMap::new())
241 .with_port(output_name.to_owned())
242 })
243 .collect(),
244 1,
245 );
246
247 transform.transform(event.clone(), &mut outputs);
248 for output_name in output_names {
249 let mut events: Vec<_> = outputs.drain_named(output_name).collect();
250 if output_name == UNMATCHED_ROUTE {
251 assert!(events.is_empty());
252 } else {
253 assert_eq!(events.len(), 1);
254 assert_eq!(events.pop().unwrap(), event);
255 }
256 }
257 }
258
259 #[test]
260 fn route_pass_one_route_condition() {
261 let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE];
262 let event = Event::from_json_value(
263 serde_json::json!({"message": "hello world"}),
264 LogNamespace::Legacy,
265 )
266 .unwrap();
267 let config = toml::from_str::<RouteConfig>(
268 r#"
269 route.first.type = "vrl"
270 route.first.source = '.message == "hello world"'
271
272 route.second.type = "vrl"
273 route.second.source = '.second == "second"'
274
275 route.third.type = "vrl"
276 route.third.source = '.third == "third"'
277 "#,
278 )
279 .unwrap();
280
281 let mut transform = Route::new(&config, &Default::default()).unwrap();
282 let mut outputs = TransformOutputsBuf::new_with_capacity(
283 output_names
284 .iter()
285 .map(|output_name| {
286 TransformOutput::new(DataType::all_bits(), HashMap::new())
287 .with_port(output_name.to_owned())
288 })
289 .collect(),
290 1,
291 );
292
293 transform.transform(event.clone(), &mut outputs);
294 for output_name in output_names {
295 let mut events: Vec<_> = outputs.drain_named(output_name).collect();
296 if output_name == "first" {
297 assert_eq!(events.len(), 1);
298 assert_eq!(events.pop().unwrap(), event);
299 }
300 assert_eq!(events.len(), 0);
301 }
302 }
303
304 #[test]
305 fn route_pass_no_route_condition() {
306 let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE];
307 let event =
308 Event::from_json_value(serde_json::json!({"message": "NOPE"}), LogNamespace::Legacy)
309 .unwrap();
310 let config = toml::from_str::<RouteConfig>(
311 r#"
312 route.first.type = "vrl"
313 route.first.source = '.message == "hello world"'
314
315 route.second.type = "vrl"
316 route.second.source = '.second == "second"'
317
318 route.third.type = "vrl"
319 route.third.source = '.third == "third"'
320 "#,
321 )
322 .unwrap();
323
324 let mut transform = Route::new(&config, &Default::default()).unwrap();
325 let mut outputs = TransformOutputsBuf::new_with_capacity(
326 output_names
327 .iter()
328 .map(|output_name| {
329 TransformOutput::new(DataType::all_bits(), HashMap::new())
330 .with_port(output_name.to_owned())
331 })
332 .collect(),
333 1,
334 );
335
336 transform.transform(event.clone(), &mut outputs);
337 for output_name in output_names {
338 let mut events: Vec<_> = outputs.drain_named(output_name).collect();
339 if output_name == UNMATCHED_ROUTE {
340 assert_eq!(events.len(), 1);
341 assert_eq!(events.pop().unwrap(), event);
342 }
343 assert_eq!(events.len(), 0);
344 }
345 }
346
347 #[test]
348 fn route_no_unmatched_output() {
349 let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE];
350 let event =
351 Event::from_json_value(serde_json::json!({"message": "NOPE"}), LogNamespace::Legacy)
352 .unwrap();
353 let config = toml::from_str::<RouteConfig>(
354 r#"
355 reroute_unmatched = false
356
357 route.first.type = "vrl"
358 route.first.source = '.message == "hello world"'
359
360 route.second.type = "vrl"
361 route.second.source = '.second == "second"'
362
363 route.third.type = "vrl"
364 route.third.source = '.third == "third"'
365 "#,
366 )
367 .unwrap();
368
369 let mut transform = Route::new(&config, &Default::default()).unwrap();
370 let mut outputs = TransformOutputsBuf::new_with_capacity(
371 output_names
372 .iter()
373 .map(|output_name| {
374 TransformOutput::new(DataType::all_bits(), HashMap::new())
375 .with_port(output_name.to_owned())
376 })
377 .collect(),
378 1,
379 );
380
381 transform.transform(event.clone(), &mut outputs);
382 for output_name in output_names {
383 let events: Vec<_> = outputs.drain_named(output_name).collect();
384 assert_eq!(events.len(), 0);
385 }
386 }
387
388 #[tokio::test]
389 async fn route_metrics_with_output_tag() {
390 init_test();
391
392 let config: ConfigBuilder = toml::from_str(indoc! {r#"
393 [transforms.foo]
394 inputs = []
395 type = "route"
396 [transforms.foo.route.first]
397 type = "is_log"
398
399 [[tests]]
400 name = "metric output"
401
402 [tests.input]
403 insert_at = "foo"
404 value = "none"
405
406 [[tests.outputs]]
407 extract_from = "foo.first"
408 [[tests.outputs.conditions]]
409 type = "vrl"
410 source = "true"
411 "#})
412 .unwrap();
413
414 let mut tests = build_unit_tests(config).await.unwrap();
415 assert!(tests.remove(0).run().await.errors.is_empty());
416 COMPONENT_MULTIPLE_OUTPUTS_TESTS.assert(&["output"]);
418 }
419}