1use std::{
2 collections::{HashMap, hash_map::Entry},
3 pin::Pin,
4 time::{Duration, Instant},
5};
6
7use futures::Stream;
8use indexmap::IndexMap;
9use vector_lib::stream::expiration_map::{Emitter, map_with_expiration};
10use vector_vrl_metrics::MetricsStorage;
11use vrl::{
12 path::{OwnedTargetPath, parse_target_path},
13 prelude::KeyString,
14};
15
16use crate::{
17 conditions::Condition,
18 event::{Event, EventMetadata, LogEvent, discriminant::Discriminant},
19 internal_events::{ReduceAddEventError, ReduceStaleEventFlushed},
20 transforms::{
21 TaskTransform,
22 reduce::{
23 config::ReduceConfig,
24 merge_strategy::{MergeStrategy, ReduceValueMerger, get_value_merger},
25 },
26 },
27};
28
/// Accumulated state for a single group (discriminant) of events being reduced.
#[derive(Clone, Debug)]
struct ReduceState {
    // Number of events merged into this state since creation (used for `max_events`).
    events: usize,
    // Per-path value mergers holding the partially reduced values.
    fields: HashMap<OwnedTargetPath, Box<dyn ReduceValueMerger>>,
    // Last time an event was added; drives the `expire_after` staleness flush.
    stale_since: Instant,
    // When this state was created; drives the `end_every_period` flush.
    creation: Instant,
    // Metadata merged from all contributing events.
    metadata: EventMetadata,
}
37
38fn is_covered_by_strategy(
39 path: &OwnedTargetPath,
40 strategies: &IndexMap<OwnedTargetPath, MergeStrategy>,
41) -> bool {
42 let mut current = OwnedTargetPath::event_root();
43 for component in &path.path.segments {
44 current = current.with_field_appended(&component.to_string());
45 if strategies.contains_key(¤t) {
46 return true;
47 }
48 }
49 false
50}
51
impl ReduceState {
    /// Creates an empty state with both timers set to now.
    fn new() -> Self {
        Self {
            events: 0,
            stale_since: Instant::now(),
            creation: Instant::now(),
            fields: HashMap::new(),
            metadata: EventMetadata::default(),
        }
    }

    /// Merges a single event into this state.
    ///
    /// Two passes are run in order:
    /// 1. every configured strategy path present in the event is merged using
    ///    its configured strategy;
    /// 2. all remaining event fields (array elements are not descended into)
    ///    are merged with the default merger, skipping fields already covered
    ///    by a strategy on the field itself or on one of its ancestors.
    ///
    /// Finally the event counter is bumped and the staleness timer reset.
    fn add_event(&mut self, e: LogEvent, strategies: &IndexMap<OwnedTargetPath, MergeStrategy>) {
        self.metadata.merge(e.metadata().clone());

        // Pass 1: strategy-covered paths.
        for (path, strategy) in strategies {
            if let Some(value) = e.get(path) {
                match self.fields.entry(path.clone()) {
                    // First event for this path: build the strategy's merger.
                    Entry::Vacant(entry) => match get_value_merger(value.clone(), strategy) {
                        Ok(m) => {
                            entry.insert(m);
                        }
                        Err(error) => {
                            warn!(message = "Failed to create value merger.", %error, %path);
                        }
                    },
                    // Subsequent events: feed the value into the existing merger.
                    Entry::Occupied(mut entry) => {
                        if let Err(error) = entry.get_mut().add(value.clone()) {
                            warn!(message = "Failed to merge value.", %error);
                        }
                    }
                }
            }
        }

        // Pass 2: all other event fields, merged with the default behavior.
        if let Some(fields_iter) = e.all_event_fields_skip_array_elements() {
            for (path, value) in fields_iter {
                let parsed_path = match parse_target_path(&path) {
                    Ok(path) => path,
                    Err(error) => {
                        emit!(ReduceAddEventError { error, path });
                        continue;
                    }
                };
                // Skip fields handled (directly or via an ancestor) in pass 1.
                if is_covered_by_strategy(&parsed_path, strategies) {
                    continue;
                }

                // NOTE(review): fields reaching this point were not matched by
                // `is_covered_by_strategy`, so this lookup is expected to be
                // `None` for plain field paths; kept as a safety net — confirm.
                let maybe_strategy = strategies.get(&parsed_path);
                match self.fields.entry(parsed_path) {
                    Entry::Vacant(entry) => {
                        if let Some(strategy) = maybe_strategy {
                            match get_value_merger(value.clone(), strategy) {
                                Ok(m) => {
                                    entry.insert(m);
                                }
                                Err(error) => {
                                    warn!(message = "Failed to merge value.", %error);
                                }
                            }
                        } else {
                            // Default merger derived from the value itself.
                            entry.insert(value.clone().into());
                        }
                    }
                    Entry::Occupied(mut entry) => {
                        if let Err(error) = entry.get_mut().add(value.clone()) {
                            warn!(message = "Failed to merge value.", %error);
                        }
                    }
                }
            }
        }
        self.events += 1;
        self.stale_since = Instant::now();
    }

    /// Consumes the state, producing the reduced output event by asking every
    /// merger to insert its final value at its path. Merge failures are logged
    /// and skipped rather than aborting the flush.
    fn flush(mut self) -> LogEvent {
        let mut event = LogEvent::new_with_metadata(self.metadata);
        for (path, v) in self.fields.drain() {
            if let Err(error) = v.insert_into(&path, &mut event) {
                warn!(message = "Failed to merge values for field.", %error);
            }
        }
        self.events = 0;
        event
    }
}
141
/// The `reduce` transform: collapses multiple events into one, grouped by the
/// `group_by` fields, merging values according to per-path strategies.
#[derive(Clone, Debug)]
pub struct Reduce {
    // A group with no new events for this long is flushed as stale.
    expire_after: Duration,
    // How often the expiration check runs on the output stream.
    flush_period: Duration,
    // If set, every group is flushed once it has existed this long.
    end_every_period: Option<Duration>,
    // Field names whose values partition events into groups.
    group_by: Vec<String>,
    // Per-path merge strategies (insertion order preserved by IndexMap).
    merge_strategies: IndexMap<OwnedTargetPath, MergeStrategy>,
    // In-flight reduction state, keyed by group discriminant.
    reduce_merge_states: HashMap<Discriminant, ReduceState>,
    // Condition that closes a group (mutually exclusive with `starts_when`).
    ends_when: Option<Condition>,
    // Condition that opens a new group, flushing any previous one.
    starts_when: Option<Condition>,
    // Maximum events per group before a forced flush.
    max_events: Option<usize>,
}
154
155fn validate_merge_strategies(strategies: IndexMap<KeyString, MergeStrategy>) -> crate::Result<()> {
156 for (path, _) in &strategies {
157 let contains_index = parse_target_path(path)
158 .map_err(|_| format!("Could not parse path: `{path}`"))?
159 .path
160 .segments
161 .iter()
162 .any(|segment| segment.is_index());
163 if contains_index {
164 return Err(format!(
165 "Merge strategies with indexes are currently not supported. Path: `{path}`"
166 )
167 .into());
168 }
169 }
170
171 Ok(())
172}
173
impl Reduce {
    /// Builds a `Reduce` from its config.
    ///
    /// # Errors
    /// Fails if both `ends_when` and `starts_when` are set, if either
    /// condition fails to build, or if a merge-strategy path contains an
    /// index segment.
    pub fn new(
        config: &ReduceConfig,
        enrichment_tables: &vector_lib::enrichment::TableRegistry,
        metrics_storage: &MetricsStorage,
    ) -> crate::Result<Self> {
        if config.ends_when.is_some() && config.starts_when.is_some() {
            return Err("only one of `ends_when` and `starts_when` can be provided".into());
        }

        let ends_when = config
            .ends_when
            .as_ref()
            .map(|c| c.build(enrichment_tables, metrics_storage))
            .transpose()?;
        let starts_when = config
            .starts_when
            .as_ref()
            .map(|c| c.build(enrichment_tables, metrics_storage))
            .transpose()?;
        let group_by = config.group_by.clone().into_iter().collect();
        let max_events = config.max_events.map(|max| max.into());

        validate_merge_strategies(config.merge_strategies.clone())?;

        Ok(Reduce {
            expire_after: config.expire_after_ms,
            flush_period: config.flush_period_ms,
            end_every_period: config.end_every_period_ms,
            group_by,
            // Parse strategy keys into target paths, warning on (and dropping)
            // any that fail to parse rather than failing the whole build.
            merge_strategies: config
                .merge_strategies
                .iter()
                .filter_map(|(path, strategy)| {
                    let parsed_path = parse_target_path(path).ok();
                    if parsed_path.is_none() {
                        warn!(message = "Ignoring strategy with invalid path.", %path);
                    }
                    parsed_path.map(|path| (path, strategy.clone()))
                })
                .collect(),
            reduce_merge_states: HashMap::new(),
            ends_when,
            starts_when,
            max_events,
        })
    }

    /// Flushes groups that hit `end_every_period` or went stale
    /// (`expire_after`). Called periodically by the output stream.
    fn flush_into(&mut self, emitter: &mut Emitter<Event>) {
        let mut flush_discriminants = Vec::new();
        let now = Instant::now();
        for (k, t) in &self.reduce_merge_states {
            if let Some(period) = self.end_every_period
                && (now - t.creation) >= period
            {
                flush_discriminants.push(k.clone());
            }

            if (now - t.stale_since) >= self.expire_after {
                flush_discriminants.push(k.clone());
            }
        }
        // A key matching both conditions appears twice; the second `remove`
        // returns `None`, so each state is flushed at most once.
        for k in &flush_discriminants {
            if let Some(t) = self.reduce_merge_states.remove(k) {
                emit!(ReduceStaleEventFlushed);
                emitter.emit(Event::from(t.flush()));
            }
        }
    }

    /// Flushes every in-flight group unconditionally (end of input).
    fn flush_all_into(&mut self, emitter: &mut Emitter<Event>) {
        self.reduce_merge_states
            .drain()
            .for_each(|(_, s)| emitter.emit(Event::from(s.flush())));
    }

    /// Adds `event` to the state for `discriminant`, creating it if absent.
    fn push_or_new_reduce_state(&mut self, event: LogEvent, discriminant: Discriminant) {
        match self.reduce_merge_states.entry(discriminant) {
            Entry::Vacant(entry) => {
                let mut state = ReduceState::new();
                state.add_event(event, &self.merge_strategies);
                entry.insert(state);
            }
            Entry::Occupied(mut entry) => {
                entry.get_mut().add_event(event, &self.merge_strategies);
            }
        };
    }

    /// Processes a single incoming event: evaluates `starts_when`/`ends_when`,
    /// applies the `max_events` cap, and either accumulates the event into its
    /// group or flushes the group (emitting through `emitter`).
    pub fn transform_one(&mut self, emitter: &mut Emitter<Event>, event: Event) {
        let (starts_here, event) = match &self.starts_when {
            Some(condition) => condition.check(event),
            None => (false, event),
        };

        let (mut ends_here, event) = match &self.ends_when {
            Some(condition) => condition.check(event),
            None => (false, event),
        };

        let event = event.into_log();
        let discriminant = Discriminant::from_log_event(&event, &self.group_by);

        // Force an end when this event would bring the group to `max_events`.
        if let Some(max_events) = self.max_events {
            if max_events == 1 {
                ends_here = true;
            } else if let Some(entry) = self.reduce_merge_states.get(&discriminant) {
                if entry.events + 1 == max_events {
                    ends_here = true;
                }
            }
        }

        if starts_here {
            // A new group starts: flush any previous state for this
            // discriminant, then begin accumulating with this event.
            if let Some(state) = self.reduce_merge_states.remove(&discriminant) {
                emitter.emit(state.flush().into());
            }

            self.push_or_new_reduce_state(event, discriminant)
        } else if ends_here {
            // The group ends: merge this final event, then flush. A lone
            // ending event with no prior state still produces an output.
            emitter.emit(match self.reduce_merge_states.remove(&discriminant) {
                Some(mut state) => {
                    state.add_event(event, &self.merge_strategies);
                    state.flush().into()
                }
                None => {
                    let mut state = ReduceState::new();
                    state.add_event(event, &self.merge_strategies);
                    state.flush().into()
                }
            });
        } else {
            self.push_or_new_reduce_state(event, discriminant)
        }
    }
}
314
315impl TaskTransform<Event> for Reduce {
316 fn transform(
317 self: Box<Self>,
318 input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
319 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
320 where
321 Self: 'static,
322 {
323 let transform_fn = move |me: &mut Box<Reduce>, event, emitter: &mut Emitter<Event>| {
324 me.transform_one(emitter, event);
325 };
326
327 construct_output_stream(self, input_rx, transform_fn)
328 }
329}
330
331pub fn construct_output_stream(
332 reduce: Box<Reduce>,
333 input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
334 mut transform_fn: impl FnMut(&mut Box<Reduce>, Event, &mut Emitter<Event>) + Send + Sync + 'static,
335) -> Pin<Box<dyn Stream<Item = Event> + Send>>
336where
337 Reduce: 'static,
338{
339 let flush_period = reduce.flush_period;
340 Box::pin(map_with_expiration(
341 reduce,
342 input_rx,
343 flush_period,
344 move |me, event, emitter| {
345 transform_fn(me, event, emitter);
346 },
347 |me, emitter| {
348 me.flush_into(emitter);
349 },
350 |me, emitter| {
351 me.flush_all_into(emitter);
352 },
353 ))
354}
355
356#[cfg(test)]
357mod test {
358 use std::sync::Arc;
359
360 use indoc::indoc;
361 use serde_json::json;
362 use tokio::sync::mpsc;
363 use tokio_stream::wrappers::ReceiverStream;
364 use vector_lib::{enrichment::TableRegistry, lookup::owned_value_path};
365 use vrl::value::Kind;
366
367 use super::*;
368 use crate::{
369 config::{OutputId, TransformConfig, schema, schema::Definition},
370 event::{LogEvent, Value},
371 test_util::components::assert_transform_compliance,
372 transforms::test::create_topology,
373 };
374
    #[tokio::test]
    async fn reduce_from_condition() {
        // End-to-end topology test: events are grouped by `request_id` and a
        // group is flushed when an event containing `.test_end` arrives. Also
        // verifies metadata propagation and output schema validity.
        let reduce_config = toml::from_str::<ReduceConfig>(
            r#"
group_by = [ "request_id" ]

[ends_when]
  type = "vrl"
  source = "exists(.test_end)"
"#,
        )
        .unwrap();

        assert_transform_compliance(async move {
            // Input schema declaring the fields used by the test events.
            let input_definition = schema::Definition::default_legacy_namespace()
                .with_event_field(&owned_value_path!("counter"), Kind::integer(), None)
                .with_event_field(&owned_value_path!("request_id"), Kind::bytes(), None)
                .with_event_field(
                    &owned_value_path!("test_end"),
                    Kind::bytes().or_undefined(),
                    None,
                )
                .with_event_field(
                    &owned_value_path!("extra_field"),
                    Kind::bytes().or_undefined(),
                    None,
                );
            let schema_definitions = reduce_config
                .outputs(&Default::default(), &[("test".into(), input_definition)])
                .first()
                .unwrap()
                .schema_definitions(true)
                .clone();

            // Schema definition expected on the transform's output events.
            let new_schema_definition = reduce_config.outputs(
                &Default::default(),
                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
            )[0]
            .clone()
            .log_schema_definitions
            .get(&OutputId::from("in"))
            .unwrap()
            .clone();

            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test message 1");
            e_1.insert("counter", 1);
            e_1.insert("request_id", "1");
            let mut metadata_1 = e_1.metadata().clone();
            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));

            let mut e_2 = LogEvent::from("test message 2");
            e_2.insert("counter", 2);
            e_2.insert("request_id", "2");
            let mut metadata_2 = e_2.metadata().clone();
            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_2.set_schema_definition(&Arc::new(new_schema_definition.clone()));

            let mut e_3 = LogEvent::from("test message 3");
            e_3.insert("counter", 3);
            e_3.insert("request_id", "1");

            // `test_end` closes the group for request_id "1".
            let mut e_4 = LogEvent::from("test message 4");
            e_4.insert("counter", 4);
            e_4.insert("request_id", "1");
            e_4.insert("test_end", "yep");

            // `test_end` closes the group for request_id "2".
            let mut e_5 = LogEvent::from("test message 5");
            e_5.insert("counter", 5);
            e_5.insert("request_id", "2");
            e_5.insert("extra_field", "value1");
            e_5.insert("test_end", "yep");

            for event in [e_1.into(), e_2.into(), e_3.into(), e_4.into(), e_5.into()] {
                tx.send(event).await.unwrap();
            }

            // Group "1": counters 1 + 3 + 4 summed by the default merge.
            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(output_1["message"], "test message 1".into());
            assert_eq!(output_1["counter"], Value::from(8));
            assert_eq!(output_1.metadata(), &metadata_1);
            schema_definitions
                .values()
                .for_each(|definition| definition.assert_valid_for_event(&output_1.clone().into()));

            // Group "2": counters 2 + 5.
            let output_2 = out.recv().await.unwrap().into_log();
            assert_eq!(output_2["message"], "test message 2".into());
            assert_eq!(output_2["extra_field"], "value1".into());
            assert_eq!(output_2["counter"], Value::from(7));
            assert_eq!(output_2.metadata(), &metadata_2);
            schema_definitions
                .values()
                .for_each(|definition| definition.assert_valid_for_event(&output_2.clone().into()));

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
478
    #[tokio::test]
    async fn reduce_merge_strategies() {
        // Exercises explicit per-field strategies: `concat` (space-joined
        // strings), `array` (collect all values), and `max` (numeric maximum,
        // ignoring non-numeric values).
        let reduce_config = toml::from_str::<ReduceConfig>(
            r#"
group_by = [ "request_id" ]

merge_strategies.foo = "concat"
merge_strategies.bar = "array"
merge_strategies.baz = "max"

[ends_when]
  type = "vrl"
  source = "exists(.test_end)"
"#,
        )
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);

            let new_schema_definition = reduce_config.outputs(
                &Default::default(),
                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
            )[0]
            .clone()
            .log_schema_definitions
            .get(&OutputId::from("in"))
            .unwrap()
            .clone();

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test message 1");
            e_1.insert("foo", "first foo");
            e_1.insert("bar", "first bar");
            e_1.insert("baz", 2);
            e_1.insert("request_id", "1");
            let mut metadata = e_1.metadata().clone();
            metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata.set_schema_definition(&Arc::new(new_schema_definition.clone()));
            tx.send(e_1.into()).await.unwrap();

            // Mixed-type values: `concat` skips the non-string, `max` skips
            // the non-number.
            let mut e_2 = LogEvent::from("test message 2");
            e_2.insert("foo", "second foo");
            e_2.insert("bar", 2);
            e_2.insert("baz", "not number");
            e_2.insert("request_id", "1");
            tx.send(e_2.into()).await.unwrap();

            let mut e_3 = LogEvent::from("test message 3");
            e_3.insert("foo", 10);
            e_3.insert("bar", "third bar");
            e_3.insert("baz", 3);
            e_3.insert("request_id", "1");
            e_3.insert("test_end", "yep");
            tx.send(e_3.into()).await.unwrap();

            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(output_1["message"], "test message 1".into());
            assert_eq!(output_1["foo"], "first foo second foo".into());
            assert_eq!(
                output_1["bar"],
                Value::Array(vec!["first bar".into(), 2.into(), "third bar".into()]),
            );
            assert_eq!(output_1["baz"], 3.into());
            assert_eq!(output_1.metadata(), &metadata);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
552
    #[tokio::test]
    async fn missing_group_by() {
        // Events lacking the `group_by` field form their own group (the
        // missing discriminant), reduced independently of keyed groups.
        let reduce_config = toml::from_str::<ReduceConfig>(
            r#"
group_by = [ "request_id" ]

[ends_when]
  type = "vrl"
  source = "exists(.test_end)"
"#,
        )
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);
            let new_schema_definition = reduce_config.outputs(
                &Default::default(),
                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
            )[0]
            .clone()
            .log_schema_definitions
            .get(&OutputId::from("in"))
            .unwrap()
            .clone();

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test message 1");
            e_1.insert("counter", 1);
            e_1.insert("request_id", "1");
            let mut metadata_1 = e_1.metadata().clone();
            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));
            tx.send(e_1.into()).await.unwrap();

            // No `request_id`: goes to the "missing" group.
            let mut e_2 = LogEvent::from("test message 2");
            e_2.insert("counter", 2);
            let mut metadata_2 = e_2.metadata().clone();
            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_2.set_schema_definition(&Arc::new(new_schema_definition));
            tx.send(e_2.into()).await.unwrap();

            let mut e_3 = LogEvent::from("test message 3");
            e_3.insert("counter", 3);
            e_3.insert("request_id", "1");
            tx.send(e_3.into()).await.unwrap();

            // Ends the "1" group.
            let mut e_4 = LogEvent::from("test message 4");
            e_4.insert("counter", 4);
            e_4.insert("request_id", "1");
            e_4.insert("test_end", "yep");
            tx.send(e_4.into()).await.unwrap();

            // Ends the "missing" group.
            let mut e_5 = LogEvent::from("test message 5");
            e_5.insert("counter", 5);
            e_5.insert("extra_field", "value1");
            e_5.insert("test_end", "yep");
            tx.send(e_5.into()).await.unwrap();

            // Group "1": 1 + 3 + 4.
            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(output_1["message"], "test message 1".into());
            assert_eq!(output_1["counter"], Value::from(8));
            assert_eq!(output_1.metadata(), &metadata_1);

            // Missing group: 2 + 5.
            let output_2 = out.recv().await.unwrap().into_log();
            assert_eq!(output_2["message"], "test message 2".into());
            assert_eq!(output_2["extra_field"], "value1".into());
            assert_eq!(output_2["counter"], Value::from(7));
            assert_eq!(output_2.metadata(), &metadata_2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
629
630 #[tokio::test]
631 async fn max_events_0() {
632 let reduce_config = toml::from_str::<ReduceConfig>(
633 r#"
634group_by = [ "id" ]
635merge_strategies.id = "retain"
636merge_strategies.message = "array"
637max_events = 0
638 "#,
639 );
640
641 match reduce_config {
642 Ok(_conf) => unreachable!("max_events=0 should be rejected."),
643 Err(err) => assert!(
644 err.to_string()
645 .contains("invalid value: integer `0`, expected a nonzero usize")
646 ),
647 }
648 }
649
    #[tokio::test]
    async fn max_events_1() {
        // With `max_events = 1`, every event immediately ends its own group,
        // so each input produces one output.
        let reduce_config = toml::from_str::<ReduceConfig>(
            r#"
group_by = [ "id" ]
merge_strategies.id = "retain"
merge_strategies.message = "array"
max_events = 1
            "#,
        )
        .unwrap();
        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test 1");
            e_1.insert("id", "1");

            let mut e_2 = LogEvent::from("test 2");
            e_2.insert("id", "1");

            let mut e_3 = LogEvent::from("test 3");
            e_3.insert("id", "1");

            for event in [e_1.into(), e_2.into(), e_3.into()] {
                tx.send(event).await.unwrap();
            }

            // Each output holds exactly one message (the `array` strategy
            // still wraps it in a single-element array).
            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(output_1["message"], vec!["test 1"].into());
            let output_2 = out.recv().await.unwrap().into_log();
            assert_eq!(output_2["message"], vec!["test 2"].into());

            let output_3 = out.recv().await.unwrap().into_log();
            assert_eq!(output_3["message"], vec!["test 3"].into());

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
692
    #[tokio::test]
    async fn max_events() {
        // With `max_events = 3`, a group flushes after its third event, and a
        // fresh group starts for subsequent events with the same key.
        let reduce_config = toml::from_str::<ReduceConfig>(
            r#"
group_by = [ "id" ]
merge_strategies.id = "retain"
merge_strategies.message = "array"
max_events = 3
            "#,
        )
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test 1");
            e_1.insert("id", "1");

            let mut e_2 = LogEvent::from("test 2");
            e_2.insert("id", "1");

            let mut e_3 = LogEvent::from("test 3");
            e_3.insert("id", "1");

            let mut e_4 = LogEvent::from("test 4");
            e_4.insert("id", "1");

            let mut e_5 = LogEvent::from("test 5");
            e_5.insert("id", "1");

            let mut e_6 = LogEvent::from("test 6");
            e_6.insert("id", "1");

            for event in [
                e_1.into(),
                e_2.into(),
                e_3.into(),
                e_4.into(),
                e_5.into(),
                e_6.into(),
            ] {
                tx.send(event).await.unwrap();
            }

            // First batch of three.
            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(
                output_1["message"],
                vec!["test 1", "test 2", "test 3"].into()
            );

            // Second batch of three.
            let output_2 = out.recv().await.unwrap().into_log();
            assert_eq!(
                output_2["message"],
                vec!["test 4", "test 5", "test 6"].into()
            );

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await
    }
756
    #[tokio::test]
    async fn arrays() {
        // Array-valued fields: the `array` strategy nests each incoming value
        // (array or scalar) as one element, while `concat` flattens incoming
        // arrays into a single list.
        let reduce_config = toml::from_str::<ReduceConfig>(
            r#"
group_by = [ "request_id" ]

merge_strategies.foo = "array"
merge_strategies.bar = "concat"

[ends_when]
  type = "vrl"
  source = "exists(.test_end)"
"#,
        )
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);

            let new_schema_definition = reduce_config.outputs(
                &Default::default(),
                &[(OutputId::from("in"), Definition::default_legacy_namespace())],
            )[0]
            .clone()
            .log_schema_definitions
            .get(&OutputId::from("in"))
            .unwrap()
            .clone();

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let mut e_1 = LogEvent::from("test message 1");
            e_1.insert("foo", json!([1, 3]));
            e_1.insert("bar", json!([1, 3]));
            e_1.insert("request_id", "1");
            let mut metadata_1 = e_1.metadata().clone();
            metadata_1.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_1.set_schema_definition(&Arc::new(new_schema_definition.clone()));

            tx.send(e_1.into()).await.unwrap();

            let mut e_2 = LogEvent::from("test message 2");
            e_2.insert("foo", json!([2, 4]));
            e_2.insert("bar", json!([2, 4]));
            e_2.insert("request_id", "2");
            let mut metadata_2 = e_2.metadata().clone();
            metadata_2.set_upstream_id(Arc::new(OutputId::from("transform")));
            metadata_2.set_schema_definition(&Arc::new(new_schema_definition));
            tx.send(e_2.into()).await.unwrap();

            let mut e_3 = LogEvent::from("test message 3");
            e_3.insert("foo", json!([5, 7]));
            e_3.insert("bar", json!([5, 7]));
            e_3.insert("request_id", "1");
            tx.send(e_3.into()).await.unwrap();

            // Ends group "1" with scalar values.
            let mut e_4 = LogEvent::from("test message 4");
            e_4.insert("foo", json!("done"));
            e_4.insert("bar", json!("done"));
            e_4.insert("request_id", "1");
            e_4.insert("test_end", "yep");
            tx.send(e_4.into()).await.unwrap();

            let mut e_5 = LogEvent::from("test message 5");
            e_5.insert("foo", json!([6, 8]));
            e_5.insert("bar", json!([6, 8]));
            e_5.insert("request_id", "2");
            tx.send(e_5.into()).await.unwrap();

            // Ends group "2".
            let mut e_6 = LogEvent::from("test message 6");
            e_6.insert("foo", json!("done"));
            e_6.insert("bar", json!("done"));
            e_6.insert("request_id", "2");
            e_6.insert("test_end", "yep");
            tx.send(e_6.into()).await.unwrap();

            // `array` keeps nesting; `concat` flattens.
            let output_1 = out.recv().await.unwrap().into_log();
            assert_eq!(output_1["foo"], json!([[1, 3], [5, 7], "done"]).into());
            assert_eq!(output_1["bar"], json!([1, 3, 5, 7, "done"]).into());
            assert_eq!(output_1.metadata(), &metadata_1);

            let output_2 = out.recv().await.unwrap().into_log();
            assert_eq!(output_2["foo"], json!([[2, 4], [6, 8], "done"]).into());
            assert_eq!(output_2["bar"], json!([2, 4, 6, 8, "done"]).into());
            assert_eq!(output_2.metadata(), &metadata_2);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
849
    #[tokio::test]
    async fn strategy_path_with_nested_fields() {
        // A strategy on a nested path (`message.a.b`) applies only to that
        // path; sibling fields (`message.a.num`) fall back to the default
        // merge, and array elements (`arr`) are not merged element-wise.
        let reduce_config = toml::from_str::<ReduceConfig>(indoc!(
            r#"
            group_by = [ "id" ]

            merge_strategies.id = "discard"
            merge_strategies."message.a.b" = "array"

            [ends_when]
              type = "vrl"
              source = "exists(.test_end)"
            "#,
        ))
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

            let e_1 = LogEvent::from(Value::from(btreemap! {
                "id" => 777,
                "message" => btreemap! {
                    "a" => btreemap! {
                        "b" => vec![1,2],
                        "num" => 1,
                    },
                },
                "arr" => vec![btreemap! { "a" => 1 }, btreemap! { "b" => 1 }]
            }));
            let mut metadata_1 = e_1.metadata().clone();
            metadata_1.set_upstream_id(Arc::new(OutputId::from("reduce")));

            tx.send(e_1.into()).await.unwrap();

            let e_2 = LogEvent::from(Value::from(btreemap! {
                "id" => 777,
                "message" => btreemap! {
                    "a" => btreemap! {
                        "b" => vec![3,4],
                        "num" => 2,
                    },
                },
                "arr" => vec![btreemap! { "a" => 2 }, btreemap! { "b" => 2 }],
                "test_end" => "done",
            }));
            tx.send(e_2.into()).await.unwrap();

            let mut output = out.recv().await.unwrap().into_log();

            // Timestamps vary per run; strip them before comparing.
            output.remove_timestamp();
            output.remove("timestamp_end");

            assert_eq!(
                *output.value(),
                btreemap! {
                    "id" => 777,
                    "message" => btreemap! {
                        "a" => btreemap! {
                            "b" => vec![vec![1, 2], vec![3,4]],
                            "num" => 3,
                        },
                    },
                    "arr" => vec![btreemap! { "a" => 1 }, btreemap! { "b" => 1 }],
                    "test_end" => "done",
                }
                .into()
            );

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
927
928 #[test]
929 fn invalid_merge_strategies_containing_indexes() {
930 let config = toml::from_str::<ReduceConfig>(indoc!(
931 r#"
932 group_by = [ "id" ]
933
934 merge_strategies.id = "discard"
935 merge_strategies."nested.msg[0]" = "array"
936 "#,
937 ))
938 .unwrap();
939 let error = Reduce::new(
940 &config,
941 &TableRegistry::default(),
942 &MetricsStorage::default(),
943 )
944 .unwrap_err();
945 assert_eq!(
946 error.to_string(),
947 "Merge strategies with indexes are currently not supported. Path: `nested.msg[0]`"
948 );
949 }
950
    #[tokio::test]
    async fn merge_objects_in_array() {
        // Whole objects collected with the `array` strategy are kept intact;
        // `retain` keeps the last value, `discard` keeps the first, and quoted
        // field names ("a-b") are matched correctly.
        let config = toml::from_str::<ReduceConfig>(indoc!(
            r#"
            group_by = [ "id" ]
            merge_strategies.events = "array"
            merge_strategies."\"a-b\"" = "retain"
            merge_strategies.another = "discard"

            [ends_when]
              type = "vrl"
              source = "exists(.test_end)"
            "#,
        ))
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;

            let v_1 = Value::from(btreemap! {
                "attrs" => btreemap! {
                    "nested.msg" => "foo",
                },
                "sev" => 2,
            });
            let mut e_1 = LogEvent::from(Value::from(
                btreemap! {"id" => 777, "another" => btreemap!{ "a" => 1}},
            ));
            e_1.insert("events", v_1.clone());
            e_1.insert("\"a-b\"", 2);
            tx.send(e_1.into()).await.unwrap();

            let v_2 = Value::from(btreemap! {
                "attrs" => btreemap! {
                    "nested.msg" => "bar",
                },
                "sev" => 3,
            });
            let mut e_2 = LogEvent::from(Value::from(
                btreemap! {"id" => 777, "test_end" => "done", "another" => btreemap!{ "b" => 2}},
            ));
            e_2.insert("events", v_2.clone());
            e_2.insert("\"a-b\"", 2);
            tx.send(e_2.into()).await.unwrap();

            let output = out.recv().await.unwrap().into_log();
            // `id` has no strategy, so numeric values are summed (777 + 777).
            let expected_value = Value::from(btreemap! {
                "id" => 1554,
                "events" => vec![v_1, v_2],
                "another" => btreemap!{ "a" => 1},
                "a-b" => 2,
                "test_end" => "done"
            });
            assert_eq!(*output.value(), expected_value);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await
    }
1014
    #[tokio::test]
    async fn merged_quoted_path() {
        // Field names containing spaces (which require quoting as paths) are
        // still merged by the default numeric merge without any explicit
        // strategy.
        let config = toml::from_str::<ReduceConfig>(indoc!(
            r#"
            [ends_when]
              type = "vrl"
              source = "exists(.test_end)"
            "#,
        ))
        .unwrap();

        assert_transform_compliance(async move {
            let (tx, rx) = mpsc::channel(1);

            let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;

            let e_1 = LogEvent::from(Value::from(btreemap! {"a b" => 1}));
            tx.send(e_1.into()).await.unwrap();

            let e_2 = LogEvent::from(Value::from(btreemap! {"a b" => 2, "test_end" => "done"}));
            tx.send(e_2.into()).await.unwrap();

            // 1 + 2 summed under the quoted key.
            let output = out.recv().await.unwrap().into_log();
            let expected_value = Value::from(btreemap! {
                "a b" => 3,
                "test_end" => "done"
            });
            assert_eq!(*output.value(), expected_value);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await
    }
1050}