vector/transforms/exclusive_route/
config.rs

1use std::hash::{Hash, Hasher};
2
3use vector_lib::config::clone_input_definitions;
4
5use crate::{
6    conditions::{AnyCondition, ConditionConfig, VrlConfig},
7    config::{
8        DataType, GenerateConfig, Input, LogNamespace, OutputId, TransformConfig, TransformContext,
9        TransformOutput,
10    },
11    schema,
12    sinks::prelude::configurable_component,
13    transforms::{Transform, exclusive_route::transform::ExclusiveRoute},
14};
15
/// Reserved port name of the catch-all output.
///
/// `validate` rejects any route that uses this name, and `outputs` always adds one extra
/// output port carrying it after the user-defined routes.
pub(super) const UNMATCHED_ROUTE: &str = "_unmatched";
17
// NOTE: `Hash`, `PartialEq` and `Eq` are implemented by hand for this type and only
// consider `name`; two routes with the same name compare equal whatever their condition.
/// Individual route configuration.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct Route {
    /// The name of the route is also the name of the transform port.
    ///
    /// The `_unmatched` name is reserved and thus cannot be used as route ID.
    ///
    /// Each route can then be referenced as an input by other components with the name
    /// `<transform_name>.<name>`.
    ///
    /// If an event doesn’t match any route, it is sent to the
    /// `<transform_name>._unmatched` output.
    pub name: String,

    /// Each condition represents a filter which is applied to each event.
    pub condition: AnyCondition,
}
34
35impl Hash for Route {
36    fn hash<H: Hasher>(&self, state: &mut H) {
37        self.name.hash(state);
38    }
39}
40
41impl PartialEq for Route {
42    fn eq(&self, other: &Self) -> bool {
43        self.name == other.name
44    }
45}
46
47impl Eq for Route {}
48
/// Configuration for the `exclusive_route` transform.
#[configurable_component(transform(
    "exclusive_route",
    "Split a stream of events into unique sub-streams based on user-supplied conditions."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct ExclusiveRouteConfig {
    /// An array of named routes. The route names are expected to be unique.
    ///
    /// Routes are evaluated in order from first to last, and only the first matching route
    /// receives each event (first-match-wins).
    #[configurable(metadata(docs::examples = "routes_example()"))]
    pub routes: Vec<Route>,
}
63
64fn routes_example() -> Vec<Route> {
65    vec![
66        Route {
67            name: "foo-and-bar-exist".to_owned(),
68            condition: AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
69                source: "exists(.foo) && exists(.bar)".to_owned(),
70                ..Default::default()
71            })),
72        },
73        Route {
74            name: "only-foo-exists".to_owned(),
75            condition: AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
76                source: "exists(.foo)".to_owned(),
77                ..Default::default()
78            })),
79        },
80    ]
81}
82
83impl GenerateConfig for ExclusiveRouteConfig {
84    fn generate_config() -> toml::Value {
85        toml::Value::try_from(Self {
86            routes: routes_example(),
87        })
88        .unwrap()
89    }
90}
91
92#[async_trait::async_trait]
93#[typetag::serde(name = "exclusive_route")]
94impl TransformConfig for ExclusiveRouteConfig {
95    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
96        let route = ExclusiveRoute::new(self, context)?;
97        Ok(Transform::synchronous(route))
98    }
99
100    fn input(&self) -> Input {
101        Input::all()
102    }
103
104    fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
105        let mut errors = Vec::new();
106
107        let mut counts = std::collections::HashMap::new();
108        for route in &self.routes {
109            *counts.entry(route.name.clone()).or_insert(0) += 1;
110        }
111
112        let duplicates: Vec<String> = counts
113            .iter()
114            .filter(|&(_, &count)| count > 1)
115            .map(|(name, _)| name.clone())
116            .collect();
117
118        if !duplicates.is_empty() {
119            errors.push(format!("Found routes with duplicate names: {duplicates:?}"));
120        }
121
122        if self
123            .routes
124            .iter()
125            .any(|route| route.name == UNMATCHED_ROUTE)
126        {
127            errors.push(format!("Using reserved '{UNMATCHED_ROUTE}' name."));
128        }
129
130        if errors.is_empty() {
131            Ok(())
132        } else {
133            Err(errors)
134        }
135    }
136
137    fn outputs(
138        &self,
139        _: vector_lib::enrichment::TableRegistry,
140        input_definitions: &[(OutputId, schema::Definition)],
141        _: LogNamespace,
142    ) -> Vec<TransformOutput> {
143        let mut outputs: Vec<_> = self
144            .routes
145            .iter()
146            .map(|route| {
147                TransformOutput::new(
148                    DataType::all_bits(),
149                    clone_input_definitions(input_definitions),
150                )
151                .with_port(route.name.clone())
152            })
153            .collect();
154        outputs.push(
155            TransformOutput::new(
156                DataType::all_bits(),
157                clone_input_definitions(input_definitions),
158            )
159            .with_port(UNMATCHED_ROUTE),
160        );
161        outputs
162    }
163
164    fn enable_concurrency(&self) -> bool {
165        true
166    }
167}
168
#[cfg(test)]
mod tests {
    use indoc::indoc;

    use super::ExclusiveRouteConfig;

    // Runs the shared `test_generate_config` check against this transform's config type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ExclusiveRouteConfig>();
    }

    // The config has to be serializable so that a reload can tell whether it changed.
    #[test]
    fn can_serialize_remap() {
        // The two lines under `condition:` form a single multi-line YAML scalar, so the
        // condition is read as one string (see the expected JSON below).
        let yaml = indoc! {r#"
                routes:
                    - name: a
                      condition:
                        type = "vrl"
                        source = '.message == "hello world"'
            "#};
        let config: ExclusiveRouteConfig = serde_yaml::from_str(yaml).unwrap();

        let serialized = serde_json::to_string(&config).unwrap();
        let expected = r#"{"routes":[{"name":"a","condition":"type = \"vrl\" source = '.message == \"hello world\"'"}]}"#;

        assert_eq!(serialized, expected);
    }
}