use indexmap::IndexMap;
use vector_lib::config::{clone_input_definitions, LogNamespace};
use vector_lib::configurable::configurable_component;
use vector_lib::transform::SyncTransform;
use crate::{
conditions::{AnyCondition, Condition, ConditionConfig, VrlConfig},
config::{
DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
TransformOutput,
},
event::Event,
schema,
transforms::Transform,
};
/// Name of the output port that receives events which passed none of the configured route
/// conditions.
///
/// The name is reserved: `RouteConfig::validate` rejects a user-defined route that uses it.
pub(crate) const UNMATCHED_ROUTE: &str = "_unmatched";
/// Runtime state of the `route` transform: the built conditions plus the policy for events
/// that match none of them.
#[derive(Clone)]
pub struct Route {
    /// `(output name, condition)` pairs, in the iteration order of the configured `route` map.
    conditions: Vec<(String, Condition)>,
    /// When `true`, an event that passes no condition is pushed to the `_unmatched` output.
    reroute_unmatched: bool,
}
impl Route {
    /// Builds a [`Route`] from its configuration.
    ///
    /// Every entry of `config.route` is built against the enrichment tables found in
    /// `context`, preserving the map's iteration order.
    ///
    /// # Errors
    ///
    /// Returns the error of the first condition that fails to build.
    pub fn new(config: &RouteConfig, context: &TransformContext) -> crate::Result<Self> {
        let conditions = config
            .route
            .iter()
            .map(|(route_name, route_condition)| {
                route_condition
                    .build(&context.enrichment_tables)
                    .map(|built| (route_name.clone(), built))
            })
            // Collecting into a `Result` stops at the first build failure.
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Self {
            conditions,
            reroute_unmatched: config.reroute_unmatched,
        })
    }
}
impl SyncTransform for Route {
    /// Checks `event` against every route condition, in order.
    ///
    /// For each condition that passes, the event returned by the check is pushed to the
    /// output named after that route. If no condition passes and `reroute_unmatched` is set,
    /// the incoming event is pushed to the [`UNMATCHED_ROUTE`] output; otherwise nothing is
    /// emitted for it.
    fn transform(&mut self, event: Event, output: &mut vector_lib::transform::TransformOutputsBuf) {
        let mut matched_any = false;

        for (route_name, route_condition) in &self.conditions {
            // `check` consumes its argument, and the original is still needed for the
            // remaining routes (and possibly the unmatched output), so pass a clone.
            let (passed, checked_event) = route_condition.check(event.clone());
            if passed {
                matched_any = true;
                output.push(Some(route_name), checked_event);
            }
        }

        if !matched_any && self.reroute_unmatched {
            output.push(Some(UNMATCHED_ROUTE), event);
        }
    }
}
/// Configuration for the `route` transform.
#[configurable_component(transform(
    "route",
    "Split a stream of events into multiple sub-streams based on user-supplied conditions."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct RouteConfig {
    /// Reroutes unmatched events to the `_unmatched` output instead of discarding them.
    ///
    /// Defaults to `true`. When set to `false`, the `_unmatched` output is not declared and
    /// an event that matches no route is emitted nowhere.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::human_name = "Reroute Unmatched Events"))]
    reroute_unmatched: bool,
    /// A table of route names to the conditions that select events for each route.
    ///
    /// Every route becomes a named output of the transform. `_unmatched` is a reserved
    /// output name and cannot be used as a route name.
    #[configurable(metadata(docs::additional_props_description = "An individual route."))]
    #[configurable(metadata(docs::examples = "route_examples()"))]
    route: IndexMap<String, AnyCondition>,
}
/// Example routes used for the `route` field's documentation metadata and for the generated
/// example configuration: one VRL condition per route, keyed by route name.
fn route_examples() -> IndexMap<String, AnyCondition> {
    // Builds a single `(route name, VRL condition)` entry.
    let vrl_route = |name: &str, source: &str| {
        let condition = AnyCondition::Map(ConditionConfig::Vrl(VrlConfig {
            source: source.to_owned(),
            ..Default::default()
        }));
        (name.to_owned(), condition)
    };

    [
        vrl_route("foo-exists", "exists(.foo)"),
        vrl_route("foo-does-not-exist", "!exists(.foo)"),
    ]
    .into_iter()
    .collect()
}
impl GenerateConfig for RouteConfig {
    /// Produces an example configuration as TOML: unmatched rerouting enabled and the routes
    /// from `route_examples()`.
    ///
    /// # Panics
    ///
    /// Panics if the example configuration cannot be converted to a TOML value.
    fn generate_config() -> toml::Value {
        let example = Self {
            reroute_unmatched: true,
            route: route_examples(),
        };
        toml::Value::try_from(example).unwrap()
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "route")]
impl TransformConfig for RouteConfig {
    /// Builds the route conditions and wraps the resulting [`Route`] as a synchronous
    /// transform. Fails if any condition fails to build.
    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
        Ok(Transform::synchronous(Route::new(self, context)?))
    }

    /// Declares the accepted input as `Input::all()`.
    fn input(&self) -> Input {
        Input::all()
    }

    /// Rejects configurations that define a route named after the reserved
    /// [`UNMATCHED_ROUTE`] output.
    fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
        if !self.route.contains_key(UNMATCHED_ROUTE) {
            return Ok(());
        }

        Err(vec![format!(
            "cannot have a named output with reserved name: `{UNMATCHED_ROUTE}`"
        )])
    }

    /// Declares one output per configured route, on a port named after the route, followed
    /// by the [`UNMATCHED_ROUTE`] port when `reroute_unmatched` is enabled.
    ///
    /// Each output is declared with `DataType::all_bits()` and a clone of the input
    /// definitions.
    fn outputs(
        &self,
        _: vector_lib::enrichment::TableRegistry,
        input_definitions: &[(OutputId, schema::Definition)],
        _: LogNamespace,
    ) -> Vec<TransformOutput> {
        // `None` when rerouting is disabled, so the chain below adds nothing.
        let unmatched_port = self.reroute_unmatched.then_some(UNMATCHED_ROUTE);

        self.route
            .keys()
            .map(String::as_str)
            .chain(unmatched_port)
            .map(|port| {
                TransformOutput::new(
                    DataType::all_bits(),
                    clone_input_definitions(input_definitions),
                )
                .with_port(port)
            })
            .collect()
    }

    /// Always returns `true`.
    fn enable_concurrency(&self) -> bool {
        true
    }
}
#[cfg(test)]
mod test {
    use std::collections::HashMap;

    use indoc::indoc;
    use vector_lib::transform::TransformOutputsBuf;

    use super::*;
    use crate::{
        config::{build_unit_tests, ConfigBuilder},
        test_util::components::{init_test, COMPONENT_MULTIPLE_OUTPUTS_TESTS},
    };

    /// Output ports registered on the buffer in every routing test.
    const OUTPUT_NAMES: [&str; 4] = ["first", "second", "third", UNMATCHED_ROUTE];

    /// Three VRL routes, with `reroute_unmatched` left at its default.
    /// The raw string content is deliberately kept flush-left so no extra whitespace is
    /// introduced into the TOML text.
    const THREE_ROUTES: &str = r#"
route.first.type = "vrl"
route.first.source = '.message == "hello world"'
route.second.type = "vrl"
route.second.source = '.second == "second"'
route.third.type = "vrl"
route.third.source = '.third == "third"'
"#;

    /// Builds a legacy-namespace event from a JSON value.
    fn event_from(fields: serde_json::Value) -> Event {
        Event::from_json_value(fields, LogNamespace::Legacy).unwrap()
    }

    /// Parses `config_toml`, builds the transform, runs `event` through it, and returns the
    /// output buffer (one port per entry of `OUTPUT_NAMES`).
    fn run_route(config_toml: &str, event: Event) -> TransformOutputsBuf {
        let config = toml::from_str::<RouteConfig>(config_toml).unwrap();
        let mut transform = Route::new(&config, &Default::default()).unwrap();

        let ports: Vec<TransformOutput> = OUTPUT_NAMES
            .iter()
            .map(|port| TransformOutput::new(DataType::all_bits(), HashMap::new()).with_port(*port))
            .collect();
        let mut outputs = TransformOutputsBuf::new_with_capacity(ports, 1);

        transform.transform(event, &mut outputs);
        outputs
    }

    /// Asserts that `expected_port` received exactly `event` and every other port is empty.
    fn assert_only_port_receives(
        outputs: &mut TransformOutputsBuf,
        expected_port: &str,
        event: &Event,
    ) {
        for port in OUTPUT_NAMES {
            let mut events: Vec<_> = outputs.drain_named(port).collect();
            if port == expected_port {
                assert_eq!(events.len(), 1);
                assert_eq!(events.pop().unwrap(), *event);
            }
            assert_eq!(events.len(), 0);
        }
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<RouteConfig>();
    }

    #[test]
    fn can_serialize_remap() {
        let config: RouteConfig = toml::from_str(
            r#"
route.first.type = "vrl"
route.first.source = '.message == "hello world"'
"#,
        )
        .unwrap();

        let serialized = serde_json::to_string(&config).unwrap();
        assert_eq!(
            serialized,
            r#"{"reroute_unmatched":true,"route":{"first":{"type":"vrl","source":".message == \"hello world\""}}}"#
        );
    }

    #[test]
    fn route_pass_all_route_conditions() {
        let event = event_from(
            serde_json::json!({"message": "hello world", "second": "second", "third": "third"}),
        );
        let mut outputs = run_route(THREE_ROUTES, event.clone());

        // Every named route matches, so nothing may reach the unmatched port.
        for port in OUTPUT_NAMES {
            let mut events: Vec<_> = outputs.drain_named(port).collect();
            if port == UNMATCHED_ROUTE {
                assert!(events.is_empty());
            } else {
                assert_eq!(events.len(), 1);
                assert_eq!(events.pop().unwrap(), event);
            }
        }
    }

    #[test]
    fn route_pass_one_route_condition() {
        let event = event_from(serde_json::json!({"message": "hello world"}));
        let mut outputs = run_route(THREE_ROUTES, event.clone());

        assert_only_port_receives(&mut outputs, "first", &event);
    }

    #[test]
    fn route_pass_no_route_condition() {
        let event = event_from(serde_json::json!({"message": "NOPE"}));
        let mut outputs = run_route(THREE_ROUTES, event.clone());

        assert_only_port_receives(&mut outputs, UNMATCHED_ROUTE, &event);
    }

    #[test]
    fn route_no_unmatched_output() {
        let event = event_from(serde_json::json!({"message": "NOPE"}));
        let mut outputs = run_route(
            r#"
reroute_unmatched = false
route.first.type = "vrl"
route.first.source = '.message == "hello world"'
route.second.type = "vrl"
route.second.source = '.second == "second"'
route.third.type = "vrl"
route.third.source = '.third == "third"'
"#,
            event.clone(),
        );

        // With rerouting disabled, an event that matches no route is emitted nowhere.
        for port in OUTPUT_NAMES {
            let events: Vec<_> = outputs.drain_named(port).collect();
            assert_eq!(events.len(), 0);
        }
    }

    #[tokio::test]
    async fn route_metrics_with_output_tag() {
        init_test();

        let config: ConfigBuilder = toml::from_str(indoc! {r#"
[transforms.foo]
inputs = []
type = "route"
[transforms.foo.route.first]
type = "is_log"
[[tests]]
name = "metric output"
[tests.input]
insert_at = "foo"
value = "none"
[[tests.outputs]]
extract_from = "foo.first"
[[tests.outputs.conditions]]
type = "vrl"
source = "true"
"#})
        .unwrap();

        let mut tests = build_unit_tests(config).await.unwrap();
        let report = tests.remove(0).run().await;
        assert!(report.errors.is_empty());

        COMPONENT_MULTIPLE_OUTPUTS_TESTS.assert(&["output"]);
    }
}