vector/transforms/exclusive_route/
config.rs1use std::hash::{Hash, Hasher};
2
3use vector_lib::config::clone_input_definitions;
4
5use crate::{
6 conditions::{AnyCondition, ConditionConfig, VrlConfig},
7 config::{
8 DataType, GenerateConfig, Input, LogNamespace, OutputId, TransformConfig, TransformContext,
9 TransformOutput,
10 },
11 schema,
12 sinks::prelude::configurable_component,
13 transforms::{Transform, exclusive_route::transform::ExclusiveRoute},
14};
15
16pub(super) const UNMATCHED_ROUTE: &str = "_unmatched";
17
18#[configurable_component]
20#[derive(Clone, Debug)]
21pub struct Route {
22 pub name: String,
30
31 pub condition: AnyCondition,
33}
34
35impl Hash for Route {
36 fn hash<H: Hasher>(&self, state: &mut H) {
37 self.name.hash(state);
38 }
39}
40
41impl PartialEq for Route {
42 fn eq(&self, other: &Self) -> bool {
43 self.name == other.name
44 }
45}
46
47impl Eq for Route {}
48
49#[configurable_component(transform(
51 "exclusive_route",
52 "Split a stream of events into unique sub-streams based on user-supplied conditions."
53))]
54#[derive(Clone, Debug)]
55#[serde(deny_unknown_fields)]
56pub struct ExclusiveRouteConfig {
57 #[configurable(metadata(docs::examples = "routes_example()"))]
59 pub routes: Vec<Route>,
60}
61
62fn routes_example() -> Vec<Route> {
63 vec![
64 Route {
65 name: "foo-and-bar-exist".to_owned(),
66 condition: AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
67 source: "exists(.foo) && exists(.bar)".to_owned(),
68 ..Default::default()
69 })),
70 },
71 Route {
72 name: "only-foo-exists".to_owned(),
73 condition: AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
74 source: "exists(.foo)".to_owned(),
75 ..Default::default()
76 })),
77 },
78 ]
79}
80
81impl GenerateConfig for ExclusiveRouteConfig {
82 fn generate_config() -> toml::Value {
83 toml::Value::try_from(Self {
84 routes: routes_example(),
85 })
86 .unwrap()
87 }
88}
89
90#[async_trait::async_trait]
91#[typetag::serde(name = "exclusive_route")]
92impl TransformConfig for ExclusiveRouteConfig {
93 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
94 let route = ExclusiveRoute::new(self, context)?;
95 Ok(Transform::synchronous(route))
96 }
97
98 fn input(&self) -> Input {
99 Input::all()
100 }
101
102 fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
103 let mut errors = Vec::new();
104
105 let mut counts = std::collections::HashMap::new();
106 for route in &self.routes {
107 *counts.entry(route.name.clone()).or_insert(0) += 1;
108 }
109
110 let duplicates: Vec<String> = counts
111 .iter()
112 .filter(|&(_, &count)| count > 1)
113 .map(|(name, _)| name.clone())
114 .collect();
115
116 if !duplicates.is_empty() {
117 errors.push(format!("Found routes with duplicate names: {duplicates:?}"));
118 }
119
120 if self
121 .routes
122 .iter()
123 .any(|route| route.name == UNMATCHED_ROUTE)
124 {
125 errors.push(format!("Using reserved '{UNMATCHED_ROUTE}' name."));
126 }
127
128 if errors.is_empty() {
129 Ok(())
130 } else {
131 Err(errors)
132 }
133 }
134
135 fn outputs(
136 &self,
137 _: vector_lib::enrichment::TableRegistry,
138 input_definitions: &[(OutputId, schema::Definition)],
139 _: LogNamespace,
140 ) -> Vec<TransformOutput> {
141 let mut outputs: Vec<_> = self
142 .routes
143 .iter()
144 .map(|route| {
145 TransformOutput::new(
146 DataType::all_bits(),
147 clone_input_definitions(input_definitions),
148 )
149 .with_port(route.name.clone())
150 })
151 .collect();
152 outputs.push(
153 TransformOutput::new(
154 DataType::all_bits(),
155 clone_input_definitions(input_definitions),
156 )
157 .with_port(UNMATCHED_ROUTE),
158 );
159 outputs
160 }
161
162 fn enable_concurrency(&self) -> bool {
163 true
164 }
165}
166
167#[cfg(test)]
168mod tests {
169 use indoc::indoc;
170
171 use super::ExclusiveRouteConfig;
172
173 #[test]
174 fn generate_config() {
175 crate::test_util::test_generate_config::<ExclusiveRouteConfig>();
176 }
177
178 #[test]
179 fn can_serialize_remap() {
180 let config = serde_yaml::from_str::<ExclusiveRouteConfig>(indoc! {r#"
183 routes:
184 - name: a
185 condition:
186 type = "vrl"
187 source = '.message == "hello world"'
188 "#})
189 .unwrap();
190
191 assert_eq!(
192 serde_json::to_string(&config).unwrap(),
193 r#"{"routes":[{"name":"a","condition":"type = \"vrl\" source = '.message == \"hello world\"'"}]}"#
194 );
195 }
196}