vector/transforms/reduce/
config.rs

1use std::collections::HashMap;
2use std::num::NonZeroUsize;
3use std::time::Duration;
4
5use indexmap::IndexMap;
6use serde_with::serde_as;
7use vrl::path::{parse_target_path, PathPrefix};
8use vrl::prelude::{Collection, KeyString, Kind};
9
10use vector_lib::configurable::configurable_component;
11
12use crate::conditions::AnyCondition;
13use crate::config::{
14    schema, DataType, Input, LogNamespace, OutputId, TransformConfig, TransformContext,
15    TransformOutput,
16};
17use crate::schema::Definition;
18use crate::transforms::reduce::merge_strategy::MergeStrategy;
19use crate::transforms::{reduce::transform::Reduce, Transform};
20
/// Configuration for the `reduce` transform.
// NOTE: the `///` comments in this block are left untouched on purpose — they appear to be
// consumed by `#[configurable_component]` (presumably for generated docs; confirm before
// rewording). Maintainer notes are written as plain `//` comments instead.
//
// `#[serde_as]` enables the per-field `#[serde_as(as = "...")]` conversions used below.
#[serde_as]
#[configurable_component(transform(
"reduce",
"Collapse multiple log events into a single event based on a set of conditions and merge strategies.",
))]
// `Default` comes from `derivative` so that individual fields can name their default value.
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
// Keys that do not correspond to a field of this struct are rejected during deserialization.
#[serde(deny_unknown_fields)]
pub struct ReduceConfig {
    /// The maximum period of time to wait after the last event is received, in milliseconds, before
    /// a combined event should be considered complete.
    // (De)serialized as a `u64` millisecond count. Both the serde default and the derived
    // `Default` use `default_expire_after_ms()`.
    #[serde(default = "default_expire_after_ms")]
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[derivative(Default(value = "default_expire_after_ms()"))]
    #[configurable(metadata(docs::human_name = "Expire After"))]
    pub expire_after_ms: Duration,

    /// If supplied, every time this interval elapses for a given grouping, the reduced value
    /// for that grouping is flushed. Checked every flush_period_ms.
    // Optional `u64` millisecond count; the derived `Default` is `None`.
    #[serde_as(as = "Option<serde_with::DurationMilliSeconds<u64>>")]
    #[derivative(Default(value = "Option::None"))]
    #[configurable(metadata(docs::human_name = "End-Every Period"))]
    pub end_every_period_ms: Option<Duration>,

    /// The interval to check for and flush any expired events, in milliseconds.
    // (De)serialized as a `u64` millisecond count. Both the serde default and the derived
    // `Default` use `default_flush_period_ms()`.
    #[serde(default = "default_flush_period_ms")]
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[derivative(Default(value = "default_flush_period_ms()"))]
    #[configurable(metadata(docs::human_name = "Flush Period"))]
    pub flush_period_ms: Duration,

    /// The maximum number of events to group together.
    // `NonZeroUsize` rules out a limit of zero at the type level. `None` presumably means
    // "no limit" — confirm against the `Reduce` implementation.
    pub max_events: Option<NonZeroUsize>,

    /// An ordered list of fields by which to group events.
    ///
    /// Each group with matching values for the specified keys is reduced independently, allowing
    /// you to keep independent event streams separate. Note that each field specified, will be reduced
    /// with the default merge strategy based on its type unless a merge strategy is explicitly defined
    /// in `merge_strategies`.
    ///
    /// This field is optional and when not specified, all events are reduced in a single group.
    ///
    /// For example, if `group_by = ["host", "region"]`, then all incoming events that have the same
    /// host and region are grouped together before being reduced.
    // `#[serde(default)]`: an omitted key deserializes to an empty list.
    #[serde(default)]
    #[configurable(metadata(
        docs::examples = "request_id",
        docs::examples = "user_id",
        docs::examples = "transaction_id",
    ))]
    pub group_by: Vec<String>,

    /// A map of field names to custom merge strategies.
    ///
    /// For each field specified, the given strategy is used for combining events rather than
    /// the default behavior.
    ///
    /// The default behavior is as follows:
    ///
    /// - The first value of a string field is kept and subsequent values are discarded.
    /// - For timestamp fields the first is kept and a new field `[field-name]_end` is added with
    ///   the last received timestamp value.
    /// - Numeric values are summed.
    /// - For nested paths, the field value is retrieved and then reduced using the default strategies mentioned above (unless explicitly specified otherwise).
    // `#[serde(default)]`: an omitted key deserializes to an empty map. The keys are parsed as
    // target paths in `outputs()`; keys that fail to parse are skipped there.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "An individual merge strategy."
    ))]
    pub merge_strategies: IndexMap<KeyString, MergeStrategy>,

    /// A condition used to distinguish the final event of a transaction.
    ///
    /// If this condition resolves to `true` for an event, the current transaction is immediately
    /// flushed with this event.
    pub ends_when: Option<AnyCondition>,

    /// A condition used to distinguish the first event of a transaction.
    ///
    /// If this condition resolves to `true` for an event, the previous transaction is flushed
    /// (without this event) and a new transaction is started.
    pub starts_when: Option<AnyCondition>,
}
105
/// Default value for `expire_after_ms`: 30 seconds (30 000 ms).
///
/// Referenced by both the serde default and the derived `Default` of `ReduceConfig`.
const fn default_expire_after_ms() -> Duration {
    Duration::from_secs(30)
}
109
/// Default value for `flush_period_ms`: 1 second (1 000 ms).
///
/// Referenced by both the serde default and the derived `Default` of `ReduceConfig`.
const fn default_flush_period_ms() -> Duration {
    Duration::from_secs(1)
}
113
// Project macro defined outside this file. Judging by its name it derives the generated
// example configuration for `ReduceConfig` from the type's `Default` value — confirm against
// the macro definition. The `generate_config` test at the bottom of this file exercises it.
impl_generate_config_from_default!(ReduceConfig);
115
116#[async_trait::async_trait]
117#[typetag::serde(name = "reduce")]
118impl TransformConfig for ReduceConfig {
119    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
120        Reduce::new(self, &context.enrichment_tables).map(Transform::event_task)
121    }
122
123    fn input(&self) -> Input {
124        Input::log()
125    }
126
127    fn outputs(
128        &self,
129        _: vector_lib::enrichment::TableRegistry,
130        input_definitions: &[(OutputId, schema::Definition)],
131        _: LogNamespace,
132    ) -> Vec<TransformOutput> {
133        // Events may be combined, so there isn't a true single "source" for events.
134        // All of the definitions must be merged.
135        let merged_definition: Definition = input_definitions
136            .iter()
137            .map(|(_output, definition)| definition.clone())
138            .reduce(Definition::merge)
139            .unwrap_or_else(Definition::any);
140
141        let mut schema_definition = merged_definition;
142
143        for (key, merge_strategy) in self.merge_strategies.iter() {
144            let key = if let Ok(key) = parse_target_path(key) {
145                key
146            } else {
147                continue;
148            };
149
150            let input_kind = match key.prefix {
151                PathPrefix::Event => schema_definition.event_kind().at_path(&key.path),
152                PathPrefix::Metadata => schema_definition.metadata_kind().at_path(&key.path),
153            };
154
155            let new_kind = match merge_strategy {
156                MergeStrategy::Discard | MergeStrategy::Retain => {
157                    /* does not change the type */
158                    input_kind.clone()
159                }
160                MergeStrategy::Sum | MergeStrategy::Max | MergeStrategy::Min => {
161                    // only keeps integer / float values
162                    match (input_kind.contains_integer(), input_kind.contains_float()) {
163                        (true, true) => Kind::float().or_integer(),
164                        (true, false) => Kind::integer(),
165                        (false, true) => Kind::float(),
166                        (false, false) => Kind::undefined(),
167                    }
168                }
169                MergeStrategy::Array => {
170                    let unknown_kind = input_kind.clone();
171                    Kind::array(Collection::empty().with_unknown(unknown_kind))
172                }
173                MergeStrategy::Concat => {
174                    let mut new_kind = Kind::never();
175
176                    if input_kind.contains_bytes() {
177                        new_kind.add_bytes();
178                    }
179                    if let Some(array) = input_kind.as_array() {
180                        // array elements can be either any type that the field can be, or any
181                        // element of the array
182                        let array_elements = array.reduced_kind().union(input_kind.without_array());
183                        new_kind.add_array(Collection::empty().with_unknown(array_elements));
184                    }
185                    new_kind
186                }
187                MergeStrategy::ConcatNewline | MergeStrategy::ConcatRaw => {
188                    // can only produce bytes (or undefined)
189                    if input_kind.contains_bytes() {
190                        Kind::bytes()
191                    } else {
192                        Kind::undefined()
193                    }
194                }
195                MergeStrategy::ShortestArray | MergeStrategy::LongestArray => {
196                    if let Some(array) = input_kind.as_array() {
197                        Kind::array(array.clone())
198                    } else {
199                        Kind::undefined()
200                    }
201                }
202                MergeStrategy::FlatUnique => {
203                    let mut array_elements = input_kind.without_array().without_object();
204                    if let Some(array) = input_kind.as_array() {
205                        array_elements = array_elements.union(array.reduced_kind());
206                    }
207                    if let Some(object) = input_kind.as_object() {
208                        array_elements = array_elements.union(object.reduced_kind());
209                    }
210                    Kind::array(Collection::empty().with_unknown(array_elements))
211                }
212            };
213
214            // all of the merge strategies are optional. They won't produce a value unless a value actually exists
215            let new_kind = if input_kind.contains_undefined() {
216                new_kind.or_undefined()
217            } else {
218                new_kind
219            };
220
221            schema_definition = schema_definition.with_field(&key, new_kind, None);
222        }
223
224        // the same schema definition is used for all inputs
225        let mut output_definitions = HashMap::new();
226        for (output, _input) in input_definitions {
227            output_definitions.insert(output.clone(), schema_definition.clone());
228        }
229
230        vec![TransformOutput::new(DataType::Log, output_definitions)]
231    }
232}
233
#[cfg(test)]
mod test {
    use super::*;

    /// Runs the shared generated-config check from `crate::test_util` against `ReduceConfig`.
    #[test]
    fn generate_config() {
        use crate::test_util::test_generate_config;

        test_generate_config::<ReduceConfig>();
    }
}