vector/transforms/
aggregate.rs

1use std::{
2    collections::{HashMap, hash_map::Entry},
3    pin::Pin,
4    time::Duration,
5};
6
7use async_stream::stream;
8use futures::{Stream, StreamExt};
9use vector_lib::{
10    configurable::configurable_component,
11    event::{
12        MetricValue,
13        metric::{Metric, MetricData, MetricKind, MetricSeries},
14    },
15};
16
17use crate::{
18    config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
19    event::{Event, EventMetadata},
20    internal_events::{AggregateEventRecorded, AggregateFlushed, AggregateUpdateFailed},
21    schema,
22    transforms::{TaskTransform, Transform},
23};
24
25/// Configuration for the `aggregate` transform.
26#[configurable_component(transform("aggregate", "Aggregate metrics passing through a topology."))]
27#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
28#[serde(deny_unknown_fields)]
29pub struct AggregateConfig {
30    /// The interval between flushes, in milliseconds.
31    ///
32    /// During this time frame, metrics (beta) with the same series data (name, namespace, tags, and so on) are aggregated.
33    #[serde(default = "default_interval_ms")]
34    #[configurable(metadata(docs::human_name = "Flush Interval"))]
35    pub interval_ms: u64,
36    /// Function to use for aggregation.
37    ///
38    /// Some of the functions may only function on incremental and some only on absolute metrics.
39    #[serde(default = "default_mode")]
40    #[configurable(derived)]
41    pub mode: AggregationMode,
42}
43
// Each variant selects one branch of the `match self.mode` in `Aggregate::record`; the `//`
// notes below summarise what that branch does with incremental and absolute inputs.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[configurable(description = "The aggregation mode to use.")]
pub enum AggregationMode {
    /// Default mode. Sums incremental metrics and uses the latest value for absolute metrics.
    // Incremental input goes through `record_sum`; absolute input overwrites the stored entry.
    #[default]
    Auto,

    /// Sums incremental metrics, ignores absolute
    // Always routed to `record_sum`, which does nothing for absolute metrics.
    Sum,

    /// Returns the latest value for absolute metrics, ignores incremental
    // Each absolute metric overwrites the stored entry for its series.
    Latest,

    /// Counts metrics for incremental and absolute metrics
    // Routed to `record_count`, which stores a counter and adds 1 to it per event.
    Count,

    /// Returns difference between latest value for absolute, ignores incremental
    // Recorded like `Latest`; at flush time the value emitted by the previous flush for the
    // same series (if any, and of the same kind) is subtracted.
    Diff,

    /// Max value of absolute metric, ignores incremental
    // Routed to `record_comparison`; only gauge values are compared there.
    Max,

    /// Min value of absolute metric, ignores incremental
    // Routed to `record_comparison`; only gauge values are compared there.
    Min,

    /// Mean value of absolute metric, ignores incremental
    // Only absolute gauge samples are collected; they are averaged at flush time.
    Mean,

    /// Stdev value of absolute metric, ignores incremental
    // Only absolute gauge samples are collected; the flush divides the squared deviations
    // by the number of samples before taking the square root.
    Stdev,
}
76
// Serde default for `AggregateConfig::mode` (referenced by `#[serde(default = "default_mode")]`).
const fn default_mode() -> AggregationMode {
    AggregationMode::Auto
}
80
// Serde default for `AggregateConfig::interval_ms`: ten seconds, expressed in milliseconds.
const fn default_interval_ms() -> u64 {
    10_000
}
84
// NOTE(review): the macro is defined outside this file; judging by its name it presumably
// implements config generation for `AggregateConfig` from its `Default` value — confirm.
impl_generate_config_from_default!(AggregateConfig);
86
87#[async_trait::async_trait]
88#[typetag::serde(name = "aggregate")]
89impl TransformConfig for AggregateConfig {
90    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
91        Aggregate::new(self).map(Transform::event_task)
92    }
93
94    fn input(&self) -> Input {
95        Input::metric()
96    }
97
98    fn outputs(
99        &self,
100        _: &TransformContext,
101        _: &[(OutputId, schema::Definition)],
102    ) -> Vec<TransformOutput> {
103        vec![TransformOutput::new(DataType::Metric, HashMap::new())]
104    }
105}
106
/// The value tracked for one series together with the event metadata kept alongside it.
type MetricEntry = (MetricData, EventMetadata);
108
/// State of the `aggregate` transform between flushes.
#[derive(Debug)]
pub struct Aggregate {
    /// Period of the flush timer created in `transform`.
    interval: Duration,
    /// One entry per series; written by every mode except `Mean` and `Stdev`.
    map: HashMap<MetricSeries, MetricEntry>,
    /// The contents of `map` as of the previous flush; read at flush time in `Diff` mode.
    prev_map: HashMap<MetricSeries, MetricEntry>,
    /// Every absolute gauge sample seen per series; written only in `Mean` and `Stdev` modes.
    multi_map: HashMap<MetricSeries, Vec<MetricEntry>>,
    /// Aggregation mode copied from the configuration.
    mode: AggregationMode,
}
117
118impl Aggregate {
119    pub fn new(config: &AggregateConfig) -> crate::Result<Self> {
120        Ok(Self {
121            interval: Duration::from_millis(config.interval_ms),
122            map: Default::default(),
123            prev_map: Default::default(),
124            multi_map: Default::default(),
125            mode: config.mode,
126        })
127    }
128
129    fn record(&mut self, event: Event) {
130        let (series, data, metadata) = event.into_metric().into_parts();
131
132        match self.mode {
133            AggregationMode::Auto => match data.kind {
134                MetricKind::Incremental => self.record_sum(series, data, metadata),
135                MetricKind::Absolute => {
136                    self.map.insert(series, (data, metadata));
137                }
138            },
139            AggregationMode::Sum => self.record_sum(series, data, metadata),
140            AggregationMode::Latest | AggregationMode::Diff => match data.kind {
141                MetricKind::Incremental => (),
142                MetricKind::Absolute => {
143                    self.map.insert(series, (data, metadata));
144                }
145            },
146            AggregationMode::Count => self.record_count(series, data, metadata),
147            AggregationMode::Max | AggregationMode::Min => {
148                self.record_comparison(series, data, metadata)
149            }
150            AggregationMode::Mean | AggregationMode::Stdev => match data.kind {
151                MetricKind::Incremental => (),
152                MetricKind::Absolute => {
153                    if matches!(data.value, MetricValue::Gauge { value: _ }) {
154                        match self.multi_map.entry(series) {
155                            Entry::Occupied(mut entry) => {
156                                let existing = entry.get_mut();
157                                existing.push((data, metadata));
158                            }
159                            Entry::Vacant(entry) => {
160                                entry.insert(vec![(data, metadata)]);
161                            }
162                        }
163                    }
164                }
165            },
166        }
167
168        emit!(AggregateEventRecorded);
169    }
170
171    fn record_count(
172        &mut self,
173        series: MetricSeries,
174        mut data: MetricData,
175        metadata: EventMetadata,
176    ) {
177        let mut count_data = data.clone();
178        let existing = self.map.entry(series).or_insert_with(|| {
179            *data.value_mut() = MetricValue::Counter { value: 0f64 };
180            (data.clone(), metadata.clone())
181        });
182        *count_data.value_mut() = MetricValue::Counter { value: 1f64 };
183        if existing.0.kind == data.kind && existing.0.update(&count_data) {
184            existing.1.merge(metadata);
185        } else {
186            emit!(AggregateUpdateFailed);
187        }
188    }
189
190    fn record_sum(&mut self, series: MetricSeries, data: MetricData, metadata: EventMetadata) {
191        match data.kind {
192            MetricKind::Incremental => match self.map.entry(series) {
193                Entry::Occupied(mut entry) => {
194                    let existing = entry.get_mut();
195                    // In order to update (add) the new and old kind's must match
196                    if existing.0.kind == data.kind && existing.0.update(&data) {
197                        existing.1.merge(metadata);
198                    } else {
199                        emit!(AggregateUpdateFailed);
200                        *existing = (data, metadata);
201                    }
202                }
203                Entry::Vacant(entry) => {
204                    entry.insert((data, metadata));
205                }
206            },
207            MetricKind::Absolute => {}
208        }
209    }
210
211    fn record_comparison(
212        &mut self,
213        series: MetricSeries,
214        data: MetricData,
215        metadata: EventMetadata,
216    ) {
217        match data.kind {
218            MetricKind::Incremental => (),
219            MetricKind::Absolute => match self.map.entry(series) {
220                Entry::Occupied(mut entry) => {
221                    let existing = entry.get_mut();
222                    // In order to update (add) the new and old kind's must match
223                    if existing.0.kind == data.kind {
224                        if let MetricValue::Gauge {
225                            value: existing_value,
226                        } = existing.0.value()
227                            && let MetricValue::Gauge { value: new_value } = data.value()
228                        {
229                            let should_update = match self.mode {
230                                AggregationMode::Max => new_value > existing_value,
231                                AggregationMode::Min => new_value < existing_value,
232                                _ => false,
233                            };
234                            if should_update {
235                                *existing = (data, metadata);
236                            }
237                        }
238                    } else {
239                        emit!(AggregateUpdateFailed);
240                        *existing = (data, metadata);
241                    }
242                }
243                Entry::Vacant(entry) => {
244                    entry.insert((data, metadata));
245                }
246            },
247        }
248    }
249
250    fn flush_into(&mut self, output: &mut Vec<Event>) {
251        let map = std::mem::take(&mut self.map);
252        for (series, entry) in map.clone().into_iter() {
253            let mut metric = Metric::from_parts(series, entry.0, entry.1);
254            if matches!(self.mode, AggregationMode::Diff)
255                && let Some(prev_entry) = self.prev_map.get(metric.series())
256                && metric.data().kind == prev_entry.0.kind
257                && !metric.subtract(&prev_entry.0)
258            {
259                emit!(AggregateUpdateFailed);
260            }
261            output.push(Event::Metric(metric));
262        }
263
264        let multi_map = std::mem::take(&mut self.multi_map);
265        'outer: for (series, entries) in multi_map.into_iter() {
266            if entries.is_empty() {
267                continue;
268            }
269
270            let (mut final_sum, mut final_metadata) = entries.first().unwrap().clone();
271            for (data, metadata) in entries.iter().skip(1) {
272                if !final_sum.update(data) {
273                    // Incompatible types, skip this metric
274                    emit!(AggregateUpdateFailed);
275                    continue 'outer;
276                }
277                final_metadata.merge(metadata.clone());
278            }
279
280            let final_mean_value = if let MetricValue::Gauge { value } = final_sum.value_mut() {
281                // Entries are not empty so this is safe.
282                *value /= entries.len() as f64;
283                *value
284            } else {
285                0.0
286            };
287
288            let final_mean = final_sum.clone();
289            match self.mode {
290                AggregationMode::Mean => {
291                    let metric = Metric::from_parts(series, final_mean, final_metadata);
292                    output.push(Event::Metric(metric));
293                }
294                AggregationMode::Stdev => {
295                    let variance = entries
296                        .iter()
297                        .filter_map(|(data, _)| {
298                            if let MetricValue::Gauge { value } = data.value() {
299                                let diff = final_mean_value - value;
300                                Some(diff * diff)
301                            } else {
302                                None
303                            }
304                        })
305                        .sum::<f64>()
306                        / entries.len() as f64;
307                    let mut final_stdev = final_mean;
308                    if let MetricValue::Gauge { value } = final_stdev.value_mut() {
309                        *value = variance.sqrt()
310                    }
311                    let metric = Metric::from_parts(series, final_stdev, final_metadata);
312                    output.push(Event::Metric(metric));
313                }
314                _ => (),
315            }
316        }
317
318        self.prev_map = map;
319        emit!(AggregateFlushed);
320    }
321}
322
impl TaskTransform<Event> for Aggregate {
    /// Turns the input stream into a stream of aggregated metrics.
    ///
    /// Incoming events are recorded as they arrive. Aggregated metrics are emitted each
    /// time the flush timer ticks, and once more when the input stream ends.
    fn transform(
        mut self: Box<Self>,
        mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
    where
        Self: 'static,
    {
        // NOTE(review): `tokio::time::interval` is documented to panic on a zero period and
        // to complete its first tick immediately — confirm how a zero interval is handled
        // before this point.
        let mut flush_stream = tokio::time::interval(self.interval);

        Box::pin(stream! {
            // Scratch buffer reused across iterations; it is drained after every select.
            let mut output = Vec::new();
            let mut done = false;
            while !done {
                tokio::select! {
                    // Timer fired: emit whatever has been aggregated so far.
                    _ = flush_stream.tick() => {
                        self.flush_into(&mut output);
                    },
                    maybe_event = input_rx.next() => {
                        match maybe_event {
                            // Input closed: flush the remaining state, then stop looping.
                            None => {
                                self.flush_into(&mut output);
                                done = true;
                            }
                            Some(event) => self.record(event),
                        }
                    }
                };
                // Forward anything produced by a flush in this iteration.
                for event in output.drain(..) {
                    yield event;
                }
            }
        })
    }
}
358
359#[cfg(test)]
360mod tests {
361    use std::{collections::BTreeSet, sync::Arc, task::Poll};
362
363    use futures::stream;
364    use tokio::sync::mpsc;
365    use tokio_stream::wrappers::ReceiverStream;
366    use vector_lib::config::{ComponentKey, LogNamespace};
367    use vrl::value::Kind;
368
369    use super::*;
370    use crate::{
371        event::{
372            Event, Metric,
373            metric::{MetricKind, MetricValue},
374        },
375        schema::Definition,
376        test_util::components::assert_transform_compliance,
377        transforms::test::create_topology,
378    };
379
    // Runs the shared config-generation check (defined in `crate::test_util`) for
    // `AggregateConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<AggregateConfig>();
    }
384
385    fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {
386        let mut event = Event::Metric(Metric::new(name, kind, value))
387            .with_source_id(Arc::new(ComponentKey::from("in")))
388            .with_upstream_id(Arc::new(OutputId::from("transform")));
389        event.metadata_mut().set_schema_definition(&Arc::new(
390            Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
391        ));
392
393        event.metadata_mut().set_source_type("unit_test_stream");
394
395        event
396    }
397
398    #[test]
399    fn incremental_auto() {
400        let mut agg = Aggregate::new(&AggregateConfig {
401            interval_ms: 1000_u64,
402            mode: AggregationMode::Auto,
403        })
404        .unwrap();
405
406        let counter_a_1 = make_metric(
407            "counter_a",
408            MetricKind::Incremental,
409            MetricValue::Counter { value: 42.0 },
410        );
411        let counter_a_2 = make_metric(
412            "counter_a",
413            MetricKind::Incremental,
414            MetricValue::Counter { value: 43.0 },
415        );
416        let counter_a_summed = make_metric(
417            "counter_a",
418            MetricKind::Incremental,
419            MetricValue::Counter { value: 85.0 },
420        );
421
422        // Single item, just stored regardless of kind
423        agg.record(counter_a_1.clone());
424        let mut out = vec![];
425        // We should flush 1 item counter_a_1
426        agg.flush_into(&mut out);
427        assert_eq!(1, out.len());
428        assert_eq!(&counter_a_1, &out[0]);
429
430        // A subsequent flush doesn't send out anything
431        out.clear();
432        agg.flush_into(&mut out);
433        assert_eq!(0, out.len());
434
435        // One more just to make sure that we don't re-see from the other buffer
436        out.clear();
437        agg.flush_into(&mut out);
438        assert_eq!(0, out.len());
439
440        // Two increments with the same series, should sum into 1
441        agg.record(counter_a_1.clone());
442        agg.record(counter_a_2);
443        out.clear();
444        agg.flush_into(&mut out);
445        assert_eq!(1, out.len());
446        assert_eq!(&counter_a_summed, &out[0]);
447
448        let counter_b_1 = make_metric(
449            "counter_b",
450            MetricKind::Incremental,
451            MetricValue::Counter { value: 44.0 },
452        );
453        // Two increments with the different series, should get each back as-is
454        agg.record(counter_a_1.clone());
455        agg.record(counter_b_1.clone());
456        out.clear();
457        agg.flush_into(&mut out);
458        assert_eq!(2, out.len());
459        // B/c we don't know the order they'll come back
460        for event in out {
461            match event.as_metric().series().name.name.as_str() {
462                "counter_a" => assert_eq!(counter_a_1, event),
463                "counter_b" => assert_eq!(counter_b_1, event),
464                _ => panic!("Unexpected metric name in aggregate output"),
465            }
466        }
467    }
468
469    #[test]
470    fn absolute_auto() {
471        let mut agg = Aggregate::new(&AggregateConfig {
472            interval_ms: 1000_u64,
473            mode: AggregationMode::Auto,
474        })
475        .unwrap();
476
477        let gauge_a_1 = make_metric(
478            "gauge_a",
479            MetricKind::Absolute,
480            MetricValue::Gauge { value: 42.0 },
481        );
482        let gauge_a_2 = make_metric(
483            "gauge_a",
484            MetricKind::Absolute,
485            MetricValue::Gauge { value: 43.0 },
486        );
487
488        // Single item, just stored regardless of kind
489        agg.record(gauge_a_1.clone());
490        let mut out = vec![];
491        // We should flush 1 item gauge_a_1
492        agg.flush_into(&mut out);
493        assert_eq!(1, out.len());
494        assert_eq!(&gauge_a_1, &out[0]);
495
496        // A subsequent flush doesn't send out anything
497        out.clear();
498        agg.flush_into(&mut out);
499        assert_eq!(0, out.len());
500
501        // One more just to make sure that we don't re-see from the other buffer
502        out.clear();
503        agg.flush_into(&mut out);
504        assert_eq!(0, out.len());
505
506        // Two absolutes with the same series, should get the 2nd (last) back.
507        agg.record(gauge_a_1.clone());
508        agg.record(gauge_a_2.clone());
509        out.clear();
510        agg.flush_into(&mut out);
511        assert_eq!(1, out.len());
512        assert_eq!(&gauge_a_2, &out[0]);
513
514        let gauge_b_1 = make_metric(
515            "gauge_b",
516            MetricKind::Absolute,
517            MetricValue::Gauge { value: 44.0 },
518        );
519        // Two increments with the different series, should get each back as-is
520        agg.record(gauge_a_1.clone());
521        agg.record(gauge_b_1.clone());
522        out.clear();
523        agg.flush_into(&mut out);
524        assert_eq!(2, out.len());
525        // B/c we don't know the order they'll come back
526        for event in out {
527            match event.as_metric().series().name.name.as_str() {
528                "gauge_a" => assert_eq!(gauge_a_1, event),
529                "gauge_b" => assert_eq!(gauge_b_1, event),
530                _ => panic!("Unexpected metric name in aggregate output"),
531            }
532        }
533    }
534
535    #[test]
536    fn count_agg() {
537        let mut agg = Aggregate::new(&AggregateConfig {
538            interval_ms: 1000_u64,
539            mode: AggregationMode::Count,
540        })
541        .unwrap();
542
543        let gauge_a_1 = make_metric(
544            "gauge_a",
545            MetricKind::Absolute,
546            MetricValue::Gauge { value: 42.0 },
547        );
548        let gauge_a_2 = make_metric(
549            "gauge_a",
550            MetricKind::Absolute,
551            MetricValue::Gauge { value: 43.0 },
552        );
553        let result_count = make_metric(
554            "gauge_a",
555            MetricKind::Absolute,
556            MetricValue::Counter { value: 1.0 },
557        );
558        let result_count_2 = make_metric(
559            "gauge_a",
560            MetricKind::Absolute,
561            MetricValue::Counter { value: 2.0 },
562        );
563
564        // Single item, counter should be 1
565        agg.record(gauge_a_1.clone());
566        let mut out = vec![];
567        // We should flush 1 item gauge_a_1
568        agg.flush_into(&mut out);
569        assert_eq!(1, out.len());
570        assert_eq!(&result_count, &out[0]);
571
572        // A subsequent flush doesn't send out anything
573        out.clear();
574        agg.flush_into(&mut out);
575        assert_eq!(0, out.len());
576
577        // One more just to make sure that we don't re-see from the other buffer
578        out.clear();
579        agg.flush_into(&mut out);
580        assert_eq!(0, out.len());
581
582        // Two absolutes with the same series, counter should be 2
583        agg.record(gauge_a_1.clone());
584        agg.record(gauge_a_2.clone());
585        out.clear();
586        agg.flush_into(&mut out);
587        assert_eq!(1, out.len());
588        assert_eq!(&result_count_2, &out[0]);
589    }
590
591    #[test]
592    fn absolute_max() {
593        let mut agg = Aggregate::new(&AggregateConfig {
594            interval_ms: 1000_u64,
595            mode: AggregationMode::Max,
596        })
597        .unwrap();
598
599        let gauge_a_1 = make_metric(
600            "gauge_a",
601            MetricKind::Absolute,
602            MetricValue::Gauge { value: 112.0 },
603        );
604        let gauge_a_2 = make_metric(
605            "gauge_a",
606            MetricKind::Absolute,
607            MetricValue::Gauge { value: 89.0 },
608        );
609
610        // Single item, it should be returned as is
611        agg.record(gauge_a_2.clone());
612        let mut out = vec![];
613        // We should flush 1 item gauge_a_2
614        agg.flush_into(&mut out);
615        assert_eq!(1, out.len());
616        assert_eq!(&gauge_a_2, &out[0]);
617
618        // A subsequent flush doesn't send out anything
619        out.clear();
620        agg.flush_into(&mut out);
621        assert_eq!(0, out.len());
622
623        // One more just to make sure that we don't re-see from the other buffer
624        out.clear();
625        agg.flush_into(&mut out);
626        assert_eq!(0, out.len());
627
628        // Two absolutes, result should be higher of the 2
629        agg.record(gauge_a_1.clone());
630        agg.record(gauge_a_2.clone());
631        out.clear();
632        agg.flush_into(&mut out);
633        assert_eq!(1, out.len());
634        assert_eq!(&gauge_a_1, &out[0]);
635    }
636
637    #[test]
638    fn absolute_min() {
639        let mut agg = Aggregate::new(&AggregateConfig {
640            interval_ms: 1000_u64,
641            mode: AggregationMode::Min,
642        })
643        .unwrap();
644
645        let gauge_a_1 = make_metric(
646            "gauge_a",
647            MetricKind::Absolute,
648            MetricValue::Gauge { value: 32.0 },
649        );
650        let gauge_a_2 = make_metric(
651            "gauge_a",
652            MetricKind::Absolute,
653            MetricValue::Gauge { value: 89.0 },
654        );
655
656        // Single item, it should be returned as is
657        agg.record(gauge_a_2.clone());
658        let mut out = vec![];
659        // We should flush 1 item gauge_a_2
660        agg.flush_into(&mut out);
661        assert_eq!(1, out.len());
662        assert_eq!(&gauge_a_2, &out[0]);
663
664        // A subsequent flush doesn't send out anything
665        out.clear();
666        agg.flush_into(&mut out);
667        assert_eq!(0, out.len());
668
669        // One more just to make sure that we don't re-see from the other buffer
670        out.clear();
671        agg.flush_into(&mut out);
672        assert_eq!(0, out.len());
673
674        // Two absolutes, result should be lower of the 2
675        agg.record(gauge_a_1.clone());
676        agg.record(gauge_a_2.clone());
677        out.clear();
678        agg.flush_into(&mut out);
679        assert_eq!(1, out.len());
680        assert_eq!(&gauge_a_1, &out[0]);
681    }
682
683    #[test]
684    fn absolute_diff() {
685        let mut agg = Aggregate::new(&AggregateConfig {
686            interval_ms: 1000_u64,
687            mode: AggregationMode::Diff,
688        })
689        .unwrap();
690
691        let gauge_a_1 = make_metric(
692            "gauge_a",
693            MetricKind::Absolute,
694            MetricValue::Gauge { value: 32.0 },
695        );
696        let gauge_a_2 = make_metric(
697            "gauge_a",
698            MetricKind::Absolute,
699            MetricValue::Gauge { value: 82.0 },
700        );
701        let result = make_metric(
702            "gauge_a",
703            MetricKind::Absolute,
704            MetricValue::Gauge { value: 50.0 },
705        );
706
707        // Single item, it should be returned as is
708        agg.record(gauge_a_2.clone());
709        let mut out = vec![];
710        // We should flush 1 item gauge_a_2
711        agg.flush_into(&mut out);
712        assert_eq!(1, out.len());
713        assert_eq!(&gauge_a_2, &out[0]);
714
715        // A subsequent flush doesn't send out anything
716        out.clear();
717        agg.flush_into(&mut out);
718        assert_eq!(0, out.len());
719
720        // One more just to make sure that we don't re-see from the other buffer
721        out.clear();
722        agg.flush_into(&mut out);
723        assert_eq!(0, out.len());
724
725        // Two absolutes in 2 separate flushes, result should be diff between the 2
726        agg.record(gauge_a_1.clone());
727        out.clear();
728        agg.flush_into(&mut out);
729        assert_eq!(1, out.len());
730        assert_eq!(&gauge_a_1, &out[0]);
731
732        agg.record(gauge_a_2.clone());
733        out.clear();
734        agg.flush_into(&mut out);
735        assert_eq!(1, out.len());
736        assert_eq!(&result, &out[0]);
737    }
738
739    #[test]
740    fn absolute_diff_conflicting_type() {
741        let mut agg = Aggregate::new(&AggregateConfig {
742            interval_ms: 1000_u64,
743            mode: AggregationMode::Diff,
744        })
745        .unwrap();
746
747        let gauge_a_1 = make_metric(
748            "gauge_a",
749            MetricKind::Absolute,
750            MetricValue::Gauge { value: 32.0 },
751        );
752        let gauge_a_2 = make_metric(
753            "gauge_a",
754            MetricKind::Absolute,
755            MetricValue::Counter { value: 1.0 },
756        );
757
758        let mut out = vec![];
759        // Two absolutes in 2 separate flushes, result should be second one due to different types
760        agg.record(gauge_a_1.clone());
761        out.clear();
762        agg.flush_into(&mut out);
763        assert_eq!(1, out.len());
764        assert_eq!(&gauge_a_1, &out[0]);
765
766        agg.record(gauge_a_2.clone());
767        out.clear();
768        agg.flush_into(&mut out);
769        assert_eq!(1, out.len());
770        // Due to incompatible results, the new value just overwrites the old one
771        assert_eq!(&gauge_a_2, &out[0]);
772    }
773
774    #[test]
775    fn absolute_mean() {
776        let mut agg = Aggregate::new(&AggregateConfig {
777            interval_ms: 1000_u64,
778            mode: AggregationMode::Mean,
779        })
780        .unwrap();
781
782        let gauge_a_1 = make_metric(
783            "gauge_a",
784            MetricKind::Absolute,
785            MetricValue::Gauge { value: 32.0 },
786        );
787        let gauge_a_2 = make_metric(
788            "gauge_a",
789            MetricKind::Absolute,
790            MetricValue::Gauge { value: 82.0 },
791        );
792        let gauge_a_3 = make_metric(
793            "gauge_a",
794            MetricKind::Absolute,
795            MetricValue::Gauge { value: 51.0 },
796        );
797        let mean_result = make_metric(
798            "gauge_a",
799            MetricKind::Absolute,
800            MetricValue::Gauge { value: 55.0 },
801        );
802
803        // Single item, it should be returned as is
804        agg.record(gauge_a_2.clone());
805        let mut out = vec![];
806        // We should flush 1 item gauge_a_2
807        agg.flush_into(&mut out);
808        assert_eq!(1, out.len());
809        assert_eq!(&gauge_a_2, &out[0]);
810
811        // A subsequent flush doesn't send out anything
812        out.clear();
813        agg.flush_into(&mut out);
814        assert_eq!(0, out.len());
815
816        // One more just to make sure that we don't re-see from the other buffer
817        out.clear();
818        agg.flush_into(&mut out);
819        assert_eq!(0, out.len());
820
821        // Three absolutes, result should be mean
822        agg.record(gauge_a_1.clone());
823        agg.record(gauge_a_2.clone());
824        agg.record(gauge_a_3.clone());
825        out.clear();
826        agg.flush_into(&mut out);
827        assert_eq!(1, out.len());
828        assert_eq!(&mean_result, &out[0]);
829    }
830
831    #[test]
832    fn absolute_stdev() {
833        let mut agg = Aggregate::new(&AggregateConfig {
834            interval_ms: 1000_u64,
835            mode: AggregationMode::Stdev,
836        })
837        .unwrap();
838
839        let gauges = vec![
840            make_metric(
841                "gauge_a",
842                MetricKind::Absolute,
843                MetricValue::Gauge { value: 25.0 },
844            ),
845            make_metric(
846                "gauge_a",
847                MetricKind::Absolute,
848                MetricValue::Gauge { value: 30.0 },
849            ),
850            make_metric(
851                "gauge_a",
852                MetricKind::Absolute,
853                MetricValue::Gauge { value: 35.0 },
854            ),
855            make_metric(
856                "gauge_a",
857                MetricKind::Absolute,
858                MetricValue::Gauge { value: 40.0 },
859            ),
860            make_metric(
861                "gauge_a",
862                MetricKind::Absolute,
863                MetricValue::Gauge { value: 45.0 },
864            ),
865            make_metric(
866                "gauge_a",
867                MetricKind::Absolute,
868                MetricValue::Gauge { value: 50.0 },
869            ),
870            make_metric(
871                "gauge_a",
872                MetricKind::Absolute,
873                MetricValue::Gauge { value: 55.0 },
874            ),
875        ];
876        let stdev_result = make_metric(
877            "gauge_a",
878            MetricKind::Absolute,
879            MetricValue::Gauge { value: 10.0 },
880        );
881
882        for gauge in gauges {
883            agg.record(gauge);
884        }
885        let mut out = vec![];
886        agg.flush_into(&mut out);
887        assert_eq!(1, out.len());
888        assert_eq!(&stdev_result, &out[0]);
889    }
890
891    #[test]
892    fn conflicting_value_type() {
893        let mut agg = Aggregate::new(&AggregateConfig {
894            interval_ms: 1000_u64,
895            mode: AggregationMode::Auto,
896        })
897        .unwrap();
898
899        let counter = make_metric(
900            "the-thing",
901            MetricKind::Incremental,
902            MetricValue::Counter { value: 42.0 },
903        );
904        let mut values = BTreeSet::<String>::new();
905        values.insert("a".into());
906        values.insert("b".into());
907        let set = make_metric(
908            "the-thing",
909            MetricKind::Incremental,
910            MetricValue::Set { values },
911        );
912        let summed = make_metric(
913            "the-thing",
914            MetricKind::Incremental,
915            MetricValue::Counter { value: 84.0 },
916        );
917
918        // when types conflict the new values replaces whatever is there
919
920        // Start with an counter
921        agg.record(counter.clone());
922        // Another will "add" to it
923        agg.record(counter.clone());
924        // Then an set will replace it due to a failed update
925        agg.record(set.clone());
926        // Then a set union would be a noop
927        agg.record(set.clone());
928        let mut out = vec![];
929        // We should flush 1 item counter
930        agg.flush_into(&mut out);
931        assert_eq!(1, out.len());
932        assert_eq!(&set, &out[0]);
933
934        // Start out with an set
935        agg.record(set.clone());
936        // Union with itself, a noop
937        agg.record(set);
938        // Send an counter with the same name, will replace due to a failed update
939        agg.record(counter.clone());
940        // Send another counter will "add"
941        agg.record(counter);
942        let mut out = vec![];
943        // We should flush 1 item counter
944        agg.flush_into(&mut out);
945        assert_eq!(1, out.len());
946        assert_eq!(&summed, &out[0]);
947    }
948
949    #[test]
950    fn conflicting_kinds() {
951        let mut agg = Aggregate::new(&AggregateConfig {
952            interval_ms: 1000_u64,
953            mode: AggregationMode::Auto,
954        })
955        .unwrap();
956
957        let incremental = make_metric(
958            "the-thing",
959            MetricKind::Incremental,
960            MetricValue::Counter { value: 42.0 },
961        );
962        let absolute = make_metric(
963            "the-thing",
964            MetricKind::Absolute,
965            MetricValue::Counter { value: 43.0 },
966        );
967        let summed = make_metric(
968            "the-thing",
969            MetricKind::Incremental,
970            MetricValue::Counter { value: 84.0 },
971        );
972
973        // when types conflict the new values replaces whatever is there
974
975        // Start with an incremental
976        agg.record(incremental.clone());
977        // Another will "add" to it
978        agg.record(incremental.clone());
979        // Then an absolute will replace it with a failed update
980        agg.record(absolute.clone());
981        // Then another absolute will replace it normally
982        agg.record(absolute.clone());
983        let mut out = vec![];
984        // We should flush 1 item incremental
985        agg.flush_into(&mut out);
986        assert_eq!(1, out.len());
987        assert_eq!(&absolute, &out[0]);
988
989        // Start out with an absolute
990        agg.record(absolute.clone());
991        // Replace it normally
992        agg.record(absolute);
993        // Send an incremental with the same name, will replace due to a failed update
994        agg.record(incremental.clone());
995        // Send another incremental will "add"
996        agg.record(incremental);
997        let mut out = vec![];
998        // We should flush 1 item incremental
999        agg.flush_into(&mut out);
1000        assert_eq!(1, out.len());
1001        assert_eq!(&summed, &out[0]);
1002    }
1003
1004    #[tokio::test]
1005    async fn transform_shutdown() {
1006        let agg = toml::from_str::<AggregateConfig>(
1007            r"
1008interval_ms = 999999
1009",
1010        )
1011        .unwrap()
1012        .build(&TransformContext::default())
1013        .await
1014        .unwrap();
1015
1016        let agg = agg.into_task();
1017
1018        let counter_a_1 = make_metric(
1019            "counter_a",
1020            MetricKind::Incremental,
1021            MetricValue::Counter { value: 42.0 },
1022        );
1023        let counter_a_2 = make_metric(
1024            "counter_a",
1025            MetricKind::Incremental,
1026            MetricValue::Counter { value: 43.0 },
1027        );
1028        let counter_a_summed = make_metric(
1029            "counter_a",
1030            MetricKind::Incremental,
1031            MetricValue::Counter { value: 85.0 },
1032        );
1033        let gauge_a_1 = make_metric(
1034            "gauge_a",
1035            MetricKind::Absolute,
1036            MetricValue::Gauge { value: 42.0 },
1037        );
1038        let gauge_a_2 = make_metric(
1039            "gauge_a",
1040            MetricKind::Absolute,
1041            MetricValue::Gauge { value: 43.0 },
1042        );
1043        let inputs = vec![counter_a_1, counter_a_2, gauge_a_1, gauge_a_2.clone()];
1044
1045        // Queue up some events to be consumed & recorded
1046        let in_stream = Box::pin(stream::iter(inputs));
1047        // Kick off the transform process which should consume & record them
1048        let mut out_stream = agg.transform_events(in_stream);
1049
1050        // B/c the input stream has ended we will have gone through the `input_rx.next() => None`
1051        // part of the loop and do the shutting down final flush immediately. We'll already be able
1052        // to read our expected bits on the output.
1053        let mut count = 0_u8;
1054        while let Some(event) = out_stream.next().await {
1055            count += 1;
1056            match event.as_metric().series().name.name.as_str() {
1057                "counter_a" => assert_eq!(counter_a_summed, event),
1058                "gauge_a" => assert_eq!(gauge_a_2, event),
1059                _ => panic!("Unexpected metric name in aggregate output"),
1060            };
1061        }
1062        // There were only 2
1063        assert_eq!(2, count);
1064    }
1065
1066    #[tokio::test]
1067    async fn transform_interval() {
1068        let transform_config = toml::from_str::<AggregateConfig>("").unwrap();
1069
1070        let counter_a_1 = make_metric(
1071            "counter_a",
1072            MetricKind::Incremental,
1073            MetricValue::Counter { value: 42.0 },
1074        );
1075        let counter_a_2 = make_metric(
1076            "counter_a",
1077            MetricKind::Incremental,
1078            MetricValue::Counter { value: 43.0 },
1079        );
1080        let counter_a_summed = make_metric(
1081            "counter_a",
1082            MetricKind::Incremental,
1083            MetricValue::Counter { value: 85.0 },
1084        );
1085        let gauge_a_1 = make_metric(
1086            "gauge_a",
1087            MetricKind::Absolute,
1088            MetricValue::Gauge { value: 42.0 },
1089        );
1090        let gauge_a_2 = make_metric(
1091            "gauge_a",
1092            MetricKind::Absolute,
1093            MetricValue::Gauge { value: 43.0 },
1094        );
1095
1096        assert_transform_compliance(async {
1097            let (tx, rx) = mpsc::channel(10);
1098            let (topology, out) = create_topology(ReceiverStream::new(rx), transform_config).await;
1099            let mut out = ReceiverStream::new(out);
1100
1101            tokio::time::pause();
1102
1103            // tokio interval is always immediately ready, so we poll once to make sure
1104            // we trip it/set the interval in the future
1105            assert_eq!(Poll::Pending, futures::poll!(out.next()));
1106
1107            // Now send our events
1108            tx.send(counter_a_1).await.unwrap();
1109            tx.send(counter_a_2).await.unwrap();
1110            tx.send(gauge_a_1).await.unwrap();
1111            tx.send(gauge_a_2.clone()).await.unwrap();
1112            // We won't have flushed yet b/c the interval hasn't elapsed, so no outputs
1113            assert_eq!(Poll::Pending, futures::poll!(out.next()));
1114            // Now fast forward time enough that our flush should trigger.
1115            tokio::time::advance(Duration::from_secs(11)).await;
1116            // We should have had an interval fire now and our output aggregate events should be
1117            // available.
1118            let mut count = 0_u8;
1119            while count < 2 {
1120                match out.next().await {
1121                    Some(event) => {
1122                        match event.as_metric().series().name.name.as_str() {
1123                            "counter_a" => assert_eq!(counter_a_summed, event),
1124                            "gauge_a" => assert_eq!(gauge_a_2, event),
1125                            _ => panic!("Unexpected metric name in aggregate output"),
1126                        };
1127                        count += 1;
1128                    }
1129                    _ => {
1130                        panic!("Unexpectedly received None in output stream");
1131                    }
1132                }
1133            }
1134            // We should be back to pending, having nothing waiting for us
1135            assert_eq!(Poll::Pending, futures::poll!(out.next()));
1136
1137            drop(tx);
1138            topology.stop().await;
1139            assert_eq!(out.next().await, None);
1140        })
1141        .await;
1142    }
1143}