vector/transforms/reduce/
merge_strategy.rs

1use std::collections::HashSet;
2
3use crate::event::{LogEvent, Value};
4use bytes::{Bytes, BytesMut};
5use chrono::{DateTime, Utc};
6use dyn_clone::DynClone;
7use ordered_float::NotNan;
8use vector_lib::configurable::configurable_component;
9use vrl::path::OwnedTargetPath;
10
// NOTE(review): the `///` text in this enum is presumably surfaced in generated
// configuration documentation by `#[configurable_component]` — confirm before
// rewording it. Implementation notes are therefore kept in plain `//` comments.
/// Strategies for merging events.
#[configurable_component]
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(feature = "proptest", derive(proptest_derive::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum MergeStrategy {
    // Later values are accepted and ignored (`DiscardMerger::add` is a no-op).
    /// Discard all but the first value found.
    Discard,

    // A later value replaces the stored one unless it is `Value::Null`.
    /// Discard all but the last value found.
    ///
    /// Works as a way to coalesce by not retaining `null`.
    Retain,

    // Integers are summed as `i64` until a float is seen; from then on the
    // running total is kept as a float. The initial value must be numeric.
    /// Sum all numeric values.
    Sum,

    // The initial value must be an integer or a float.
    /// Keep the maximum numeric value seen.
    Max,

    // The initial value must be an integer or a float.
    /// Keep the minimum numeric value seen.
    Min,

    // Every value becomes one element; incoming arrays are pushed as nested
    // elements rather than being flattened.
    /// Append each value to an array.
    Array,

    // If the initial value is an array, `get_value_merger` picks
    // `ConcatArrayMerger` instead: later arrays are appended element by
    // element and any other value is pushed as a single element.
    /// Concatenate each string value, delimited with a space.
    Concat,

    // Only string initial values are accepted.
    /// Concatenate each string value, delimited with a newline.
    ConcatNewline,

    // Only string initial values are accepted.
    /// Concatenate each string, without a delimiter.
    ConcatRaw,

    // The comparison is strict, so on a tie the array seen first is kept.
    /// Keep the shortest array seen.
    ShortestArray,

    // The comparison is strict, so on a tie the array seen first is kept.
    /// Keep the longest array seen.
    LongestArray,

    // Arrays contribute their elements and objects contribute their field
    // values (one level only). Values are collected in a `HashSet`, so the
    // order of the resulting array is unspecified.
    /// Create a flattened array of all unique values.
    FlatUnique,
}
55
56#[derive(Debug, Clone)]
57struct DiscardMerger {
58    v: Value,
59}
60
61impl DiscardMerger {
62    const fn new(v: Value) -> Self {
63        Self { v }
64    }
65}
66
67impl ReduceValueMerger for DiscardMerger {
68    fn add(&mut self, _v: Value) -> Result<(), String> {
69        Ok(())
70    }
71
72    fn insert_into(
73        self: Box<Self>,
74        path: &OwnedTargetPath,
75        v: &mut LogEvent,
76    ) -> Result<(), String> {
77        v.insert(path, self.v);
78        Ok(())
79    }
80}
81
82#[derive(Debug, Clone)]
83struct RetainMerger {
84    v: Value,
85}
86
87impl RetainMerger {
88    #[allow(clippy::missing_const_for_fn)] // const cannot run destructor
89    fn new(v: Value) -> Self {
90        Self { v }
91    }
92}
93
94impl ReduceValueMerger for RetainMerger {
95    fn add(&mut self, v: Value) -> Result<(), String> {
96        if Value::Null != v {
97            self.v = v;
98        }
99        Ok(())
100    }
101
102    fn insert_into(
103        self: Box<Self>,
104        path: &OwnedTargetPath,
105        v: &mut LogEvent,
106    ) -> Result<(), String> {
107        v.insert(path, self.v);
108        Ok(())
109    }
110}
111
112#[derive(Debug, Clone)]
113struct ConcatMerger {
114    v: BytesMut,
115    join_by: Option<Vec<u8>>,
116}
117
118impl ConcatMerger {
119    fn new(v: Bytes, join_by: Option<char>) -> Self {
120        // We need to get the resulting bytes for this character in case it's actually a multi-byte character.
121        let join_by = join_by.map(|c| c.to_string().into_bytes());
122
123        Self {
124            v: BytesMut::from(&v[..]),
125            join_by,
126        }
127    }
128}
129
130impl ReduceValueMerger for ConcatMerger {
131    fn add(&mut self, v: Value) -> Result<(), String> {
132        if let Value::Bytes(b) = v {
133            if let Some(buf) = self.join_by.as_ref() {
134                self.v.extend(&buf[..]);
135            }
136            self.v.extend_from_slice(&b);
137            Ok(())
138        } else {
139            Err(format!(
140                "expected string value, found: '{}'",
141                v.to_string_lossy()
142            ))
143        }
144    }
145
146    fn insert_into(
147        self: Box<Self>,
148        path: &OwnedTargetPath,
149        v: &mut LogEvent,
150    ) -> Result<(), String> {
151        v.insert(path, Value::Bytes(self.v.into()));
152        Ok(())
153    }
154}
155
156#[derive(Debug, Clone)]
157struct ConcatArrayMerger {
158    v: Vec<Value>,
159}
160
161impl ConcatArrayMerger {
162    const fn new(v: Vec<Value>) -> Self {
163        Self { v }
164    }
165}
166
167impl ReduceValueMerger for ConcatArrayMerger {
168    fn add(&mut self, v: Value) -> Result<(), String> {
169        if let Value::Array(a) = v {
170            self.v.extend_from_slice(&a);
171        } else {
172            self.v.push(v);
173        }
174        Ok(())
175    }
176
177    fn insert_into(
178        self: Box<Self>,
179        path: &OwnedTargetPath,
180        v: &mut LogEvent,
181    ) -> Result<(), String> {
182        v.insert(path, Value::Array(self.v));
183        Ok(())
184    }
185}
186
187#[derive(Debug, Clone)]
188struct ArrayMerger {
189    v: Vec<Value>,
190}
191
192impl ArrayMerger {
193    fn new(v: Value) -> Self {
194        Self { v: vec![v] }
195    }
196}
197
198impl ReduceValueMerger for ArrayMerger {
199    fn add(&mut self, v: Value) -> Result<(), String> {
200        self.v.push(v);
201        Ok(())
202    }
203
204    fn insert_into(
205        self: Box<Self>,
206        path: &OwnedTargetPath,
207        v: &mut LogEvent,
208    ) -> Result<(), String> {
209        v.insert(path, Value::Array(self.v));
210        Ok(())
211    }
212}
213
214#[derive(Debug, Clone)]
215struct LongestArrayMerger {
216    v: Vec<Value>,
217}
218
219impl LongestArrayMerger {
220    const fn new(v: Vec<Value>) -> Self {
221        Self { v }
222    }
223}
224
225impl ReduceValueMerger for LongestArrayMerger {
226    fn add(&mut self, v: Value) -> Result<(), String> {
227        if let Value::Array(a) = v {
228            if a.len() > self.v.len() {
229                self.v = a;
230            }
231            Ok(())
232        } else {
233            Err(format!(
234                "expected array value, found: '{}'",
235                v.to_string_lossy()
236            ))
237        }
238    }
239
240    fn insert_into(
241        self: Box<Self>,
242        path: &OwnedTargetPath,
243        v: &mut LogEvent,
244    ) -> Result<(), String> {
245        v.insert(path, Value::Array(self.v));
246        Ok(())
247    }
248}
249
250#[derive(Debug, Clone)]
251struct ShortestArrayMerger {
252    v: Vec<Value>,
253}
254
255impl ShortestArrayMerger {
256    const fn new(v: Vec<Value>) -> Self {
257        Self { v }
258    }
259}
260
261impl ReduceValueMerger for ShortestArrayMerger {
262    fn add(&mut self, v: Value) -> Result<(), String> {
263        if let Value::Array(a) = v {
264            if a.len() < self.v.len() {
265                self.v = a;
266            }
267            Ok(())
268        } else {
269            Err(format!(
270                "expected array value, found: '{}'",
271                v.to_string_lossy()
272            ))
273        }
274    }
275
276    fn insert_into(
277        self: Box<Self>,
278        path: &OwnedTargetPath,
279        v: &mut LogEvent,
280    ) -> Result<(), String> {
281        v.insert(path, Value::Array(self.v));
282        Ok(())
283    }
284}
285
286#[derive(Debug, Clone)]
287struct FlatUniqueMerger {
288    v: HashSet<Value>,
289}
290
291#[allow(clippy::mutable_key_type)] // false positive due to bytes::Bytes
292fn insert_value(h: &mut HashSet<Value>, v: Value) {
293    match v {
294        Value::Object(m) => {
295            for (_, v) in m {
296                h.insert(v);
297            }
298        }
299        Value::Array(vec) => {
300            for v in vec {
301                h.insert(v);
302            }
303        }
304        _ => {
305            h.insert(v);
306        }
307    }
308}
309
310impl FlatUniqueMerger {
311    #[allow(clippy::mutable_key_type)] // false positive due to bytes::Bytes
312    fn new(v: Value) -> Self {
313        let mut h = HashSet::default();
314        insert_value(&mut h, v);
315        Self { v: h }
316    }
317}
318
319impl ReduceValueMerger for FlatUniqueMerger {
320    fn add(&mut self, v: Value) -> Result<(), String> {
321        insert_value(&mut self.v, v);
322        Ok(())
323    }
324
325    fn insert_into(
326        self: Box<Self>,
327        path: &OwnedTargetPath,
328        v: &mut LogEvent,
329    ) -> Result<(), String> {
330        v.insert(path, Value::Array(self.v.into_iter().collect()));
331        Ok(())
332    }
333}
334
335#[derive(Debug, Clone)]
336struct TimestampWindowMerger {
337    started: DateTime<Utc>,
338    latest: DateTime<Utc>,
339}
340
341impl TimestampWindowMerger {
342    const fn new(v: DateTime<Utc>) -> Self {
343        Self {
344            started: v,
345            latest: v,
346        }
347    }
348}
349
350impl ReduceValueMerger for TimestampWindowMerger {
351    fn add(&mut self, v: Value) -> Result<(), String> {
352        if let Value::Timestamp(ts) = v {
353            self.latest = ts
354        } else {
355            return Err(format!(
356                "expected timestamp value, found: {}",
357                v.to_string_lossy()
358            ));
359        }
360        Ok(())
361    }
362
363    fn insert_into(
364        self: Box<Self>,
365        path: &OwnedTargetPath,
366        v: &mut LogEvent,
367    ) -> Result<(), String> {
368        v.insert(
369            format!("{path}_end").as_str(),
370            Value::Timestamp(self.latest),
371        );
372        v.insert(path, Value::Timestamp(self.started));
373        Ok(())
374    }
375}
376
377#[derive(Debug, Clone)]
378enum NumberMergerValue {
379    Int(i64),
380    Float(NotNan<f64>),
381}
382
383impl From<i64> for NumberMergerValue {
384    fn from(v: i64) -> Self {
385        NumberMergerValue::Int(v)
386    }
387}
388
389impl From<NotNan<f64>> for NumberMergerValue {
390    fn from(v: NotNan<f64>) -> Self {
391        NumberMergerValue::Float(v)
392    }
393}
394
395#[derive(Debug, Clone)]
396struct AddNumbersMerger {
397    v: NumberMergerValue,
398}
399
400impl AddNumbersMerger {
401    const fn new(v: NumberMergerValue) -> Self {
402        Self { v }
403    }
404}
405
406impl ReduceValueMerger for AddNumbersMerger {
407    fn add(&mut self, v: Value) -> Result<(), String> {
408        // Try and keep max precision with integer values, but once we've
409        // received a float downgrade to float precision.
410        match v {
411            Value::Integer(i) => match self.v {
412                NumberMergerValue::Int(j) => self.v = NumberMergerValue::Int(i + j),
413                NumberMergerValue::Float(j) => {
414                    self.v = NumberMergerValue::Float(NotNan::new(i as f64).unwrap() + j)
415                }
416            },
417            Value::Float(f) => match self.v {
418                NumberMergerValue::Int(j) => self.v = NumberMergerValue::Float(f + j as f64),
419                NumberMergerValue::Float(j) => self.v = NumberMergerValue::Float(f + j),
420            },
421            _ => {
422                return Err(format!(
423                    "expected numeric value, found: '{}'",
424                    v.to_string_lossy()
425                ));
426            }
427        }
428        Ok(())
429    }
430
431    fn insert_into(
432        self: Box<Self>,
433        path: &OwnedTargetPath,
434        v: &mut LogEvent,
435    ) -> Result<(), String> {
436        match self.v {
437            NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)),
438            NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)),
439        };
440        Ok(())
441    }
442}
443
444#[derive(Debug, Clone)]
445struct MaxNumberMerger {
446    v: NumberMergerValue,
447}
448
449impl MaxNumberMerger {
450    const fn new(v: NumberMergerValue) -> Self {
451        Self { v }
452    }
453}
454
455impl ReduceValueMerger for MaxNumberMerger {
456    fn add(&mut self, v: Value) -> Result<(), String> {
457        // Try and keep max precision with integer values, but once we've
458        // received a float downgrade to float precision.
459        match v {
460            Value::Integer(i) => {
461                match self.v {
462                    NumberMergerValue::Int(i2) => {
463                        if i > i2 {
464                            self.v = NumberMergerValue::Int(i);
465                        }
466                    }
467                    NumberMergerValue::Float(f2) => {
468                        let f = NotNan::new(i as f64).unwrap();
469                        if f > f2 {
470                            self.v = NumberMergerValue::Float(f);
471                        }
472                    }
473                };
474            }
475            Value::Float(f) => {
476                let f2 = match self.v {
477                    NumberMergerValue::Int(i2) => NotNan::new(i2 as f64).unwrap(),
478                    NumberMergerValue::Float(f2) => f2,
479                };
480                if f > f2 {
481                    self.v = NumberMergerValue::Float(f);
482                }
483            }
484            _ => {
485                return Err(format!(
486                    "expected numeric value, found: '{}'",
487                    v.to_string_lossy()
488                ));
489            }
490        }
491        Ok(())
492    }
493
494    fn insert_into(
495        self: Box<Self>,
496        path: &OwnedTargetPath,
497        v: &mut LogEvent,
498    ) -> Result<(), String> {
499        match self.v {
500            NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)),
501            NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)),
502        };
503        Ok(())
504    }
505}
506
507#[derive(Debug, Clone)]
508struct MinNumberMerger {
509    v: NumberMergerValue,
510}
511
512impl MinNumberMerger {
513    const fn new(v: NumberMergerValue) -> Self {
514        Self { v }
515    }
516}
517
518impl ReduceValueMerger for MinNumberMerger {
519    fn add(&mut self, v: Value) -> Result<(), String> {
520        // Try and keep max precision with integer values, but once we've
521        // received a float downgrade to float precision.
522        match v {
523            Value::Integer(i) => {
524                match self.v {
525                    NumberMergerValue::Int(i2) => {
526                        if i < i2 {
527                            self.v = NumberMergerValue::Int(i);
528                        }
529                    }
530                    NumberMergerValue::Float(f2) => {
531                        let f = NotNan::new(i as f64).unwrap();
532                        if f < f2 {
533                            self.v = NumberMergerValue::Float(f);
534                        }
535                    }
536                };
537            }
538            Value::Float(f) => {
539                let f2 = match self.v {
540                    NumberMergerValue::Int(i2) => NotNan::new(i2 as f64).unwrap(),
541                    NumberMergerValue::Float(f2) => f2,
542                };
543                if f < f2 {
544                    self.v = NumberMergerValue::Float(f);
545                }
546            }
547            _ => {
548                return Err(format!(
549                    "expected numeric value, found: '{}'",
550                    v.to_string_lossy()
551                ));
552            }
553        }
554        Ok(())
555    }
556
557    fn insert_into(
558        self: Box<Self>,
559        path: &OwnedTargetPath,
560        v: &mut LogEvent,
561    ) -> Result<(), String> {
562        match self.v {
563            NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)),
564            NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)),
565        };
566        Ok(())
567    }
568}
569
/// Stateful accumulator that folds successive values of one field into a
/// single result.
///
/// A merger is created from the first value of a field (see
/// `get_value_merger`), fed each later value through [`add`](Self::add), and
/// finally consumed by [`insert_into`](Self::insert_into), which writes the
/// merged result into an event.
pub trait ReduceValueMerger: std::fmt::Debug + Send + Sync + DynClone {
    /// Folds one more value into the merger's state.
    ///
    /// Returns `Err` with a human-readable message when the merger cannot
    /// accept `v` (the implementations in this file do so for values of an
    /// unexpected type).
    fn add(&mut self, v: Value) -> Result<(), String>;
    /// Consumes the merger and writes its result into `v` at `path`.
    fn insert_into(self: Box<Self>, path: &OwnedTargetPath, v: &mut LogEvent)
        -> Result<(), String>;
}

// Per the `dyn_clone` documentation, this makes `Box<dyn ReduceValueMerger>`
// implement `Clone`.
dyn_clone::clone_trait_object!(ReduceValueMerger);
577
578impl From<Value> for Box<dyn ReduceValueMerger> {
579    fn from(v: Value) -> Self {
580        match v {
581            Value::Integer(i) => Box::new(AddNumbersMerger::new(i.into())),
582            Value::Float(f) => Box::new(AddNumbersMerger::new(f.into())),
583            Value::Timestamp(ts) => Box::new(TimestampWindowMerger::new(ts)),
584            Value::Object(_) => Box::new(DiscardMerger::new(v)),
585            Value::Null => Box::new(DiscardMerger::new(v)),
586            Value::Boolean(_) => Box::new(DiscardMerger::new(v)),
587            Value::Bytes(_) => Box::new(DiscardMerger::new(v)),
588            Value::Regex(_) => Box::new(DiscardMerger::new(v)),
589            Value::Array(_) => Box::new(DiscardMerger::new(v)),
590        }
591    }
592}
593
594pub(crate) fn get_value_merger(
595    v: Value,
596    m: &MergeStrategy,
597) -> Result<Box<dyn ReduceValueMerger>, String> {
598    match m {
599        MergeStrategy::Sum => match v {
600            Value::Integer(i) => Ok(Box::new(AddNumbersMerger::new(i.into()))),
601            Value::Float(f) => Ok(Box::new(AddNumbersMerger::new(f.into()))),
602            _ => Err(format!(
603                "expected number value, found: '{}'",
604                v.to_string_lossy()
605            )),
606        },
607        MergeStrategy::Max => match v {
608            Value::Integer(i) => Ok(Box::new(MaxNumberMerger::new(i.into()))),
609            Value::Float(f) => Ok(Box::new(MaxNumberMerger::new(f.into()))),
610            _ => Err(format!(
611                "expected number value, found: '{}'",
612                v.to_string_lossy()
613            )),
614        },
615        MergeStrategy::Min => match v {
616            Value::Integer(i) => Ok(Box::new(MinNumberMerger::new(i.into()))),
617            Value::Float(f) => Ok(Box::new(MinNumberMerger::new(f.into()))),
618            _ => Err(format!(
619                "expected number value, found: '{}'",
620                v.to_string_lossy()
621            )),
622        },
623        MergeStrategy::Concat => match v {
624            Value::Bytes(b) => Ok(Box::new(ConcatMerger::new(b, Some(' ')))),
625            Value::Array(a) => Ok(Box::new(ConcatArrayMerger::new(a))),
626            _ => Err(format!(
627                "expected string or array value, found: '{}'",
628                v.to_string_lossy()
629            )),
630        },
631        MergeStrategy::ConcatNewline => match v {
632            Value::Bytes(b) => Ok(Box::new(ConcatMerger::new(b, Some('\n')))),
633            _ => Err(format!(
634                "expected string value, found: '{}'",
635                v.to_string_lossy()
636            )),
637        },
638        MergeStrategy::ConcatRaw => match v {
639            Value::Bytes(b) => Ok(Box::new(ConcatMerger::new(b, None))),
640            _ => Err(format!(
641                "expected string value, found: '{}'",
642                v.to_string_lossy()
643            )),
644        },
645        MergeStrategy::Array => Ok(Box::new(ArrayMerger::new(v))),
646        MergeStrategy::ShortestArray => match v {
647            Value::Array(a) => Ok(Box::new(ShortestArrayMerger::new(a))),
648            _ => Err(format!(
649                "expected array value, found: '{}'",
650                v.to_string_lossy()
651            )),
652        },
653        MergeStrategy::LongestArray => match v {
654            Value::Array(a) => Ok(Box::new(LongestArrayMerger::new(a))),
655            _ => Err(format!(
656                "expected array value, found: '{}'",
657                v.to_string_lossy()
658            )),
659        },
660        MergeStrategy::Discard => Ok(Box::new(DiscardMerger::new(v))),
661        MergeStrategy::Retain => Ok(Box::new(RetainMerger::new(v))),
662        MergeStrategy::FlatUnique => Ok(Box::new(FlatUniqueMerger::new(v))),
663    }
664}
665
666#[cfg(test)]
667mod test {
668    use super::*;
669    use crate::event::LogEvent;
670    use serde_json::json;
671    use vrl::owned_event_path;
672
673    #[test]
674    fn initial_values() {
675        assert!(get_value_merger("foo".into(), &MergeStrategy::Discard).is_ok());
676        assert!(get_value_merger("foo".into(), &MergeStrategy::Retain).is_ok());
677        assert!(get_value_merger("foo".into(), &MergeStrategy::Sum).is_err());
678        assert!(get_value_merger("foo".into(), &MergeStrategy::Max).is_err());
679        assert!(get_value_merger("foo".into(), &MergeStrategy::Min).is_err());
680        assert!(get_value_merger("foo".into(), &MergeStrategy::Array).is_ok());
681        assert!(get_value_merger("foo".into(), &MergeStrategy::LongestArray).is_err());
682        assert!(get_value_merger("foo".into(), &MergeStrategy::ShortestArray).is_err());
683        assert!(get_value_merger("foo".into(), &MergeStrategy::Concat).is_ok());
684        assert!(get_value_merger("foo".into(), &MergeStrategy::ConcatNewline).is_ok());
685        assert!(get_value_merger("foo".into(), &MergeStrategy::ConcatRaw).is_ok());
686        assert!(get_value_merger("foo".into(), &MergeStrategy::FlatUnique).is_ok());
687
688        assert!(get_value_merger(42.into(), &MergeStrategy::Discard).is_ok());
689        assert!(get_value_merger(42.into(), &MergeStrategy::Retain).is_ok());
690        assert!(get_value_merger(42.into(), &MergeStrategy::Sum).is_ok());
691        assert!(get_value_merger(42.into(), &MergeStrategy::Min).is_ok());
692        assert!(get_value_merger(42.into(), &MergeStrategy::Max).is_ok());
693        assert!(get_value_merger(42.into(), &MergeStrategy::Array).is_ok());
694        assert!(get_value_merger(42.into(), &MergeStrategy::LongestArray).is_err());
695        assert!(get_value_merger(42.into(), &MergeStrategy::ShortestArray).is_err());
696        assert!(get_value_merger(42.into(), &MergeStrategy::Concat).is_err());
697        assert!(get_value_merger(42.into(), &MergeStrategy::ConcatNewline).is_err());
698        assert!(get_value_merger(42.into(), &MergeStrategy::ConcatRaw).is_err());
699        assert!(get_value_merger(42.into(), &MergeStrategy::FlatUnique).is_ok());
700
701        assert!(get_value_merger(42.into(), &MergeStrategy::Discard).is_ok());
702        assert!(get_value_merger(42.into(), &MergeStrategy::Retain).is_ok());
703        assert!(get_value_merger(4.2.into(), &MergeStrategy::Sum).is_ok());
704        assert!(get_value_merger(4.2.into(), &MergeStrategy::Min).is_ok());
705        assert!(get_value_merger(4.2.into(), &MergeStrategy::Max).is_ok());
706        assert!(get_value_merger(4.2.into(), &MergeStrategy::Array).is_ok());
707        assert!(get_value_merger(4.2.into(), &MergeStrategy::LongestArray).is_err());
708        assert!(get_value_merger(4.2.into(), &MergeStrategy::ShortestArray).is_err());
709        assert!(get_value_merger(4.2.into(), &MergeStrategy::Concat).is_err());
710        assert!(get_value_merger(4.2.into(), &MergeStrategy::ConcatNewline).is_err());
711        assert!(get_value_merger(4.2.into(), &MergeStrategy::ConcatRaw).is_err());
712        assert!(get_value_merger(4.2.into(), &MergeStrategy::FlatUnique).is_ok());
713
714        assert!(get_value_merger(true.into(), &MergeStrategy::Discard).is_ok());
715        assert!(get_value_merger(true.into(), &MergeStrategy::Retain).is_ok());
716        assert!(get_value_merger(true.into(), &MergeStrategy::Sum).is_err());
717        assert!(get_value_merger(true.into(), &MergeStrategy::Max).is_err());
718        assert!(get_value_merger(true.into(), &MergeStrategy::Min).is_err());
719        assert!(get_value_merger(true.into(), &MergeStrategy::Array).is_ok());
720        assert!(get_value_merger(true.into(), &MergeStrategy::LongestArray).is_err());
721        assert!(get_value_merger(true.into(), &MergeStrategy::ShortestArray).is_err());
722        assert!(get_value_merger(true.into(), &MergeStrategy::Concat).is_err());
723        assert!(get_value_merger(true.into(), &MergeStrategy::ConcatNewline).is_err());
724        assert!(get_value_merger(true.into(), &MergeStrategy::ConcatRaw).is_err());
725        assert!(get_value_merger(true.into(), &MergeStrategy::FlatUnique).is_ok());
726
727        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Discard).is_ok());
728        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Retain).is_ok());
729        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Sum).is_err());
730        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Max).is_err());
731        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Min).is_err());
732        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Array).is_ok());
733        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::LongestArray).is_err());
734        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::ShortestArray).is_err());
735        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Concat).is_err());
736        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::ConcatNewline).is_err());
737        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::ConcatRaw).is_err());
738        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Discard).is_ok());
739        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::FlatUnique).is_ok());
740
741        assert!(get_value_merger(json!([]).into(), &MergeStrategy::Discard).is_ok());
742        assert!(get_value_merger(json!([]).into(), &MergeStrategy::Retain).is_ok());
743        assert!(get_value_merger(json!([]).into(), &MergeStrategy::Sum).is_err());
744        assert!(get_value_merger(json!([]).into(), &MergeStrategy::Max).is_err());
745        assert!(get_value_merger(json!([]).into(), &MergeStrategy::Min).is_err());
746        assert!(get_value_merger(json!([]).into(), &MergeStrategy::Array).is_ok());
747        assert!(get_value_merger(json!([]).into(), &MergeStrategy::LongestArray).is_ok());
748        assert!(get_value_merger(json!([]).into(), &MergeStrategy::ShortestArray).is_ok());
749        assert!(get_value_merger(json!([]).into(), &MergeStrategy::Concat).is_ok());
750        assert!(get_value_merger(json!([]).into(), &MergeStrategy::ConcatNewline).is_err());
751        assert!(get_value_merger(json!([]).into(), &MergeStrategy::ConcatRaw).is_err());
752        assert!(get_value_merger(json!([]).into(), &MergeStrategy::FlatUnique).is_ok());
753
754        assert!(get_value_merger(json!({}).into(), &MergeStrategy::Discard).is_ok());
755        assert!(get_value_merger(json!({}).into(), &MergeStrategy::Retain).is_ok());
756        assert!(get_value_merger(json!({}).into(), &MergeStrategy::Sum).is_err());
757        assert!(get_value_merger(json!({}).into(), &MergeStrategy::Max).is_err());
758        assert!(get_value_merger(json!({}).into(), &MergeStrategy::Min).is_err());
759        assert!(get_value_merger(json!({}).into(), &MergeStrategy::Array).is_ok());
760        assert!(get_value_merger(json!({}).into(), &MergeStrategy::LongestArray).is_err());
761        assert!(get_value_merger(json!({}).into(), &MergeStrategy::ShortestArray).is_err());
762        assert!(get_value_merger(json!({}).into(), &MergeStrategy::Concat).is_err());
763        assert!(get_value_merger(json!({}).into(), &MergeStrategy::ConcatNewline).is_err());
764        assert!(get_value_merger(json!({}).into(), &MergeStrategy::ConcatRaw).is_err());
765        assert!(get_value_merger(json!({}).into(), &MergeStrategy::FlatUnique).is_ok());
766
767        assert!(get_value_merger(json!(null).into(), &MergeStrategy::Discard).is_ok());
768        assert!(get_value_merger(json!(null).into(), &MergeStrategy::Retain).is_ok());
769        assert!(get_value_merger(json!(null).into(), &MergeStrategy::Sum).is_err());
770        assert!(get_value_merger(json!(null).into(), &MergeStrategy::Max).is_err());
771        assert!(get_value_merger(json!(null).into(), &MergeStrategy::Min).is_err());
772        assert!(get_value_merger(json!(null).into(), &MergeStrategy::Array).is_ok());
773        assert!(get_value_merger(json!(null).into(), &MergeStrategy::LongestArray).is_err());
774        assert!(get_value_merger(json!(null).into(), &MergeStrategy::ShortestArray).is_err());
775        assert!(get_value_merger(json!(null).into(), &MergeStrategy::Concat).is_err());
776        assert!(get_value_merger(json!(null).into(), &MergeStrategy::ConcatNewline).is_err());
777        assert!(get_value_merger(json!(null).into(), &MergeStrategy::ConcatRaw).is_err());
778        assert!(get_value_merger(json!(null).into(), &MergeStrategy::FlatUnique).is_ok());
779    }
780
781    #[test]
782    fn merging_values() {
783        assert_eq!(
784            merge("foo".into(), "bar".into(), &MergeStrategy::Discard),
785            Ok("foo".into())
786        );
787        assert_eq!(
788            merge("foo".into(), "bar".into(), &MergeStrategy::Retain),
789            Ok("bar".into())
790        );
791        assert_eq!(
792            merge("foo".into(), "bar".into(), &MergeStrategy::Array),
793            Ok(json!(["foo", "bar"]).into())
794        );
795        assert_eq!(
796            merge("foo".into(), "bar".into(), &MergeStrategy::Concat),
797            Ok("foo bar".into())
798        );
799        assert_eq!(
800            merge("foo".into(), "bar".into(), &MergeStrategy::ConcatNewline),
801            Ok("foo\nbar".into())
802        );
803        assert_eq!(
804            merge("foo".into(), "bar".into(), &MergeStrategy::ConcatRaw),
805            Ok("foobar".into())
806        );
807        assert!(merge("foo".into(), 42.into(), &MergeStrategy::Concat).is_err());
808        assert!(merge("foo".into(), 4.2.into(), &MergeStrategy::Concat).is_err());
809        assert!(merge("foo".into(), true.into(), &MergeStrategy::Concat).is_err());
810        assert!(merge("foo".into(), Utc::now().into(), &MergeStrategy::Concat).is_err());
811        assert!(merge("foo".into(), json!({}).into(), &MergeStrategy::Concat).is_err());
812        assert!(merge("foo".into(), json!([]).into(), &MergeStrategy::Concat).is_err());
813        assert!(merge("foo".into(), json!(null).into(), &MergeStrategy::Concat).is_err());
814
815        assert_eq!(
816            merge("foo".into(), "bar".into(), &MergeStrategy::ConcatNewline),
817            Ok("foo\nbar".into())
818        );
819
820        assert_eq!(
821            merge(21.into(), 21.into(), &MergeStrategy::Sum),
822            Ok(42.into())
823        );
824        assert_eq!(
825            merge(41.into(), 42.into(), &MergeStrategy::Max),
826            Ok(42.into())
827        );
828        assert_eq!(
829            merge(42.into(), 41.into(), &MergeStrategy::Max),
830            Ok(42.into())
831        );
832        assert_eq!(
833            merge(42.into(), 43.into(), &MergeStrategy::Min),
834            Ok(42.into())
835        );
836        assert_eq!(
837            merge(43.into(), 42.into(), &MergeStrategy::Min),
838            Ok(42.into())
839        );
840
841        assert_eq!(
842            merge(2.1.into(), 2.1.into(), &MergeStrategy::Sum),
843            Ok(4.2.into())
844        );
845        assert_eq!(
846            merge(4.1.into(), 4.2.into(), &MergeStrategy::Max),
847            Ok(4.2.into())
848        );
849        assert_eq!(
850            merge(4.2.into(), 4.1.into(), &MergeStrategy::Max),
851            Ok(4.2.into())
852        );
853        assert_eq!(
854            merge(4.2.into(), 4.3.into(), &MergeStrategy::Min),
855            Ok(4.2.into())
856        );
857        assert_eq!(
858            merge(4.3.into(), 4.2.into(), &MergeStrategy::Min),
859            Ok(4.2.into())
860        );
861
862        assert_eq!(
863            merge(
864                json!([4_i64]).into(),
865                json!([2_i64]).into(),
866                &MergeStrategy::Concat
867            ),
868            Ok(json!([4_i64, 2_i64]).into())
869        );
870        assert_eq!(
871            merge(json!([]).into(), 42_i64.into(), &MergeStrategy::Concat),
872            Ok(json!([42_i64]).into())
873        );
874
875        assert_eq!(
876            merge(
877                json!([34_i64]).into(),
878                json!([42_i64, 43_i64]).into(),
879                &MergeStrategy::ShortestArray
880            ),
881            Ok(json!([34_i64]).into())
882        );
883        assert_eq!(
884            merge(
885                json!([34_i64]).into(),
886                json!([42_i64, 43_i64]).into(),
887                &MergeStrategy::LongestArray
888            ),
889            Ok(json!([42_i64, 43_i64]).into())
890        );
891
892        let v = merge(34_i64.into(), 43_i64.into(), &MergeStrategy::FlatUnique).unwrap();
893        match v.clone() {
894            Value::Array(v) => {
895                let v: Vec<_> = v
896                    .into_iter()
897                    .map(|i| {
898                        if let Value::Integer(i) = i {
899                            i
900                        } else {
901                            panic!("Bad value");
902                        }
903                    })
904                    .collect();
905                assert_eq!(v.iter().filter(|i| **i == 34i64).count(), 1);
906                assert_eq!(v.iter().filter(|i| **i == 43i64).count(), 1);
907            }
908            _ => {
909                panic!("Not array");
910            }
911        }
912        let v = merge(v, 34_i32.into(), &MergeStrategy::FlatUnique).unwrap();
913        if let Value::Array(v) = v {
914            let v: Vec<_> = v
915                .into_iter()
916                .map(|i| {
917                    if let Value::Integer(i) = i {
918                        i
919                    } else {
920                        panic!("Bad value");
921                    }
922                })
923                .collect();
924            assert_eq!(v.iter().filter(|i| **i == 34i64).count(), 1);
925            assert_eq!(v.iter().filter(|i| **i == 43i64).count(), 1);
926        } else {
927            panic!("Not array");
928        }
929    }
930
931    fn merge(initial: Value, additional: Value, strategy: &MergeStrategy) -> Result<Value, String> {
932        let mut merger = get_value_merger(initial, strategy)?;
933        merger.add(additional)?;
934        let mut output = LogEvent::default();
935        let out_path = owned_event_path!("out");
936        merger.insert_into(&out_path, &mut output)?;
937        Ok(output.remove(&out_path).unwrap())
938    }
939}