vector/transforms/reduce/
config.rs1use std::collections::HashMap;
2use std::num::NonZeroUsize;
3use std::time::Duration;
4
5use indexmap::IndexMap;
6use serde_with::serde_as;
7use vrl::path::{parse_target_path, PathPrefix};
8use vrl::prelude::{Collection, KeyString, Kind};
9
10use vector_lib::configurable::configurable_component;
11
12use crate::conditions::AnyCondition;
13use crate::config::{
14 schema, DataType, Input, LogNamespace, OutputId, TransformConfig, TransformContext,
15 TransformOutput,
16};
17use crate::schema::Definition;
18use crate::transforms::reduce::merge_strategy::MergeStrategy;
19use crate::transforms::{reduce::transform::Reduce, Transform};
20
/// Configuration for the `reduce` transform.
///
/// Unknown fields are rejected during deserialization (`deny_unknown_fields`), and the
/// `Default` implementation is derived through `derivative`, using the per-field default
/// values declared on the fields below.
#[serde_as]
#[configurable_component(transform(
    "reduce",
    "Collapse multiple log events into a single event based on a set of conditions and merge strategies.",
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct ReduceConfig {
    /// The "expire after" duration, (de)serialized as a whole number of milliseconds.
    ///
    /// Defaults to 30000 ms (see `default_expire_after_ms`).
    // NOTE(review): presumably the idle time after which a pending group is treated as
    // complete; the consuming logic is not in this file — confirm against `Reduce`.
    #[serde(default = "default_expire_after_ms")]
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[derivative(Default(value = "default_expire_after_ms()"))]
    #[configurable(metadata(docs::human_name = "Expire After"))]
    pub expire_after_ms: Duration,

    /// Optional "end-every" period, (de)serialized as a whole number of milliseconds.
    ///
    /// `None` in the derived `Default`.
    // NOTE(review): presumably forces a periodic flush of each group when set — confirm
    // against the transform implementation.
    #[serde_as(as = "Option<serde_with::DurationMilliSeconds<u64>>")]
    #[derivative(Default(value = "Option::None"))]
    #[configurable(metadata(docs::human_name = "End-Every Period"))]
    pub end_every_period_ms: Option<Duration>,

    /// The flush period, (de)serialized as a whole number of milliseconds.
    ///
    /// Defaults to 1000 ms (see `default_flush_period_ms`).
    // NOTE(review): presumably how often expired groups are checked for and flushed —
    // confirm against the transform implementation.
    #[serde(default = "default_flush_period_ms")]
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[derivative(Default(value = "default_flush_period_ms()"))]
    #[configurable(metadata(docs::human_name = "Flush Period"))]
    pub flush_period_ms: Duration,

    /// Optional event-count limit; when present it is guaranteed non-zero by its type.
    // NOTE(review): presumably the maximum number of events merged into one group —
    // confirm against the transform implementation.
    pub max_events: Option<NonZeroUsize>,

    /// List of field names to group by; empty when omitted from the configuration.
    // NOTE(review): presumably events with equal values for these fields are reduced
    // together — the grouping logic is not in this file.
    #[serde(default)]
    #[configurable(metadata(
        docs::examples = "request_id",
        docs::examples = "user_id",
        docs::examples = "transaction_id",
    ))]
    pub group_by: Vec<String>,

    /// Map of field path to the merge strategy applied to that field; empty when omitted.
    ///
    /// When the output schema is computed, each key is parsed as a target path and keys
    /// that fail to parse are skipped.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "An individual merge strategy."
    ))]
    pub merge_strategies: IndexMap<KeyString, MergeStrategy>,

    /// Optional condition associated with the end of a group.
    // NOTE(review): presumably an event matching this closes the current group — the
    // evaluation happens outside this file; confirm.
    pub ends_when: Option<AnyCondition>,

    /// Optional condition associated with the start of a group.
    // NOTE(review): presumably an event matching this begins a new group — the
    // evaluation happens outside this file; confirm.
    pub starts_when: Option<AnyCondition>,
}
105
/// Default value of `ReduceConfig::expire_after_ms`: 30 seconds (30000 ms).
const fn default_expire_after_ms() -> Duration {
    const EXPIRE_AFTER_SECS: u64 = 30;
    Duration::from_secs(EXPIRE_AFTER_SECS)
}
109
/// Default value of `ReduceConfig::flush_period_ms`: 1 second (1000 ms).
const fn default_flush_period_ms() -> Duration {
    const FLUSH_PERIOD_SECS: u64 = 1;
    Duration::from_secs(FLUSH_PERIOD_SECS)
}
113
// NOTE(review): the macro is defined outside this file; judging by its name it implements
// config generation for `ReduceConfig` from its `Default` value — confirm at the macro
// definition. The `generate_config` test in this file exercises the result.
impl_generate_config_from_default!(ReduceConfig);
115
116#[async_trait::async_trait]
117#[typetag::serde(name = "reduce")]
118impl TransformConfig for ReduceConfig {
119 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
120 Reduce::new(self, &context.enrichment_tables).map(Transform::event_task)
121 }
122
123 fn input(&self) -> Input {
124 Input::log()
125 }
126
127 fn outputs(
128 &self,
129 _: vector_lib::enrichment::TableRegistry,
130 input_definitions: &[(OutputId, schema::Definition)],
131 _: LogNamespace,
132 ) -> Vec<TransformOutput> {
133 let merged_definition: Definition = input_definitions
136 .iter()
137 .map(|(_output, definition)| definition.clone())
138 .reduce(Definition::merge)
139 .unwrap_or_else(Definition::any);
140
141 let mut schema_definition = merged_definition;
142
143 for (key, merge_strategy) in self.merge_strategies.iter() {
144 let key = if let Ok(key) = parse_target_path(key) {
145 key
146 } else {
147 continue;
148 };
149
150 let input_kind = match key.prefix {
151 PathPrefix::Event => schema_definition.event_kind().at_path(&key.path),
152 PathPrefix::Metadata => schema_definition.metadata_kind().at_path(&key.path),
153 };
154
155 let new_kind = match merge_strategy {
156 MergeStrategy::Discard | MergeStrategy::Retain => {
157 input_kind.clone()
159 }
160 MergeStrategy::Sum | MergeStrategy::Max | MergeStrategy::Min => {
161 match (input_kind.contains_integer(), input_kind.contains_float()) {
163 (true, true) => Kind::float().or_integer(),
164 (true, false) => Kind::integer(),
165 (false, true) => Kind::float(),
166 (false, false) => Kind::undefined(),
167 }
168 }
169 MergeStrategy::Array => {
170 let unknown_kind = input_kind.clone();
171 Kind::array(Collection::empty().with_unknown(unknown_kind))
172 }
173 MergeStrategy::Concat => {
174 let mut new_kind = Kind::never();
175
176 if input_kind.contains_bytes() {
177 new_kind.add_bytes();
178 }
179 if let Some(array) = input_kind.as_array() {
180 let array_elements = array.reduced_kind().union(input_kind.without_array());
183 new_kind.add_array(Collection::empty().with_unknown(array_elements));
184 }
185 new_kind
186 }
187 MergeStrategy::ConcatNewline | MergeStrategy::ConcatRaw => {
188 if input_kind.contains_bytes() {
190 Kind::bytes()
191 } else {
192 Kind::undefined()
193 }
194 }
195 MergeStrategy::ShortestArray | MergeStrategy::LongestArray => {
196 if let Some(array) = input_kind.as_array() {
197 Kind::array(array.clone())
198 } else {
199 Kind::undefined()
200 }
201 }
202 MergeStrategy::FlatUnique => {
203 let mut array_elements = input_kind.without_array().without_object();
204 if let Some(array) = input_kind.as_array() {
205 array_elements = array_elements.union(array.reduced_kind());
206 }
207 if let Some(object) = input_kind.as_object() {
208 array_elements = array_elements.union(object.reduced_kind());
209 }
210 Kind::array(Collection::empty().with_unknown(array_elements))
211 }
212 };
213
214 let new_kind = if input_kind.contains_undefined() {
216 new_kind.or_undefined()
217 } else {
218 new_kind
219 };
220
221 schema_definition = schema_definition.with_field(&key, new_kind, None);
222 }
223
224 let mut output_definitions = HashMap::new();
226 for (output, _input) in input_definitions {
227 output_definitions.insert(output.clone(), schema_definition.clone());
228 }
229
230 vec![TransformOutput::new(DataType::Log, output_definitions)]
231 }
232}
233
#[cfg(test)]
mod test {
    use super::ReduceConfig;
    use crate::test_util::test_generate_config;

    /// Runs the shared config-generation check against `ReduceConfig`.
    #[test]
    fn generate_config() {
        test_generate_config::<ReduceConfig>();
    }
}