vector/transforms/
aggregate.rs

1use std::{
2    collections::{HashMap, hash_map::Entry},
3    pin::Pin,
4    time::Duration,
5};
6
7use async_stream::stream;
8use futures::{Stream, StreamExt};
9use vector_lib::{
10    configurable::configurable_component,
11    event::{
12        MetricValue,
13        metric::{Metric, MetricData, MetricKind, MetricSeries},
14    },
15};
16
17use crate::{
18    config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
19    event::{Event, EventMetadata},
20    internal_events::{AggregateEventRecorded, AggregateFlushed, AggregateUpdateFailed},
21    schema,
22    transforms::{TaskTransform, Transform},
23};
24
25/// Configuration for the `aggregate` transform.
26#[configurable_component(transform("aggregate", "Aggregate metrics passing through a topology."))]
27#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
28#[serde(deny_unknown_fields)]
29pub struct AggregateConfig {
30    /// The interval between flushes, in milliseconds.
31    ///
32    /// During this time frame, metrics (beta) with the same series data (name, namespace, tags, and so on) are aggregated.
33    #[serde(default = "default_interval_ms")]
34    #[configurable(metadata(docs::human_name = "Flush Interval"))]
35    pub interval_ms: u64,
36    /// Function to use for aggregation.
37    ///
38    /// Some of the functions may only function on incremental and some only on absolute metrics.
39    #[serde(default = "default_mode")]
40    #[configurable(derived)]
41    pub mode: AggregationMode,
42}
43
// User-selectable aggregation function. Each variant is converted into the `InnerMode`
// variant of the same name (which additionally carries any per-mode working state) through
// `From<AggregationMode> for InnerMode`.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[configurable(description = "The aggregation mode to use.")]
pub enum AggregationMode {
    /// Default mode. Sums incremental metrics and uses the latest value for absolute metrics.
    #[default]
    Auto,

    /// Sums incremental metrics, ignores absolute
    Sum,

    /// Returns the latest value for absolute metrics, ignores incremental
    Latest,

    /// Counts metrics for incremental and absolute metrics
    Count,

    /// Returns difference between latest value for absolute, ignores incremental
    Diff,

    /// Max value of absolute metric, ignores incremental
    Max,

    /// Min value of absolute metric, ignores incremental
    Min,

    /// Mean value of absolute metric, ignores incremental
    Mean,

    /// Stdev value of absolute metric, ignores incremental
    Stdev,
}
76
77#[derive(Clone, Debug, Default, PartialEq)]
78enum InnerMode {
79    /// Default mode. Sums incremental metrics and uses the latest value for absolute metrics.
80    #[default]
81    Auto,
82
83    /// Sums incremental metrics, ignores absolute
84    Sum,
85
86    /// Returns the latest value for absolute metrics, ignores incremental
87    Latest,
88
89    /// Counts metrics for incremental and absolute metrics
90    Count,
91
92    /// Returns difference between latest value for absolute, ignores incremental
93    Diff {
94        prev_map: HashMap<MetricSeries, MetricEntry>,
95    },
96
97    /// Max value of absolute metric, ignores incremental
98    Max,
99
100    /// Min value of absolute metric, ignores incremental
101    Min,
102
103    /// Mean value of absolute metric, ignores incremental
104    Mean {
105        multi_map: HashMap<MetricSeries, Vec<MetricEntry>>,
106    },
107
108    /// Stdev value of absolute metric, ignores incremental
109    Stdev {
110        multi_map: HashMap<MetricSeries, Vec<MetricEntry>>,
111    },
112}
113
114impl From<AggregationMode> for InnerMode {
115    fn from(value: AggregationMode) -> Self {
116        match value {
117            AggregationMode::Auto => InnerMode::Auto,
118            AggregationMode::Sum => InnerMode::Sum,
119            AggregationMode::Latest => InnerMode::Latest,
120            AggregationMode::Count => InnerMode::Count,
121            AggregationMode::Diff => InnerMode::Diff {
122                prev_map: HashMap::default(),
123            },
124            AggregationMode::Max => InnerMode::Max,
125            AggregationMode::Min => InnerMode::Min,
126            AggregationMode::Mean => InnerMode::Mean {
127                multi_map: HashMap::default(),
128            },
129            AggregationMode::Stdev => InnerMode::Stdev {
130                multi_map: HashMap::default(),
131            },
132        }
133    }
134}
135
/// Serde default for `AggregateConfig::mode`: the `Auto` aggregation mode.
const fn default_mode() -> AggregationMode {
    AggregationMode::Auto
}
139
/// Serde default for `AggregateConfig::interval_ms`: ten seconds, expressed in milliseconds.
const fn default_interval_ms() -> u64 {
    10_000
}
143
// NOTE(review): the macro is defined elsewhere; judging by its name it derives the generated
// example configuration from `AggregateConfig::default()` — confirm against its definition.
impl_generate_config_from_default!(AggregateConfig);
145
146#[async_trait::async_trait]
147#[typetag::serde(name = "aggregate")]
148impl TransformConfig for AggregateConfig {
149    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
150        Aggregate::new(self).map(Transform::event_task)
151    }
152
153    fn input(&self) -> Input {
154        Input::metric()
155    }
156
157    fn outputs(
158        &self,
159        _: &TransformContext,
160        _: &[(OutputId, schema::Definition)],
161    ) -> Vec<TransformOutput> {
162        vec![TransformOutput::new(DataType::Metric, HashMap::new())]
163    }
164}
165
/// The data stored per series: a metric's data plus the event metadata that travels with it.
type MetricEntry = (MetricData, EventMetadata);
167
/// State of the `aggregate` transform.
#[derive(Debug)]
pub struct Aggregate {
    // Period between timer-driven flushes.
    interval: Duration,
    // Aggregated entry per series, drained on every flush.
    map: HashMap<MetricSeries, MetricEntry>,
    // Active aggregation mode, including any per-mode working state.
    mode: InnerMode,
}
174
175impl Aggregate {
176    pub fn new(config: &AggregateConfig) -> crate::Result<Self> {
177        Ok(Self {
178            interval: Duration::from_millis(config.interval_ms),
179            map: Default::default(),
180            mode: config.mode.into(),
181        })
182    }
183
184    fn record(&mut self, event: Event) {
185        let (series, data, metadata) = event.into_metric().into_parts();
186
187        match &mut self.mode {
188            InnerMode::Auto => match data.kind {
189                MetricKind::Incremental => self.record_sum(series, data, metadata),
190                MetricKind::Absolute => {
191                    self.map.insert(series, (data, metadata));
192                }
193            },
194            InnerMode::Sum => self.record_sum(series, data, metadata),
195            InnerMode::Latest | InnerMode::Diff { .. } => match data.kind {
196                MetricKind::Incremental => (),
197                MetricKind::Absolute => {
198                    self.map.insert(series, (data, metadata));
199                }
200            },
201            InnerMode::Count => self.record_count(series, data, metadata),
202            InnerMode::Max | InnerMode::Min => self.record_comparison(series, data, metadata),
203            InnerMode::Mean { multi_map } | InnerMode::Stdev { multi_map } => match data.kind {
204                MetricKind::Incremental => (),
205                MetricKind::Absolute => {
206                    if matches!(data.value, MetricValue::Gauge { value: _ }) {
207                        match multi_map.entry(series) {
208                            Entry::Occupied(mut entry) => {
209                                let existing = entry.get_mut();
210                                existing.push((data, metadata));
211                            }
212                            Entry::Vacant(entry) => {
213                                entry.insert(vec![(data, metadata)]);
214                            }
215                        }
216                    }
217                }
218            },
219        }
220
221        emit!(AggregateEventRecorded);
222    }
223
224    fn record_count(
225        &mut self,
226        series: MetricSeries,
227        mut data: MetricData,
228        metadata: EventMetadata,
229    ) {
230        let mut count_data = data.clone();
231        let existing = self.map.entry(series).or_insert_with(|| {
232            *data.value_mut() = MetricValue::Counter { value: 0f64 };
233            (data.clone(), metadata.clone())
234        });
235        *count_data.value_mut() = MetricValue::Counter { value: 1f64 };
236        if existing.0.kind == data.kind && existing.0.update(&count_data) {
237            existing.1.merge(metadata);
238        } else {
239            emit!(AggregateUpdateFailed);
240        }
241    }
242
243    fn record_sum(&mut self, series: MetricSeries, data: MetricData, metadata: EventMetadata) {
244        match data.kind {
245            MetricKind::Incremental => match self.map.entry(series) {
246                Entry::Occupied(mut entry) => {
247                    let existing = entry.get_mut();
248                    // In order to update (add) the new and old kind's must match
249                    if existing.0.kind == data.kind && existing.0.update(&data) {
250                        existing.1.merge(metadata);
251                    } else {
252                        emit!(AggregateUpdateFailed);
253                        *existing = (data, metadata);
254                    }
255                }
256                Entry::Vacant(entry) => {
257                    entry.insert((data, metadata));
258                }
259            },
260            MetricKind::Absolute => {}
261        }
262    }
263
264    fn record_comparison(
265        &mut self,
266        series: MetricSeries,
267        data: MetricData,
268        metadata: EventMetadata,
269    ) {
270        match data.kind {
271            MetricKind::Incremental => (),
272            MetricKind::Absolute => match self.map.entry(series) {
273                Entry::Occupied(mut entry) => {
274                    let existing = entry.get_mut();
275                    // In order to update (add) the new and old kind's must match
276                    if existing.0.kind == data.kind {
277                        if let MetricValue::Gauge {
278                            value: existing_value,
279                        } = existing.0.value()
280                            && let MetricValue::Gauge { value: new_value } = data.value()
281                        {
282                            let should_update = match self.mode {
283                                InnerMode::Max => new_value > existing_value,
284                                InnerMode::Min => new_value < existing_value,
285                                _ => false,
286                            };
287                            if should_update {
288                                *existing = (data, metadata);
289                            }
290                        }
291                    } else {
292                        emit!(AggregateUpdateFailed);
293                        *existing = (data, metadata);
294                    }
295                }
296                Entry::Vacant(entry) => {
297                    entry.insert((data, metadata));
298                }
299            },
300        }
301    }
302
303    fn flush_into(&mut self, output: &mut Vec<Event>) {
304        let map = std::mem::take(&mut self.map);
305        for (series, entry) in map.clone().into_iter() {
306            let mut metric = Metric::from_parts(series, entry.0, entry.1);
307            if let InnerMode::Diff { prev_map } = &self.mode
308                && let Some(prev_entry) = prev_map.get(metric.series())
309                && metric.data().kind == prev_entry.0.kind
310                && !metric.subtract(&prev_entry.0)
311            {
312                emit!(AggregateUpdateFailed);
313            }
314            output.push(Event::Metric(metric));
315        }
316
317        let multi_map = match &mut self.mode {
318            InnerMode::Mean { multi_map } | InnerMode::Stdev { multi_map } => {
319                std::mem::take(multi_map)
320            }
321            _ => HashMap::default(),
322        };
323
324        'outer: for (series, entries) in multi_map.into_iter() {
325            if entries.is_empty() {
326                continue;
327            }
328
329            let (mut final_sum, mut final_metadata) = entries.first().unwrap().clone();
330            for (data, metadata) in entries.iter().skip(1) {
331                if !final_sum.update(data) {
332                    // Incompatible types, skip this metric
333                    emit!(AggregateUpdateFailed);
334                    continue 'outer;
335                }
336                final_metadata.merge(metadata.clone());
337            }
338
339            let final_mean_value = if let MetricValue::Gauge { value } = final_sum.value_mut() {
340                // Entries are not empty so this is safe.
341                *value /= entries.len() as f64;
342                *value
343            } else {
344                0.0
345            };
346
347            let final_mean = final_sum.clone();
348            match self.mode {
349                InnerMode::Mean { .. } => {
350                    let metric = Metric::from_parts(series, final_mean, final_metadata);
351                    output.push(Event::Metric(metric));
352                }
353                InnerMode::Stdev { .. } => {
354                    let variance = entries
355                        .iter()
356                        .filter_map(|(data, _)| {
357                            if let MetricValue::Gauge { value } = data.value() {
358                                let diff = final_mean_value - value;
359                                Some(diff * diff)
360                            } else {
361                                None
362                            }
363                        })
364                        .sum::<f64>()
365                        / entries.len() as f64;
366                    let mut final_stdev = final_mean;
367                    if let MetricValue::Gauge { value } = final_stdev.value_mut() {
368                        *value = variance.sqrt()
369                    }
370                    let metric = Metric::from_parts(series, final_stdev, final_metadata);
371                    output.push(Event::Metric(metric));
372                }
373                _ => (),
374            }
375        }
376
377        if let InnerMode::Diff { prev_map } = &mut self.mode {
378            *prev_map = map;
379        }
380        emit!(AggregateFlushed);
381    }
382}
383
impl TaskTransform<Event> for Aggregate {
    /// Turns the input stream into a stream of aggregated metrics.
    ///
    /// Incoming events are recorded as they arrive; buffered aggregates are flushed on every
    /// tick of the interval timer and once more when the input stream ends, after which the
    /// output stream finishes.
    fn transform(
        mut self: Box<Self>,
        mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
    where
        Self: 'static,
    {
        // Timer that drives the periodic flushes.
        let mut flush_stream = tokio::time::interval(self.interval);

        Box::pin(stream! {
            // Reused buffer that each flush writes into before its contents are yielded.
            let mut output = Vec::new();
            let mut done = false;
            while !done {
                tokio::select! {
                    _ = flush_stream.tick() => {
                        self.flush_into(&mut output);
                    },
                    maybe_event = input_rx.next() => {
                        match maybe_event {
                            None => {
                                // Input exhausted: emit whatever is still buffered, then stop.
                                self.flush_into(&mut output);
                                done = true;
                            }
                            Some(event) => self.record(event),
                        }
                    }
                };
                // Yield whatever the select arm flushed (nothing when an event was recorded).
                for event in output.drain(..) {
                    yield event;
                }
            }
        })
    }
}
419
420#[cfg(test)]
421mod tests {
422    use std::{collections::BTreeSet, sync::Arc, task::Poll};
423
424    use futures::stream;
425    use tokio::sync::mpsc;
426    use tokio_stream::wrappers::ReceiverStream;
427    use vector_lib::config::{ComponentKey, LogNamespace};
428    use vrl::value::Kind;
429
430    use super::*;
431    use crate::{
432        event::{
433            Event, Metric,
434            metric::{MetricKind, MetricValue},
435        },
436        schema::Definition,
437        test_util::components::assert_transform_compliance,
438        transforms::test::create_topology,
439    };
440
    // Runs the shared config-generation check (defined elsewhere in the crate's test
    // utilities) against `AggregateConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<AggregateConfig>();
    }
445
446    fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {
447        let mut event = Event::Metric(Metric::new(name, kind, value))
448            .with_source_id(Arc::new(ComponentKey::from("in")))
449            .with_upstream_id(Arc::new(OutputId::from("transform")));
450        event.metadata_mut().set_schema_definition(&Arc::new(
451            Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
452        ));
453
454        event.metadata_mut().set_source_type("unit_test_stream");
455
456        event
457    }
458
459    #[test]
460    fn incremental_auto() {
461        let mut agg = Aggregate::new(&AggregateConfig {
462            interval_ms: 1000_u64,
463            mode: AggregationMode::Auto,
464        })
465        .unwrap();
466
467        let counter_a_1 = make_metric(
468            "counter_a",
469            MetricKind::Incremental,
470            MetricValue::Counter { value: 42.0 },
471        );
472        let counter_a_2 = make_metric(
473            "counter_a",
474            MetricKind::Incremental,
475            MetricValue::Counter { value: 43.0 },
476        );
477        let counter_a_summed = make_metric(
478            "counter_a",
479            MetricKind::Incremental,
480            MetricValue::Counter { value: 85.0 },
481        );
482
483        // Single item, just stored regardless of kind
484        agg.record(counter_a_1.clone());
485        let mut out = vec![];
486        // We should flush 1 item counter_a_1
487        agg.flush_into(&mut out);
488        assert_eq!(1, out.len());
489        assert_eq!(&counter_a_1, &out[0]);
490
491        // A subsequent flush doesn't send out anything
492        out.clear();
493        agg.flush_into(&mut out);
494        assert_eq!(0, out.len());
495
496        // One more just to make sure that we don't re-see from the other buffer
497        out.clear();
498        agg.flush_into(&mut out);
499        assert_eq!(0, out.len());
500
501        // Two increments with the same series, should sum into 1
502        agg.record(counter_a_1.clone());
503        agg.record(counter_a_2);
504        out.clear();
505        agg.flush_into(&mut out);
506        assert_eq!(1, out.len());
507        assert_eq!(&counter_a_summed, &out[0]);
508
509        let counter_b_1 = make_metric(
510            "counter_b",
511            MetricKind::Incremental,
512            MetricValue::Counter { value: 44.0 },
513        );
514        // Two increments with the different series, should get each back as-is
515        agg.record(counter_a_1.clone());
516        agg.record(counter_b_1.clone());
517        out.clear();
518        agg.flush_into(&mut out);
519        assert_eq!(2, out.len());
520        // B/c we don't know the order they'll come back
521        for event in out {
522            match event.as_metric().series().name.name.as_str() {
523                "counter_a" => assert_eq!(counter_a_1, event),
524                "counter_b" => assert_eq!(counter_b_1, event),
525                _ => panic!("Unexpected metric name in aggregate output"),
526            }
527        }
528    }
529
530    #[test]
531    fn absolute_auto() {
532        let mut agg = Aggregate::new(&AggregateConfig {
533            interval_ms: 1000_u64,
534            mode: AggregationMode::Auto,
535        })
536        .unwrap();
537
538        let gauge_a_1 = make_metric(
539            "gauge_a",
540            MetricKind::Absolute,
541            MetricValue::Gauge { value: 42.0 },
542        );
543        let gauge_a_2 = make_metric(
544            "gauge_a",
545            MetricKind::Absolute,
546            MetricValue::Gauge { value: 43.0 },
547        );
548
549        // Single item, just stored regardless of kind
550        agg.record(gauge_a_1.clone());
551        let mut out = vec![];
552        // We should flush 1 item gauge_a_1
553        agg.flush_into(&mut out);
554        assert_eq!(1, out.len());
555        assert_eq!(&gauge_a_1, &out[0]);
556
557        // A subsequent flush doesn't send out anything
558        out.clear();
559        agg.flush_into(&mut out);
560        assert_eq!(0, out.len());
561
562        // One more just to make sure that we don't re-see from the other buffer
563        out.clear();
564        agg.flush_into(&mut out);
565        assert_eq!(0, out.len());
566
567        // Two absolutes with the same series, should get the 2nd (last) back.
568        agg.record(gauge_a_1.clone());
569        agg.record(gauge_a_2.clone());
570        out.clear();
571        agg.flush_into(&mut out);
572        assert_eq!(1, out.len());
573        assert_eq!(&gauge_a_2, &out[0]);
574
575        let gauge_b_1 = make_metric(
576            "gauge_b",
577            MetricKind::Absolute,
578            MetricValue::Gauge { value: 44.0 },
579        );
580        // Two increments with the different series, should get each back as-is
581        agg.record(gauge_a_1.clone());
582        agg.record(gauge_b_1.clone());
583        out.clear();
584        agg.flush_into(&mut out);
585        assert_eq!(2, out.len());
586        // B/c we don't know the order they'll come back
587        for event in out {
588            match event.as_metric().series().name.name.as_str() {
589                "gauge_a" => assert_eq!(gauge_a_1, event),
590                "gauge_b" => assert_eq!(gauge_b_1, event),
591                _ => panic!("Unexpected metric name in aggregate output"),
592            }
593        }
594    }
595
596    #[test]
597    fn count_agg() {
598        let mut agg = Aggregate::new(&AggregateConfig {
599            interval_ms: 1000_u64,
600            mode: AggregationMode::Count,
601        })
602        .unwrap();
603
604        let gauge_a_1 = make_metric(
605            "gauge_a",
606            MetricKind::Absolute,
607            MetricValue::Gauge { value: 42.0 },
608        );
609        let gauge_a_2 = make_metric(
610            "gauge_a",
611            MetricKind::Absolute,
612            MetricValue::Gauge { value: 43.0 },
613        );
614        let result_count = make_metric(
615            "gauge_a",
616            MetricKind::Absolute,
617            MetricValue::Counter { value: 1.0 },
618        );
619        let result_count_2 = make_metric(
620            "gauge_a",
621            MetricKind::Absolute,
622            MetricValue::Counter { value: 2.0 },
623        );
624
625        // Single item, counter should be 1
626        agg.record(gauge_a_1.clone());
627        let mut out = vec![];
628        // We should flush 1 item gauge_a_1
629        agg.flush_into(&mut out);
630        assert_eq!(1, out.len());
631        assert_eq!(&result_count, &out[0]);
632
633        // A subsequent flush doesn't send out anything
634        out.clear();
635        agg.flush_into(&mut out);
636        assert_eq!(0, out.len());
637
638        // One more just to make sure that we don't re-see from the other buffer
639        out.clear();
640        agg.flush_into(&mut out);
641        assert_eq!(0, out.len());
642
643        // Two absolutes with the same series, counter should be 2
644        agg.record(gauge_a_1.clone());
645        agg.record(gauge_a_2.clone());
646        out.clear();
647        agg.flush_into(&mut out);
648        assert_eq!(1, out.len());
649        assert_eq!(&result_count_2, &out[0]);
650    }
651
652    #[test]
653    fn absolute_max() {
654        let mut agg = Aggregate::new(&AggregateConfig {
655            interval_ms: 1000_u64,
656            mode: AggregationMode::Max,
657        })
658        .unwrap();
659
660        let gauge_a_1 = make_metric(
661            "gauge_a",
662            MetricKind::Absolute,
663            MetricValue::Gauge { value: 112.0 },
664        );
665        let gauge_a_2 = make_metric(
666            "gauge_a",
667            MetricKind::Absolute,
668            MetricValue::Gauge { value: 89.0 },
669        );
670
671        // Single item, it should be returned as is
672        agg.record(gauge_a_2.clone());
673        let mut out = vec![];
674        // We should flush 1 item gauge_a_2
675        agg.flush_into(&mut out);
676        assert_eq!(1, out.len());
677        assert_eq!(&gauge_a_2, &out[0]);
678
679        // A subsequent flush doesn't send out anything
680        out.clear();
681        agg.flush_into(&mut out);
682        assert_eq!(0, out.len());
683
684        // One more just to make sure that we don't re-see from the other buffer
685        out.clear();
686        agg.flush_into(&mut out);
687        assert_eq!(0, out.len());
688
689        // Two absolutes, result should be higher of the 2
690        agg.record(gauge_a_1.clone());
691        agg.record(gauge_a_2.clone());
692        out.clear();
693        agg.flush_into(&mut out);
694        assert_eq!(1, out.len());
695        assert_eq!(&gauge_a_1, &out[0]);
696    }
697
698    #[test]
699    fn absolute_min() {
700        let mut agg = Aggregate::new(&AggregateConfig {
701            interval_ms: 1000_u64,
702            mode: AggregationMode::Min,
703        })
704        .unwrap();
705
706        let gauge_a_1 = make_metric(
707            "gauge_a",
708            MetricKind::Absolute,
709            MetricValue::Gauge { value: 32.0 },
710        );
711        let gauge_a_2 = make_metric(
712            "gauge_a",
713            MetricKind::Absolute,
714            MetricValue::Gauge { value: 89.0 },
715        );
716
717        // Single item, it should be returned as is
718        agg.record(gauge_a_2.clone());
719        let mut out = vec![];
720        // We should flush 1 item gauge_a_2
721        agg.flush_into(&mut out);
722        assert_eq!(1, out.len());
723        assert_eq!(&gauge_a_2, &out[0]);
724
725        // A subsequent flush doesn't send out anything
726        out.clear();
727        agg.flush_into(&mut out);
728        assert_eq!(0, out.len());
729
730        // One more just to make sure that we don't re-see from the other buffer
731        out.clear();
732        agg.flush_into(&mut out);
733        assert_eq!(0, out.len());
734
735        // Two absolutes, result should be lower of the 2
736        agg.record(gauge_a_1.clone());
737        agg.record(gauge_a_2.clone());
738        out.clear();
739        agg.flush_into(&mut out);
740        assert_eq!(1, out.len());
741        assert_eq!(&gauge_a_1, &out[0]);
742    }
743
744    #[test]
745    fn absolute_diff() {
746        let mut agg = Aggregate::new(&AggregateConfig {
747            interval_ms: 1000_u64,
748            mode: AggregationMode::Diff,
749        })
750        .unwrap();
751
752        let gauge_a_1 = make_metric(
753            "gauge_a",
754            MetricKind::Absolute,
755            MetricValue::Gauge { value: 32.0 },
756        );
757        let gauge_a_2 = make_metric(
758            "gauge_a",
759            MetricKind::Absolute,
760            MetricValue::Gauge { value: 82.0 },
761        );
762        let result = make_metric(
763            "gauge_a",
764            MetricKind::Absolute,
765            MetricValue::Gauge { value: 50.0 },
766        );
767
768        // Single item, it should be returned as is
769        agg.record(gauge_a_2.clone());
770        let mut out = vec![];
771        // We should flush 1 item gauge_a_2
772        agg.flush_into(&mut out);
773        assert_eq!(1, out.len());
774        assert_eq!(&gauge_a_2, &out[0]);
775
776        // A subsequent flush doesn't send out anything
777        out.clear();
778        agg.flush_into(&mut out);
779        assert_eq!(0, out.len());
780
781        // One more just to make sure that we don't re-see from the other buffer
782        out.clear();
783        agg.flush_into(&mut out);
784        assert_eq!(0, out.len());
785
786        // Two absolutes in 2 separate flushes, result should be diff between the 2
787        agg.record(gauge_a_1.clone());
788        out.clear();
789        agg.flush_into(&mut out);
790        assert_eq!(1, out.len());
791        assert_eq!(&gauge_a_1, &out[0]);
792
793        agg.record(gauge_a_2.clone());
794        out.clear();
795        agg.flush_into(&mut out);
796        assert_eq!(1, out.len());
797        assert_eq!(&result, &out[0]);
798    }
799
800    #[test]
801    fn absolute_diff_conflicting_type() {
802        let mut agg = Aggregate::new(&AggregateConfig {
803            interval_ms: 1000_u64,
804            mode: AggregationMode::Diff,
805        })
806        .unwrap();
807
808        let gauge_a_1 = make_metric(
809            "gauge_a",
810            MetricKind::Absolute,
811            MetricValue::Gauge { value: 32.0 },
812        );
813        let gauge_a_2 = make_metric(
814            "gauge_a",
815            MetricKind::Absolute,
816            MetricValue::Counter { value: 1.0 },
817        );
818
819        let mut out = vec![];
820        // Two absolutes in 2 separate flushes, result should be second one due to different types
821        agg.record(gauge_a_1.clone());
822        out.clear();
823        agg.flush_into(&mut out);
824        assert_eq!(1, out.len());
825        assert_eq!(&gauge_a_1, &out[0]);
826
827        agg.record(gauge_a_2.clone());
828        out.clear();
829        agg.flush_into(&mut out);
830        assert_eq!(1, out.len());
831        // Due to incompatible results, the new value just overwrites the old one
832        assert_eq!(&gauge_a_2, &out[0]);
833    }
834
835    #[test]
836    fn absolute_mean() {
837        let mut agg = Aggregate::new(&AggregateConfig {
838            interval_ms: 1000_u64,
839            mode: AggregationMode::Mean,
840        })
841        .unwrap();
842
843        let gauge_a_1 = make_metric(
844            "gauge_a",
845            MetricKind::Absolute,
846            MetricValue::Gauge { value: 32.0 },
847        );
848        let gauge_a_2 = make_metric(
849            "gauge_a",
850            MetricKind::Absolute,
851            MetricValue::Gauge { value: 82.0 },
852        );
853        let gauge_a_3 = make_metric(
854            "gauge_a",
855            MetricKind::Absolute,
856            MetricValue::Gauge { value: 51.0 },
857        );
858        let mean_result = make_metric(
859            "gauge_a",
860            MetricKind::Absolute,
861            MetricValue::Gauge { value: 55.0 },
862        );
863
864        // Single item, it should be returned as is
865        agg.record(gauge_a_2.clone());
866        let mut out = vec![];
867        // We should flush 1 item gauge_a_2
868        agg.flush_into(&mut out);
869        assert_eq!(1, out.len());
870        assert_eq!(&gauge_a_2, &out[0]);
871
872        // A subsequent flush doesn't send out anything
873        out.clear();
874        agg.flush_into(&mut out);
875        assert_eq!(0, out.len());
876
877        // One more just to make sure that we don't re-see from the other buffer
878        out.clear();
879        agg.flush_into(&mut out);
880        assert_eq!(0, out.len());
881
882        // Three absolutes, result should be mean
883        agg.record(gauge_a_1.clone());
884        agg.record(gauge_a_2.clone());
885        agg.record(gauge_a_3.clone());
886        out.clear();
887        agg.flush_into(&mut out);
888        assert_eq!(1, out.len());
889        assert_eq!(&mean_result, &out[0]);
890    }
891
892    #[test]
893    fn absolute_stdev() {
894        let mut agg = Aggregate::new(&AggregateConfig {
895            interval_ms: 1000_u64,
896            mode: AggregationMode::Stdev,
897        })
898        .unwrap();
899
900        let gauges = vec![
901            make_metric(
902                "gauge_a",
903                MetricKind::Absolute,
904                MetricValue::Gauge { value: 25.0 },
905            ),
906            make_metric(
907                "gauge_a",
908                MetricKind::Absolute,
909                MetricValue::Gauge { value: 30.0 },
910            ),
911            make_metric(
912                "gauge_a",
913                MetricKind::Absolute,
914                MetricValue::Gauge { value: 35.0 },
915            ),
916            make_metric(
917                "gauge_a",
918                MetricKind::Absolute,
919                MetricValue::Gauge { value: 40.0 },
920            ),
921            make_metric(
922                "gauge_a",
923                MetricKind::Absolute,
924                MetricValue::Gauge { value: 45.0 },
925            ),
926            make_metric(
927                "gauge_a",
928                MetricKind::Absolute,
929                MetricValue::Gauge { value: 50.0 },
930            ),
931            make_metric(
932                "gauge_a",
933                MetricKind::Absolute,
934                MetricValue::Gauge { value: 55.0 },
935            ),
936        ];
937        let stdev_result = make_metric(
938            "gauge_a",
939            MetricKind::Absolute,
940            MetricValue::Gauge { value: 10.0 },
941        );
942
943        for gauge in gauges {
944            agg.record(gauge);
945        }
946        let mut out = vec![];
947        agg.flush_into(&mut out);
948        assert_eq!(1, out.len());
949        assert_eq!(&stdev_result, &out[0]);
950    }
951
952    #[test]
953    fn conflicting_value_type() {
954        let mut agg = Aggregate::new(&AggregateConfig {
955            interval_ms: 1000_u64,
956            mode: AggregationMode::Auto,
957        })
958        .unwrap();
959
960        let counter = make_metric(
961            "the-thing",
962            MetricKind::Incremental,
963            MetricValue::Counter { value: 42.0 },
964        );
965        let mut values = BTreeSet::<String>::new();
966        values.insert("a".into());
967        values.insert("b".into());
968        let set = make_metric(
969            "the-thing",
970            MetricKind::Incremental,
971            MetricValue::Set { values },
972        );
973        let summed = make_metric(
974            "the-thing",
975            MetricKind::Incremental,
976            MetricValue::Counter { value: 84.0 },
977        );
978
979        // when types conflict the new values replaces whatever is there
980
981        // Start with an counter
982        agg.record(counter.clone());
983        // Another will "add" to it
984        agg.record(counter.clone());
985        // Then an set will replace it due to a failed update
986        agg.record(set.clone());
987        // Then a set union would be a noop
988        agg.record(set.clone());
989        let mut out = vec![];
990        // We should flush 1 item counter
991        agg.flush_into(&mut out);
992        assert_eq!(1, out.len());
993        assert_eq!(&set, &out[0]);
994
995        // Start out with an set
996        agg.record(set.clone());
997        // Union with itself, a noop
998        agg.record(set);
999        // Send an counter with the same name, will replace due to a failed update
1000        agg.record(counter.clone());
1001        // Send another counter will "add"
1002        agg.record(counter);
1003        let mut out = vec![];
1004        // We should flush 1 item counter
1005        agg.flush_into(&mut out);
1006        assert_eq!(1, out.len());
1007        assert_eq!(&summed, &out[0]);
1008    }
1009
1010    #[test]
1011    fn conflicting_kinds() {
1012        let mut agg = Aggregate::new(&AggregateConfig {
1013            interval_ms: 1000_u64,
1014            mode: AggregationMode::Auto,
1015        })
1016        .unwrap();
1017
1018        let incremental = make_metric(
1019            "the-thing",
1020            MetricKind::Incremental,
1021            MetricValue::Counter { value: 42.0 },
1022        );
1023        let absolute = make_metric(
1024            "the-thing",
1025            MetricKind::Absolute,
1026            MetricValue::Counter { value: 43.0 },
1027        );
1028        let summed = make_metric(
1029            "the-thing",
1030            MetricKind::Incremental,
1031            MetricValue::Counter { value: 84.0 },
1032        );
1033
1034        // when types conflict the new values replaces whatever is there
1035
1036        // Start with an incremental
1037        agg.record(incremental.clone());
1038        // Another will "add" to it
1039        agg.record(incremental.clone());
1040        // Then an absolute will replace it with a failed update
1041        agg.record(absolute.clone());
1042        // Then another absolute will replace it normally
1043        agg.record(absolute.clone());
1044        let mut out = vec![];
1045        // We should flush 1 item incremental
1046        agg.flush_into(&mut out);
1047        assert_eq!(1, out.len());
1048        assert_eq!(&absolute, &out[0]);
1049
1050        // Start out with an absolute
1051        agg.record(absolute.clone());
1052        // Replace it normally
1053        agg.record(absolute);
1054        // Send an incremental with the same name, will replace due to a failed update
1055        agg.record(incremental.clone());
1056        // Send another incremental will "add"
1057        agg.record(incremental);
1058        let mut out = vec![];
1059        // We should flush 1 item incremental
1060        agg.flush_into(&mut out);
1061        assert_eq!(1, out.len());
1062        assert_eq!(&summed, &out[0]);
1063    }
1064
1065    #[tokio::test]
1066    async fn transform_shutdown() {
1067        let agg = toml::from_str::<AggregateConfig>(
1068            r"
1069interval_ms = 999999
1070",
1071        )
1072        .unwrap()
1073        .build(&TransformContext::default())
1074        .await
1075        .unwrap();
1076
1077        let agg = agg.into_task();
1078
1079        let counter_a_1 = make_metric(
1080            "counter_a",
1081            MetricKind::Incremental,
1082            MetricValue::Counter { value: 42.0 },
1083        );
1084        let counter_a_2 = make_metric(
1085            "counter_a",
1086            MetricKind::Incremental,
1087            MetricValue::Counter { value: 43.0 },
1088        );
1089        let counter_a_summed = make_metric(
1090            "counter_a",
1091            MetricKind::Incremental,
1092            MetricValue::Counter { value: 85.0 },
1093        );
1094        let gauge_a_1 = make_metric(
1095            "gauge_a",
1096            MetricKind::Absolute,
1097            MetricValue::Gauge { value: 42.0 },
1098        );
1099        let gauge_a_2 = make_metric(
1100            "gauge_a",
1101            MetricKind::Absolute,
1102            MetricValue::Gauge { value: 43.0 },
1103        );
1104        let inputs = vec![counter_a_1, counter_a_2, gauge_a_1, gauge_a_2.clone()];
1105
1106        // Queue up some events to be consumed & recorded
1107        let in_stream = Box::pin(stream::iter(inputs));
1108        // Kick off the transform process which should consume & record them
1109        let mut out_stream = agg.transform_events(in_stream);
1110
1111        // B/c the input stream has ended we will have gone through the `input_rx.next() => None`
1112        // part of the loop and do the shutting down final flush immediately. We'll already be able
1113        // to read our expected bits on the output.
1114        let mut count = 0_u8;
1115        while let Some(event) = out_stream.next().await {
1116            count += 1;
1117            match event.as_metric().series().name.name.as_str() {
1118                "counter_a" => assert_eq!(counter_a_summed, event),
1119                "gauge_a" => assert_eq!(gauge_a_2, event),
1120                _ => panic!("Unexpected metric name in aggregate output"),
1121            };
1122        }
1123        // There were only 2
1124        assert_eq!(2, count);
1125    }
1126
1127    #[tokio::test]
1128    async fn transform_interval() {
1129        let transform_config = toml::from_str::<AggregateConfig>("").unwrap();
1130
1131        let counter_a_1 = make_metric(
1132            "counter_a",
1133            MetricKind::Incremental,
1134            MetricValue::Counter { value: 42.0 },
1135        );
1136        let counter_a_2 = make_metric(
1137            "counter_a",
1138            MetricKind::Incremental,
1139            MetricValue::Counter { value: 43.0 },
1140        );
1141        let counter_a_summed = make_metric(
1142            "counter_a",
1143            MetricKind::Incremental,
1144            MetricValue::Counter { value: 85.0 },
1145        );
1146        let gauge_a_1 = make_metric(
1147            "gauge_a",
1148            MetricKind::Absolute,
1149            MetricValue::Gauge { value: 42.0 },
1150        );
1151        let gauge_a_2 = make_metric(
1152            "gauge_a",
1153            MetricKind::Absolute,
1154            MetricValue::Gauge { value: 43.0 },
1155        );
1156
1157        assert_transform_compliance(async {
1158            let (tx, rx) = mpsc::channel(10);
1159            let (topology, out) = create_topology(ReceiverStream::new(rx), transform_config).await;
1160            let mut out = ReceiverStream::new(out);
1161
1162            tokio::time::pause();
1163
1164            // tokio interval is always immediately ready, so we poll once to make sure
1165            // we trip it/set the interval in the future
1166            assert_eq!(Poll::Pending, futures::poll!(out.next()));
1167
1168            // Now send our events
1169            tx.send(counter_a_1).await.unwrap();
1170            tx.send(counter_a_2).await.unwrap();
1171            tx.send(gauge_a_1).await.unwrap();
1172            tx.send(gauge_a_2.clone()).await.unwrap();
1173            // We won't have flushed yet b/c the interval hasn't elapsed, so no outputs
1174            assert_eq!(Poll::Pending, futures::poll!(out.next()));
1175            // Now fast forward time enough that our flush should trigger.
1176            tokio::time::advance(Duration::from_secs(11)).await;
1177            // We should have had an interval fire now and our output aggregate events should be
1178            // available.
1179            let mut count = 0_u8;
1180            while count < 2 {
1181                match out.next().await {
1182                    Some(event) => {
1183                        match event.as_metric().series().name.name.as_str() {
1184                            "counter_a" => assert_eq!(counter_a_summed, event),
1185                            "gauge_a" => assert_eq!(gauge_a_2, event),
1186                            _ => panic!("Unexpected metric name in aggregate output"),
1187                        };
1188                        count += 1;
1189                    }
1190                    _ => {
1191                        panic!("Unexpectedly received None in output stream");
1192                    }
1193                }
1194            }
1195            // We should be back to pending, having nothing waiting for us
1196            assert_eq!(Poll::Pending, futures::poll!(out.next()));
1197
1198            drop(tx);
1199            topology.stop().await;
1200            assert_eq!(out.next().await, None);
1201        })
1202        .await;
1203    }
1204}