vector/transforms/
aggregate.rs

1use std::{
2    collections::{hash_map::Entry, HashMap},
3    pin::Pin,
4    time::Duration,
5};
6
7use async_stream::stream;
8use futures::{Stream, StreamExt};
9use vector_lib::{config::LogNamespace, event::MetricValue};
10use vector_lib::{
11    configurable::configurable_component,
12    event::metric::{Metric, MetricData, MetricKind, MetricSeries},
13};
14
15use crate::{
16    config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
17    event::{Event, EventMetadata},
18    internal_events::{AggregateEventRecorded, AggregateFlushed, AggregateUpdateFailed},
19    schema,
20    transforms::{TaskTransform, Transform},
21};
22
23/// Configuration for the `aggregate` transform.
24#[configurable_component(transform("aggregate", "Aggregate metrics passing through a topology."))]
25#[derive(Clone, Debug, Default)]
26#[serde(deny_unknown_fields)]
27pub struct AggregateConfig {
28    /// The interval between flushes, in milliseconds.
29    ///
30    /// During this time frame, metrics (beta) with the same series data (name, namespace, tags, and so on) are aggregated.
31    #[serde(default = "default_interval_ms")]
32    #[configurable(metadata(docs::human_name = "Flush Interval"))]
33    pub interval_ms: u64,
34    /// Function to use for aggregation.
35    ///
36    /// Some of the functions may only function on incremental and some only on absolute metrics.
37    #[serde(default = "default_mode")]
38    #[configurable(derived)]
39    pub mode: AggregationMode,
40}
41
42#[configurable_component]
43#[derive(Clone, Debug, Default)]
44#[configurable(description = "The aggregation mode to use.")]
45pub enum AggregationMode {
46    /// Default mode. Sums incremental metrics and uses the latest value for absolute metrics.
47    #[default]
48    Auto,
49
50    /// Sums incremental metrics, ignores absolute
51    Sum,
52
53    /// Returns the latest value for absolute metrics, ignores incremental
54    Latest,
55
56    /// Counts metrics for incremental and absolute metrics
57    Count,
58
59    /// Returns difference between latest value for absolute, ignores incremental
60    Diff,
61
62    /// Max value of absolute metric, ignores incremental
63    Max,
64
65    /// Min value of absolute metric, ignores incremental
66    Min,
67
68    /// Mean value of absolute metric, ignores incremental
69    Mean,
70
71    /// Stdev value of absolute metric, ignores incremental
72    Stdev,
73}
74
75const fn default_mode() -> AggregationMode {
76    AggregationMode::Auto
77}
78
/// Flush interval, in milliseconds, used when `interval_ms` is omitted from the configuration.
const fn default_interval_ms() -> u64 {
    10_000
}
82
83impl_generate_config_from_default!(AggregateConfig);
84
85#[async_trait::async_trait]
86#[typetag::serde(name = "aggregate")]
87impl TransformConfig for AggregateConfig {
88    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
89        Aggregate::new(self).map(Transform::event_task)
90    }
91
92    fn input(&self) -> Input {
93        Input::metric()
94    }
95
96    fn outputs(
97        &self,
98        _: vector_lib::enrichment::TableRegistry,
99        _: &[(OutputId, schema::Definition)],
100        _: LogNamespace,
101    ) -> Vec<TransformOutput> {
102        vec![TransformOutput::new(DataType::Metric, HashMap::new())]
103    }
104}
105
106type MetricEntry = (MetricData, EventMetadata);
107
108#[derive(Debug)]
109pub struct Aggregate {
110    interval: Duration,
111    map: HashMap<MetricSeries, MetricEntry>,
112    prev_map: HashMap<MetricSeries, MetricEntry>,
113    multi_map: HashMap<MetricSeries, Vec<MetricEntry>>,
114    mode: AggregationMode,
115}
116
117impl Aggregate {
118    pub fn new(config: &AggregateConfig) -> crate::Result<Self> {
119        Ok(Self {
120            interval: Duration::from_millis(config.interval_ms),
121            map: Default::default(),
122            prev_map: Default::default(),
123            multi_map: Default::default(),
124            mode: config.mode.clone(),
125        })
126    }
127
128    fn record(&mut self, event: Event) {
129        let (series, data, metadata) = event.into_metric().into_parts();
130
131        match self.mode {
132            AggregationMode::Auto => match data.kind {
133                MetricKind::Incremental => self.record_sum(series, data, metadata),
134                MetricKind::Absolute => {
135                    self.map.insert(series, (data, metadata));
136                }
137            },
138            AggregationMode::Sum => self.record_sum(series, data, metadata),
139            AggregationMode::Latest | AggregationMode::Diff => match data.kind {
140                MetricKind::Incremental => (),
141                MetricKind::Absolute => {
142                    self.map.insert(series, (data, metadata));
143                }
144            },
145            AggregationMode::Count => self.record_count(series, data, metadata),
146            AggregationMode::Max | AggregationMode::Min => {
147                self.record_comparison(series, data, metadata)
148            }
149            AggregationMode::Mean | AggregationMode::Stdev => match data.kind {
150                MetricKind::Incremental => (),
151                MetricKind::Absolute => {
152                    if matches!(data.value, MetricValue::Gauge { value: _ }) {
153                        match self.multi_map.entry(series) {
154                            Entry::Occupied(mut entry) => {
155                                let existing = entry.get_mut();
156                                existing.push((data, metadata));
157                            }
158                            Entry::Vacant(entry) => {
159                                entry.insert(vec![(data, metadata)]);
160                            }
161                        }
162                    }
163                }
164            },
165        }
166
167        emit!(AggregateEventRecorded);
168    }
169
170    fn record_count(
171        &mut self,
172        series: MetricSeries,
173        mut data: MetricData,
174        metadata: EventMetadata,
175    ) {
176        let mut count_data = data.clone();
177        let existing = self.map.entry(series).or_insert_with(|| {
178            *data.value_mut() = MetricValue::Counter { value: 0f64 };
179            (data.clone(), metadata.clone())
180        });
181        *count_data.value_mut() = MetricValue::Counter { value: 1f64 };
182        if existing.0.kind == data.kind && existing.0.update(&count_data) {
183            existing.1.merge(metadata);
184        } else {
185            emit!(AggregateUpdateFailed);
186        }
187    }
188
189    fn record_sum(&mut self, series: MetricSeries, data: MetricData, metadata: EventMetadata) {
190        match data.kind {
191            MetricKind::Incremental => match self.map.entry(series) {
192                Entry::Occupied(mut entry) => {
193                    let existing = entry.get_mut();
194                    // In order to update (add) the new and old kind's must match
195                    if existing.0.kind == data.kind && existing.0.update(&data) {
196                        existing.1.merge(metadata);
197                    } else {
198                        emit!(AggregateUpdateFailed);
199                        *existing = (data, metadata);
200                    }
201                }
202                Entry::Vacant(entry) => {
203                    entry.insert((data, metadata));
204                }
205            },
206            MetricKind::Absolute => {}
207        }
208    }
209
210    fn record_comparison(
211        &mut self,
212        series: MetricSeries,
213        data: MetricData,
214        metadata: EventMetadata,
215    ) {
216        match data.kind {
217            MetricKind::Incremental => (),
218            MetricKind::Absolute => match self.map.entry(series) {
219                Entry::Occupied(mut entry) => {
220                    let existing = entry.get_mut();
221                    // In order to update (add) the new and old kind's must match
222                    if existing.0.kind == data.kind {
223                        if let MetricValue::Gauge {
224                            value: existing_value,
225                        } = existing.0.value()
226                        {
227                            if let MetricValue::Gauge { value: new_value } = data.value() {
228                                let should_update = match self.mode {
229                                    AggregationMode::Max => new_value > existing_value,
230                                    AggregationMode::Min => new_value < existing_value,
231                                    _ => false,
232                                };
233                                if should_update {
234                                    *existing = (data, metadata);
235                                }
236                            }
237                        }
238                    } else {
239                        emit!(AggregateUpdateFailed);
240                        *existing = (data, metadata);
241                    }
242                }
243                Entry::Vacant(entry) => {
244                    entry.insert((data, metadata));
245                }
246            },
247        }
248    }
249
250    fn flush_into(&mut self, output: &mut Vec<Event>) {
251        let map = std::mem::take(&mut self.map);
252        for (series, entry) in map.clone().into_iter() {
253            let mut metric = Metric::from_parts(series, entry.0, entry.1);
254            if matches!(self.mode, AggregationMode::Diff) {
255                if let Some(prev_entry) = self.prev_map.get(metric.series()) {
256                    if metric.data().kind == prev_entry.0.kind && !metric.subtract(&prev_entry.0) {
257                        emit!(AggregateUpdateFailed);
258                    }
259                }
260            }
261            output.push(Event::Metric(metric));
262        }
263
264        let multi_map = std::mem::take(&mut self.multi_map);
265        'outer: for (series, entries) in multi_map.into_iter() {
266            if entries.is_empty() {
267                continue;
268            }
269
270            let (mut final_sum, mut final_metadata) = entries.first().unwrap().clone();
271            for (data, metadata) in entries.iter().skip(1) {
272                if !final_sum.update(data) {
273                    // Incompatible types, skip this metric
274                    emit!(AggregateUpdateFailed);
275                    continue 'outer;
276                }
277                final_metadata.merge(metadata.clone());
278            }
279
280            let final_mean_value = if let MetricValue::Gauge { value } = final_sum.value_mut() {
281                // Entries are not empty so this is safe.
282                *value /= entries.len() as f64;
283                *value
284            } else {
285                0.0
286            };
287
288            let final_mean = final_sum.clone();
289            match self.mode {
290                AggregationMode::Mean => {
291                    let metric = Metric::from_parts(series, final_mean, final_metadata);
292                    output.push(Event::Metric(metric));
293                }
294                AggregationMode::Stdev => {
295                    let variance = entries
296                        .iter()
297                        .filter_map(|(data, _)| {
298                            if let MetricValue::Gauge { value } = data.value() {
299                                let diff = final_mean_value - value;
300                                Some(diff * diff)
301                            } else {
302                                None
303                            }
304                        })
305                        .sum::<f64>()
306                        / entries.len() as f64;
307                    let mut final_stdev = final_mean;
308                    if let MetricValue::Gauge { value } = final_stdev.value_mut() {
309                        *value = variance.sqrt()
310                    }
311                    let metric = Metric::from_parts(series, final_stdev, final_metadata);
312                    output.push(Event::Metric(metric));
313                }
314                _ => (),
315            }
316        }
317
318        self.prev_map = map;
319        emit!(AggregateFlushed);
320    }
321}
322
323impl TaskTransform<Event> for Aggregate {
324    fn transform(
325        mut self: Box<Self>,
326        mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
327    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
328    where
329        Self: 'static,
330    {
331        let mut flush_stream = tokio::time::interval(self.interval);
332
333        Box::pin(stream! {
334            let mut output = Vec::new();
335            let mut done = false;
336            while !done {
337                tokio::select! {
338                    _ = flush_stream.tick() => {
339                        self.flush_into(&mut output);
340                    },
341                    maybe_event = input_rx.next() => {
342                        match maybe_event {
343                            None => {
344                                self.flush_into(&mut output);
345                                done = true;
346                            }
347                            Some(event) => self.record(event),
348                        }
349                    }
350                };
351                for event in output.drain(..) {
352                    yield event;
353                }
354            }
355        })
356    }
357}
358
359#[cfg(test)]
360mod tests {
361    use std::{collections::BTreeSet, sync::Arc, task::Poll};
362
363    use futures::stream;
364    use tokio::sync::mpsc;
365    use tokio_stream::wrappers::ReceiverStream;
366    use vector_lib::config::ComponentKey;
367    use vrl::value::Kind;
368
369    use super::*;
370    use crate::schema::Definition;
371    use crate::{
372        event::{
373            metric::{MetricKind, MetricValue},
374            Event, Metric,
375        },
376        test_util::components::assert_transform_compliance,
377        transforms::test::create_topology,
378    };
379
380    #[test]
381    fn generate_config() {
382        crate::test_util::test_generate_config::<AggregateConfig>();
383    }
384
385    fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {
386        let mut event = Event::Metric(Metric::new(name, kind, value))
387            .with_source_id(Arc::new(ComponentKey::from("in")))
388            .with_upstream_id(Arc::new(OutputId::from("transform")));
389        event.metadata_mut().set_schema_definition(&Arc::new(
390            Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
391        ));
392
393        event.metadata_mut().set_source_type("unit_test_stream");
394
395        event
396    }
397
398    #[test]
399    fn incremental_auto() {
400        let mut agg = Aggregate::new(&AggregateConfig {
401            interval_ms: 1000_u64,
402            mode: AggregationMode::Auto,
403        })
404        .unwrap();
405
406        let counter_a_1 = make_metric(
407            "counter_a",
408            MetricKind::Incremental,
409            MetricValue::Counter { value: 42.0 },
410        );
411        let counter_a_2 = make_metric(
412            "counter_a",
413            MetricKind::Incremental,
414            MetricValue::Counter { value: 43.0 },
415        );
416        let counter_a_summed = make_metric(
417            "counter_a",
418            MetricKind::Incremental,
419            MetricValue::Counter { value: 85.0 },
420        );
421
422        // Single item, just stored regardless of kind
423        agg.record(counter_a_1.clone());
424        let mut out = vec![];
425        // We should flush 1 item counter_a_1
426        agg.flush_into(&mut out);
427        assert_eq!(1, out.len());
428        assert_eq!(&counter_a_1, &out[0]);
429
430        // A subsequent flush doesn't send out anything
431        out.clear();
432        agg.flush_into(&mut out);
433        assert_eq!(0, out.len());
434
435        // One more just to make sure that we don't re-see from the other buffer
436        out.clear();
437        agg.flush_into(&mut out);
438        assert_eq!(0, out.len());
439
440        // Two increments with the same series, should sum into 1
441        agg.record(counter_a_1.clone());
442        agg.record(counter_a_2);
443        out.clear();
444        agg.flush_into(&mut out);
445        assert_eq!(1, out.len());
446        assert_eq!(&counter_a_summed, &out[0]);
447
448        let counter_b_1 = make_metric(
449            "counter_b",
450            MetricKind::Incremental,
451            MetricValue::Counter { value: 44.0 },
452        );
453        // Two increments with the different series, should get each back as-is
454        agg.record(counter_a_1.clone());
455        agg.record(counter_b_1.clone());
456        out.clear();
457        agg.flush_into(&mut out);
458        assert_eq!(2, out.len());
459        // B/c we don't know the order they'll come back
460        for event in out {
461            match event.as_metric().series().name.name.as_str() {
462                "counter_a" => assert_eq!(counter_a_1, event),
463                "counter_b" => assert_eq!(counter_b_1, event),
464                _ => panic!("Unexpected metric name in aggregate output"),
465            }
466        }
467    }
468
469    #[test]
470    fn absolute_auto() {
471        let mut agg = Aggregate::new(&AggregateConfig {
472            interval_ms: 1000_u64,
473            mode: AggregationMode::Auto,
474        })
475        .unwrap();
476
477        let gauge_a_1 = make_metric(
478            "gauge_a",
479            MetricKind::Absolute,
480            MetricValue::Gauge { value: 42.0 },
481        );
482        let gauge_a_2 = make_metric(
483            "gauge_a",
484            MetricKind::Absolute,
485            MetricValue::Gauge { value: 43.0 },
486        );
487
488        // Single item, just stored regardless of kind
489        agg.record(gauge_a_1.clone());
490        let mut out = vec![];
491        // We should flush 1 item gauge_a_1
492        agg.flush_into(&mut out);
493        assert_eq!(1, out.len());
494        assert_eq!(&gauge_a_1, &out[0]);
495
496        // A subsequent flush doesn't send out anything
497        out.clear();
498        agg.flush_into(&mut out);
499        assert_eq!(0, out.len());
500
501        // One more just to make sure that we don't re-see from the other buffer
502        out.clear();
503        agg.flush_into(&mut out);
504        assert_eq!(0, out.len());
505
506        // Two absolutes with the same series, should get the 2nd (last) back.
507        agg.record(gauge_a_1.clone());
508        agg.record(gauge_a_2.clone());
509        out.clear();
510        agg.flush_into(&mut out);
511        assert_eq!(1, out.len());
512        assert_eq!(&gauge_a_2, &out[0]);
513
514        let gauge_b_1 = make_metric(
515            "gauge_b",
516            MetricKind::Absolute,
517            MetricValue::Gauge { value: 44.0 },
518        );
519        // Two increments with the different series, should get each back as-is
520        agg.record(gauge_a_1.clone());
521        agg.record(gauge_b_1.clone());
522        out.clear();
523        agg.flush_into(&mut out);
524        assert_eq!(2, out.len());
525        // B/c we don't know the order they'll come back
526        for event in out {
527            match event.as_metric().series().name.name.as_str() {
528                "gauge_a" => assert_eq!(gauge_a_1, event),
529                "gauge_b" => assert_eq!(gauge_b_1, event),
530                _ => panic!("Unexpected metric name in aggregate output"),
531            }
532        }
533    }
534
535    #[test]
536    fn count_agg() {
537        let mut agg = Aggregate::new(&AggregateConfig {
538            interval_ms: 1000_u64,
539            mode: AggregationMode::Count,
540        })
541        .unwrap();
542
543        let gauge_a_1 = make_metric(
544            "gauge_a",
545            MetricKind::Absolute,
546            MetricValue::Gauge { value: 42.0 },
547        );
548        let gauge_a_2 = make_metric(
549            "gauge_a",
550            MetricKind::Absolute,
551            MetricValue::Gauge { value: 43.0 },
552        );
553        let result_count = make_metric(
554            "gauge_a",
555            MetricKind::Absolute,
556            MetricValue::Counter { value: 1.0 },
557        );
558        let result_count_2 = make_metric(
559            "gauge_a",
560            MetricKind::Absolute,
561            MetricValue::Counter { value: 2.0 },
562        );
563
564        // Single item, counter should be 1
565        agg.record(gauge_a_1.clone());
566        let mut out = vec![];
567        // We should flush 1 item gauge_a_1
568        agg.flush_into(&mut out);
569        assert_eq!(1, out.len());
570        assert_eq!(&result_count, &out[0]);
571
572        // A subsequent flush doesn't send out anything
573        out.clear();
574        agg.flush_into(&mut out);
575        assert_eq!(0, out.len());
576
577        // One more just to make sure that we don't re-see from the other buffer
578        out.clear();
579        agg.flush_into(&mut out);
580        assert_eq!(0, out.len());
581
582        // Two absolutes with the same series, counter should be 2
583        agg.record(gauge_a_1.clone());
584        agg.record(gauge_a_2.clone());
585        out.clear();
586        agg.flush_into(&mut out);
587        assert_eq!(1, out.len());
588        assert_eq!(&result_count_2, &out[0]);
589    }
590
591    #[test]
592    fn absolute_max() {
593        let mut agg = Aggregate::new(&AggregateConfig {
594            interval_ms: 1000_u64,
595            mode: AggregationMode::Max,
596        })
597        .unwrap();
598
599        let gauge_a_1 = make_metric(
600            "gauge_a",
601            MetricKind::Absolute,
602            MetricValue::Gauge { value: 112.0 },
603        );
604        let gauge_a_2 = make_metric(
605            "gauge_a",
606            MetricKind::Absolute,
607            MetricValue::Gauge { value: 89.0 },
608        );
609
610        // Single item, it should be returned as is
611        agg.record(gauge_a_2.clone());
612        let mut out = vec![];
613        // We should flush 1 item gauge_a_2
614        agg.flush_into(&mut out);
615        assert_eq!(1, out.len());
616        assert_eq!(&gauge_a_2, &out[0]);
617
618        // A subsequent flush doesn't send out anything
619        out.clear();
620        agg.flush_into(&mut out);
621        assert_eq!(0, out.len());
622
623        // One more just to make sure that we don't re-see from the other buffer
624        out.clear();
625        agg.flush_into(&mut out);
626        assert_eq!(0, out.len());
627
628        // Two absolutes, result should be higher of the 2
629        agg.record(gauge_a_1.clone());
630        agg.record(gauge_a_2.clone());
631        out.clear();
632        agg.flush_into(&mut out);
633        assert_eq!(1, out.len());
634        assert_eq!(&gauge_a_1, &out[0]);
635    }
636
637    #[test]
638    fn absolute_min() {
639        let mut agg = Aggregate::new(&AggregateConfig {
640            interval_ms: 1000_u64,
641            mode: AggregationMode::Min,
642        })
643        .unwrap();
644
645        let gauge_a_1 = make_metric(
646            "gauge_a",
647            MetricKind::Absolute,
648            MetricValue::Gauge { value: 32.0 },
649        );
650        let gauge_a_2 = make_metric(
651            "gauge_a",
652            MetricKind::Absolute,
653            MetricValue::Gauge { value: 89.0 },
654        );
655
656        // Single item, it should be returned as is
657        agg.record(gauge_a_2.clone());
658        let mut out = vec![];
659        // We should flush 1 item gauge_a_2
660        agg.flush_into(&mut out);
661        assert_eq!(1, out.len());
662        assert_eq!(&gauge_a_2, &out[0]);
663
664        // A subsequent flush doesn't send out anything
665        out.clear();
666        agg.flush_into(&mut out);
667        assert_eq!(0, out.len());
668
669        // One more just to make sure that we don't re-see from the other buffer
670        out.clear();
671        agg.flush_into(&mut out);
672        assert_eq!(0, out.len());
673
674        // Two absolutes, result should be lower of the 2
675        agg.record(gauge_a_1.clone());
676        agg.record(gauge_a_2.clone());
677        out.clear();
678        agg.flush_into(&mut out);
679        assert_eq!(1, out.len());
680        assert_eq!(&gauge_a_1, &out[0]);
681    }
682
683    #[test]
684    fn absolute_diff() {
685        let mut agg = Aggregate::new(&AggregateConfig {
686            interval_ms: 1000_u64,
687            mode: AggregationMode::Diff,
688        })
689        .unwrap();
690
691        let gauge_a_1 = make_metric(
692            "gauge_a",
693            MetricKind::Absolute,
694            MetricValue::Gauge { value: 32.0 },
695        );
696        let gauge_a_2 = make_metric(
697            "gauge_a",
698            MetricKind::Absolute,
699            MetricValue::Gauge { value: 82.0 },
700        );
701        let result = make_metric(
702            "gauge_a",
703            MetricKind::Absolute,
704            MetricValue::Gauge { value: 50.0 },
705        );
706
707        // Single item, it should be returned as is
708        agg.record(gauge_a_2.clone());
709        let mut out = vec![];
710        // We should flush 1 item gauge_a_2
711        agg.flush_into(&mut out);
712        assert_eq!(1, out.len());
713        assert_eq!(&gauge_a_2, &out[0]);
714
715        // A subsequent flush doesn't send out anything
716        out.clear();
717        agg.flush_into(&mut out);
718        assert_eq!(0, out.len());
719
720        // One more just to make sure that we don't re-see from the other buffer
721        out.clear();
722        agg.flush_into(&mut out);
723        assert_eq!(0, out.len());
724
725        // Two absolutes in 2 separate flushes, result should be diff between the 2
726        agg.record(gauge_a_1.clone());
727        out.clear();
728        agg.flush_into(&mut out);
729        assert_eq!(1, out.len());
730        assert_eq!(&gauge_a_1, &out[0]);
731
732        agg.record(gauge_a_2.clone());
733        out.clear();
734        agg.flush_into(&mut out);
735        assert_eq!(1, out.len());
736        assert_eq!(&result, &out[0]);
737    }
738
739    #[test]
740    fn absolute_diff_conflicting_type() {
741        let mut agg = Aggregate::new(&AggregateConfig {
742            interval_ms: 1000_u64,
743            mode: AggregationMode::Diff,
744        })
745        .unwrap();
746
747        let gauge_a_1 = make_metric(
748            "gauge_a",
749            MetricKind::Absolute,
750            MetricValue::Gauge { value: 32.0 },
751        );
752        let gauge_a_2 = make_metric(
753            "gauge_a",
754            MetricKind::Absolute,
755            MetricValue::Counter { value: 1.0 },
756        );
757
758        let mut out = vec![];
759        // Two absolutes in 2 separate flushes, result should be second one due to different types
760        agg.record(gauge_a_1.clone());
761        out.clear();
762        agg.flush_into(&mut out);
763        assert_eq!(1, out.len());
764        assert_eq!(&gauge_a_1, &out[0]);
765
766        agg.record(gauge_a_2.clone());
767        out.clear();
768        agg.flush_into(&mut out);
769        assert_eq!(1, out.len());
770        // Due to incompatible results, the new value just overwrites the old one
771        assert_eq!(&gauge_a_2, &out[0]);
772    }
773
774    #[test]
775    fn absolute_mean() {
776        let mut agg = Aggregate::new(&AggregateConfig {
777            interval_ms: 1000_u64,
778            mode: AggregationMode::Mean,
779        })
780        .unwrap();
781
782        let gauge_a_1 = make_metric(
783            "gauge_a",
784            MetricKind::Absolute,
785            MetricValue::Gauge { value: 32.0 },
786        );
787        let gauge_a_2 = make_metric(
788            "gauge_a",
789            MetricKind::Absolute,
790            MetricValue::Gauge { value: 82.0 },
791        );
792        let gauge_a_3 = make_metric(
793            "gauge_a",
794            MetricKind::Absolute,
795            MetricValue::Gauge { value: 51.0 },
796        );
797        let mean_result = make_metric(
798            "gauge_a",
799            MetricKind::Absolute,
800            MetricValue::Gauge { value: 55.0 },
801        );
802
803        // Single item, it should be returned as is
804        agg.record(gauge_a_2.clone());
805        let mut out = vec![];
806        // We should flush 1 item gauge_a_2
807        agg.flush_into(&mut out);
808        assert_eq!(1, out.len());
809        assert_eq!(&gauge_a_2, &out[0]);
810
811        // A subsequent flush doesn't send out anything
812        out.clear();
813        agg.flush_into(&mut out);
814        assert_eq!(0, out.len());
815
816        // One more just to make sure that we don't re-see from the other buffer
817        out.clear();
818        agg.flush_into(&mut out);
819        assert_eq!(0, out.len());
820
821        // Three absolutes, result should be mean
822        agg.record(gauge_a_1.clone());
823        agg.record(gauge_a_2.clone());
824        agg.record(gauge_a_3.clone());
825        out.clear();
826        agg.flush_into(&mut out);
827        assert_eq!(1, out.len());
828        assert_eq!(&mean_result, &out[0]);
829    }
830
831    #[test]
832    fn absolute_stdev() {
833        let mut agg = Aggregate::new(&AggregateConfig {
834            interval_ms: 1000_u64,
835            mode: AggregationMode::Stdev,
836        })
837        .unwrap();
838
839        let gauges = vec![
840            make_metric(
841                "gauge_a",
842                MetricKind::Absolute,
843                MetricValue::Gauge { value: 25.0 },
844            ),
845            make_metric(
846                "gauge_a",
847                MetricKind::Absolute,
848                MetricValue::Gauge { value: 30.0 },
849            ),
850            make_metric(
851                "gauge_a",
852                MetricKind::Absolute,
853                MetricValue::Gauge { value: 35.0 },
854            ),
855            make_metric(
856                "gauge_a",
857                MetricKind::Absolute,
858                MetricValue::Gauge { value: 40.0 },
859            ),
860            make_metric(
861                "gauge_a",
862                MetricKind::Absolute,
863                MetricValue::Gauge { value: 45.0 },
864            ),
865            make_metric(
866                "gauge_a",
867                MetricKind::Absolute,
868                MetricValue::Gauge { value: 50.0 },
869            ),
870            make_metric(
871                "gauge_a",
872                MetricKind::Absolute,
873                MetricValue::Gauge { value: 55.0 },
874            ),
875        ];
876        let stdev_result = make_metric(
877            "gauge_a",
878            MetricKind::Absolute,
879            MetricValue::Gauge { value: 10.0 },
880        );
881
882        for gauge in gauges {
883            agg.record(gauge);
884        }
885        let mut out = vec![];
886        agg.flush_into(&mut out);
887        assert_eq!(1, out.len());
888        assert_eq!(&stdev_result, &out[0]);
889    }
890
891    #[test]
892    fn conflicting_value_type() {
893        let mut agg = Aggregate::new(&AggregateConfig {
894            interval_ms: 1000_u64,
895            mode: AggregationMode::Auto,
896        })
897        .unwrap();
898
899        let counter = make_metric(
900            "the-thing",
901            MetricKind::Incremental,
902            MetricValue::Counter { value: 42.0 },
903        );
904        let mut values = BTreeSet::<String>::new();
905        values.insert("a".into());
906        values.insert("b".into());
907        let set = make_metric(
908            "the-thing",
909            MetricKind::Incremental,
910            MetricValue::Set { values },
911        );
912        let summed = make_metric(
913            "the-thing",
914            MetricKind::Incremental,
915            MetricValue::Counter { value: 84.0 },
916        );
917
918        // when types conflict the new values replaces whatever is there
919
920        // Start with an counter
921        agg.record(counter.clone());
922        // Another will "add" to it
923        agg.record(counter.clone());
924        // Then an set will replace it due to a failed update
925        agg.record(set.clone());
926        // Then a set union would be a noop
927        agg.record(set.clone());
928        let mut out = vec![];
929        // We should flush 1 item counter
930        agg.flush_into(&mut out);
931        assert_eq!(1, out.len());
932        assert_eq!(&set, &out[0]);
933
934        // Start out with an set
935        agg.record(set.clone());
936        // Union with itself, a noop
937        agg.record(set);
938        // Send an counter with the same name, will replace due to a failed update
939        agg.record(counter.clone());
940        // Send another counter will "add"
941        agg.record(counter);
942        let mut out = vec![];
943        // We should flush 1 item counter
944        agg.flush_into(&mut out);
945        assert_eq!(1, out.len());
946        assert_eq!(&summed, &out[0]);
947    }
948
949    #[test]
950    fn conflicting_kinds() {
951        let mut agg = Aggregate::new(&AggregateConfig {
952            interval_ms: 1000_u64,
953            mode: AggregationMode::Auto,
954        })
955        .unwrap();
956
957        let incremental = make_metric(
958            "the-thing",
959            MetricKind::Incremental,
960            MetricValue::Counter { value: 42.0 },
961        );
962        let absolute = make_metric(
963            "the-thing",
964            MetricKind::Absolute,
965            MetricValue::Counter { value: 43.0 },
966        );
967        let summed = make_metric(
968            "the-thing",
969            MetricKind::Incremental,
970            MetricValue::Counter { value: 84.0 },
971        );
972
973        // when types conflict the new values replaces whatever is there
974
975        // Start with an incremental
976        agg.record(incremental.clone());
977        // Another will "add" to it
978        agg.record(incremental.clone());
979        // Then an absolute will replace it with a failed update
980        agg.record(absolute.clone());
981        // Then another absolute will replace it normally
982        agg.record(absolute.clone());
983        let mut out = vec![];
984        // We should flush 1 item incremental
985        agg.flush_into(&mut out);
986        assert_eq!(1, out.len());
987        assert_eq!(&absolute, &out[0]);
988
989        // Start out with an absolute
990        agg.record(absolute.clone());
991        // Replace it normally
992        agg.record(absolute);
993        // Send an incremental with the same name, will replace due to a failed update
994        agg.record(incremental.clone());
995        // Send another incremental will "add"
996        agg.record(incremental);
997        let mut out = vec![];
998        // We should flush 1 item incremental
999        agg.flush_into(&mut out);
1000        assert_eq!(1, out.len());
1001        assert_eq!(&summed, &out[0]);
1002    }
1003
1004    #[tokio::test]
1005    async fn transform_shutdown() {
1006        let agg = toml::from_str::<AggregateConfig>(
1007            r"
1008interval_ms = 999999
1009",
1010        )
1011        .unwrap()
1012        .build(&TransformContext::default())
1013        .await
1014        .unwrap();
1015
1016        let agg = agg.into_task();
1017
1018        let counter_a_1 = make_metric(
1019            "counter_a",
1020            MetricKind::Incremental,
1021            MetricValue::Counter { value: 42.0 },
1022        );
1023        let counter_a_2 = make_metric(
1024            "counter_a",
1025            MetricKind::Incremental,
1026            MetricValue::Counter { value: 43.0 },
1027        );
1028        let counter_a_summed = make_metric(
1029            "counter_a",
1030            MetricKind::Incremental,
1031            MetricValue::Counter { value: 85.0 },
1032        );
1033        let gauge_a_1 = make_metric(
1034            "gauge_a",
1035            MetricKind::Absolute,
1036            MetricValue::Gauge { value: 42.0 },
1037        );
1038        let gauge_a_2 = make_metric(
1039            "gauge_a",
1040            MetricKind::Absolute,
1041            MetricValue::Gauge { value: 43.0 },
1042        );
1043        let inputs = vec![counter_a_1, counter_a_2, gauge_a_1, gauge_a_2.clone()];
1044
1045        // Queue up some events to be consumed & recorded
1046        let in_stream = Box::pin(stream::iter(inputs));
1047        // Kick off the transform process which should consume & record them
1048        let mut out_stream = agg.transform_events(in_stream);
1049
1050        // B/c the input stream has ended we will have gone through the `input_rx.next() => None`
1051        // part of the loop and do the shutting down final flush immediately. We'll already be able
1052        // to read our expected bits on the output.
1053        let mut count = 0_u8;
1054        while let Some(event) = out_stream.next().await {
1055            count += 1;
1056            match event.as_metric().series().name.name.as_str() {
1057                "counter_a" => assert_eq!(counter_a_summed, event),
1058                "gauge_a" => assert_eq!(gauge_a_2, event),
1059                _ => panic!("Unexpected metric name in aggregate output"),
1060            };
1061        }
1062        // There were only 2
1063        assert_eq!(2, count);
1064    }
1065
1066    #[tokio::test]
1067    async fn transform_interval() {
1068        let transform_config = toml::from_str::<AggregateConfig>("").unwrap();
1069
1070        let counter_a_1 = make_metric(
1071            "counter_a",
1072            MetricKind::Incremental,
1073            MetricValue::Counter { value: 42.0 },
1074        );
1075        let counter_a_2 = make_metric(
1076            "counter_a",
1077            MetricKind::Incremental,
1078            MetricValue::Counter { value: 43.0 },
1079        );
1080        let counter_a_summed = make_metric(
1081            "counter_a",
1082            MetricKind::Incremental,
1083            MetricValue::Counter { value: 85.0 },
1084        );
1085        let gauge_a_1 = make_metric(
1086            "gauge_a",
1087            MetricKind::Absolute,
1088            MetricValue::Gauge { value: 42.0 },
1089        );
1090        let gauge_a_2 = make_metric(
1091            "gauge_a",
1092            MetricKind::Absolute,
1093            MetricValue::Gauge { value: 43.0 },
1094        );
1095
1096        assert_transform_compliance(async {
1097            let (tx, rx) = mpsc::channel(10);
1098            let (topology, out) = create_topology(ReceiverStream::new(rx), transform_config).await;
1099            let mut out = ReceiverStream::new(out);
1100
1101            tokio::time::pause();
1102
1103            // tokio interval is always immediately ready, so we poll once to make sure
1104            // we trip it/set the interval in the future
1105            assert_eq!(Poll::Pending, futures::poll!(out.next()));
1106
1107            // Now send our events
1108            tx.send(counter_a_1).await.unwrap();
1109            tx.send(counter_a_2).await.unwrap();
1110            tx.send(gauge_a_1).await.unwrap();
1111            tx.send(gauge_a_2.clone()).await.unwrap();
1112            // We won't have flushed yet b/c the interval hasn't elapsed, so no outputs
1113            assert_eq!(Poll::Pending, futures::poll!(out.next()));
1114            // Now fast forward time enough that our flush should trigger.
1115            tokio::time::advance(Duration::from_secs(11)).await;
1116            // We should have had an interval fire now and our output aggregate events should be
1117            // available.
1118            let mut count = 0_u8;
1119            while count < 2 {
1120                match out.next().await {
1121                    Some(event) => {
1122                        match event.as_metric().series().name.name.as_str() {
1123                            "counter_a" => assert_eq!(counter_a_summed, event),
1124                            "gauge_a" => assert_eq!(gauge_a_2, event),
1125                            _ => panic!("Unexpected metric name in aggregate output"),
1126                        };
1127                        count += 1;
1128                    }
1129                    _ => {
1130                        panic!("Unexpectedly received None in output stream");
1131                    }
1132                }
1133            }
1134            // We should be back to pending, having nothing waiting for us
1135            assert_eq!(Poll::Pending, futures::poll!(out.next()));
1136
1137            drop(tx);
1138            topology.stop().await;
1139            assert_eq!(out.next().await, None);
1140        })
1141        .await;
1142    }
1143}