1use std::{
2 collections::{HashMap, hash_map::Entry},
3 pin::Pin,
4 time::Duration,
5};
6
7use async_stream::stream;
8use futures::{Stream, StreamExt};
9use vector_lib::{
10 config::LogNamespace,
11 configurable::configurable_component,
12 event::{
13 MetricValue,
14 metric::{Metric, MetricData, MetricKind, MetricSeries},
15 },
16};
17
18use crate::{
19 config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
20 event::{Event, EventMetadata},
21 internal_events::{AggregateEventRecorded, AggregateFlushed, AggregateUpdateFailed},
22 schema,
23 transforms::{TaskTransform, Transform},
24};
25
26#[configurable_component(transform("aggregate", "Aggregate metrics passing through a topology."))]
28#[derive(Clone, Debug, Default)]
29#[serde(deny_unknown_fields)]
30pub struct AggregateConfig {
31 #[serde(default = "default_interval_ms")]
35 #[configurable(metadata(docs::human_name = "Flush Interval"))]
36 pub interval_ms: u64,
37 #[serde(default = "default_mode")]
41 #[configurable(derived)]
42 pub mode: AggregationMode,
43}
44
45#[configurable_component]
46#[derive(Clone, Debug, Default)]
47#[configurable(description = "The aggregation mode to use.")]
48pub enum AggregationMode {
49 #[default]
51 Auto,
52
53 Sum,
55
56 Latest,
58
59 Count,
61
62 Diff,
64
65 Max,
67
68 Min,
70
71 Mean,
73
74 Stdev,
76}
77
78const fn default_mode() -> AggregationMode {
79 AggregationMode::Auto
80}
81
/// Serde default for `AggregateConfig::interval_ms`: ten seconds, expressed in milliseconds.
const fn default_interval_ms() -> u64 {
    const SECONDS: u64 = 10;
    const MILLIS_PER_SECOND: u64 = 1000;

    SECONDS * MILLIS_PER_SECOND
}
85
86impl_generate_config_from_default!(AggregateConfig);
87
88#[async_trait::async_trait]
89#[typetag::serde(name = "aggregate")]
90impl TransformConfig for AggregateConfig {
91 async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
92 Aggregate::new(self).map(Transform::event_task)
93 }
94
95 fn input(&self) -> Input {
96 Input::metric()
97 }
98
99 fn outputs(
100 &self,
101 _: vector_lib::enrichment::TableRegistry,
102 _: &[(OutputId, schema::Definition)],
103 _: LogNamespace,
104 ) -> Vec<TransformOutput> {
105 vec![TransformOutput::new(DataType::Metric, HashMap::new())]
106 }
107}
108
109type MetricEntry = (MetricData, EventMetadata);
110
111#[derive(Debug)]
112pub struct Aggregate {
113 interval: Duration,
114 map: HashMap<MetricSeries, MetricEntry>,
115 prev_map: HashMap<MetricSeries, MetricEntry>,
116 multi_map: HashMap<MetricSeries, Vec<MetricEntry>>,
117 mode: AggregationMode,
118}
119
120impl Aggregate {
121 pub fn new(config: &AggregateConfig) -> crate::Result<Self> {
122 Ok(Self {
123 interval: Duration::from_millis(config.interval_ms),
124 map: Default::default(),
125 prev_map: Default::default(),
126 multi_map: Default::default(),
127 mode: config.mode.clone(),
128 })
129 }
130
131 fn record(&mut self, event: Event) {
132 let (series, data, metadata) = event.into_metric().into_parts();
133
134 match self.mode {
135 AggregationMode::Auto => match data.kind {
136 MetricKind::Incremental => self.record_sum(series, data, metadata),
137 MetricKind::Absolute => {
138 self.map.insert(series, (data, metadata));
139 }
140 },
141 AggregationMode::Sum => self.record_sum(series, data, metadata),
142 AggregationMode::Latest | AggregationMode::Diff => match data.kind {
143 MetricKind::Incremental => (),
144 MetricKind::Absolute => {
145 self.map.insert(series, (data, metadata));
146 }
147 },
148 AggregationMode::Count => self.record_count(series, data, metadata),
149 AggregationMode::Max | AggregationMode::Min => {
150 self.record_comparison(series, data, metadata)
151 }
152 AggregationMode::Mean | AggregationMode::Stdev => match data.kind {
153 MetricKind::Incremental => (),
154 MetricKind::Absolute => {
155 if matches!(data.value, MetricValue::Gauge { value: _ }) {
156 match self.multi_map.entry(series) {
157 Entry::Occupied(mut entry) => {
158 let existing = entry.get_mut();
159 existing.push((data, metadata));
160 }
161 Entry::Vacant(entry) => {
162 entry.insert(vec![(data, metadata)]);
163 }
164 }
165 }
166 }
167 },
168 }
169
170 emit!(AggregateEventRecorded);
171 }
172
173 fn record_count(
174 &mut self,
175 series: MetricSeries,
176 mut data: MetricData,
177 metadata: EventMetadata,
178 ) {
179 let mut count_data = data.clone();
180 let existing = self.map.entry(series).or_insert_with(|| {
181 *data.value_mut() = MetricValue::Counter { value: 0f64 };
182 (data.clone(), metadata.clone())
183 });
184 *count_data.value_mut() = MetricValue::Counter { value: 1f64 };
185 if existing.0.kind == data.kind && existing.0.update(&count_data) {
186 existing.1.merge(metadata);
187 } else {
188 emit!(AggregateUpdateFailed);
189 }
190 }
191
192 fn record_sum(&mut self, series: MetricSeries, data: MetricData, metadata: EventMetadata) {
193 match data.kind {
194 MetricKind::Incremental => match self.map.entry(series) {
195 Entry::Occupied(mut entry) => {
196 let existing = entry.get_mut();
197 if existing.0.kind == data.kind && existing.0.update(&data) {
199 existing.1.merge(metadata);
200 } else {
201 emit!(AggregateUpdateFailed);
202 *existing = (data, metadata);
203 }
204 }
205 Entry::Vacant(entry) => {
206 entry.insert((data, metadata));
207 }
208 },
209 MetricKind::Absolute => {}
210 }
211 }
212
213 fn record_comparison(
214 &mut self,
215 series: MetricSeries,
216 data: MetricData,
217 metadata: EventMetadata,
218 ) {
219 match data.kind {
220 MetricKind::Incremental => (),
221 MetricKind::Absolute => match self.map.entry(series) {
222 Entry::Occupied(mut entry) => {
223 let existing = entry.get_mut();
224 if existing.0.kind == data.kind {
226 if let MetricValue::Gauge {
227 value: existing_value,
228 } = existing.0.value()
229 && let MetricValue::Gauge { value: new_value } = data.value()
230 {
231 let should_update = match self.mode {
232 AggregationMode::Max => new_value > existing_value,
233 AggregationMode::Min => new_value < existing_value,
234 _ => false,
235 };
236 if should_update {
237 *existing = (data, metadata);
238 }
239 }
240 } else {
241 emit!(AggregateUpdateFailed);
242 *existing = (data, metadata);
243 }
244 }
245 Entry::Vacant(entry) => {
246 entry.insert((data, metadata));
247 }
248 },
249 }
250 }
251
252 fn flush_into(&mut self, output: &mut Vec<Event>) {
253 let map = std::mem::take(&mut self.map);
254 for (series, entry) in map.clone().into_iter() {
255 let mut metric = Metric::from_parts(series, entry.0, entry.1);
256 if matches!(self.mode, AggregationMode::Diff)
257 && let Some(prev_entry) = self.prev_map.get(metric.series())
258 && metric.data().kind == prev_entry.0.kind
259 && !metric.subtract(&prev_entry.0)
260 {
261 emit!(AggregateUpdateFailed);
262 }
263 output.push(Event::Metric(metric));
264 }
265
266 let multi_map = std::mem::take(&mut self.multi_map);
267 'outer: for (series, entries) in multi_map.into_iter() {
268 if entries.is_empty() {
269 continue;
270 }
271
272 let (mut final_sum, mut final_metadata) = entries.first().unwrap().clone();
273 for (data, metadata) in entries.iter().skip(1) {
274 if !final_sum.update(data) {
275 emit!(AggregateUpdateFailed);
277 continue 'outer;
278 }
279 final_metadata.merge(metadata.clone());
280 }
281
282 let final_mean_value = if let MetricValue::Gauge { value } = final_sum.value_mut() {
283 *value /= entries.len() as f64;
285 *value
286 } else {
287 0.0
288 };
289
290 let final_mean = final_sum.clone();
291 match self.mode {
292 AggregationMode::Mean => {
293 let metric = Metric::from_parts(series, final_mean, final_metadata);
294 output.push(Event::Metric(metric));
295 }
296 AggregationMode::Stdev => {
297 let variance = entries
298 .iter()
299 .filter_map(|(data, _)| {
300 if let MetricValue::Gauge { value } = data.value() {
301 let diff = final_mean_value - value;
302 Some(diff * diff)
303 } else {
304 None
305 }
306 })
307 .sum::<f64>()
308 / entries.len() as f64;
309 let mut final_stdev = final_mean;
310 if let MetricValue::Gauge { value } = final_stdev.value_mut() {
311 *value = variance.sqrt()
312 }
313 let metric = Metric::from_parts(series, final_stdev, final_metadata);
314 output.push(Event::Metric(metric));
315 }
316 _ => (),
317 }
318 }
319
320 self.prev_map = map;
321 emit!(AggregateFlushed);
322 }
323}
324
325impl TaskTransform<Event> for Aggregate {
326 fn transform(
327 mut self: Box<Self>,
328 mut input_rx: Pin<Box<dyn Stream<Item = Event> + Send>>,
329 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
330 where
331 Self: 'static,
332 {
333 let mut flush_stream = tokio::time::interval(self.interval);
334
335 Box::pin(stream! {
336 let mut output = Vec::new();
337 let mut done = false;
338 while !done {
339 tokio::select! {
340 _ = flush_stream.tick() => {
341 self.flush_into(&mut output);
342 },
343 maybe_event = input_rx.next() => {
344 match maybe_event {
345 None => {
346 self.flush_into(&mut output);
347 done = true;
348 }
349 Some(event) => self.record(event),
350 }
351 }
352 };
353 for event in output.drain(..) {
354 yield event;
355 }
356 }
357 })
358 }
359}
360
#[cfg(test)]
mod tests {
    use std::{collections::BTreeSet, sync::Arc, task::Poll};

    use futures::stream;
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::config::ComponentKey;
    use vrl::value::Kind;

    use super::*;
    use crate::{
        event::{
            Event, Metric,
            metric::{MetricKind, MetricValue},
        },
        schema::Definition,
        test_util::components::assert_transform_compliance,
        transforms::test::create_topology,
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<AggregateConfig>();
    }

    /// Builds a metric event carrying the source/upstream ids, schema definition and source
    /// type used throughout these tests.
    fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {
        let schema = Arc::new(Definition::new_with_default_metadata(
            Kind::any_object(),
            [LogNamespace::Legacy],
        ));
        let mut event = Event::Metric(Metric::new(name, kind, value))
            .with_source_id(Arc::new(ComponentKey::from("in")))
            .with_upstream_id(Arc::new(OutputId::from("transform")));

        let metadata = event.metadata_mut();
        metadata.set_schema_definition(&schema);
        metadata.set_source_type("unit_test_stream");

        event
    }

    /// An incremental counter event.
    fn counter(name: &'static str, value: f64) -> Event {
        make_metric(name, MetricKind::Incremental, MetricValue::Counter { value })
    }

    /// An absolute gauge event.
    fn gauge(name: &'static str, value: f64) -> Event {
        make_metric(name, MetricKind::Absolute, MetricValue::Gauge { value })
    }

    /// An aggregator running in `mode` with a one second interval.
    fn aggregate(mode: AggregationMode) -> Aggregate {
        Aggregate::new(&AggregateConfig {
            interval_ms: 1000_u64,
            mode,
        })
        .unwrap()
    }

    /// Flushes `agg` and returns everything it emitted.
    fn flush(agg: &mut Aggregate) -> Vec<Event> {
        let mut out = vec![];
        agg.flush_into(&mut out);
        out
    }

    /// Asserts that flushing yields exactly one event equal to `expected`.
    #[track_caller]
    fn assert_flushes_one(agg: &mut Aggregate, expected: &Event) {
        let out = flush(agg);
        assert_eq!(1, out.len());
        assert_eq!(expected, &out[0]);
    }

    /// Asserts that flushing yields no events at all.
    #[track_caller]
    fn assert_flushes_nothing(agg: &mut Aggregate) {
        assert_eq!(0, flush(agg).len());
    }

    /// Asserts that flushing yields one event per expected name, in any order.
    #[track_caller]
    fn assert_flushes_named(agg: &mut Aggregate, expected: &[(&str, &Event)]) {
        let out = flush(agg);
        assert_eq!(expected.len(), out.len());
        for event in out {
            let name = event.as_metric().series().name.name.as_str();
            match expected.iter().find(|(wanted, _)| *wanted == name) {
                Some((_, wanted_event)) => assert_eq!(**wanted_event, event),
                None => panic!("Unexpected metric name in aggregate output"),
            }
        }
    }

    #[test]
    fn incremental_auto() {
        let mut agg = aggregate(AggregationMode::Auto);

        let counter_a_1 = counter("counter_a", 42.0);
        let counter_a_2 = counter("counter_a", 43.0);
        let counter_a_summed = counter("counter_a", 85.0);

        // A single metric comes out unchanged.
        agg.record(counter_a_1.clone());
        assert_flushes_one(&mut agg, &counter_a_1);

        // Nothing is left behind after a flush.
        assert_flushes_nothing(&mut agg);
        assert_flushes_nothing(&mut agg);

        // Two incremental metrics of one series are summed.
        agg.record(counter_a_1.clone());
        agg.record(counter_a_2);
        assert_flushes_one(&mut agg, &counter_a_summed);

        // Different series are kept apart.
        let counter_b_1 = counter("counter_b", 44.0);
        agg.record(counter_a_1.clone());
        agg.record(counter_b_1.clone());
        assert_flushes_named(
            &mut agg,
            &[("counter_a", &counter_a_1), ("counter_b", &counter_b_1)],
        );
    }

    #[test]
    fn absolute_auto() {
        let mut agg = aggregate(AggregationMode::Auto);

        let gauge_a_1 = gauge("gauge_a", 42.0);
        let gauge_a_2 = gauge("gauge_a", 43.0);

        // A single metric comes out unchanged.
        agg.record(gauge_a_1.clone());
        assert_flushes_one(&mut agg, &gauge_a_1);

        // Nothing is left behind after a flush.
        assert_flushes_nothing(&mut agg);
        assert_flushes_nothing(&mut agg);

        // The latest absolute value wins.
        agg.record(gauge_a_1.clone());
        agg.record(gauge_a_2.clone());
        assert_flushes_one(&mut agg, &gauge_a_2);

        // Different series are kept apart.
        let gauge_b_1 = gauge("gauge_b", 44.0);
        agg.record(gauge_a_1.clone());
        agg.record(gauge_b_1.clone());
        assert_flushes_named(
            &mut agg,
            &[("gauge_a", &gauge_a_1), ("gauge_b", &gauge_b_1)],
        );
    }

    #[test]
    fn count_agg() {
        let mut agg = aggregate(AggregationMode::Count);

        let gauge_a_1 = gauge("gauge_a", 42.0);
        let gauge_a_2 = gauge("gauge_a", 43.0);
        let result_count = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
        );
        let result_count_2 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Counter { value: 2.0 },
        );

        // One recorded metric counts as one.
        agg.record(gauge_a_1.clone());
        assert_flushes_one(&mut agg, &result_count);

        // Nothing is left behind after a flush.
        assert_flushes_nothing(&mut agg);
        assert_flushes_nothing(&mut agg);

        // Two recorded metrics count as two.
        agg.record(gauge_a_1);
        agg.record(gauge_a_2);
        assert_flushes_one(&mut agg, &result_count_2);
    }

    #[test]
    fn absolute_max() {
        let mut agg = aggregate(AggregationMode::Max);

        let gauge_a_1 = gauge("gauge_a", 112.0);
        let gauge_a_2 = gauge("gauge_a", 89.0);

        // A single metric comes out unchanged.
        agg.record(gauge_a_2.clone());
        assert_flushes_one(&mut agg, &gauge_a_2);

        // Nothing is left behind after a flush.
        assert_flushes_nothing(&mut agg);
        assert_flushes_nothing(&mut agg);

        // The larger gauge is kept.
        agg.record(gauge_a_1.clone());
        agg.record(gauge_a_2);
        assert_flushes_one(&mut agg, &gauge_a_1);
    }

    #[test]
    fn absolute_min() {
        let mut agg = aggregate(AggregationMode::Min);

        let gauge_a_1 = gauge("gauge_a", 32.0);
        let gauge_a_2 = gauge("gauge_a", 89.0);

        // A single metric comes out unchanged.
        agg.record(gauge_a_2.clone());
        assert_flushes_one(&mut agg, &gauge_a_2);

        // Nothing is left behind after a flush.
        assert_flushes_nothing(&mut agg);
        assert_flushes_nothing(&mut agg);

        // The smaller gauge is kept.
        agg.record(gauge_a_1.clone());
        agg.record(gauge_a_2);
        assert_flushes_one(&mut agg, &gauge_a_1);
    }

    #[test]
    fn absolute_diff() {
        let mut agg = aggregate(AggregationMode::Diff);

        let gauge_a_1 = gauge("gauge_a", 32.0);
        let gauge_a_2 = gauge("gauge_a", 82.0);
        let result = gauge("gauge_a", 50.0);

        // Without a previous flush the metric comes out unchanged.
        agg.record(gauge_a_2.clone());
        assert_flushes_one(&mut agg, &gauge_a_2);

        // Nothing is left behind after a flush.
        assert_flushes_nothing(&mut agg);
        assert_flushes_nothing(&mut agg);

        // The empty flushes cleared the baseline, so this one is unchanged as well.
        agg.record(gauge_a_1.clone());
        assert_flushes_one(&mut agg, &gauge_a_1);

        // Now the previous flush serves as the baseline: 82 - 32 = 50.
        agg.record(gauge_a_2);
        assert_flushes_one(&mut agg, &result);
    }

    #[test]
    fn absolute_diff_conflicting_type() {
        let mut agg = aggregate(AggregationMode::Diff);

        let gauge_a_1 = gauge("gauge_a", 32.0);
        let gauge_a_2 = make_metric(
            "gauge_a",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
        );

        agg.record(gauge_a_1.clone());
        assert_flushes_one(&mut agg, &gauge_a_1);

        // A counter cannot be diffed against the previous gauge; it comes out unchanged.
        agg.record(gauge_a_2.clone());
        assert_flushes_one(&mut agg, &gauge_a_2);
    }

    #[test]
    fn absolute_mean() {
        let mut agg = aggregate(AggregationMode::Mean);

        let gauge_a_1 = gauge("gauge_a", 32.0);
        let gauge_a_2 = gauge("gauge_a", 82.0);
        let gauge_a_3 = gauge("gauge_a", 51.0);
        let mean_result = gauge("gauge_a", 55.0);

        // The mean of a single sample is the sample itself.
        agg.record(gauge_a_2.clone());
        assert_flushes_one(&mut agg, &gauge_a_2);

        // Nothing is left behind after a flush.
        assert_flushes_nothing(&mut agg);
        assert_flushes_nothing(&mut agg);

        // (32 + 82 + 51) / 3 = 55
        agg.record(gauge_a_1);
        agg.record(gauge_a_2);
        agg.record(gauge_a_3);
        assert_flushes_one(&mut agg, &mean_result);
    }

    #[test]
    fn absolute_stdev() {
        let mut agg = aggregate(AggregationMode::Stdev);

        // Mean 40, squared deviations sum to 700, variance 100.
        let gauges: Vec<Event> = [25.0, 30.0, 35.0, 40.0, 45.0, 50.0, 55.0]
            .into_iter()
            .map(|value| gauge("gauge_a", value))
            .collect();
        let stdev_result = gauge("gauge_a", 10.0);

        for sample in gauges {
            agg.record(sample);
        }
        assert_flushes_one(&mut agg, &stdev_result);
    }

    #[test]
    fn conflicting_value_type() {
        let mut agg = aggregate(AggregationMode::Auto);

        let counter = counter("the-thing", 42.0);
        let mut values = BTreeSet::<String>::new();
        values.insert("a".into());
        values.insert("b".into());
        let set = make_metric(
            "the-thing",
            MetricKind::Incremental,
            MetricValue::Set { values },
        );
        let summed = self::counter("the-thing", 84.0);

        // The set cannot be added to the counter, so it replaces it.
        agg.record(counter.clone());
        agg.record(counter.clone());
        agg.record(set.clone());
        agg.record(set.clone());
        assert_flushes_one(&mut agg, &set);

        // The other way around: the counter replaces the set and is then summed.
        agg.record(set.clone());
        agg.record(set);
        agg.record(counter.clone());
        agg.record(counter);
        assert_flushes_one(&mut agg, &summed);
    }

    #[test]
    fn conflicting_kinds() {
        let mut agg = aggregate(AggregationMode::Auto);

        let incremental = counter("the-thing", 42.0);
        let absolute = make_metric(
            "the-thing",
            MetricKind::Absolute,
            MetricValue::Counter { value: 43.0 },
        );
        let summed = counter("the-thing", 84.0);

        // An absolute metric replaces whatever was accumulated before.
        agg.record(incremental.clone());
        agg.record(incremental.clone());
        agg.record(absolute.clone());
        agg.record(absolute.clone());
        assert_flushes_one(&mut agg, &absolute);

        // An incremental metric replaces the stored absolute one and is then summed.
        agg.record(absolute.clone());
        agg.record(absolute);
        agg.record(incremental.clone());
        agg.record(incremental);
        assert_flushes_one(&mut agg, &summed);
    }

    #[tokio::test]
    async fn transform_shutdown() {
        // The interval is long enough that only the end of the input triggers a flush of
        // the recorded metrics.
        let config: AggregateConfig = toml::from_str(
            r"
interval_ms = 999999
",
        )
        .unwrap();
        let agg = config
            .build(&TransformContext::default())
            .await
            .unwrap()
            .into_task();

        let counter_a_1 = counter("counter_a", 42.0);
        let counter_a_2 = counter("counter_a", 43.0);
        let counter_a_summed = counter("counter_a", 85.0);
        let gauge_a_1 = gauge("gauge_a", 42.0);
        let gauge_a_2 = gauge("gauge_a", 43.0);
        let inputs = vec![counter_a_1, counter_a_2, gauge_a_1, gauge_a_2.clone()];

        // The input stream ends right after the last metric.
        let mut out_stream = agg.transform_events(Box::pin(stream::iter(inputs)));

        let mut count = 0_u8;
        while let Some(event) = out_stream.next().await {
            count += 1;
            match event.as_metric().series().name.name.as_str() {
                "counter_a" => assert_eq!(counter_a_summed, event),
                "gauge_a" => assert_eq!(gauge_a_2, event),
                _ => panic!("Unexpected metric name in aggregate output"),
            };
        }
        // One aggregated metric per series.
        assert_eq!(2, count);
    }

    #[tokio::test]
    async fn transform_interval() {
        let transform_config = toml::from_str::<AggregateConfig>("").unwrap();

        let counter_a_1 = counter("counter_a", 42.0);
        let counter_a_2 = counter("counter_a", 43.0);
        let counter_a_summed = counter("counter_a", 85.0);
        let gauge_a_1 = gauge("gauge_a", 42.0);
        let gauge_a_2 = gauge("gauge_a", 43.0);

        assert_transform_compliance(async {
            let (tx, rx) = mpsc::channel(10);
            let (topology, out) = create_topology(ReceiverStream::new(rx), transform_config).await;
            let mut out = ReceiverStream::new(out);

            tokio::time::pause();

            // Nothing has been sent yet, so nothing may come out.
            assert_eq!(Poll::Pending, futures::poll!(out.next()));

            // Recorded metrics are held back until the interval elapses.
            tx.send(counter_a_1).await.unwrap();
            tx.send(counter_a_2).await.unwrap();
            tx.send(gauge_a_1).await.unwrap();
            tx.send(gauge_a_2.clone()).await.unwrap();
            assert_eq!(Poll::Pending, futures::poll!(out.next()));

            // Move past the default interval to trigger the flush.
            tokio::time::advance(Duration::from_secs(11)).await;

            // Exactly one aggregated metric per series is expected.
            for _ in 0..2 {
                let Some(event) = out.next().await else {
                    panic!("Unexpectedly received None in output stream");
                };
                match event.as_metric().series().name.name.as_str() {
                    "counter_a" => assert_eq!(counter_a_summed, event),
                    "gauge_a" => assert_eq!(gauge_a_2, event),
                    _ => panic!("Unexpected metric name in aggregate output"),
                };
            }
            assert_eq!(Poll::Pending, futures::poll!(out.next()));

            drop(tx);
            topology.stop().await;
            assert_eq!(out.next().await, None);
        })
        .await;
    }
}
1145}