vector/transforms/exclusive_route/
config.rs1use std::hash::{Hash, Hasher};
2
3use vector_lib::config::clone_input_definitions;
4
5use crate::{
6 conditions::{AnyCondition, ConditionConfig, VrlConfig},
7 config::{
8 DataType, GenerateConfig, Input, LogNamespace, OutputId, TransformConfig, TransformContext,
9 TransformOutput,
10 },
11 schema,
12 sinks::prelude::configurable_component,
13 transforms::{Transform, exclusive_route::transform::ExclusiveRoute},
14};
15
16pub(super) const UNMATCHED_ROUTE: &str = "_unmatched";
17
18#[configurable_component]
20#[derive(Clone, Debug)]
21pub struct Route {
22 pub name: String,
30
31 pub condition: AnyCondition,
33}
34
35impl Hash for Route {
36 fn hash<H: Hasher>(&self, state: &mut H) {
37 self.name.hash(state);
38 }
39}
40
41impl PartialEq for Route {
42 fn eq(&self, other: &Self) -> bool {
43 self.name == other.name
44 }
45}
46
47impl Eq for Route {}
48
49#[configurable_component(transform(
51 "exclusive_route",
52 "Split a stream of events into unique sub-streams based on user-supplied conditions."
53))]
54#[derive(Clone, Debug)]
55#[serde(deny_unknown_fields)]
56pub struct ExclusiveRouteConfig {
57 #[configurable(metadata(docs::examples = "routes_example()"))]
61 pub routes: Vec<Route>,
62}
63
64fn routes_example() -> Vec<Route> {
65 vec![
66 Route {
67 name: "foo-and-bar-exist".to_owned(),
68 condition: AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
69 source: "exists(.foo) && exists(.bar)".to_owned(),
70 ..Default::default()
71 })),
72 },
73 Route {
74 name: "only-foo-exists".to_owned(),
75 condition: AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
76 source: "exists(.foo)".to_owned(),
77 ..Default::default()
78 })),
79 },
80 ]
81}
82
83impl GenerateConfig for ExclusiveRouteConfig {
84 fn generate_config() -> toml::Value {
85 toml::Value::try_from(Self {
86 routes: routes_example(),
87 })
88 .unwrap()
89 }
90}
91
92#[async_trait::async_trait]
93#[typetag::serde(name = "exclusive_route")]
94impl TransformConfig for ExclusiveRouteConfig {
95 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
96 let route = ExclusiveRoute::new(self, context)?;
97 Ok(Transform::synchronous(route))
98 }
99
100 fn input(&self) -> Input {
101 Input::all()
102 }
103
104 fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
105 let mut errors = Vec::new();
106
107 let mut counts = std::collections::HashMap::new();
108 for route in &self.routes {
109 *counts.entry(route.name.clone()).or_insert(0) += 1;
110 }
111
112 let duplicates: Vec<String> = counts
113 .iter()
114 .filter(|&(_, &count)| count > 1)
115 .map(|(name, _)| name.clone())
116 .collect();
117
118 if !duplicates.is_empty() {
119 errors.push(format!("Found routes with duplicate names: {duplicates:?}"));
120 }
121
122 if self
123 .routes
124 .iter()
125 .any(|route| route.name == UNMATCHED_ROUTE)
126 {
127 errors.push(format!("Using reserved '{UNMATCHED_ROUTE}' name."));
128 }
129
130 if errors.is_empty() {
131 Ok(())
132 } else {
133 Err(errors)
134 }
135 }
136
137 fn outputs(
138 &self,
139 _: vector_lib::enrichment::TableRegistry,
140 input_definitions: &[(OutputId, schema::Definition)],
141 _: LogNamespace,
142 ) -> Vec<TransformOutput> {
143 let mut outputs: Vec<_> = self
144 .routes
145 .iter()
146 .map(|route| {
147 TransformOutput::new(
148 DataType::all_bits(),
149 clone_input_definitions(input_definitions),
150 )
151 .with_port(route.name.clone())
152 })
153 .collect();
154 outputs.push(
155 TransformOutput::new(
156 DataType::all_bits(),
157 clone_input_definitions(input_definitions),
158 )
159 .with_port(UNMATCHED_ROUTE),
160 );
161 outputs
162 }
163
164 fn enable_concurrency(&self) -> bool {
165 true
166 }
167}
168
169#[cfg(test)]
170mod tests {
171 use indoc::indoc;
172
173 use super::ExclusiveRouteConfig;
174
175 #[test]
176 fn generate_config() {
177 crate::test_util::test_generate_config::<ExclusiveRouteConfig>();
178 }
179
180 #[test]
181 fn can_serialize_remap() {
182 let config = serde_yaml::from_str::<ExclusiveRouteConfig>(indoc! {r#"
185 routes:
186 - name: a
187 condition:
188 type = "vrl"
189 source = '.message == "hello world"'
190 "#})
191 .unwrap();
192
193 assert_eq!(
194 serde_json::to_string(&config).unwrap(),
195 r#"{"routes":[{"name":"a","condition":"type = \"vrl\" source = '.message == \"hello world\"'"}]}"#
196 );
197 }
198}