// vector/transforms/reduce/transform.rs

1use std::collections::hash_map::Entry;
2use std::collections::HashMap;
3use std::pin::Pin;
4use std::time::{Duration, Instant};
5
6use crate::internal_events::ReduceAddEventError;
7use crate::transforms::reduce::merge_strategy::{
8    get_value_merger, MergeStrategy, ReduceValueMerger,
9};
10use crate::{
11    conditions::Condition,
12    event::{discriminant::Discriminant, Event, EventMetadata, LogEvent},
13    internal_events::ReduceStaleEventFlushed,
14    transforms::{reduce::config::ReduceConfig, TaskTransform},
15};
16use futures::Stream;
17use indexmap::IndexMap;
18use vector_lib::stream::expiration_map::{map_with_expiration, Emitter};
19use vrl::path::{parse_target_path, OwnedTargetPath};
20use vrl::prelude::KeyString;
21
22#[derive(Clone, Debug)]
23struct ReduceState {
24    events: usize,
25    fields: HashMap<OwnedTargetPath, Box<dyn ReduceValueMerger>>,
26    stale_since: Instant,
27    creation: Instant,
28    metadata: EventMetadata,
29}
30
31fn is_covered_by_strategy(
32    path: &OwnedTargetPath,
33    strategies: &IndexMap<OwnedTargetPath, MergeStrategy>,
34) -> bool {
35    let mut current = OwnedTargetPath::event_root();
36    for component in &path.path.segments {
37        current = current.with_field_appended(&component.to_string());
38        if strategies.contains_key(&current) {
39            return true;
40        }
41    }
42    false
43}
44
45impl ReduceState {
46    fn new() -> Self {
47        Self {
48            events: 0,
49            stale_since: Instant::now(),
50            creation: Instant::now(),
51            fields: HashMap::new(),
52            metadata: EventMetadata::default(),
53        }
54    }
55
56    fn add_event(&mut self, e: LogEvent, strategies: &IndexMap<OwnedTargetPath, MergeStrategy>) {
57        self.metadata.merge(e.metadata().clone());
58
59        for (path, strategy) in strategies {
60            if let Some(value) = e.get(path) {
61                match self.fields.entry(path.clone()) {
62                    Entry::Vacant(entry) => match get_value_merger(value.clone(), strategy) {
63                        Ok(m) => {
64                            entry.insert(m);
65                        }
66                        Err(error) => {
67                            warn!(message = "Failed to create value merger.", %error, %path);
68                        }
69                    },
70                    Entry::Occupied(mut entry) => {
71                        if let Err(error) = entry.get_mut().add(value.clone()) {
72                            warn!(message = "Failed to merge value.", %error);
73                        }
74                    }
75                }
76            }
77        }
78
79        if let Some(fields_iter) = e.all_event_fields_skip_array_elements() {
80            for (path, value) in fields_iter {
81                // This should not return an error, unless there is a bug in the event fields iterator.
82                let parsed_path = match parse_target_path(&path) {
83                    Ok(path) => path,
84                    Err(error) => {
85                        emit!(ReduceAddEventError { error, path });
86                        continue;
87                    }
88                };
89                if is_covered_by_strategy(&parsed_path, strategies) {
90                    continue;
91                }
92
93                let maybe_strategy = strategies.get(&parsed_path);
94                match self.fields.entry(parsed_path) {
95                    Entry::Vacant(entry) => {
96                        if let Some(strategy) = maybe_strategy {
97                            match get_value_merger(value.clone(), strategy) {
98                                Ok(m) => {
99                                    entry.insert(m);
100                                }
101                                Err(error) => {
102                                    warn!(message = "Failed to merge value.", %error);
103                                }
104                            }
105                        } else {
106                            entry.insert(value.clone().into());
107                        }
108                    }
109                    Entry::Occupied(mut entry) => {
110                        if let Err(error) = entry.get_mut().add(value.clone()) {
111                            warn!(message = "Failed to merge value.", %error);
112                        }
113                    }
114                }
115            }
116        }
117        // else the event root is not an object (see https://github.com/vectordotdev/vector/issues/18219)
118
119        self.events += 1;
120        self.stale_since = Instant::now();
121    }
122
123    fn flush(mut self) -> LogEvent {
124        let mut event = LogEvent::new_with_metadata(self.metadata);
125        for (path, v) in self.fields.drain() {
126            if let Err(error) = v.insert_into(&path, &mut event) {
127                warn!(message = "Failed to merge values for field.", %error);
128            }
129        }
130        self.events = 0;
131        event
132    }
133}
134
135#[derive(Clone, Debug)]
136pub struct Reduce {
137    expire_after: Duration,
138    flush_period: Duration,
139    end_every_period: Option<Duration>,
140    group_by: Vec<String>,
141    merge_strategies: IndexMap<OwnedTargetPath, MergeStrategy>,
142    reduce_merge_states: HashMap<Discriminant, ReduceState>,
143    ends_when: Option<Condition>,
144    starts_when: Option<Condition>,
145    max_events: Option<usize>,
146}
147
148fn validate_merge_strategies(strategies: IndexMap<KeyString, MergeStrategy>) -> crate::Result<()> {
149    for (path, _) in &strategies {
150        let contains_index = parse_target_path(path)
151            .map_err(|_| format!("Could not parse path: `{path}`"))?
152            .path
153            .segments
154            .iter()
155            .any(|segment| segment.is_index());
156        if contains_index {
157            return Err(format!(
158                "Merge strategies with indexes are currently not supported. Path: `{path}`"
159            )
160            .into());
161        }
162    }
163
164    Ok(())
165}
166
167impl Reduce {
168    pub fn new(
169        config: &ReduceConfig,
170        enrichment_tables: &vector_lib::enrichment::TableRegistry,
171    ) -> crate::Result<Self> {
172        if config.ends_when.is_some() && config.starts_when.is_some() {
173            return Err("only one of `ends_when` and `starts_when` can be provided".into());
174        }
175
176        let ends_when = config
177            .ends_when
178            .as_ref()
179            .map(|c| c.build(enrichment_tables))
180            .transpose()?;
181        let starts_when = config
182            .starts_when
183            .as_ref()
184            .map(|c| c.build(enrichment_tables))
185            .transpose()?;
186        let group_by = config.group_by.clone().into_iter().collect();
187        let max_events = config.max_events.map(|max| max.into());
188
189        validate_merge_strategies(config.merge_strategies.clone())?;
190
191        Ok(Reduce {
192            expire_after: config.expire_after_ms,
193            flush_period: config.flush_period_ms,
194            end_every_period: config.end_every_period_ms,
195            group_by,
196            merge_strategies: config
197                .merge_strategies
198                .iter()
199                .filter_map(|(path, strategy)| {
200                    // TODO Invalid paths are ignored to preserve backwards compatibility.
201                    //      Merge strategy paths should ideally be [`lookup_v2::ConfigTargetPath`]
202                    //      which means an invalid path would result in an configuration error.
203                    let parsed_path = parse_target_path(path).ok();
204                    if parsed_path.is_none() {
205                        warn!(message = "Ignoring strategy with invalid path.", %path);
206                    }
207                    parsed_path.map(|path| (path, strategy.clone()))
208                })
209                .collect(),
210            reduce_merge_states: HashMap::new(),
211            ends_when,
212            starts_when,
213            max_events,
214        })
215    }
216
217    fn flush_into(&mut self, emitter: &mut Emitter<Event>) {
218        let mut flush_discriminants = Vec::new();
219        let now = Instant::now();
220        for (k, t) in &self.reduce_merge_states {
221            if let Some(period) = self.end_every_period {
222                if (now - t.creation) >= period {
223                    flush_discriminants.push(k.clone());
224                }
225            }
226
227            if (now - t.stale_since) >= self.expire_after {
228                flush_discriminants.push(k.clone());
229            }
230        }
231        for k in &flush_discriminants {
232            if let Some(t) = self.reduce_merge_states.remove(k) {
233                emit!(ReduceStaleEventFlushed);
234                emitter.emit(Event::from(t.flush()));
235            }
236        }
237    }
238
239    fn flush_all_into(&mut self, emitter: &mut Emitter<Event>) {
240        self.reduce_merge_states
241            .drain()
242            .for_each(|(_, s)| emitter.emit(Event::from(s.flush())));
243    }
244
245    fn push_or_new_reduce_state(&mut self, event: LogEvent, discriminant: Discriminant) {
246        match self.reduce_merge_states.entry(discriminant) {
247            Entry::Vacant(entry) => {
248                let mut state = ReduceState::new();
249                state.add_event(event, &self.merge_strategies);
250                entry.insert(state);
251            }
252            Entry::Occupied(mut entry) => {
253                entry.get_mut().add_event(event, &self.merge_strategies);
254            }
255        };
256    }
257
258    pub fn transform_one(&mut self, emitter: &mut Emitter<Event>, event: Event) {
259        let (starts_here, event) = match &self.starts_when {
260            Some(condition) => condition.check(event),
261            None => (false, event),
262        };
263
264        let (mut ends_here, event) = match &self.ends_when {
265            Some(condition) => condition.check(event),
266            None => (false, event),
267        };
268
269        let event = event.into_log();
270        let discriminant = Discriminant::from_log_event(&event, &self.group_by);
271
272        if let Some(max_events) = self.max_events {
273            if max_events == 1 {
274                ends_here = true;
275            } else if let Some(entry) = self.reduce_merge_states.get(&discriminant) {
276                // The current event will finish this set
277                if entry.events + 1 == max_events {
278                    ends_here = true;
279                }
280            }
281        }
282
283        if starts_here {
284            if let Some(state) = self.reduce_merge_states.remove(&discriminant) {
285                emitter.emit(state.flush().into());
286            }
287
288            self.push_or_new_reduce_state(event, discriminant)
289        } else if ends_here {
290            emitter.emit(match self.reduce_merge_states.remove(&discriminant) {
291                Some(mut state) => {
292                    state.add_event(event, &self.merge_strategies);
293                    state.flush().into()
294                }
295                None => {
296                    let mut state = ReduceState::new();
297                    state.add_event(event, &self.merge_strategies);
298                    state.flush().into()
299                }
300            });
301        } else {
302            self.push_or_new_reduce_state(event, discriminant)
303        }
304    }
305}
306
307impl TaskTransform<Event> for Reduce {
308    fn transform(
309        self: Box<Self>,
310        input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
311    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
312    where
313        Self: 'static,
314    {
315        let transform_fn = move |me: &mut Box<Reduce>, event, emitter: &mut Emitter<Event>| {
316            me.transform_one(emitter, event);
317        };
318
319        construct_output_stream(self, input_rx, transform_fn)
320    }
321}
322
323pub fn construct_output_stream(
324    reduce: Box<Reduce>,
325    input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
326    mut transform_fn: impl FnMut(&mut Box<Reduce>, Event, &mut Emitter<Event>) + Send + Sync + 'static,
327) -> Pin<Box<dyn Stream<Item = Event> + Send>>
328where
329    Reduce: 'static,
330{
331    let flush_period = reduce.flush_period;
332    Box::pin(map_with_expiration(
333        reduce,
334        input_rx,
335        flush_period,
336        move |me, event, emitter| {
337            transform_fn(me, event, emitter);
338        },
339        |me, emitter| {
340            me.flush_into(emitter);
341        },
342        |me, emitter| {
343            me.flush_all_into(emitter);
344        },
345    ))
346}
347
348#[cfg(test)]
349mod test {
350    use indoc::indoc;
351    use serde_json::json;
352    use std::sync::Arc;
353    use tokio::sync::mpsc;
354    use tokio_stream::wrappers::ReceiverStream;
355    use vrl::value::Kind;
356
357    use vector_lib::enrichment::TableRegistry;
358    use vector_lib::lookup::owned_value_path;
359
360    use crate::config::schema::Definition;
361    use crate::config::{schema, LogNamespace, OutputId, TransformConfig};
362    use crate::event::{LogEvent, Value};
363    use crate::test_util::components::assert_transform_compliance;
364    use crate::transforms::test::create_topology;
365
366    use super::*;
367
368    #[tokio::test]
369    async fn reduce_from_condition() {
370        let reduce_config = toml::from_str::<ReduceConfig>(
371            r#"
372group_by = [ "request_id" ]
373
374[ends_when]
375  type = "vrl"
376  source = "exists(.test_end)"
377"#,
378        )
379        .unwrap();
380
381        assert_transform_compliance(async move {
382            let input_definition = schema::Definition::default_legacy_namespace()
383                .with_event_field(&owned_value_path!("counter"), Kind::integer(), None)
384                .with_event_field(&owned_value_path!("request_id"), Kind::bytes(), None)
385                .with_event_field(
386                    &owned_value_path!("test_end"),
387                    Kind::bytes().or_undefined(),
388                    None,
389                )
390                .with_event_field(
391                    &owned_value_path!("extra_field"),
392                    Kind::bytes().or_undefined(),
393                    None,
394                );
395            let schema_definitions = reduce_config
396                .outputs(
397                    vector_lib::enrichment::TableRegistry::default(),
398                    &[("test".into(), input_definition)],
399                    LogNamespace::Legacy,
400                )
401                .first()
402                .unwrap()
403                .schema_definitions(true)
404                .clone();
405
406            let new_schema_definition = reduce_config.outputs(
407                TableRegistry::default(),
408                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
409                LogNamespace::Legacy,
410            )[0]
411            .clone()
412            .log_schema_definitions
413            .get(&OutputId::from("in"))
414            .unwrap()
415            .clone();
416
417            let (tx, rx) = mpsc::channel(1);
418            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
419
420            let mut e_1 = LogEvent::from("test message 1");
421            e_1.insert("counter", 1);
422            e_1.insert("request_id", "1");
423            let mut metadata_1 = e_1.metadata().clone();
424            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
425            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
426
427            let mut e_2 = LogEvent::from("test message 2");
428            e_2.insert("counter", 2);
429            e_2.insert("request_id", "2");
430            let mut metadata_2 = e_2.metadata().clone();
431            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
432            metadata_2.set_schema_definition(&Arc::new(new_schema_definition.clone()));
433
434            let mut e_3 = LogEvent::from("test message 3");
435            e_3.insert("counter", 3);
436            e_3.insert("request_id", "1");
437
438            let mut e_4 = LogEvent::from("test message 4");
439            e_4.insert("counter", 4);
440            e_4.insert("request_id", "1");
441            e_4.insert("test_end", "yep");
442
443            let mut e_5 = LogEvent::from("test message 5");
444            e_5.insert("counter", 5);
445            e_5.insert("request_id", "2");
446            e_5.insert("extra_field", "value1");
447            e_5.insert("test_end", "yep");
448
449            for event in vec![e_1.into(), e_2.into(), e_3.into(), e_4.into(), e_5.into()] {
450                tx.send(event).await.unwrap();
451            }
452
453            let output_1 = out.recv().await.unwrap().into_log();
454            assert_eq!(output_1["message"], "test message 1".into());
455            assert_eq!(output_1["counter"], Value::from(8));
456            assert_eq!(output_1.metadata(), &metadata_1);
457            schema_definitions
458                .values()
459                .for_each(|definition| definition.assert_valid_for_event(&output_1.clone().into()));
460
461            let output_2 = out.recv().await.unwrap().into_log();
462            assert_eq!(output_2["message"], "test message 2".into());
463            assert_eq!(output_2["extra_field"], "value1".into());
464            assert_eq!(output_2["counter"], Value::from(7));
465            assert_eq!(output_2.metadata(), &metadata_2);
466            schema_definitions
467                .values()
468                .for_each(|definition| definition.assert_valid_for_event(&output_2.clone().into()));
469
470            drop(tx);
471            topology.stop().await;
472            assert_eq!(out.recv().await, None);
473        })
474        .await;
475    }
476
477    #[tokio::test]
478    async fn reduce_merge_strategies() {
479        let reduce_config = toml::from_str::<ReduceConfig>(
480            r#"
481group_by = [ "request_id" ]
482
483merge_strategies.foo = "concat"
484merge_strategies.bar = "array"
485merge_strategies.baz = "max"
486
487[ends_when]
488  type = "vrl"
489  source = "exists(.test_end)"
490"#,
491        )
492        .unwrap();
493
494        assert_transform_compliance(async move {
495            let (tx, rx) = mpsc::channel(1);
496
497            let new_schema_definition = reduce_config.outputs(
498                TableRegistry::default(),
499                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
500                LogNamespace::Legacy,
501            )[0]
502            .clone()
503            .log_schema_definitions
504            .get(&OutputId::from("in"))
505            .unwrap()
506            .clone();
507
508            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
509
510            let mut e_1 = LogEvent::from("test message 1");
511            e_1.insert("foo", "first foo");
512            e_1.insert("bar", "first bar");
513            e_1.insert("baz", 2);
514            e_1.insert("request_id", "1");
515            let mut metadata = e_1.metadata().clone();
516            metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
517            metadata.set_schema_definition(&Arc::new(new_schema_definition.clone()));
518            tx.send(e_1.into()).await.unwrap();
519
520            let mut e_2 = LogEvent::from("test message 2");
521            e_2.insert("foo", "second foo");
522            e_2.insert("bar", 2);
523            e_2.insert("baz", "not number");
524            e_2.insert("request_id", "1");
525            tx.send(e_2.into()).await.unwrap();
526
527            let mut e_3 = LogEvent::from("test message 3");
528            e_3.insert("foo", 10);
529            e_3.insert("bar", "third bar");
530            e_3.insert("baz", 3);
531            e_3.insert("request_id", "1");
532            e_3.insert("test_end", "yep");
533            tx.send(e_3.into()).await.unwrap();
534
535            let output_1 = out.recv().await.unwrap().into_log();
536            assert_eq!(output_1["message"], "test message 1".into());
537            assert_eq!(output_1["foo"], "first foo second foo".into());
538            assert_eq!(
539                output_1["bar"],
540                Value::Array(vec!["first bar".into(), 2.into(), "third bar".into()]),
541            );
542            assert_eq!(output_1["baz"], 3.into());
543            assert_eq!(output_1.metadata(), &metadata);
544
545            drop(tx);
546            topology.stop().await;
547            assert_eq!(out.recv().await, None);
548        })
549        .await;
550    }
551
552    #[tokio::test]
553    async fn missing_group_by() {
554        let reduce_config = toml::from_str::<ReduceConfig>(
555            r#"
556group_by = [ "request_id" ]
557
558[ends_when]
559  type = "vrl"
560  source = "exists(.test_end)"
561"#,
562        )
563        .unwrap();
564
565        assert_transform_compliance(async move {
566            let (tx, rx) = mpsc::channel(1);
567            let new_schema_definition = reduce_config.outputs(
568                TableRegistry::default(),
569                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
570                LogNamespace::Legacy,
571            )[0]
572            .clone()
573            .log_schema_definitions
574            .get(&OutputId::from("in"))
575            .unwrap()
576            .clone();
577
578            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
579
580            let mut e_1 = LogEvent::from("test message 1");
581            e_1.insert("counter", 1);
582            e_1.insert("request_id", "1");
583            let mut metadata_1 = e_1.metadata().clone();
584            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
585            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
586            tx.send(e_1.into()).await.unwrap();
587
588            let mut e_2 = LogEvent::from("test message 2");
589            e_2.insert("counter", 2);
590            let mut metadata_2 = e_2.metadata().clone();
591            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
592            metadata_2.set_schema_definition(&Arc::new(new_schema_definition));
593            tx.send(e_2.into()).await.unwrap();
594
595            let mut e_3 = LogEvent::from("test message 3");
596            e_3.insert("counter", 3);
597            e_3.insert("request_id", "1");
598            tx.send(e_3.into()).await.unwrap();
599
600            let mut e_4 = LogEvent::from("test message 4");
601            e_4.insert("counter", 4);
602            e_4.insert("request_id", "1");
603            e_4.insert("test_end", "yep");
604            tx.send(e_4.into()).await.unwrap();
605
606            let mut e_5 = LogEvent::from("test message 5");
607            e_5.insert("counter", 5);
608            e_5.insert("extra_field", "value1");
609            e_5.insert("test_end", "yep");
610            tx.send(e_5.into()).await.unwrap();
611
612            let output_1 = out.recv().await.unwrap().into_log();
613            assert_eq!(output_1["message"], "test message 1".into());
614            assert_eq!(output_1["counter"], Value::from(8));
615            assert_eq!(output_1.metadata(), &metadata_1);
616
617            let output_2 = out.recv().await.unwrap().into_log();
618            assert_eq!(output_2["message"], "test message 2".into());
619            assert_eq!(output_2["extra_field"], "value1".into());
620            assert_eq!(output_2["counter"], Value::from(7));
621            assert_eq!(output_2.metadata(), &metadata_2);
622
623            drop(tx);
624            topology.stop().await;
625            assert_eq!(out.recv().await, None);
626        })
627        .await;
628    }
629
630    #[tokio::test]
631    async fn max_events_0() {
632        let reduce_config = toml::from_str::<ReduceConfig>(
633            r#"
634group_by = [ "id" ]
635merge_strategies.id = "retain"
636merge_strategies.message = "array"
637max_events = 0
638            "#,
639        );
640
641        match reduce_config {
642            Ok(_conf) => unreachable!("max_events=0 should be rejected."),
643            Err(err) => assert!(err
644                .to_string()
645                .contains("invalid value: integer `0`, expected a nonzero usize")),
646        }
647    }
648
649    #[tokio::test]
650    async fn max_events_1() {
651        let reduce_config = toml::from_str::<ReduceConfig>(
652            r#"
653group_by = [ "id" ]
654merge_strategies.id = "retain"
655merge_strategies.message = "array"
656max_events = 1
657            "#,
658        )
659        .unwrap();
660        assert_transform_compliance(async move {
661            let (tx, rx) = mpsc::channel(1);
662            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
663
664            let mut e_1 = LogEvent::from("test 1");
665            e_1.insert("id", "1");
666
667            let mut e_2 = LogEvent::from("test 2");
668            e_2.insert("id", "1");
669
670            let mut e_3 = LogEvent::from("test 3");
671            e_3.insert("id", "1");
672
673            for event in vec![e_1.into(), e_2.into(), e_3.into()] {
674                tx.send(event).await.unwrap();
675            }
676
677            let output_1 = out.recv().await.unwrap().into_log();
678            assert_eq!(output_1["message"], vec!["test 1"].into());
679            let output_2 = out.recv().await.unwrap().into_log();
680            assert_eq!(output_2["message"], vec!["test 2"].into());
681
682            let output_3 = out.recv().await.unwrap().into_log();
683            assert_eq!(output_3["message"], vec!["test 3"].into());
684
685            drop(tx);
686            topology.stop().await;
687            assert_eq!(out.recv().await, None);
688        })
689        .await;
690    }
691
692    #[tokio::test]
693    async fn max_events() {
694        let reduce_config = toml::from_str::<ReduceConfig>(
695            r#"
696group_by = [ "id" ]
697merge_strategies.id = "retain"
698merge_strategies.message = "array"
699max_events = 3
700            "#,
701        )
702        .unwrap();
703
704        assert_transform_compliance(async move {
705            let (tx, rx) = mpsc::channel(1);
706            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
707
708            let mut e_1 = LogEvent::from("test 1");
709            e_1.insert("id", "1");
710
711            let mut e_2 = LogEvent::from("test 2");
712            e_2.insert("id", "1");
713
714            let mut e_3 = LogEvent::from("test 3");
715            e_3.insert("id", "1");
716
717            let mut e_4 = LogEvent::from("test 4");
718            e_4.insert("id", "1");
719
720            let mut e_5 = LogEvent::from("test 5");
721            e_5.insert("id", "1");
722
723            let mut e_6 = LogEvent::from("test 6");
724            e_6.insert("id", "1");
725
726            for event in vec![
727                e_1.into(),
728                e_2.into(),
729                e_3.into(),
730                e_4.into(),
731                e_5.into(),
732                e_6.into(),
733            ] {
734                tx.send(event).await.unwrap();
735            }
736
737            let output_1 = out.recv().await.unwrap().into_log();
738            assert_eq!(
739                output_1["message"],
740                vec!["test 1", "test 2", "test 3"].into()
741            );
742
743            let output_2 = out.recv().await.unwrap().into_log();
744            assert_eq!(
745                output_2["message"],
746                vec!["test 4", "test 5", "test 6"].into()
747            );
748
749            drop(tx);
750            topology.stop().await;
751            assert_eq!(out.recv().await, None);
752        })
753        .await
754    }
755
756    #[tokio::test]
757    async fn arrays() {
758        let reduce_config = toml::from_str::<ReduceConfig>(
759            r#"
760group_by = [ "request_id" ]
761
762merge_strategies.foo = "array"
763merge_strategies.bar = "concat"
764
765[ends_when]
766  type = "vrl"
767  source = "exists(.test_end)"
768"#,
769        )
770        .unwrap();
771
772        assert_transform_compliance(async move {
773            let (tx, rx) = mpsc::channel(1);
774
775            let new_schema_definition = reduce_config.outputs(
776                TableRegistry::default(),
777                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
778                LogNamespace::Legacy,
779            )[0]
780            .clone()
781            .log_schema_definitions
782            .get(&OutputId::from("in"))
783            .unwrap()
784            .clone();
785
786            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
787
788            let mut e_1 = LogEvent::from("test message 1");
789            e_1.insert("foo", json!([1, 3]));
790            e_1.insert("bar", json!([1, 3]));
791            e_1.insert("request_id", "1");
792            let mut metadata_1 = e_1.metadata().clone();
793            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
794            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
795
796            tx.send(e_1.into()).await.unwrap();
797
798            let mut e_2 = LogEvent::from("test message 2");
799            e_2.insert("foo", json!([2, 4]));
800            e_2.insert("bar", json!([2, 4]));
801            e_2.insert("request_id", "2");
802            let mut metadata_2 = e_2.metadata().clone();
803            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
804            metadata_2.set_schema_definition(&Arc::new(new_schema_definition));
805            tx.send(e_2.into()).await.unwrap();
806
807            let mut e_3 = LogEvent::from("test message 3");
808            e_3.insert("foo", json!([5, 7]));
809            e_3.insert("bar", json!([5, 7]));
810            e_3.insert("request_id", "1");
811            tx.send(e_3.into()).await.unwrap();
812
813            let mut e_4 = LogEvent::from("test message 4");
814            e_4.insert("foo", json!("done"));
815            e_4.insert("bar", json!("done"));
816            e_4.insert("request_id", "1");
817            e_4.insert("test_end", "yep");
818            tx.send(e_4.into()).await.unwrap();
819
820            let mut e_5 = LogEvent::from("test message 5");
821            e_5.insert("foo", json!([6, 8]));
822            e_5.insert("bar", json!([6, 8]));
823            e_5.insert("request_id", "2");
824            tx.send(e_5.into()).await.unwrap();
825
826            let mut e_6 = LogEvent::from("test message 6");
827            e_6.insert("foo", json!("done"));
828            e_6.insert("bar", json!("done"));
829            e_6.insert("request_id", "2");
830            e_6.insert("test_end", "yep");
831            tx.send(e_6.into()).await.unwrap();
832
833            let output_1 = out.recv().await.unwrap().into_log();
834            assert_eq!(output_1["foo"], json!([[1, 3], [5, 7], "done"]).into());
835            assert_eq!(output_1["bar"], json!([1, 3, 5, 7, "done"]).into());
836            assert_eq!(output_1.metadata(), &metadata_1);
837
838            let output_2 = out.recv().await.unwrap().into_log();
839            assert_eq!(output_2["foo"], json!([[2, 4], [6, 8], "done"]).into());
840            assert_eq!(output_2["bar"], json!([2, 4, 6, 8, "done"]).into());
841            assert_eq!(output_2.metadata(), &metadata_2);
842
843            drop(tx);
844            topology.stop().await;
845            assert_eq!(out.recv().await, None);
846        })
847        .await;
848    }
849
850    #[tokio::test]
851    async fn strategy_path_with_nested_fields() {
852        let reduce_config = toml::from_str::<ReduceConfig>(indoc!(
853            r#"
854            group_by = [ "id" ]
855
856            merge_strategies.id = "discard"
857            merge_strategies."message.a.b" = "array"
858
859            [ends_when]
860              type = "vrl"
861              source = "exists(.test_end)"
862            "#,
863        ))
864        .unwrap();
865
866        assert_transform_compliance(async move {
867            let (tx, rx) = mpsc::channel(1);
868
869            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;
870
871            let e_1 = LogEvent::from(Value::from(btreemap! {
872                "id" => 777,
873                "message" => btreemap! {
874                    "a" => btreemap! {
875                        "b" => vec![1,2],
876                        "num" => 1,
877                    },
878                },
879                "arr" => vec![btreemap! { "a" => 1 }, btreemap! { "b" => 1 }]
880            }));
881            let mut metadata_1 = e_1.metadata().clone();
882            metadata_1.set_upstream_id(Arc::new(OutputId::from("reduce")));
883
884            tx.send(e_1.into()).await.unwrap();
885
886            let e_2 = LogEvent::from(Value::from(btreemap! {
887                "id" => 777,
888                "message" => btreemap! {
889                        "a" => btreemap! {
890                            "b" => vec![3,4],
891                            "num" => 2,
892                        },
893                },
894                 "arr" => vec![btreemap! { "a" => 2 }, btreemap! { "b" => 2 }],
895                "test_end" => "done",
896            }));
897            tx.send(e_2.into()).await.unwrap();
898
899            let mut output = out.recv().await.unwrap().into_log();
900
901            // Remove timestamp fields which were automatically added.
902            output.remove_timestamp();
903            output.remove("timestamp_end");
904
905            assert_eq!(
906                *output.value(),
907                btreemap! {
908                    "id" => 777,
909                    "message" => btreemap! {
910                        "a" => btreemap! {
911                            "b" => vec![vec![1, 2], vec![3,4]],
912                            "num" => 3,
913                        },
914                    },
915                    "arr" => vec![btreemap! { "a" => 1 }, btreemap! { "b" => 1 }],
916                    "test_end" => "done",
917                }
918                .into()
919            );
920
921            drop(tx);
922            topology.stop().await;
923            assert_eq!(out.recv().await, None);
924        })
925        .await;
926    }
927
928    #[test]
929    fn invalid_merge_strategies_containing_indexes() {
930        let config = toml::from_str::<ReduceConfig>(indoc!(
931            r#"
932            group_by = [ "id" ]
933
934            merge_strategies.id = "discard"
935            merge_strategies."nested.msg[0]" = "array"
936            "#,
937        ))
938        .unwrap();
939        let error = Reduce::new(&config, &TableRegistry::default()).unwrap_err();
940        assert_eq!(
941            error.to_string(),
942            "Merge strategies with indexes are currently not supported. Path: `nested.msg[0]`"
943        );
944    }
945
946    #[tokio::test]
947    async fn merge_objects_in_array() {
948        let config = toml::from_str::<ReduceConfig>(indoc!(
949            r#"
950            group_by = [ "id" ]
951            merge_strategies.events = "array"
952            merge_strategies."\"a-b\"" = "retain"
953            merge_strategies.another = "discard"
954
955            [ends_when]
956              type = "vrl"
957              source = "exists(.test_end)"
958            "#,
959        ))
960        .unwrap();
961
962        assert_transform_compliance(async move {
963            let (tx, rx) = mpsc::channel(1);
964
965            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
966
967            let v_1 = Value::from(btreemap! {
968                "attrs" => btreemap! {
969                    "nested.msg" => "foo",
970                },
971                "sev" => 2,
972            });
973            let mut e_1 = LogEvent::from(Value::from(
974                btreemap! {"id" => 777, "another" => btreemap!{ "a" => 1}},
975            ));
976            e_1.insert("events", v_1.clone());
977            e_1.insert("\"a-b\"", 2);
978            tx.send(e_1.into()).await.unwrap();
979
980            let v_2 = Value::from(btreemap! {
981                "attrs" => btreemap! {
982                    "nested.msg" => "bar",
983                },
984                "sev" => 3,
985            });
986            let mut e_2 = LogEvent::from(Value::from(
987                btreemap! {"id" => 777, "test_end" => "done", "another" => btreemap!{ "b" => 2}},
988            ));
989            e_2.insert("events", v_2.clone());
990            e_2.insert("\"a-b\"", 2);
991            tx.send(e_2.into()).await.unwrap();
992
993            let output = out.recv().await.unwrap().into_log();
994            let expected_value = Value::from(btreemap! {
995                "id" => 1554,
996                "events" => vec![v_1, v_2],
997                "another" => btreemap!{ "a" => 1},
998                "a-b" => 2,
999                "test_end" => "done"
1000            });
1001            assert_eq!(*output.value(), expected_value);
1002
1003            drop(tx);
1004            topology.stop().await;
1005            assert_eq!(out.recv().await, None);
1006        })
1007        .await
1008    }
1009
1010    #[tokio::test]
1011    async fn merged_quoted_path() {
1012        let config = toml::from_str::<ReduceConfig>(indoc!(
1013            r#"
1014            [ends_when]
1015              type = "vrl"
1016              source = "exists(.test_end)"
1017            "#,
1018        ))
1019        .unwrap();
1020
1021        assert_transform_compliance(async move {
1022            let (tx, rx) = mpsc::channel(1);
1023
1024            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
1025
1026            let e_1 = LogEvent::from(Value::from(btreemap! {"a b" => 1}));
1027            tx.send(e_1.into()).await.unwrap();
1028
1029            let e_2 = LogEvent::from(Value::from(btreemap! {"a b" => 2, "test_end" => "done"}));
1030            tx.send(e_2.into()).await.unwrap();
1031
1032            let output = out.recv().await.unwrap().into_log();
1033            let expected_value = Value::from(btreemap! {
1034                "a b" => 3,
1035                "test_end" => "done"
1036            });
1037            assert_eq!(*output.value(), expected_value);
1038
1039            drop(tx);
1040            topology.stop().await;
1041            assert_eq!(out.recv().await, None);
1042        })
1043        .await
1044    }
1045}