vector/transforms/
aggregate.rs

1use std::{
2    collections::{HashMap, hash_map::Entry},
3    pin::Pin,
4    time::Duration,
5};
6
7use async_stream::stream;
8use futures::{Stream, StreamExt};
9use vector_lib::{
10    config::LogNamespace,
11    configurable::configurable_component,
12    event::{
13        MetricValue,
14        metric::{Metric, MetricData, MetricKind, MetricSeries},
15    },
16};
17
18use crate::{
19    config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
20    event::{Event, EventMetadata},
21    internal_events::{AggregateEventRecorded, AggregateFlushed, AggregateUpdateFailed},
22    schema,
23    transforms::{TaskTransform, Transform},
24};
25
26/// Configuration for the `aggregate` transform.
27#[configurable_component(transform("aggregate", "Aggregate metrics passing through a topology."))]
28#[derive(Clone, Debug, Default)]
29#[serde(deny_unknown_fields)]
30pub struct AggregateConfig {
31    /// The interval between flushes, in milliseconds.
32    ///
33    /// During this time frame, metrics (beta) with the same series data (name, namespace, tags, and so on) are aggregated.
34    #[serde(default = "default_interval_ms")]
35    #[configurable(metadata(docs::human_name = "Flush Interval"))]
36    pub interval_ms: u64,
37    /// Function to use for aggregation.
38    ///
39    /// Some of the functions may only function on incremental and some only on absolute metrics.
40    #[serde(default = "default_mode")]
41    #[configurable(derived)]
42    pub mode: AggregationMode,
43}
44
// Selects how `Aggregate::record` folds incoming metrics into per-series state and how
// `Aggregate::flush_into` turns that state into output events. The `//` notes on the variants
// describe what the implementation in this file does; the `///` text is left untouched.
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[configurable(description = "The aggregation mode to use.")]
pub enum AggregationMode {
    // Incremental metrics go through `record_sum`; absolute metrics overwrite the stored entry.
    /// Default mode. Sums incremental metrics and uses the latest value for absolute metrics.
    #[default]
    Auto,

    // Handled entirely by `record_sum`, which drops absolute metrics.
    /// Sums incremental metrics, ignores absolute
    Sum,

    // Each absolute metric overwrites the stored entry for its series.
    /// Returns the latest value for absolute metrics, ignores incremental
    Latest,

    // The emitted metric is a counter holding the number of recorded metrics for the series.
    /// Counts metrics for incremental and absolute metrics
    Count,

    // Recorded like `Latest`; at flush time the value emitted for the same series by the
    // previous flush (if any) is subtracted from the current one.
    /// Returns difference between latest value for absolute, ignores incremental
    Diff,

    // Only gauge values are compared; see `record_comparison`.
    /// Max value of absolute metric, ignores incremental
    Max,

    // Only gauge values are compared; see `record_comparison`.
    /// Min value of absolute metric, ignores incremental
    Min,

    // Only absolute gauges are collected for this mode.
    /// Mean value of absolute metric, ignores incremental
    Mean,

    // Only absolute gauges are collected; the variance is divided by the number of samples
    // (population standard deviation).
    /// Stdev value of absolute metric, ignores incremental
    Stdev,
}
77
/// Serde default for `AggregateConfig::mode`: the `Auto` aggregation mode.
const fn default_mode() -> AggregationMode {
    AggregationMode::Auto
}
81
/// Serde default for `AggregateConfig::interval_ms`: ten seconds, expressed in milliseconds.
const fn default_interval_ms() -> u64 {
    const MILLIS_PER_SECOND: u64 = 1000;
    const DEFAULT_INTERVAL_SECONDS: u64 = 10;

    DEFAULT_INTERVAL_SECONDS * MILLIS_PER_SECOND
}
85
// Presumably implements config generation for `AggregateConfig` on top of its `Default`
// implementation — TODO confirm against the macro definition (not visible in this file).
impl_generate_config_from_default!(AggregateConfig);
87
88#[async_trait::async_trait]
89#[typetag::serde(name = "aggregate")]
90impl TransformConfig for AggregateConfig {
91    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
92        Aggregate::new(self).map(Transform::event_task)
93    }
94
95    fn input(&self) -> Input {
96        Input::metric()
97    }
98
99    fn outputs(
100        &self,
101        _: vector_lib::enrichment::TableRegistry,
102        _: &[(OutputId, schema::Definition)],
103        _: LogNamespace,
104    ) -> Vec<TransformOutput> {
105        vec![TransformOutput::new(DataType::Metric, HashMap::new())]
106    }
107}
108
/// State stored per series: the metric data together with the event metadata that travels with it.
type MetricEntry = (MetricData, EventMetadata);
110
/// Stateful metric aggregator behind the `aggregate` transform.
///
/// Metrics are folded into per-series state by `record` and emitted (and the state reset) by
/// `flush_into`.
#[derive(Debug)]
pub struct Aggregate {
    /// Period of the flush timer used by `transform`.
    interval: Duration,
    /// One entry per series for the modes that keep a single running value.
    map: HashMap<MetricSeries, MetricEntry>,
    /// The contents `map` had at the previous flush; read by `flush_into` in `Diff` mode.
    prev_map: HashMap<MetricSeries, MetricEntry>,
    /// Every recorded sample per series; filled in `Mean` and `Stdev` modes.
    multi_map: HashMap<MetricSeries, Vec<MetricEntry>>,
    /// The configured aggregation mode.
    mode: AggregationMode,
}
119
120impl Aggregate {
121    pub fn new(config: &AggregateConfig) -> crate::Result<Self> {
122        Ok(Self {
123            interval: Duration::from_millis(config.interval_ms),
124            map: Default::default(),
125            prev_map: Default::default(),
126            multi_map: Default::default(),
127            mode: config.mode.clone(),
128        })
129    }
130
131    fn record(&mut self, event: Event) {
132        let (series, data, metadata) = event.into_metric().into_parts();
133
134        match self.mode {
135            AggregationMode::Auto => match data.kind {
136                MetricKind::Incremental => self.record_sum(series, data, metadata),
137                MetricKind::Absolute => {
138                    self.map.insert(series, (data, metadata));
139                }
140            },
141            AggregationMode::Sum => self.record_sum(series, data, metadata),
142            AggregationMode::Latest | AggregationMode::Diff => match data.kind {
143                MetricKind::Incremental => (),
144                MetricKind::Absolute => {
145                    self.map.insert(series, (data, metadata));
146                }
147            },
148            AggregationMode::Count => self.record_count(series, data, metadata),
149            AggregationMode::Max | AggregationMode::Min => {
150                self.record_comparison(series, data, metadata)
151            }
152            AggregationMode::Mean | AggregationMode::Stdev => match data.kind {
153                MetricKind::Incremental => (),
154                MetricKind::Absolute => {
155                    if matches!(data.value, MetricValue::Gauge { value: _ }) {
156                        match self.multi_map.entry(series) {
157                            Entry::Occupied(mut entry) => {
158                                let existing = entry.get_mut();
159                                existing.push((data, metadata));
160                            }
161                            Entry::Vacant(entry) => {
162                                entry.insert(vec![(data, metadata)]);
163                            }
164                        }
165                    }
166                }
167            },
168        }
169
170        emit!(AggregateEventRecorded);
171    }
172
173    fn record_count(
174        &mut self,
175        series: MetricSeries,
176        mut data: MetricData,
177        metadata: EventMetadata,
178    ) {
179        let mut count_data = data.clone();
180        let existing = self.map.entry(series).or_insert_with(|| {
181            *data.value_mut() = MetricValue::Counter { value: 0f64 };
182            (data.clone(), metadata.clone())
183        });
184        *count_data.value_mut() = MetricValue::Counter { value: 1f64 };
185        if existing.0.kind == data.kind && existing.0.update(&count_data) {
186            existing.1.merge(metadata);
187        } else {
188            emit!(AggregateUpdateFailed);
189        }
190    }
191
192    fn record_sum(&mut self, series: MetricSeries, data: MetricData, metadata: EventMetadata) {
193        match data.kind {
194            MetricKind::Incremental => match self.map.entry(series) {
195                Entry::Occupied(mut entry) => {
196                    let existing = entry.get_mut();
197                    // In order to update (add) the new and old kind's must match
198                    if existing.0.kind == data.kind && existing.0.update(&data) {
199                        existing.1.merge(metadata);
200                    } else {
201                        emit!(AggregateUpdateFailed);
202                        *existing = (data, metadata);
203                    }
204                }
205                Entry::Vacant(entry) => {
206                    entry.insert((data, metadata));
207                }
208            },
209            MetricKind::Absolute => {}
210        }
211    }
212
213    fn record_comparison(
214        &mut self,
215        series: MetricSeries,
216        data: MetricData,
217        metadata: EventMetadata,
218    ) {
219        match data.kind {
220            MetricKind::Incremental => (),
221            MetricKind::Absolute => match self.map.entry(series) {
222                Entry::Occupied(mut entry) => {
223                    let existing = entry.get_mut();
224                    // In order to update (add) the new and old kind's must match
225                    if existing.0.kind == data.kind {
226                        if let MetricValue::Gauge {
227                            value: existing_value,
228                        } = existing.0.value()
229                            && let MetricValue::Gauge { value: new_value } = data.value()
230                        {
231                            let should_update = match self.mode {
232                                AggregationMode::Max => new_value > existing_value,
233                                AggregationMode::Min => new_value < existing_value,
234                                _ => false,
235                            };
236                            if should_update {
237                                *existing = (data, metadata);
238                            }
239                        }
240                    } else {
241                        emit!(AggregateUpdateFailed);
242                        *existing = (data, metadata);
243                    }
244                }
245                Entry::Vacant(entry) => {
246                    entry.insert((data, metadata));
247                }
248            },
249        }
250    }
251
252    fn flush_into(&mut self, output: &mut Vec<Event>) {
253        let map = std::mem::take(&mut self.map);
254        for (series, entry) in map.clone().into_iter() {
255            let mut metric = Metric::from_parts(series, entry.0, entry.1);
256            if matches!(self.mode, AggregationMode::Diff)
257                && let Some(prev_entry) = self.prev_map.get(metric.series())
258                && metric.data().kind == prev_entry.0.kind
259                && !metric.subtract(&prev_entry.0)
260            {
261                emit!(AggregateUpdateFailed);
262            }
263            output.push(Event::Metric(metric));
264        }
265
266        let multi_map = std::mem::take(&mut self.multi_map);
267        'outer: for (series, entries) in multi_map.into_iter() {
268            if entries.is_empty() {
269                continue;
270            }
271
272            let (mut final_sum, mut final_metadata) = entries.first().unwrap().clone();
273            for (data, metadata) in entries.iter().skip(1) {
274                if !final_sum.update(data) {
275                    // Incompatible types, skip this metric
276                    emit!(AggregateUpdateFailed);
277                    continue 'outer;
278                }
279                final_metadata.merge(metadata.clone());
280            }
281
282            let final_mean_value = if let MetricValue::Gauge { value } = final_sum.value_mut() {
283                // Entries are not empty so this is safe.
284                *value /= entries.len() as f64;
285                *value
286            } else {
287                0.0
288            };
289
290            let final_mean = final_sum.clone();
291            match self.mode {
292                AggregationMode::Mean => {
293                    let metric = Metric::from_parts(series, final_mean, final_metadata);
294                    output.push(Event::Metric(metric));
295                }
296                AggregationMode::Stdev => {
297                    let variance = entries
298                        .iter()
299                        .filter_map(|(data, _)| {
300                            if let MetricValue::Gauge { value } = data.value() {
301                                let diff = final_mean_value - value;
302                                Some(diff * diff)
303                            } else {
304                                None
305                            }
306                        })
307                        .sum::<f64>()
308                        / entries.len() as f64;
309                    let mut final_stdev = final_mean;
310                    if let MetricValue::Gauge { value } = final_stdev.value_mut() {
311                        *value = variance.sqrt()
312                    }
313                    let metric = Metric::from_parts(series, final_stdev, final_metadata);
314                    output.push(Event::Metric(metric));
315                }
316                _ => (),
317            }
318        }
319
320        self.prev_map = map;
321        emit!(AggregateFlushed);
322    }
323}
324
325impl TaskTransform<Event> for Aggregate {
326    fn transform(
327        mut self: Box<Self>,
328        mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
329    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
330    where
331        Self: 'static,
332    {
333        let mut flush_stream = tokio::time::interval(self.interval);
334
335        Box::pin(stream! {
336            let mut output = Vec::new();
337            let mut done = false;
338            while !done {
339                tokio::select! {
340                    _ = flush_stream.tick() => {
341                        self.flush_into(&mut output);
342                    },
343                    maybe_event = input_rx.next() => {
344                        match maybe_event {
345                            None => {
346                                self.flush_into(&mut output);
347                                done = true;
348                            }
349                            Some(event) => self.record(event),
350                        }
351                    }
352                };
353                for event in output.drain(..) {
354                    yield event;
355                }
356            }
357        })
358    }
359}
360
361#[cfg(test)]
362mod tests {
363    use std::{collections::BTreeSet, sync::Arc, task::Poll};
364
365    use futures::stream;
366    use tokio::sync::mpsc;
367    use tokio_stream::wrappers::ReceiverStream;
368    use vector_lib::config::ComponentKey;
369    use vrl::value::Kind;
370
371    use super::*;
372    use crate::{
373        event::{
374            Event, Metric,
375            metric::{MetricKind, MetricValue},
376        },
377        schema::Definition,
378        test_util::components::assert_transform_compliance,
379        transforms::test::create_topology,
380    };
381
    // Runs the shared config-generation check against `AggregateConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<AggregateConfig>();
    }
386
387    fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {
388        let mut event = Event::Metric(Metric::new(name, kind, value))
389            .with_source_id(Arc::new(ComponentKey::from("in")))
390            .with_upstream_id(Arc::new(OutputId::from("transform")));
391        event.metadata_mut().set_schema_definition(&Arc::new(
392            Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
393        ));
394
395        event.metadata_mut().set_source_type("unit_test_stream");
396
397        event
398    }
399
400    #[test]
401    fn incremental_auto() {
402        let mut agg = Aggregate::new(&AggregateConfig {
403            interval_ms: 1000_u64,
404            mode: AggregationMode::Auto,
405        })
406        .unwrap();
407
408        let counter_a_1 = make_metric(
409            "counter_a",
410            MetricKind::Incremental,
411            MetricValue::Counter { value: 42.0 },
412        );
413        let counter_a_2 = make_metric(
414            "counter_a",
415            MetricKind::Incremental,
416            MetricValue::Counter { value: 43.0 },
417        );
418        let counter_a_summed = make_metric(
419            "counter_a",
420            MetricKind::Incremental,
421            MetricValue::Counter { value: 85.0 },
422        );
423
424        // Single item, just stored regardless of kind
425        agg.record(counter_a_1.clone());
426        let mut out = vec![];
427        // We should flush 1 item counter_a_1
428        agg.flush_into(&mut out);
429        assert_eq!(1, out.len());
430        assert_eq!(&counter_a_1, &out[0]);
431
432        // A subsequent flush doesn't send out anything
433        out.clear();
434        agg.flush_into(&mut out);
435        assert_eq!(0, out.len());
436
437        // One more just to make sure that we don't re-see from the other buffer
438        out.clear();
439        agg.flush_into(&mut out);
440        assert_eq!(0, out.len());
441
442        // Two increments with the same series, should sum into 1
443        agg.record(counter_a_1.clone());
444        agg.record(counter_a_2);
445        out.clear();
446        agg.flush_into(&mut out);
447        assert_eq!(1, out.len());
448        assert_eq!(&counter_a_summed, &out[0]);
449
450        let counter_b_1 = make_metric(
451            "counter_b",
452            MetricKind::Incremental,
453            MetricValue::Counter { value: 44.0 },
454        );
455        // Two increments with the different series, should get each back as-is
456        agg.record(counter_a_1.clone());
457        agg.record(counter_b_1.clone());
458        out.clear();
459        agg.flush_into(&mut out);
460        assert_eq!(2, out.len());
461        // B/c we don't know the order they'll come back
462        for event in out {
463            match event.as_metric().series().name.name.as_str() {
464                "counter_a" => assert_eq!(counter_a_1, event),
465                "counter_b" => assert_eq!(counter_b_1, event),
466                _ => panic!("Unexpected metric name in aggregate output"),
467            }
468        }
469    }
470
471    #[test]
472    fn absolute_auto() {
473        let mut agg = Aggregate::new(&AggregateConfig {
474            interval_ms: 1000_u64,
475            mode: AggregationMode::Auto,
476        })
477        .unwrap();
478
479        let gauge_a_1 = make_metric(
480            "gauge_a",
481            MetricKind::Absolute,
482            MetricValue::Gauge { value: 42.0 },
483        );
484        let gauge_a_2 = make_metric(
485            "gauge_a",
486            MetricKind::Absolute,
487            MetricValue::Gauge { value: 43.0 },
488        );
489
490        // Single item, just stored regardless of kind
491        agg.record(gauge_a_1.clone());
492        let mut out = vec![];
493        // We should flush 1 item gauge_a_1
494        agg.flush_into(&mut out);
495        assert_eq!(1, out.len());
496        assert_eq!(&gauge_a_1, &out[0]);
497
498        // A subsequent flush doesn't send out anything
499        out.clear();
500        agg.flush_into(&mut out);
501        assert_eq!(0, out.len());
502
503        // One more just to make sure that we don't re-see from the other buffer
504        out.clear();
505        agg.flush_into(&mut out);
506        assert_eq!(0, out.len());
507
508        // Two absolutes with the same series, should get the 2nd (last) back.
509        agg.record(gauge_a_1.clone());
510        agg.record(gauge_a_2.clone());
511        out.clear();
512        agg.flush_into(&mut out);
513        assert_eq!(1, out.len());
514        assert_eq!(&gauge_a_2, &out[0]);
515
516        let gauge_b_1 = make_metric(
517            "gauge_b",
518            MetricKind::Absolute,
519            MetricValue::Gauge { value: 44.0 },
520        );
521        // Two increments with the different series, should get each back as-is
522        agg.record(gauge_a_1.clone());
523        agg.record(gauge_b_1.clone());
524        out.clear();
525        agg.flush_into(&mut out);
526        assert_eq!(2, out.len());
527        // B/c we don't know the order they'll come back
528        for event in out {
529            match event.as_metric().series().name.name.as_str() {
530                "gauge_a" => assert_eq!(gauge_a_1, event),
531                "gauge_b" => assert_eq!(gauge_b_1, event),
532                _ => panic!("Unexpected metric name in aggregate output"),
533            }
534        }
535    }
536
537    #[test]
538    fn count_agg() {
539        let mut agg = Aggregate::new(&AggregateConfig {
540            interval_ms: 1000_u64,
541            mode: AggregationMode::Count,
542        })
543        .unwrap();
544
545        let gauge_a_1 = make_metric(
546            "gauge_a",
547            MetricKind::Absolute,
548            MetricValue::Gauge { value: 42.0 },
549        );
550        let gauge_a_2 = make_metric(
551            "gauge_a",
552            MetricKind::Absolute,
553            MetricValue::Gauge { value: 43.0 },
554        );
555        let result_count = make_metric(
556            "gauge_a",
557            MetricKind::Absolute,
558            MetricValue::Counter { value: 1.0 },
559        );
560        let result_count_2 = make_metric(
561            "gauge_a",
562            MetricKind::Absolute,
563            MetricValue::Counter { value: 2.0 },
564        );
565
566        // Single item, counter should be 1
567        agg.record(gauge_a_1.clone());
568        let mut out = vec![];
569        // We should flush 1 item gauge_a_1
570        agg.flush_into(&mut out);
571        assert_eq!(1, out.len());
572        assert_eq!(&result_count, &out[0]);
573
574        // A subsequent flush doesn't send out anything
575        out.clear();
576        agg.flush_into(&mut out);
577        assert_eq!(0, out.len());
578
579        // One more just to make sure that we don't re-see from the other buffer
580        out.clear();
581        agg.flush_into(&mut out);
582        assert_eq!(0, out.len());
583
584        // Two absolutes with the same series, counter should be 2
585        agg.record(gauge_a_1.clone());
586        agg.record(gauge_a_2.clone());
587        out.clear();
588        agg.flush_into(&mut out);
589        assert_eq!(1, out.len());
590        assert_eq!(&result_count_2, &out[0]);
591    }
592
593    #[test]
594    fn absolute_max() {
595        let mut agg = Aggregate::new(&AggregateConfig {
596            interval_ms: 1000_u64,
597            mode: AggregationMode::Max,
598        })
599        .unwrap();
600
601        let gauge_a_1 = make_metric(
602            "gauge_a",
603            MetricKind::Absolute,
604            MetricValue::Gauge { value: 112.0 },
605        );
606        let gauge_a_2 = make_metric(
607            "gauge_a",
608            MetricKind::Absolute,
609            MetricValue::Gauge { value: 89.0 },
610        );
611
612        // Single item, it should be returned as is
613        agg.record(gauge_a_2.clone());
614        let mut out = vec![];
615        // We should flush 1 item gauge_a_2
616        agg.flush_into(&mut out);
617        assert_eq!(1, out.len());
618        assert_eq!(&gauge_a_2, &out[0]);
619
620        // A subsequent flush doesn't send out anything
621        out.clear();
622        agg.flush_into(&mut out);
623        assert_eq!(0, out.len());
624
625        // One more just to make sure that we don't re-see from the other buffer
626        out.clear();
627        agg.flush_into(&mut out);
628        assert_eq!(0, out.len());
629
630        // Two absolutes, result should be higher of the 2
631        agg.record(gauge_a_1.clone());
632        agg.record(gauge_a_2.clone());
633        out.clear();
634        agg.flush_into(&mut out);
635        assert_eq!(1, out.len());
636        assert_eq!(&gauge_a_1, &out[0]);
637    }
638
639    #[test]
640    fn absolute_min() {
641        let mut agg = Aggregate::new(&AggregateConfig {
642            interval_ms: 1000_u64,
643            mode: AggregationMode::Min,
644        })
645        .unwrap();
646
647        let gauge_a_1 = make_metric(
648            "gauge_a",
649            MetricKind::Absolute,
650            MetricValue::Gauge { value: 32.0 },
651        );
652        let gauge_a_2 = make_metric(
653            "gauge_a",
654            MetricKind::Absolute,
655            MetricValue::Gauge { value: 89.0 },
656        );
657
658        // Single item, it should be returned as is
659        agg.record(gauge_a_2.clone());
660        let mut out = vec![];
661        // We should flush 1 item gauge_a_2
662        agg.flush_into(&mut out);
663        assert_eq!(1, out.len());
664        assert_eq!(&gauge_a_2, &out[0]);
665
666        // A subsequent flush doesn't send out anything
667        out.clear();
668        agg.flush_into(&mut out);
669        assert_eq!(0, out.len());
670
671        // One more just to make sure that we don't re-see from the other buffer
672        out.clear();
673        agg.flush_into(&mut out);
674        assert_eq!(0, out.len());
675
676        // Two absolutes, result should be lower of the 2
677        agg.record(gauge_a_1.clone());
678        agg.record(gauge_a_2.clone());
679        out.clear();
680        agg.flush_into(&mut out);
681        assert_eq!(1, out.len());
682        assert_eq!(&gauge_a_1, &out[0]);
683    }
684
685    #[test]
686    fn absolute_diff() {
687        let mut agg = Aggregate::new(&AggregateConfig {
688            interval_ms: 1000_u64,
689            mode: AggregationMode::Diff,
690        })
691        .unwrap();
692
693        let gauge_a_1 = make_metric(
694            "gauge_a",
695            MetricKind::Absolute,
696            MetricValue::Gauge { value: 32.0 },
697        );
698        let gauge_a_2 = make_metric(
699            "gauge_a",
700            MetricKind::Absolute,
701            MetricValue::Gauge { value: 82.0 },
702        );
703        let result = make_metric(
704            "gauge_a",
705            MetricKind::Absolute,
706            MetricValue::Gauge { value: 50.0 },
707        );
708
709        // Single item, it should be returned as is
710        agg.record(gauge_a_2.clone());
711        let mut out = vec![];
712        // We should flush 1 item gauge_a_2
713        agg.flush_into(&mut out);
714        assert_eq!(1, out.len());
715        assert_eq!(&gauge_a_2, &out[0]);
716
717        // A subsequent flush doesn't send out anything
718        out.clear();
719        agg.flush_into(&mut out);
720        assert_eq!(0, out.len());
721
722        // One more just to make sure that we don't re-see from the other buffer
723        out.clear();
724        agg.flush_into(&mut out);
725        assert_eq!(0, out.len());
726
727        // Two absolutes in 2 separate flushes, result should be diff between the 2
728        agg.record(gauge_a_1.clone());
729        out.clear();
730        agg.flush_into(&mut out);
731        assert_eq!(1, out.len());
732        assert_eq!(&gauge_a_1, &out[0]);
733
734        agg.record(gauge_a_2.clone());
735        out.clear();
736        agg.flush_into(&mut out);
737        assert_eq!(1, out.len());
738        assert_eq!(&result, &out[0]);
739    }
740
741    #[test]
742    fn absolute_diff_conflicting_type() {
743        let mut agg = Aggregate::new(&AggregateConfig {
744            interval_ms: 1000_u64,
745            mode: AggregationMode::Diff,
746        })
747        .unwrap();
748
749        let gauge_a_1 = make_metric(
750            "gauge_a",
751            MetricKind::Absolute,
752            MetricValue::Gauge { value: 32.0 },
753        );
754        let gauge_a_2 = make_metric(
755            "gauge_a",
756            MetricKind::Absolute,
757            MetricValue::Counter { value: 1.0 },
758        );
759
760        let mut out = vec![];
761        // Two absolutes in 2 separate flushes, result should be second one due to different types
762        agg.record(gauge_a_1.clone());
763        out.clear();
764        agg.flush_into(&mut out);
765        assert_eq!(1, out.len());
766        assert_eq!(&gauge_a_1, &out[0]);
767
768        agg.record(gauge_a_2.clone());
769        out.clear();
770        agg.flush_into(&mut out);
771        assert_eq!(1, out.len());
772        // Due to incompatible results, the new value just overwrites the old one
773        assert_eq!(&gauge_a_2, &out[0]);
774    }
775
776    #[test]
777    fn absolute_mean() {
778        let mut agg = Aggregate::new(&AggregateConfig {
779            interval_ms: 1000_u64,
780            mode: AggregationMode::Mean,
781        })
782        .unwrap();
783
784        let gauge_a_1 = make_metric(
785            "gauge_a",
786            MetricKind::Absolute,
787            MetricValue::Gauge { value: 32.0 },
788        );
789        let gauge_a_2 = make_metric(
790            "gauge_a",
791            MetricKind::Absolute,
792            MetricValue::Gauge { value: 82.0 },
793        );
794        let gauge_a_3 = make_metric(
795            "gauge_a",
796            MetricKind::Absolute,
797            MetricValue::Gauge { value: 51.0 },
798        );
799        let mean_result = make_metric(
800            "gauge_a",
801            MetricKind::Absolute,
802            MetricValue::Gauge { value: 55.0 },
803        );
804
805        // Single item, it should be returned as is
806        agg.record(gauge_a_2.clone());
807        let mut out = vec![];
808        // We should flush 1 item gauge_a_2
809        agg.flush_into(&mut out);
810        assert_eq!(1, out.len());
811        assert_eq!(&gauge_a_2, &out[0]);
812
813        // A subsequent flush doesn't send out anything
814        out.clear();
815        agg.flush_into(&mut out);
816        assert_eq!(0, out.len());
817
818        // One more just to make sure that we don't re-see from the other buffer
819        out.clear();
820        agg.flush_into(&mut out);
821        assert_eq!(0, out.len());
822
823        // Three absolutes, result should be mean
824        agg.record(gauge_a_1.clone());
825        agg.record(gauge_a_2.clone());
826        agg.record(gauge_a_3.clone());
827        out.clear();
828        agg.flush_into(&mut out);
829        assert_eq!(1, out.len());
830        assert_eq!(&mean_result, &out[0]);
831    }
832
833    #[test]
834    fn absolute_stdev() {
835        let mut agg = Aggregate::new(&AggregateConfig {
836            interval_ms: 1000_u64,
837            mode: AggregationMode::Stdev,
838        })
839        .unwrap();
840
841        let gauges = vec![
842            make_metric(
843                "gauge_a",
844                MetricKind::Absolute,
845                MetricValue::Gauge { value: 25.0 },
846            ),
847            make_metric(
848                "gauge_a",
849                MetricKind::Absolute,
850                MetricValue::Gauge { value: 30.0 },
851            ),
852            make_metric(
853                "gauge_a",
854                MetricKind::Absolute,
855                MetricValue::Gauge { value: 35.0 },
856            ),
857            make_metric(
858                "gauge_a",
859                MetricKind::Absolute,
860                MetricValue::Gauge { value: 40.0 },
861            ),
862            make_metric(
863                "gauge_a",
864                MetricKind::Absolute,
865                MetricValue::Gauge { value: 45.0 },
866            ),
867            make_metric(
868                "gauge_a",
869                MetricKind::Absolute,
870                MetricValue::Gauge { value: 50.0 },
871            ),
872            make_metric(
873                "gauge_a",
874                MetricKind::Absolute,
875                MetricValue::Gauge { value: 55.0 },
876            ),
877        ];
878        let stdev_result = make_metric(
879            "gauge_a",
880            MetricKind::Absolute,
881            MetricValue::Gauge { value: 10.0 },
882        );
883
884        for gauge in gauges {
885            agg.record(gauge);
886        }
887        let mut out = vec![];
888        agg.flush_into(&mut out);
889        assert_eq!(1, out.len());
890        assert_eq!(&stdev_result, &out[0]);
891    }
892
#[test]
fn conflicting_value_type() {
    // Feeds a counter and a set that share one series name into the aggregator and checks
    // which of the two comes out of each flush. The expectation encoded below is that the
    // most recently seen value type takes over from whatever was stored before.
    let config = AggregateConfig {
        interval_ms: 1000,
        mode: AggregationMode::Auto,
    };
    let mut aggregate = Aggregate::new(&config).unwrap();

    let counter_event = make_metric(
        "the-thing",
        MetricKind::Incremental,
        MetricValue::Counter { value: 42.0 },
    );
    let set_event = make_metric(
        "the-thing",
        MetricKind::Incremental,
        MetricValue::Set {
            values: BTreeSet::from(["a".to_owned(), "b".to_owned()]),
        },
    );
    // What two `counter_event`s look like once added together (42 + 42).
    let doubled_counter = make_metric(
        "the-thing",
        MetricKind::Incremental,
        MetricValue::Counter { value: 84.0 },
    );

    // Round one: counter, counter, set, set.
    let counter_then_set = [
        // Seeds the entry with a counter.
        counter_event.clone(),
        // Same value type, so this is added to the stored counter.
        counter_event.clone(),
        // Different value type: the update fails and the set takes the counter's place.
        set_event.clone(),
        // A set unioned with itself is unchanged.
        set_event.clone(),
    ];
    for event in counter_then_set {
        aggregate.record(event);
    }
    let mut first_flush = Vec::new();
    aggregate.flush_into(&mut first_flush);
    // Exactly one item is flushed, and it is the set.
    assert_eq!(1, first_flush.len());
    assert_eq!(&set_event, &first_flush[0]);

    // Round two, mirrored: set, set, counter, counter.
    let set_then_counter = [
        // Seeds the entry with a set.
        set_event.clone(),
        // A set unioned with itself is unchanged.
        set_event,
        // Different value type: the update fails and the counter takes the set's place.
        counter_event.clone(),
        // Same value type again, so this is added to the stored counter.
        counter_event,
    ];
    for event in set_then_counter {
        aggregate.record(event);
    }
    let mut second_flush = Vec::new();
    aggregate.flush_into(&mut second_flush);
    // Exactly one item is flushed, and it is the summed counter.
    assert_eq!(1, second_flush.len());
    assert_eq!(&doubled_counter, &second_flush[0]);
}
950
#[test]
fn conflicting_kinds() {
    // Feeds an incremental and an absolute counter that share one series name into the
    // aggregator and checks which of the two comes out of each flush. The expectation
    // encoded below is that the most recently seen metric kind takes over from whatever was
    // stored before.
    let config = AggregateConfig {
        interval_ms: 1000,
        mode: AggregationMode::Auto,
    };
    let mut aggregate = Aggregate::new(&config).unwrap();

    let incremental_event = make_metric(
        "the-thing",
        MetricKind::Incremental,
        MetricValue::Counter { value: 42.0 },
    );
    let absolute_event = make_metric(
        "the-thing",
        MetricKind::Absolute,
        MetricValue::Counter { value: 43.0 },
    );
    // What two `incremental_event`s look like once added together (42 + 42).
    let doubled_incremental = make_metric(
        "the-thing",
        MetricKind::Incremental,
        MetricValue::Counter { value: 84.0 },
    );

    // Round one: incremental, incremental, absolute, absolute.
    let incremental_then_absolute = [
        // Seeds the entry with an incremental counter.
        incremental_event.clone(),
        // Same kind, so this is added to the stored value.
        incremental_event.clone(),
        // Different kind: the update fails and the absolute takes the incremental's place.
        absolute_event.clone(),
        // An absolute following an absolute simply replaces it.
        absolute_event.clone(),
    ];
    for event in incremental_then_absolute {
        aggregate.record(event);
    }
    let mut first_flush = Vec::new();
    aggregate.flush_into(&mut first_flush);
    // Exactly one item is flushed, and it is the absolute metric.
    assert_eq!(1, first_flush.len());
    assert_eq!(&absolute_event, &first_flush[0]);

    // Round two, mirrored: absolute, absolute, incremental, incremental.
    let absolute_then_incremental = [
        // Seeds the entry with an absolute counter.
        absolute_event.clone(),
        // An absolute following an absolute simply replaces it.
        absolute_event,
        // Different kind: the update fails and the incremental takes the absolute's place.
        incremental_event.clone(),
        // Same kind again, so this is added to the stored value.
        incremental_event,
    ];
    for event in absolute_then_incremental {
        aggregate.record(event);
    }
    let mut second_flush = Vec::new();
    aggregate.flush_into(&mut second_flush);
    // Exactly one item is flushed, and it is the summed incremental metric.
    assert_eq!(1, second_flush.len());
    assert_eq!(&doubled_incremental, &second_flush[0]);
}
1005
1006    #[tokio::test]
1007    async fn transform_shutdown() {
1008        let agg = toml::from_str::<AggregateConfig>(
1009            r"
1010interval_ms = 999999
1011",
1012        )
1013        .unwrap()
1014        .build(&TransformContext::default())
1015        .await
1016        .unwrap();
1017
1018        let agg = agg.into_task();
1019
1020        let counter_a_1 = make_metric(
1021            "counter_a",
1022            MetricKind::Incremental,
1023            MetricValue::Counter { value: 42.0 },
1024        );
1025        let counter_a_2 = make_metric(
1026            "counter_a",
1027            MetricKind::Incremental,
1028            MetricValue::Counter { value: 43.0 },
1029        );
1030        let counter_a_summed = make_metric(
1031            "counter_a",
1032            MetricKind::Incremental,
1033            MetricValue::Counter { value: 85.0 },
1034        );
1035        let gauge_a_1 = make_metric(
1036            "gauge_a",
1037            MetricKind::Absolute,
1038            MetricValue::Gauge { value: 42.0 },
1039        );
1040        let gauge_a_2 = make_metric(
1041            "gauge_a",
1042            MetricKind::Absolute,
1043            MetricValue::Gauge { value: 43.0 },
1044        );
1045        let inputs = vec![counter_a_1, counter_a_2, gauge_a_1, gauge_a_2.clone()];
1046
1047        // Queue up some events to be consumed & recorded
1048        let in_stream = Box::pin(stream::iter(inputs));
1049        // Kick off the transform process which should consume & record them
1050        let mut out_stream = agg.transform_events(in_stream);
1051
1052        // B/c the input stream has ended we will have gone through the `input_rx.next() => None`
1053        // part of the loop and do the shutting down final flush immediately. We'll already be able
1054        // to read our expected bits on the output.
1055        let mut count = 0_u8;
1056        while let Some(event) = out_stream.next().await {
1057            count += 1;
1058            match event.as_metric().series().name.name.as_str() {
1059                "counter_a" => assert_eq!(counter_a_summed, event),
1060                "gauge_a" => assert_eq!(gauge_a_2, event),
1061                _ => panic!("Unexpected metric name in aggregate output"),
1062            };
1063        }
1064        // There were only 2
1065        assert_eq!(2, count);
1066    }
1067
1068    #[tokio::test]
1069    async fn transform_interval() {
1070        let transform_config = toml::from_str::<AggregateConfig>("").unwrap();
1071
1072        let counter_a_1 = make_metric(
1073            "counter_a",
1074            MetricKind::Incremental,
1075            MetricValue::Counter { value: 42.0 },
1076        );
1077        let counter_a_2 = make_metric(
1078            "counter_a",
1079            MetricKind::Incremental,
1080            MetricValue::Counter { value: 43.0 },
1081        );
1082        let counter_a_summed = make_metric(
1083            "counter_a",
1084            MetricKind::Incremental,
1085            MetricValue::Counter { value: 85.0 },
1086        );
1087        let gauge_a_1 = make_metric(
1088            "gauge_a",
1089            MetricKind::Absolute,
1090            MetricValue::Gauge { value: 42.0 },
1091        );
1092        let gauge_a_2 = make_metric(
1093            "gauge_a",
1094            MetricKind::Absolute,
1095            MetricValue::Gauge { value: 43.0 },
1096        );
1097
1098        assert_transform_compliance(async {
1099            let (tx, rx) = mpsc::channel(10);
1100            let (topology, out) = create_topology(ReceiverStream::new(rx), transform_config).await;
1101            let mut out = ReceiverStream::new(out);
1102
1103            tokio::time::pause();
1104
1105            // tokio interval is always immediately ready, so we poll once to make sure
1106            // we trip it/set the interval in the future
1107            assert_eq!(Poll::Pending, futures::poll!(out.next()));
1108
1109            // Now send our events
1110            tx.send(counter_a_1).await.unwrap();
1111            tx.send(counter_a_2).await.unwrap();
1112            tx.send(gauge_a_1).await.unwrap();
1113            tx.send(gauge_a_2.clone()).await.unwrap();
1114            // We won't have flushed yet b/c the interval hasn't elapsed, so no outputs
1115            assert_eq!(Poll::Pending, futures::poll!(out.next()));
1116            // Now fast forward time enough that our flush should trigger.
1117            tokio::time::advance(Duration::from_secs(11)).await;
1118            // We should have had an interval fire now and our output aggregate events should be
1119            // available.
1120            let mut count = 0_u8;
1121            while count < 2 {
1122                match out.next().await {
1123                    Some(event) => {
1124                        match event.as_metric().series().name.name.as_str() {
1125                            "counter_a" => assert_eq!(counter_a_summed, event),
1126                            "gauge_a" => assert_eq!(gauge_a_2, event),
1127                            _ => panic!("Unexpected metric name in aggregate output"),
1128                        };
1129                        count += 1;
1130                    }
1131                    _ => {
1132                        panic!("Unexpectedly received None in output stream");
1133                    }
1134                }
1135            }
1136            // We should be back to pending, having nothing waiting for us
1137            assert_eq!(Poll::Pending, futures::poll!(out.next()));
1138
1139            drop(tx);
1140            topology.stop().await;
1141            assert_eq!(out.next().await, None);
1142        })
1143        .await;
1144    }
1145}