1use std::collections::HashSet;
2
3use crate::event::{LogEvent, Value};
4use bytes::{Bytes, BytesMut};
5use chrono::{DateTime, Utc};
6use dyn_clone::DynClone;
7use ordered_float::NotNan;
8use vector_lib::configurable::configurable_component;
9use vrl::path::OwnedTargetPath;
10
/// Strategies for combining the successive values of a field into a single value.
#[configurable_component]
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(feature = "proptest", derive(proptest_derive::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum MergeStrategy {
    /// Keep the first value seen and ignore every later one.
    Discard,

    /// Keep the most recent value seen, skipping `null` values so they never
    /// overwrite an earlier non-null value.
    Retain,

    /// Add up all numeric values. The total stays an integer until a float is
    /// seen, after which it is a float.
    Sum,

    /// Keep the largest numeric value seen.
    Max,

    /// Keep the smallest numeric value seen.
    Min,

    /// Collect every value into an array, in the order the values arrive.
    Array,

    /// Join string values with a single space between them. When the first
    /// value is an array, later arrays are appended element by element and
    /// any other value is appended as a single element.
    Concat,

    /// Join string values with a newline between them.
    ConcatNewline,

    /// Join string values with nothing between them.
    ConcatRaw,

    /// Keep the shortest array seen; on a tie the earlier array is kept.
    ShortestArray,

    /// Keep the longest array seen; on a tie the earlier array is kept.
    LongestArray,

    /// Collect the distinct values seen into an array. Arrays contribute their
    /// elements and objects contribute their values (one level deep only).
    /// The order of the resulting array is unspecified.
    FlatUnique,
}
55
56#[derive(Debug, Clone)]
57struct DiscardMerger {
58 v: Value,
59}
60
61impl DiscardMerger {
62 const fn new(v: Value) -> Self {
63 Self { v }
64 }
65}
66
67impl ReduceValueMerger for DiscardMerger {
68 fn add(&mut self, _v: Value) -> Result<(), String> {
69 Ok(())
70 }
71
72 fn insert_into(
73 self: Box<Self>,
74 path: &OwnedTargetPath,
75 v: &mut LogEvent,
76 ) -> Result<(), String> {
77 v.insert(path, self.v);
78 Ok(())
79 }
80}
81
82#[derive(Debug, Clone)]
83struct RetainMerger {
84 v: Value,
85}
86
87impl RetainMerger {
88 #[allow(clippy::missing_const_for_fn)] fn new(v: Value) -> Self {
90 Self { v }
91 }
92}
93
94impl ReduceValueMerger for RetainMerger {
95 fn add(&mut self, v: Value) -> Result<(), String> {
96 if Value::Null != v {
97 self.v = v;
98 }
99 Ok(())
100 }
101
102 fn insert_into(
103 self: Box<Self>,
104 path: &OwnedTargetPath,
105 v: &mut LogEvent,
106 ) -> Result<(), String> {
107 v.insert(path, self.v);
108 Ok(())
109 }
110}
111
112#[derive(Debug, Clone)]
113struct ConcatMerger {
114 v: BytesMut,
115 join_by: Option<Vec<u8>>,
116}
117
118impl ConcatMerger {
119 fn new(v: Bytes, join_by: Option<char>) -> Self {
120 let join_by = join_by.map(|c| c.to_string().into_bytes());
122
123 Self {
124 v: BytesMut::from(&v[..]),
125 join_by,
126 }
127 }
128}
129
130impl ReduceValueMerger for ConcatMerger {
131 fn add(&mut self, v: Value) -> Result<(), String> {
132 if let Value::Bytes(b) = v {
133 if let Some(buf) = self.join_by.as_ref() {
134 self.v.extend(&buf[..]);
135 }
136 self.v.extend_from_slice(&b);
137 Ok(())
138 } else {
139 Err(format!(
140 "expected string value, found: '{}'",
141 v.to_string_lossy()
142 ))
143 }
144 }
145
146 fn insert_into(
147 self: Box<Self>,
148 path: &OwnedTargetPath,
149 v: &mut LogEvent,
150 ) -> Result<(), String> {
151 v.insert(path, Value::Bytes(self.v.into()));
152 Ok(())
153 }
154}
155
156#[derive(Debug, Clone)]
157struct ConcatArrayMerger {
158 v: Vec<Value>,
159}
160
161impl ConcatArrayMerger {
162 const fn new(v: Vec<Value>) -> Self {
163 Self { v }
164 }
165}
166
167impl ReduceValueMerger for ConcatArrayMerger {
168 fn add(&mut self, v: Value) -> Result<(), String> {
169 if let Value::Array(a) = v {
170 self.v.extend_from_slice(&a);
171 } else {
172 self.v.push(v);
173 }
174 Ok(())
175 }
176
177 fn insert_into(
178 self: Box<Self>,
179 path: &OwnedTargetPath,
180 v: &mut LogEvent,
181 ) -> Result<(), String> {
182 v.insert(path, Value::Array(self.v));
183 Ok(())
184 }
185}
186
187#[derive(Debug, Clone)]
188struct ArrayMerger {
189 v: Vec<Value>,
190}
191
192impl ArrayMerger {
193 fn new(v: Value) -> Self {
194 Self { v: vec![v] }
195 }
196}
197
198impl ReduceValueMerger for ArrayMerger {
199 fn add(&mut self, v: Value) -> Result<(), String> {
200 self.v.push(v);
201 Ok(())
202 }
203
204 fn insert_into(
205 self: Box<Self>,
206 path: &OwnedTargetPath,
207 v: &mut LogEvent,
208 ) -> Result<(), String> {
209 v.insert(path, Value::Array(self.v));
210 Ok(())
211 }
212}
213
214#[derive(Debug, Clone)]
215struct LongestArrayMerger {
216 v: Vec<Value>,
217}
218
219impl LongestArrayMerger {
220 const fn new(v: Vec<Value>) -> Self {
221 Self { v }
222 }
223}
224
225impl ReduceValueMerger for LongestArrayMerger {
226 fn add(&mut self, v: Value) -> Result<(), String> {
227 if let Value::Array(a) = v {
228 if a.len() > self.v.len() {
229 self.v = a;
230 }
231 Ok(())
232 } else {
233 Err(format!(
234 "expected array value, found: '{}'",
235 v.to_string_lossy()
236 ))
237 }
238 }
239
240 fn insert_into(
241 self: Box<Self>,
242 path: &OwnedTargetPath,
243 v: &mut LogEvent,
244 ) -> Result<(), String> {
245 v.insert(path, Value::Array(self.v));
246 Ok(())
247 }
248}
249
250#[derive(Debug, Clone)]
251struct ShortestArrayMerger {
252 v: Vec<Value>,
253}
254
255impl ShortestArrayMerger {
256 const fn new(v: Vec<Value>) -> Self {
257 Self { v }
258 }
259}
260
261impl ReduceValueMerger for ShortestArrayMerger {
262 fn add(&mut self, v: Value) -> Result<(), String> {
263 if let Value::Array(a) = v {
264 if a.len() < self.v.len() {
265 self.v = a;
266 }
267 Ok(())
268 } else {
269 Err(format!(
270 "expected array value, found: '{}'",
271 v.to_string_lossy()
272 ))
273 }
274 }
275
276 fn insert_into(
277 self: Box<Self>,
278 path: &OwnedTargetPath,
279 v: &mut LogEvent,
280 ) -> Result<(), String> {
281 v.insert(path, Value::Array(self.v));
282 Ok(())
283 }
284}
285
286#[derive(Debug, Clone)]
287struct FlatUniqueMerger {
288 v: HashSet<Value>,
289}
290
291#[allow(clippy::mutable_key_type)] fn insert_value(h: &mut HashSet<Value>, v: Value) {
293 match v {
294 Value::Object(m) => {
295 for (_, v) in m {
296 h.insert(v);
297 }
298 }
299 Value::Array(vec) => {
300 for v in vec {
301 h.insert(v);
302 }
303 }
304 _ => {
305 h.insert(v);
306 }
307 }
308}
309
310impl FlatUniqueMerger {
311 #[allow(clippy::mutable_key_type)] fn new(v: Value) -> Self {
313 let mut h = HashSet::default();
314 insert_value(&mut h, v);
315 Self { v: h }
316 }
317}
318
319impl ReduceValueMerger for FlatUniqueMerger {
320 fn add(&mut self, v: Value) -> Result<(), String> {
321 insert_value(&mut self.v, v);
322 Ok(())
323 }
324
325 fn insert_into(
326 self: Box<Self>,
327 path: &OwnedTargetPath,
328 v: &mut LogEvent,
329 ) -> Result<(), String> {
330 v.insert(path, Value::Array(self.v.into_iter().collect()));
331 Ok(())
332 }
333}
334
335#[derive(Debug, Clone)]
336struct TimestampWindowMerger {
337 started: DateTime<Utc>,
338 latest: DateTime<Utc>,
339}
340
341impl TimestampWindowMerger {
342 const fn new(v: DateTime<Utc>) -> Self {
343 Self {
344 started: v,
345 latest: v,
346 }
347 }
348}
349
350impl ReduceValueMerger for TimestampWindowMerger {
351 fn add(&mut self, v: Value) -> Result<(), String> {
352 if let Value::Timestamp(ts) = v {
353 self.latest = ts
354 } else {
355 return Err(format!(
356 "expected timestamp value, found: {}",
357 v.to_string_lossy()
358 ));
359 }
360 Ok(())
361 }
362
363 fn insert_into(
364 self: Box<Self>,
365 path: &OwnedTargetPath,
366 v: &mut LogEvent,
367 ) -> Result<(), String> {
368 v.insert(
369 format!("{path}_end").as_str(),
370 Value::Timestamp(self.latest),
371 );
372 v.insert(path, Value::Timestamp(self.started));
373 Ok(())
374 }
375}
376
377#[derive(Debug, Clone)]
378enum NumberMergerValue {
379 Int(i64),
380 Float(NotNan<f64>),
381}
382
383impl From<i64> for NumberMergerValue {
384 fn from(v: i64) -> Self {
385 NumberMergerValue::Int(v)
386 }
387}
388
389impl From<NotNan<f64>> for NumberMergerValue {
390 fn from(v: NotNan<f64>) -> Self {
391 NumberMergerValue::Float(v)
392 }
393}
394
395#[derive(Debug, Clone)]
396struct AddNumbersMerger {
397 v: NumberMergerValue,
398}
399
400impl AddNumbersMerger {
401 const fn new(v: NumberMergerValue) -> Self {
402 Self { v }
403 }
404}
405
406impl ReduceValueMerger for AddNumbersMerger {
407 fn add(&mut self, v: Value) -> Result<(), String> {
408 match v {
411 Value::Integer(i) => match self.v {
412 NumberMergerValue::Int(j) => self.v = NumberMergerValue::Int(i + j),
413 NumberMergerValue::Float(j) => {
414 self.v = NumberMergerValue::Float(NotNan::new(i as f64).unwrap() + j)
415 }
416 },
417 Value::Float(f) => match self.v {
418 NumberMergerValue::Int(j) => self.v = NumberMergerValue::Float(f + j as f64),
419 NumberMergerValue::Float(j) => self.v = NumberMergerValue::Float(f + j),
420 },
421 _ => {
422 return Err(format!(
423 "expected numeric value, found: '{}'",
424 v.to_string_lossy()
425 ));
426 }
427 }
428 Ok(())
429 }
430
431 fn insert_into(
432 self: Box<Self>,
433 path: &OwnedTargetPath,
434 v: &mut LogEvent,
435 ) -> Result<(), String> {
436 match self.v {
437 NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)),
438 NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)),
439 };
440 Ok(())
441 }
442}
443
444#[derive(Debug, Clone)]
445struct MaxNumberMerger {
446 v: NumberMergerValue,
447}
448
449impl MaxNumberMerger {
450 const fn new(v: NumberMergerValue) -> Self {
451 Self { v }
452 }
453}
454
455impl ReduceValueMerger for MaxNumberMerger {
456 fn add(&mut self, v: Value) -> Result<(), String> {
457 match v {
460 Value::Integer(i) => {
461 match self.v {
462 NumberMergerValue::Int(i2) => {
463 if i > i2 {
464 self.v = NumberMergerValue::Int(i);
465 }
466 }
467 NumberMergerValue::Float(f2) => {
468 let f = NotNan::new(i as f64).unwrap();
469 if f > f2 {
470 self.v = NumberMergerValue::Float(f);
471 }
472 }
473 };
474 }
475 Value::Float(f) => {
476 let f2 = match self.v {
477 NumberMergerValue::Int(i2) => NotNan::new(i2 as f64).unwrap(),
478 NumberMergerValue::Float(f2) => f2,
479 };
480 if f > f2 {
481 self.v = NumberMergerValue::Float(f);
482 }
483 }
484 _ => {
485 return Err(format!(
486 "expected numeric value, found: '{}'",
487 v.to_string_lossy()
488 ));
489 }
490 }
491 Ok(())
492 }
493
494 fn insert_into(
495 self: Box<Self>,
496 path: &OwnedTargetPath,
497 v: &mut LogEvent,
498 ) -> Result<(), String> {
499 match self.v {
500 NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)),
501 NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)),
502 };
503 Ok(())
504 }
505}
506
507#[derive(Debug, Clone)]
508struct MinNumberMerger {
509 v: NumberMergerValue,
510}
511
512impl MinNumberMerger {
513 const fn new(v: NumberMergerValue) -> Self {
514 Self { v }
515 }
516}
517
518impl ReduceValueMerger for MinNumberMerger {
519 fn add(&mut self, v: Value) -> Result<(), String> {
520 match v {
523 Value::Integer(i) => {
524 match self.v {
525 NumberMergerValue::Int(i2) => {
526 if i < i2 {
527 self.v = NumberMergerValue::Int(i);
528 }
529 }
530 NumberMergerValue::Float(f2) => {
531 let f = NotNan::new(i as f64).unwrap();
532 if f < f2 {
533 self.v = NumberMergerValue::Float(f);
534 }
535 }
536 };
537 }
538 Value::Float(f) => {
539 let f2 = match self.v {
540 NumberMergerValue::Int(i2) => NotNan::new(i2 as f64).unwrap(),
541 NumberMergerValue::Float(f2) => f2,
542 };
543 if f < f2 {
544 self.v = NumberMergerValue::Float(f);
545 }
546 }
547 _ => {
548 return Err(format!(
549 "expected numeric value, found: '{}'",
550 v.to_string_lossy()
551 ));
552 }
553 }
554 Ok(())
555 }
556
557 fn insert_into(
558 self: Box<Self>,
559 path: &OwnedTargetPath,
560 v: &mut LogEvent,
561 ) -> Result<(), String> {
562 match self.v {
563 NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)),
564 NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)),
565 };
566 Ok(())
567 }
568}
569
/// A stateful accumulator that folds a sequence of values into one result
/// and finally writes that result into a log event.
///
/// Implementors must be `Debug`, `Send`, `Sync`, and cloneable as trait
/// objects (via `DynClone`).
pub trait ReduceValueMerger: std::fmt::Debug + Send + Sync + DynClone {
    /// Folds one more value into the merger's state.
    ///
    /// Returns a human-readable message when the value cannot be merged,
    /// for example because it has the wrong type.
    fn add(&mut self, v: Value) -> Result<(), String>;
    /// Consumes the merger and writes its result into `v` at `path`.
    fn insert_into(self: Box<Self>, path: &OwnedTargetPath, v: &mut LogEvent)
        -> Result<(), String>;
}

// Makes `Box<dyn ReduceValueMerger>` implement `Clone`.
dyn_clone::clone_trait_object!(ReduceValueMerger);
577
578impl From<Value> for Box<dyn ReduceValueMerger> {
579 fn from(v: Value) -> Self {
580 match v {
581 Value::Integer(i) => Box::new(AddNumbersMerger::new(i.into())),
582 Value::Float(f) => Box::new(AddNumbersMerger::new(f.into())),
583 Value::Timestamp(ts) => Box::new(TimestampWindowMerger::new(ts)),
584 Value::Object(_) => Box::new(DiscardMerger::new(v)),
585 Value::Null => Box::new(DiscardMerger::new(v)),
586 Value::Boolean(_) => Box::new(DiscardMerger::new(v)),
587 Value::Bytes(_) => Box::new(DiscardMerger::new(v)),
588 Value::Regex(_) => Box::new(DiscardMerger::new(v)),
589 Value::Array(_) => Box::new(DiscardMerger::new(v)),
590 }
591 }
592}
593
594pub(crate) fn get_value_merger(
595 v: Value,
596 m: &MergeStrategy,
597) -> Result<Box<dyn ReduceValueMerger>, String> {
598 match m {
599 MergeStrategy::Sum => match v {
600 Value::Integer(i) => Ok(Box::new(AddNumbersMerger::new(i.into()))),
601 Value::Float(f) => Ok(Box::new(AddNumbersMerger::new(f.into()))),
602 _ => Err(format!(
603 "expected number value, found: '{}'",
604 v.to_string_lossy()
605 )),
606 },
607 MergeStrategy::Max => match v {
608 Value::Integer(i) => Ok(Box::new(MaxNumberMerger::new(i.into()))),
609 Value::Float(f) => Ok(Box::new(MaxNumberMerger::new(f.into()))),
610 _ => Err(format!(
611 "expected number value, found: '{}'",
612 v.to_string_lossy()
613 )),
614 },
615 MergeStrategy::Min => match v {
616 Value::Integer(i) => Ok(Box::new(MinNumberMerger::new(i.into()))),
617 Value::Float(f) => Ok(Box::new(MinNumberMerger::new(f.into()))),
618 _ => Err(format!(
619 "expected number value, found: '{}'",
620 v.to_string_lossy()
621 )),
622 },
623 MergeStrategy::Concat => match v {
624 Value::Bytes(b) => Ok(Box::new(ConcatMerger::new(b, Some(' ')))),
625 Value::Array(a) => Ok(Box::new(ConcatArrayMerger::new(a))),
626 _ => Err(format!(
627 "expected string or array value, found: '{}'",
628 v.to_string_lossy()
629 )),
630 },
631 MergeStrategy::ConcatNewline => match v {
632 Value::Bytes(b) => Ok(Box::new(ConcatMerger::new(b, Some('\n')))),
633 _ => Err(format!(
634 "expected string value, found: '{}'",
635 v.to_string_lossy()
636 )),
637 },
638 MergeStrategy::ConcatRaw => match v {
639 Value::Bytes(b) => Ok(Box::new(ConcatMerger::new(b, None))),
640 _ => Err(format!(
641 "expected string value, found: '{}'",
642 v.to_string_lossy()
643 )),
644 },
645 MergeStrategy::Array => Ok(Box::new(ArrayMerger::new(v))),
646 MergeStrategy::ShortestArray => match v {
647 Value::Array(a) => Ok(Box::new(ShortestArrayMerger::new(a))),
648 _ => Err(format!(
649 "expected array value, found: '{}'",
650 v.to_string_lossy()
651 )),
652 },
653 MergeStrategy::LongestArray => match v {
654 Value::Array(a) => Ok(Box::new(LongestArrayMerger::new(a))),
655 _ => Err(format!(
656 "expected array value, found: '{}'",
657 v.to_string_lossy()
658 )),
659 },
660 MergeStrategy::Discard => Ok(Box::new(DiscardMerger::new(v))),
661 MergeStrategy::Retain => Ok(Box::new(RetainMerger::new(v))),
662 MergeStrategy::FlatUnique => Ok(Box::new(FlatUniqueMerger::new(v))),
663 }
664}
665
666#[cfg(test)]
667mod test {
668 use super::*;
669 use crate::event::LogEvent;
670 use serde_json::json;
671 use vrl::owned_event_path;
672
673 #[test]
674 fn initial_values() {
675 assert!(get_value_merger("foo".into(), &MergeStrategy::Discard).is_ok());
676 assert!(get_value_merger("foo".into(), &MergeStrategy::Retain).is_ok());
677 assert!(get_value_merger("foo".into(), &MergeStrategy::Sum).is_err());
678 assert!(get_value_merger("foo".into(), &MergeStrategy::Max).is_err());
679 assert!(get_value_merger("foo".into(), &MergeStrategy::Min).is_err());
680 assert!(get_value_merger("foo".into(), &MergeStrategy::Array).is_ok());
681 assert!(get_value_merger("foo".into(), &MergeStrategy::LongestArray).is_err());
682 assert!(get_value_merger("foo".into(), &MergeStrategy::ShortestArray).is_err());
683 assert!(get_value_merger("foo".into(), &MergeStrategy::Concat).is_ok());
684 assert!(get_value_merger("foo".into(), &MergeStrategy::ConcatNewline).is_ok());
685 assert!(get_value_merger("foo".into(), &MergeStrategy::ConcatRaw).is_ok());
686 assert!(get_value_merger("foo".into(), &MergeStrategy::FlatUnique).is_ok());
687
688 assert!(get_value_merger(42.into(), &MergeStrategy::Discard).is_ok());
689 assert!(get_value_merger(42.into(), &MergeStrategy::Retain).is_ok());
690 assert!(get_value_merger(42.into(), &MergeStrategy::Sum).is_ok());
691 assert!(get_value_merger(42.into(), &MergeStrategy::Min).is_ok());
692 assert!(get_value_merger(42.into(), &MergeStrategy::Max).is_ok());
693 assert!(get_value_merger(42.into(), &MergeStrategy::Array).is_ok());
694 assert!(get_value_merger(42.into(), &MergeStrategy::LongestArray).is_err());
695 assert!(get_value_merger(42.into(), &MergeStrategy::ShortestArray).is_err());
696 assert!(get_value_merger(42.into(), &MergeStrategy::Concat).is_err());
697 assert!(get_value_merger(42.into(), &MergeStrategy::ConcatNewline).is_err());
698 assert!(get_value_merger(42.into(), &MergeStrategy::ConcatRaw).is_err());
699 assert!(get_value_merger(42.into(), &MergeStrategy::FlatUnique).is_ok());
700
701 assert!(get_value_merger(42.into(), &MergeStrategy::Discard).is_ok());
702 assert!(get_value_merger(42.into(), &MergeStrategy::Retain).is_ok());
703 assert!(get_value_merger(4.2.into(), &MergeStrategy::Sum).is_ok());
704 assert!(get_value_merger(4.2.into(), &MergeStrategy::Min).is_ok());
705 assert!(get_value_merger(4.2.into(), &MergeStrategy::Max).is_ok());
706 assert!(get_value_merger(4.2.into(), &MergeStrategy::Array).is_ok());
707 assert!(get_value_merger(4.2.into(), &MergeStrategy::LongestArray).is_err());
708 assert!(get_value_merger(4.2.into(), &MergeStrategy::ShortestArray).is_err());
709 assert!(get_value_merger(4.2.into(), &MergeStrategy::Concat).is_err());
710 assert!(get_value_merger(4.2.into(), &MergeStrategy::ConcatNewline).is_err());
711 assert!(get_value_merger(4.2.into(), &MergeStrategy::ConcatRaw).is_err());
712 assert!(get_value_merger(4.2.into(), &MergeStrategy::FlatUnique).is_ok());
713
714 assert!(get_value_merger(true.into(), &MergeStrategy::Discard).is_ok());
715 assert!(get_value_merger(true.into(), &MergeStrategy::Retain).is_ok());
716 assert!(get_value_merger(true.into(), &MergeStrategy::Sum).is_err());
717 assert!(get_value_merger(true.into(), &MergeStrategy::Max).is_err());
718 assert!(get_value_merger(true.into(), &MergeStrategy::Min).is_err());
719 assert!(get_value_merger(true.into(), &MergeStrategy::Array).is_ok());
720 assert!(get_value_merger(true.into(), &MergeStrategy::LongestArray).is_err());
721 assert!(get_value_merger(true.into(), &MergeStrategy::ShortestArray).is_err());
722 assert!(get_value_merger(true.into(), &MergeStrategy::Concat).is_err());
723 assert!(get_value_merger(true.into(), &MergeStrategy::ConcatNewline).is_err());
724 assert!(get_value_merger(true.into(), &MergeStrategy::ConcatRaw).is_err());
725 assert!(get_value_merger(true.into(), &MergeStrategy::FlatUnique).is_ok());
726
727 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Discard).is_ok());
728 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Retain).is_ok());
729 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Sum).is_err());
730 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Max).is_err());
731 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Min).is_err());
732 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Array).is_ok());
733 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::LongestArray).is_err());
734 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::ShortestArray).is_err());
735 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Concat).is_err());
736 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::ConcatNewline).is_err());
737 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::ConcatRaw).is_err());
738 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Discard).is_ok());
739 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::FlatUnique).is_ok());
740
741 assert!(get_value_merger(json!([]).into(), &MergeStrategy::Discard).is_ok());
742 assert!(get_value_merger(json!([]).into(), &MergeStrategy::Retain).is_ok());
743 assert!(get_value_merger(json!([]).into(), &MergeStrategy::Sum).is_err());
744 assert!(get_value_merger(json!([]).into(), &MergeStrategy::Max).is_err());
745 assert!(get_value_merger(json!([]).into(), &MergeStrategy::Min).is_err());
746 assert!(get_value_merger(json!([]).into(), &MergeStrategy::Array).is_ok());
747 assert!(get_value_merger(json!([]).into(), &MergeStrategy::LongestArray).is_ok());
748 assert!(get_value_merger(json!([]).into(), &MergeStrategy::ShortestArray).is_ok());
749 assert!(get_value_merger(json!([]).into(), &MergeStrategy::Concat).is_ok());
750 assert!(get_value_merger(json!([]).into(), &MergeStrategy::ConcatNewline).is_err());
751 assert!(get_value_merger(json!([]).into(), &MergeStrategy::ConcatRaw).is_err());
752 assert!(get_value_merger(json!([]).into(), &MergeStrategy::FlatUnique).is_ok());
753
754 assert!(get_value_merger(json!({}).into(), &MergeStrategy::Discard).is_ok());
755 assert!(get_value_merger(json!({}).into(), &MergeStrategy::Retain).is_ok());
756 assert!(get_value_merger(json!({}).into(), &MergeStrategy::Sum).is_err());
757 assert!(get_value_merger(json!({}).into(), &MergeStrategy::Max).is_err());
758 assert!(get_value_merger(json!({}).into(), &MergeStrategy::Min).is_err());
759 assert!(get_value_merger(json!({}).into(), &MergeStrategy::Array).is_ok());
760 assert!(get_value_merger(json!({}).into(), &MergeStrategy::LongestArray).is_err());
761 assert!(get_value_merger(json!({}).into(), &MergeStrategy::ShortestArray).is_err());
762 assert!(get_value_merger(json!({}).into(), &MergeStrategy::Concat).is_err());
763 assert!(get_value_merger(json!({}).into(), &MergeStrategy::ConcatNewline).is_err());
764 assert!(get_value_merger(json!({}).into(), &MergeStrategy::ConcatRaw).is_err());
765 assert!(get_value_merger(json!({}).into(), &MergeStrategy::FlatUnique).is_ok());
766
767 assert!(get_value_merger(json!(null).into(), &MergeStrategy::Discard).is_ok());
768 assert!(get_value_merger(json!(null).into(), &MergeStrategy::Retain).is_ok());
769 assert!(get_value_merger(json!(null).into(), &MergeStrategy::Sum).is_err());
770 assert!(get_value_merger(json!(null).into(), &MergeStrategy::Max).is_err());
771 assert!(get_value_merger(json!(null).into(), &MergeStrategy::Min).is_err());
772 assert!(get_value_merger(json!(null).into(), &MergeStrategy::Array).is_ok());
773 assert!(get_value_merger(json!(null).into(), &MergeStrategy::LongestArray).is_err());
774 assert!(get_value_merger(json!(null).into(), &MergeStrategy::ShortestArray).is_err());
775 assert!(get_value_merger(json!(null).into(), &MergeStrategy::Concat).is_err());
776 assert!(get_value_merger(json!(null).into(), &MergeStrategy::ConcatNewline).is_err());
777 assert!(get_value_merger(json!(null).into(), &MergeStrategy::ConcatRaw).is_err());
778 assert!(get_value_merger(json!(null).into(), &MergeStrategy::FlatUnique).is_ok());
779 }
780
781 #[test]
782 fn merging_values() {
783 assert_eq!(
784 merge("foo".into(), "bar".into(), &MergeStrategy::Discard),
785 Ok("foo".into())
786 );
787 assert_eq!(
788 merge("foo".into(), "bar".into(), &MergeStrategy::Retain),
789 Ok("bar".into())
790 );
791 assert_eq!(
792 merge("foo".into(), "bar".into(), &MergeStrategy::Array),
793 Ok(json!(["foo", "bar"]).into())
794 );
795 assert_eq!(
796 merge("foo".into(), "bar".into(), &MergeStrategy::Concat),
797 Ok("foo bar".into())
798 );
799 assert_eq!(
800 merge("foo".into(), "bar".into(), &MergeStrategy::ConcatNewline),
801 Ok("foo\nbar".into())
802 );
803 assert_eq!(
804 merge("foo".into(), "bar".into(), &MergeStrategy::ConcatRaw),
805 Ok("foobar".into())
806 );
807 assert!(merge("foo".into(), 42.into(), &MergeStrategy::Concat).is_err());
808 assert!(merge("foo".into(), 4.2.into(), &MergeStrategy::Concat).is_err());
809 assert!(merge("foo".into(), true.into(), &MergeStrategy::Concat).is_err());
810 assert!(merge("foo".into(), Utc::now().into(), &MergeStrategy::Concat).is_err());
811 assert!(merge("foo".into(), json!({}).into(), &MergeStrategy::Concat).is_err());
812 assert!(merge("foo".into(), json!([]).into(), &MergeStrategy::Concat).is_err());
813 assert!(merge("foo".into(), json!(null).into(), &MergeStrategy::Concat).is_err());
814
815 assert_eq!(
816 merge("foo".into(), "bar".into(), &MergeStrategy::ConcatNewline),
817 Ok("foo\nbar".into())
818 );
819
820 assert_eq!(
821 merge(21.into(), 21.into(), &MergeStrategy::Sum),
822 Ok(42.into())
823 );
824 assert_eq!(
825 merge(41.into(), 42.into(), &MergeStrategy::Max),
826 Ok(42.into())
827 );
828 assert_eq!(
829 merge(42.into(), 41.into(), &MergeStrategy::Max),
830 Ok(42.into())
831 );
832 assert_eq!(
833 merge(42.into(), 43.into(), &MergeStrategy::Min),
834 Ok(42.into())
835 );
836 assert_eq!(
837 merge(43.into(), 42.into(), &MergeStrategy::Min),
838 Ok(42.into())
839 );
840
841 assert_eq!(
842 merge(2.1.into(), 2.1.into(), &MergeStrategy::Sum),
843 Ok(4.2.into())
844 );
845 assert_eq!(
846 merge(4.1.into(), 4.2.into(), &MergeStrategy::Max),
847 Ok(4.2.into())
848 );
849 assert_eq!(
850 merge(4.2.into(), 4.1.into(), &MergeStrategy::Max),
851 Ok(4.2.into())
852 );
853 assert_eq!(
854 merge(4.2.into(), 4.3.into(), &MergeStrategy::Min),
855 Ok(4.2.into())
856 );
857 assert_eq!(
858 merge(4.3.into(), 4.2.into(), &MergeStrategy::Min),
859 Ok(4.2.into())
860 );
861
862 assert_eq!(
863 merge(
864 json!([4_i64]).into(),
865 json!([2_i64]).into(),
866 &MergeStrategy::Concat
867 ),
868 Ok(json!([4_i64, 2_i64]).into())
869 );
870 assert_eq!(
871 merge(json!([]).into(), 42_i64.into(), &MergeStrategy::Concat),
872 Ok(json!([42_i64]).into())
873 );
874
875 assert_eq!(
876 merge(
877 json!([34_i64]).into(),
878 json!([42_i64, 43_i64]).into(),
879 &MergeStrategy::ShortestArray
880 ),
881 Ok(json!([34_i64]).into())
882 );
883 assert_eq!(
884 merge(
885 json!([34_i64]).into(),
886 json!([42_i64, 43_i64]).into(),
887 &MergeStrategy::LongestArray
888 ),
889 Ok(json!([42_i64, 43_i64]).into())
890 );
891
892 let v = merge(34_i64.into(), 43_i64.into(), &MergeStrategy::FlatUnique).unwrap();
893 match v.clone() {
894 Value::Array(v) => {
895 let v: Vec<_> = v
896 .into_iter()
897 .map(|i| {
898 if let Value::Integer(i) = i {
899 i
900 } else {
901 panic!("Bad value");
902 }
903 })
904 .collect();
905 assert_eq!(v.iter().filter(|i| **i == 34i64).count(), 1);
906 assert_eq!(v.iter().filter(|i| **i == 43i64).count(), 1);
907 }
908 _ => {
909 panic!("Not array");
910 }
911 }
912 let v = merge(v, 34_i32.into(), &MergeStrategy::FlatUnique).unwrap();
913 if let Value::Array(v) = v {
914 let v: Vec<_> = v
915 .into_iter()
916 .map(|i| {
917 if let Value::Integer(i) = i {
918 i
919 } else {
920 panic!("Bad value");
921 }
922 })
923 .collect();
924 assert_eq!(v.iter().filter(|i| **i == 34i64).count(), 1);
925 assert_eq!(v.iter().filter(|i| **i == 43i64).count(), 1);
926 } else {
927 panic!("Not array");
928 }
929 }
930
931 fn merge(initial: Value, additional: Value, strategy: &MergeStrategy) -> Result<Value, String> {
932 let mut merger = get_value_merger(initial, strategy)?;
933 merger.add(additional)?;
934 let mut output = LogEvent::default();
935 let out_path = owned_event_path!("out");
936 merger.insert_into(&out_path, &mut output)?;
937 Ok(output.remove(&out_path).unwrap())
938 }
939}