vector/transforms/reduce/
merge_strategy.rs

1use std::collections::HashSet;
2
3use bytes::{Bytes, BytesMut};
4use chrono::{DateTime, Utc};
5use dyn_clone::DynClone;
6use ordered_float::NotNan;
7use vector_lib::configurable::configurable_component;
8use vrl::path::OwnedTargetPath;
9
10use crate::event::{LogEvent, Value};
11
/// Strategies for merging events.
///
/// Each strategy decides how the values seen for a single field are combined into one value.
#[configurable_component]
#[derive(Clone, Debug, PartialEq)]
#[cfg_attr(feature = "proptest", derive(proptest_derive::Arbitrary))]
#[serde(rename_all = "snake_case")]
pub enum MergeStrategy {
    /// Discard all but the first value found.
    Discard,

    /// Discard all but the last value found.
    ///
    /// Works as a way to coalesce by not retaining `null`: a `null` value never replaces a
    /// value that was seen earlier.
    Retain,

    /// Sum all numeric values.
    ///
    /// The sum stays an integer while only integers are seen and becomes a float once a float
    /// is seen.
    Sum,

    /// Keep the maximum numeric value seen.
    Max,

    /// Keep the minimum numeric value seen.
    Min,

    /// Append each value to an array.
    Array,

    /// Concatenate each string value, delimited with a space.
    ///
    /// If the first value is an array, later values are appended to it instead: arrays are
    /// appended element by element and any other value is appended as a single element.
    Concat,

    /// Concatenate each string value, delimited with a newline.
    ConcatNewline,

    /// Concatenate each string, without a delimiter.
    ConcatRaw,

    /// Keep the shortest array seen.
    ///
    /// When several arrays share the shortest length, the first of them is kept.
    ShortestArray,

    /// Keep the longest array seen.
    ///
    /// When several arrays share the longest length, the first of them is kept.
    LongestArray,

    /// Create a flattened array of all unique values.
    ///
    /// Arrays contribute their elements and objects contribute their field values; only one
    /// level is flattened.
    FlatUnique,
}
56
57#[derive(Debug, Clone)]
58struct DiscardMerger {
59    v: Value,
60}
61
62impl DiscardMerger {
63    const fn new(v: Value) -> Self {
64        Self { v }
65    }
66}
67
68impl ReduceValueMerger for DiscardMerger {
69    fn add(&mut self, _v: Value) -> Result<(), String> {
70        Ok(())
71    }
72
73    fn insert_into(
74        self: Box<Self>,
75        path: &OwnedTargetPath,
76        v: &mut LogEvent,
77    ) -> Result<(), String> {
78        v.insert(path, self.v);
79        Ok(())
80    }
81}
82
83#[derive(Debug, Clone)]
84struct RetainMerger {
85    v: Value,
86}
87
88impl RetainMerger {
89    #[allow(clippy::missing_const_for_fn)] // const cannot run destructor
90    fn new(v: Value) -> Self {
91        Self { v }
92    }
93}
94
95impl ReduceValueMerger for RetainMerger {
96    fn add(&mut self, v: Value) -> Result<(), String> {
97        if Value::Null != v {
98            self.v = v;
99        }
100        Ok(())
101    }
102
103    fn insert_into(
104        self: Box<Self>,
105        path: &OwnedTargetPath,
106        v: &mut LogEvent,
107    ) -> Result<(), String> {
108        v.insert(path, self.v);
109        Ok(())
110    }
111}
112
113#[derive(Debug, Clone)]
114struct ConcatMerger {
115    v: BytesMut,
116    join_by: Option<Vec<u8>>,
117}
118
119impl ConcatMerger {
120    fn new(v: Bytes, join_by: Option<char>) -> Self {
121        // We need to get the resulting bytes for this character in case it's actually a multi-byte character.
122        let join_by = join_by.map(|c| c.to_string().into_bytes());
123
124        Self {
125            v: BytesMut::from(&v[..]),
126            join_by,
127        }
128    }
129}
130
131impl ReduceValueMerger for ConcatMerger {
132    fn add(&mut self, v: Value) -> Result<(), String> {
133        if let Value::Bytes(b) = v {
134            if let Some(buf) = self.join_by.as_ref() {
135                self.v.extend(&buf[..]);
136            }
137            self.v.extend_from_slice(&b);
138            Ok(())
139        } else {
140            Err(format!(
141                "expected string value, found: '{}'",
142                v.to_string_lossy()
143            ))
144        }
145    }
146
147    fn insert_into(
148        self: Box<Self>,
149        path: &OwnedTargetPath,
150        v: &mut LogEvent,
151    ) -> Result<(), String> {
152        v.insert(path, Value::Bytes(self.v.into()));
153        Ok(())
154    }
155}
156
157#[derive(Debug, Clone)]
158struct ConcatArrayMerger {
159    v: Vec<Value>,
160}
161
162impl ConcatArrayMerger {
163    const fn new(v: Vec<Value>) -> Self {
164        Self { v }
165    }
166}
167
168impl ReduceValueMerger for ConcatArrayMerger {
169    fn add(&mut self, v: Value) -> Result<(), String> {
170        if let Value::Array(a) = v {
171            self.v.extend_from_slice(&a);
172        } else {
173            self.v.push(v);
174        }
175        Ok(())
176    }
177
178    fn insert_into(
179        self: Box<Self>,
180        path: &OwnedTargetPath,
181        v: &mut LogEvent,
182    ) -> Result<(), String> {
183        v.insert(path, Value::Array(self.v));
184        Ok(())
185    }
186}
187
188#[derive(Debug, Clone)]
189struct ArrayMerger {
190    v: Vec<Value>,
191}
192
193impl ArrayMerger {
194    fn new(v: Value) -> Self {
195        Self { v: vec![v] }
196    }
197}
198
199impl ReduceValueMerger for ArrayMerger {
200    fn add(&mut self, v: Value) -> Result<(), String> {
201        self.v.push(v);
202        Ok(())
203    }
204
205    fn insert_into(
206        self: Box<Self>,
207        path: &OwnedTargetPath,
208        v: &mut LogEvent,
209    ) -> Result<(), String> {
210        v.insert(path, Value::Array(self.v));
211        Ok(())
212    }
213}
214
215#[derive(Debug, Clone)]
216struct LongestArrayMerger {
217    v: Vec<Value>,
218}
219
220impl LongestArrayMerger {
221    const fn new(v: Vec<Value>) -> Self {
222        Self { v }
223    }
224}
225
226impl ReduceValueMerger for LongestArrayMerger {
227    fn add(&mut self, v: Value) -> Result<(), String> {
228        if let Value::Array(a) = v {
229            if a.len() > self.v.len() {
230                self.v = a;
231            }
232            Ok(())
233        } else {
234            Err(format!(
235                "expected array value, found: '{}'",
236                v.to_string_lossy()
237            ))
238        }
239    }
240
241    fn insert_into(
242        self: Box<Self>,
243        path: &OwnedTargetPath,
244        v: &mut LogEvent,
245    ) -> Result<(), String> {
246        v.insert(path, Value::Array(self.v));
247        Ok(())
248    }
249}
250
251#[derive(Debug, Clone)]
252struct ShortestArrayMerger {
253    v: Vec<Value>,
254}
255
256impl ShortestArrayMerger {
257    const fn new(v: Vec<Value>) -> Self {
258        Self { v }
259    }
260}
261
262impl ReduceValueMerger for ShortestArrayMerger {
263    fn add(&mut self, v: Value) -> Result<(), String> {
264        if let Value::Array(a) = v {
265            if a.len() < self.v.len() {
266                self.v = a;
267            }
268            Ok(())
269        } else {
270            Err(format!(
271                "expected array value, found: '{}'",
272                v.to_string_lossy()
273            ))
274        }
275    }
276
277    fn insert_into(
278        self: Box<Self>,
279        path: &OwnedTargetPath,
280        v: &mut LogEvent,
281    ) -> Result<(), String> {
282        v.insert(path, Value::Array(self.v));
283        Ok(())
284    }
285}
286
287#[derive(Debug, Clone)]
288struct FlatUniqueMerger {
289    v: HashSet<Value>,
290}
291
292#[allow(clippy::mutable_key_type)] // false positive due to bytes::Bytes
293fn insert_value(h: &mut HashSet<Value>, v: Value) {
294    match v {
295        Value::Object(m) => {
296            for (_, v) in m {
297                h.insert(v);
298            }
299        }
300        Value::Array(vec) => {
301            for v in vec {
302                h.insert(v);
303            }
304        }
305        _ => {
306            h.insert(v);
307        }
308    }
309}
310
311impl FlatUniqueMerger {
312    #[allow(clippy::mutable_key_type)] // false positive due to bytes::Bytes
313    fn new(v: Value) -> Self {
314        let mut h = HashSet::default();
315        insert_value(&mut h, v);
316        Self { v: h }
317    }
318}
319
320impl ReduceValueMerger for FlatUniqueMerger {
321    fn add(&mut self, v: Value) -> Result<(), String> {
322        insert_value(&mut self.v, v);
323        Ok(())
324    }
325
326    fn insert_into(
327        self: Box<Self>,
328        path: &OwnedTargetPath,
329        v: &mut LogEvent,
330    ) -> Result<(), String> {
331        v.insert(path, Value::Array(self.v.into_iter().collect()));
332        Ok(())
333    }
334}
335
336#[derive(Debug, Clone)]
337struct TimestampWindowMerger {
338    started: DateTime<Utc>,
339    latest: DateTime<Utc>,
340}
341
342impl TimestampWindowMerger {
343    const fn new(v: DateTime<Utc>) -> Self {
344        Self {
345            started: v,
346            latest: v,
347        }
348    }
349}
350
351impl ReduceValueMerger for TimestampWindowMerger {
352    fn add(&mut self, v: Value) -> Result<(), String> {
353        if let Value::Timestamp(ts) = v {
354            self.latest = ts
355        } else {
356            return Err(format!(
357                "expected timestamp value, found: {}",
358                v.to_string_lossy()
359            ));
360        }
361        Ok(())
362    }
363
364    fn insert_into(
365        self: Box<Self>,
366        path: &OwnedTargetPath,
367        v: &mut LogEvent,
368    ) -> Result<(), String> {
369        v.insert(
370            format!("{path}_end").as_str(),
371            Value::Timestamp(self.latest),
372        );
373        v.insert(path, Value::Timestamp(self.started));
374        Ok(())
375    }
376}
377
378#[derive(Debug, Clone)]
379enum NumberMergerValue {
380    Int(i64),
381    Float(NotNan<f64>),
382}
383
384impl From<i64> for NumberMergerValue {
385    fn from(v: i64) -> Self {
386        NumberMergerValue::Int(v)
387    }
388}
389
390impl From<NotNan<f64>> for NumberMergerValue {
391    fn from(v: NotNan<f64>) -> Self {
392        NumberMergerValue::Float(v)
393    }
394}
395
396#[derive(Debug, Clone)]
397struct AddNumbersMerger {
398    v: NumberMergerValue,
399}
400
401impl AddNumbersMerger {
402    const fn new(v: NumberMergerValue) -> Self {
403        Self { v }
404    }
405}
406
407impl ReduceValueMerger for AddNumbersMerger {
408    fn add(&mut self, v: Value) -> Result<(), String> {
409        // Try and keep max precision with integer values, but once we've
410        // received a float downgrade to float precision.
411        match v {
412            Value::Integer(i) => match self.v {
413                NumberMergerValue::Int(j) => self.v = NumberMergerValue::Int(i + j),
414                NumberMergerValue::Float(j) => {
415                    self.v = NumberMergerValue::Float(NotNan::new(i as f64).unwrap() + j)
416                }
417            },
418            Value::Float(f) => match self.v {
419                NumberMergerValue::Int(j) => self.v = NumberMergerValue::Float(f + j as f64),
420                NumberMergerValue::Float(j) => self.v = NumberMergerValue::Float(f + j),
421            },
422            _ => {
423                return Err(format!(
424                    "expected numeric value, found: '{}'",
425                    v.to_string_lossy()
426                ));
427            }
428        }
429        Ok(())
430    }
431
432    fn insert_into(
433        self: Box<Self>,
434        path: &OwnedTargetPath,
435        v: &mut LogEvent,
436    ) -> Result<(), String> {
437        match self.v {
438            NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)),
439            NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)),
440        };
441        Ok(())
442    }
443}
444
445#[derive(Debug, Clone)]
446struct MaxNumberMerger {
447    v: NumberMergerValue,
448}
449
450impl MaxNumberMerger {
451    const fn new(v: NumberMergerValue) -> Self {
452        Self { v }
453    }
454}
455
456impl ReduceValueMerger for MaxNumberMerger {
457    fn add(&mut self, v: Value) -> Result<(), String> {
458        // Try and keep max precision with integer values, but once we've
459        // received a float downgrade to float precision.
460        match v {
461            Value::Integer(i) => {
462                match self.v {
463                    NumberMergerValue::Int(i2) => {
464                        if i > i2 {
465                            self.v = NumberMergerValue::Int(i);
466                        }
467                    }
468                    NumberMergerValue::Float(f2) => {
469                        let f = NotNan::new(i as f64).unwrap();
470                        if f > f2 {
471                            self.v = NumberMergerValue::Float(f);
472                        }
473                    }
474                };
475            }
476            Value::Float(f) => {
477                let f2 = match self.v {
478                    NumberMergerValue::Int(i2) => NotNan::new(i2 as f64).unwrap(),
479                    NumberMergerValue::Float(f2) => f2,
480                };
481                if f > f2 {
482                    self.v = NumberMergerValue::Float(f);
483                }
484            }
485            _ => {
486                return Err(format!(
487                    "expected numeric value, found: '{}'",
488                    v.to_string_lossy()
489                ));
490            }
491        }
492        Ok(())
493    }
494
495    fn insert_into(
496        self: Box<Self>,
497        path: &OwnedTargetPath,
498        v: &mut LogEvent,
499    ) -> Result<(), String> {
500        match self.v {
501            NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)),
502            NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)),
503        };
504        Ok(())
505    }
506}
507
508#[derive(Debug, Clone)]
509struct MinNumberMerger {
510    v: NumberMergerValue,
511}
512
513impl MinNumberMerger {
514    const fn new(v: NumberMergerValue) -> Self {
515        Self { v }
516    }
517}
518
519impl ReduceValueMerger for MinNumberMerger {
520    fn add(&mut self, v: Value) -> Result<(), String> {
521        // Try and keep max precision with integer values, but once we've
522        // received a float downgrade to float precision.
523        match v {
524            Value::Integer(i) => {
525                match self.v {
526                    NumberMergerValue::Int(i2) => {
527                        if i < i2 {
528                            self.v = NumberMergerValue::Int(i);
529                        }
530                    }
531                    NumberMergerValue::Float(f2) => {
532                        let f = NotNan::new(i as f64).unwrap();
533                        if f < f2 {
534                            self.v = NumberMergerValue::Float(f);
535                        }
536                    }
537                };
538            }
539            Value::Float(f) => {
540                let f2 = match self.v {
541                    NumberMergerValue::Int(i2) => NotNan::new(i2 as f64).unwrap(),
542                    NumberMergerValue::Float(f2) => f2,
543                };
544                if f < f2 {
545                    self.v = NumberMergerValue::Float(f);
546                }
547            }
548            _ => {
549                return Err(format!(
550                    "expected numeric value, found: '{}'",
551                    v.to_string_lossy()
552                ));
553            }
554        }
555        Ok(())
556    }
557
558    fn insert_into(
559        self: Box<Self>,
560        path: &OwnedTargetPath,
561        v: &mut LogEvent,
562    ) -> Result<(), String> {
563        match self.v {
564            NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)),
565            NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)),
566        };
567        Ok(())
568    }
569}
570
/// Accumulates a sequence of values and writes the merged result into a log event.
///
/// A merger is created from an initial value, receives further values through [`add`], and is
/// finally consumed by [`insert_into`].
///
/// [`add`]: ReduceValueMerger::add
/// [`insert_into`]: ReduceValueMerger::insert_into
pub trait ReduceValueMerger: std::fmt::Debug + Send + Sync + DynClone {
    /// Folds another value into the merger's state.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message when `v` cannot be merged, for example because it has
    /// the wrong type for this merger.
    fn add(&mut self, v: Value) -> Result<(), String>;

    /// Consumes the merger and inserts its merged result into `v` at `path`.
    fn insert_into(self: Box<Self>, path: &OwnedTargetPath, v: &mut LogEvent)
    -> Result<(), String>;
}

// Makes `Box<dyn ReduceValueMerger>` cloneable.
dyn_clone::clone_trait_object!(ReduceValueMerger);
578
579impl From<Value> for Box<dyn ReduceValueMerger> {
580    fn from(v: Value) -> Self {
581        match v {
582            Value::Integer(i) => Box::new(AddNumbersMerger::new(i.into())),
583            Value::Float(f) => Box::new(AddNumbersMerger::new(f.into())),
584            Value::Timestamp(ts) => Box::new(TimestampWindowMerger::new(ts)),
585            Value::Object(_) => Box::new(DiscardMerger::new(v)),
586            Value::Null => Box::new(DiscardMerger::new(v)),
587            Value::Boolean(_) => Box::new(DiscardMerger::new(v)),
588            Value::Bytes(_) => Box::new(DiscardMerger::new(v)),
589            Value::Regex(_) => Box::new(DiscardMerger::new(v)),
590            Value::Array(_) => Box::new(DiscardMerger::new(v)),
591        }
592    }
593}
594
595pub(crate) fn get_value_merger(
596    v: Value,
597    m: &MergeStrategy,
598) -> Result<Box<dyn ReduceValueMerger>, String> {
599    match m {
600        MergeStrategy::Sum => match v {
601            Value::Integer(i) => Ok(Box::new(AddNumbersMerger::new(i.into()))),
602            Value::Float(f) => Ok(Box::new(AddNumbersMerger::new(f.into()))),
603            _ => Err(format!(
604                "expected number value, found: '{}'",
605                v.to_string_lossy()
606            )),
607        },
608        MergeStrategy::Max => match v {
609            Value::Integer(i) => Ok(Box::new(MaxNumberMerger::new(i.into()))),
610            Value::Float(f) => Ok(Box::new(MaxNumberMerger::new(f.into()))),
611            _ => Err(format!(
612                "expected number value, found: '{}'",
613                v.to_string_lossy()
614            )),
615        },
616        MergeStrategy::Min => match v {
617            Value::Integer(i) => Ok(Box::new(MinNumberMerger::new(i.into()))),
618            Value::Float(f) => Ok(Box::new(MinNumberMerger::new(f.into()))),
619            _ => Err(format!(
620                "expected number value, found: '{}'",
621                v.to_string_lossy()
622            )),
623        },
624        MergeStrategy::Concat => match v {
625            Value::Bytes(b) => Ok(Box::new(ConcatMerger::new(b, Some(' ')))),
626            Value::Array(a) => Ok(Box::new(ConcatArrayMerger::new(a))),
627            _ => Err(format!(
628                "expected string or array value, found: '{}'",
629                v.to_string_lossy()
630            )),
631        },
632        MergeStrategy::ConcatNewline => match v {
633            Value::Bytes(b) => Ok(Box::new(ConcatMerger::new(b, Some('\n')))),
634            _ => Err(format!(
635                "expected string value, found: '{}'",
636                v.to_string_lossy()
637            )),
638        },
639        MergeStrategy::ConcatRaw => match v {
640            Value::Bytes(b) => Ok(Box::new(ConcatMerger::new(b, None))),
641            _ => Err(format!(
642                "expected string value, found: '{}'",
643                v.to_string_lossy()
644            )),
645        },
646        MergeStrategy::Array => Ok(Box::new(ArrayMerger::new(v))),
647        MergeStrategy::ShortestArray => match v {
648            Value::Array(a) => Ok(Box::new(ShortestArrayMerger::new(a))),
649            _ => Err(format!(
650                "expected array value, found: '{}'",
651                v.to_string_lossy()
652            )),
653        },
654        MergeStrategy::LongestArray => match v {
655            Value::Array(a) => Ok(Box::new(LongestArrayMerger::new(a))),
656            _ => Err(format!(
657                "expected array value, found: '{}'",
658                v.to_string_lossy()
659            )),
660        },
661        MergeStrategy::Discard => Ok(Box::new(DiscardMerger::new(v))),
662        MergeStrategy::Retain => Ok(Box::new(RetainMerger::new(v))),
663        MergeStrategy::FlatUnique => Ok(Box::new(FlatUniqueMerger::new(v))),
664    }
665}
666
667#[cfg(test)]
668mod test {
669    use serde_json::json;
670    use vrl::owned_event_path;
671
672    use super::*;
673    use crate::event::LogEvent;
674
675    #[test]
676    fn initial_values() {
677        assert!(get_value_merger("foo".into(), &MergeStrategy::Discard).is_ok());
678        assert!(get_value_merger("foo".into(), &MergeStrategy::Retain).is_ok());
679        assert!(get_value_merger("foo".into(), &MergeStrategy::Sum).is_err());
680        assert!(get_value_merger("foo".into(), &MergeStrategy::Max).is_err());
681        assert!(get_value_merger("foo".into(), &MergeStrategy::Min).is_err());
682        assert!(get_value_merger("foo".into(), &MergeStrategy::Array).is_ok());
683        assert!(get_value_merger("foo".into(), &MergeStrategy::LongestArray).is_err());
684        assert!(get_value_merger("foo".into(), &MergeStrategy::ShortestArray).is_err());
685        assert!(get_value_merger("foo".into(), &MergeStrategy::Concat).is_ok());
686        assert!(get_value_merger("foo".into(), &MergeStrategy::ConcatNewline).is_ok());
687        assert!(get_value_merger("foo".into(), &MergeStrategy::ConcatRaw).is_ok());
688        assert!(get_value_merger("foo".into(), &MergeStrategy::FlatUnique).is_ok());
689
690        assert!(get_value_merger(42.into(), &MergeStrategy::Discard).is_ok());
691        assert!(get_value_merger(42.into(), &MergeStrategy::Retain).is_ok());
692        assert!(get_value_merger(42.into(), &MergeStrategy::Sum).is_ok());
693        assert!(get_value_merger(42.into(), &MergeStrategy::Min).is_ok());
694        assert!(get_value_merger(42.into(), &MergeStrategy::Max).is_ok());
695        assert!(get_value_merger(42.into(), &MergeStrategy::Array).is_ok());
696        assert!(get_value_merger(42.into(), &MergeStrategy::LongestArray).is_err());
697        assert!(get_value_merger(42.into(), &MergeStrategy::ShortestArray).is_err());
698        assert!(get_value_merger(42.into(), &MergeStrategy::Concat).is_err());
699        assert!(get_value_merger(42.into(), &MergeStrategy::ConcatNewline).is_err());
700        assert!(get_value_merger(42.into(), &MergeStrategy::ConcatRaw).is_err());
701        assert!(get_value_merger(42.into(), &MergeStrategy::FlatUnique).is_ok());
702
703        assert!(get_value_merger(42.into(), &MergeStrategy::Discard).is_ok());
704        assert!(get_value_merger(42.into(), &MergeStrategy::Retain).is_ok());
705        assert!(get_value_merger(4.2.into(), &MergeStrategy::Sum).is_ok());
706        assert!(get_value_merger(4.2.into(), &MergeStrategy::Min).is_ok());
707        assert!(get_value_merger(4.2.into(), &MergeStrategy::Max).is_ok());
708        assert!(get_value_merger(4.2.into(), &MergeStrategy::Array).is_ok());
709        assert!(get_value_merger(4.2.into(), &MergeStrategy::LongestArray).is_err());
710        assert!(get_value_merger(4.2.into(), &MergeStrategy::ShortestArray).is_err());
711        assert!(get_value_merger(4.2.into(), &MergeStrategy::Concat).is_err());
712        assert!(get_value_merger(4.2.into(), &MergeStrategy::ConcatNewline).is_err());
713        assert!(get_value_merger(4.2.into(), &MergeStrategy::ConcatRaw).is_err());
714        assert!(get_value_merger(4.2.into(), &MergeStrategy::FlatUnique).is_ok());
715
716        assert!(get_value_merger(true.into(), &MergeStrategy::Discard).is_ok());
717        assert!(get_value_merger(true.into(), &MergeStrategy::Retain).is_ok());
718        assert!(get_value_merger(true.into(), &MergeStrategy::Sum).is_err());
719        assert!(get_value_merger(true.into(), &MergeStrategy::Max).is_err());
720        assert!(get_value_merger(true.into(), &MergeStrategy::Min).is_err());
721        assert!(get_value_merger(true.into(), &MergeStrategy::Array).is_ok());
722        assert!(get_value_merger(true.into(), &MergeStrategy::LongestArray).is_err());
723        assert!(get_value_merger(true.into(), &MergeStrategy::ShortestArray).is_err());
724        assert!(get_value_merger(true.into(), &MergeStrategy::Concat).is_err());
725        assert!(get_value_merger(true.into(), &MergeStrategy::ConcatNewline).is_err());
726        assert!(get_value_merger(true.into(), &MergeStrategy::ConcatRaw).is_err());
727        assert!(get_value_merger(true.into(), &MergeStrategy::FlatUnique).is_ok());
728
729        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Discard).is_ok());
730        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Retain).is_ok());
731        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Sum).is_err());
732        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Max).is_err());
733        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Min).is_err());
734        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Array).is_ok());
735        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::LongestArray).is_err());
736        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::ShortestArray).is_err());
737        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Concat).is_err());
738        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::ConcatNewline).is_err());
739        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::ConcatRaw).is_err());
740        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Discard).is_ok());
741        assert!(get_value_merger(Utc::now().into(), &MergeStrategy::FlatUnique).is_ok());
742
743        assert!(get_value_merger(json!([]).into(), &MergeStrategy::Discard).is_ok());
744        assert!(get_value_merger(json!([]).into(), &MergeStrategy::Retain).is_ok());
745        assert!(get_value_merger(json!([]).into(), &MergeStrategy::Sum).is_err());
746        assert!(get_value_merger(json!([]).into(), &MergeStrategy::Max).is_err());
747        assert!(get_value_merger(json!([]).into(), &MergeStrategy::Min).is_err());
748        assert!(get_value_merger(json!([]).into(), &MergeStrategy::Array).is_ok());
749        assert!(get_value_merger(json!([]).into(), &MergeStrategy::LongestArray).is_ok());
750        assert!(get_value_merger(json!([]).into(), &MergeStrategy::ShortestArray).is_ok());
751        assert!(get_value_merger(json!([]).into(), &MergeStrategy::Concat).is_ok());
752        assert!(get_value_merger(json!([]).into(), &MergeStrategy::ConcatNewline).is_err());
753        assert!(get_value_merger(json!([]).into(), &MergeStrategy::ConcatRaw).is_err());
754        assert!(get_value_merger(json!([]).into(), &MergeStrategy::FlatUnique).is_ok());
755
756        assert!(get_value_merger(json!({}).into(), &MergeStrategy::Discard).is_ok());
757        assert!(get_value_merger(json!({}).into(), &MergeStrategy::Retain).is_ok());
758        assert!(get_value_merger(json!({}).into(), &MergeStrategy::Sum).is_err());
759        assert!(get_value_merger(json!({}).into(), &MergeStrategy::Max).is_err());
760        assert!(get_value_merger(json!({}).into(), &MergeStrategy::Min).is_err());
761        assert!(get_value_merger(json!({}).into(), &MergeStrategy::Array).is_ok());
762        assert!(get_value_merger(json!({}).into(), &MergeStrategy::LongestArray).is_err());
763        assert!(get_value_merger(json!({}).into(), &MergeStrategy::ShortestArray).is_err());
764        assert!(get_value_merger(json!({}).into(), &MergeStrategy::Concat).is_err());
765        assert!(get_value_merger(json!({}).into(), &MergeStrategy::ConcatNewline).is_err());
766        assert!(get_value_merger(json!({}).into(), &MergeStrategy::ConcatRaw).is_err());
767        assert!(get_value_merger(json!({}).into(), &MergeStrategy::FlatUnique).is_ok());
768
769        assert!(get_value_merger(json!(null).into(), &MergeStrategy::Discard).is_ok());
770        assert!(get_value_merger(json!(null).into(), &MergeStrategy::Retain).is_ok());
771        assert!(get_value_merger(json!(null).into(), &MergeStrategy::Sum).is_err());
772        assert!(get_value_merger(json!(null).into(), &MergeStrategy::Max).is_err());
773        assert!(get_value_merger(json!(null).into(), &MergeStrategy::Min).is_err());
774        assert!(get_value_merger(json!(null).into(), &MergeStrategy::Array).is_ok());
775        assert!(get_value_merger(json!(null).into(), &MergeStrategy::LongestArray).is_err());
776        assert!(get_value_merger(json!(null).into(), &MergeStrategy::ShortestArray).is_err());
777        assert!(get_value_merger(json!(null).into(), &MergeStrategy::Concat).is_err());
778        assert!(get_value_merger(json!(null).into(), &MergeStrategy::ConcatNewline).is_err());
779        assert!(get_value_merger(json!(null).into(), &MergeStrategy::ConcatRaw).is_err());
780        assert!(get_value_merger(json!(null).into(), &MergeStrategy::FlatUnique).is_ok());
781    }
782
783    #[test]
784    fn merging_values() {
785        assert_eq!(
786            merge("foo".into(), "bar".into(), &MergeStrategy::Discard),
787            Ok("foo".into())
788        );
789        assert_eq!(
790            merge("foo".into(), "bar".into(), &MergeStrategy::Retain),
791            Ok("bar".into())
792        );
793        assert_eq!(
794            merge("foo".into(), "bar".into(), &MergeStrategy::Array),
795            Ok(json!(["foo", "bar"]).into())
796        );
797        assert_eq!(
798            merge("foo".into(), "bar".into(), &MergeStrategy::Concat),
799            Ok("foo bar".into())
800        );
801        assert_eq!(
802            merge("foo".into(), "bar".into(), &MergeStrategy::ConcatNewline),
803            Ok("foo\nbar".into())
804        );
805        assert_eq!(
806            merge("foo".into(), "bar".into(), &MergeStrategy::ConcatRaw),
807            Ok("foobar".into())
808        );
809        assert!(merge("foo".into(), 42.into(), &MergeStrategy::Concat).is_err());
810        assert!(merge("foo".into(), 4.2.into(), &MergeStrategy::Concat).is_err());
811        assert!(merge("foo".into(), true.into(), &MergeStrategy::Concat).is_err());
812        assert!(merge("foo".into(), Utc::now().into(), &MergeStrategy::Concat).is_err());
813        assert!(merge("foo".into(), json!({}).into(), &MergeStrategy::Concat).is_err());
814        assert!(merge("foo".into(), json!([]).into(), &MergeStrategy::Concat).is_err());
815        assert!(merge("foo".into(), json!(null).into(), &MergeStrategy::Concat).is_err());
816
817        assert_eq!(
818            merge("foo".into(), "bar".into(), &MergeStrategy::ConcatNewline),
819            Ok("foo\nbar".into())
820        );
821
822        assert_eq!(
823            merge(21.into(), 21.into(), &MergeStrategy::Sum),
824            Ok(42.into())
825        );
826        assert_eq!(
827            merge(41.into(), 42.into(), &MergeStrategy::Max),
828            Ok(42.into())
829        );
830        assert_eq!(
831            merge(42.into(), 41.into(), &MergeStrategy::Max),
832            Ok(42.into())
833        );
834        assert_eq!(
835            merge(42.into(), 43.into(), &MergeStrategy::Min),
836            Ok(42.into())
837        );
838        assert_eq!(
839            merge(43.into(), 42.into(), &MergeStrategy::Min),
840            Ok(42.into())
841        );
842
843        assert_eq!(
844            merge(2.1.into(), 2.1.into(), &MergeStrategy::Sum),
845            Ok(4.2.into())
846        );
847        assert_eq!(
848            merge(4.1.into(), 4.2.into(), &MergeStrategy::Max),
849            Ok(4.2.into())
850        );
851        assert_eq!(
852            merge(4.2.into(), 4.1.into(), &MergeStrategy::Max),
853            Ok(4.2.into())
854        );
855        assert_eq!(
856            merge(4.2.into(), 4.3.into(), &MergeStrategy::Min),
857            Ok(4.2.into())
858        );
859        assert_eq!(
860            merge(4.3.into(), 4.2.into(), &MergeStrategy::Min),
861            Ok(4.2.into())
862        );
863
864        assert_eq!(
865            merge(
866                json!([4_i64]).into(),
867                json!([2_i64]).into(),
868                &MergeStrategy::Concat
869            ),
870            Ok(json!([4_i64, 2_i64]).into())
871        );
872        assert_eq!(
873            merge(json!([]).into(), 42_i64.into(), &MergeStrategy::Concat),
874            Ok(json!([42_i64]).into())
875        );
876
877        assert_eq!(
878            merge(
879                json!([34_i64]).into(),
880                json!([42_i64, 43_i64]).into(),
881                &MergeStrategy::ShortestArray
882            ),
883            Ok(json!([34_i64]).into())
884        );
885        assert_eq!(
886            merge(
887                json!([34_i64]).into(),
888                json!([42_i64, 43_i64]).into(),
889                &MergeStrategy::LongestArray
890            ),
891            Ok(json!([42_i64, 43_i64]).into())
892        );
893
894        let v = merge(34_i64.into(), 43_i64.into(), &MergeStrategy::FlatUnique).unwrap();
895        match v.clone() {
896            Value::Array(v) => {
897                let v: Vec<_> = v
898                    .into_iter()
899                    .map(|i| {
900                        if let Value::Integer(i) = i {
901                            i
902                        } else {
903                            panic!("Bad value");
904                        }
905                    })
906                    .collect();
907                assert_eq!(v.iter().filter(|i| **i == 34i64).count(), 1);
908                assert_eq!(v.iter().filter(|i| **i == 43i64).count(), 1);
909            }
910            _ => {
911                panic!("Not array");
912            }
913        }
914        let v = merge(v, 34_i32.into(), &MergeStrategy::FlatUnique).unwrap();
915        if let Value::Array(v) = v {
916            let v: Vec<_> = v
917                .into_iter()
918                .map(|i| {
919                    if let Value::Integer(i) = i {
920                        i
921                    } else {
922                        panic!("Bad value");
923                    }
924                })
925                .collect();
926            assert_eq!(v.iter().filter(|i| **i == 34i64).count(), 1);
927            assert_eq!(v.iter().filter(|i| **i == 43i64).count(), 1);
928        } else {
929            panic!("Not array");
930        }
931    }
932
933    fn merge(initial: Value, additional: Value, strategy: &MergeStrategy) -> Result<Value, String> {
934        let mut merger = get_value_merger(initial, strategy)?;
935        merger.add(additional)?;
936        let mut output = LogEvent::default();
937        let out_path = owned_event_path!("out");
938        merger.insert_into(&out_path, &mut output)?;
939        Ok(output.remove(&out_path).unwrap())
940    }
941}