vector/transforms/exclusive_route/
config.rs

1use crate::conditions::{AnyCondition, ConditionConfig, VrlConfig};
2use crate::config::{
3    DataType, GenerateConfig, Input, LogNamespace, OutputId, TransformConfig, TransformContext,
4    TransformOutput,
5};
6use crate::schema;
7use crate::sinks::prelude::configurable_component;
8use crate::transforms::exclusive_route::transform::ExclusiveRoute;
9use crate::transforms::Transform;
10use std::hash::{Hash, Hasher};
11use vector_lib::config::clone_input_definitions;
12
13pub(super) const UNMATCHED_ROUTE: &str = "_unmatched";
14
15/// Individual route configuration.
16#[configurable_component]
17#[derive(Clone, Debug)]
18pub struct Route {
19    /// The name of the route is also the name of the transform port.
20    ///
21    ///  The `_unmatched` name is reserved and thus cannot be used as route ID.
22    ///
23    /// Each route can then be referenced as an input by other components with the name
24    ///  `<transform_name>.<name>`. If an event doesn’t match any route,
25    /// it is sent to the `<transform_name>._unmatched` output.
26    pub name: String,
27
28    /// Each condition represents a filter which is applied to each event.
29    pub condition: AnyCondition,
30}
31
32impl Hash for Route {
33    fn hash<H: Hasher>(&self, state: &mut H) {
34        self.name.hash(state);
35    }
36}
37
38impl PartialEq for Route {
39    fn eq(&self, other: &Self) -> bool {
40        self.name == other.name
41    }
42}
43
44impl Eq for Route {}
45
46/// Configuration for the `route` transform.
47#[configurable_component(transform(
48    "exclusive_route",
49    "Split a stream of events into unique sub-streams based on user-supplied conditions."
50))]
51#[derive(Clone, Debug)]
52#[serde(deny_unknown_fields)]
53pub struct ExclusiveRouteConfig {
54    /// An array of named routes. The route names are expected to be unique.
55    #[configurable(metadata(docs::examples = "routes_example()"))]
56    pub routes: Vec<Route>,
57}
58
59fn routes_example() -> Vec<Route> {
60    vec![
61        Route {
62            name: "foo-and-bar-exist".to_owned(),
63            condition: AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
64                source: "exists(.foo) && exists(.bar)".to_owned(),
65                ..Default::default()
66            })),
67        },
68        Route {
69            name: "only-foo-exists".to_owned(),
70            condition: AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
71                source: "exists(.foo)".to_owned(),
72                ..Default::default()
73            })),
74        },
75    ]
76}
77
78impl GenerateConfig for ExclusiveRouteConfig {
79    fn generate_config() -> toml::Value {
80        toml::Value::try_from(Self {
81            routes: routes_example(),
82        })
83        .unwrap()
84    }
85}
86
87#[async_trait::async_trait]
88#[typetag::serde(name = "exclusive_route")]
89impl TransformConfig for ExclusiveRouteConfig {
90    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
91        let route = ExclusiveRoute::new(self, context)?;
92        Ok(Transform::synchronous(route))
93    }
94
95    fn input(&self) -> Input {
96        Input::all()
97    }
98
99    fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
100        let mut errors = Vec::new();
101
102        let mut counts = std::collections::HashMap::new();
103        for route in &self.routes {
104            *counts.entry(route.name.clone()).or_insert(0) += 1;
105        }
106
107        let duplicates: Vec<String> = counts
108            .iter()
109            .filter(|&(_, &count)| count > 1)
110            .map(|(name, _)| name.clone())
111            .collect();
112
113        if !duplicates.is_empty() {
114            errors.push(format!("Found routes with duplicate names: {duplicates:?}"));
115        }
116
117        if self
118            .routes
119            .iter()
120            .any(|route| route.name == UNMATCHED_ROUTE)
121        {
122            errors.push(format!("Using reserved '{UNMATCHED_ROUTE}' name."));
123        }
124
125        if errors.is_empty() {
126            Ok(())
127        } else {
128            Err(errors)
129        }
130    }
131
132    fn outputs(
133        &self,
134        _: vector_lib::enrichment::TableRegistry,
135        input_definitions: &[(OutputId, schema::Definition)],
136        _: LogNamespace,
137    ) -> Vec<TransformOutput> {
138        let mut outputs: Vec<_> = self
139            .routes
140            .iter()
141            .map(|route| {
142                TransformOutput::new(
143                    DataType::all_bits(),
144                    clone_input_definitions(input_definitions),
145                )
146                .with_port(route.name.clone())
147            })
148            .collect();
149        outputs.push(
150            TransformOutput::new(
151                DataType::all_bits(),
152                clone_input_definitions(input_definitions),
153            )
154            .with_port(UNMATCHED_ROUTE),
155        );
156        outputs
157    }
158
159    fn enable_concurrency(&self) -> bool {
160        true
161    }
162}
163
164#[cfg(test)]
165mod tests {
166    use super::ExclusiveRouteConfig;
167    use indoc::indoc;
168
169    #[test]
170    fn generate_config() {
171        crate::test_util::test_generate_config::<ExclusiveRouteConfig>();
172    }
173
174    #[test]
175    fn can_serialize_remap() {
176        // We need to serialize the config to check if a config has
177        // changed when reloading.
178        let config = serde_yaml::from_str::<ExclusiveRouteConfig>(indoc! {r#"
179                routes:
180                    - name: a
181                      condition:
182                        type = "vrl"
183                        source = '.message == "hello world"'
184            "#})
185        .unwrap();
186
187        assert_eq!(
188            serde_json::to_string(&config).unwrap(),
189            r#"{"routes":[{"name":"a","condition":"type = \"vrl\" source = '.message == \"hello world\"'"}]}"#
190        );
191    }
192}