use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::time::Duration;
use indexmap::IndexMap;
use serde_with::serde_as;
use vrl::path::{parse_target_path, PathPrefix};
use vrl::prelude::{Collection, KeyString, Kind};
use vector_lib::configurable::configurable_component;
use crate::conditions::AnyCondition;
use crate::config::{
schema, DataType, Input, LogNamespace, OutputId, TransformConfig, TransformContext,
TransformOutput,
};
use crate::schema::Definition;
use crate::transforms::reduce::merge_strategy::MergeStrategy;
use crate::transforms::{reduce::transform::Reduce, Transform};
/// Configuration for the `reduce` transform.
#[serde_as]
#[configurable_component(transform(
"reduce",
"Collapse multiple log events into a single event based on a set of conditions and merge strategies.",
))]
#[derive(Clone, Debug, Derivative)]
// `Default` (via `derivative`) calls the same default functions that serde uses for
// `expire_after_ms` and `flush_period_ms`, so both construction paths agree on those values.
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct ReduceConfig {
/// The period of time, in milliseconds, after which an in-progress reduction expires.
///
/// Defaults to `30000` (30 seconds).
// NOTE(review): whether this is measured from the first or the last received event is decided
// by the `Reduce` transform, not in this file — confirm before expanding these docs.
#[serde(default = "default_expire_after_ms")]
#[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
#[derivative(Default(value = "default_expire_after_ms()"))]
#[configurable(metadata(docs::human_name = "Expire After"))]
pub expire_after_ms: Duration,
/// If set, the period of time, in milliseconds, after which an in-progress reduction is ended.
///
/// Unset by default.
// NOTE(review): per-group vs. global timing is implemented in the `Reduce` transform — confirm.
#[serde_as(as = "Option<serde_with::DurationMilliSeconds<u64>>")]
#[derivative(Default(value = "Option::None"))]
#[configurable(metadata(docs::human_name = "End-Every Period"))]
pub end_every_period_ms: Option<Duration>,
/// The flush period, in milliseconds.
///
/// Defaults to `1000` (1 second).
// NOTE(review): presumably the interval at which expired reductions are checked and flushed;
// the periodic behavior lives in the `Reduce` transform — confirm.
#[serde(default = "default_flush_period_ms")]
#[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
#[derivative(Default(value = "default_flush_period_ms()"))]
#[configurable(metadata(docs::human_name = "Flush Period"))]
pub flush_period_ms: Duration,
/// The maximum number of events to combine into a single reduced event.
///
/// Must be non-zero when set. Unset by default.
pub max_events: Option<NonZeroUsize>,
/// The fields whose values are used to group events before they are reduced.
///
/// Defaults to an empty list.
#[serde(default)]
#[configurable(metadata(
docs::examples = "request_id",
docs::examples = "user_id",
docs::examples = "transaction_id",
))]
pub group_by: Vec<String>,
/// A map of field names to the merge strategy used to combine that field's values.
///
/// Defaults to an empty map.
// `outputs` parses each key as a target path (event or metadata); keys that fail to parse
// are skipped when computing the output schema. `IndexMap` keeps configuration order.
#[serde(default)]
#[configurable(metadata(
docs::additional_props_description = "An individual merge strategy."
))]
pub merge_strategies: IndexMap<KeyString, MergeStrategy>,
/// An optional condition identifying the final event of a reduction.
pub ends_when: Option<AnyCondition>,
/// An optional condition identifying the first event of a reduction.
pub starts_when: Option<AnyCondition>,
}
/// Serde/`Default` fallback for `ReduceConfig::expire_after_ms`: thirty seconds.
const fn default_expire_after_ms() -> Duration {
    Duration::from_secs(30)
}
/// Serde/`Default` fallback for `ReduceConfig::flush_period_ms`: one second.
const fn default_flush_period_ms() -> Duration {
    Duration::from_secs(1)
}
// Generates the example-config implementation for `ReduceConfig` from its `Default` impl
// (as the macro name indicates); exercised by the `generate_config` test below.
impl_generate_config_from_default!(ReduceConfig);
#[async_trait::async_trait]
#[typetag::serde(name = "reduce")]
impl TransformConfig for ReduceConfig {
async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
Reduce::new(self, &context.enrichment_tables).map(Transform::event_task)
}
fn input(&self) -> Input {
Input::log()
}
fn outputs(
&self,
_: vector_lib::enrichment::TableRegistry,
input_definitions: &[(OutputId, schema::Definition)],
_: LogNamespace,
) -> Vec<TransformOutput> {
let merged_definition: Definition = input_definitions
.iter()
.map(|(_output, definition)| definition.clone())
.reduce(Definition::merge)
.unwrap_or_else(Definition::any);
let mut schema_definition = merged_definition;
for (key, merge_strategy) in self.merge_strategies.iter() {
let key = if let Ok(key) = parse_target_path(key) {
key
} else {
continue;
};
let input_kind = match key.prefix {
PathPrefix::Event => schema_definition.event_kind().at_path(&key.path),
PathPrefix::Metadata => schema_definition.metadata_kind().at_path(&key.path),
};
let new_kind = match merge_strategy {
MergeStrategy::Discard | MergeStrategy::Retain => {
input_kind.clone()
}
MergeStrategy::Sum | MergeStrategy::Max | MergeStrategy::Min => {
match (input_kind.contains_integer(), input_kind.contains_float()) {
(true, true) => Kind::float().or_integer(),
(true, false) => Kind::integer(),
(false, true) => Kind::float(),
(false, false) => Kind::undefined(),
}
}
MergeStrategy::Array => {
let unknown_kind = input_kind.clone();
Kind::array(Collection::empty().with_unknown(unknown_kind))
}
MergeStrategy::Concat => {
let mut new_kind = Kind::never();
if input_kind.contains_bytes() {
new_kind.add_bytes();
}
if let Some(array) = input_kind.as_array() {
let array_elements = array.reduced_kind().union(input_kind.without_array());
new_kind.add_array(Collection::empty().with_unknown(array_elements));
}
new_kind
}
MergeStrategy::ConcatNewline | MergeStrategy::ConcatRaw => {
if input_kind.contains_bytes() {
Kind::bytes()
} else {
Kind::undefined()
}
}
MergeStrategy::ShortestArray | MergeStrategy::LongestArray => {
if let Some(array) = input_kind.as_array() {
Kind::array(array.clone())
} else {
Kind::undefined()
}
}
MergeStrategy::FlatUnique => {
let mut array_elements = input_kind.without_array().without_object();
if let Some(array) = input_kind.as_array() {
array_elements = array_elements.union(array.reduced_kind());
}
if let Some(object) = input_kind.as_object() {
array_elements = array_elements.union(object.reduced_kind());
}
Kind::array(Collection::empty().with_unknown(array_elements))
}
};
let new_kind = if input_kind.contains_undefined() {
new_kind.or_undefined()
} else {
new_kind
};
schema_definition = schema_definition.with_field(&key, new_kind, None);
}
let mut output_definitions = HashMap::new();
for (output, _input) in input_definitions {
output_definitions.insert(output.clone(), schema_definition.clone());
}
vec![TransformOutput::new(DataType::Log, output_definitions)]
}
}
#[cfg(test)]
mod test {
    use super::ReduceConfig;
    use crate::test_util::test_generate_config;

    /// Runs the shared `test_generate_config` check against `ReduceConfig`.
    #[test]
    fn generate_config() {
        test_generate_config::<ReduceConfig>();
    }
}