vector/transforms/reduce/config.rs

1use std::{collections::HashMap, num::NonZeroUsize, time::Duration};
2
3use indexmap::IndexMap;
4use serde_with::serde_as;
5use vector_lib::configurable::configurable_component;
6use vrl::{
7    path::{PathPrefix, parse_target_path},
8    prelude::{Collection, KeyString, Kind},
9};
10
11use crate::{
12    conditions::AnyCondition,
13    config::{
14        DataType, Input, LogNamespace, OutputId, TransformConfig, TransformContext,
15        TransformOutput, schema,
16    },
17    schema::Definition,
18    transforms::{
19        Transform,
20        reduce::{merge_strategy::MergeStrategy, transform::Reduce},
21    },
22};
23
/// Configuration for the `reduce` transform.
// NOTE(review): `#[serde_as]` is listed ahead of `configurable_component`, presumably so it can
// rewrite the `#[serde_as(as = ...)]` field attributes before the serde derive that macro emits
// runs — keep this ordering.
#[serde_as]
#[configurable_component(transform(
    "reduce",
    "Collapse multiple log events into a single event based on a set of conditions and merge strategies.",
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct ReduceConfig {
    /// The maximum period of time to wait after the last event is received, in milliseconds, before
    /// a combined event should be considered complete.
    // Read as an integer count of milliseconds and stored as a `Duration`. Both the serde default
    // and the derived `Default` come from `default_expire_after_ms` (30 000 ms).
    #[serde(default = "default_expire_after_ms")]
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[derivative(Default(value = "default_expire_after_ms()"))]
    #[configurable(metadata(docs::human_name = "Expire After"))]
    pub expire_after_ms: Duration,

    /// If supplied, every time this interval elapses for a given grouping, the reduced value
    /// for that grouping is flushed. This is checked every `flush_period_ms`.
    // Optional millisecond interval; the derived `Default` is `None`. NOTE(review): there is no
    // explicit `#[serde(default)]` here, so omitting the key presumably relies on `serde_as`
    // treating `Option` fields as defaultable — confirm.
    #[serde_as(as = "Option<serde_with::DurationMilliSeconds<u64>>")]
    #[derivative(Default(value = "Option::None"))]
    #[configurable(metadata(docs::human_name = "End-Every Period"))]
    pub end_every_period_ms: Option<Duration>,

    /// The interval to check for and flush any expired events, in milliseconds.
    // Read as an integer count of milliseconds. Both defaults come from `default_flush_period_ms`
    // (1000 ms).
    #[serde(default = "default_flush_period_ms")]
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[derivative(Default(value = "default_flush_period_ms()"))]
    #[configurable(metadata(docs::human_name = "Flush Period"))]
    pub flush_period_ms: Duration,

    /// The maximum number of events to group together.
    // `NonZeroUsize` makes a limit of zero unrepresentable; `None` means no limit is configured.
    pub max_events: Option<NonZeroUsize>,

    /// An ordered list of fields by which to group events.
    ///
    /// Each group with matching values for the specified keys is reduced independently, allowing
    /// you to keep independent event streams separate. Note that each specified field is reduced
    /// with the default merge strategy for its type, unless a merge strategy is explicitly defined
    /// in `merge_strategies`.
    ///
    /// This field is optional and when not specified, all events are reduced in a single group.
    ///
    /// For example, if `group_by = ["host", "region"]`, then all incoming events that have the same
    /// host and region are grouped together before being reduced.
    // Omitting the key yields an empty list (`#[serde(default)]`).
    #[serde(default)]
    #[configurable(metadata(
        docs::examples = "request_id",
        docs::examples = "user_id",
        docs::examples = "transaction_id",
    ))]
    pub group_by: Vec<String>,

    /// A map of field names to custom merge strategies.
    ///
    /// For each field specified, the given strategy is used for combining events rather than
    /// the default behavior.
    ///
    /// The default behavior is as follows:
    ///
    /// - The first value of a string field is kept and subsequent values are discarded.
    /// - For timestamp fields the first is kept and a new field `[field-name]_end` is added with
    ///   the last received timestamp value.
    /// - Numeric values are summed.
    /// - For nested paths, the field value is retrieved and then reduced using the default
    ///   strategies mentioned above (unless explicitly specified otherwise).
    // Keys are target paths; `outputs()` skips keys that fail to parse. `IndexMap` keeps the
    // configured order, which is the order `outputs()` applies them to the schema.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "An individual merge strategy."
    ))]
    pub merge_strategies: IndexMap<KeyString, MergeStrategy>,

    /// A condition used to distinguish the final event of a transaction.
    ///
    /// If this condition resolves to `true` for an event, the current transaction is immediately
    /// flushed with this event.
    pub ends_when: Option<AnyCondition>,

    /// A condition used to distinguish the first event of a transaction.
    ///
    /// If this condition resolves to `true` for an event, the previous transaction is flushed
    /// (without this event) and a new transaction is started.
    pub starts_when: Option<AnyCondition>,
}
108
/// Default value of `expire_after_ms`: thirty seconds.
const fn default_expire_after_ms() -> Duration {
    Duration::from_secs(30)
}
112
/// Default value of `flush_period_ms`: one second.
const fn default_flush_period_ms() -> Duration {
    Duration::from_secs(1)
}
116
// Hooks `ReduceConfig` into config generation. NOTE(review): judging by the macro name, the
// generated example is built from `ReduceConfig::default()` — confirm against the macro
// definition. The `generate_config` test at the bottom of this file exercises it.
impl_generate_config_from_default!(ReduceConfig);
118
119#[async_trait::async_trait]
120#[typetag::serde(name = "reduce")]
121impl TransformConfig for ReduceConfig {
122    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
123        Reduce::new(self, &context.enrichment_tables).map(Transform::event_task)
124    }
125
126    fn input(&self) -> Input {
127        Input::log()
128    }
129
130    fn outputs(
131        &self,
132        _: vector_lib::enrichment::TableRegistry,
133        input_definitions: &[(OutputId, schema::Definition)],
134        _: LogNamespace,
135    ) -> Vec<TransformOutput> {
136        // Events may be combined, so there isn't a true single "source" for events.
137        // All of the definitions must be merged.
138        let merged_definition: Definition = input_definitions
139            .iter()
140            .map(|(_output, definition)| definition.clone())
141            .reduce(Definition::merge)
142            .unwrap_or_else(Definition::any);
143
144        let mut schema_definition = merged_definition;
145
146        for (key, merge_strategy) in self.merge_strategies.iter() {
147            let key = if let Ok(key) = parse_target_path(key) {
148                key
149            } else {
150                continue;
151            };
152
153            let input_kind = match key.prefix {
154                PathPrefix::Event => schema_definition.event_kind().at_path(&key.path),
155                PathPrefix::Metadata => schema_definition.metadata_kind().at_path(&key.path),
156            };
157
158            let new_kind = match merge_strategy {
159                MergeStrategy::Discard | MergeStrategy::Retain => {
160                    /* does not change the type */
161                    input_kind.clone()
162                }
163                MergeStrategy::Sum | MergeStrategy::Max | MergeStrategy::Min => {
164                    // only keeps integer / float values
165                    match (input_kind.contains_integer(), input_kind.contains_float()) {
166                        (true, true) => Kind::float().or_integer(),
167                        (true, false) => Kind::integer(),
168                        (false, true) => Kind::float(),
169                        (false, false) => Kind::undefined(),
170                    }
171                }
172                MergeStrategy::Array => {
173                    let unknown_kind = input_kind.clone();
174                    Kind::array(Collection::empty().with_unknown(unknown_kind))
175                }
176                MergeStrategy::Concat => {
177                    let mut new_kind = Kind::never();
178
179                    if input_kind.contains_bytes() {
180                        new_kind.add_bytes();
181                    }
182                    if let Some(array) = input_kind.as_array() {
183                        // array elements can be either any type that the field can be, or any
184                        // element of the array
185                        let array_elements = array.reduced_kind().union(input_kind.without_array());
186                        new_kind.add_array(Collection::empty().with_unknown(array_elements));
187                    }
188                    new_kind
189                }
190                MergeStrategy::ConcatNewline | MergeStrategy::ConcatRaw => {
191                    // can only produce bytes (or undefined)
192                    if input_kind.contains_bytes() {
193                        Kind::bytes()
194                    } else {
195                        Kind::undefined()
196                    }
197                }
198                MergeStrategy::ShortestArray | MergeStrategy::LongestArray => {
199                    if let Some(array) = input_kind.as_array() {
200                        Kind::array(array.clone())
201                    } else {
202                        Kind::undefined()
203                    }
204                }
205                MergeStrategy::FlatUnique => {
206                    let mut array_elements = input_kind.without_array().without_object();
207                    if let Some(array) = input_kind.as_array() {
208                        array_elements = array_elements.union(array.reduced_kind());
209                    }
210                    if let Some(object) = input_kind.as_object() {
211                        array_elements = array_elements.union(object.reduced_kind());
212                    }
213                    Kind::array(Collection::empty().with_unknown(array_elements))
214                }
215            };
216
217            // all of the merge strategies are optional. They won't produce a value unless a value actually exists
218            let new_kind = if input_kind.contains_undefined() {
219                new_kind.or_undefined()
220            } else {
221                new_kind
222            };
223
224            schema_definition = schema_definition.with_field(&key, new_kind, None);
225        }
226
227        // the same schema definition is used for all inputs
228        let mut output_definitions = HashMap::new();
229        for (output, _input) in input_definitions {
230            output_definitions.insert(output.clone(), schema_definition.clone());
231        }
232
233        vec![TransformOutput::new(DataType::Log, output_definitions)]
234    }
235}
236
#[cfg(test)]
mod test {
    use super::ReduceConfig;

    /// Runs the shared `test_generate_config` check against `ReduceConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ReduceConfig>();
    }
}