use std::{
    collections::{HashMap, hash_map::Entry},
    pin::Pin,
    time::{Duration, Instant},
};

use futures::Stream;
use indexmap::IndexMap;
use vector_lib::stream::expiration_map::{Emitter, map_with_expiration};
use vrl::{
    path::{OwnedTargetPath, parse_target_path},
    prelude::KeyString,
};

use crate::{
    conditions::Condition,
    event::{Event, EventMetadata, LogEvent, discriminant::Discriminant},
    internal_events::{ReduceAddEventError, ReduceStaleEventFlushed},
    transforms::{
        TaskTransform,
        reduce::{
            config::ReduceConfig,
            merge_strategy::{MergeStrategy, ReduceValueMerger, get_value_merger},
        },
    },
};

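/// Accumulated state for a single group of events (one per discriminant): a
/// value merger per field, the metadata merged from every contributing event,
/// and the timestamps used to decide staleness and `end_every_period` flushes.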
#[derive(Clone, Debug)]
struct ReduceState {
    events: usize,
    fields: HashMap<OwnedTargetPath, Box<dyn ReduceValueMerger>>,
    stale_since: Instant,
    creation: Instant,
    metadata: EventMetadata,
}

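// Returns true if the path itself or any of its ancestors has a configured
// merge strategy, in which case the generic per-field pass in
// `ReduceState::add_event` must skip it (the strategy pass already handles it).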
fn is_covered_by_strategy(
    path: &OwnedTargetPath,
    strategies: &IndexMap<OwnedTargetPath, MergeStrategy>,
) -> bool {
    let mut current = OwnedTargetPath::event_root();
    for component in &path.path.segments {
        current = current.with_field_appended(&component.to_string());
        if strategies.contains_key(&current) {
            return true;
        }
    }
    false
}

impl ReduceState {
    fn new() -> Self {
        Self {
            events: 0,
            stale_since: Instant::now(),
            creation: Instant::now(),
            fields: HashMap::new(),
            metadata: EventMetadata::default(),
        }
    }

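    // Merges a single event into this state in two passes: first every path with
    // an explicit merge strategy, then all remaining event fields, skipping any
    // field already covered by a strategy on itself or one of its ancestors.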
    fn add_event(&mut self, e: LogEvent, strategies: &IndexMap<OwnedTargetPath, MergeStrategy>) {
        self.metadata.merge(e.metadata().clone());

        for (path, strategy) in strategies {
            if let Some(value) = e.get(path) {
                match self.fields.entry(path.clone()) {
                    Entry::Vacant(entry) => match get_value_merger(value.clone(), strategy) {
                        Ok(m) => {
                            entry.insert(m);
                        }
                        Err(error) => {
                            warn!(message = "Failed to create value merger.", %error, %path);
                        }
                    },
                    Entry::Occupied(mut entry) => {
                        if let Err(error) = entry.get_mut().add(value.clone()) {
                            warn!(message = "Failed to merge value.", %error);
                        }
                    }
                }
            }
        }

        if let Some(fields_iter) = e.all_event_fields_skip_array_elements() {
            for (path, value) in fields_iter {
                let parsed_path = match parse_target_path(&path) {
                    Ok(path) => path,
                    Err(error) => {
                        emit!(ReduceAddEventError { error, path });
                        continue;
                    }
                };
                if is_covered_by_strategy(&parsed_path, strategies) {
                    continue;
                }

                let maybe_strategy = strategies.get(&parsed_path);
                match self.fields.entry(parsed_path) {
                    Entry::Vacant(entry) => {
                        if let Some(strategy) = maybe_strategy {
                            match get_value_merger(value.clone(), strategy) {
                                Ok(m) => {
                                    entry.insert(m);
                                }
                                Err(error) => {
                                    warn!(message = "Failed to merge value.", %error);
                                }
                            }
                        } else {
                            entry.insert(value.clone().into());
                        }
                    }
                    Entry::Occupied(mut entry) => {
                        if let Err(error) = entry.get_mut().add(value.clone()) {
                            warn!(message = "Failed to merge value.", %error);
                        }
                    }
                }
            }
        }
        self.events += 1;
        self.stale_since = Instant::now();
    }

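    // Consumes the accumulated state and produces the reduced output event by
    // letting each field merger insert its final value into a fresh `LogEvent`
    // carrying the merged metadata.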
    fn flush(mut self) -> LogEvent {
        let mut event = LogEvent::new_with_metadata(self.metadata);
        for (path, v) in self.fields.drain() {
            if let Err(error) = v.insert_into(&path, &mut event) {
                warn!(message = "Failed to merge values for field.", %error);
            }
        }
        self.events = 0;
        event
    }
}

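/// The `reduce` transform: groups events by the `group_by` fields and collapses
/// each group into a single event using the configured merge strategies,
/// flushing on `ends_when`/`starts_when`, `max_events`, expiration, or
/// `end_every_period`.
///
/// A minimal sketch of the kind of configuration exercised by the tests below
/// (values are illustrative only):
///
/// ```toml
/// group_by = [ "request_id" ]
/// merge_strategies.foo = "concat"
/// merge_strategies.bar = "array"
///
/// [ends_when]
/// type = "vrl"
/// source = "exists(.test_end)"
/// ```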
#[derive(Clone, Debug)]
pub struct Reduce {
    expire_after: Duration,
    flush_period: Duration,
    end_every_period: Option<Duration>,
    group_by: Vec<String>,
    merge_strategies: IndexMap<OwnedTargetPath, MergeStrategy>,
    reduce_merge_states: HashMap<Discriminant, ReduceState>,
    ends_when: Option<Condition>,
    starts_when: Option<Condition>,
    max_events: Option<usize>,
}

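// Rejects merge-strategy paths that cannot be parsed or that contain array
// indexes (e.g. `nested.msg[0]`); indexed paths are currently unsupported.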
fn validate_merge_strategies(strategies: IndexMap<KeyString, MergeStrategy>) -> crate::Result<()> {
    for (path, _) in &strategies {
        let contains_index = parse_target_path(path)
            .map_err(|_| format!("Could not parse path: `{path}`"))?
            .path
            .segments
            .iter()
            .any(|segment| segment.is_index());
        if contains_index {
            return Err(format!(
                "Merge strategies with indexes are currently not supported. Path: `{path}`"
            )
            .into());
        }
    }

    Ok(())
}

impl Reduce {
    pub fn new(
        config: &ReduceConfig,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
    ) -> crate::Result<Self> {
        if config.ends_when.is_some() && config.starts_when.is_some() {
            return Err("only one of `ends_when` and `starts_when` can be provided".into());
        }

        let ends_when = config
            .ends_when
            .as_ref()
            .map(|c| c.build(enrichment_tables))
            .transpose()?;
        let starts_when = config
            .starts_when
            .as_ref()
            .map(|c| c.build(enrichment_tables))
            .transpose()?;
        let group_by = config.group_by.clone().into_iter().collect();
        let max_events = config.max_events.map(|max| max.into());

        validate_merge_strategies(config.merge_strategies.clone())?;

        Ok(Reduce {
            expire_after: config.expire_after_ms,
            flush_period: config.flush_period_ms,
            end_every_period: config.end_every_period_ms,
            group_by,
            merge_strategies: config
                .merge_strategies
                .iter()
                .filter_map(|(path, strategy)| {
                    let parsed_path = parse_target_path(path).ok();
                    if parsed_path.is_none() {
                        warn!(message = "Ignoring strategy with invalid path.", %path);
                    }
                    parsed_path.map(|path| (path, strategy.clone()))
                })
                .collect(),
            reduce_merge_states: HashMap::new(),
            ends_when,
            starts_when,
            max_events,
        })
    }

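    // Flushes every group that has either lived longer than `end_every_period`
    // (measured from creation) or gone at least `expire_after` without receiving
    // a new event (measured from the last update).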
    fn flush_into(&mut self, emitter: &mut Emitter<Event>) {
        let mut flush_discriminants = Vec::new();
        let now = Instant::now();
        for (k, t) in &self.reduce_merge_states {
            if let Some(period) = self.end_every_period
                && (now - t.creation) >= period
            {
                flush_discriminants.push(k.clone());
            }

            if (now - t.stale_since) >= self.expire_after {
                flush_discriminants.push(k.clone());
            }
        }
        for k in &flush_discriminants {
            if let Some(t) = self.reduce_merge_states.remove(k) {
                emit!(ReduceStaleEventFlushed);
                emitter.emit(Event::from(t.flush()));
            }
        }
    }

    fn flush_all_into(&mut self, emitter: &mut Emitter<Event>) {
        self.reduce_merge_states
            .drain()
            .for_each(|(_, s)| emitter.emit(Event::from(s.flush())));
    }

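    // Folds the event into the existing state for this discriminant, creating a
    // fresh `ReduceState` first when none exists yet.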
    fn push_or_new_reduce_state(&mut self, event: LogEvent, discriminant: Discriminant) {
        match self.reduce_merge_states.entry(discriminant) {
            Entry::Vacant(entry) => {
                let mut state = ReduceState::new();
                state.add_event(event, &self.merge_strategies);
                entry.insert(state);
            }
            Entry::Occupied(mut entry) => {
                entry.get_mut().add_event(event, &self.merge_strategies);
            }
        };
    }

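    // Handles one event: `starts_when` flushes any existing group before starting
    // a new one with this event; `ends_when` (or hitting `max_events`) folds the
    // event in and flushes the group immediately; otherwise the event is simply
    // accumulated into its group.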
    pub fn transform_one(&mut self, emitter: &mut Emitter<Event>, event: Event) {
        let (starts_here, event) = match &self.starts_when {
            Some(condition) => condition.check(event),
            None => (false, event),
        };

        let (mut ends_here, event) = match &self.ends_when {
            Some(condition) => condition.check(event),
            None => (false, event),
        };

        let event = event.into_log();
        let discriminant = Discriminant::from_log_event(&event, &self.group_by);

        if let Some(max_events) = self.max_events {
            if max_events == 1 {
                ends_here = true;
            } else if let Some(entry) = self.reduce_merge_states.get(&discriminant) {
                if entry.events + 1 == max_events {
                    ends_here = true;
                }
            }
        }

        if starts_here {
            if let Some(state) = self.reduce_merge_states.remove(&discriminant) {
                emitter.emit(state.flush().into());
            }

            self.push_or_new_reduce_state(event, discriminant)
        } else if ends_here {
            emitter.emit(match self.reduce_merge_states.remove(&discriminant) {
                Some(mut state) => {
                    state.add_event(event, &self.merge_strategies);
                    state.flush().into()
                }
                None => {
                    let mut state = ReduceState::new();
                    state.add_event(event, &self.merge_strategies);
                    state.flush().into()
                }
            });
        } else {
            self.push_or_new_reduce_state(event, discriminant)
        }
    }
}

impl TaskTransform<Event> for Reduce {
    fn transform(
        self: Box<Self>,
        input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
    where
        Self: 'static,
    {
        let transform_fn = move |me: &mut Box<Reduce>, event, emitter: &mut Emitter<Event>| {
            me.transform_one(emitter, event);
        };

        construct_output_stream(self, input_rx, transform_fn)
    }
}

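// Wires the reduce state into `map_with_expiration`: the first closure handles
// each incoming event via `transform_fn`, while the remaining two delegate to
// `flush_into` (periodic expiration checks, driven by `flush_period`) and
// `flush_all_into` (flush everything remaining) respectively.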
pub fn construct_output_stream(
    reduce: Box<Reduce>,
    input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
    mut transform_fn: impl FnMut(&mut Box<Reduce>, Event, &mut Emitter<Event>) + Send + Sync + 'static,
) -> Pin<Box<dyn Stream<Item = Event> + Send>>
where
    Reduce: 'static,
{
    let flush_period = reduce.flush_period;
    Box::pin(map_with_expiration(
        reduce,
        input_rx,
        flush_period,
        move |me, event, emitter| {
            transform_fn(me, event, emitter);
        },
        |me, emitter| {
            me.flush_into(emitter);
        },
        |me, emitter| {
            me.flush_all_into(emitter);
        },
    ))
}

#[cfg(test)]
mod test {
    use std::sync::Arc;

    use indoc::indoc;
    use serde_json::json;
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::{enrichment::TableRegistry, lookup::owned_value_path};
    use vrl::{btreemap, value::Kind};

    use super::*;
    use crate::{
        config::{LogNamespace, OutputId, TransformConfig, schema, schema::Definition},
        event::{LogEvent, Value},
        test_util::components::assert_transform_compliance,
        transforms::test::create_topology,
    };

    #[tokio::test]
    async fn reduce_from_condition() {
        let reduce_config = toml::from_str::<ReduceConfig>(
            r#"
group_by = [ "request_id" ]

[ends_when]
  type = "vrl"
  source = "exists(.test_end)"
"#,
        )
        .unwrap();

        assert_transform_compliance(async move {
            let input_definition = schema::Definition::default_legacy_namespace()
                .with_event_field(&owned_value_path!("counter"), Kind::integer(), None)
                .with_event_field(&owned_value_path!("request_id"), Kind::bytes(), None)
                .with_event_field(
                    &owned_value_path!("test_end"),
                    Kind::bytes().or_undefined(),
                    None,
                )
                .with_event_field(
                    &owned_value_path!("extra_field"),
                    Kind::bytes().or_undefined(),
                    None,
                );
            let schema_definitions = reduce_config
                .outputs(
                    vector_lib::enrichment::TableRegistry::default(),
                    &[("test".into(), input_definition)],
                    LogNamespace::Legacy,
                )
                .first()
                .unwrap()
                .schema_definitions(true)
                .clone();

            let new_schema_definition = reduce_config.outputs(
                TableRegistry::default(),
                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
                LogNamespace::Legacy,
            )[0]
            .clone()
            .log_schema_definitions
            .get(&OutputId::from("in"))
            .unwrap()
            .clone();

            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test message 1");
            e_1.insert("counter", 1);
            e_1.insert("request_id", "1");
            let mut metadata_1 = e_1.metadata().clone();
            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));

            let mut e_2 = LogEvent::from("test message 2");
            e_2.insert("counter", 2);
            e_2.insert("request_id", "2");
            let mut metadata_2 = e_2.metadata().clone();
            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_2.set_schema_definition(&Arc::new(new_schema_definition.clone()));

            let mut e_3 = LogEvent::from("test message 3");
            e_3.insert("counter", 3);
            e_3.insert("request_id", "1");

            let mut e_4 = LogEvent::from("test message 4");
            e_4.insert("counter", 4);
            e_4.insert("request_id", "1");
            e_4.insert("test_end", "yep");

            let mut e_5 = LogEvent::from("test message 5");
            e_5.insert("counter", 5);
            e_5.insert("request_id", "2");
            e_5.insert("extra_field", "value1");
            e_5.insert("test_end", "yep");

            for event in vec![e_1.into(), e_2.into(), e_3.into(), e_4.into(), e_5.into()] {
                tx.send(event).await.unwrap();
            }

            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(output_1["message"], "test message 1".into());
            assert_eq!(output_1["counter"], Value::from(8));
            assert_eq!(output_1.metadata(), &metadata_1);
            schema_definitions
                .values()
                .for_each(|definition| definition.assert_valid_for_event(&output_1.clone().into()));

            let output_2 = out.recv().await.unwrap().into_log();
            assert_eq!(output_2["message"], "test message 2".into());
            assert_eq!(output_2["extra_field"], "value1".into());
            assert_eq!(output_2["counter"], Value::from(7));
            assert_eq!(output_2.metadata(), &metadata_2);
            schema_definitions
                .values()
                .for_each(|definition| definition.assert_valid_for_event(&output_2.clone().into()));

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn reduce_merge_strategies() {
        let reduce_config = toml::from_str::<ReduceConfig>(
            r#"
group_by = [ "request_id" ]

merge_strategies.foo = "concat"
merge_strategies.bar = "array"
merge_strategies.baz = "max"

[ends_when]
  type = "vrl"
  source = "exists(.test_end)"
"#,
        )
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);

            let new_schema_definition = reduce_config.outputs(
                TableRegistry::default(),
                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
                LogNamespace::Legacy,
            )[0]
            .clone()
            .log_schema_definitions
            .get(&OutputId::from("in"))
            .unwrap()
            .clone();

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test message 1");
            e_1.insert("foo", "first foo");
            e_1.insert("bar", "first bar");
            e_1.insert("baz", 2);
            e_1.insert("request_id", "1");
            let mut metadata = e_1.metadata().clone();
            metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata.set_schema_definition(&Arc::new(new_schema_definition.clone()));
            tx.send(e_1.into()).await.unwrap();

            let mut e_2 = LogEvent::from("test message 2");
            e_2.insert("foo", "second foo");
            e_2.insert("bar", 2);
            e_2.insert("baz", "not number");
            e_2.insert("request_id", "1");
            tx.send(e_2.into()).await.unwrap();

            let mut e_3 = LogEvent::from("test message 3");
            e_3.insert("foo", 10);
            e_3.insert("bar", "third bar");
            e_3.insert("baz", 3);
            e_3.insert("request_id", "1");
            e_3.insert("test_end", "yep");
            tx.send(e_3.into()).await.unwrap();

            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(output_1["message"], "test message 1".into());
            assert_eq!(output_1["foo"], "first foo second foo".into());
            assert_eq!(
                output_1["bar"],
                Value::Array(vec!["first bar".into(), 2.into(), "third bar".into()]),
            );
            assert_eq!(output_1["baz"], 3.into());
            assert_eq!(output_1.metadata(), &metadata);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn missing_group_by() {
        let reduce_config = toml::from_str::<ReduceConfig>(
            r#"
group_by = [ "request_id" ]

[ends_when]
  type = "vrl"
  source = "exists(.test_end)"
"#,
        )
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);
            let new_schema_definition = reduce_config.outputs(
                TableRegistry::default(),
                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
                LogNamespace::Legacy,
            )[0]
            .clone()
            .log_schema_definitions
            .get(&OutputId::from("in"))
            .unwrap()
            .clone();

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test message 1");
            e_1.insert("counter", 1);
            e_1.insert("request_id", "1");
            let mut metadata_1 = e_1.metadata().clone();
            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
            tx.send(e_1.into()).await.unwrap();

            let mut e_2 = LogEvent::from("test message 2");
            e_2.insert("counter", 2);
            let mut metadata_2 = e_2.metadata().clone();
            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_2.set_schema_definition(&Arc::new(new_schema_definition));
            tx.send(e_2.into()).await.unwrap();

            let mut e_3 = LogEvent::from("test message 3");
            e_3.insert("counter", 3);
            e_3.insert("request_id", "1");
            tx.send(e_3.into()).await.unwrap();

            let mut e_4 = LogEvent::from("test message 4");
            e_4.insert("counter", 4);
            e_4.insert("request_id", "1");
            e_4.insert("test_end", "yep");
            tx.send(e_4.into()).await.unwrap();

            let mut e_5 = LogEvent::from("test message 5");
            e_5.insert("counter", 5);
            e_5.insert("extra_field", "value1");
            e_5.insert("test_end", "yep");
            tx.send(e_5.into()).await.unwrap();

            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(output_1["message"], "test message 1".into());
            assert_eq!(output_1["counter"], Value::from(8));
            assert_eq!(output_1.metadata(), &metadata_1);

            let output_2 = out.recv().await.unwrap().into_log();
            assert_eq!(output_2["message"], "test message 2".into());
            assert_eq!(output_2["extra_field"], "value1".into());
            assert_eq!(output_2["counter"], Value::from(7));
            assert_eq!(output_2.metadata(), &metadata_2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn max_events_0() {
        let reduce_config = toml::from_str::<ReduceConfig>(
            r#"
group_by = [ "id" ]
merge_strategies.id = "retain"
merge_strategies.message = "array"
max_events = 0
            "#,
        );

        match reduce_config {
            Ok(_conf) => unreachable!("max_events=0 should be rejected."),
            Err(err) => assert!(
                err.to_string()
                    .contains("invalid value: integer `0`, expected a nonzero usize")
            ),
        }
    }

    #[tokio::test]
    async fn max_events_1() {
        let reduce_config = toml::from_str::<ReduceConfig>(
            r#"
group_by = [ "id" ]
merge_strategies.id = "retain"
merge_strategies.message = "array"
max_events = 1
            "#,
        )
        .unwrap();
        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test 1");
            e_1.insert("id", "1");

            let mut e_2 = LogEvent::from("test 2");
            e_2.insert("id", "1");

            let mut e_3 = LogEvent::from("test 3");
            e_3.insert("id", "1");

            for event in vec![e_1.into(), e_2.into(), e_3.into()] {
                tx.send(event).await.unwrap();
            }

            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(output_1["message"], vec!["test 1"].into());
            let output_2 = out.recv().await.unwrap().into_log();
            assert_eq!(output_2["message"], vec!["test 2"].into());

            let output_3 = out.recv().await.unwrap().into_log();
            assert_eq!(output_3["message"], vec!["test 3"].into());

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn max_events() {
        let reduce_config = toml::from_str::<ReduceConfig>(
            r#"
group_by = [ "id" ]
merge_strategies.id = "retain"
merge_strategies.message = "array"
max_events = 3
            "#,
        )
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test 1");
            e_1.insert("id", "1");

            let mut e_2 = LogEvent::from("test 2");
            e_2.insert("id", "1");

            let mut e_3 = LogEvent::from("test 3");
            e_3.insert("id", "1");

            let mut e_4 = LogEvent::from("test 4");
            e_4.insert("id", "1");

            let mut e_5 = LogEvent::from("test 5");
            e_5.insert("id", "1");

            let mut e_6 = LogEvent::from("test 6");
            e_6.insert("id", "1");

            for event in vec![
                e_1.into(),
                e_2.into(),
                e_3.into(),
                e_4.into(),
                e_5.into(),
                e_6.into(),
            ] {
                tx.send(event).await.unwrap();
            }

            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(
                output_1["message"],
                vec!["test 1", "test 2", "test 3"].into()
            );

            let output_2 = out.recv().await.unwrap().into_log();
            assert_eq!(
                output_2["message"],
                vec!["test 4", "test 5", "test 6"].into()
            );

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await
    }

    #[tokio::test]
    async fn arrays() {
        let reduce_config = toml::from_str::<ReduceConfig>(
            r#"
group_by = [ "request_id" ]

merge_strategies.foo = "array"
merge_strategies.bar = "concat"

[ends_when]
  type = "vrl"
  source = "exists(.test_end)"
"#,
        )
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);

            let new_schema_definition = reduce_config.outputs(
                TableRegistry::default(),
                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
                LogNamespace::Legacy,
            )[0]
            .clone()
            .log_schema_definitions
            .get(&OutputId::from("in"))
            .unwrap()
            .clone();

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test message 1");
            e_1.insert("foo", json!([1, 3]));
            e_1.insert("bar", json!([1, 3]));
            e_1.insert("request_id", "1");
            let mut metadata_1 = e_1.metadata().clone();
            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));

            tx.send(e_1.into()).await.unwrap();

            let mut e_2 = LogEvent::from("test message 2");
            e_2.insert("foo", json!([2, 4]));
            e_2.insert("bar", json!([2, 4]));
            e_2.insert("request_id", "2");
            let mut metadata_2 = e_2.metadata().clone();
            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_2.set_schema_definition(&Arc::new(new_schema_definition));
            tx.send(e_2.into()).await.unwrap();

            let mut e_3 = LogEvent::from("test message 3");
            e_3.insert("foo", json!([5, 7]));
            e_3.insert("bar", json!([5, 7]));
            e_3.insert("request_id", "1");
            tx.send(e_3.into()).await.unwrap();

            let mut e_4 = LogEvent::from("test message 4");
            e_4.insert("foo", json!("done"));
            e_4.insert("bar", json!("done"));
            e_4.insert("request_id", "1");
            e_4.insert("test_end", "yep");
            tx.send(e_4.into()).await.unwrap();

            let mut e_5 = LogEvent::from("test message 5");
            e_5.insert("foo", json!([6, 8]));
            e_5.insert("bar", json!([6, 8]));
            e_5.insert("request_id", "2");
            tx.send(e_5.into()).await.unwrap();

            let mut e_6 = LogEvent::from("test message 6");
            e_6.insert("foo", json!("done"));
            e_6.insert("bar", json!("done"));
            e_6.insert("request_id", "2");
            e_6.insert("test_end", "yep");
            tx.send(e_6.into()).await.unwrap();

            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(output_1["foo"], json!([[1, 3], [5, 7], "done"]).into());
            assert_eq!(output_1["bar"], json!([1, 3, 5, 7, "done"]).into());
            assert_eq!(output_1.metadata(), &metadata_1);

            let output_2 = out.recv().await.unwrap().into_log();
            assert_eq!(output_2["foo"], json!([[2, 4], [6, 8], "done"]).into());
            assert_eq!(output_2["bar"], json!([2, 4, 6, 8, "done"]).into());
            assert_eq!(output_2.metadata(), &metadata_2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[tokio::test]
    async fn strategy_path_with_nested_fields() {
        let reduce_config = toml::from_str::<ReduceConfig>(indoc!(
            r#"
            group_by = [ "id" ]

            merge_strategies.id = "discard"
            merge_strategies."message.a.b" = "array"

            [ends_when]
              type = "vrl"
              source = "exists(.test_end)"
            "#,
        ))
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let e_1 = LogEvent::from(Value::from(btreemap! {
                "id" => 777,
                "message" => btreemap! {
                    "a" => btreemap! {
                        "b" => vec![1, 2],
                        "num" => 1,
                    },
                },
                "arr" => vec![btreemap! { "a" => 1 }, btreemap! { "b" => 1 }]
            }));
            let mut metadata_1 = e_1.metadata().clone();
            metadata_1.set_upstream_id(Arc::new(OutputId::from("reduce")));

            tx.send(e_1.into()).await.unwrap();

            let e_2 = LogEvent::from(Value::from(btreemap! {
                "id" => 777,
                "message" => btreemap! {
                    "a" => btreemap! {
                        "b" => vec![3, 4],
                        "num" => 2,
                    },
                },
                "arr" => vec![btreemap! { "a" => 2 }, btreemap! { "b" => 2 }],
                "test_end" => "done",
            }));
            tx.send(e_2.into()).await.unwrap();

            let mut output = out.recv().await.unwrap().into_log();

            output.remove_timestamp();
            output.remove("timestamp_end");

            assert_eq!(
                *output.value(),
                btreemap! {
                    "id" => 777,
                    "message" => btreemap! {
                        "a" => btreemap! {
                            "b" => vec![vec![1, 2], vec![3, 4]],
                            "num" => 3,
                        },
                    },
                    "arr" => vec![btreemap! { "a" => 1 }, btreemap! { "b" => 1 }],
                    "test_end" => "done",
                }
                .into()
            );

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    #[test]
    fn invalid_merge_strategies_containing_indexes() {
        let config = toml::from_str::<ReduceConfig>(indoc!(
            r#"
            group_by = [ "id" ]

            merge_strategies.id = "discard"
            merge_strategies."nested.msg[0]" = "array"
            "#,
        ))
        .unwrap();
        let error = Reduce::new(&config, &TableRegistry::default()).unwrap_err();
        assert_eq!(
            error.to_string(),
            "Merge strategies with indexes are currently not supported. Path: `nested.msg[0]`"
        );
    }

    #[tokio::test]
    async fn merge_objects_in_array() {
        let config = toml::from_str::<ReduceConfig>(indoc!(
            r#"
            group_by = [ "id" ]
            merge_strategies.events = "array"
            merge_strategies."\"a-b\"" = "retain"
            merge_strategies.another = "discard"

            [ends_when]
              type = "vrl"
              source = "exists(.test_end)"
            "#,
        ))
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;

            let v_1 = Value::from(btreemap! {
                "attrs" => btreemap! {
                    "nested.msg" => "foo",
                },
                "sev" => 2,
            });
            let mut e_1 = LogEvent::from(Value::from(
                btreemap! {"id" => 777, "another" => btreemap!{ "a" => 1}},
            ));
            e_1.insert("events", v_1.clone());
            e_1.insert("\"a-b\"", 2);
            tx.send(e_1.into()).await.unwrap();

            let v_2 = Value::from(btreemap! {
                "attrs" => btreemap! {
                    "nested.msg" => "bar",
                },
                "sev" => 3,
            });
            let mut e_2 = LogEvent::from(Value::from(
                btreemap! {"id" => 777, "test_end" => "done", "another" => btreemap!{ "b" => 2}},
            ));
            e_2.insert("events", v_2.clone());
            e_2.insert("\"a-b\"", 2);
            tx.send(e_2.into()).await.unwrap();

            let output = out.recv().await.unwrap().into_log();
            let expected_value = Value::from(btreemap! {
                "id" => 1554,
                "events" => vec![v_1, v_2],
                "another" => btreemap!{ "a" => 1},
                "a-b" => 2,
                "test_end" => "done"
            });
            assert_eq!(*output.value(), expected_value);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await
    }

    #[tokio::test]
    async fn merged_quoted_path() {
        let config = toml::from_str::<ReduceConfig>(indoc!(
            r#"
            [ends_when]
              type = "vrl"
              source = "exists(.test_end)"
            "#,
        ))
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;

            let e_1 = LogEvent::from(Value::from(btreemap! {"a b" => 1}));
            tx.send(e_1.into()).await.unwrap();

            let e_2 = LogEvent::from(Value::from(btreemap! {"a b" => 2, "test_end" => "done"}));
            tx.send(e_2.into()).await.unwrap();

            let output = out.recv().await.unwrap().into_log();
            let expected_value = Value::from(btreemap! {
                "a b" => 3,
                "test_end" => "done"
            });
            assert_eq!(*output.value(), expected_value);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await
    }
}