// vector/transforms/route.rs

1use indexmap::IndexMap;
2use vector_lib::config::{clone_input_definitions, LogNamespace};
3use vector_lib::configurable::configurable_component;
4use vector_lib::transform::SyncTransform;
5
6use crate::{
7    conditions::{AnyCondition, Condition, ConditionConfig, VrlConfig},
8    config::{
9        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
10        TransformOutput,
11    },
12    event::Event,
13    schema,
14    transforms::Transform,
15};
16
/// Output port name that receives events which satisfied none of the configured routes.
///
/// Because this name is reserved, `RouteConfig` rejects any route ID equal to it.
pub(crate) const UNMATCHED_ROUTE: &str = "_unmatched";
18
19#[derive(Clone)]
20pub struct Route {
21    conditions: Vec<(String, Condition)>,
22    reroute_unmatched: bool,
23}
24
25impl Route {
26    pub fn new(config: &RouteConfig, context: &TransformContext) -> crate::Result<Self> {
27        let mut conditions = Vec::with_capacity(config.route.len());
28        for (output_name, condition) in config.route.iter() {
29            let condition = condition.build(&context.enrichment_tables)?;
30            conditions.push((output_name.clone(), condition));
31        }
32        Ok(Self {
33            conditions,
34            reroute_unmatched: config.reroute_unmatched,
35        })
36    }
37}
38
39impl SyncTransform for Route {
40    fn transform(&mut self, event: Event, output: &mut vector_lib::transform::TransformOutputsBuf) {
41        let mut check_failed: usize = 0;
42        for (output_name, condition) in &self.conditions {
43            let (result, event) = condition.check(event.clone());
44            if result {
45                output.push(Some(output_name), event);
46            } else {
47                check_failed += 1;
48            }
49        }
50        if self.reroute_unmatched && check_failed == self.conditions.len() {
51            output.push(Some(UNMATCHED_ROUTE), event);
52        }
53    }
54}
55
56/// Configuration for the `route` transform.
57#[configurable_component(transform(
58    "route",
59    "Split a stream of events into multiple sub-streams based on user-supplied conditions."
60))]
61#[derive(Clone, Debug)]
62#[serde(deny_unknown_fields)]
63pub struct RouteConfig {
64    /// Reroutes unmatched events to a named output instead of silently discarding them.
65    ///
66    /// Normally, if an event doesn't match any defined route, it is sent to the `<transform_name>._unmatched`
67    /// output for further processing. In some cases, you may want to simply discard unmatched events and not
68    /// process them any further.
69    ///
70    /// In these cases, `reroute_unmatched` can be set to `false` to disable the `<transform_name>._unmatched`
71    /// output and instead silently discard any unmatched events.
72    #[serde(default = "crate::serde::default_true")]
73    #[configurable(metadata(docs::human_name = "Reroute Unmatched Events"))]
74    reroute_unmatched: bool,
75
76    /// A map from route identifiers to logical conditions.
77    /// Each condition represents a filter which is applied to each event.
78    ///
79    /// The following identifiers are reserved output names and thus cannot be used as route IDs:
80    /// - `_unmatched`
81    /// - `_default`
82    ///
83    /// Each route can then be referenced as an input by other components with the name
84    /// `<transform_name>.<route_id>`. If an event doesn’t match any route, and if `reroute_unmatched`
85    /// is set to `true` (the default), it is sent to the `<transform_name>._unmatched` output.
86    /// Otherwise, the unmatched event is instead silently discarded.
87    #[configurable(metadata(docs::additional_props_description = "An individual route."))]
88    #[configurable(metadata(docs::examples = "route_examples()"))]
89    route: IndexMap<String, AnyCondition>,
90}
91
92fn route_examples() -> IndexMap<String, AnyCondition> {
93    IndexMap::from([
94        (
95            "foo-exists".to_owned(),
96            AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
97                source: "exists(.foo)".to_owned(),
98                ..Default::default()
99            })),
100        ),
101        (
102            "foo-does-not-exist".to_owned(),
103            AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
104                source: "!exists(.foo)".to_owned(),
105                ..Default::default()
106            })),
107        ),
108    ])
109}
110
111impl GenerateConfig for RouteConfig {
112    fn generate_config() -> toml::Value {
113        toml::Value::try_from(Self {
114            reroute_unmatched: true,
115            route: route_examples(),
116        })
117        .unwrap()
118    }
119}
120
121#[async_trait::async_trait]
122#[typetag::serde(name = "route")]
123impl TransformConfig for RouteConfig {
124    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
125        let route = Route::new(self, context)?;
126        Ok(Transform::synchronous(route))
127    }
128
129    fn input(&self) -> Input {
130        Input::all()
131    }
132
133    fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
134        if self.route.contains_key(UNMATCHED_ROUTE) {
135            Err(vec![format!(
136                "cannot have a named output with reserved name: `{UNMATCHED_ROUTE}`"
137            )])
138        } else {
139            Ok(())
140        }
141    }
142
143    fn outputs(
144        &self,
145        _: vector_lib::enrichment::TableRegistry,
146        input_definitions: &[(OutputId, schema::Definition)],
147        _: LogNamespace,
148    ) -> Vec<TransformOutput> {
149        let mut result: Vec<TransformOutput> = self
150            .route
151            .keys()
152            .map(|output_name| {
153                TransformOutput::new(
154                    DataType::all_bits(),
155                    clone_input_definitions(input_definitions),
156                )
157                .with_port(output_name)
158            })
159            .collect();
160        if self.reroute_unmatched {
161            result.push(
162                TransformOutput::new(
163                    DataType::all_bits(),
164                    clone_input_definitions(input_definitions),
165                )
166                .with_port(UNMATCHED_ROUTE),
167            );
168        }
169        result
170    }
171
172    fn enable_concurrency(&self) -> bool {
173        true
174    }
175}
176
#[cfg(test)]
mod test {
    use std::collections::HashMap;

    use indoc::indoc;
    use vector_lib::transform::TransformOutputsBuf;

    use super::*;
    use crate::{
        config::{build_unit_tests, ConfigBuilder},
        test_util::components::{init_test, COMPONENT_MULTIPLE_OUTPUTS_TESTS},
    };

    /// Ports inspected by the routing tests: the three routes in `THREE_ROUTES` plus
    /// `_unmatched`.
    const OUTPUT_NAMES: [&str; 4] = ["first", "second", "third", UNMATCHED_ROUTE];

    /// Route definitions shared by the routing tests. Each route checks a different field, so an
    /// event can be crafted to match all, one, or none of them.
    const THREE_ROUTES: &str = r#"
        route.first.type = "vrl"
        route.first.source = '.message == "hello world"'

        route.second.type = "vrl"
        route.second.source = '.second == "second"'

        route.third.type = "vrl"
        route.third.source = '.third == "third"'
    "#;

    /// Builds a legacy-namespace log event from a JSON object.
    fn log_event(value: serde_json::Value) -> Event {
        Event::from_json_value(value, LogNamespace::Legacy).unwrap()
    }

    /// Parses `toml_config` into a `Route`, feeds it `event` once, and returns the output buffer
    /// (one port per entry in `OUTPUT_NAMES`) for inspection.
    fn run_route(toml_config: &str, event: Event) -> TransformOutputsBuf {
        let config = toml::from_str::<RouteConfig>(toml_config).unwrap();
        let mut transform = Route::new(&config, &Default::default()).unwrap();
        let mut outputs = TransformOutputsBuf::new_with_capacity(
            OUTPUT_NAMES
                .iter()
                .map(|&name| {
                    TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(name)
                })
                .collect(),
            1,
        );
        transform.transform(event, &mut outputs);
        outputs
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<super::RouteConfig>();
    }

    #[test]
    fn can_serialize_route() {
        // We need to serialize the config to check if a config has
        // changed when reloading.
        let config = toml::from_str::<RouteConfig>(
            r#"
            route.first.type = "vrl"
            route.first.source = '.message == "hello world"'
        "#,
        )
        .unwrap();

        assert_eq!(
            serde_json::to_string(&config).unwrap(),
            r#"{"reroute_unmatched":true,"route":{"first":{"type":"vrl","source":".message == \"hello world\""}}}"#
        );
    }

    #[test]
    fn validate_rejects_reserved_unmatched_route_id() {
        let config = toml::from_str::<RouteConfig>(indoc! {r#"
            route._unmatched.type = "is_log"
        "#})
        .unwrap();

        assert!(config.validate(&schema::Definition::any()).is_err());
    }

    #[test]
    fn route_pass_all_route_conditions() {
        let event = log_event(
            serde_json::json!({"message": "hello world", "second": "second", "third": "third"}),
        );
        let mut outputs = run_route(THREE_ROUTES, event.clone());

        for output_name in OUTPUT_NAMES {
            let events: Vec<_> = outputs.drain_named(output_name).collect();
            if output_name == UNMATCHED_ROUTE {
                assert!(events.is_empty(), "matched event leaked to `{output_name}`");
            } else {
                assert_eq!(events, vec![event.clone()], "wrong events on `{output_name}`");
            }
        }
    }

    #[test]
    fn route_pass_one_route_condition() {
        let event = log_event(serde_json::json!({"message": "hello world"}));
        let mut outputs = run_route(THREE_ROUTES, event.clone());

        for output_name in OUTPUT_NAMES {
            let events: Vec<_> = outputs.drain_named(output_name).collect();
            let expected = if output_name == "first" {
                vec![event.clone()]
            } else {
                Vec::new()
            };
            assert_eq!(events, expected, "wrong events on `{output_name}`");
        }
    }

    #[test]
    fn route_pass_no_route_condition() {
        let event = log_event(serde_json::json!({"message": "NOPE"}));
        let mut outputs = run_route(THREE_ROUTES, event.clone());

        for output_name in OUTPUT_NAMES {
            let events: Vec<_> = outputs.drain_named(output_name).collect();
            let expected = if output_name == UNMATCHED_ROUTE {
                vec![event.clone()]
            } else {
                Vec::new()
            };
            assert_eq!(events, expected, "wrong events on `{output_name}`");
        }
    }

    #[test]
    fn route_no_unmatched_output() {
        let event = log_event(serde_json::json!({"message": "NOPE"}));
        // Root-level TOML keys are order-independent, so the flag can simply be prepended.
        let config = format!("reroute_unmatched = false\n{THREE_ROUTES}");
        let mut outputs = run_route(&config, event);

        for output_name in OUTPUT_NAMES {
            assert_eq!(
                outputs.drain_named(output_name).count(),
                0,
                "unexpected events on `{output_name}`"
            );
        }
    }

    #[tokio::test]
    async fn route_metrics_with_output_tag() {
        init_test();

        let config: ConfigBuilder = toml::from_str(indoc! {r#"
            [transforms.foo]
            inputs = []
            type = "route"
            [transforms.foo.route.first]
                type = "is_log"

            [[tests]]
            name = "metric output"

            [tests.input]
                insert_at = "foo"
                value = "none"

            [[tests.outputs]]
                extract_from = "foo.first"
                [[tests.outputs.conditions]]
                type = "vrl"
                source = "true"
        "#})
        .unwrap();

        let mut tests = build_unit_tests(config).await.unwrap();
        assert!(tests.remove(0).run().await.errors.is_empty());
        // Check that metrics were emitted with output tag
        COMPONENT_MULTIPLE_OUTPUTS_TESTS.assert(&["output"]);
    }
}