vector/transforms/
route.rs

1use indexmap::IndexMap;
2use vector_lib::{
3    config::clone_input_definitions, configurable::configurable_component, transform::SyncTransform,
4};
5
6use crate::{
7    conditions::{AnyCondition, Condition, ConditionConfig, VrlConfig},
8    config::{
9        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
10        TransformOutput,
11    },
12    event::Event,
13    schema,
14    transforms::Transform,
15};
16
/// Name of the output that receives events which matched none of the configured routes
/// (only used when `reroute_unmatched` is enabled). Reserved: it cannot be used as a route ID.
pub(crate) const UNMATCHED_ROUTE: &str = "_unmatched";
18
/// Runtime state of the `route` transform, built from a [`RouteConfig`] by [`Route::new`].
#[derive(Clone)]
pub struct Route {
    /// Built conditions, each paired with the name of the output it feeds, kept in the order
    /// the routes are iterated from the configuration.
    conditions: Vec<(String, Condition)>,
    /// Whether an event that fails every condition is pushed to the [`UNMATCHED_ROUTE`] output.
    reroute_unmatched: bool,
}
24
25impl Route {
26    pub fn new(config: &RouteConfig, context: &TransformContext) -> crate::Result<Self> {
27        let mut conditions = Vec::with_capacity(config.route.len());
28        for (output_name, condition) in config.route.iter() {
29            let condition =
30                condition.build(&context.enrichment_tables, &context.metrics_storage)?;
31            conditions.push((output_name.clone(), condition));
32        }
33        Ok(Self {
34            conditions,
35            reroute_unmatched: config.reroute_unmatched,
36        })
37    }
38}
39
40impl SyncTransform for Route {
41    fn transform(&mut self, event: Event, output: &mut vector_lib::transform::TransformOutputsBuf) {
42        let mut check_failed: usize = 0;
43        for (output_name, condition) in &self.conditions {
44            let (result, event) = condition.check(event.clone());
45            if result {
46                output.push(Some(output_name), event);
47            } else {
48                check_failed += 1;
49            }
50        }
51        if self.reroute_unmatched && check_failed == self.conditions.len() {
52            output.push(Some(UNMATCHED_ROUTE), event);
53        }
54    }
55}
56
/// Configuration for the `route` transform.
// NOTE: the `///` comments on this struct and its fields are picked up by
// `configurable_component` as the option descriptions, so edit them as user-facing text.
#[configurable_component(transform(
    "route",
    "Split a stream of events into multiple sub-streams based on user-supplied conditions."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct RouteConfig {
    /// Reroutes unmatched events to a named output instead of silently discarding them.
    ///
    /// Normally, if an event doesn't match any defined route, it is sent to the `<transform_name>._unmatched`
    /// output for further processing. In some cases, you may want to simply discard unmatched events and not
    /// process them any further.
    ///
    /// In these cases, `reroute_unmatched` can be set to `false` to disable the `<transform_name>._unmatched`
    /// output and instead silently discard any unmatched events.
    // Omitting the option yields `true`, i.e. the `_unmatched` output is enabled by default.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::human_name = "Reroute Unmatched Events"))]
    reroute_unmatched: bool,

    /// A map from route identifiers to logical conditions.
    /// Each condition represents a filter which is applied to each event.
    ///
    /// The following identifiers are reserved output names and thus cannot be used as route IDs:
    /// - `_unmatched`
    /// - `_default`
    ///
    /// Each route can then be referenced as an input by other components with the name
    /// `<transform_name>.<route_id>`. If an event doesn’t match any route, and if `reroute_unmatched`
    /// is set to `true` (the default), it is sent to the `<transform_name>._unmatched` output.
    /// Otherwise, the unmatched event is instead silently discarded.
    // `Route::new` builds the conditions by iterating this map, and `transform` evaluates them
    // in that same order.
    #[configurable(metadata(docs::additional_props_description = "An individual route."))]
    #[configurable(metadata(docs::examples = "route_examples()"))]
    route: IndexMap<String, AnyCondition>,
}
92
93fn route_examples() -> IndexMap<String, AnyCondition> {
94    IndexMap::from([
95        (
96            "foo-exists".to_owned(),
97            AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
98                source: "exists(.foo)".to_owned(),
99                ..Default::default()
100            })),
101        ),
102        (
103            "foo-does-not-exist".to_owned(),
104            AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
105                source: "!exists(.foo)".to_owned(),
106                ..Default::default()
107            })),
108        ),
109    ])
110}
111
112impl GenerateConfig for RouteConfig {
113    fn generate_config() -> toml::Value {
114        toml::Value::try_from(Self {
115            reroute_unmatched: true,
116            route: route_examples(),
117        })
118        .unwrap()
119    }
120}
121
122#[async_trait::async_trait]
123#[typetag::serde(name = "route")]
124impl TransformConfig for RouteConfig {
125    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
126        let route = Route::new(self, context)?;
127        Ok(Transform::synchronous(route))
128    }
129
130    fn input(&self) -> Input {
131        Input::all()
132    }
133
134    fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
135        if self.route.contains_key(UNMATCHED_ROUTE) {
136            Err(vec![format!(
137                "cannot have a named output with reserved name: `{UNMATCHED_ROUTE}`"
138            )])
139        } else {
140            Ok(())
141        }
142    }
143
144    fn outputs(
145        &self,
146        _: &TransformContext,
147        input_definitions: &[(OutputId, schema::Definition)],
148    ) -> Vec<TransformOutput> {
149        let mut result: Vec<TransformOutput> = self
150            .route
151            .keys()
152            .map(|output_name| {
153                TransformOutput::new(
154                    DataType::all_bits(),
155                    clone_input_definitions(input_definitions),
156                )
157                .with_port(output_name)
158            })
159            .collect();
160        if self.reroute_unmatched {
161            result.push(
162                TransformOutput::new(
163                    DataType::all_bits(),
164                    clone_input_definitions(input_definitions),
165                )
166                .with_port(UNMATCHED_ROUTE),
167            );
168        }
169        result
170    }
171
172    fn enable_concurrency(&self) -> bool {
173        true
174    }
175}
176
#[cfg(test)]
mod test {
    use std::collections::HashMap;

    use indoc::indoc;
    use vector_lib::{config::LogNamespace, transform::TransformOutputsBuf};

    use super::*;
    use crate::{
        config::{ConfigBuilder, build_unit_tests},
        test_util::components::{COMPONENT_MULTIPLE_OUTPUTS_TESTS, init_test},
    };

    /// Output ports registered on the output buffer in every routing test.
    const OUTPUT_NAMES: [&str; 4] = ["first", "second", "third", UNMATCHED_ROUTE];

    /// The three VRL routes shared by the routing tests.
    const THREE_ROUTES: &str = r#"
            route.first.type = "vrl"
            route.first.source = '.message == "hello world"'

            route.second.type = "vrl"
            route.second.source = '.second == "second"'

            route.third.type = "vrl"
            route.third.source = '.third == "third"'
        "#;

    /// Builds a legacy-namespace event from a JSON value.
    fn event_from_json(value: serde_json::Value) -> Event {
        Event::from_json_value(value, LogNamespace::Legacy).unwrap()
    }

    /// Runs `event` through a `Route` built from `config_toml` and asserts that exactly the
    /// ports in `expected_ports` received one copy of it while every other port stayed empty.
    fn assert_routed_to(config_toml: &str, event: Event, expected_ports: &[&str]) {
        let config = toml::from_str::<RouteConfig>(config_toml).unwrap();
        let mut transform = Route::new(&config, &Default::default()).unwrap();

        let ports = OUTPUT_NAMES
            .iter()
            .map(|port| TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(*port))
            .collect();
        let mut outputs = TransformOutputsBuf::new_with_capacity(ports, 1);

        transform.transform(event.clone(), &mut outputs);

        for port in OUTPUT_NAMES {
            let received: Vec<Event> = outputs.drain_named(port).collect();
            if expected_ports.contains(&port) {
                assert_eq!(received, vec![event.clone()]);
            } else {
                assert!(received.is_empty());
            }
        }
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<RouteConfig>();
    }

    #[test]
    fn can_serialize_remap() {
        // We need to serialize the config to check if a config has
        // changed when reloading.
        let source = r#"
            route.first.type = "vrl"
            route.first.source = '.message == "hello world"'
        "#;
        let config: RouteConfig = toml::from_str(source).unwrap();
        let serialized = serde_json::to_string(&config).unwrap();

        assert_eq!(
            serialized,
            r#"{"reroute_unmatched":true,"route":{"first":{"type":"vrl","source":".message == \"hello world\""}}}"#
        );
    }

    #[test]
    fn route_pass_all_route_conditions() {
        let event = event_from_json(
            serde_json::json!({"message": "hello world", "second": "second", "third": "third"}),
        );

        assert_routed_to(THREE_ROUTES, event, &["first", "second", "third"]);
    }

    #[test]
    fn route_pass_one_route_condition() {
        let event = event_from_json(serde_json::json!({"message": "hello world"}));

        assert_routed_to(THREE_ROUTES, event, &["first"]);
    }

    #[test]
    fn route_pass_no_route_condition() {
        let event = event_from_json(serde_json::json!({"message": "NOPE"}));

        assert_routed_to(THREE_ROUTES, event, &[UNMATCHED_ROUTE]);
    }

    #[test]
    fn route_no_unmatched_output() {
        let event = event_from_json(serde_json::json!({"message": "NOPE"}));
        let config_toml = format!("reroute_unmatched = false\n{THREE_ROUTES}");

        // With rerouting disabled the unmatched event must not show up on any port.
        assert_routed_to(&config_toml, event, &[]);
    }

    #[tokio::test]
    async fn route_metrics_with_output_tag() {
        init_test();

        let config: ConfigBuilder = toml::from_str(indoc! {r#"
            [transforms.foo]
            inputs = []
            type = "route"
            [transforms.foo.route.first]
                type = "is_log"

            [[tests]]
            name = "metric output"

            [tests.input]
                insert_at = "foo"
                value = "none"

            [[tests.outputs]]
                extract_from = "foo.first"
                [[tests.outputs.conditions]]
                type = "vrl"
                source = "true"
        "#})
        .unwrap();

        let mut tests = build_unit_tests(config).await.unwrap();
        let report = tests.remove(0).run().await;
        assert!(report.errors.is_empty());

        // Check that metrics were emitted with output tag
        COMPONENT_MULTIPLE_OUTPUTS_TESTS.assert(&["output"]);
    }
}