1use indexmap::IndexMap;
2use vector_lib::{
3 config::{LogNamespace, clone_input_definitions},
4 configurable::configurable_component,
5 transform::SyncTransform,
6};
7
8use crate::{
9 conditions::{AnyCondition, Condition, ConditionConfig, VrlConfig},
10 config::{
11 DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
12 TransformOutput,
13 },
14 event::Event,
15 schema,
16 transforms::Transform,
17};
18
/// Name of the output port that receives events which pass none of the configured route
/// conditions (only used when `reroute_unmatched` is enabled).
///
/// The name is reserved: `RouteConfig::validate` rejects a user-defined route called `_unmatched`.
pub(crate) const UNMATCHED_ROUTE: &str = "_unmatched";
20
/// Runtime form of the `route` transform, built from a [`RouteConfig`] by [`Route::new`].
#[derive(Clone)]
pub struct Route {
    /// Output name paired with its built condition, kept in the iteration order of the
    /// configured `route` map.
    conditions: Vec<(String, Condition)>,
    /// When `true`, an event that passes no condition is pushed to the [`UNMATCHED_ROUTE`] output.
    reroute_unmatched: bool,
}
26
27impl Route {
28 pub fn new(config: &RouteConfig, context: &TransformContext) -> crate::Result<Self> {
29 let mut conditions = Vec::with_capacity(config.route.len());
30 for (output_name, condition) in config.route.iter() {
31 let condition = condition.build(&context.enrichment_tables)?;
32 conditions.push((output_name.clone(), condition));
33 }
34 Ok(Self {
35 conditions,
36 reroute_unmatched: config.reroute_unmatched,
37 })
38 }
39}
40
41impl SyncTransform for Route {
42 fn transform(&mut self, event: Event, output: &mut vector_lib::transform::TransformOutputsBuf) {
43 let mut check_failed: usize = 0;
44 for (output_name, condition) in &self.conditions {
45 let (result, event) = condition.check(event.clone());
46 if result {
47 output.push(Some(output_name), event);
48 } else {
49 check_failed += 1;
50 }
51 }
52 if self.reroute_unmatched && check_failed == self.conditions.len() {
53 output.push(Some(UNMATCHED_ROUTE), event);
54 }
55 }
56}
57
/// Configuration for the `route` transform.
#[configurable_component(transform(
    "route",
    "Split a stream of events into multiple sub-streams based on user-supplied conditions."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct RouteConfig {
    /// Reroutes unmatched events to a named output instead of discarding them.
    ///
    /// When enabled, an event that passes none of the configured route conditions is sent to
    /// the `_unmatched` output, and that output is declared alongside the named routes. When
    /// disabled, such an event is not emitted on any output. Enabled when the option is omitted.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::human_name = "Reroute Unmatched Events"))]
    reroute_unmatched: bool,

    /// A map from route identifiers to the condition that selects events for that route.
    ///
    /// Each identifier becomes the name of an output; an event is sent to every output whose
    /// condition it passes. The identifier `_unmatched` is reserved and is rejected during
    /// validation.
    #[configurable(metadata(docs::additional_props_description = "An individual route."))]
    #[configurable(metadata(docs::examples = "route_examples()"))]
    route: IndexMap<String, AnyCondition>,
}
93
94fn route_examples() -> IndexMap<String, AnyCondition> {
95 IndexMap::from([
96 (
97 "foo-exists".to_owned(),
98 AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
99 source: "exists(.foo)".to_owned(),
100 ..Default::default()
101 })),
102 ),
103 (
104 "foo-does-not-exist".to_owned(),
105 AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
106 source: "!exists(.foo)".to_owned(),
107 ..Default::default()
108 })),
109 ),
110 ])
111}
112
113impl GenerateConfig for RouteConfig {
114 fn generate_config() -> toml::Value {
115 toml::Value::try_from(Self {
116 reroute_unmatched: true,
117 route: route_examples(),
118 })
119 .unwrap()
120 }
121}
122
123#[async_trait::async_trait]
124#[typetag::serde(name = "route")]
125impl TransformConfig for RouteConfig {
126 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
127 let route = Route::new(self, context)?;
128 Ok(Transform::synchronous(route))
129 }
130
131 fn input(&self) -> Input {
132 Input::all()
133 }
134
135 fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
136 if self.route.contains_key(UNMATCHED_ROUTE) {
137 Err(vec![format!(
138 "cannot have a named output with reserved name: `{UNMATCHED_ROUTE}`"
139 )])
140 } else {
141 Ok(())
142 }
143 }
144
145 fn outputs(
146 &self,
147 _: vector_lib::enrichment::TableRegistry,
148 input_definitions: &[(OutputId, schema::Definition)],
149 _: LogNamespace,
150 ) -> Vec<TransformOutput> {
151 let mut result: Vec<TransformOutput> = self
152 .route
153 .keys()
154 .map(|output_name| {
155 TransformOutput::new(
156 DataType::all_bits(),
157 clone_input_definitions(input_definitions),
158 )
159 .with_port(output_name)
160 })
161 .collect();
162 if self.reroute_unmatched {
163 result.push(
164 TransformOutput::new(
165 DataType::all_bits(),
166 clone_input_definitions(input_definitions),
167 )
168 .with_port(UNMATCHED_ROUTE),
169 );
170 }
171 result
172 }
173
174 fn enable_concurrency(&self) -> bool {
175 true
176 }
177}
178
#[cfg(test)]
mod test {
    use std::collections::HashMap;

    use indoc::indoc;
    use vector_lib::transform::TransformOutputsBuf;

    use super::*;
    use crate::{
        config::{ConfigBuilder, build_unit_tests},
        test_util::components::{COMPONENT_MULTIPLE_OUTPUTS_TESTS, init_test},
    };

    /// Every output port the routing tests allocate a buffer for.
    const OUTPUT_NAMES: [&str; 4] = ["first", "second", "third", UNMATCHED_ROUTE];

    /// Builds a legacy-namespace event from a JSON value.
    fn event_from(fields: serde_json::Value) -> Event {
        Event::from_json_value(fields, LogNamespace::Legacy).unwrap()
    }

    /// Parses `config_toml` as a `RouteConfig`, builds the transform, runs `event` through it
    /// and returns the output buffers for all of `OUTPUT_NAMES`.
    fn route_event(config_toml: &str, event: Event) -> TransformOutputsBuf {
        let config = toml::from_str::<RouteConfig>(config_toml).unwrap();
        let mut transform = Route::new(&config, &Default::default()).unwrap();
        let mut outputs = TransformOutputsBuf::new_with_capacity(
            OUTPUT_NAMES
                .iter()
                .map(|output_name| {
                    TransformOutput::new(DataType::all_bits(), HashMap::new())
                        .with_port(output_name.to_owned())
                })
                .collect(),
            1,
        );

        transform.transform(event, &mut outputs);
        outputs
    }

    /// Asserts that each port listed in `expected_ports` holds exactly one event equal to
    /// `event`, and that every other port in `OUTPUT_NAMES` is empty.
    fn assert_delivered_to(
        outputs: &mut TransformOutputsBuf,
        event: &Event,
        expected_ports: &[&str],
    ) {
        for output_name in OUTPUT_NAMES {
            let mut events: Vec<_> = outputs.drain_named(output_name).collect();
            if expected_ports.contains(&output_name) {
                assert_eq!(events.len(), 1);
                assert_eq!(&events.pop().unwrap(), event);
            } else {
                assert!(events.is_empty());
            }
        }
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<RouteConfig>();
    }

    #[test]
    fn can_serialize_remap() {
        // An omitted `reroute_unmatched` must serialize as its default, `true`.
        let config = toml::from_str::<RouteConfig>(
            r#"
            route.first.type = "vrl"
            route.first.source = '.message == "hello world"'
            "#,
        )
        .unwrap();

        let serialized = serde_json::to_string(&config).unwrap();
        assert_eq!(
            serialized,
            r#"{"reroute_unmatched":true,"route":{"first":{"type":"vrl","source":".message == \"hello world\""}}}"#
        );
    }

    #[test]
    fn route_pass_all_route_conditions() {
        let event = event_from(
            serde_json::json!({"message": "hello world", "second": "second", "third": "third"}),
        );

        let mut outputs = route_event(
            r#"
            route.first.type = "vrl"
            route.first.source = '.message == "hello world"'

            route.second.type = "vrl"
            route.second.source = '.second == "second"'

            route.third.type = "vrl"
            route.third.source = '.third == "third"'
            "#,
            event.clone(),
        );

        assert_delivered_to(&mut outputs, &event, &["first", "second", "third"]);
    }

    #[test]
    fn route_pass_one_route_condition() {
        let event = event_from(serde_json::json!({"message": "hello world"}));

        let mut outputs = route_event(
            r#"
            route.first.type = "vrl"
            route.first.source = '.message == "hello world"'

            route.second.type = "vrl"
            route.second.source = '.second == "second"'

            route.third.type = "vrl"
            route.third.source = '.third == "third"'
            "#,
            event.clone(),
        );

        assert_delivered_to(&mut outputs, &event, &["first"]);
    }

    #[test]
    fn route_pass_no_route_condition() {
        let event = event_from(serde_json::json!({"message": "NOPE"}));

        let mut outputs = route_event(
            r#"
            route.first.type = "vrl"
            route.first.source = '.message == "hello world"'

            route.second.type = "vrl"
            route.second.source = '.second == "second"'

            route.third.type = "vrl"
            route.third.source = '.third == "third"'
            "#,
            event.clone(),
        );

        assert_delivered_to(&mut outputs, &event, &[UNMATCHED_ROUTE]);
    }

    #[test]
    fn route_no_unmatched_output() {
        let event = event_from(serde_json::json!({"message": "NOPE"}));

        let mut outputs = route_event(
            r#"
            reroute_unmatched = false

            route.first.type = "vrl"
            route.first.source = '.message == "hello world"'

            route.second.type = "vrl"
            route.second.source = '.second == "second"'

            route.third.type = "vrl"
            route.third.source = '.third == "third"'
            "#,
            event.clone(),
        );

        // With rerouting disabled, an event that matches nothing must not appear anywhere.
        assert_delivered_to(&mut outputs, &event, &[]);
    }

    #[tokio::test]
    async fn route_metrics_with_output_tag() {
        init_test();

        let config: ConfigBuilder = toml::from_str(indoc! {r#"
            [transforms.foo]
            inputs = []
            type = "route"
            [transforms.foo.route.first]
            type = "is_log"

            [[tests]]
            name = "metric output"

            [tests.input]
            insert_at = "foo"
            value = "none"

            [[tests.outputs]]
            extract_from = "foo.first"
            [[tests.outputs.conditions]]
            type = "vrl"
            source = "true"
        "#})
        .unwrap();

        let mut tests = build_unit_tests(config).await.unwrap();
        assert!(tests.remove(0).run().await.errors.is_empty());
        COMPONENT_MULTIPLE_OUTPUTS_TESTS.assert(&["output"]);
    }
}