vector/transforms/reduce/
transform.rs

1use std::{
2    collections::{HashMap, hash_map::Entry},
3    pin::Pin,
4    time::{Duration, Instant},
5};
6
7use futures::Stream;
8use indexmap::IndexMap;
9use vector_lib::stream::expiration_map::{Emitter, map_with_expiration};
10use vector_vrl_metrics::MetricsStorage;
11use vrl::{
12    path::{OwnedTargetPath, parse_target_path},
13    prelude::KeyString,
14};
15
16use crate::{
17    conditions::Condition,
18    event::{Event, EventMetadata, LogEvent, discriminant::Discriminant},
19    internal_events::{ReduceAddEventError, ReduceStaleEventFlushed},
20    transforms::{
21        TaskTransform,
22        reduce::{
23            config::ReduceConfig,
24            merge_strategy::{MergeStrategy, ReduceValueMerger, get_value_merger},
25        },
26    },
27};
28
29#[derive(Clone, Debug)]
30struct ReduceState {
31    events: usize,
32    fields: HashMap<OwnedTargetPath, Box<dyn ReduceValueMerger>>,
33    stale_since: Instant,
34    creation: Instant,
35    metadata: EventMetadata,
36}
37
38fn is_covered_by_strategy(
39    path: &OwnedTargetPath,
40    strategies: &IndexMap<OwnedTargetPath, MergeStrategy>,
41) -> bool {
42    let mut current = OwnedTargetPath::event_root();
43    for component in &path.path.segments {
44        current = current.with_field_appended(&component.to_string());
45        if strategies.contains_key(&current) {
46            return true;
47        }
48    }
49    false
50}
51
52impl ReduceState {
53    fn new() -> Self {
54        Self {
55            events: 0,
56            stale_since: Instant::now(),
57            creation: Instant::now(),
58            fields: HashMap::new(),
59            metadata: EventMetadata::default(),
60        }
61    }
62
63    fn add_event(&mut self, e: LogEvent, strategies: &IndexMap<OwnedTargetPath, MergeStrategy>) {
64        self.metadata.merge(e.metadata().clone());
65
66        for (path, strategy) in strategies {
67            if let Some(value) = e.get(path) {
68                match self.fields.entry(path.clone()) {
69                    Entry::Vacant(entry) => match get_value_merger(value.clone(), strategy) {
70                        Ok(m) => {
71                            entry.insert(m);
72                        }
73                        Err(error) => {
74                            warn!(message = "Failed to create value merger.", %error, %path);
75                        }
76                    },
77                    Entry::Occupied(mut entry) => {
78                        if let Err(error) = entry.get_mut().add(value.clone()) {
79                            warn!(message = "Failed to merge value.", %error);
80                        }
81                    }
82                }
83            }
84        }
85
86        if let Some(fields_iter) = e.all_event_fields_skip_array_elements() {
87            for (path, value) in fields_iter {
88                // This should not return an error, unless there is a bug in the event fields iterator.
89                let parsed_path = match parse_target_path(&path) {
90                    Ok(path) => path,
91                    Err(error) => {
92                        emit!(ReduceAddEventError { error, path });
93                        continue;
94                    }
95                };
96                if is_covered_by_strategy(&parsed_path, strategies) {
97                    continue;
98                }
99
100                let maybe_strategy = strategies.get(&parsed_path);
101                match self.fields.entry(parsed_path) {
102                    Entry::Vacant(entry) => {
103                        if let Some(strategy) = maybe_strategy {
104                            match get_value_merger(value.clone(), strategy) {
105                                Ok(m) => {
106                                    entry.insert(m);
107                                }
108                                Err(error) => {
109                                    warn!(message = "Failed to merge value.", %error);
110                                }
111                            }
112                        } else {
113                            entry.insert(value.clone().into());
114                        }
115                    }
116                    Entry::Occupied(mut entry) => {
117                        if let Err(error) = entry.get_mut().add(value.clone()) {
118                            warn!(message = "Failed to merge value.", %error);
119                        }
120                    }
121                }
122            }
123        }
124        // else the event root is not an object (see https://github.com/vectordotdev/vector/issues/18219)
125
126        self.events += 1;
127        self.stale_since = Instant::now();
128    }
129
130    fn flush(mut self) -> LogEvent {
131        let mut event = LogEvent::new_with_metadata(self.metadata);
132        for (path, v) in self.fields.drain() {
133            if let Err(error) = v.insert_into(&path, &mut event) {
134                warn!(message = "Failed to merge values for field.", %error);
135            }
136        }
137        self.events = 0;
138        event
139    }
140}
141
142#[derive(Clone, Debug)]
143pub struct Reduce {
144    expire_after: Duration,
145    flush_period: Duration,
146    end_every_period: Option<Duration>,
147    group_by: Vec<String>,
148    merge_strategies: IndexMap<OwnedTargetPath, MergeStrategy>,
149    reduce_merge_states: HashMap<Discriminant, ReduceState>,
150    ends_when: Option<Condition>,
151    starts_when: Option<Condition>,
152    max_events: Option<usize>,
153}
154
155fn validate_merge_strategies(strategies: IndexMap<KeyString, MergeStrategy>) -> crate::Result<()> {
156    for (path, _) in &strategies {
157        let contains_index = parse_target_path(path)
158            .map_err(|_| format!("Could not parse path: `{path}`"))?
159            .path
160            .segments
161            .iter()
162            .any(|segment| segment.is_index());
163        if contains_index {
164            return Err(format!(
165                "Merge strategies with indexes are currently not supported. Path: `{path}`"
166            )
167            .into());
168        }
169    }
170
171    Ok(())
172}
173
174impl Reduce {
175    pub fn new(
176        config: &ReduceConfig,
177        enrichment_tables: &vector_lib::enrichment::TableRegistry,
178        metrics_storage: &MetricsStorage,
179    ) -> crate::Result<Self> {
180        if config.ends_when.is_some() && config.starts_when.is_some() {
181            return Err("only one of `ends_when` and `starts_when` can be provided".into());
182        }
183
184        let ends_when = config
185            .ends_when
186            .as_ref()
187            .map(|c| c.build(enrichment_tables, metrics_storage))
188            .transpose()?;
189        let starts_when = config
190            .starts_when
191            .as_ref()
192            .map(|c| c.build(enrichment_tables, metrics_storage))
193            .transpose()?;
194        let group_by = config.group_by.clone().into_iter().collect();
195        let max_events = config.max_events.map(|max| max.into());
196
197        validate_merge_strategies(config.merge_strategies.clone())?;
198
199        Ok(Reduce {
200            expire_after: config.expire_after_ms,
201            flush_period: config.flush_period_ms,
202            end_every_period: config.end_every_period_ms,
203            group_by,
204            merge_strategies: config
205                .merge_strategies
206                .iter()
207                .filter_map(|(path, strategy)| {
208                    // TODO Invalid paths are ignored to preserve backwards compatibility.
209                    //      Merge strategy paths should ideally be [`lookup_v2::ConfigTargetPath`]
210                    //      which means an invalid path would result in an configuration error.
211                    let parsed_path = parse_target_path(path).ok();
212                    if parsed_path.is_none() {
213                        warn!(message = "Ignoring strategy with invalid path.", %path);
214                    }
215                    parsed_path.map(|path| (path, strategy.clone()))
216                })
217                .collect(),
218            reduce_merge_states: HashMap::new(),
219            ends_when,
220            starts_when,
221            max_events,
222        })
223    }
224
225    fn flush_into(&mut self, emitter: &mut Emitter<Event>) {
226        let mut flush_discriminants = Vec::new();
227        let now = Instant::now();
228        for (k, t) in &self.reduce_merge_states {
229            if let Some(period) = self.end_every_period
230                && (now - t.creation) >= period
231            {
232                flush_discriminants.push(k.clone());
233            }
234
235            if (now - t.stale_since) >= self.expire_after {
236                flush_discriminants.push(k.clone());
237            }
238        }
239        for k in &flush_discriminants {
240            if let Some(t) = self.reduce_merge_states.remove(k) {
241                emit!(ReduceStaleEventFlushed);
242                emitter.emit(Event::from(t.flush()));
243            }
244        }
245    }
246
247    fn flush_all_into(&mut self, emitter: &mut Emitter<Event>) {
248        self.reduce_merge_states
249            .drain()
250            .for_each(|(_, s)| emitter.emit(Event::from(s.flush())));
251    }
252
253    fn push_or_new_reduce_state(&mut self, event: LogEvent, discriminant: Discriminant) {
254        match self.reduce_merge_states.entry(discriminant) {
255            Entry::Vacant(entry) => {
256                let mut state = ReduceState::new();
257                state.add_event(event, &self.merge_strategies);
258                entry.insert(state);
259            }
260            Entry::Occupied(mut entry) => {
261                entry.get_mut().add_event(event, &self.merge_strategies);
262            }
263        };
264    }
265
266    pub fn transform_one(&mut self, emitter: &mut Emitter<Event>, event: Event) {
267        let (starts_here, event) = match &self.starts_when {
268            Some(condition) => condition.check(event),
269            None => (false, event),
270        };
271
272        let (mut ends_here, event) = match &self.ends_when {
273            Some(condition) => condition.check(event),
274            None => (false, event),
275        };
276
277        let event = event.into_log();
278        let discriminant = Discriminant::from_log_event(&event, &self.group_by);
279
280        if let Some(max_events) = self.max_events {
281            if max_events == 1 {
282                ends_here = true;
283            } else if let Some(entry) = self.reduce_merge_states.get(&discriminant) {
284                // The current event will finish this set
285                if entry.events + 1 == max_events {
286                    ends_here = true;
287                }
288            }
289        }
290
291        if starts_here {
292            if let Some(state) = self.reduce_merge_states.remove(&discriminant) {
293                emitter.emit(state.flush().into());
294            }
295
296            self.push_or_new_reduce_state(event, discriminant)
297        } else if ends_here {
298            emitter.emit(match self.reduce_merge_states.remove(&discriminant) {
299                Some(mut state) => {
300                    state.add_event(event, &self.merge_strategies);
301                    state.flush().into()
302                }
303                None => {
304                    let mut state = ReduceState::new();
305                    state.add_event(event, &self.merge_strategies);
306                    state.flush().into()
307                }
308            });
309        } else {
310            self.push_or_new_reduce_state(event, discriminant)
311        }
312    }
313}
314
315impl TaskTransform<Event> for Reduce {
316    fn transform(
317        self: Box<Self>,
318        input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
319    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
320    where
321        Self: 'static,
322    {
323        let transform_fn = move |me: &mut Box<Reduce>, event, emitter: &mut Emitter<Event>| {
324            me.transform_one(emitter, event);
325        };
326
327        construct_output_stream(self, input_rx, transform_fn)
328    }
329}
330
331pub fn construct_output_stream(
332    reduce: Box<Reduce>,
333    input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
334    mut transform_fn: impl FnMut(&mut Box<Reduce>, Event, &mut Emitter<Event>) + Send + Sync + 'static,
335) -> Pin<Box<dyn Stream<Item = Event> + Send>>
336where
337    Reduce: 'static,
338{
339    let flush_period = reduce.flush_period;
340    Box::pin(map_with_expiration(
341        reduce,
342        input_rx,
343        flush_period,
344        move |me, event, emitter| {
345            transform_fn(me, event, emitter);
346        },
347        |me, emitter| {
348            me.flush_into(emitter);
349        },
350        |me, emitter| {
351            me.flush_all_into(emitter);
352        },
353    ))
354}
355
356#[cfg(test)]
357mod test {
358    use std::sync::Arc;
359
360    use indoc::indoc;
361    use serde_json::json;
362    use tokio::sync::mpsc;
363    use tokio_stream::wrappers::ReceiverStream;
364    use vector_lib::{enrichment::TableRegistry, lookup::owned_value_path};
365    use vrl::value::Kind;
366
367    use super::*;
368    use crate::{
369        config::{OutputId, TransformConfig, schema, schema::Definition},
370        event::{LogEvent, Value},
371        test_util::components::assert_transform_compliance,
372        transforms::test::create_topology,
373    };
374
375    #[tokio::test]
376    async fn reduce_from_condition() {
377        let reduce_config = toml::from_str::<ReduceConfig>(
378            r#"
379group_by = [ "request_id" ]
380
381[ends_when]
382  type = "vrl"
383  source = "exists(.test_end)"
384"#,
385        )
386        .unwrap();
387
388        assert_transform_compliance(async move {
389            let input_definition = schema::Definition::default_legacy_namespace()
390                .with_event_field(&owned_value_path!("counter"), Kind::integer(), None)
391                .with_event_field(&owned_value_path!("request_id"), Kind::bytes(), None)
392                .with_event_field(
393                    &owned_value_path!("test_end"),
394                    Kind::bytes().or_undefined(),
395                    None,
396                )
397                .with_event_field(
398                    &owned_value_path!("extra_field"),
399                    Kind::bytes().or_undefined(),
400                    None,
401                );
402            let schema_definitions = reduce_config
403                .outputs(&Default::default(), &[("test".into(), input_definition)])
404                .first()
405                .unwrap()
406                .schema_definitions(true)
407                .clone();
408
409            let new_schema_definition = reduce_config.outputs(
410                &Default::default(),
411                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
412            )[0]
413            .clone()
414            .log_schema_definitions
415            .get(&OutputId::from("in"))
416            .unwrap()
417            .clone();
418
419            let (tx, rx) = mpsc::channel(1);
420            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
421
422            let mut e_1 = LogEvent::from("test message 1");
423            e_1.insert("counter", 1);
424            e_1.insert("request_id", "1");
425            let mut metadata_1 = e_1.metadata().clone();
426            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
427            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
428
429            let mut e_2 = LogEvent::from("test message 2");
430            e_2.insert("counter", 2);
431            e_2.insert("request_id", "2");
432            let mut metadata_2 = e_2.metadata().clone();
433            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
434            metadata_2.set_schema_definition(&Arc::new(new_schema_definition.clone()));
435
436            let mut e_3 = LogEvent::from("test message 3");
437            e_3.insert("counter", 3);
438            e_3.insert("request_id", "1");
439
440            let mut e_4 = LogEvent::from("test message 4");
441            e_4.insert("counter", 4);
442            e_4.insert("request_id", "1");
443            e_4.insert("test_end", "yep");
444
445            let mut e_5 = LogEvent::from("test message 5");
446            e_5.insert("counter", 5);
447            e_5.insert("request_id", "2");
448            e_5.insert("extra_field", "value1");
449            e_5.insert("test_end", "yep");
450
451            for event in [e_1.into(), e_2.into(), e_3.into(), e_4.into(), e_5.into()] {
452                tx.send(event).await.unwrap();
453            }
454
455            let output_1 = out.recv().await.unwrap().into_log();
456            assert_eq!(output_1["message"], "test message 1".into());
457            assert_eq!(output_1["counter"], Value::from(8));
458            assert_eq!(output_1.metadata(), &metadata_1);
459            schema_definitions
460                .values()
461                .for_each(|definition| definition.assert_valid_for_event(&output_1.clone().into()));
462
463            let output_2 = out.recv().await.unwrap().into_log();
464            assert_eq!(output_2["message"], "test message 2".into());
465            assert_eq!(output_2["extra_field"], "value1".into());
466            assert_eq!(output_2["counter"], Value::from(7));
467            assert_eq!(output_2.metadata(), &metadata_2);
468            schema_definitions
469                .values()
470                .for_each(|definition| definition.assert_valid_for_event(&output_2.clone().into()));
471
472            drop(tx);
473            topology.stop().await;
474            assert_eq!(out.recv().await, None);
475        })
476        .await;
477    }
478
479    #[tokio::test]
480    async fn reduce_merge_strategies() {
481        let reduce_config = toml::from_str::<ReduceConfig>(
482            r#"
483group_by = [ "request_id" ]
484
485merge_strategies.foo = "concat"
486merge_strategies.bar = "array"
487merge_strategies.baz = "max"
488
489[ends_when]
490  type = "vrl"
491  source = "exists(.test_end)"
492"#,
493        )
494        .unwrap();
495
496        assert_transform_compliance(async move {
497            let (tx, rx) = mpsc::channel(1);
498
499            let new_schema_definition = reduce_config.outputs(
500                &Default::default(),
501                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
502            )[0]
503            .clone()
504            .log_schema_definitions
505            .get(&OutputId::from("in"))
506            .unwrap()
507            .clone();
508
509            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
510
511            let mut e_1 = LogEvent::from("test message 1");
512            e_1.insert("foo", "first foo");
513            e_1.insert("bar", "first bar");
514            e_1.insert("baz", 2);
515            e_1.insert("request_id", "1");
516            let mut metadata = e_1.metadata().clone();
517            metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
518            metadata.set_schema_definition(&Arc::new(new_schema_definition.clone()));
519            tx.send(e_1.into()).await.unwrap();
520
521            let mut e_2 = LogEvent::from("test message 2");
522            e_2.insert("foo", "second foo");
523            e_2.insert("bar", 2);
524            e_2.insert("baz", "not number");
525            e_2.insert("request_id", "1");
526            tx.send(e_2.into()).await.unwrap();
527
528            let mut e_3 = LogEvent::from("test message 3");
529            e_3.insert("foo", 10);
530            e_3.insert("bar", "third bar");
531            e_3.insert("baz", 3);
532            e_3.insert("request_id", "1");
533            e_3.insert("test_end", "yep");
534            tx.send(e_3.into()).await.unwrap();
535
536            let output_1 = out.recv().await.unwrap().into_log();
537            assert_eq!(output_1["message"], "test message 1".into());
538            assert_eq!(output_1["foo"], "first foo second foo".into());
539            assert_eq!(
540                output_1["bar"],
541                Value::Array(vec!["first bar".into(), 2.into(), "third bar".into()]),
542            );
543            assert_eq!(output_1["baz"], 3.into());
544            assert_eq!(output_1.metadata(), &metadata);
545
546            drop(tx);
547            topology.stop().await;
548            assert_eq!(out.recv().await, None);
549        })
550        .await;
551    }
552
553    #[tokio::test]
554    async fn missing_group_by() {
555        let reduce_config = toml::from_str::<ReduceConfig>(
556            r#"
557group_by = [ "request_id" ]
558
559[ends_when]
560  type = "vrl"
561  source = "exists(.test_end)"
562"#,
563        )
564        .unwrap();
565
566        assert_transform_compliance(async move {
567            let (tx, rx) = mpsc::channel(1);
568            let new_schema_definition = reduce_config.outputs(
569                &Default::default(),
570                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
571            )[0]
572            .clone()
573            .log_schema_definitions
574            .get(&OutputId::from("in"))
575            .unwrap()
576            .clone();
577
578            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
579
580            let mut e_1 = LogEvent::from("test message 1");
581            e_1.insert("counter", 1);
582            e_1.insert("request_id", "1");
583            let mut metadata_1 = e_1.metadata().clone();
584            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
585            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
586            tx.send(e_1.into()).await.unwrap();
587
588            let mut e_2 = LogEvent::from("test message 2");
589            e_2.insert("counter", 2);
590            let mut metadata_2 = e_2.metadata().clone();
591            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
592            metadata_2.set_schema_definition(&Arc::new(new_schema_definition));
593            tx.send(e_2.into()).await.unwrap();
594
595            let mut e_3 = LogEvent::from("test message 3");
596            e_3.insert("counter", 3);
597            e_3.insert("request_id", "1");
598            tx.send(e_3.into()).await.unwrap();
599
600            let mut e_4 = LogEvent::from("test message 4");
601            e_4.insert("counter", 4);
602            e_4.insert("request_id", "1");
603            e_4.insert("test_end", "yep");
604            tx.send(e_4.into()).await.unwrap();
605
606            let mut e_5 = LogEvent::from("test message 5");
607            e_5.insert("counter", 5);
608            e_5.insert("extra_field", "value1");
609            e_5.insert("test_end", "yep");
610            tx.send(e_5.into()).await.unwrap();
611
612            let output_1 = out.recv().await.unwrap().into_log();
613            assert_eq!(output_1["message"], "test message 1".into());
614            assert_eq!(output_1["counter"], Value::from(8));
615            assert_eq!(output_1.metadata(), &metadata_1);
616
617            let output_2 = out.recv().await.unwrap().into_log();
618            assert_eq!(output_2["message"], "test message 2".into());
619            assert_eq!(output_2["extra_field"], "value1".into());
620            assert_eq!(output_2["counter"], Value::from(7));
621            assert_eq!(output_2.metadata(), &metadata_2);
622
623            drop(tx);
624            topology.stop().await;
625            assert_eq!(out.recv().await, None);
626        })
627        .await;
628    }
629
630    #[tokio::test]
631    async fn max_events_0() {
632        let reduce_config = toml::from_str::<ReduceConfig>(
633            r#"
634group_by = [ "id" ]
635merge_strategies.id = "retain"
636merge_strategies.message = "array"
637max_events = 0
638            "#,
639        );
640
641        match reduce_config {
642            Ok(_conf) => unreachable!("max_events=0 should be rejected."),
643            Err(err) => assert!(
644                err.to_string()
645                    .contains("invalid value: integer `0`, expected a nonzero usize")
646            ),
647        }
648    }
649
650    #[tokio::test]
651    async fn max_events_1() {
652        let reduce_config = toml::from_str::<ReduceConfig>(
653            r#"
654group_by = [ "id" ]
655merge_strategies.id = "retain"
656merge_strategies.message = "array"
657max_events = 1
658            "#,
659        )
660        .unwrap();
661        assert_transform_compliance(async move {
662            let (tx, rx) = mpsc::channel(1);
663            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
664
665            let mut e_1 = LogEvent::from("test 1");
666            e_1.insert("id", "1");
667
668            let mut e_2 = LogEvent::from("test 2");
669            e_2.insert("id", "1");
670
671            let mut e_3 = LogEvent::from("test 3");
672            e_3.insert("id", "1");
673
674            for event in [e_1.into(), e_2.into(), e_3.into()] {
675                tx.send(event).await.unwrap();
676            }
677
678            let output_1 = out.recv().await.unwrap().into_log();
679            assert_eq!(output_1["message"], vec!["test 1"].into());
680            let output_2 = out.recv().await.unwrap().into_log();
681            assert_eq!(output_2["message"], vec!["test 2"].into());
682
683            let output_3 = out.recv().await.unwrap().into_log();
684            assert_eq!(output_3["message"], vec!["test 3"].into());
685
686            drop(tx);
687            topology.stop().await;
688            assert_eq!(out.recv().await, None);
689        })
690        .await;
691    }
692
693    #[tokio::test]
694    async fn max_events() {
695        let reduce_config = toml::from_str::<ReduceConfig>(
696            r#"
697group_by = [ "id" ]
698merge_strategies.id = "retain"
699merge_strategies.message = "array"
700max_events = 3
701            "#,
702        )
703        .unwrap();
704
705        assert_transform_compliance(async move {
706            let (tx, rx) = mpsc::channel(1);
707            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
708
709            let mut e_1 = LogEvent::from("test 1");
710            e_1.insert("id", "1");
711
712            let mut e_2 = LogEvent::from("test 2");
713            e_2.insert("id", "1");
714
715            let mut e_3 = LogEvent::from("test 3");
716            e_3.insert("id", "1");
717
718            let mut e_4 = LogEvent::from("test 4");
719            e_4.insert("id", "1");
720
721            let mut e_5 = LogEvent::from("test 5");
722            e_5.insert("id", "1");
723
724            let mut e_6 = LogEvent::from("test 6");
725            e_6.insert("id", "1");
726
727            for event in [
728                e_1.into(),
729                e_2.into(),
730                e_3.into(),
731                e_4.into(),
732                e_5.into(),
733                e_6.into(),
734            ] {
735                tx.send(event).await.unwrap();
736            }
737
738            let output_1 = out.recv().await.unwrap().into_log();
739            assert_eq!(
740                output_1["message"],
741                vec!["test 1", "test 2", "test 3"].into()
742            );
743
744            let output_2 = out.recv().await.unwrap().into_log();
745            assert_eq!(
746                output_2["message"],
747                vec!["test 4", "test 5", "test 6"].into()
748            );
749
750            drop(tx);
751            topology.stop().await;
752            assert_eq!(out.recv().await, None);
753        })
754        .await
755    }
756
757    #[tokio::test]
758    async fn arrays() {
759        let reduce_config = toml::from_str::<ReduceConfig>(
760            r#"
761group_by = [ "request_id" ]
762
763merge_strategies.foo = "array"
764merge_strategies.bar = "concat"
765
766[ends_when]
767  type = "vrl"
768  source = "exists(.test_end)"
769"#,
770        )
771        .unwrap();
772
773        assert_transform_compliance(async move {
774            let (tx, rx) = mpsc::channel(1);
775
776            let new_schema_definition = reduce_config.outputs(
777                &Default::default(),
778                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
779            )[0]
780            .clone()
781            .log_schema_definitions
782            .get(&OutputId::from("in"))
783            .unwrap()
784            .clone();
785
786            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
787
788            let mut e_1 = LogEvent::from("test message 1");
789            e_1.insert("foo", json!([1, 3]));
790            e_1.insert("bar", json!([1, 3]));
791            e_1.insert("request_id", "1");
792            let mut metadata_1 = e_1.metadata().clone();
793            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
794            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
795
796            tx.send(e_1.into()).await.unwrap();
797
798            let mut e_2 = LogEvent::from("test message 2");
799            e_2.insert("foo", json!([2, 4]));
800            e_2.insert("bar", json!([2, 4]));
801            e_2.insert("request_id", "2");
802            let mut metadata_2 = e_2.metadata().clone();
803            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
804            metadata_2.set_schema_definition(&Arc::new(new_schema_definition));
805            tx.send(e_2.into()).await.unwrap();
806
807            let mut e_3 = LogEvent::from("test message 3");
808            e_3.insert("foo", json!([5, 7]));
809            e_3.insert("bar", json!([5, 7]));
810            e_3.insert("request_id", "1");
811            tx.send(e_3.into()).await.unwrap();
812
813            let mut e_4 = LogEvent::from("test message 4");
814            e_4.insert("foo", json!("done"));
815            e_4.insert("bar", json!("done"));
816            e_4.insert("request_id", "1");
817            e_4.insert("test_end", "yep");
818            tx.send(e_4.into()).await.unwrap();
819
820            let mut e_5 = LogEvent::from("test message 5");
821            e_5.insert("foo", json!([6, 8]));
822            e_5.insert("bar", json!([6, 8]));
823            e_5.insert("request_id", "2");
824            tx.send(e_5.into()).await.unwrap();
825
826            let mut e_6 = LogEvent::from("test message 6");
827            e_6.insert("foo", json!("done"));
828            e_6.insert("bar", json!("done"));
829            e_6.insert("request_id", "2");
830            e_6.insert("test_end", "yep");
831            tx.send(e_6.into()).await.unwrap();
832
833            let output_1 = out.recv().await.unwrap().into_log();
834            assert_eq!(output_1["foo"], json!([[1, 3], [5, 7], "done"]).into());
835            assert_eq!(output_1["bar"], json!([1, 3, 5, 7, "done"]).into());
836            assert_eq!(output_1.metadata(), &metadata_1);
837
838            let output_2 = out.recv().await.unwrap().into_log();
839            assert_eq!(output_2["foo"], json!([[2, 4], [6, 8], "done"]).into());
840            assert_eq!(output_2["bar"], json!([2, 4, 6, 8, "done"]).into());
841            assert_eq!(output_2.metadata(), &metadata_2);
842
843            drop(tx);
844            topology.stop().await;
845            assert_eq!(out.recv().await, None);
846        })
847        .await;
848    }
849
850    #[tokio::test]
851    async fn strategy_path_with_nested_fields() {
852        let reduce_config = toml::from_str::<ReduceConfig>(indoc!(
853            r#"
854            group_by = [ "id" ]
855
856            merge_strategies.id = "discard"
857            merge_strategies."message.a.b" = "array"
858
859            [ends_when]
860              type = "vrl"
861              source = "exists(.test_end)"
862            "#,
863        ))
864        .unwrap();
865
866        assert_transform_compliance(async move {
867            let (tx, rx) = mpsc::channel(1);
868
869            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
870
871            let e_1 = LogEvent::from(Value::from(btreemap! {
872                "id" => 777,
873                "message" => btreemap! {
874                    "a" => btreemap! {
875                        "b" => vec![1,2],
876                        "num" => 1,
877                    },
878                },
879                "arr" => vec![btreemap! { "a" => 1 }, btreemap! { "b" => 1 }]
880            }));
881            let mut metadata_1 = e_1.metadata().clone();
882            metadata_1.set_upstream_id(Arc::new(OutputId::from("reduce")));
883
884            tx.send(e_1.into()).await.unwrap();
885
886            let e_2 = LogEvent::from(Value::from(btreemap! {
887                "id" => 777,
888                "message" => btreemap! {
889                        "a" => btreemap! {
890                            "b" => vec![3,4],
891                            "num" => 2,
892                        },
893                },
894                 "arr" => vec![btreemap! { "a" => 2 }, btreemap! { "b" => 2 }],
895                "test_end" => "done",
896            }));
897            tx.send(e_2.into()).await.unwrap();
898
899            let mut output = out.recv().await.unwrap().into_log();
900
901            // Remove timestamp fields which were automatically added.
902            output.remove_timestamp();
903            output.remove("timestamp_end");
904
905            assert_eq!(
906                *output.value(),
907                btreemap! {
908                    "id" => 777,
909                    "message" => btreemap! {
910                        "a" => btreemap! {
911                            "b" => vec![vec![1, 2], vec![3,4]],
912                            "num" => 3,
913                        },
914                    },
915                    "arr" => vec![btreemap! { "a" => 1 }, btreemap! { "b" => 1 }],
916                    "test_end" => "done",
917                }
918                .into()
919            );
920
921            drop(tx);
922            topology.stop().await;
923            assert_eq!(out.recv().await, None);
924        })
925        .await;
926    }
927
928    #[test]
929    fn invalid_merge_strategies_containing_indexes() {
930        let config = toml::from_str::<ReduceConfig>(indoc!(
931            r#"
932            group_by = [ "id" ]
933
934            merge_strategies.id = "discard"
935            merge_strategies."nested.msg[0]" = "array"
936            "#,
937        ))
938        .unwrap();
939        let error = Reduce::new(
940            &config,
941            &TableRegistry::default(),
942            &MetricsStorage::default(),
943        )
944        .unwrap_err();
945        assert_eq!(
946            error.to_string(),
947            "Merge strategies with indexes are currently not supported. Path: `nested.msg[0]`"
948        );
949    }
950
951    #[tokio::test]
952    async fn merge_objects_in_array() {
953        let config = toml::from_str::<ReduceConfig>(indoc!(
954            r#"
955            group_by = [ "id" ]
956            merge_strategies.events = "array"
957            merge_strategies."\"a-b\"" = "retain"
958            merge_strategies.another = "discard"
959
960            [ends_when]
961              type = "vrl"
962              source = "exists(.test_end)"
963            "#,
964        ))
965        .unwrap();
966
967        assert_transform_compliance(async move {
968            let (tx, rx) = mpsc::channel(1);
969
970            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
971
972            let v_1 = Value::from(btreemap! {
973                "attrs" => btreemap! {
974                    "nested.msg" => "foo",
975                },
976                "sev" => 2,
977            });
978            let mut e_1 = LogEvent::from(Value::from(
979                btreemap! {"id" => 777, "another" => btreemap!{ "a" => 1}},
980            ));
981            e_1.insert("events", v_1.clone());
982            e_1.insert("\"a-b\"", 2);
983            tx.send(e_1.into()).await.unwrap();
984
985            let v_2 = Value::from(btreemap! {
986                "attrs" => btreemap! {
987                    "nested.msg" => "bar",
988                },
989                "sev" => 3,
990            });
991            let mut e_2 = LogEvent::from(Value::from(
992                btreemap! {"id" => 777, "test_end" => "done", "another" => btreemap!{ "b" => 2}},
993            ));
994            e_2.insert("events", v_2.clone());
995            e_2.insert("\"a-b\"", 2);
996            tx.send(e_2.into()).await.unwrap();
997
998            let output = out.recv().await.unwrap().into_log();
999            let expected_value = Value::from(btreemap! {
1000                "id" => 1554,
1001                "events" => vec![v_1, v_2],
1002                "another" => btreemap!{ "a" => 1},
1003                "a-b" => 2,
1004                "test_end" => "done"
1005            });
1006            assert_eq!(*output.value(), expected_value);
1007
1008            drop(tx);
1009            topology.stop().await;
1010            assert_eq!(out.recv().await, None);
1011        })
1012        .await
1013    }
1014
1015    #[tokio::test]
1016    async fn merged_quoted_path() {
1017        let config = toml::from_str::<ReduceConfig>(indoc!(
1018            r#"
1019            [ends_when]
1020              type = "vrl"
1021              source = "exists(.test_end)"
1022            "#,
1023        ))
1024        .unwrap();
1025
1026        assert_transform_compliance(async move {
1027            let (tx, rx) = mpsc::channel(1);
1028
1029            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1030
1031            let e_1 = LogEvent::from(Value::from(btreemap! {"a b" => 1}));
1032            tx.send(e_1.into()).await.unwrap();
1033
1034            let e_2 = LogEvent::from(Value::from(btreemap! {"a b" => 2, "test_end" => "done"}));
1035            tx.send(e_2.into()).await.unwrap();
1036
1037            let output = out.recv().await.unwrap().into_log();
1038            let expected_value = Value::from(btreemap! {
1039                "a b" => 3,
1040                "test_end" => "done"
1041            });
1042            assert_eq!(*output.value(), expected_value);
1043
1044            drop(tx);
1045            topology.stop().await;
1046            assert_eq!(out.recv().await, None);
1047        })
1048        .await
1049    }
1050}