1use std::{
2 collections::{hash_map::Entry, HashMap},
3 pin::Pin,
4 time::Duration,
5};
6
7use async_stream::stream;
8use futures::{Stream, StreamExt};
9use vector_lib::{config::LogNamespace, event::MetricValue};
10use vector_lib::{
11 configurable::configurable_component,
12 event::metric::{Metric, MetricData, MetricKind, MetricSeries},
13};
14
15use crate::{
16 config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
17 event::{Event, EventMetadata},
18 internal_events::{AggregateEventRecorded, AggregateFlushed, AggregateUpdateFailed},
19 schema,
20 transforms::{TaskTransform, Transform},
21};
22
/// Configuration for the `aggregate` transform.
// NOTE(review): the derived `Default` yields `interval_ms == 0`, not the value
// of `default_interval_ms()`; the serde defaults below apply only when the
// configuration is deserialized.
#[configurable_component(transform("aggregate", "Aggregate metrics passing through a topology."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct AggregateConfig {
    /// The interval between flushes, in milliseconds.
    ///
    /// Metrics recorded between two flushes are aggregated per series and emitted at the next
    /// flush.
    #[serde(default = "default_interval_ms")]
    #[configurable(metadata(docs::human_name = "Flush Interval"))]
    pub interval_ms: u64,
    /// The aggregation function applied to the metrics of each series.
    #[serde(default = "default_mode")]
    #[configurable(derived)]
    pub mode: AggregationMode,
}
41
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[configurable(description = "The aggregation mode to use.")]
pub enum AggregationMode {
    /// Default mode. Sums incremental metrics and keeps the most recently received value for
    /// absolute metrics.
    #[default]
    Auto,

    /// Sums incremental metrics. Absolute metrics are ignored.
    Sum,

    /// Keeps the most recently received absolute metric. Incremental metrics are ignored.
    Latest,

    /// Counts the metrics received for each series and emits that count as a counter value.
    Count,

    /// Emits the latest absolute metric of the interval minus the value emitted for the same
    /// series by the previous flush. Incremental metrics are ignored.
    Diff,

    /// Keeps the absolute gauge with the largest value. Incremental metrics are ignored.
    Max,

    /// Keeps the absolute gauge with the smallest value. Incremental metrics are ignored.
    Min,

    /// Emits the arithmetic mean of the absolute gauges received for each series. All other
    /// metrics are ignored.
    Mean,

    /// Emits the population standard deviation of the absolute gauges received for each series.
    /// All other metrics are ignored.
    Stdev,
}
74
/// Serde default for `AggregateConfig::mode`: `AggregationMode::Auto`.
const fn default_mode() -> AggregationMode {
    AggregationMode::Auto
}
78
/// Serde default for `AggregateConfig::interval_ms`: ten seconds, expressed in milliseconds.
const fn default_interval_ms() -> u64 {
    const MILLIS_PER_SECOND: u64 = 1000;
    const DEFAULT_INTERVAL_SECONDS: u64 = 10;

    DEFAULT_INTERVAL_SECONDS * MILLIS_PER_SECOND
}
82
// NOTE(review): presumably generates the example configuration for this
// component from `AggregateConfig::default()`; the macro is defined outside
// this file — confirm against its definition.
impl_generate_config_from_default!(AggregateConfig);
84
85#[async_trait::async_trait]
86#[typetag::serde(name = "aggregate")]
87impl TransformConfig for AggregateConfig {
88 async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
89 Aggregate::new(self).map(Transform::event_task)
90 }
91
92 fn input(&self) -> Input {
93 Input::metric()
94 }
95
96 fn outputs(
97 &self,
98 _: vector_lib::enrichment::TableRegistry,
99 _: &[(OutputId, schema::Definition)],
100 _: LogNamespace,
101 ) -> Vec<TransformOutput> {
102 vec![TransformOutput::new(DataType::Metric, HashMap::new())]
103 }
104}
105
/// Aggregation state kept for one series: the metric data together with the
/// event metadata accumulated for it.
type MetricEntry = (MetricData, EventMetadata);
107
/// Task transform that aggregates metrics per series and emits the aggregates
/// on every flush.
#[derive(Debug)]
pub struct Aggregate {
    /// Period of the flush timer.
    interval: Duration,
    /// Single-entry aggregation state per series, used by every mode except
    /// `Mean` and `Stdev`.
    map: HashMap<MetricSeries, MetricEntry>,
    /// Entries emitted by the previous flush; read only in `Diff` mode, where
    /// they are subtracted from the current values.
    prev_map: HashMap<MetricSeries, MetricEntry>,
    /// Every absolute gauge recorded since the last flush, per series; used
    /// by the `Mean` and `Stdev` modes.
    multi_map: HashMap<MetricSeries, Vec<MetricEntry>>,
    /// Aggregation function selected in the configuration.
    mode: AggregationMode,
}
116
117impl Aggregate {
118 pub fn new(config: &AggregateConfig) -> crate::Result<Self> {
119 Ok(Self {
120 interval: Duration::from_millis(config.interval_ms),
121 map: Default::default(),
122 prev_map: Default::default(),
123 multi_map: Default::default(),
124 mode: config.mode.clone(),
125 })
126 }
127
128 fn record(&mut self, event: Event) {
129 let (series, data, metadata) = event.into_metric().into_parts();
130
131 match self.mode {
132 AggregationMode::Auto => match data.kind {
133 MetricKind::Incremental => self.record_sum(series, data, metadata),
134 MetricKind::Absolute => {
135 self.map.insert(series, (data, metadata));
136 }
137 },
138 AggregationMode::Sum => self.record_sum(series, data, metadata),
139 AggregationMode::Latest | AggregationMode::Diff => match data.kind {
140 MetricKind::Incremental => (),
141 MetricKind::Absolute => {
142 self.map.insert(series, (data, metadata));
143 }
144 },
145 AggregationMode::Count => self.record_count(series, data, metadata),
146 AggregationMode::Max | AggregationMode::Min => {
147 self.record_comparison(series, data, metadata)
148 }
149 AggregationMode::Mean | AggregationMode::Stdev => match data.kind {
150 MetricKind::Incremental => (),
151 MetricKind::Absolute => {
152 if matches!(data.value, MetricValue::Gauge { value: _ }) {
153 match self.multi_map.entry(series) {
154 Entry::Occupied(mut entry) => {
155 let existing = entry.get_mut();
156 existing.push((data, metadata));
157 }
158 Entry::Vacant(entry) => {
159 entry.insert(vec![(data, metadata)]);
160 }
161 }
162 }
163 }
164 },
165 }
166
167 emit!(AggregateEventRecorded);
168 }
169
170 fn record_count(
171 &mut self,
172 series: MetricSeries,
173 mut data: MetricData,
174 metadata: EventMetadata,
175 ) {
176 let mut count_data = data.clone();
177 let existing = self.map.entry(series).or_insert_with(|| {
178 *data.value_mut() = MetricValue::Counter { value: 0f64 };
179 (data.clone(), metadata.clone())
180 });
181 *count_data.value_mut() = MetricValue::Counter { value: 1f64 };
182 if existing.0.kind == data.kind && existing.0.update(&count_data) {
183 existing.1.merge(metadata);
184 } else {
185 emit!(AggregateUpdateFailed);
186 }
187 }
188
189 fn record_sum(&mut self, series: MetricSeries, data: MetricData, metadata: EventMetadata) {
190 match data.kind {
191 MetricKind::Incremental => match self.map.entry(series) {
192 Entry::Occupied(mut entry) => {
193 let existing = entry.get_mut();
194 if existing.0.kind == data.kind && existing.0.update(&data) {
196 existing.1.merge(metadata);
197 } else {
198 emit!(AggregateUpdateFailed);
199 *existing = (data, metadata);
200 }
201 }
202 Entry::Vacant(entry) => {
203 entry.insert((data, metadata));
204 }
205 },
206 MetricKind::Absolute => {}
207 }
208 }
209
210 fn record_comparison(
211 &mut self,
212 series: MetricSeries,
213 data: MetricData,
214 metadata: EventMetadata,
215 ) {
216 match data.kind {
217 MetricKind::Incremental => (),
218 MetricKind::Absolute => match self.map.entry(series) {
219 Entry::Occupied(mut entry) => {
220 let existing = entry.get_mut();
221 if existing.0.kind == data.kind {
223 if let MetricValue::Gauge {
224 value: existing_value,
225 } = existing.0.value()
226 {
227 if let MetricValue::Gauge { value: new_value } = data.value() {
228 let should_update = match self.mode {
229 AggregationMode::Max => new_value > existing_value,
230 AggregationMode::Min => new_value < existing_value,
231 _ => false,
232 };
233 if should_update {
234 *existing = (data, metadata);
235 }
236 }
237 }
238 } else {
239 emit!(AggregateUpdateFailed);
240 *existing = (data, metadata);
241 }
242 }
243 Entry::Vacant(entry) => {
244 entry.insert((data, metadata));
245 }
246 },
247 }
248 }
249
250 fn flush_into(&mut self, output: &mut Vec<Event>) {
251 let map = std::mem::take(&mut self.map);
252 for (series, entry) in map.clone().into_iter() {
253 let mut metric = Metric::from_parts(series, entry.0, entry.1);
254 if matches!(self.mode, AggregationMode::Diff) {
255 if let Some(prev_entry) = self.prev_map.get(metric.series()) {
256 if metric.data().kind == prev_entry.0.kind && !metric.subtract(&prev_entry.0) {
257 emit!(AggregateUpdateFailed);
258 }
259 }
260 }
261 output.push(Event::Metric(metric));
262 }
263
264 let multi_map = std::mem::take(&mut self.multi_map);
265 'outer: for (series, entries) in multi_map.into_iter() {
266 if entries.is_empty() {
267 continue;
268 }
269
270 let (mut final_sum, mut final_metadata) = entries.first().unwrap().clone();
271 for (data, metadata) in entries.iter().skip(1) {
272 if !final_sum.update(data) {
273 emit!(AggregateUpdateFailed);
275 continue 'outer;
276 }
277 final_metadata.merge(metadata.clone());
278 }
279
280 let final_mean_value = if let MetricValue::Gauge { value } = final_sum.value_mut() {
281 *value /= entries.len() as f64;
283 *value
284 } else {
285 0.0
286 };
287
288 let final_mean = final_sum.clone();
289 match self.mode {
290 AggregationMode::Mean => {
291 let metric = Metric::from_parts(series, final_mean, final_metadata);
292 output.push(Event::Metric(metric));
293 }
294 AggregationMode::Stdev => {
295 let variance = entries
296 .iter()
297 .filter_map(|(data, _)| {
298 if let MetricValue::Gauge { value } = data.value() {
299 let diff = final_mean_value - value;
300 Some(diff * diff)
301 } else {
302 None
303 }
304 })
305 .sum::<f64>()
306 / entries.len() as f64;
307 let mut final_stdev = final_mean;
308 if let MetricValue::Gauge { value } = final_stdev.value_mut() {
309 *value = variance.sqrt()
310 }
311 let metric = Metric::from_parts(series, final_stdev, final_metadata);
312 output.push(Event::Metric(metric));
313 }
314 _ => (),
315 }
316 }
317
318 self.prev_map = map;
319 emit!(AggregateFlushed);
320 }
321}
322
323impl TaskTransform<Event> for Aggregate {
324 fn transform(
325 mut self: Box<Self>,
326 mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
327 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
328 where
329 Self: 'static,
330 {
331 let mut flush_stream = tokio::time::interval(self.interval);
332
333 Box::pin(stream! {
334 let mut output = Vec::new();
335 let mut done = false;
336 while !done {
337 tokio::select! {
338 _ = flush_stream.tick() => {
339 self.flush_into(&mut output);
340 },
341 maybe_event = input_rx.next() => {
342 match maybe_event {
343 None => {
344 self.flush_into(&mut output);
345 done = true;
346 }
347 Some(event) => self.record(event),
348 }
349 }
350 };
351 for event in output.drain(..) {
352 yield event;
353 }
354 }
355 })
356 }
357}
358
359#[cfg(test)]
360mod tests {
361 use std::{collections::BTreeSet, sync::Arc, task::Poll};
362
363 use futures::stream;
364 use tokio::sync::mpsc;
365 use tokio_stream::wrappers::ReceiverStream;
366 use vector_lib::config::ComponentKey;
367 use vrl::value::Kind;
368
369 use super::*;
370 use crate::schema::Definition;
371 use crate::{
372 event::{
373 metric::{MetricKind, MetricValue},
374 Event, Metric,
375 },
376 test_util::components::assert_transform_compliance,
377 transforms::test::create_topology,
378 };
379
// Runs the shared config-generation check for `AggregateConfig`.
#[test]
fn generate_config() {
    crate::test_util::test_generate_config::<AggregateConfig>();
}
384
385 fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {
386 let mut event = Event::Metric(Metric::new(name, kind, value))
387 .with_source_id(Arc::new(ComponentKey::from("in")))
388 .with_upstream_id(Arc::new(OutputId::from("transform")));
389 event.metadata_mut().set_schema_definition(&Arc::new(
390 Definition::new_with_default_metadata(Kind::any_object(), [LogNamespace::Legacy]),
391 ));
392
393 event.metadata_mut().set_source_type("unit_test_stream");
394
395 event
396 }
397
/// `Auto` mode with incremental counters: values of one series are summed,
/// distinct series are kept apart, and an idle interval emits nothing.
#[test]
fn incremental_auto() {
    let counter = |name: &'static str, value: f64| {
        make_metric(name, MetricKind::Incremental, MetricValue::Counter { value })
    };
    let flush = |agg: &mut Aggregate| {
        let mut flushed = vec![];
        agg.flush_into(&mut flushed);
        flushed
    };

    let mut agg = Aggregate::new(&AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Auto,
    })
    .unwrap();

    let first = counter("counter_a", 42.0);
    let second = counter("counter_a", 43.0);
    let total = counter("counter_a", 85.0);

    // A lone metric is flushed unchanged.
    agg.record(first.clone());
    let out = flush(&mut agg);
    assert_eq!(1, out.len());
    assert_eq!(&first, &out[0]);

    // Flushing without new input emits nothing, however often it is repeated.
    for _ in 0..2 {
        assert_eq!(0, flush(&mut agg).len());
    }

    // Two increments of the same series come out as their sum.
    agg.record(first.clone());
    agg.record(second);
    let out = flush(&mut agg);
    assert_eq!(1, out.len());
    assert_eq!(&total, &out[0]);

    // Different series are flushed as separate metrics, in no fixed order.
    let other = counter("counter_b", 44.0);
    agg.record(first.clone());
    agg.record(other.clone());
    let out = flush(&mut agg);
    assert_eq!(2, out.len());
    for event in out {
        match event.as_metric().series().name.name.as_str() {
            "counter_a" => assert_eq!(first, event),
            "counter_b" => assert_eq!(other, event),
            _ => panic!("Unexpected metric name in aggregate output"),
        }
    }
}
468
/// `Auto` mode with absolute gauges: the last value recorded for a series
/// wins, distinct series are kept apart, and an idle interval emits nothing.
#[test]
fn absolute_auto() {
    let gauge = |name: &'static str, value: f64| {
        make_metric(name, MetricKind::Absolute, MetricValue::Gauge { value })
    };
    let flush = |agg: &mut Aggregate| {
        let mut flushed = vec![];
        agg.flush_into(&mut flushed);
        flushed
    };

    let mut agg = Aggregate::new(&AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Auto,
    })
    .unwrap();

    let first = gauge("gauge_a", 42.0);
    let second = gauge("gauge_a", 43.0);

    // A lone metric is flushed unchanged.
    agg.record(first.clone());
    let out = flush(&mut agg);
    assert_eq!(1, out.len());
    assert_eq!(&first, &out[0]);

    // Flushing without new input emits nothing, however often it is repeated.
    for _ in 0..2 {
        assert_eq!(0, flush(&mut agg).len());
    }

    // Of two absolute values for one series, the later one is emitted.
    agg.record(first.clone());
    agg.record(second.clone());
    let out = flush(&mut agg);
    assert_eq!(1, out.len());
    assert_eq!(&second, &out[0]);

    // Different series are flushed as separate metrics, in no fixed order.
    let other = gauge("gauge_b", 44.0);
    agg.record(first.clone());
    agg.record(other.clone());
    let out = flush(&mut agg);
    assert_eq!(2, out.len());
    for event in out {
        match event.as_metric().series().name.name.as_str() {
            "gauge_a" => assert_eq!(first, event),
            "gauge_b" => assert_eq!(other, event),
            _ => panic!("Unexpected metric name in aggregate output"),
        }
    }
}
534
/// `Count` mode: the flushed metric is a counter holding the number of
/// metrics recorded for the series since the previous flush.
#[test]
fn count_agg() {
    let sample = |value: f64| {
        make_metric("gauge_a", MetricKind::Absolute, MetricValue::Gauge { value })
    };
    let expected_count = |value: f64| {
        make_metric("gauge_a", MetricKind::Absolute, MetricValue::Counter { value })
    };
    let flush = |agg: &mut Aggregate| {
        let mut flushed = vec![];
        agg.flush_into(&mut flushed);
        flushed
    };

    let mut agg = Aggregate::new(&AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Count,
    })
    .unwrap();

    let first = sample(42.0);
    let second = sample(43.0);

    // One recorded metric yields a count of one.
    agg.record(first.clone());
    let out = flush(&mut agg);
    assert_eq!(1, out.len());
    assert_eq!(&expected_count(1.0), &out[0]);

    // Flushing without new input emits nothing, however often it is repeated.
    for _ in 0..2 {
        assert_eq!(0, flush(&mut agg).len());
    }

    // Two recorded metrics of the same series yield a count of two.
    agg.record(first.clone());
    agg.record(second.clone());
    let out = flush(&mut agg);
    assert_eq!(1, out.len());
    assert_eq!(&expected_count(2.0), &out[0]);
}
590
/// `Max` mode: the largest gauge value of the interval is emitted.
#[test]
fn absolute_max() {
    let gauge = |value: f64| {
        make_metric("gauge_a", MetricKind::Absolute, MetricValue::Gauge { value })
    };
    let flush = |agg: &mut Aggregate| {
        let mut flushed = vec![];
        agg.flush_into(&mut flushed);
        flushed
    };

    let mut agg = Aggregate::new(&AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Max,
    })
    .unwrap();

    let larger = gauge(112.0);
    let smaller = gauge(89.0);

    // A lone metric is flushed unchanged.
    agg.record(smaller.clone());
    let out = flush(&mut agg);
    assert_eq!(1, out.len());
    assert_eq!(&smaller, &out[0]);

    // Flushing without new input emits nothing, however often it is repeated.
    for _ in 0..2 {
        assert_eq!(0, flush(&mut agg).len());
    }

    // The larger value survives even though it was recorded first.
    agg.record(larger.clone());
    agg.record(smaller.clone());
    let out = flush(&mut agg);
    assert_eq!(1, out.len());
    assert_eq!(&larger, &out[0]);
}
636
/// `Min` mode: the smallest gauge value of the interval is emitted.
#[test]
fn absolute_min() {
    let gauge = |value: f64| {
        make_metric("gauge_a", MetricKind::Absolute, MetricValue::Gauge { value })
    };
    let flush = |agg: &mut Aggregate| {
        let mut flushed = vec![];
        agg.flush_into(&mut flushed);
        flushed
    };

    let mut agg = Aggregate::new(&AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Min,
    })
    .unwrap();

    let smaller = gauge(32.0);
    let larger = gauge(89.0);

    // A lone metric is flushed unchanged.
    agg.record(larger.clone());
    let out = flush(&mut agg);
    assert_eq!(1, out.len());
    assert_eq!(&larger, &out[0]);

    // Flushing without new input emits nothing, however often it is repeated.
    for _ in 0..2 {
        assert_eq!(0, flush(&mut agg).len());
    }

    // The smaller value survives even though a larger one was recorded later.
    agg.record(smaller.clone());
    agg.record(larger.clone());
    let out = flush(&mut agg);
    assert_eq!(1, out.len());
    assert_eq!(&smaller, &out[0]);
}
682
/// `Diff` mode: a value is emitted unchanged when the previous flush held
/// nothing for the series, otherwise the previously flushed value is
/// subtracted from it.
#[test]
fn absolute_diff() {
    let gauge = |value: f64| {
        make_metric("gauge_a", MetricKind::Absolute, MetricValue::Gauge { value })
    };
    let flush = |agg: &mut Aggregate| {
        let mut flushed = vec![];
        agg.flush_into(&mut flushed);
        flushed
    };

    let mut agg = Aggregate::new(&AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Diff,
    })
    .unwrap();

    let low = gauge(32.0);
    let high = gauge(82.0);
    let difference = gauge(50.0);

    // Nothing was flushed before, so the value comes out unchanged.
    agg.record(high.clone());
    let out = flush(&mut agg);
    assert_eq!(1, out.len());
    assert_eq!(&high, &out[0]);

    // Flushing without new input emits nothing, however often it is repeated.
    for _ in 0..2 {
        assert_eq!(0, flush(&mut agg).len());
    }

    // The empty flushes left no previous value, so this one is unchanged too.
    agg.record(low.clone());
    let out = flush(&mut agg);
    assert_eq!(1, out.len());
    assert_eq!(&low, &out[0]);

    // Now the previous flush held 32, so 82 is emitted as 50.
    agg.record(high.clone());
    let out = flush(&mut agg);
    assert_eq!(1, out.len());
    assert_eq!(&difference, &out[0]);
}
738
/// `Diff` mode: when the previous value has a different value type, the new
/// metric is emitted unchanged.
#[test]
fn absolute_diff_conflicting_type() {
    let flush = |agg: &mut Aggregate| {
        let mut flushed = vec![];
        agg.flush_into(&mut flushed);
        flushed
    };

    let mut agg = Aggregate::new(&AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Diff,
    })
    .unwrap();

    let as_gauge = make_metric(
        "gauge_a",
        MetricKind::Absolute,
        MetricValue::Gauge { value: 32.0 },
    );
    let as_counter = make_metric(
        "gauge_a",
        MetricKind::Absolute,
        MetricValue::Counter { value: 1.0 },
    );

    // Nothing was flushed before, so the gauge comes out unchanged.
    agg.record(as_gauge.clone());
    let out = flush(&mut agg);
    assert_eq!(1, out.len());
    assert_eq!(&as_gauge, &out[0]);

    // The counter cannot be diffed against the gauge and is emitted as is.
    agg.record(as_counter.clone());
    let out = flush(&mut agg);
    assert_eq!(1, out.len());
    assert_eq!(&as_counter, &out[0]);
}
773
/// `Mean` mode: the arithmetic mean of the gauges recorded in the interval is
/// emitted.
#[test]
fn absolute_mean() {
    let gauge = |value: f64| {
        make_metric("gauge_a", MetricKind::Absolute, MetricValue::Gauge { value })
    };
    let flush = |agg: &mut Aggregate| {
        let mut flushed = vec![];
        agg.flush_into(&mut flushed);
        flushed
    };

    let mut agg = Aggregate::new(&AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Mean,
    })
    .unwrap();

    let sample_32 = gauge(32.0);
    let sample_82 = gauge(82.0);
    let sample_51 = gauge(51.0);
    let expected_mean = gauge(55.0);

    // The mean of a single sample is that sample.
    agg.record(sample_82.clone());
    let out = flush(&mut agg);
    assert_eq!(1, out.len());
    assert_eq!(&sample_82, &out[0]);

    // Flushing without new input emits nothing, however often it is repeated.
    for _ in 0..2 {
        assert_eq!(0, flush(&mut agg).len());
    }

    // (32 + 82 + 51) / 3 == 55
    agg.record(sample_32.clone());
    agg.record(sample_82.clone());
    agg.record(sample_51.clone());
    let out = flush(&mut agg);
    assert_eq!(1, out.len());
    assert_eq!(&expected_mean, &out[0]);
}
830
/// `Stdev` mode: the population standard deviation of the gauges recorded in
/// the interval is emitted.
#[test]
fn absolute_stdev() {
    let gauge = |value: f64| {
        make_metric("gauge_a", MetricKind::Absolute, MetricValue::Gauge { value })
    };

    let mut agg = Aggregate::new(&AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Stdev,
    })
    .unwrap();

    // Mean 40, squared deviations sum to 700, variance 100, deviation 10.
    let samples = [25.0, 30.0, 35.0, 40.0, 45.0, 50.0, 55.0];
    let expected_stdev = gauge(10.0);

    for value in samples.iter() {
        agg.record(gauge(*value));
    }

    let mut out = vec![];
    agg.flush_into(&mut out);
    assert_eq!(1, out.len());
    assert_eq!(&expected_stdev, &out[0]);
}
890
/// `Auto` mode: when a series switches value type within an interval, the
/// aggregate restarts from the first metric of the new type.
#[test]
fn conflicting_value_type() {
    let flush = |agg: &mut Aggregate| {
        let mut flushed = vec![];
        agg.flush_into(&mut flushed);
        flushed
    };

    let mut agg = Aggregate::new(&AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Auto,
    })
    .unwrap();

    let counter = make_metric(
        "the-thing",
        MetricKind::Incremental,
        MetricValue::Counter { value: 42.0 },
    );
    let values: BTreeSet<String> = ["a", "b"].iter().map(|item| item.to_string()).collect();
    let set = make_metric(
        "the-thing",
        MetricKind::Incremental,
        MetricValue::Set { values },
    );
    let summed = make_metric(
        "the-thing",
        MetricKind::Incremental,
        MetricValue::Counter { value: 84.0 },
    );

    // Counters first, then sets: only the sets survive, merged into one.
    agg.record(counter.clone());
    agg.record(counter.clone());
    agg.record(set.clone());
    agg.record(set.clone());
    let out = flush(&mut agg);
    assert_eq!(1, out.len());
    assert_eq!(&set, &out[0]);

    // Sets first, then counters: only the counters survive, summed.
    agg.record(set.clone());
    agg.record(set);
    agg.record(counter.clone());
    agg.record(counter);
    let out = flush(&mut agg);
    assert_eq!(1, out.len());
    assert_eq!(&summed, &out[0]);
}
948
/// `Auto` mode: when a series switches between incremental and absolute
/// within an interval, the aggregate restarts from the first metric of the
/// new kind.
#[test]
fn conflicting_kinds() {
    let counter = |kind: MetricKind, value: f64| {
        make_metric("the-thing", kind, MetricValue::Counter { value })
    };
    let flush = |agg: &mut Aggregate| {
        let mut flushed = vec![];
        agg.flush_into(&mut flushed);
        flushed
    };

    let mut agg = Aggregate::new(&AggregateConfig {
        interval_ms: 1000_u64,
        mode: AggregationMode::Auto,
    })
    .unwrap();

    let incremental = counter(MetricKind::Incremental, 42.0);
    let absolute = counter(MetricKind::Absolute, 43.0);
    let summed = counter(MetricKind::Incremental, 84.0);

    // Incremental first, then absolute: the latest absolute value survives.
    agg.record(incremental.clone());
    agg.record(incremental.clone());
    agg.record(absolute.clone());
    agg.record(absolute.clone());
    let out = flush(&mut agg);
    assert_eq!(1, out.len());
    assert_eq!(&absolute, &out[0]);

    // Absolute first, then incremental: the incremental values survive, summed.
    agg.record(absolute.clone());
    agg.record(absolute);
    agg.record(incremental.clone());
    agg.record(incremental);
    let out = flush(&mut agg);
    assert_eq!(1, out.len());
    assert_eq!(&summed, &out[0]);
}
1003
1004 #[tokio::test]
1005 async fn transform_shutdown() {
1006 let agg = toml::from_str::<AggregateConfig>(
1007 r"
1008interval_ms = 999999
1009",
1010 )
1011 .unwrap()
1012 .build(&TransformContext::default())
1013 .await
1014 .unwrap();
1015
1016 let agg = agg.into_task();
1017
1018 let counter_a_1 = make_metric(
1019 "counter_a",
1020 MetricKind::Incremental,
1021 MetricValue::Counter { value: 42.0 },
1022 );
1023 let counter_a_2 = make_metric(
1024 "counter_a",
1025 MetricKind::Incremental,
1026 MetricValue::Counter { value: 43.0 },
1027 );
1028 let counter_a_summed = make_metric(
1029 "counter_a",
1030 MetricKind::Incremental,
1031 MetricValue::Counter { value: 85.0 },
1032 );
1033 let gauge_a_1 = make_metric(
1034 "gauge_a",
1035 MetricKind::Absolute,
1036 MetricValue::Gauge { value: 42.0 },
1037 );
1038 let gauge_a_2 = make_metric(
1039 "gauge_a",
1040 MetricKind::Absolute,
1041 MetricValue::Gauge { value: 43.0 },
1042 );
1043 let inputs = vec![counter_a_1, counter_a_2, gauge_a_1, gauge_a_2.clone()];
1044
1045 let in_stream = Box::pin(stream::iter(inputs));
1047 let mut out_stream = agg.transform_events(in_stream);
1049
1050 let mut count = 0_u8;
1054 while let Some(event) = out_stream.next().await {
1055 count += 1;
1056 match event.as_metric().series().name.name.as_str() {
1057 "counter_a" => assert_eq!(counter_a_summed, event),
1058 "gauge_a" => assert_eq!(gauge_a_2, event),
1059 _ => panic!("Unexpected metric name in aggregate output"),
1060 };
1061 }
1062 assert_eq!(2, count);
1064 }
1065
1066 #[tokio::test]
1067 async fn transform_interval() {
1068 let transform_config = toml::from_str::<AggregateConfig>("").unwrap();
1069
1070 let counter_a_1 = make_metric(
1071 "counter_a",
1072 MetricKind::Incremental,
1073 MetricValue::Counter { value: 42.0 },
1074 );
1075 let counter_a_2 = make_metric(
1076 "counter_a",
1077 MetricKind::Incremental,
1078 MetricValue::Counter { value: 43.0 },
1079 );
1080 let counter_a_summed = make_metric(
1081 "counter_a",
1082 MetricKind::Incremental,
1083 MetricValue::Counter { value: 85.0 },
1084 );
1085 let gauge_a_1 = make_metric(
1086 "gauge_a",
1087 MetricKind::Absolute,
1088 MetricValue::Gauge { value: 42.0 },
1089 );
1090 let gauge_a_2 = make_metric(
1091 "gauge_a",
1092 MetricKind::Absolute,
1093 MetricValue::Gauge { value: 43.0 },
1094 );
1095
1096 assert_transform_compliance(async {
1097 let (tx, rx) = mpsc::channel(10);
1098 let (topology, out) = create_topology(ReceiverStream::new(rx), transform_config).await;
1099 let mut out = ReceiverStream::new(out);
1100
1101 tokio::time::pause();
1102
1103 assert_eq!(Poll::Pending, futures::poll!(out.next()));
1106
1107 tx.send(counter_a_1).await.unwrap();
1109 tx.send(counter_a_2).await.unwrap();
1110 tx.send(gauge_a_1).await.unwrap();
1111 tx.send(gauge_a_2.clone()).await.unwrap();
1112 assert_eq!(Poll::Pending, futures::poll!(out.next()));
1114 tokio::time::advance(Duration::from_secs(11)).await;
1116 let mut count = 0_u8;
1119 while count < 2 {
1120 match out.next().await {
1121 Some(event) => {
1122 match event.as_metric().series().name.name.as_str() {
1123 "counter_a" => assert_eq!(counter_a_summed, event),
1124 "gauge_a" => assert_eq!(gauge_a_2, event),
1125 _ => panic!("Unexpected metric name in aggregate output"),
1126 };
1127 count += 1;
1128 }
1129 _ => {
1130 panic!("Unexpectedly received None in output stream");
1131 }
1132 }
1133 }
1134 assert_eq!(Poll::Pending, futures::poll!(out.next()));
1136
1137 drop(tx);
1138 topology.stop().await;
1139 assert_eq!(out.next().await, None);
1140 })
1141 .await;
1142 }
1143}