vector/transforms/reduce/
config.rs1use std::{collections::HashMap, num::NonZeroUsize, time::Duration};
2
3use indexmap::IndexMap;
4use serde_with::serde_as;
5use vector_lib::configurable::configurable_component;
6use vrl::{
7 path::{PathPrefix, parse_target_path},
8 prelude::{Collection, KeyString, Kind},
9};
10
11use crate::{
12 conditions::AnyCondition,
13 config::{
14 DataType, Input, LogNamespace, OutputId, TransformConfig, TransformContext,
15 TransformOutput, schema,
16 },
17 schema::Definition,
18 transforms::{
19 Transform,
20 reduce::{merge_strategy::MergeStrategy, transform::Reduce},
21 },
22};
23
24#[serde_as]
26#[configurable_component(transform(
27 "reduce",
28 "Collapse multiple log events into a single event based on a set of conditions and merge strategies.",
29))]
30#[derive(Clone, Debug, Derivative)]
31#[derivative(Default)]
32#[serde(deny_unknown_fields)]
33pub struct ReduceConfig {
34 #[serde(default = "default_expire_after_ms")]
37 #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
38 #[derivative(Default(value = "default_expire_after_ms()"))]
39 #[configurable(metadata(docs::human_name = "Expire After"))]
40 pub expire_after_ms: Duration,
41
42 #[serde_as(as = "Option<serde_with::DurationMilliSeconds<u64>>")]
45 #[derivative(Default(value = "Option::None"))]
46 #[configurable(metadata(docs::human_name = "End-Every Period"))]
47 pub end_every_period_ms: Option<Duration>,
48
49 #[serde(default = "default_flush_period_ms")]
51 #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
52 #[derivative(Default(value = "default_flush_period_ms()"))]
53 #[configurable(metadata(docs::human_name = "Flush Period"))]
54 pub flush_period_ms: Duration,
55
56 pub max_events: Option<NonZeroUsize>,
58
59 #[serde(default)]
71 #[configurable(metadata(
72 docs::examples = "request_id",
73 docs::examples = "user_id",
74 docs::examples = "transaction_id",
75 ))]
76 pub group_by: Vec<String>,
77
78 #[serde(default)]
91 #[configurable(metadata(
92 docs::additional_props_description = "An individual merge strategy."
93 ))]
94 pub merge_strategies: IndexMap<KeyString, MergeStrategy>,
95
96 pub ends_when: Option<AnyCondition>,
101
102 pub starts_when: Option<AnyCondition>,
107}
108
/// Default for [`ReduceConfig::expire_after_ms`]: 30 seconds (30000 ms).
const fn default_expire_after_ms() -> Duration {
    Duration::from_secs(30)
}
112
/// Default for [`ReduceConfig::flush_period_ms`]: 1 second (1000 ms).
const fn default_flush_period_ms() -> Duration {
    Duration::from_secs(1)
}
116
// NOTE(review): presumably implements config generation for `ReduceConfig` from its `Default`
// value (the `generate_config` test in this file exercises it); the macro is defined elsewhere
// in the crate — confirm there.
117impl_generate_config_from_default!(ReduceConfig);
118
119#[async_trait::async_trait]
120#[typetag::serde(name = "reduce")]
121impl TransformConfig for ReduceConfig {
122 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
123 Reduce::new(self, &context.enrichment_tables).map(Transform::event_task)
124 }
125
126 fn input(&self) -> Input {
127 Input::log()
128 }
129
130 fn outputs(
131 &self,
132 _: vector_lib::enrichment::TableRegistry,
133 input_definitions: &[(OutputId, schema::Definition)],
134 _: LogNamespace,
135 ) -> Vec<TransformOutput> {
136 let merged_definition: Definition = input_definitions
139 .iter()
140 .map(|(_output, definition)| definition.clone())
141 .reduce(Definition::merge)
142 .unwrap_or_else(Definition::any);
143
144 let mut schema_definition = merged_definition;
145
146 for (key, merge_strategy) in self.merge_strategies.iter() {
147 let key = if let Ok(key) = parse_target_path(key) {
148 key
149 } else {
150 continue;
151 };
152
153 let input_kind = match key.prefix {
154 PathPrefix::Event => schema_definition.event_kind().at_path(&key.path),
155 PathPrefix::Metadata => schema_definition.metadata_kind().at_path(&key.path),
156 };
157
158 let new_kind = match merge_strategy {
159 MergeStrategy::Discard | MergeStrategy::Retain => {
160 input_kind.clone()
162 }
163 MergeStrategy::Sum | MergeStrategy::Max | MergeStrategy::Min => {
164 match (input_kind.contains_integer(), input_kind.contains_float()) {
166 (true, true) => Kind::float().or_integer(),
167 (true, false) => Kind::integer(),
168 (false, true) => Kind::float(),
169 (false, false) => Kind::undefined(),
170 }
171 }
172 MergeStrategy::Array => {
173 let unknown_kind = input_kind.clone();
174 Kind::array(Collection::empty().with_unknown(unknown_kind))
175 }
176 MergeStrategy::Concat => {
177 let mut new_kind = Kind::never();
178
179 if input_kind.contains_bytes() {
180 new_kind.add_bytes();
181 }
182 if let Some(array) = input_kind.as_array() {
183 let array_elements = array.reduced_kind().union(input_kind.without_array());
186 new_kind.add_array(Collection::empty().with_unknown(array_elements));
187 }
188 new_kind
189 }
190 MergeStrategy::ConcatNewline | MergeStrategy::ConcatRaw => {
191 if input_kind.contains_bytes() {
193 Kind::bytes()
194 } else {
195 Kind::undefined()
196 }
197 }
198 MergeStrategy::ShortestArray | MergeStrategy::LongestArray => {
199 if let Some(array) = input_kind.as_array() {
200 Kind::array(array.clone())
201 } else {
202 Kind::undefined()
203 }
204 }
205 MergeStrategy::FlatUnique => {
206 let mut array_elements = input_kind.without_array().without_object();
207 if let Some(array) = input_kind.as_array() {
208 array_elements = array_elements.union(array.reduced_kind());
209 }
210 if let Some(object) = input_kind.as_object() {
211 array_elements = array_elements.union(object.reduced_kind());
212 }
213 Kind::array(Collection::empty().with_unknown(array_elements))
214 }
215 };
216
217 let new_kind = if input_kind.contains_undefined() {
219 new_kind.or_undefined()
220 } else {
221 new_kind
222 };
223
224 schema_definition = schema_definition.with_field(&key, new_kind, None);
225 }
226
227 let mut output_definitions = HashMap::new();
229 for (output, _input) in input_definitions {
230 output_definitions.insert(output.clone(), schema_definition.clone());
231 }
232
233 vec![TransformOutput::new(DataType::Log, output_definitions)]
234 }
235}
236
#[cfg(test)]
mod test {
    use super::ReduceConfig;
    use crate::test_util::test_generate_config;

    /// Runs the shared `test_generate_config` check against `ReduceConfig`.
    #[test]
    fn generate_config() {
        test_generate_config::<ReduceConfig>();
    }
}