vector/transforms/reduce/transform.rs

1use std::{
2    collections::{HashMap, hash_map::Entry},
3    pin::Pin,
4    time::{Duration, Instant},
5};
6
7use futures::Stream;
8use indexmap::IndexMap;
9use vector_lib::stream::expiration_map::{Emitter, map_with_expiration};
10use vrl::{
11    path::{OwnedTargetPath, parse_target_path},
12    prelude::KeyString,
13};
14
15use crate::{
16    conditions::Condition,
17    event::{Event, EventMetadata, LogEvent, discriminant::Discriminant},
18    internal_events::{ReduceAddEventError, ReduceStaleEventFlushed},
19    transforms::{
20        TaskTransform,
21        reduce::{
22            config::ReduceConfig,
23            merge_strategy::{MergeStrategy, ReduceValueMerger, get_value_merger},
24        },
25    },
26};
27
/// In-progress merge state for one group of events (one `Discriminant`).
#[derive(Clone, Debug)]
struct ReduceState {
    /// Number of events merged into this state so far (incremented by `add_event`).
    events: usize,
    /// One merger per field, keyed by the field's target path.
    fields: HashMap<OwnedTargetPath, Box<dyn ReduceValueMerger>>,
    /// Set at creation and refreshed by every `add_event`; `Reduce::flush_into`
    /// flushes the state once `expire_after` has elapsed since this instant.
    stale_since: Instant,
    /// Set at creation; `Reduce::flush_into` compares it against `end_every_period`.
    creation: Instant,
    /// Metadata merged from every added event; becomes the flushed event's metadata.
    metadata: EventMetadata,
}
36
37fn is_covered_by_strategy(
38    path: &OwnedTargetPath,
39    strategies: &IndexMap<OwnedTargetPath, MergeStrategy>,
40) -> bool {
41    let mut current = OwnedTargetPath::event_root();
42    for component in &path.path.segments {
43        current = current.with_field_appended(&component.to_string());
44        if strategies.contains_key(&current) {
45            return true;
46        }
47    }
48    false
49}
50
51impl ReduceState {
52    fn new() -> Self {
53        Self {
54            events: 0,
55            stale_since: Instant::now(),
56            creation: Instant::now(),
57            fields: HashMap::new(),
58            metadata: EventMetadata::default(),
59        }
60    }
61
62    fn add_event(&mut self, e: LogEvent, strategies: &IndexMap<OwnedTargetPath, MergeStrategy>) {
63        self.metadata.merge(e.metadata().clone());
64
65        for (path, strategy) in strategies {
66            if let Some(value) = e.get(path) {
67                match self.fields.entry(path.clone()) {
68                    Entry::Vacant(entry) => match get_value_merger(value.clone(), strategy) {
69                        Ok(m) => {
70                            entry.insert(m);
71                        }
72                        Err(error) => {
73                            warn!(message = "Failed to create value merger.", %error, %path);
74                        }
75                    },
76                    Entry::Occupied(mut entry) => {
77                        if let Err(error) = entry.get_mut().add(value.clone()) {
78                            warn!(message = "Failed to merge value.", %error);
79                        }
80                    }
81                }
82            }
83        }
84
85        if let Some(fields_iter) = e.all_event_fields_skip_array_elements() {
86            for (path, value) in fields_iter {
87                // This should not return an error, unless there is a bug in the event fields iterator.
88                let parsed_path = match parse_target_path(&path) {
89                    Ok(path) => path,
90                    Err(error) => {
91                        emit!(ReduceAddEventError { error, path });
92                        continue;
93                    }
94                };
95                if is_covered_by_strategy(&parsed_path, strategies) {
96                    continue;
97                }
98
99                let maybe_strategy = strategies.get(&parsed_path);
100                match self.fields.entry(parsed_path) {
101                    Entry::Vacant(entry) => {
102                        if let Some(strategy) = maybe_strategy {
103                            match get_value_merger(value.clone(), strategy) {
104                                Ok(m) => {
105                                    entry.insert(m);
106                                }
107                                Err(error) => {
108                                    warn!(message = "Failed to merge value.", %error);
109                                }
110                            }
111                        } else {
112                            entry.insert(value.clone().into());
113                        }
114                    }
115                    Entry::Occupied(mut entry) => {
116                        if let Err(error) = entry.get_mut().add(value.clone()) {
117                            warn!(message = "Failed to merge value.", %error);
118                        }
119                    }
120                }
121            }
122        }
123        // else the event root is not an object (see https://github.com/vectordotdev/vector/issues/18219)
124
125        self.events += 1;
126        self.stale_since = Instant::now();
127    }
128
129    fn flush(mut self) -> LogEvent {
130        let mut event = LogEvent::new_with_metadata(self.metadata);
131        for (path, v) in self.fields.drain() {
132            if let Err(error) = v.insert_into(&path, &mut event) {
133                warn!(message = "Failed to merge values for field.", %error);
134            }
135        }
136        self.events = 0;
137        event
138    }
139}
140
/// The `reduce` transform.
///
/// Groups log events by the values of their `group_by` fields and merges each group into
/// a single output event. A group is flushed on `starts_when`/`ends_when`, on reaching
/// `max_events`, or by the time-based checks in `flush_into`.
#[derive(Clone, Debug)]
pub struct Reduce {
    /// A group is flushed once this long has passed since its last event was added.
    expire_after: Duration,
    /// Passed to `map_with_expiration` as its period; the expiration callback is `flush_into`.
    flush_period: Duration,
    /// When set, a group is flushed once this long has passed since it was created.
    end_every_period: Option<Duration>,
    /// Fields whose values form a group's `Discriminant`.
    group_by: Vec<String>,
    /// Parsed merge strategies by field path. Fields without an entry get a merger built
    /// from their first value (`value.into()`).
    merge_strategies: IndexMap<OwnedTargetPath, MergeStrategy>,
    /// In-progress groups, keyed by discriminant.
    reduce_merge_states: HashMap<Discriminant, ReduceState>,
    /// Matches the last event of a group (mutually exclusive with `starts_when`).
    ends_when: Option<Condition>,
    /// Matches the first event of a new group (mutually exclusive with `ends_when`).
    starts_when: Option<Condition>,
    /// Maximum number of events merged into one output event.
    max_events: Option<usize>,
}
153
154fn validate_merge_strategies(strategies: IndexMap<KeyString, MergeStrategy>) -> crate::Result<()> {
155    for (path, _) in &strategies {
156        let contains_index = parse_target_path(path)
157            .map_err(|_| format!("Could not parse path: `{path}`"))?
158            .path
159            .segments
160            .iter()
161            .any(|segment| segment.is_index());
162        if contains_index {
163            return Err(format!(
164                "Merge strategies with indexes are currently not supported. Path: `{path}`"
165            )
166            .into());
167        }
168    }
169
170    Ok(())
171}
172
173impl Reduce {
174    pub fn new(
175        config: &ReduceConfig,
176        enrichment_tables: &vector_lib::enrichment::TableRegistry,
177    ) -> crate::Result<Self> {
178        if config.ends_when.is_some() && config.starts_when.is_some() {
179            return Err("only one of `ends_when` and `starts_when` can be provided".into());
180        }
181
182        let ends_when = config
183            .ends_when
184            .as_ref()
185            .map(|c| c.build(enrichment_tables))
186            .transpose()?;
187        let starts_when = config
188            .starts_when
189            .as_ref()
190            .map(|c| c.build(enrichment_tables))
191            .transpose()?;
192        let group_by = config.group_by.clone().into_iter().collect();
193        let max_events = config.max_events.map(|max| max.into());
194
195        validate_merge_strategies(config.merge_strategies.clone())?;
196
197        Ok(Reduce {
198            expire_after: config.expire_after_ms,
199            flush_period: config.flush_period_ms,
200            end_every_period: config.end_every_period_ms,
201            group_by,
202            merge_strategies: config
203                .merge_strategies
204                .iter()
205                .filter_map(|(path, strategy)| {
206                    // TODO Invalid paths are ignored to preserve backwards compatibility.
207                    //      Merge strategy paths should ideally be [`lookup_v2::ConfigTargetPath`]
208                    //      which means an invalid path would result in an configuration error.
209                    let parsed_path = parse_target_path(path).ok();
210                    if parsed_path.is_none() {
211                        warn!(message = "Ignoring strategy with invalid path.", %path);
212                    }
213                    parsed_path.map(|path| (path, strategy.clone()))
214                })
215                .collect(),
216            reduce_merge_states: HashMap::new(),
217            ends_when,
218            starts_when,
219            max_events,
220        })
221    }
222
223    fn flush_into(&mut self, emitter: &mut Emitter<Event>) {
224        let mut flush_discriminants = Vec::new();
225        let now = Instant::now();
226        for (k, t) in &self.reduce_merge_states {
227            if let Some(period) = self.end_every_period
228                && (now - t.creation) >= period
229            {
230                flush_discriminants.push(k.clone());
231            }
232
233            if (now - t.stale_since) >= self.expire_after {
234                flush_discriminants.push(k.clone());
235            }
236        }
237        for k in &flush_discriminants {
238            if let Some(t) = self.reduce_merge_states.remove(k) {
239                emit!(ReduceStaleEventFlushed);
240                emitter.emit(Event::from(t.flush()));
241            }
242        }
243    }
244
245    fn flush_all_into(&mut self, emitter: &mut Emitter<Event>) {
246        self.reduce_merge_states
247            .drain()
248            .for_each(|(_, s)| emitter.emit(Event::from(s.flush())));
249    }
250
251    fn push_or_new_reduce_state(&mut self, event: LogEvent, discriminant: Discriminant) {
252        match self.reduce_merge_states.entry(discriminant) {
253            Entry::Vacant(entry) => {
254                let mut state = ReduceState::new();
255                state.add_event(event, &self.merge_strategies);
256                entry.insert(state);
257            }
258            Entry::Occupied(mut entry) => {
259                entry.get_mut().add_event(event, &self.merge_strategies);
260            }
261        };
262    }
263
264    pub fn transform_one(&mut self, emitter: &mut Emitter<Event>, event: Event) {
265        let (starts_here, event) = match &self.starts_when {
266            Some(condition) => condition.check(event),
267            None => (false, event),
268        };
269
270        let (mut ends_here, event) = match &self.ends_when {
271            Some(condition) => condition.check(event),
272            None => (false, event),
273        };
274
275        let event = event.into_log();
276        let discriminant = Discriminant::from_log_event(&event, &self.group_by);
277
278        if let Some(max_events) = self.max_events {
279            if max_events == 1 {
280                ends_here = true;
281            } else if let Some(entry) = self.reduce_merge_states.get(&discriminant) {
282                // The current event will finish this set
283                if entry.events + 1 == max_events {
284                    ends_here = true;
285                }
286            }
287        }
288
289        if starts_here {
290            if let Some(state) = self.reduce_merge_states.remove(&discriminant) {
291                emitter.emit(state.flush().into());
292            }
293
294            self.push_or_new_reduce_state(event, discriminant)
295        } else if ends_here {
296            emitter.emit(match self.reduce_merge_states.remove(&discriminant) {
297                Some(mut state) => {
298                    state.add_event(event, &self.merge_strategies);
299                    state.flush().into()
300                }
301                None => {
302                    let mut state = ReduceState::new();
303                    state.add_event(event, &self.merge_strategies);
304                    state.flush().into()
305                }
306            });
307        } else {
308            self.push_or_new_reduce_state(event, discriminant)
309        }
310    }
311}
312
313impl TaskTransform<Event> for Reduce {
314    fn transform(
315        self: Box<Self>,
316        input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
317    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
318    where
319        Self: 'static,
320    {
321        let transform_fn = move |me: &mut Box<Reduce>, event, emitter: &mut Emitter<Event>| {
322            me.transform_one(emitter, event);
323        };
324
325        construct_output_stream(self, input_rx, transform_fn)
326    }
327}
328
329pub fn construct_output_stream(
330    reduce: Box<Reduce>,
331    input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
332    mut transform_fn: impl FnMut(&mut Box<Reduce>, Event, &mut Emitter<Event>) + Send + Sync + 'static,
333) -> Pin<Box<dyn Stream<Item = Event> + Send>>
334where
335    Reduce: 'static,
336{
337    let flush_period = reduce.flush_period;
338    Box::pin(map_with_expiration(
339        reduce,
340        input_rx,
341        flush_period,
342        move |me, event, emitter| {
343            transform_fn(me, event, emitter);
344        },
345        |me, emitter| {
346            me.flush_into(emitter);
347        },
348        |me, emitter| {
349            me.flush_all_into(emitter);
350        },
351    ))
352}
353
354#[cfg(test)]
355mod test {
356    use std::sync::Arc;
357
358    use indoc::indoc;
359    use serde_json::json;
360    use tokio::sync::mpsc;
361    use tokio_stream::wrappers::ReceiverStream;
362    use vector_lib::{enrichment::TableRegistry, lookup::owned_value_path};
363    use vrl::value::Kind;
364
365    use super::*;
366    use crate::{
367        config::{LogNamespace, OutputId, TransformConfig, schema, schema::Definition},
368        event::{LogEvent, Value},
369        test_util::components::assert_transform_compliance,
370        transforms::test::create_topology,
371    };
372
373    #[tokio::test]
374    async fn reduce_from_condition() {
375        let reduce_config = toml::from_str::<ReduceConfig>(
376            r#"
377group_by = [ "request_id" ]
378
379[ends_when]
380  type = "vrl"
381  source = "exists(.test_end)"
382"#,
383        )
384        .unwrap();
385
386        assert_transform_compliance(async move {
387            let input_definition = schema::Definition::default_legacy_namespace()
388                .with_event_field(&owned_value_path!("counter"), Kind::integer(), None)
389                .with_event_field(&owned_value_path!("request_id"), Kind::bytes(), None)
390                .with_event_field(
391                    &owned_value_path!("test_end"),
392                    Kind::bytes().or_undefined(),
393                    None,
394                )
395                .with_event_field(
396                    &owned_value_path!("extra_field"),
397                    Kind::bytes().or_undefined(),
398                    None,
399                );
400            let schema_definitions = reduce_config
401                .outputs(
402                    vector_lib::enrichment::TableRegistry::default(),
403                    &[("test".into(), input_definition)],
404                    LogNamespace::Legacy,
405                )
406                .first()
407                .unwrap()
408                .schema_definitions(true)
409                .clone();
410
411            let new_schema_definition = reduce_config.outputs(
412                TableRegistry::default(),
413                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
414                LogNamespace::Legacy,
415            )[0]
416            .clone()
417            .log_schema_definitions
418            .get(&OutputId::from("in"))
419            .unwrap()
420            .clone();
421
422            let (tx, rx) = mpsc::channel(1);
423            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
424
425            let mut e_1 = LogEvent::from("test message 1");
426            e_1.insert("counter", 1);
427            e_1.insert("request_id", "1");
428            let mut metadata_1 = e_1.metadata().clone();
429            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
430            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
431
432            let mut e_2 = LogEvent::from("test message 2");
433            e_2.insert("counter", 2);
434            e_2.insert("request_id", "2");
435            let mut metadata_2 = e_2.metadata().clone();
436            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
437            metadata_2.set_schema_definition(&Arc::new(new_schema_definition.clone()));
438
439            let mut e_3 = LogEvent::from("test message 3");
440            e_3.insert("counter", 3);
441            e_3.insert("request_id", "1");
442
443            let mut e_4 = LogEvent::from("test message 4");
444            e_4.insert("counter", 4);
445            e_4.insert("request_id", "1");
446            e_4.insert("test_end", "yep");
447
448            let mut e_5 = LogEvent::from("test message 5");
449            e_5.insert("counter", 5);
450            e_5.insert("request_id", "2");
451            e_5.insert("extra_field", "value1");
452            e_5.insert("test_end", "yep");
453
454            for event in vec![e_1.into(), e_2.into(), e_3.into(), e_4.into(), e_5.into()] {
455                tx.send(event).await.unwrap();
456            }
457
458            let output_1 = out.recv().await.unwrap().into_log();
459            assert_eq!(output_1["message"], "test message 1".into());
460            assert_eq!(output_1["counter"], Value::from(8));
461            assert_eq!(output_1.metadata(), &metadata_1);
462            schema_definitions
463                .values()
464                .for_each(|definition| definition.assert_valid_for_event(&output_1.clone().into()));
465
466            let output_2 = out.recv().await.unwrap().into_log();
467            assert_eq!(output_2["message"], "test message 2".into());
468            assert_eq!(output_2["extra_field"], "value1".into());
469            assert_eq!(output_2["counter"], Value::from(7));
470            assert_eq!(output_2.metadata(), &metadata_2);
471            schema_definitions
472                .values()
473                .for_each(|definition| definition.assert_valid_for_event(&output_2.clone().into()));
474
475            drop(tx);
476            topology.stop().await;
477            assert_eq!(out.recv().await, None);
478        })
479        .await;
480    }
481
482    #[tokio::test]
483    async fn reduce_merge_strategies() {
484        let reduce_config = toml::from_str::<ReduceConfig>(
485            r#"
486group_by = [ "request_id" ]
487
488merge_strategies.foo = "concat"
489merge_strategies.bar = "array"
490merge_strategies.baz = "max"
491
492[ends_when]
493  type = "vrl"
494  source = "exists(.test_end)"
495"#,
496        )
497        .unwrap();
498
499        assert_transform_compliance(async move {
500            let (tx, rx) = mpsc::channel(1);
501
502            let new_schema_definition = reduce_config.outputs(
503                TableRegistry::default(),
504                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
505                LogNamespace::Legacy,
506            )[0]
507            .clone()
508            .log_schema_definitions
509            .get(&OutputId::from("in"))
510            .unwrap()
511            .clone();
512
513            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
514
515            let mut e_1 = LogEvent::from("test message 1");
516            e_1.insert("foo", "first foo");
517            e_1.insert("bar", "first bar");
518            e_1.insert("baz", 2);
519            e_1.insert("request_id", "1");
520            let mut metadata = e_1.metadata().clone();
521            metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
522            metadata.set_schema_definition(&Arc::new(new_schema_definition.clone()));
523            tx.send(e_1.into()).await.unwrap();
524
525            let mut e_2 = LogEvent::from("test message 2");
526            e_2.insert("foo", "second foo");
527            e_2.insert("bar", 2);
528            e_2.insert("baz", "not number");
529            e_2.insert("request_id", "1");
530            tx.send(e_2.into()).await.unwrap();
531
532            let mut e_3 = LogEvent::from("test message 3");
533            e_3.insert("foo", 10);
534            e_3.insert("bar", "third bar");
535            e_3.insert("baz", 3);
536            e_3.insert("request_id", "1");
537            e_3.insert("test_end", "yep");
538            tx.send(e_3.into()).await.unwrap();
539
540            let output_1 = out.recv().await.unwrap().into_log();
541            assert_eq!(output_1["message"], "test message 1".into());
542            assert_eq!(output_1["foo"], "first foo second foo".into());
543            assert_eq!(
544                output_1["bar"],
545                Value::Array(vec!["first bar".into(), 2.into(), "third bar".into()]),
546            );
547            assert_eq!(output_1["baz"], 3.into());
548            assert_eq!(output_1.metadata(), &metadata);
549
550            drop(tx);
551            topology.stop().await;
552            assert_eq!(out.recv().await, None);
553        })
554        .await;
555    }
556
557    #[tokio::test]
558    async fn missing_group_by() {
559        let reduce_config = toml::from_str::<ReduceConfig>(
560            r#"
561group_by = [ "request_id" ]
562
563[ends_when]
564  type = "vrl"
565  source = "exists(.test_end)"
566"#,
567        )
568        .unwrap();
569
570        assert_transform_compliance(async move {
571            let (tx, rx) = mpsc::channel(1);
572            let new_schema_definition = reduce_config.outputs(
573                TableRegistry::default(),
574                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
575                LogNamespace::Legacy,
576            )[0]
577            .clone()
578            .log_schema_definitions
579            .get(&OutputId::from("in"))
580            .unwrap()
581            .clone();
582
583            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
584
585            let mut e_1 = LogEvent::from("test message 1");
586            e_1.insert("counter", 1);
587            e_1.insert("request_id", "1");
588            let mut metadata_1 = e_1.metadata().clone();
589            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
590            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
591            tx.send(e_1.into()).await.unwrap();
592
593            let mut e_2 = LogEvent::from("test message 2");
594            e_2.insert("counter", 2);
595            let mut metadata_2 = e_2.metadata().clone();
596            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
597            metadata_2.set_schema_definition(&Arc::new(new_schema_definition));
598            tx.send(e_2.into()).await.unwrap();
599
600            let mut e_3 = LogEvent::from("test message 3");
601            e_3.insert("counter", 3);
602            e_3.insert("request_id", "1");
603            tx.send(e_3.into()).await.unwrap();
604
605            let mut e_4 = LogEvent::from("test message 4");
606            e_4.insert("counter", 4);
607            e_4.insert("request_id", "1");
608            e_4.insert("test_end", "yep");
609            tx.send(e_4.into()).await.unwrap();
610
611            let mut e_5 = LogEvent::from("test message 5");
612            e_5.insert("counter", 5);
613            e_5.insert("extra_field", "value1");
614            e_5.insert("test_end", "yep");
615            tx.send(e_5.into()).await.unwrap();
616
617            let output_1 = out.recv().await.unwrap().into_log();
618            assert_eq!(output_1["message"], "test message 1".into());
619            assert_eq!(output_1["counter"], Value::from(8));
620            assert_eq!(output_1.metadata(), &metadata_1);
621
622            let output_2 = out.recv().await.unwrap().into_log();
623            assert_eq!(output_2["message"], "test message 2".into());
624            assert_eq!(output_2["extra_field"], "value1".into());
625            assert_eq!(output_2["counter"], Value::from(7));
626            assert_eq!(output_2.metadata(), &metadata_2);
627
628            drop(tx);
629            topology.stop().await;
630            assert_eq!(out.recv().await, None);
631        })
632        .await;
633    }
634
635    #[tokio::test]
636    async fn max_events_0() {
637        let reduce_config = toml::from_str::<ReduceConfig>(
638            r#"
639group_by = [ "id" ]
640merge_strategies.id = "retain"
641merge_strategies.message = "array"
642max_events = 0
643            "#,
644        );
645
646        match reduce_config {
647            Ok(_conf) => unreachable!("max_events=0 should be rejected."),
648            Err(err) => assert!(
649                err.to_string()
650                    .contains("invalid value: integer `0`, expected a nonzero usize")
651            ),
652        }
653    }
654
655    #[tokio::test]
656    async fn max_events_1() {
657        let reduce_config = toml::from_str::<ReduceConfig>(
658            r#"
659group_by = [ "id" ]
660merge_strategies.id = "retain"
661merge_strategies.message = "array"
662max_events = 1
663            "#,
664        )
665        .unwrap();
666        assert_transform_compliance(async move {
667            let (tx, rx) = mpsc::channel(1);
668            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
669
670            let mut e_1 = LogEvent::from("test 1");
671            e_1.insert("id", "1");
672
673            let mut e_2 = LogEvent::from("test 2");
674            e_2.insert("id", "1");
675
676            let mut e_3 = LogEvent::from("test 3");
677            e_3.insert("id", "1");
678
679            for event in vec![e_1.into(), e_2.into(), e_3.into()] {
680                tx.send(event).await.unwrap();
681            }
682
683            let output_1 = out.recv().await.unwrap().into_log();
684            assert_eq!(output_1["message"], vec!["test 1"].into());
685            let output_2 = out.recv().await.unwrap().into_log();
686            assert_eq!(output_2["message"], vec!["test 2"].into());
687
688            let output_3 = out.recv().await.unwrap().into_log();
689            assert_eq!(output_3["message"], vec!["test 3"].into());
690
691            drop(tx);
692            topology.stop().await;
693            assert_eq!(out.recv().await, None);
694        })
695        .await;
696    }
697
698    #[tokio::test]
699    async fn max_events() {
700        let reduce_config = toml::from_str::<ReduceConfig>(
701            r#"
702group_by = [ "id" ]
703merge_strategies.id = "retain"
704merge_strategies.message = "array"
705max_events = 3
706            "#,
707        )
708        .unwrap();
709
710        assert_transform_compliance(async move {
711            let (tx, rx) = mpsc::channel(1);
712            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
713
714            let mut e_1 = LogEvent::from("test 1");
715            e_1.insert("id", "1");
716
717            let mut e_2 = LogEvent::from("test 2");
718            e_2.insert("id", "1");
719
720            let mut e_3 = LogEvent::from("test 3");
721            e_3.insert("id", "1");
722
723            let mut e_4 = LogEvent::from("test 4");
724            e_4.insert("id", "1");
725
726            let mut e_5 = LogEvent::from("test 5");
727            e_5.insert("id", "1");
728
729            let mut e_6 = LogEvent::from("test 6");
730            e_6.insert("id", "1");
731
732            for event in vec![
733                e_1.into(),
734                e_2.into(),
735                e_3.into(),
736                e_4.into(),
737                e_5.into(),
738                e_6.into(),
739            ] {
740                tx.send(event).await.unwrap();
741            }
742
743            let output_1 = out.recv().await.unwrap().into_log();
744            assert_eq!(
745                output_1["message"],
746                vec!["test 1", "test 2", "test 3"].into()
747            );
748
749            let output_2 = out.recv().await.unwrap().into_log();
750            assert_eq!(
751                output_2["message"],
752                vec!["test 4", "test 5", "test 6"].into()
753            );
754
755            drop(tx);
756            topology.stop().await;
757            assert_eq!(out.recv().await, None);
758        })
759        .await
760    }
761
762    #[tokio::test]
763    async fn arrays() {
764        let reduce_config = toml::from_str::<ReduceConfig>(
765            r#"
766group_by = [ "request_id" ]
767
768merge_strategies.foo = "array"
769merge_strategies.bar = "concat"
770
771[ends_when]
772  type = "vrl"
773  source = "exists(.test_end)"
774"#,
775        )
776        .unwrap();
777
778        assert_transform_compliance(async move {
779            let (tx, rx) = mpsc::channel(1);
780
781            let new_schema_definition = reduce_config.outputs(
782                TableRegistry::default(),
783                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
784                LogNamespace::Legacy,
785            )[0]
786            .clone()
787            .log_schema_definitions
788            .get(&OutputId::from("in"))
789            .unwrap()
790            .clone();
791
792            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
793
794            let mut e_1 = LogEvent::from("test message 1");
795            e_1.insert("foo", json!([1, 3]));
796            e_1.insert("bar", json!([1, 3]));
797            e_1.insert("request_id", "1");
798            let mut metadata_1 = e_1.metadata().clone();
799            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
800            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
801
802            tx.send(e_1.into()).await.unwrap();
803
804            let mut e_2 = LogEvent::from("test message 2");
805            e_2.insert("foo", json!([2, 4]));
806            e_2.insert("bar", json!([2, 4]));
807            e_2.insert("request_id", "2");
808            let mut metadata_2 = e_2.metadata().clone();
809            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
810            metadata_2.set_schema_definition(&Arc::new(new_schema_definition));
811            tx.send(e_2.into()).await.unwrap();
812
813            let mut e_3 = LogEvent::from("test message 3");
814            e_3.insert("foo", json!([5, 7]));
815            e_3.insert("bar", json!([5, 7]));
816            e_3.insert("request_id", "1");
817            tx.send(e_3.into()).await.unwrap();
818
819            let mut e_4 = LogEvent::from("test message 4");
820            e_4.insert("foo", json!("done"));
821            e_4.insert("bar", json!("done"));
822            e_4.insert("request_id", "1");
823            e_4.insert("test_end", "yep");
824            tx.send(e_4.into()).await.unwrap();
825
826            let mut e_5 = LogEvent::from("test message 5");
827            e_5.insert("foo", json!([6, 8]));
828            e_5.insert("bar", json!([6, 8]));
829            e_5.insert("request_id", "2");
830            tx.send(e_5.into()).await.unwrap();
831
832            let mut e_6 = LogEvent::from("test message 6");
833            e_6.insert("foo", json!("done"));
834            e_6.insert("bar", json!("done"));
835            e_6.insert("request_id", "2");
836            e_6.insert("test_end", "yep");
837            tx.send(e_6.into()).await.unwrap();
838
839            let output_1 = out.recv().await.unwrap().into_log();
840            assert_eq!(output_1["foo"], json!([[1, 3], [5, 7], "done"]).into());
841            assert_eq!(output_1["bar"], json!([1, 3, 5, 7, "done"]).into());
842            assert_eq!(output_1.metadata(), &metadata_1);
843
844            let output_2 = out.recv().await.unwrap().into_log();
845            assert_eq!(output_2["foo"], json!([[2, 4], [6, 8], "done"]).into());
846            assert_eq!(output_2["bar"], json!([2, 4, 6, 8, "done"]).into());
847            assert_eq!(output_2.metadata(), &metadata_2);
848
849            drop(tx);
850            topology.stop().await;
851            assert_eq!(out.recv().await, None);
852        })
853        .await;
854    }
855
856    #[tokio::test]
857    async fn strategy_path_with_nested_fields() {
858        let reduce_config = toml::from_str::<ReduceConfig>(indoc!(
859            r#"
860            group_by = [ "id" ]
861
862            merge_strategies.id = "discard"
863            merge_strategies."message.a.b" = "array"
864
865            [ends_when]
866              type = "vrl"
867              source = "exists(.test_end)"
868            "#,
869        ))
870        .unwrap();
871
872        assert_transform_compliance(async move {
873            let (tx, rx) = mpsc::channel(1);
874
875            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
876
877            let e_1 = LogEvent::from(Value::from(btreemap! {
878                "id" => 777,
879                "message" => btreemap! {
880                    "a" => btreemap! {
881                        "b" => vec![1,2],
882                        "num" => 1,
883                    },
884                },
885                "arr" => vec![btreemap! { "a" => 1 }, btreemap! { "b" => 1 }]
886            }));
887            let mut metadata_1 = e_1.metadata().clone();
888            metadata_1.set_upstream_id(Arc::new(OutputId::from("reduce")));
889
890            tx.send(e_1.into()).await.unwrap();
891
892            let e_2 = LogEvent::from(Value::from(btreemap! {
893                "id" => 777,
894                "message" => btreemap! {
895                        "a" => btreemap! {
896                            "b" => vec![3,4],
897                            "num" => 2,
898                        },
899                },
900                 "arr" => vec![btreemap! { "a" => 2 }, btreemap! { "b" => 2 }],
901                "test_end" => "done",
902            }));
903            tx.send(e_2.into()).await.unwrap();
904
905            let mut output = out.recv().await.unwrap().into_log();
906
907            // Remove timestamp fields which were automatically added.
908            output.remove_timestamp();
909            output.remove("timestamp_end");
910
911            assert_eq!(
912                *output.value(),
913                btreemap! {
914                    "id" => 777,
915                    "message" => btreemap! {
916                        "a" => btreemap! {
917                            "b" => vec![vec![1, 2], vec![3,4]],
918                            "num" => 3,
919                        },
920                    },
921                    "arr" => vec![btreemap! { "a" => 1 }, btreemap! { "b" => 1 }],
922                    "test_end" => "done",
923                }
924                .into()
925            );
926
927            drop(tx);
928            topology.stop().await;
929            assert_eq!(out.recv().await, None);
930        })
931        .await;
932    }
933
934    #[test]
935    fn invalid_merge_strategies_containing_indexes() {
936        let config = toml::from_str::<ReduceConfig>(indoc!(
937            r#"
938            group_by = [ "id" ]
939
940            merge_strategies.id = "discard"
941            merge_strategies."nested.msg[0]" = "array"
942            "#,
943        ))
944        .unwrap();
945        let error = Reduce::new(&config, &TableRegistry::default()).unwrap_err();
946        assert_eq!(
947            error.to_string(),
948            "Merge strategies with indexes are currently not supported. Path: `nested.msg[0]`"
949        );
950    }
951
952    #[tokio::test]
953    async fn merge_objects_in_array() {
954        let config = toml::from_str::<ReduceConfig>(indoc!(
955            r#"
956            group_by = [ "id" ]
957            merge_strategies.events = "array"
958            merge_strategies."\"a-b\"" = "retain"
959            merge_strategies.another = "discard"
960
961            [ends_when]
962              type = "vrl"
963              source = "exists(.test_end)"
964            "#,
965        ))
966        .unwrap();
967
968        assert_transform_compliance(async move {
969            let (tx, rx) = mpsc::channel(1);
970
971            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
972
973            let v_1 = Value::from(btreemap! {
974                "attrs" => btreemap! {
975                    "nested.msg" => "foo",
976                },
977                "sev" => 2,
978            });
979            let mut e_1 = LogEvent::from(Value::from(
980                btreemap! {"id" => 777, "another" => btreemap!{ "a" => 1}},
981            ));
982            e_1.insert("events", v_1.clone());
983            e_1.insert("\"a-b\"", 2);
984            tx.send(e_1.into()).await.unwrap();
985
986            let v_2 = Value::from(btreemap! {
987                "attrs" => btreemap! {
988                    "nested.msg" => "bar",
989                },
990                "sev" => 3,
991            });
992            let mut e_2 = LogEvent::from(Value::from(
993                btreemap! {"id" => 777, "test_end" => "done", "another" => btreemap!{ "b" => 2}},
994            ));
995            e_2.insert("events", v_2.clone());
996            e_2.insert("\"a-b\"", 2);
997            tx.send(e_2.into()).await.unwrap();
998
999            let output = out.recv().await.unwrap().into_log();
1000            let expected_value = Value::from(btreemap! {
1001                "id" => 1554,
1002                "events" => vec![v_1, v_2],
1003                "another" => btreemap!{ "a" => 1},
1004                "a-b" => 2,
1005                "test_end" => "done"
1006            });
1007            assert_eq!(*output.value(), expected_value);
1008
1009            drop(tx);
1010            topology.stop().await;
1011            assert_eq!(out.recv().await, None);
1012        })
1013        .await
1014    }
1015
1016    #[tokio::test]
1017    async fn merged_quoted_path() {
1018        let config = toml::from_str::<ReduceConfig>(indoc!(
1019            r#"
1020            [ends_when]
1021              type = "vrl"
1022              source = "exists(.test_end)"
1023            "#,
1024        ))
1025        .unwrap();
1026
1027        assert_transform_compliance(async move {
1028            let (tx, rx) = mpsc::channel(1);
1029
1030            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1031
1032            let e_1 = LogEvent::from(Value::from(btreemap! {"a b" => 1}));
1033            tx.send(e_1.into()).await.unwrap();
1034
1035            let e_2 = LogEvent::from(Value::from(btreemap! {"a b" => 2, "test_end" => "done"}));
1036            tx.send(e_2.into()).await.unwrap();
1037
1038            let output = out.recv().await.unwrap().into_log();
1039            let expected_value = Value::from(btreemap! {
1040                "a b" => 3,
1041                "test_end" => "done"
1042            });
1043            assert_eq!(*output.value(), expected_value);
1044
1045            drop(tx);
1046            topology.stop().await;
1047            assert_eq!(out.recv().await, None);
1048        })
1049        .await
1050    }
1051}