1use std::{
2 collections::{HashMap, hash_map::Entry},
3 pin::Pin,
4 time::Duration,
5};
6
7use async_stream::stream;
8use futures::{Stream, StreamExt};
9use vector_lib::{
10 configurable::configurable_component,
11 event::{
12 MetricValue,
13 metric::{Metric, MetricData, MetricKind, MetricSeries},
14 },
15};
16
17use crate::{
18 config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
19 event::{Event, EventMetadata},
20 internal_events::{AggregateEventRecorded, AggregateFlushed, AggregateUpdateFailed},
21 schema,
22 transforms::{TaskTransform, Transform},
23};
24
#[configurable_component(transform("aggregate", "Aggregate metrics passing through a topology."))]
/// Configuration for the `aggregate` transform.
// NOTE(review): the derived `Default` produces `interval_ms = 0`; the serde defaults below
// only apply when a field is missing during deserialization.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct AggregateConfig {
    #[serde(default = "default_interval_ms")]
    /// The interval between flushes, in milliseconds.
    ///
    /// Metrics recorded between two flushes that share the same series are aggregated together.
    #[configurable(metadata(docs::human_name = "Flush Interval"))]
    pub interval_ms: u64,

    #[serde(default = "default_mode")]
    /// Function to use for aggregation.
    ///
    /// Some modes only act on incremental metrics and others only on absolute metrics.
    #[configurable(derived)]
    pub mode: AggregationMode,
}
43
// User-facing selection of the aggregation function; converted into `InnerMode` when the
// transform is constructed.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[configurable(description = "The aggregation mode to use.")]
pub enum AggregationMode {
    #[default]
    /// Default mode. Sums incremental metrics and uses the latest value for absolute metrics.
    Auto,

    /// Sums incremental metrics, ignores absolute metrics.
    Sum,

    /// Returns the latest value for absolute metrics, ignores incremental metrics.
    Latest,

    /// Counts the number of metrics received per series, for both incremental and absolute metrics.
    Count,

    /// Returns the difference between the latest absolute value and the one from the previous
    /// flush, ignores incremental metrics.
    Diff,

    /// Maximum value of absolute gauge metrics, ignores incremental metrics.
    Max,

    /// Minimum value of absolute gauge metrics, ignores incremental metrics.
    Min,

    /// Mean value of absolute gauge metrics, ignores incremental metrics.
    Mean,

    /// Standard deviation of absolute gauge metrics, ignores incremental metrics.
    Stdev,
}
76
/// Runtime counterpart of [`AggregationMode`] that additionally owns the per-series state
/// some modes have to keep between calls.
#[derive(Clone, Debug, Default, PartialEq)]
enum InnerMode {
    #[default]
    /// See [`AggregationMode::Auto`].
    Auto,

    /// See [`AggregationMode::Sum`].
    Sum,

    /// See [`AggregationMode::Latest`].
    Latest,

    /// See [`AggregationMode::Count`].
    Count,

    /// See [`AggregationMode::Diff`].
    Diff {
        /// Un-subtracted entries emitted by the previous flush, used as the baseline for the
        /// next one.
        prev_map: HashMap<MetricSeries, MetricEntry>,
    },

    /// See [`AggregationMode::Max`].
    Max,

    /// See [`AggregationMode::Min`].
    Min,

    /// See [`AggregationMode::Mean`].
    Mean {
        /// Every absolute gauge sample recorded since the last flush, grouped by series.
        multi_map: HashMap<MetricSeries, Vec<MetricEntry>>,
    },

    /// See [`AggregationMode::Stdev`].
    Stdev {
        /// Every absolute gauge sample recorded since the last flush, grouped by series.
        multi_map: HashMap<MetricSeries, Vec<MetricEntry>>,
    },
}
113
114impl From<AggregationMode> for InnerMode {
115 fn from(value: AggregationMode) -> Self {
116 match value {
117 AggregationMode::Auto => InnerMode::Auto,
118 AggregationMode::Sum => InnerMode::Sum,
119 AggregationMode::Latest => InnerMode::Latest,
120 AggregationMode::Count => InnerMode::Count,
121 AggregationMode::Diff => InnerMode::Diff {
122 prev_map: HashMap::default(),
123 },
124 AggregationMode::Max => InnerMode::Max,
125 AggregationMode::Min => InnerMode::Min,
126 AggregationMode::Mean => InnerMode::Mean {
127 multi_map: HashMap::default(),
128 },
129 AggregationMode::Stdev => InnerMode::Stdev {
130 multi_map: HashMap::default(),
131 },
132 }
133 }
134}
135
/// Serde default for `AggregateConfig::mode`: [`AggregationMode::Auto`].
const fn default_mode() -> AggregationMode {
    AggregationMode::Auto
}
139
/// Serde default for `AggregateConfig::interval_ms`: ten seconds, expressed in milliseconds.
const fn default_interval_ms() -> u64 {
    const MILLIS_PER_SECOND: u64 = 1000;
    10 * MILLIS_PER_SECOND
}
143
// Provides the example-configuration implementation exercised by the `generate_config` test.
// NOTE(review): judging by the macro name, the example is produced from
// `AggregateConfig::default()` — confirm against the macro definition.
impl_generate_config_from_default!(AggregateConfig);
145
146#[async_trait::async_trait]
147#[typetag::serde(name = "aggregate")]
148impl TransformConfig for AggregateConfig {
149 async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
150 Aggregate::new(self).map(Transform::event_task)
151 }
152
153 fn input(&self) -> Input {
154 Input::metric()
155 }
156
157 fn outputs(
158 &self,
159 _: &TransformContext,
160 _: &[(OutputId, schema::Definition)],
161 ) -> Vec<TransformOutput> {
162 vec![TransformOutput::new(DataType::Metric, HashMap::new())]
163 }
164}
165
/// The aggregated payload stored per series: the metric's data plus the event metadata that
/// accompanies it.
type MetricEntry = (MetricData, EventMetadata);
167
/// Stateful metric aggregator: metrics are recorded per series and the aggregated values are
/// emitted each time the state is flushed.
#[derive(Debug)]
pub struct Aggregate {
    /// Period of the flush timer used when running as a task.
    interval: Duration,
    /// Current aggregate per series. The `Mean` and `Stdev` modes buffer their samples inside
    /// `mode` instead and leave this map empty.
    map: HashMap<MetricSeries, MetricEntry>,
    /// Active aggregation mode, including any mode-specific state.
    mode: InnerMode,
}
174
175impl Aggregate {
176 pub fn new(config: &AggregateConfig) -> crate::Result<Self> {
177 Ok(Self {
178 interval: Duration::from_millis(config.interval_ms),
179 map: Default::default(),
180 mode: config.mode.into(),
181 })
182 }
183
184 fn record(&mut self, event: Event) {
185 let (series, data, metadata) = event.into_metric().into_parts();
186
187 match &mut self.mode {
188 InnerMode::Auto => match data.kind {
189 MetricKind::Incremental => self.record_sum(series, data, metadata),
190 MetricKind::Absolute => {
191 self.map.insert(series, (data, metadata));
192 }
193 },
194 InnerMode::Sum => self.record_sum(series, data, metadata),
195 InnerMode::Latest | InnerMode::Diff { .. } => match data.kind {
196 MetricKind::Incremental => (),
197 MetricKind::Absolute => {
198 self.map.insert(series, (data, metadata));
199 }
200 },
201 InnerMode::Count => self.record_count(series, data, metadata),
202 InnerMode::Max | InnerMode::Min => self.record_comparison(series, data, metadata),
203 InnerMode::Mean { multi_map } | InnerMode::Stdev { multi_map } => match data.kind {
204 MetricKind::Incremental => (),
205 MetricKind::Absolute => {
206 if matches!(data.value, MetricValue::Gauge { value: _ }) {
207 match multi_map.entry(series) {
208 Entry::Occupied(mut entry) => {
209 let existing = entry.get_mut();
210 existing.push((data, metadata));
211 }
212 Entry::Vacant(entry) => {
213 entry.insert(vec![(data, metadata)]);
214 }
215 }
216 }
217 }
218 },
219 }
220
221 emit!(AggregateEventRecorded);
222 }
223
224 fn record_count(
225 &mut self,
226 series: MetricSeries,
227 mut data: MetricData,
228 metadata: EventMetadata,
229 ) {
230 let mut count_data = data.clone();
231 let existing = self.map.entry(series).or_insert_with(|| {
232 *data.value_mut() = MetricValue::Counter { value: 0f64 };
233 (data.clone(), metadata.clone())
234 });
235 *count_data.value_mut() = MetricValue::Counter { value: 1f64 };
236 if existing.0.kind == data.kind && existing.0.update(&count_data) {
237 existing.1.merge(metadata);
238 } else {
239 emit!(AggregateUpdateFailed);
240 }
241 }
242
243 fn record_sum(&mut self, series: MetricSeries, data: MetricData, metadata: EventMetadata) {
244 match data.kind {
245 MetricKind::Incremental => match self.map.entry(series) {
246 Entry::Occupied(mut entry) => {
247 let existing = entry.get_mut();
248 if existing.0.kind == data.kind && existing.0.update(&data) {
250 existing.1.merge(metadata);
251 } else {
252 emit!(AggregateUpdateFailed);
253 *existing = (data, metadata);
254 }
255 }
256 Entry::Vacant(entry) => {
257 entry.insert((data, metadata));
258 }
259 },
260 MetricKind::Absolute => {}
261 }
262 }
263
264 fn record_comparison(
265 &mut self,
266 series: MetricSeries,
267 data: MetricData,
268 metadata: EventMetadata,
269 ) {
270 match data.kind {
271 MetricKind::Incremental => (),
272 MetricKind::Absolute => match self.map.entry(series) {
273 Entry::Occupied(mut entry) => {
274 let existing = entry.get_mut();
275 if existing.0.kind == data.kind {
277 if let MetricValue::Gauge {
278 value: existing_value,
279 } = existing.0.value()
280 && let MetricValue::Gauge { value: new_value } = data.value()
281 {
282 let should_update = match self.mode {
283 InnerMode::Max => new_value > existing_value,
284 InnerMode::Min => new_value < existing_value,
285 _ => false,
286 };
287 if should_update {
288 *existing = (data, metadata);
289 }
290 }
291 } else {
292 emit!(AggregateUpdateFailed);
293 *existing = (data, metadata);
294 }
295 }
296 Entry::Vacant(entry) => {
297 entry.insert((data, metadata));
298 }
299 },
300 }
301 }
302
303 fn flush_into(&mut self, output: &mut Vec<Event>) {
304 let map = std::mem::take(&mut self.map);
305 for (series, entry) in map.clone().into_iter() {
306 let mut metric = Metric::from_parts(series, entry.0, entry.1);
307 if let InnerMode::Diff { prev_map } = &self.mode
308 && let Some(prev_entry) = prev_map.get(metric.series())
309 && metric.data().kind == prev_entry.0.kind
310 && !metric.subtract(&prev_entry.0)
311 {
312 emit!(AggregateUpdateFailed);
313 }
314 output.push(Event::Metric(metric));
315 }
316
317 let multi_map = match &mut self.mode {
318 InnerMode::Mean { multi_map } | InnerMode::Stdev { multi_map } => {
319 std::mem::take(multi_map)
320 }
321 _ => HashMap::default(),
322 };
323
324 'outer: for (series, entries) in multi_map.into_iter() {
325 if entries.is_empty() {
326 continue;
327 }
328
329 let (mut final_sum, mut final_metadata) = entries.first().unwrap().clone();
330 for (data, metadata) in entries.iter().skip(1) {
331 if !final_sum.update(data) {
332 emit!(AggregateUpdateFailed);
334 continue 'outer;
335 }
336 final_metadata.merge(metadata.clone());
337 }
338
339 let final_mean_value = if let MetricValue::Gauge { value } = final_sum.value_mut() {
340 *value /= entries.len() as f64;
342 *value
343 } else {
344 0.0
345 };
346
347 let final_mean = final_sum.clone();
348 match self.mode {
349 InnerMode::Mean { .. } => {
350 let metric = Metric::from_parts(series, final_mean, final_metadata);
351 output.push(Event::Metric(metric));
352 }
353 InnerMode::Stdev { .. } => {
354 let variance = entries
355 .iter()
356 .filter_map(|(data, _)| {
357 if let MetricValue::Gauge { value } = data.value() {
358 let diff = final_mean_value - value;
359 Some(diff * diff)
360 } else {
361 None
362 }
363 })
364 .sum::<f64>()
365 / entries.len() as f64;
366 let mut final_stdev = final_mean;
367 if let MetricValue::Gauge { value } = final_stdev.value_mut() {
368 *value = variance.sqrt()
369 }
370 let metric = Metric::from_parts(series, final_stdev, final_metadata);
371 output.push(Event::Metric(metric));
372 }
373 _ => (),
374 }
375 }
376
377 if let InnerMode::Diff { prev_map } = &mut self.mode {
378 *prev_map = map;
379 }
380 emit!(AggregateFlushed);
381 }
382}
383
impl TaskTransform<Event> for Aggregate {
    /// Runs the aggregator as a stream task.
    ///
    /// Incoming events are recorded as they arrive. Aggregated metrics are emitted whenever
    /// the flush timer ticks, and once more when the input stream ends, after which the
    /// output stream terminates.
    fn transform(
        mut self: Box<Self>,
        mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
    where
        Self: 'static,
    {
        // Periodic flush timer driven by the configured interval.
        let mut flush_stream = tokio::time::interval(self.interval);

        Box::pin(stream! {
            // Reused buffer that each flush fills and the loop below drains.
            let mut output = Vec::new();
            let mut done = false;
            while !done {
                // Wait for whichever happens first: a timer tick or the next input item.
                tokio::select! {
                    _ = flush_stream.tick() => {
                        self.flush_into(&mut output);
                    },
                    maybe_event = input_rx.next() => {
                        match maybe_event {
                            // Input exhausted: emit whatever is still buffered, then stop.
                            None => {
                                self.flush_into(&mut output);
                                done = true;
                            }
                            Some(event) => self.record(event),
                        }
                    }
                };
                // Forward anything the flush produced (nothing after a plain `record`).
                for event in output.drain(..) {
                    yield event;
                }
            }
        })
    }
}
419
420#[cfg(test)]
421mod tests {
422 use std::{collections::BTreeSet, sync::Arc, task::Poll};
423
424 use futures::stream;
425 use tokio::sync::mpsc;
426 use tokio_stream::wrappers::ReceiverStream;
427 use vector_lib::config::{ComponentKey, LogNamespace};
428 use vrl::value::Kind;
429
430 use super::*;
431 use crate::{
432 event::{
433 Event, Metric,
434 metric::{MetricKind, MetricValue},
435 },
436 schema::Definition,
437 test_util::components::assert_transform_compliance,
438 transforms::test::create_topology,
439 };
440
    // Runs the shared generated-configuration check against `AggregateConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<AggregateConfig>();
    }
445
446 fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {
447 let mut event = Event::Metric(Metric::new(name, kind, value))
448 .with_source_id(Arc::new(ComponentKey::from("in")))
449 .with_upstream_id(Arc::new(OutputId::from("transform")));
450 event.metadata_mut().set_schema_definition(&Arc::new(
451 Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
452 ));
453
454 event.metadata_mut().set_source_type("unit_test_stream");
455
456 event
457 }
458
    // `Auto` mode with incremental counters: values of the same series are summed per flush.
    #[test]
    fn incremental_auto() {
        let mut agg = Aggregate::new(&AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Auto,
        })
        .unwrap();

        let counter_a_1 = make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value: 42.0 },
        );
        let counter_a_2 = make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value: 43.0 },
        );
        let counter_a_summed = make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value: 85.0 },
        );

        // A single recorded item is stored and comes back unchanged.
        agg.record(counter_a_1.clone());
        let mut out = vec![];
        // The flush should produce exactly that one item.
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&counter_a_1, &out[0]);

        // A subsequent flush has nothing left to send.
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());

        // Once more, to make sure nothing reappears from earlier state.
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());

        // Two increments of the same series are summed into one metric.
        agg.record(counter_a_1.clone());
        agg.record(counter_a_2);
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&counter_a_summed, &out[0]);

        let counter_b_1 = make_metric(
            "counter_b",
            MetricKind::Incremental,
            MetricValue::Counter { value: 44.0 },
        );
        // Two increments of different series come back individually, unchanged.
        agg.record(counter_a_1.clone());
        agg.record(counter_b_1.clone());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(2, out.len());
        // Output order is not guaranteed, so match on the metric name.
        for event in out {
            match event.as_metric().series().name.name.as_str() {
                "counter_a" => assert_eq!(counter_a_1, event),
                "counter_b" => assert_eq!(counter_b_1, event),
                _ => panic!("Unexpected metric name in aggregate output"),
            }
        }
    }
529
    // `Auto` mode with absolute gauges: the latest value of each series wins.
    #[test]
    fn absolute_auto() {
        let mut agg = Aggregate::new(&AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Auto,
        })
        .unwrap();

        let gauge_a_1 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 42.0 },
        );
        let gauge_a_2 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 43.0 },
        );

        // A single recorded item is stored and comes back unchanged.
        agg.record(gauge_a_1.clone());
        let mut out = vec![];
        // The flush should produce exactly that one item.
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&gauge_a_1, &out[0]);

        // A subsequent flush has nothing left to send.
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());

        // Once more, to make sure nothing reappears from earlier state.
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());

        // Two absolute values of the same series: only the second (latest) is emitted.
        agg.record(gauge_a_1.clone());
        agg.record(gauge_a_2.clone());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&gauge_a_2, &out[0]);

        let gauge_b_1 = make_metric(
            "gauge_b",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 44.0 },
        );
        // Two absolute values of different series come back individually, unchanged.
        agg.record(gauge_a_1.clone());
        agg.record(gauge_b_1.clone());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(2, out.len());
        // Output order is not guaranteed, so match on the metric name.
        for event in out {
            match event.as_metric().series().name.name.as_str() {
                "gauge_a" => assert_eq!(gauge_a_1, event),
                "gauge_b" => assert_eq!(gauge_b_1, event),
                _ => panic!("Unexpected metric name in aggregate output"),
            }
        }
    }
595
    // `Count` mode: the output is a counter holding the number of events seen per series,
    // keeping the kind of the recorded metrics.
    #[test]
    fn count_agg() {
        let mut agg = Aggregate::new(&AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Count,
        })
        .unwrap();

        let gauge_a_1 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 42.0 },
        );
        let gauge_a_2 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 43.0 },
        );
        let result_count = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
        );
        let result_count_2 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Counter { value: 2.0 },
        );

        // A single recorded item counts as one.
        agg.record(gauge_a_1.clone());
        let mut out = vec![];
        // The flush should produce one counter with value 1.
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&result_count, &out[0]);

        // A subsequent flush has nothing left to send.
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());

        // Once more, to make sure nothing reappears from earlier state.
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());

        // Two events of the same series yield a count of two, regardless of their values.
        agg.record(gauge_a_1.clone());
        agg.record(gauge_a_2.clone());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&result_count_2, &out[0]);
    }
651
    // `Max` mode: the largest absolute gauge of each series is emitted.
    #[test]
    fn absolute_max() {
        let mut agg = Aggregate::new(&AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Max,
        })
        .unwrap();

        let gauge_a_1 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 112.0 },
        );
        let gauge_a_2 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 89.0 },
        );

        // A single recorded item is stored and comes back unchanged.
        agg.record(gauge_a_2.clone());
        let mut out = vec![];
        // The flush should produce exactly that one item.
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&gauge_a_2, &out[0]);

        // A subsequent flush has nothing left to send.
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());

        // Once more, to make sure nothing reappears from earlier state.
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());

        // Two values of the same series: the larger one (112) is kept although the smaller
        // one was recorded last.
        agg.record(gauge_a_1.clone());
        agg.record(gauge_a_2.clone());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&gauge_a_1, &out[0]);
    }
697
    // `Min` mode: the smallest absolute gauge of each series is emitted.
    #[test]
    fn absolute_min() {
        let mut agg = Aggregate::new(&AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Min,
        })
        .unwrap();

        let gauge_a_1 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 32.0 },
        );
        let gauge_a_2 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 89.0 },
        );

        // A single recorded item is stored and comes back unchanged.
        agg.record(gauge_a_2.clone());
        let mut out = vec![];
        // The flush should produce exactly that one item.
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&gauge_a_2, &out[0]);

        // A subsequent flush has nothing left to send.
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());

        // Once more, to make sure nothing reappears from earlier state.
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());

        // Two values of the same series: the smaller one (32) is kept although the larger
        // one was recorded last.
        agg.record(gauge_a_1.clone());
        agg.record(gauge_a_2.clone());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&gauge_a_1, &out[0]);
    }
743
    // `Diff` mode: each flush emits the latest value minus the value from the immediately
    // preceding flush of the same series.
    #[test]
    fn absolute_diff() {
        let mut agg = Aggregate::new(&AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Diff,
        })
        .unwrap();

        let gauge_a_1 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 32.0 },
        );
        let gauge_a_2 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 82.0 },
        );
        let result = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 50.0 },
        );

        // With no previous flush to compare against, the value is emitted as-is.
        agg.record(gauge_a_2.clone());
        let mut out = vec![];
        // The flush should produce exactly that one item.
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&gauge_a_2, &out[0]);

        // A subsequent flush has nothing left to send.
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());

        // Once more, to make sure nothing reappears from earlier state.
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());

        // The empty flushes cleared the baseline, so this value is emitted unchanged too.
        agg.record(gauge_a_1.clone());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&gauge_a_1, &out[0]);

        // Now a baseline of 32 exists: 82 - 32 = 50.
        agg.record(gauge_a_2.clone());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&result, &out[0]);
    }
799
    // `Diff` mode when the value type of a series changes between flushes: the test expects
    // the new metric to be emitted unchanged rather than as a difference.
    #[test]
    fn absolute_diff_conflicting_type() {
        let mut agg = Aggregate::new(&AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Diff,
        })
        .unwrap();

        let gauge_a_1 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 32.0 },
        );
        let gauge_a_2 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
        );

        let mut out = vec![];
        // First flush: no baseline yet, the gauge is emitted as-is.
        agg.record(gauge_a_1.clone());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&gauge_a_1, &out[0]);

        // Second flush: the baseline is a gauge but the new value is a counter.
        agg.record(gauge_a_2.clone());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        // The counter comes through untouched.
        assert_eq!(&gauge_a_2, &out[0]);
    }
834
    // `Mean` mode: the arithmetic mean of the absolute gauges recorded per series is emitted.
    #[test]
    fn absolute_mean() {
        let mut agg = Aggregate::new(&AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Mean,
        })
        .unwrap();

        let gauge_a_1 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 32.0 },
        );
        let gauge_a_2 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 82.0 },
        );
        let gauge_a_3 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 51.0 },
        );
        let mean_result = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 55.0 },
        );

        // The mean of a single sample is the sample itself.
        agg.record(gauge_a_2.clone());
        let mut out = vec![];
        // The flush should produce exactly that one item.
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&gauge_a_2, &out[0]);

        // A subsequent flush has nothing left to send.
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());

        // Once more, to make sure nothing reappears from earlier state.
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(0, out.len());

        // Three samples of the same series: (32 + 82 + 51) / 3 = 55.
        agg.record(gauge_a_1.clone());
        agg.record(gauge_a_2.clone());
        agg.record(gauge_a_3.clone());
        out.clear();
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&mean_result, &out[0]);
    }
891
    // `Stdev` mode: the population standard deviation of the recorded gauges is emitted.
    #[test]
    fn absolute_stdev() {
        let mut agg = Aggregate::new(&AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Stdev,
        })
        .unwrap();

        // Seven samples, 25 to 55 in steps of 5: mean 40, mean squared deviation 700 / 7 = 100.
        let gauges = vec![
            make_metric(
                "gauge_a",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 25.0 },
            ),
            make_metric(
                "gauge_a",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 30.0 },
            ),
            make_metric(
                "gauge_a",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 35.0 },
            ),
            make_metric(
                "gauge_a",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 40.0 },
            ),
            make_metric(
                "gauge_a",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 45.0 },
            ),
            make_metric(
                "gauge_a",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 50.0 },
            ),
            make_metric(
                "gauge_a",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 55.0 },
            ),
        ];
        // sqrt(100) = 10.
        let stdev_result = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 10.0 },
        );

        for gauge in gauges {
            agg.record(gauge);
        }
        let mut out = vec![];
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&stdev_result, &out[0]);
    }
951
    // `Auto` mode when one series switches value type mid-interval: values that cannot be
    // combined replace what was stored, so only the most recent type survives the flush.
    #[test]
    fn conflicting_value_type() {
        let mut agg = Aggregate::new(&AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Auto,
        })
        .unwrap();

        let counter = make_metric(
            "the-thing",
            MetricKind::Incremental,
            MetricValue::Counter { value: 42.0 },
        );
        let mut values = BTreeSet::<String>::new();
        values.insert("a".into());
        values.insert("b".into());
        let set = make_metric(
            "the-thing",
            MetricKind::Incremental,
            MetricValue::Set { values },
        );
        let summed = make_metric(
            "the-thing",
            MetricKind::Incremental,
            MetricValue::Counter { value: 84.0 },
        );

        // The first counter is stored as-is.
        agg.record(counter.clone());
        // The second counter is added to the first.
        agg.record(counter.clone());
        // The set cannot be combined with the counter and is expected to replace it.
        agg.record(set.clone());
        // The second set is combined with the first; the expected result equals `set`.
        agg.record(set.clone());
        let mut out = vec![];
        // Only the set should come out.
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&set, &out[0]);

        // Same again in the opposite order: the first set is stored as-is.
        agg.record(set.clone());
        // The second set is combined with the first.
        agg.record(set);
        // The counter cannot be combined with the set and is expected to replace it.
        agg.record(counter.clone());
        // The second counter is added to the first: 42 + 42 = 84.
        agg.record(counter);
        let mut out = vec![];
        // Only the summed counter should come out.
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&summed, &out[0]);
    }
1009
    // `Auto` mode when one series switches between incremental and absolute mid-interval:
    // a metric of the other kind replaces what was stored.
    #[test]
    fn conflicting_kinds() {
        let mut agg = Aggregate::new(&AggregateConfig {
            interval_ms: 1000_u64,
            mode: AggregationMode::Auto,
        })
        .unwrap();

        let incremental = make_metric(
            "the-thing",
            MetricKind::Incremental,
            MetricValue::Counter { value: 42.0 },
        );
        let absolute = make_metric(
            "the-thing",
            MetricKind::Absolute,
            MetricValue::Counter { value: 43.0 },
        );
        let summed = make_metric(
            "the-thing",
            MetricKind::Incremental,
            MetricValue::Counter { value: 84.0 },
        );

        // The first incremental is stored as-is.
        agg.record(incremental.clone());
        // The second incremental is added to the first.
        agg.record(incremental.clone());
        // An absolute value overwrites whatever is stored for the series.
        agg.record(absolute.clone());
        // The second absolute overwrites the first.
        agg.record(absolute.clone());
        let mut out = vec![];
        // Only the absolute value should come out.
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&absolute, &out[0]);

        // Same again in the opposite order: the first absolute is stored as-is.
        agg.record(absolute.clone());
        // The second absolute overwrites the first.
        agg.record(absolute);
        // The incremental's kind differs from the stored entry, so it replaces it.
        agg.record(incremental.clone());
        // The second incremental is added to the first: 42 + 42 = 84.
        agg.record(incremental);
        let mut out = vec![];
        // Only the summed incremental should come out.
        agg.flush_into(&mut out);
        assert_eq!(1, out.len());
        assert_eq!(&summed, &out[0]);
    }
1064
    // Runs the transform as a task and checks that buffered metrics are flushed when the
    // input stream ends, without waiting for the (very long) flush interval.
    #[tokio::test]
    async fn transform_shutdown() {
        let agg = toml::from_str::<AggregateConfig>(
            r"
interval_ms = 999999
",
        )
        .unwrap()
        .build(&TransformContext::default())
        .await
        .unwrap();

        let agg = agg.into_task();

        let counter_a_1 = make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value: 42.0 },
        );
        let counter_a_2 = make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value: 43.0 },
        );
        let counter_a_summed = make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value: 85.0 },
        );
        let gauge_a_1 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 42.0 },
        );
        let gauge_a_2 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 43.0 },
        );
        let inputs = vec![counter_a_1, counter_a_2, gauge_a_1, gauge_a_2.clone()];

        // Queue up the events to be consumed and recorded.
        let in_stream = Box::pin(stream::iter(inputs));
        // Start the transform, which consumes and records them.
        let mut out_stream = agg.transform_events(in_stream);

        // The input stream is finite, so the transform reaches its end-of-input flush and
        // the aggregated metrics become readable before the output stream ends.
        let mut count = 0_u8;
        while let Some(event) = out_stream.next().await {
            count += 1;
            match event.as_metric().series().name.name.as_str() {
                "counter_a" => assert_eq!(counter_a_summed, event),
                "gauge_a" => assert_eq!(gauge_a_2, event),
                _ => panic!("Unexpected metric name in aggregate output"),
            };
        }
        // Exactly two series were recorded.
        assert_eq!(2, count);
    }
1126
    // Runs the transform inside a test topology with paused time and checks that the
    // aggregated metrics are only emitted once the flush interval has elapsed.
    #[tokio::test]
    async fn transform_interval() {
        // An empty document relies on the serde defaults for every field.
        let transform_config = toml::from_str::<AggregateConfig>("").unwrap();

        let counter_a_1 = make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value: 42.0 },
        );
        let counter_a_2 = make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value: 43.0 },
        );
        let counter_a_summed = make_metric(
            "counter_a",
            MetricKind::Incremental,
            MetricValue::Counter { value: 85.0 },
        );
        let gauge_a_1 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 42.0 },
        );
        let gauge_a_2 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 43.0 },
        );

        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(10);
            let (topology, out) = create_topology(ReceiverStream::new(rx), transform_config).await;
            let mut out = ReceiverStream::new(out);

            // Freeze the clock so the flush timer only advances when the test says so.
            tokio::time::pause();

            // Nothing has been sent yet, so there is no output. Polling once also lets the
            // transform consume the interval's first tick, which tokio documents as
            // completing immediately.
            assert_eq!(Poll::Pending, futures::poll!(out.next()));

            // Now send the events.
            tx.send(counter_a_1).await.unwrap();
            tx.send(counter_a_2).await.unwrap();
            tx.send(gauge_a_1).await.unwrap();
            tx.send(gauge_a_2.clone()).await.unwrap();
            // They are recorded but must not be flushed before the interval elapses.
            assert_eq!(Poll::Pending, futures::poll!(out.next()));
            // Move the clock past the default ten-second flush interval.
            tokio::time::advance(Duration::from_secs(11)).await;
            // The timer has fired, so both aggregated metrics should now be available.
            let mut count = 0_u8;
            while count < 2 {
                match out.next().await {
                    Some(event) => {
                        match event.as_metric().series().name.name.as_str() {
                            "counter_a" => assert_eq!(counter_a_summed, event),
                            "gauge_a" => assert_eq!(gauge_a_2, event),
                            _ => panic!("Unexpected metric name in aggregate output"),
                        };
                        count += 1;
                    }
                    _ => {
                        panic!("Unexpectedly received None in output stream");
                    }
                }
            }
            // Back to pending: nothing else is waiting.
            assert_eq!(Poll::Pending, futures::poll!(out.next()));

            // Closing the input and stopping the topology ends the output stream.
            drop(tx);
            topology.stop().await;
            assert_eq!(out.next().await, None);
        })
        .await;
    }
1204}