vector/transforms/exclusive_route/
config.rs1use crate::conditions::{AnyCondition, ConditionConfig, VrlConfig};
2use crate::config::{
3 DataType, GenerateConfig, Input, LogNamespace, OutputId, TransformConfig, TransformContext,
4 TransformOutput,
5};
6use crate::schema;
7use crate::sinks::prelude::configurable_component;
8use crate::transforms::exclusive_route::transform::ExclusiveRoute;
9use crate::transforms::Transform;
10use std::hash::{Hash, Hasher};
11use vector_lib::config::clone_input_definitions;
12
13pub(super) const UNMATCHED_ROUTE: &str = "_unmatched";
14
15#[configurable_component]
17#[derive(Clone, Debug)]
18pub struct Route {
19 pub name: String,
27
28 pub condition: AnyCondition,
30}
31
32impl Hash for Route {
33 fn hash<H: Hasher>(&self, state: &mut H) {
34 self.name.hash(state);
35 }
36}
37
38impl PartialEq for Route {
39 fn eq(&self, other: &Self) -> bool {
40 self.name == other.name
41 }
42}
43
44impl Eq for Route {}
45
46#[configurable_component(transform(
48 "exclusive_route",
49 "Split a stream of events into unique sub-streams based on user-supplied conditions."
50))]
51#[derive(Clone, Debug)]
52#[serde(deny_unknown_fields)]
53pub struct ExclusiveRouteConfig {
54 #[configurable(metadata(docs::examples = "routes_example()"))]
56 pub routes: Vec<Route>,
57}
58
59fn routes_example() -> Vec<Route> {
60 vec![
61 Route {
62 name: "foo-and-bar-exist".to_owned(),
63 condition: AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
64 source: "exists(.foo) && exists(.bar)".to_owned(),
65 ..Default::default()
66 })),
67 },
68 Route {
69 name: "only-foo-exists".to_owned(),
70 condition: AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
71 source: "exists(.foo)".to_owned(),
72 ..Default::default()
73 })),
74 },
75 ]
76}
77
78impl GenerateConfig for ExclusiveRouteConfig {
79 fn generate_config() -> toml::Value {
80 toml::Value::try_from(Self {
81 routes: routes_example(),
82 })
83 .unwrap()
84 }
85}
86
87#[async_trait::async_trait]
88#[typetag::serde(name = "exclusive_route")]
89impl TransformConfig for ExclusiveRouteConfig {
90 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
91 let route = ExclusiveRoute::new(self, context)?;
92 Ok(Transform::synchronous(route))
93 }
94
95 fn input(&self) -> Input {
96 Input::all()
97 }
98
99 fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
100 let mut errors = Vec::new();
101
102 let mut counts = std::collections::HashMap::new();
103 for route in &self.routes {
104 *counts.entry(route.name.clone()).or_insert(0) += 1;
105 }
106
107 let duplicates: Vec<String> = counts
108 .iter()
109 .filter(|&(_, &count)| count > 1)
110 .map(|(name, _)| name.clone())
111 .collect();
112
113 if !duplicates.is_empty() {
114 errors.push(format!("Found routes with duplicate names: {duplicates:?}"));
115 }
116
117 if self
118 .routes
119 .iter()
120 .any(|route| route.name == UNMATCHED_ROUTE)
121 {
122 errors.push(format!("Using reserved '{UNMATCHED_ROUTE}' name."));
123 }
124
125 if errors.is_empty() {
126 Ok(())
127 } else {
128 Err(errors)
129 }
130 }
131
132 fn outputs(
133 &self,
134 _: vector_lib::enrichment::TableRegistry,
135 input_definitions: &[(OutputId, schema::Definition)],
136 _: LogNamespace,
137 ) -> Vec<TransformOutput> {
138 let mut outputs: Vec<_> = self
139 .routes
140 .iter()
141 .map(|route| {
142 TransformOutput::new(
143 DataType::all_bits(),
144 clone_input_definitions(input_definitions),
145 )
146 .with_port(route.name.clone())
147 })
148 .collect();
149 outputs.push(
150 TransformOutput::new(
151 DataType::all_bits(),
152 clone_input_definitions(input_definitions),
153 )
154 .with_port(UNMATCHED_ROUTE),
155 );
156 outputs
157 }
158
159 fn enable_concurrency(&self) -> bool {
160 true
161 }
162}
163
164#[cfg(test)]
165mod tests {
166 use super::ExclusiveRouteConfig;
167 use indoc::indoc;
168
169 #[test]
170 fn generate_config() {
171 crate::test_util::test_generate_config::<ExclusiveRouteConfig>();
172 }
173
174 #[test]
175 fn can_serialize_remap() {
176 let config = serde_yaml::from_str::<ExclusiveRouteConfig>(indoc! {r#"
179 routes:
180 - name: a
181 condition:
182 type = "vrl"
183 source = '.message == "hello world"'
184 "#})
185 .unwrap();
186
187 assert_eq!(
188 serde_json::to_string(&config).unwrap(),
189 r#"{"routes":[{"name":"a","condition":"type = \"vrl\" source = '.message == \"hello world\"'"}]}"#
190 );
191 }
192}