1use std::collections::hash_map::Entry;
2use std::collections::HashMap;
3use std::pin::Pin;
4use std::time::{Duration, Instant};
5
6use crate::internal_events::ReduceAddEventError;
7use crate::transforms::reduce::merge_strategy::{
8 get_value_merger, MergeStrategy, ReduceValueMerger,
9};
10use crate::{
11 conditions::Condition,
12 event::{discriminant::Discriminant, Event, EventMetadata, LogEvent},
13 internal_events::ReduceStaleEventFlushed,
14 transforms::{reduce::config::ReduceConfig, TaskTransform},
15};
16use futures::Stream;
17use indexmap::IndexMap;
18use vector_lib::stream::expiration_map::{map_with_expiration, Emitter};
19use vrl::path::{parse_target_path, OwnedTargetPath};
20use vrl::prelude::KeyString;
21
/// Accumulated merge state for a single group (discriminant) of events.
#[derive(Clone, Debug)]
struct ReduceState {
    /// Number of events merged into this state so far (compared against `max_events`).
    events: usize,
    /// Per-field value mergers, keyed by the field's target path.
    fields: HashMap<OwnedTargetPath, Box<dyn ReduceValueMerger>>,
    /// Last time an event was added; drives staleness-based expiry.
    stale_since: Instant,
    /// When this state was created; drives `end_every_period` flushing.
    creation: Instant,
    /// Metadata merged from every contributing event.
    metadata: EventMetadata,
}
30
31fn is_covered_by_strategy(
32 path: &OwnedTargetPath,
33 strategies: &IndexMap<OwnedTargetPath, MergeStrategy>,
34) -> bool {
35 let mut current = OwnedTargetPath::event_root();
36 for component in &path.path.segments {
37 current = current.with_field_appended(&component.to_string());
38 if strategies.contains_key(¤t) {
39 return true;
40 }
41 }
42 false
43}
44
impl ReduceState {
    /// Creates an empty state with both timers started at "now".
    fn new() -> Self {
        Self {
            events: 0,
            stale_since: Instant::now(),
            creation: Instant::now(),
            fields: HashMap::new(),
            metadata: EventMetadata::default(),
        }
    }

    /// Merges log event `e` into this state.
    ///
    /// Fields named by `strategies` are merged with their configured strategy;
    /// every remaining (non-array-element) field falls back to the default
    /// merger via `Value::into()`. Event metadata is merged, the event counter
    /// is incremented, and the staleness timer is reset.
    fn add_event(&mut self, e: LogEvent, strategies: &IndexMap<OwnedTargetPath, MergeStrategy>) {
        self.metadata.merge(e.metadata().clone());

        // First pass: fields with an explicitly configured merge strategy.
        for (path, strategy) in strategies {
            if let Some(value) = e.get(path) {
                match self.fields.entry(path.clone()) {
                    // First sighting of this field: build its merger from the
                    // configured strategy.
                    Entry::Vacant(entry) => match get_value_merger(value.clone(), strategy) {
                        Ok(m) => {
                            entry.insert(m);
                        }
                        Err(error) => {
                            warn!(message = "Failed to create value merger.", %error, %path);
                        }
                    },
                    // Field already tracked: feed the new value to its merger.
                    Entry::Occupied(mut entry) => {
                        if let Err(error) = entry.get_mut().add(value.clone()) {
                            warn!(message = "Failed to merge value.", %error);
                        }
                    }
                }
            }
        }

        // Second pass: all remaining event fields (array elements skipped).
        if let Some(fields_iter) = e.all_event_fields_skip_array_elements() {
            for (path, value) in fields_iter {
                let parsed_path = match parse_target_path(&path) {
                    Ok(path) => path,
                    Err(error) => {
                        // Unparsable field path: report and skip this field.
                        emit!(ReduceAddEventError { error, path });
                        continue;
                    }
                };
                // Skip fields already handled (or subsumed by a parent path)
                // in the strategy pass above.
                if is_covered_by_strategy(&parsed_path, strategies) {
                    continue;
                }

                // NOTE(review): given the `is_covered_by_strategy` skip above,
                // this lookup appears to always be `None` — confirm whether the
                // strategy branch below is actually reachable.
                let maybe_strategy = strategies.get(&parsed_path);
                match self.fields.entry(parsed_path) {
                    Entry::Vacant(entry) => {
                        if let Some(strategy) = maybe_strategy {
                            match get_value_merger(value.clone(), strategy) {
                                Ok(m) => {
                                    entry.insert(m);
                                }
                                Err(error) => {
                                    warn!(message = "Failed to merge value.", %error);
                                }
                            }
                        } else {
                            // No strategy: use the value's default merger.
                            entry.insert(value.clone().into());
                        }
                    }
                    Entry::Occupied(mut entry) => {
                        if let Err(error) = entry.get_mut().add(value.clone()) {
                            warn!(message = "Failed to merge value.", %error);
                        }
                    }
                }
            }
        }
        self.events += 1;
        self.stale_since = Instant::now();
    }

    /// Consumes the state and produces the reduced output event by asking each
    /// field merger to insert its final value.
    fn flush(mut self) -> LogEvent {
        let mut event = LogEvent::new_with_metadata(self.metadata);
        for (path, v) in self.fields.drain() {
            if let Err(error) = v.insert_into(&path, &mut event) {
                warn!(message = "Failed to merge values for field.", %error);
            }
        }
        self.events = 0;
        event
    }
}
134
/// The reduce transform: collapses groups of related log events into single
/// merged events.
#[derive(Clone, Debug)]
pub struct Reduce {
    /// Flush a group after this long without new events.
    expire_after: Duration,
    /// How often the expiration check runs over the stream.
    flush_period: Duration,
    /// If set, flush each group this long after its creation.
    end_every_period: Option<Duration>,
    /// Field names whose values partition events into groups.
    group_by: Vec<String>,
    /// Per-path merge strategies (entries with unparsable paths are dropped in `new`).
    merge_strategies: IndexMap<OwnedTargetPath, MergeStrategy>,
    /// In-flight reduction state, one per group discriminant.
    reduce_merge_states: HashMap<Discriminant, ReduceState>,
    /// Condition marking an event as the end of its group.
    ends_when: Option<Condition>,
    /// Condition marking an event as the start of a new group.
    starts_when: Option<Condition>,
    /// Flush a group once it has merged this many events.
    max_events: Option<usize>,
}
147
148fn validate_merge_strategies(strategies: IndexMap<KeyString, MergeStrategy>) -> crate::Result<()> {
149 for (path, _) in &strategies {
150 let contains_index = parse_target_path(path)
151 .map_err(|_| format!("Could not parse path: `{path}`"))?
152 .path
153 .segments
154 .iter()
155 .any(|segment| segment.is_index());
156 if contains_index {
157 return Err(format!(
158 "Merge strategies with indexes are currently not supported. Path: `{path}`"
159 )
160 .into());
161 }
162 }
163
164 Ok(())
165}
166
impl Reduce {
    /// Builds a `Reduce` transform from its config.
    ///
    /// # Errors
    /// Fails when both `ends_when` and `starts_when` are set, when either
    /// condition fails to build, or when a merge-strategy path is invalid
    /// (see `validate_merge_strategies`).
    pub fn new(
        config: &ReduceConfig,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
    ) -> crate::Result<Self> {
        // The two conditions are mutually exclusive by design.
        if config.ends_when.is_some() && config.starts_when.is_some() {
            return Err("only one of `ends_when` and `starts_when` can be provided".into());
        }

        let ends_when = config
            .ends_when
            .as_ref()
            .map(|c| c.build(enrichment_tables))
            .transpose()?;
        let starts_when = config
            .starts_when
            .as_ref()
            .map(|c| c.build(enrichment_tables))
            .transpose()?;
        let group_by = config.group_by.clone().into_iter().collect();
        let max_events = config.max_events.map(|max| max.into());

        validate_merge_strategies(config.merge_strategies.clone())?;

        Ok(Reduce {
            expire_after: config.expire_after_ms,
            flush_period: config.flush_period_ms,
            end_every_period: config.end_every_period_ms,
            group_by,
            // Strategy keys are parsed into target paths; entries whose path
            // fails to parse are dropped with a warning (validation above
            // should already have rejected most bad paths).
            merge_strategies: config
                .merge_strategies
                .iter()
                .filter_map(|(path, strategy)| {
                    let parsed_path = parse_target_path(path).ok();
                    if parsed_path.is_none() {
                        warn!(message = "Ignoring strategy with invalid path.", %path);
                    }
                    parsed_path.map(|path| (path, strategy.clone()))
                })
                .collect(),
            reduce_merge_states: HashMap::new(),
            ends_when,
            starts_when,
            max_events,
        })
    }

    /// Flushes groups that are due: older than `end_every_period` (if set) or
    /// stale for longer than `expire_after`.
    fn flush_into(&mut self, emitter: &mut Emitter<Event>) {
        let mut flush_discriminants = Vec::new();
        let now = Instant::now();
        for (k, t) in &self.reduce_merge_states {
            if let Some(period) = self.end_every_period {
                if (now - t.creation) >= period {
                    flush_discriminants.push(k.clone());
                }
            }

            // A discriminant may be pushed twice (both conditions met); the
            // `remove` below returns `None` the second time, so each state is
            // flushed at most once.
            if (now - t.stale_since) >= self.expire_after {
                flush_discriminants.push(k.clone());
            }
        }
        for k in &flush_discriminants {
            if let Some(t) = self.reduce_merge_states.remove(k) {
                emit!(ReduceStaleEventFlushed);
                emitter.emit(Event::from(t.flush()));
            }
        }
    }

    /// Flushes every in-flight group unconditionally (end of stream).
    fn flush_all_into(&mut self, emitter: &mut Emitter<Event>) {
        self.reduce_merge_states
            .drain()
            .for_each(|(_, s)| emitter.emit(Event::from(s.flush())));
    }

    /// Adds `event` to the state for `discriminant`, creating the state first
    /// if this is the group's first event.
    fn push_or_new_reduce_state(&mut self, event: LogEvent, discriminant: Discriminant) {
        match self.reduce_merge_states.entry(discriminant) {
            Entry::Vacant(entry) => {
                let mut state = ReduceState::new();
                state.add_event(event, &self.merge_strategies);
                entry.insert(state);
            }
            Entry::Occupied(mut entry) => {
                entry.get_mut().add_event(event, &self.merge_strategies);
            }
        };
    }

    /// Processes a single input event, emitting any completed groups.
    ///
    /// An event that matches `starts_when` flushes the previous group for its
    /// discriminant and begins a new one; an event that matches `ends_when`
    /// (or hits `max_events`) is merged and the group flushed immediately;
    /// otherwise it is simply merged into its group.
    pub fn transform_one(&mut self, emitter: &mut Emitter<Event>, event: Event) {
        // `check` consumes and returns the event, so the two condition checks
        // must be threaded in sequence.
        let (starts_here, event) = match &self.starts_when {
            Some(condition) => condition.check(event),
            None => (false, event),
        };

        let (mut ends_here, event) = match &self.ends_when {
            Some(condition) => condition.check(event),
            None => (false, event),
        };

        let event = event.into_log();
        let discriminant = Discriminant::from_log_event(&event, &self.group_by);

        if let Some(max_events) = self.max_events {
            if max_events == 1 {
                // Every event forms its own group.
                ends_here = true;
            } else if let Some(entry) = self.reduce_merge_states.get(&discriminant) {
                // This event will be number `events + 1`; end the group when
                // it reaches the configured maximum.
                if entry.events + 1 == max_events {
                    ends_here = true;
                }
            }
        }

        if starts_here {
            // Close out any previous group before starting a new one.
            if let Some(state) = self.reduce_merge_states.remove(&discriminant) {
                emitter.emit(state.flush().into());
            }

            self.push_or_new_reduce_state(event, discriminant)
        } else if ends_here {
            // Merge the terminating event and flush the group immediately.
            emitter.emit(match self.reduce_merge_states.remove(&discriminant) {
                Some(mut state) => {
                    state.add_event(event, &self.merge_strategies);
                    state.flush().into()
                }
                None => {
                    let mut state = ReduceState::new();
                    state.add_event(event, &self.merge_strategies);
                    state.flush().into()
                }
            });
        } else {
            self.push_or_new_reduce_state(event, discriminant)
        }
    }
}
306
307impl TaskTransform<Event> for Reduce {
308 fn transform(
309 self: Box<Self>,
310 input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
311 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
312 where
313 Self: 'static,
314 {
315 let transform_fn = move |me: &mut Box<Reduce>, event, emitter: &mut Emitter<Event>| {
316 me.transform_one(emitter, event);
317 };
318
319 construct_output_stream(self, input_rx, transform_fn)
320 }
321}
322
323pub fn construct_output_stream(
324 reduce: Box<Reduce>,
325 input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
326 mut transform_fn: impl FnMut(&mut Box<Reduce>, Event, &mut Emitter<Event>) + Send + Sync + 'static,
327) -> Pin<Box<dyn Stream<Item = Event> + Send>>
328where
329 Reduce: 'static,
330{
331 let flush_period = reduce.flush_period;
332 Box::pin(map_with_expiration(
333 reduce,
334 input_rx,
335 flush_period,
336 move |me, event, emitter| {
337 transform_fn(me, event, emitter);
338 },
339 |me, emitter| {
340 me.flush_into(emitter);
341 },
342 |me, emitter| {
343 me.flush_all_into(emitter);
344 },
345 ))
346}
347
348#[cfg(test)]
349mod test {
350 use indoc::indoc;
351 use serde_json::json;
352 use std::sync::Arc;
353 use tokio::sync::mpsc;
354 use tokio_stream::wrappers::ReceiverStream;
355 use vrl::value::Kind;
356
357 use vector_lib::enrichment::TableRegistry;
358 use vector_lib::lookup::owned_value_path;
359
360 use crate::config::schema::Definition;
361 use crate::config::{schema, LogNamespace, OutputId, TransformConfig};
362 use crate::event::{LogEvent, Value};
363 use crate::test_util::components::assert_transform_compliance;
364 use crate::transforms::test::create_topology;
365
366 use super::*;
367
    /// End-to-end: events grouped by `request_id` are merged until `ends_when`
    /// (`exists(.test_end)`) fires; checks merged counters, preserved
    /// metadata, and schema-definition validity of the outputs.
    #[tokio::test]
    async fn reduce_from_condition() {
        let reduce_config = toml::from_str::<ReduceConfig>(
            r#"
group_by = [ "request_id" ]

[ends_when]
  type = "vrl"
  source = "exists(.test_end)"
"#,
        )
        .unwrap();

        assert_transform_compliance(async move {
            let input_definition = schema::Definition::default_legacy_namespace()
                .with_event_field(&owned_value_path!("counter"), Kind::integer(), None)
                .with_event_field(&owned_value_path!("request_id"), Kind::bytes(), None)
                .with_event_field(
                    &owned_value_path!("test_end"),
                    Kind::bytes().or_undefined(),
                    None,
                )
                .with_event_field(
                    &owned_value_path!("extra_field"),
                    Kind::bytes().or_undefined(),
                    None,
                );
            let schema_definitions = reduce_config
                .outputs(
                    vector_lib::enrichment::TableRegistry::default(),
                    &[("test".into(), input_definition)],
                    LogNamespace::Legacy,
                )
                .first()
                .unwrap()
                .schema_definitions(true)
                .clone();

            let new_schema_definition = reduce_config.outputs(
                TableRegistry::default(),
                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
                LogNamespace::Legacy,
            )[0]
            .clone()
            .log_schema_definitions
            .get(&OutputId::from("in"))
            .unwrap()
            .clone();

            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            // Group "1": events 1, 3, 4 (4 carries `test_end`).
            let mut e_1 = LogEvent::from("test message 1");
            e_1.insert("counter", 1);
            e_1.insert("request_id", "1");
            let mut metadata_1 = e_1.metadata().clone();
            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));

            // Group "2": events 2, 5 (5 carries `test_end`).
            let mut e_2 = LogEvent::from("test message 2");
            e_2.insert("counter", 2);
            e_2.insert("request_id", "2");
            let mut metadata_2 = e_2.metadata().clone();
            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_2.set_schema_definition(&Arc::new(new_schema_definition.clone()));

            let mut e_3 = LogEvent::from("test message 3");
            e_3.insert("counter", 3);
            e_3.insert("request_id", "1");

            let mut e_4 = LogEvent::from("test message 4");
            e_4.insert("counter", 4);
            e_4.insert("request_id", "1");
            e_4.insert("test_end", "yep");

            let mut e_5 = LogEvent::from("test message 5");
            e_5.insert("counter", 5);
            e_5.insert("request_id", "2");
            e_5.insert("extra_field", "value1");
            e_5.insert("test_end", "yep");

            for event in vec![e_1.into(), e_2.into(), e_3.into(), e_4.into(), e_5.into()] {
                tx.send(event).await.unwrap();
            }

            // Group "1": counters 1 + 3 + 4 = 8.
            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(output_1["message"], "test message 1".into());
            assert_eq!(output_1["counter"], Value::from(8));
            assert_eq!(output_1.metadata(), &metadata_1);
            schema_definitions
                .values()
                .for_each(|definition| definition.assert_valid_for_event(&output_1.clone().into()));

            // Group "2": counters 2 + 5 = 7.
            let output_2 = out.recv().await.unwrap().into_log();
            assert_eq!(output_2["message"], "test message 2".into());
            assert_eq!(output_2["extra_field"], "value1".into());
            assert_eq!(output_2["counter"], Value::from(7));
            assert_eq!(output_2.metadata(), &metadata_2);
            schema_definitions
                .values()
                .for_each(|definition| definition.assert_valid_for_event(&output_2.clone().into()));

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
476
    /// Exercises explicit merge strategies: `concat` (string join), `array`
    /// (collect all values), and `max` (non-numeric values ignored).
    #[tokio::test]
    async fn reduce_merge_strategies() {
        let reduce_config = toml::from_str::<ReduceConfig>(
            r#"
group_by = [ "request_id" ]

merge_strategies.foo = "concat"
merge_strategies.bar = "array"
merge_strategies.baz = "max"

[ends_when]
  type = "vrl"
  source = "exists(.test_end)"
"#,
        )
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);

            let new_schema_definition = reduce_config.outputs(
                TableRegistry::default(),
                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
                LogNamespace::Legacy,
            )[0]
            .clone()
            .log_schema_definitions
            .get(&OutputId::from("in"))
            .unwrap()
            .clone();

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test message 1");
            e_1.insert("foo", "first foo");
            e_1.insert("bar", "first bar");
            e_1.insert("baz", 2);
            e_1.insert("request_id", "1");
            let mut metadata = e_1.metadata().clone();
            metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata.set_schema_definition(&Arc::new(new_schema_definition.clone()));
            tx.send(e_1.into()).await.unwrap();

            // Mixed types: `foo` is a non-string later, `baz` a non-number here.
            let mut e_2 = LogEvent::from("test message 2");
            e_2.insert("foo", "second foo");
            e_2.insert("bar", 2);
            e_2.insert("baz", "not number");
            e_2.insert("request_id", "1");
            tx.send(e_2.into()).await.unwrap();

            let mut e_3 = LogEvent::from("test message 3");
            e_3.insert("foo", 10);
            e_3.insert("bar", "third bar");
            e_3.insert("baz", 3);
            e_3.insert("request_id", "1");
            e_3.insert("test_end", "yep");
            tx.send(e_3.into()).await.unwrap();

            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(output_1["message"], "test message 1".into());
            // `concat` joins only string values; the integer 10 is dropped.
            assert_eq!(output_1["foo"], "first foo second foo".into());
            // `array` keeps every value regardless of type.
            assert_eq!(
                output_1["bar"],
                Value::Array(vec!["first bar".into(), 2.into(), "third bar".into()]),
            );
            // `max` ignores the non-numeric "not number" value.
            assert_eq!(output_1["baz"], 3.into());
            assert_eq!(output_1.metadata(), &metadata);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
551
    /// Events missing the `group_by` field form their own group (absent
    /// discriminant) and are reduced together.
    #[tokio::test]
    async fn missing_group_by() {
        let reduce_config = toml::from_str::<ReduceConfig>(
            r#"
group_by = [ "request_id" ]

[ends_when]
  type = "vrl"
  source = "exists(.test_end)"
"#,
        )
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);
            let new_schema_definition = reduce_config.outputs(
                TableRegistry::default(),
                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
                LogNamespace::Legacy,
            )[0]
            .clone()
            .log_schema_definitions
            .get(&OutputId::from("in"))
            .unwrap()
            .clone();

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test message 1");
            e_1.insert("counter", 1);
            e_1.insert("request_id", "1");
            let mut metadata_1 = e_1.metadata().clone();
            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
            tx.send(e_1.into()).await.unwrap();

            // No `request_id`: goes into the "missing" group.
            let mut e_2 = LogEvent::from("test message 2");
            e_2.insert("counter", 2);
            let mut metadata_2 = e_2.metadata().clone();
            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_2.set_schema_definition(&Arc::new(new_schema_definition));
            tx.send(e_2.into()).await.unwrap();

            let mut e_3 = LogEvent::from("test message 3");
            e_3.insert("counter", 3);
            e_3.insert("request_id", "1");
            tx.send(e_3.into()).await.unwrap();

            let mut e_4 = LogEvent::from("test message 4");
            e_4.insert("counter", 4);
            e_4.insert("request_id", "1");
            e_4.insert("test_end", "yep");
            tx.send(e_4.into()).await.unwrap();

            // Also missing `request_id`: ends the "missing" group.
            let mut e_5 = LogEvent::from("test message 5");
            e_5.insert("counter", 5);
            e_5.insert("extra_field", "value1");
            e_5.insert("test_end", "yep");
            tx.send(e_5.into()).await.unwrap();

            // Group "1": counters 1 + 3 + 4 = 8.
            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(output_1["message"], "test message 1".into());
            assert_eq!(output_1["counter"], Value::from(8));
            assert_eq!(output_1.metadata(), &metadata_1);

            // "Missing" group: counters 2 + 5 = 7.
            let output_2 = out.recv().await.unwrap().into_log();
            assert_eq!(output_2["message"], "test message 2".into());
            assert_eq!(output_2["extra_field"], "value1".into());
            assert_eq!(output_2["counter"], Value::from(7));
            assert_eq!(output_2.metadata(), &metadata_2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
629
    /// `max_events = 0` must be rejected at config-deserialization time
    /// (the field is a nonzero usize).
    #[tokio::test]
    async fn max_events_0() {
        let reduce_config = toml::from_str::<ReduceConfig>(
            r#"
group_by = [ "id" ]
merge_strategies.id = "retain"
merge_strategies.message = "array"
max_events = 0
            "#,
        );

        match reduce_config {
            Ok(_conf) => unreachable!("max_events=0 should be rejected."),
            Err(err) => assert!(err
                .to_string()
                .contains("invalid value: integer `0`, expected a nonzero usize")),
        }
    }
648
    /// With `max_events = 1` every event immediately flushes as its own group.
    #[tokio::test]
    async fn max_events_1() {
        let reduce_config = toml::from_str::<ReduceConfig>(
            r#"
group_by = [ "id" ]
merge_strategies.id = "retain"
merge_strategies.message = "array"
max_events = 1
            "#,
        )
        .unwrap();
        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test 1");
            e_1.insert("id", "1");

            let mut e_2 = LogEvent::from("test 2");
            e_2.insert("id", "1");

            let mut e_3 = LogEvent::from("test 3");
            e_3.insert("id", "1");

            for event in vec![e_1.into(), e_2.into(), e_3.into()] {
                tx.send(event).await.unwrap();
            }

            // Each input becomes a single-element `array`-merged output.
            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(output_1["message"], vec!["test 1"].into());
            let output_2 = out.recv().await.unwrap().into_log();
            assert_eq!(output_2["message"], vec!["test 2"].into());

            let output_3 = out.recv().await.unwrap().into_log();
            assert_eq!(output_3["message"], vec!["test 3"].into());

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
691
    /// With `max_events = 3`, six same-group events flush as two batches of
    /// three.
    #[tokio::test]
    async fn max_events() {
        let reduce_config = toml::from_str::<ReduceConfig>(
            r#"
group_by = [ "id" ]
merge_strategies.id = "retain"
merge_strategies.message = "array"
max_events = 3
            "#,
        )
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test 1");
            e_1.insert("id", "1");

            let mut e_2 = LogEvent::from("test 2");
            e_2.insert("id", "1");

            let mut e_3 = LogEvent::from("test 3");
            e_3.insert("id", "1");

            let mut e_4 = LogEvent::from("test 4");
            e_4.insert("id", "1");

            let mut e_5 = LogEvent::from("test 5");
            e_5.insert("id", "1");

            let mut e_6 = LogEvent::from("test 6");
            e_6.insert("id", "1");

            for event in vec![
                e_1.into(),
                e_2.into(),
                e_3.into(),
                e_4.into(),
                e_5.into(),
                e_6.into(),
            ] {
                tx.send(event).await.unwrap();
            }

            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(
                output_1["message"],
                vec!["test 1", "test 2", "test 3"].into()
            );

            let output_2 = out.recv().await.unwrap().into_log();
            assert_eq!(
                output_2["message"],
                vec!["test 4", "test 5", "test 6"].into()
            );

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await
    }
755
    /// Array-valued fields: `array` nests each value as one element, while
    /// `concat` flattens array values into a single list.
    #[tokio::test]
    async fn arrays() {
        let reduce_config = toml::from_str::<ReduceConfig>(
            r#"
group_by = [ "request_id" ]

merge_strategies.foo = "array"
merge_strategies.bar = "concat"

[ends_when]
  type = "vrl"
  source = "exists(.test_end)"
"#,
        )
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);

            let new_schema_definition = reduce_config.outputs(
                TableRegistry::default(),
                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
                LogNamespace::Legacy,
            )[0]
            .clone()
            .log_schema_definitions
            .get(&OutputId::from("in"))
            .unwrap()
            .clone();

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test message 1");
            e_1.insert("foo", json!([1, 3]));
            e_1.insert("bar", json!([1, 3]));
            e_1.insert("request_id", "1");
            let mut metadata_1 = e_1.metadata().clone();
            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));

            tx.send(e_1.into()).await.unwrap();

            let mut e_2 = LogEvent::from("test message 2");
            e_2.insert("foo", json!([2, 4]));
            e_2.insert("bar", json!([2, 4]));
            e_2.insert("request_id", "2");
            let mut metadata_2 = e_2.metadata().clone();
            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_2.set_schema_definition(&Arc::new(new_schema_definition));
            tx.send(e_2.into()).await.unwrap();

            let mut e_3 = LogEvent::from("test message 3");
            e_3.insert("foo", json!([5, 7]));
            e_3.insert("bar", json!([5, 7]));
            e_3.insert("request_id", "1");
            tx.send(e_3.into()).await.unwrap();

            let mut e_4 = LogEvent::from("test message 4");
            e_4.insert("foo", json!("done"));
            e_4.insert("bar", json!("done"));
            e_4.insert("request_id", "1");
            e_4.insert("test_end", "yep");
            tx.send(e_4.into()).await.unwrap();

            let mut e_5 = LogEvent::from("test message 5");
            e_5.insert("foo", json!([6, 8]));
            e_5.insert("bar", json!([6, 8]));
            e_5.insert("request_id", "2");
            tx.send(e_5.into()).await.unwrap();

            let mut e_6 = LogEvent::from("test message 6");
            e_6.insert("foo", json!("done"));
            e_6.insert("bar", json!("done"));
            e_6.insert("request_id", "2");
            e_6.insert("test_end", "yep");
            tx.send(e_6.into()).await.unwrap();

            // `array` preserves each value (arrays stay nested); `concat`
            // splices array elements into one flat list.
            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(output_1["foo"], json!([[1, 3], [5, 7], "done"]).into());
            assert_eq!(output_1["bar"], json!([1, 3, 5, 7, "done"]).into());
            assert_eq!(output_1.metadata(), &metadata_1);

            let output_2 = out.recv().await.unwrap().into_log();
            assert_eq!(output_2["foo"], json!([[2, 4], [6, 8], "done"]).into());
            assert_eq!(output_2["bar"], json!([2, 4, 6, 8, "done"]).into());
            assert_eq!(output_2.metadata(), &metadata_2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
849
    /// A strategy on a nested path (`message.a.b`) applies only to that leaf;
    /// sibling fields get default merging, and fields inside arrays are not
    /// merged (the first event's array wins).
    #[tokio::test]
    async fn strategy_path_with_nested_fields() {
        let reduce_config = toml::from_str::<ReduceConfig>(indoc!(
            r#"
            group_by = [ "id" ]

            merge_strategies.id = "discard"
            merge_strategies."message.a.b" = "array"

            [ends_when]
              type = "vrl"
              source = "exists(.test_end)"
            "#,
        ))
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let e_1 = LogEvent::from(Value::from(btreemap! {
                "id" => 777,
                "message" => btreemap! {
                    "a" => btreemap! {
                        "b" => vec![1,2],
                        "num" => 1,
                    },
                },
                "arr" => vec![btreemap! { "a" => 1 }, btreemap! { "b" => 1 }]
            }));
            let mut metadata_1 = e_1.metadata().clone();
            metadata_1.set_upstream_id(Arc::new(OutputId::from("reduce")));

            tx.send(e_1.into()).await.unwrap();

            let e_2 = LogEvent::from(Value::from(btreemap! {
                "id" => 777,
                "message" => btreemap! {
                    "a" => btreemap! {
                        "b" => vec![3,4],
                        "num" => 2,
                    },
                },
                "arr" => vec![btreemap! { "a" => 2 }, btreemap! { "b" => 2 }],
                "test_end" => "done",
            }));
            tx.send(e_2.into()).await.unwrap();

            let mut output = out.recv().await.unwrap().into_log();

            // Timestamps vary per run; strip them before comparing values.
            output.remove_timestamp();
            output.remove("timestamp_end");

            assert_eq!(
                *output.value(),
                btreemap! {
                    // `discard` keeps the first value.
                    "id" => 777,
                    "message" => btreemap! {
                        "a" => btreemap! {
                            // `array` collects both nested arrays.
                            "b" => vec![vec![1, 2], vec![3,4]],
                            // Default numeric merge sums: 1 + 2.
                            "num" => 3,
                        },
                    },
                    // Array elements are skipped by the default pass.
                    "arr" => vec![btreemap! { "a" => 1 }, btreemap! { "b" => 1 }],
                    "test_end" => "done",
                }
                .into()
            );

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
927
    /// `Reduce::new` rejects merge-strategy paths containing array indexes.
    #[test]
    fn invalid_merge_strategies_containing_indexes() {
        let config = toml::from_str::<ReduceConfig>(indoc!(
            r#"
            group_by = [ "id" ]

            merge_strategies.id = "discard"
            merge_strategies."nested.msg[0]" = "array"
            "#,
        ))
        .unwrap();
        let error = Reduce::new(&config, &TableRegistry::default()).unwrap_err();
        assert_eq!(
            error.to_string(),
            "Merge strategies with indexes are currently not supported. Path: `nested.msg[0]`"
        );
    }
945
    /// Object values merged with `array` are kept whole, and strategies on
    /// quoted field names (`"a-b"`) resolve correctly; objects covered by a
    /// strategy (`another` = discard) are not field-merged.
    #[tokio::test]
    async fn merge_objects_in_array() {
        let config = toml::from_str::<ReduceConfig>(indoc!(
            r#"
            group_by = [ "id" ]
            merge_strategies.events = "array"
            merge_strategies."\"a-b\"" = "retain"
            merge_strategies.another = "discard"

            [ends_when]
              type = "vrl"
              source = "exists(.test_end)"
            "#,
        ))
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;

            let v_1 = Value::from(btreemap! {
                "attrs" => btreemap! {
                    "nested.msg" => "foo",
                },
                "sev" => 2,
            });
            let mut e_1 = LogEvent::from(Value::from(
                btreemap! {"id" => 777, "another" => btreemap!{ "a" => 1}},
            ));
            e_1.insert("events", v_1.clone());
            e_1.insert("\"a-b\"", 2);
            tx.send(e_1.into()).await.unwrap();

            let v_2 = Value::from(btreemap! {
                "attrs" => btreemap! {
                    "nested.msg" => "bar",
                },
                "sev" => 3,
            });
            let mut e_2 = LogEvent::from(Value::from(
                btreemap! {"id" => 777, "test_end" => "done", "another" => btreemap!{ "b" => 2}},
            ));
            e_2.insert("events", v_2.clone());
            e_2.insert("\"a-b\"", 2);
            tx.send(e_2.into()).await.unwrap();

            let output = out.recv().await.unwrap().into_log();
            let expected_value = Value::from(btreemap! {
                // Default numeric merge on `id`: 777 + 777.
                "id" => 1554,
                // `array` keeps each object intact.
                "events" => vec![v_1, v_2],
                // `discard` keeps the first object as-is.
                "another" => btreemap!{ "a" => 1},
                // `retain` keeps the last value for the quoted field.
                "a-b" => 2,
                "test_end" => "done"
            });
            assert_eq!(*output.value(), expected_value);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await
    }
1009
    /// Fields whose names contain spaces (quoted paths) merge via the default
    /// strategy without path-parsing issues.
    #[tokio::test]
    async fn merged_quoted_path() {
        let config = toml::from_str::<ReduceConfig>(indoc!(
            r#"
            [ends_when]
              type = "vrl"
              source = "exists(.test_end)"
            "#,
        ))
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;

            let e_1 = LogEvent::from(Value::from(btreemap! {"a b" => 1}));
            tx.send(e_1.into()).await.unwrap();

            let e_2 = LogEvent::from(Value::from(btreemap! {"a b" => 2, "test_end" => "done"}));
            tx.send(e_2.into()).await.unwrap();

            let output = out.recv().await.unwrap().into_log();
            // Default numeric merge sums the quoted-path field: 1 + 2.
            let expected_value = Value::from(btreemap! {
                "a b" => 3,
                "test_end" => "done"
            });
            assert_eq!(*output.value(), expected_value);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await
    }
1045}