vector/transforms/
route.rs

1use indexmap::IndexMap;
2use vector_lib::{
3    config::{LogNamespace, clone_input_definitions},
4    configurable::configurable_component,
5    transform::SyncTransform,
6};
7
8use crate::{
9    conditions::{AnyCondition, Condition, ConditionConfig, VrlConfig},
10    config::{
11        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
12        TransformOutput,
13    },
14    event::Event,
15    schema,
16    transforms::Transform,
17};
18
19pub(crate) const UNMATCHED_ROUTE: &str = "_unmatched";
20
21#[derive(Clone)]
22pub struct Route {
23    conditions: Vec<(String, Condition)>,
24    reroute_unmatched: bool,
25}
26
27impl Route {
28    pub fn new(config: &RouteConfig, context: &TransformContext) -> crate::Result<Self> {
29        let mut conditions = Vec::with_capacity(config.route.len());
30        for (output_name, condition) in config.route.iter() {
31            let condition = condition.build(&context.enrichment_tables)?;
32            conditions.push((output_name.clone(), condition));
33        }
34        Ok(Self {
35            conditions,
36            reroute_unmatched: config.reroute_unmatched,
37        })
38    }
39}
40
41impl SyncTransform for Route {
42    fn transform(&mut self, event: Event, output: &mut vector_lib::transform::TransformOutputsBuf) {
43        let mut check_failed: usize = 0;
44        for (output_name, condition) in &self.conditions {
45            let (result, event) = condition.check(event.clone());
46            if result {
47                output.push(Some(output_name), event);
48            } else {
49                check_failed += 1;
50            }
51        }
52        if self.reroute_unmatched && check_failed == self.conditions.len() {
53            output.push(Some(UNMATCHED_ROUTE), event);
54        }
55    }
56}
57
/// Configuration for the `route` transform.
// NOTE(review): the `///` text on this struct and its fields appears to be consumed by
// `configurable_component` as user-facing configuration docs, and field declaration order
// determines serialization order — treat both as part of the public surface.
#[configurable_component(transform(
    "route",
    "Split a stream of events into multiple sub-streams based on user-supplied conditions."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct RouteConfig {
    /// Reroutes unmatched events to a named output instead of silently discarding them.
    ///
    /// Normally, if an event doesn't match any defined route, it is sent to the `<transform_name>._unmatched`
    /// output for further processing. In some cases, you may want to simply discard unmatched events and not
    /// process them any further.
    ///
    /// In these cases, `reroute_unmatched` can be set to `false` to disable the `<transform_name>._unmatched`
    /// output and instead silently discard any unmatched events.
    // Defaults to `true` when omitted (`crate::serde::default_true`). `Route::new` copies it
    // into the runtime `Route`, and `outputs()` only declares the `_unmatched` port when set.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::human_name = "Reroute Unmatched Events"))]
    reroute_unmatched: bool,

    /// A map from route identifiers to logical conditions.
    /// Each condition represents a filter which is applied to each event.
    ///
    /// The following identifiers are reserved output names and thus cannot be used as route IDs:
    /// - `_unmatched`
    /// - `_default`
    ///
    /// Each route can then be referenced as an input by other components with the name
    /// `<transform_name>.<route_id>`. If an event doesn’t match any route, and if `reroute_unmatched`
    /// is set to `true` (the default), it is sent to the `<transform_name>._unmatched` output.
    /// Otherwise, the unmatched event is instead silently discarded.
    // `IndexMap` keeps insertion order: `Route::new` compiles conditions in this order and
    // `outputs()` declares one port per key in the same order. `validate()` in this file
    // rejects a key equal to `UNMATCHED_ROUTE`.
    // NOTE(review): `_default` is documented as reserved above but is not checked by this
    // file's `validate()`; presumably enforced by generic config validation — confirm.
    #[configurable(metadata(docs::additional_props_description = "An individual route."))]
    #[configurable(metadata(docs::examples = "route_examples()"))]
    route: IndexMap<String, AnyCondition>,
}
93
94fn route_examples() -> IndexMap<String, AnyCondition> {
95    IndexMap::from([
96        (
97            "foo-exists".to_owned(),
98            AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
99                source: "exists(.foo)".to_owned(),
100                ..Default::default()
101            })),
102        ),
103        (
104            "foo-does-not-exist".to_owned(),
105            AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
106                source: "!exists(.foo)".to_owned(),
107                ..Default::default()
108            })),
109        ),
110    ])
111}
112
113impl GenerateConfig for RouteConfig {
114    fn generate_config() -> toml::Value {
115        toml::Value::try_from(Self {
116            reroute_unmatched: true,
117            route: route_examples(),
118        })
119        .unwrap()
120    }
121}
122
123#[async_trait::async_trait]
124#[typetag::serde(name = "route")]
125impl TransformConfig for RouteConfig {
126    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
127        let route = Route::new(self, context)?;
128        Ok(Transform::synchronous(route))
129    }
130
131    fn input(&self) -> Input {
132        Input::all()
133    }
134
135    fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
136        if self.route.contains_key(UNMATCHED_ROUTE) {
137            Err(vec![format!(
138                "cannot have a named output with reserved name: `{UNMATCHED_ROUTE}`"
139            )])
140        } else {
141            Ok(())
142        }
143    }
144
145    fn outputs(
146        &self,
147        _: vector_lib::enrichment::TableRegistry,
148        input_definitions: &[(OutputId, schema::Definition)],
149        _: LogNamespace,
150    ) -> Vec<TransformOutput> {
151        let mut result: Vec<TransformOutput> = self
152            .route
153            .keys()
154            .map(|output_name| {
155                TransformOutput::new(
156                    DataType::all_bits(),
157                    clone_input_definitions(input_definitions),
158                )
159                .with_port(output_name)
160            })
161            .collect();
162        if self.reroute_unmatched {
163            result.push(
164                TransformOutput::new(
165                    DataType::all_bits(),
166                    clone_input_definitions(input_definitions),
167                )
168                .with_port(UNMATCHED_ROUTE),
169            );
170        }
171        result
172    }
173
174    fn enable_concurrency(&self) -> bool {
175        true
176    }
177}
178
#[cfg(test)]
mod test {
    use std::collections::HashMap;

    use indoc::indoc;
    use vector_lib::transform::TransformOutputsBuf;

    use super::*;
    use crate::{
        config::{ConfigBuilder, build_unit_tests},
        test_util::components::{COMPONENT_MULTIPLE_OUTPUTS_TESTS, init_test},
    };

    /// Every output port the routing tests allocate a buffer for, including `_unmatched`.
    const OUTPUT_NAMES: [&str; 4] = ["first", "second", "third", UNMATCHED_ROUTE];

    /// Three VRL routes: `first` matches `.message == "hello world"`, `second` matches
    /// `.second == "second"`, and `third` matches `.third == "third"`.
    const THREE_ROUTES: &str = r#"
        route.first.type = "vrl"
        route.first.source = '.message == "hello world"'

        route.second.type = "vrl"
        route.second.source = '.second == "second"'

        route.third.type = "vrl"
        route.third.source = '.third == "third"'
    "#;

    /// Parses [`THREE_ROUTES`] with an explicit `reroute_unmatched` setting.
    fn three_route_config(reroute_unmatched: bool) -> RouteConfig {
        toml::from_str(&format!(
            "reroute_unmatched = {reroute_unmatched}\n{THREE_ROUTES}"
        ))
        .unwrap()
    }

    /// Builds a [`Route`] from `config`, feeds it a single `event`, and returns the
    /// resulting buffers (one per entry in [`OUTPUT_NAMES`]).
    fn route_single_event(config: &RouteConfig, event: Event) -> TransformOutputsBuf {
        let mut transform = Route::new(config, &Default::default()).unwrap();
        let mut outputs = TransformOutputsBuf::new_with_capacity(
            OUTPUT_NAMES
                .iter()
                .map(|name| {
                    TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(*name)
                })
                .collect(),
            1,
        );
        transform.transform(event, &mut outputs);
        outputs
    }

    /// Asserts that each port in `expected` received exactly `event`, and that every
    /// other port in [`OUTPUT_NAMES`] received nothing.
    fn assert_routed_to(outputs: &mut TransformOutputsBuf, expected: &[&str], event: &Event) {
        for name in OUTPUT_NAMES {
            let events: Vec<_> = outputs.drain_named(name).collect();
            if expected.contains(&name) {
                assert_eq!(events, vec![event.clone()], "output `{name}`");
            } else {
                assert!(events.is_empty(), "output `{name}` should be empty");
            }
        }
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<super::RouteConfig>();
    }

    #[test]
    fn can_serialize_route() {
        // We need to serialize the config to check if a config has
        // changed when reloading.
        let config = toml::from_str::<RouteConfig>(
            r#"
            route.first.type = "vrl"
            route.first.source = '.message == "hello world"'
        "#,
        )
        .unwrap();

        assert_eq!(
            serde_json::to_string(&config).unwrap(),
            r#"{"reroute_unmatched":true,"route":{"first":{"type":"vrl","source":".message == \"hello world\""}}}"#
        );
    }

    #[test]
    fn route_pass_all_route_conditions() {
        let event = Event::from_json_value(
            serde_json::json!({"message": "hello world", "second": "second", "third": "third"}),
            LogNamespace::Legacy,
        )
        .unwrap();

        let mut outputs = route_single_event(&three_route_config(true), event.clone());
        assert_routed_to(&mut outputs, &["first", "second", "third"], &event);
    }

    #[test]
    fn route_pass_one_route_condition() {
        let event = Event::from_json_value(
            serde_json::json!({"message": "hello world"}),
            LogNamespace::Legacy,
        )
        .unwrap();

        let mut outputs = route_single_event(&three_route_config(true), event.clone());
        assert_routed_to(&mut outputs, &["first"], &event);
    }

    #[test]
    fn route_pass_no_route_condition() {
        let event =
            Event::from_json_value(serde_json::json!({"message": "NOPE"}), LogNamespace::Legacy)
                .unwrap();

        let mut outputs = route_single_event(&three_route_config(true), event.clone());
        assert_routed_to(&mut outputs, &[UNMATCHED_ROUTE], &event);
    }

    #[test]
    fn route_no_unmatched_output() {
        let event =
            Event::from_json_value(serde_json::json!({"message": "NOPE"}), LogNamespace::Legacy)
                .unwrap();

        // With rerouting disabled, an event matching no route is dropped entirely.
        let mut outputs = route_single_event(&three_route_config(false), event.clone());
        assert_routed_to(&mut outputs, &[], &event);
    }

    #[test]
    fn validate_rejects_reserved_unmatched_route_name() {
        let config = toml::from_str::<RouteConfig>(indoc! {r#"
            route._unmatched.type = "vrl"
            route._unmatched.source = "true"
        "#})
        .unwrap();

        assert!(config.validate(&schema::Definition::any()).is_err());
    }

    #[test]
    fn validate_accepts_ordinary_route_names() {
        assert!(
            three_route_config(true)
                .validate(&schema::Definition::any())
                .is_ok()
        );
    }

    #[tokio::test]
    async fn route_metrics_with_output_tag() {
        init_test();

        let config: ConfigBuilder = toml::from_str(indoc! {r#"
            [transforms.foo]
            inputs = []
            type = "route"
            [transforms.foo.route.first]
                type = "is_log"

            [[tests]]
            name = "metric output"

            [tests.input]
                insert_at = "foo"
                value = "none"

            [[tests.outputs]]
                extract_from = "foo.first"
                [[tests.outputs.conditions]]
                type = "vrl"
                source = "true"
        "#})
        .unwrap();

        let mut tests = build_unit_tests(config).await.unwrap();
        assert!(tests.remove(0).run().await.errors.is_empty());
        // Check that metrics were emitted with output tag
        COMPONENT_MULTIPLE_OUTPUTS_TESTS.assert(&["output"]);
    }
}
421}