1use std::collections::HashSet;
2
3use bytes::{Bytes, BytesMut};
4use chrono::{DateTime, Utc};
5use dyn_clone::DynClone;
6use ordered_float::NotNan;
7use vector_lib::configurable::configurable_component;
8use vrl::path::OwnedTargetPath;
9
10use crate::event::{LogEvent, Value};
11
12#[configurable_component]
14#[derive(Clone, Debug, PartialEq)]
15#[cfg_attr(feature = "proptest", derive(proptest_derive::Arbitrary))]
16#[serde(rename_all = "snake_case")]
17pub enum MergeStrategy {
18 Discard,
20
21 Retain,
25
26 Sum,
28
29 Max,
31
32 Min,
34
35 Array,
37
38 Concat,
40
41 ConcatNewline,
43
44 ConcatRaw,
46
47 ShortestArray,
49
50 LongestArray,
52
53 FlatUnique,
55}
56
57#[derive(Debug, Clone)]
58struct DiscardMerger {
59 v: Value,
60}
61
62impl DiscardMerger {
63 const fn new(v: Value) -> Self {
64 Self { v }
65 }
66}
67
68impl ReduceValueMerger for DiscardMerger {
69 fn add(&mut self, _v: Value) -> Result<(), String> {
70 Ok(())
71 }
72
73 fn insert_into(
74 self: Box<Self>,
75 path: &OwnedTargetPath,
76 v: &mut LogEvent,
77 ) -> Result<(), String> {
78 v.insert(path, self.v);
79 Ok(())
80 }
81}
82
83#[derive(Debug, Clone)]
84struct RetainMerger {
85 v: Value,
86}
87
88impl RetainMerger {
89 #[allow(clippy::missing_const_for_fn)] fn new(v: Value) -> Self {
91 Self { v }
92 }
93}
94
95impl ReduceValueMerger for RetainMerger {
96 fn add(&mut self, v: Value) -> Result<(), String> {
97 if Value::Null != v {
98 self.v = v;
99 }
100 Ok(())
101 }
102
103 fn insert_into(
104 self: Box<Self>,
105 path: &OwnedTargetPath,
106 v: &mut LogEvent,
107 ) -> Result<(), String> {
108 v.insert(path, self.v);
109 Ok(())
110 }
111}
112
113#[derive(Debug, Clone)]
114struct ConcatMerger {
115 v: BytesMut,
116 join_by: Option<Vec<u8>>,
117}
118
119impl ConcatMerger {
120 fn new(v: Bytes, join_by: Option<char>) -> Self {
121 let join_by = join_by.map(|c| c.to_string().into_bytes());
123
124 Self {
125 v: BytesMut::from(&v[..]),
126 join_by,
127 }
128 }
129}
130
131impl ReduceValueMerger for ConcatMerger {
132 fn add(&mut self, v: Value) -> Result<(), String> {
133 if let Value::Bytes(b) = v {
134 if let Some(buf) = self.join_by.as_ref() {
135 self.v.extend(&buf[..]);
136 }
137 self.v.extend_from_slice(&b);
138 Ok(())
139 } else {
140 Err(format!(
141 "expected string value, found: '{}'",
142 v.to_string_lossy()
143 ))
144 }
145 }
146
147 fn insert_into(
148 self: Box<Self>,
149 path: &OwnedTargetPath,
150 v: &mut LogEvent,
151 ) -> Result<(), String> {
152 v.insert(path, Value::Bytes(self.v.into()));
153 Ok(())
154 }
155}
156
157#[derive(Debug, Clone)]
158struct ConcatArrayMerger {
159 v: Vec<Value>,
160}
161
162impl ConcatArrayMerger {
163 const fn new(v: Vec<Value>) -> Self {
164 Self { v }
165 }
166}
167
168impl ReduceValueMerger for ConcatArrayMerger {
169 fn add(&mut self, v: Value) -> Result<(), String> {
170 if let Value::Array(a) = v {
171 self.v.extend_from_slice(&a);
172 } else {
173 self.v.push(v);
174 }
175 Ok(())
176 }
177
178 fn insert_into(
179 self: Box<Self>,
180 path: &OwnedTargetPath,
181 v: &mut LogEvent,
182 ) -> Result<(), String> {
183 v.insert(path, Value::Array(self.v));
184 Ok(())
185 }
186}
187
188#[derive(Debug, Clone)]
189struct ArrayMerger {
190 v: Vec<Value>,
191}
192
193impl ArrayMerger {
194 fn new(v: Value) -> Self {
195 Self { v: vec![v] }
196 }
197}
198
199impl ReduceValueMerger for ArrayMerger {
200 fn add(&mut self, v: Value) -> Result<(), String> {
201 self.v.push(v);
202 Ok(())
203 }
204
205 fn insert_into(
206 self: Box<Self>,
207 path: &OwnedTargetPath,
208 v: &mut LogEvent,
209 ) -> Result<(), String> {
210 v.insert(path, Value::Array(self.v));
211 Ok(())
212 }
213}
214
215#[derive(Debug, Clone)]
216struct LongestArrayMerger {
217 v: Vec<Value>,
218}
219
220impl LongestArrayMerger {
221 const fn new(v: Vec<Value>) -> Self {
222 Self { v }
223 }
224}
225
226impl ReduceValueMerger for LongestArrayMerger {
227 fn add(&mut self, v: Value) -> Result<(), String> {
228 if let Value::Array(a) = v {
229 if a.len() > self.v.len() {
230 self.v = a;
231 }
232 Ok(())
233 } else {
234 Err(format!(
235 "expected array value, found: '{}'",
236 v.to_string_lossy()
237 ))
238 }
239 }
240
241 fn insert_into(
242 self: Box<Self>,
243 path: &OwnedTargetPath,
244 v: &mut LogEvent,
245 ) -> Result<(), String> {
246 v.insert(path, Value::Array(self.v));
247 Ok(())
248 }
249}
250
251#[derive(Debug, Clone)]
252struct ShortestArrayMerger {
253 v: Vec<Value>,
254}
255
256impl ShortestArrayMerger {
257 const fn new(v: Vec<Value>) -> Self {
258 Self { v }
259 }
260}
261
262impl ReduceValueMerger for ShortestArrayMerger {
263 fn add(&mut self, v: Value) -> Result<(), String> {
264 if let Value::Array(a) = v {
265 if a.len() < self.v.len() {
266 self.v = a;
267 }
268 Ok(())
269 } else {
270 Err(format!(
271 "expected array value, found: '{}'",
272 v.to_string_lossy()
273 ))
274 }
275 }
276
277 fn insert_into(
278 self: Box<Self>,
279 path: &OwnedTargetPath,
280 v: &mut LogEvent,
281 ) -> Result<(), String> {
282 v.insert(path, Value::Array(self.v));
283 Ok(())
284 }
285}
286
287#[derive(Debug, Clone)]
288struct FlatUniqueMerger {
289 v: HashSet<Value>,
290}
291
292#[allow(clippy::mutable_key_type)] fn insert_value(h: &mut HashSet<Value>, v: Value) {
294 match v {
295 Value::Object(m) => {
296 for (_, v) in m {
297 h.insert(v);
298 }
299 }
300 Value::Array(vec) => {
301 for v in vec {
302 h.insert(v);
303 }
304 }
305 _ => {
306 h.insert(v);
307 }
308 }
309}
310
311impl FlatUniqueMerger {
312 #[allow(clippy::mutable_key_type)] fn new(v: Value) -> Self {
314 let mut h = HashSet::default();
315 insert_value(&mut h, v);
316 Self { v: h }
317 }
318}
319
320impl ReduceValueMerger for FlatUniqueMerger {
321 fn add(&mut self, v: Value) -> Result<(), String> {
322 insert_value(&mut self.v, v);
323 Ok(())
324 }
325
326 fn insert_into(
327 self: Box<Self>,
328 path: &OwnedTargetPath,
329 v: &mut LogEvent,
330 ) -> Result<(), String> {
331 v.insert(path, Value::Array(self.v.into_iter().collect()));
332 Ok(())
333 }
334}
335
336#[derive(Debug, Clone)]
337struct TimestampWindowMerger {
338 started: DateTime<Utc>,
339 latest: DateTime<Utc>,
340}
341
342impl TimestampWindowMerger {
343 const fn new(v: DateTime<Utc>) -> Self {
344 Self {
345 started: v,
346 latest: v,
347 }
348 }
349}
350
351impl ReduceValueMerger for TimestampWindowMerger {
352 fn add(&mut self, v: Value) -> Result<(), String> {
353 if let Value::Timestamp(ts) = v {
354 self.latest = ts
355 } else {
356 return Err(format!(
357 "expected timestamp value, found: {}",
358 v.to_string_lossy()
359 ));
360 }
361 Ok(())
362 }
363
364 fn insert_into(
365 self: Box<Self>,
366 path: &OwnedTargetPath,
367 v: &mut LogEvent,
368 ) -> Result<(), String> {
369 v.insert(
370 format!("{path}_end").as_str(),
371 Value::Timestamp(self.latest),
372 );
373 v.insert(path, Value::Timestamp(self.started));
374 Ok(())
375 }
376}
377
378#[derive(Debug, Clone)]
379enum NumberMergerValue {
380 Int(i64),
381 Float(NotNan<f64>),
382}
383
384impl From<i64> for NumberMergerValue {
385 fn from(v: i64) -> Self {
386 NumberMergerValue::Int(v)
387 }
388}
389
390impl From<NotNan<f64>> for NumberMergerValue {
391 fn from(v: NotNan<f64>) -> Self {
392 NumberMergerValue::Float(v)
393 }
394}
395
396#[derive(Debug, Clone)]
397struct AddNumbersMerger {
398 v: NumberMergerValue,
399}
400
401impl AddNumbersMerger {
402 const fn new(v: NumberMergerValue) -> Self {
403 Self { v }
404 }
405}
406
407impl ReduceValueMerger for AddNumbersMerger {
408 fn add(&mut self, v: Value) -> Result<(), String> {
409 match v {
412 Value::Integer(i) => match self.v {
413 NumberMergerValue::Int(j) => self.v = NumberMergerValue::Int(i + j),
414 NumberMergerValue::Float(j) => {
415 self.v = NumberMergerValue::Float(NotNan::new(i as f64).unwrap() + j)
416 }
417 },
418 Value::Float(f) => match self.v {
419 NumberMergerValue::Int(j) => self.v = NumberMergerValue::Float(f + j as f64),
420 NumberMergerValue::Float(j) => self.v = NumberMergerValue::Float(f + j),
421 },
422 _ => {
423 return Err(format!(
424 "expected numeric value, found: '{}'",
425 v.to_string_lossy()
426 ));
427 }
428 }
429 Ok(())
430 }
431
432 fn insert_into(
433 self: Box<Self>,
434 path: &OwnedTargetPath,
435 v: &mut LogEvent,
436 ) -> Result<(), String> {
437 match self.v {
438 NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)),
439 NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)),
440 };
441 Ok(())
442 }
443}
444
445#[derive(Debug, Clone)]
446struct MaxNumberMerger {
447 v: NumberMergerValue,
448}
449
450impl MaxNumberMerger {
451 const fn new(v: NumberMergerValue) -> Self {
452 Self { v }
453 }
454}
455
456impl ReduceValueMerger for MaxNumberMerger {
457 fn add(&mut self, v: Value) -> Result<(), String> {
458 match v {
461 Value::Integer(i) => {
462 match self.v {
463 NumberMergerValue::Int(i2) => {
464 if i > i2 {
465 self.v = NumberMergerValue::Int(i);
466 }
467 }
468 NumberMergerValue::Float(f2) => {
469 let f = NotNan::new(i as f64).unwrap();
470 if f > f2 {
471 self.v = NumberMergerValue::Float(f);
472 }
473 }
474 };
475 }
476 Value::Float(f) => {
477 let f2 = match self.v {
478 NumberMergerValue::Int(i2) => NotNan::new(i2 as f64).unwrap(),
479 NumberMergerValue::Float(f2) => f2,
480 };
481 if f > f2 {
482 self.v = NumberMergerValue::Float(f);
483 }
484 }
485 _ => {
486 return Err(format!(
487 "expected numeric value, found: '{}'",
488 v.to_string_lossy()
489 ));
490 }
491 }
492 Ok(())
493 }
494
495 fn insert_into(
496 self: Box<Self>,
497 path: &OwnedTargetPath,
498 v: &mut LogEvent,
499 ) -> Result<(), String> {
500 match self.v {
501 NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)),
502 NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)),
503 };
504 Ok(())
505 }
506}
507
508#[derive(Debug, Clone)]
509struct MinNumberMerger {
510 v: NumberMergerValue,
511}
512
513impl MinNumberMerger {
514 const fn new(v: NumberMergerValue) -> Self {
515 Self { v }
516 }
517}
518
519impl ReduceValueMerger for MinNumberMerger {
520 fn add(&mut self, v: Value) -> Result<(), String> {
521 match v {
524 Value::Integer(i) => {
525 match self.v {
526 NumberMergerValue::Int(i2) => {
527 if i < i2 {
528 self.v = NumberMergerValue::Int(i);
529 }
530 }
531 NumberMergerValue::Float(f2) => {
532 let f = NotNan::new(i as f64).unwrap();
533 if f < f2 {
534 self.v = NumberMergerValue::Float(f);
535 }
536 }
537 };
538 }
539 Value::Float(f) => {
540 let f2 = match self.v {
541 NumberMergerValue::Int(i2) => NotNan::new(i2 as f64).unwrap(),
542 NumberMergerValue::Float(f2) => f2,
543 };
544 if f < f2 {
545 self.v = NumberMergerValue::Float(f);
546 }
547 }
548 _ => {
549 return Err(format!(
550 "expected numeric value, found: '{}'",
551 v.to_string_lossy()
552 ));
553 }
554 }
555 Ok(())
556 }
557
558 fn insert_into(
559 self: Box<Self>,
560 path: &OwnedTargetPath,
561 v: &mut LogEvent,
562 ) -> Result<(), String> {
563 match self.v {
564 NumberMergerValue::Float(f) => v.insert(path, Value::Float(f)),
565 NumberMergerValue::Int(i) => v.insert(path, Value::Integer(i)),
566 };
567 Ok(())
568 }
569}
570
571pub trait ReduceValueMerger: std::fmt::Debug + Send + Sync + DynClone {
572 fn add(&mut self, v: Value) -> Result<(), String>;
573 fn insert_into(self: Box<Self>, path: &OwnedTargetPath, v: &mut LogEvent)
574 -> Result<(), String>;
575}
576
577dyn_clone::clone_trait_object!(ReduceValueMerger);
578
579impl From<Value> for Box<dyn ReduceValueMerger> {
580 fn from(v: Value) -> Self {
581 match v {
582 Value::Integer(i) => Box::new(AddNumbersMerger::new(i.into())),
583 Value::Float(f) => Box::new(AddNumbersMerger::new(f.into())),
584 Value::Timestamp(ts) => Box::new(TimestampWindowMerger::new(ts)),
585 Value::Object(_) => Box::new(DiscardMerger::new(v)),
586 Value::Null => Box::new(DiscardMerger::new(v)),
587 Value::Boolean(_) => Box::new(DiscardMerger::new(v)),
588 Value::Bytes(_) => Box::new(DiscardMerger::new(v)),
589 Value::Regex(_) => Box::new(DiscardMerger::new(v)),
590 Value::Array(_) => Box::new(DiscardMerger::new(v)),
591 }
592 }
593}
594
595pub(crate) fn get_value_merger(
596 v: Value,
597 m: &MergeStrategy,
598) -> Result<Box<dyn ReduceValueMerger>, String> {
599 match m {
600 MergeStrategy::Sum => match v {
601 Value::Integer(i) => Ok(Box::new(AddNumbersMerger::new(i.into()))),
602 Value::Float(f) => Ok(Box::new(AddNumbersMerger::new(f.into()))),
603 _ => Err(format!(
604 "expected number value, found: '{}'",
605 v.to_string_lossy()
606 )),
607 },
608 MergeStrategy::Max => match v {
609 Value::Integer(i) => Ok(Box::new(MaxNumberMerger::new(i.into()))),
610 Value::Float(f) => Ok(Box::new(MaxNumberMerger::new(f.into()))),
611 _ => Err(format!(
612 "expected number value, found: '{}'",
613 v.to_string_lossy()
614 )),
615 },
616 MergeStrategy::Min => match v {
617 Value::Integer(i) => Ok(Box::new(MinNumberMerger::new(i.into()))),
618 Value::Float(f) => Ok(Box::new(MinNumberMerger::new(f.into()))),
619 _ => Err(format!(
620 "expected number value, found: '{}'",
621 v.to_string_lossy()
622 )),
623 },
624 MergeStrategy::Concat => match v {
625 Value::Bytes(b) => Ok(Box::new(ConcatMerger::new(b, Some(' ')))),
626 Value::Array(a) => Ok(Box::new(ConcatArrayMerger::new(a))),
627 _ => Err(format!(
628 "expected string or array value, found: '{}'",
629 v.to_string_lossy()
630 )),
631 },
632 MergeStrategy::ConcatNewline => match v {
633 Value::Bytes(b) => Ok(Box::new(ConcatMerger::new(b, Some('\n')))),
634 _ => Err(format!(
635 "expected string value, found: '{}'",
636 v.to_string_lossy()
637 )),
638 },
639 MergeStrategy::ConcatRaw => match v {
640 Value::Bytes(b) => Ok(Box::new(ConcatMerger::new(b, None))),
641 _ => Err(format!(
642 "expected string value, found: '{}'",
643 v.to_string_lossy()
644 )),
645 },
646 MergeStrategy::Array => Ok(Box::new(ArrayMerger::new(v))),
647 MergeStrategy::ShortestArray => match v {
648 Value::Array(a) => Ok(Box::new(ShortestArrayMerger::new(a))),
649 _ => Err(format!(
650 "expected array value, found: '{}'",
651 v.to_string_lossy()
652 )),
653 },
654 MergeStrategy::LongestArray => match v {
655 Value::Array(a) => Ok(Box::new(LongestArrayMerger::new(a))),
656 _ => Err(format!(
657 "expected array value, found: '{}'",
658 v.to_string_lossy()
659 )),
660 },
661 MergeStrategy::Discard => Ok(Box::new(DiscardMerger::new(v))),
662 MergeStrategy::Retain => Ok(Box::new(RetainMerger::new(v))),
663 MergeStrategy::FlatUnique => Ok(Box::new(FlatUniqueMerger::new(v))),
664 }
665}
666
667#[cfg(test)]
668mod test {
669 use serde_json::json;
670 use vrl::owned_event_path;
671
672 use super::*;
673 use crate::event::LogEvent;
674
675 #[test]
676 fn initial_values() {
677 assert!(get_value_merger("foo".into(), &MergeStrategy::Discard).is_ok());
678 assert!(get_value_merger("foo".into(), &MergeStrategy::Retain).is_ok());
679 assert!(get_value_merger("foo".into(), &MergeStrategy::Sum).is_err());
680 assert!(get_value_merger("foo".into(), &MergeStrategy::Max).is_err());
681 assert!(get_value_merger("foo".into(), &MergeStrategy::Min).is_err());
682 assert!(get_value_merger("foo".into(), &MergeStrategy::Array).is_ok());
683 assert!(get_value_merger("foo".into(), &MergeStrategy::LongestArray).is_err());
684 assert!(get_value_merger("foo".into(), &MergeStrategy::ShortestArray).is_err());
685 assert!(get_value_merger("foo".into(), &MergeStrategy::Concat).is_ok());
686 assert!(get_value_merger("foo".into(), &MergeStrategy::ConcatNewline).is_ok());
687 assert!(get_value_merger("foo".into(), &MergeStrategy::ConcatRaw).is_ok());
688 assert!(get_value_merger("foo".into(), &MergeStrategy::FlatUnique).is_ok());
689
690 assert!(get_value_merger(42.into(), &MergeStrategy::Discard).is_ok());
691 assert!(get_value_merger(42.into(), &MergeStrategy::Retain).is_ok());
692 assert!(get_value_merger(42.into(), &MergeStrategy::Sum).is_ok());
693 assert!(get_value_merger(42.into(), &MergeStrategy::Min).is_ok());
694 assert!(get_value_merger(42.into(), &MergeStrategy::Max).is_ok());
695 assert!(get_value_merger(42.into(), &MergeStrategy::Array).is_ok());
696 assert!(get_value_merger(42.into(), &MergeStrategy::LongestArray).is_err());
697 assert!(get_value_merger(42.into(), &MergeStrategy::ShortestArray).is_err());
698 assert!(get_value_merger(42.into(), &MergeStrategy::Concat).is_err());
699 assert!(get_value_merger(42.into(), &MergeStrategy::ConcatNewline).is_err());
700 assert!(get_value_merger(42.into(), &MergeStrategy::ConcatRaw).is_err());
701 assert!(get_value_merger(42.into(), &MergeStrategy::FlatUnique).is_ok());
702
703 assert!(get_value_merger(42.into(), &MergeStrategy::Discard).is_ok());
704 assert!(get_value_merger(42.into(), &MergeStrategy::Retain).is_ok());
705 assert!(get_value_merger(4.2.into(), &MergeStrategy::Sum).is_ok());
706 assert!(get_value_merger(4.2.into(), &MergeStrategy::Min).is_ok());
707 assert!(get_value_merger(4.2.into(), &MergeStrategy::Max).is_ok());
708 assert!(get_value_merger(4.2.into(), &MergeStrategy::Array).is_ok());
709 assert!(get_value_merger(4.2.into(), &MergeStrategy::LongestArray).is_err());
710 assert!(get_value_merger(4.2.into(), &MergeStrategy::ShortestArray).is_err());
711 assert!(get_value_merger(4.2.into(), &MergeStrategy::Concat).is_err());
712 assert!(get_value_merger(4.2.into(), &MergeStrategy::ConcatNewline).is_err());
713 assert!(get_value_merger(4.2.into(), &MergeStrategy::ConcatRaw).is_err());
714 assert!(get_value_merger(4.2.into(), &MergeStrategy::FlatUnique).is_ok());
715
716 assert!(get_value_merger(true.into(), &MergeStrategy::Discard).is_ok());
717 assert!(get_value_merger(true.into(), &MergeStrategy::Retain).is_ok());
718 assert!(get_value_merger(true.into(), &MergeStrategy::Sum).is_err());
719 assert!(get_value_merger(true.into(), &MergeStrategy::Max).is_err());
720 assert!(get_value_merger(true.into(), &MergeStrategy::Min).is_err());
721 assert!(get_value_merger(true.into(), &MergeStrategy::Array).is_ok());
722 assert!(get_value_merger(true.into(), &MergeStrategy::LongestArray).is_err());
723 assert!(get_value_merger(true.into(), &MergeStrategy::ShortestArray).is_err());
724 assert!(get_value_merger(true.into(), &MergeStrategy::Concat).is_err());
725 assert!(get_value_merger(true.into(), &MergeStrategy::ConcatNewline).is_err());
726 assert!(get_value_merger(true.into(), &MergeStrategy::ConcatRaw).is_err());
727 assert!(get_value_merger(true.into(), &MergeStrategy::FlatUnique).is_ok());
728
729 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Discard).is_ok());
730 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Retain).is_ok());
731 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Sum).is_err());
732 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Max).is_err());
733 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Min).is_err());
734 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Array).is_ok());
735 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::LongestArray).is_err());
736 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::ShortestArray).is_err());
737 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Concat).is_err());
738 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::ConcatNewline).is_err());
739 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::ConcatRaw).is_err());
740 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::Discard).is_ok());
741 assert!(get_value_merger(Utc::now().into(), &MergeStrategy::FlatUnique).is_ok());
742
743 assert!(get_value_merger(json!([]).into(), &MergeStrategy::Discard).is_ok());
744 assert!(get_value_merger(json!([]).into(), &MergeStrategy::Retain).is_ok());
745 assert!(get_value_merger(json!([]).into(), &MergeStrategy::Sum).is_err());
746 assert!(get_value_merger(json!([]).into(), &MergeStrategy::Max).is_err());
747 assert!(get_value_merger(json!([]).into(), &MergeStrategy::Min).is_err());
748 assert!(get_value_merger(json!([]).into(), &MergeStrategy::Array).is_ok());
749 assert!(get_value_merger(json!([]).into(), &MergeStrategy::LongestArray).is_ok());
750 assert!(get_value_merger(json!([]).into(), &MergeStrategy::ShortestArray).is_ok());
751 assert!(get_value_merger(json!([]).into(), &MergeStrategy::Concat).is_ok());
752 assert!(get_value_merger(json!([]).into(), &MergeStrategy::ConcatNewline).is_err());
753 assert!(get_value_merger(json!([]).into(), &MergeStrategy::ConcatRaw).is_err());
754 assert!(get_value_merger(json!([]).into(), &MergeStrategy::FlatUnique).is_ok());
755
756 assert!(get_value_merger(json!({}).into(), &MergeStrategy::Discard).is_ok());
757 assert!(get_value_merger(json!({}).into(), &MergeStrategy::Retain).is_ok());
758 assert!(get_value_merger(json!({}).into(), &MergeStrategy::Sum).is_err());
759 assert!(get_value_merger(json!({}).into(), &MergeStrategy::Max).is_err());
760 assert!(get_value_merger(json!({}).into(), &MergeStrategy::Min).is_err());
761 assert!(get_value_merger(json!({}).into(), &MergeStrategy::Array).is_ok());
762 assert!(get_value_merger(json!({}).into(), &MergeStrategy::LongestArray).is_err());
763 assert!(get_value_merger(json!({}).into(), &MergeStrategy::ShortestArray).is_err());
764 assert!(get_value_merger(json!({}).into(), &MergeStrategy::Concat).is_err());
765 assert!(get_value_merger(json!({}).into(), &MergeStrategy::ConcatNewline).is_err());
766 assert!(get_value_merger(json!({}).into(), &MergeStrategy::ConcatRaw).is_err());
767 assert!(get_value_merger(json!({}).into(), &MergeStrategy::FlatUnique).is_ok());
768
769 assert!(get_value_merger(json!(null).into(), &MergeStrategy::Discard).is_ok());
770 assert!(get_value_merger(json!(null).into(), &MergeStrategy::Retain).is_ok());
771 assert!(get_value_merger(json!(null).into(), &MergeStrategy::Sum).is_err());
772 assert!(get_value_merger(json!(null).into(), &MergeStrategy::Max).is_err());
773 assert!(get_value_merger(json!(null).into(), &MergeStrategy::Min).is_err());
774 assert!(get_value_merger(json!(null).into(), &MergeStrategy::Array).is_ok());
775 assert!(get_value_merger(json!(null).into(), &MergeStrategy::LongestArray).is_err());
776 assert!(get_value_merger(json!(null).into(), &MergeStrategy::ShortestArray).is_err());
777 assert!(get_value_merger(json!(null).into(), &MergeStrategy::Concat).is_err());
778 assert!(get_value_merger(json!(null).into(), &MergeStrategy::ConcatNewline).is_err());
779 assert!(get_value_merger(json!(null).into(), &MergeStrategy::ConcatRaw).is_err());
780 assert!(get_value_merger(json!(null).into(), &MergeStrategy::FlatUnique).is_ok());
781 }
782
783 #[test]
784 fn merging_values() {
785 assert_eq!(
786 merge("foo".into(), "bar".into(), &MergeStrategy::Discard),
787 Ok("foo".into())
788 );
789 assert_eq!(
790 merge("foo".into(), "bar".into(), &MergeStrategy::Retain),
791 Ok("bar".into())
792 );
793 assert_eq!(
794 merge("foo".into(), "bar".into(), &MergeStrategy::Array),
795 Ok(json!(["foo", "bar"]).into())
796 );
797 assert_eq!(
798 merge("foo".into(), "bar".into(), &MergeStrategy::Concat),
799 Ok("foo bar".into())
800 );
801 assert_eq!(
802 merge("foo".into(), "bar".into(), &MergeStrategy::ConcatNewline),
803 Ok("foo\nbar".into())
804 );
805 assert_eq!(
806 merge("foo".into(), "bar".into(), &MergeStrategy::ConcatRaw),
807 Ok("foobar".into())
808 );
809 assert!(merge("foo".into(), 42.into(), &MergeStrategy::Concat).is_err());
810 assert!(merge("foo".into(), 4.2.into(), &MergeStrategy::Concat).is_err());
811 assert!(merge("foo".into(), true.into(), &MergeStrategy::Concat).is_err());
812 assert!(merge("foo".into(), Utc::now().into(), &MergeStrategy::Concat).is_err());
813 assert!(merge("foo".into(), json!({}).into(), &MergeStrategy::Concat).is_err());
814 assert!(merge("foo".into(), json!([]).into(), &MergeStrategy::Concat).is_err());
815 assert!(merge("foo".into(), json!(null).into(), &MergeStrategy::Concat).is_err());
816
817 assert_eq!(
818 merge("foo".into(), "bar".into(), &MergeStrategy::ConcatNewline),
819 Ok("foo\nbar".into())
820 );
821
822 assert_eq!(
823 merge(21.into(), 21.into(), &MergeStrategy::Sum),
824 Ok(42.into())
825 );
826 assert_eq!(
827 merge(41.into(), 42.into(), &MergeStrategy::Max),
828 Ok(42.into())
829 );
830 assert_eq!(
831 merge(42.into(), 41.into(), &MergeStrategy::Max),
832 Ok(42.into())
833 );
834 assert_eq!(
835 merge(42.into(), 43.into(), &MergeStrategy::Min),
836 Ok(42.into())
837 );
838 assert_eq!(
839 merge(43.into(), 42.into(), &MergeStrategy::Min),
840 Ok(42.into())
841 );
842
843 assert_eq!(
844 merge(2.1.into(), 2.1.into(), &MergeStrategy::Sum),
845 Ok(4.2.into())
846 );
847 assert_eq!(
848 merge(4.1.into(), 4.2.into(), &MergeStrategy::Max),
849 Ok(4.2.into())
850 );
851 assert_eq!(
852 merge(4.2.into(), 4.1.into(), &MergeStrategy::Max),
853 Ok(4.2.into())
854 );
855 assert_eq!(
856 merge(4.2.into(), 4.3.into(), &MergeStrategy::Min),
857 Ok(4.2.into())
858 );
859 assert_eq!(
860 merge(4.3.into(), 4.2.into(), &MergeStrategy::Min),
861 Ok(4.2.into())
862 );
863
864 assert_eq!(
865 merge(
866 json!([4_i64]).into(),
867 json!([2_i64]).into(),
868 &MergeStrategy::Concat
869 ),
870 Ok(json!([4_i64, 2_i64]).into())
871 );
872 assert_eq!(
873 merge(json!([]).into(), 42_i64.into(), &MergeStrategy::Concat),
874 Ok(json!([42_i64]).into())
875 );
876
877 assert_eq!(
878 merge(
879 json!([34_i64]).into(),
880 json!([42_i64, 43_i64]).into(),
881 &MergeStrategy::ShortestArray
882 ),
883 Ok(json!([34_i64]).into())
884 );
885 assert_eq!(
886 merge(
887 json!([34_i64]).into(),
888 json!([42_i64, 43_i64]).into(),
889 &MergeStrategy::LongestArray
890 ),
891 Ok(json!([42_i64, 43_i64]).into())
892 );
893
894 let v = merge(34_i64.into(), 43_i64.into(), &MergeStrategy::FlatUnique).unwrap();
895 match v.clone() {
896 Value::Array(v) => {
897 let v: Vec<_> = v
898 .into_iter()
899 .map(|i| {
900 if let Value::Integer(i) = i {
901 i
902 } else {
903 panic!("Bad value");
904 }
905 })
906 .collect();
907 assert_eq!(v.iter().filter(|i| **i == 34i64).count(), 1);
908 assert_eq!(v.iter().filter(|i| **i == 43i64).count(), 1);
909 }
910 _ => {
911 panic!("Not array");
912 }
913 }
914 let v = merge(v, 34_i32.into(), &MergeStrategy::FlatUnique).unwrap();
915 if let Value::Array(v) = v {
916 let v: Vec<_> = v
917 .into_iter()
918 .map(|i| {
919 if let Value::Integer(i) = i {
920 i
921 } else {
922 panic!("Bad value");
923 }
924 })
925 .collect();
926 assert_eq!(v.iter().filter(|i| **i == 34i64).count(), 1);
927 assert_eq!(v.iter().filter(|i| **i == 43i64).count(), 1);
928 } else {
929 panic!("Not array");
930 }
931 }
932
933 fn merge(initial: Value, additional: Value, strategy: &MergeStrategy) -> Result<Value, String> {
934 let mut merger = get_value_merger(initial, strategy)?;
935 merger.add(additional)?;
936 let mut output = LogEvent::default();
937 let out_path = owned_event_path!("out");
938 merger.insert_into(&out_path, &mut output)?;
939 Ok(output.remove(&out_path).unwrap())
940 }
941}