vector/transforms/reduce/config.rs

1use std::{collections::HashMap, num::NonZeroUsize, time::Duration};
2
3use indexmap::IndexMap;
4use serde_with::serde_as;
5use vector_lib::configurable::configurable_component;
6use vrl::{
7    path::{PathPrefix, parse_target_path},
8    prelude::{Collection, KeyString, Kind},
9};
10
11use crate::{
12    conditions::AnyCondition,
13    config::{
14        DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput, schema,
15    },
16    schema::Definition,
17    transforms::{
18        Transform,
19        reduce::{merge_strategy::MergeStrategy, transform::Reduce},
20    },
21};
22
23/// Configuration for the `reduce` transform.
24#[serde_as]
25#[configurable_component(transform(
26    "reduce",
27    "Collapse multiple log events into a single event based on a set of conditions and merge strategies.",
28))]
29#[derive(Clone, Debug, Derivative)]
30#[derivative(Default)]
31#[serde(deny_unknown_fields)]
32pub struct ReduceConfig {
33    /// The maximum period of time to wait after the last event is received, in milliseconds, before
34    /// a combined event should be considered complete.
35    #[serde(default = "default_expire_after_ms")]
36    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
37    #[derivative(Default(value = "default_expire_after_ms()"))]
38    #[configurable(metadata(docs::human_name = "Expire After"))]
39    pub expire_after_ms: Duration,
40
41    /// If supplied, every time this interval elapses for a given grouping, the reduced value
42    /// for that grouping is flushed. Checked every flush_period_ms.
43    #[serde_as(as = "Option<serde_with::DurationMilliSeconds<u64>>")]
44    #[derivative(Default(value = "Option::None"))]
45    #[configurable(metadata(docs::human_name = "End-Every Period"))]
46    pub end_every_period_ms: Option<Duration>,
47
48    /// The interval to check for and flush any expired events, in milliseconds.
49    #[serde(default = "default_flush_period_ms")]
50    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
51    #[derivative(Default(value = "default_flush_period_ms()"))]
52    #[configurable(metadata(docs::human_name = "Flush Period"))]
53    pub flush_period_ms: Duration,
54
55    /// The maximum number of events to group together.
56    pub max_events: Option<NonZeroUsize>,
57
58    /// An ordered list of fields by which to group events.
59    ///
60    /// Each group with matching values for the specified keys is reduced independently, allowing
61    /// you to keep independent event streams separate. Note that each field specified, will be reduced
62    /// with the default merge strategy based on its type unless a merge strategy is explicitly defined
63    /// in `merge_strategies`.
64    ///
65    /// This field is optional and when not specified, all events are reduced in a single group.
66    ///
67    /// For example, if `group_by = ["host", "region"]`, then all incoming events that have the same
68    /// host and region are grouped together before being reduced.
69    #[serde(default)]
70    #[configurable(metadata(
71        docs::examples = "request_id",
72        docs::examples = "user_id",
73        docs::examples = "transaction_id",
74    ))]
75    pub group_by: Vec<String>,
76
77    /// A map of field names to custom merge strategies.
78    ///
79    /// For each field specified, the given strategy is used for combining events rather than
80    /// the default behavior.
81    ///
82    /// The default behavior is as follows:
83    ///
84    /// - The first value of a string field is kept and subsequent values are discarded.
85    /// - For timestamp fields the first is kept and a new field `[field-name]_end` is added with
86    ///   the last received timestamp value.
87    /// - Numeric values are summed.
88    /// - For nested paths, the field value is retrieved and then reduced using the default strategies mentioned above (unless explicitly specified otherwise).
89    #[serde(default)]
90    #[configurable(metadata(
91        docs::additional_props_description = "An individual merge strategy."
92    ))]
93    pub merge_strategies: IndexMap<KeyString, MergeStrategy>,
94
95    /// A condition used to distinguish the final event of a transaction.
96    ///
97    /// If this condition resolves to `true` for an event, the current transaction is immediately
98    /// flushed with this event.
99    pub ends_when: Option<AnyCondition>,
100
101    /// A condition used to distinguish the first event of a transaction.
102    ///
103    /// If this condition resolves to `true` for an event, the previous transaction is flushed
104    /// (without this event) and a new transaction is started.
105    pub starts_when: Option<AnyCondition>,
106}
107
/// Default value of `expire_after_ms`: 30 seconds (30,000 ms).
const fn default_expire_after_ms() -> Duration {
    Duration::from_secs(30)
}
111
/// Default value of `flush_period_ms`: 1 second (1,000 ms).
const fn default_flush_period_ms() -> Duration {
    Duration::from_secs(1)
}
115
116impl_generate_config_from_default!(ReduceConfig);
117
118#[async_trait::async_trait]
119#[typetag::serde(name = "reduce")]
120impl TransformConfig for ReduceConfig {
121    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
122        Reduce::new(self, &context.enrichment_tables, &context.metrics_storage)
123            .map(Transform::event_task)
124    }
125
126    fn input(&self) -> Input {
127        Input::log()
128    }
129
130    fn outputs(
131        &self,
132        _: &TransformContext,
133        input_definitions: &[(OutputId, schema::Definition)],
134    ) -> Vec<TransformOutput> {
135        // Events may be combined, so there isn't a true single "source" for events.
136        // All of the definitions must be merged.
137        let merged_definition: Definition = input_definitions
138            .iter()
139            .map(|(_output, definition)| definition.clone())
140            .reduce(Definition::merge)
141            .unwrap_or_else(Definition::any);
142
143        let mut schema_definition = merged_definition;
144
145        for (key, merge_strategy) in self.merge_strategies.iter() {
146            let key = if let Ok(key) = parse_target_path(key) {
147                key
148            } else {
149                continue;
150            };
151
152            let input_kind = match key.prefix {
153                PathPrefix::Event => schema_definition.event_kind().at_path(&key.path),
154                PathPrefix::Metadata => schema_definition.metadata_kind().at_path(&key.path),
155            };
156
157            let new_kind = match merge_strategy {
158                MergeStrategy::Discard | MergeStrategy::Retain => {
159                    /* does not change the type */
160                    input_kind.clone()
161                }
162                MergeStrategy::Sum | MergeStrategy::Max | MergeStrategy::Min => {
163                    // only keeps integer / float values
164                    match (input_kind.contains_integer(), input_kind.contains_float()) {
165                        (true, true) => Kind::float().or_integer(),
166                        (true, false) => Kind::integer(),
167                        (false, true) => Kind::float(),
168                        (false, false) => Kind::undefined(),
169                    }
170                }
171                MergeStrategy::Array => {
172                    let unknown_kind = input_kind.clone();
173                    Kind::array(Collection::empty().with_unknown(unknown_kind))
174                }
175                MergeStrategy::Concat => {
176                    let mut new_kind = Kind::never();
177
178                    if input_kind.contains_bytes() {
179                        new_kind.add_bytes();
180                    }
181                    if let Some(array) = input_kind.as_array() {
182                        // array elements can be either any type that the field can be, or any
183                        // element of the array
184                        let array_elements = array.reduced_kind().union(input_kind.without_array());
185                        new_kind.add_array(Collection::empty().with_unknown(array_elements));
186                    }
187                    new_kind
188                }
189                MergeStrategy::ConcatNewline | MergeStrategy::ConcatRaw => {
190                    // can only produce bytes (or undefined)
191                    if input_kind.contains_bytes() {
192                        Kind::bytes()
193                    } else {
194                        Kind::undefined()
195                    }
196                }
197                MergeStrategy::ShortestArray | MergeStrategy::LongestArray => {
198                    if let Some(array) = input_kind.as_array() {
199                        Kind::array(array.clone())
200                    } else {
201                        Kind::undefined()
202                    }
203                }
204                MergeStrategy::FlatUnique => {
205                    let mut array_elements = input_kind.without_array().without_object();
206                    if let Some(array) = input_kind.as_array() {
207                        array_elements = array_elements.union(array.reduced_kind());
208                    }
209                    if let Some(object) = input_kind.as_object() {
210                        array_elements = array_elements.union(object.reduced_kind());
211                    }
212                    Kind::array(Collection::empty().with_unknown(array_elements))
213                }
214            };
215
216            // all of the merge strategies are optional. They won't produce a value unless a value actually exists
217            let new_kind = if input_kind.contains_undefined() {
218                new_kind.or_undefined()
219            } else {
220                new_kind
221            };
222
223            schema_definition = schema_definition.with_field(&key, new_kind, None);
224        }
225
226        // the same schema definition is used for all inputs
227        let mut output_definitions = HashMap::new();
228        for (output, _input) in input_definitions {
229            output_definitions.insert(output.clone(), schema_definition.clone());
230        }
231
232        vec![TransformOutput::new(DataType::Log, output_definitions)]
233    }
234}
235
#[cfg(test)]
mod test {
    use super::ReduceConfig;

    /// Runs the shared config-generation test helper against `ReduceConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ReduceConfig>();
    }
}