vector/sinks/util/buffer/metrics/
mod.rs

1use vector_lib::event::metric::{Metric, MetricValue, Sample};
2
3use crate::sinks::util::{
4    Merged, SinkBatchSettings,
5    batch::{Batch, BatchConfig, BatchError, BatchSize, PushResult},
6};
7
8mod normalize;
9pub use self::normalize::*;
10
11mod split;
12pub use self::split::*;
13
14/// The metrics buffer is a data structure for collecting a flow of data points into a batch.
15///
16/// Batching mostly means that we will aggregate away timestamp information, and apply metric-specific compression to
17/// improve the performance of the pipeline. In particular, only the latest in a series of metrics are output, and
18/// incremental metrics are summed into the output buffer. Any conversion of metrics is handled by the normalization
19/// type `N: MetricNormalize`. Further, distribution metrics have their samples compressed with
20/// `compress_distribution` below.
21///
22/// Note: This has been deprecated, please do not use when creating new Sinks.
23pub struct MetricsBuffer {
24    metrics: Option<MetricSet>,
25    max_events: usize,
26}
27
28impl MetricsBuffer {
29    /// Creates a new `MetricsBuffer` with the given batch settings.
30    pub const fn new(settings: BatchSize<Self>) -> Self {
31        Self::with_capacity(settings.events)
32    }
33
34    const fn with_capacity(max_events: usize) -> Self {
35        Self {
36            metrics: None,
37            max_events,
38        }
39    }
40}
41
42impl Batch for MetricsBuffer {
43    type Input = Metric;
44    type Output = Vec<Metric>;
45
46    fn get_settings_defaults<D: SinkBatchSettings + Clone>(
47        config: BatchConfig<D, Merged>,
48    ) -> Result<BatchConfig<D, Merged>, BatchError> {
49        config.disallow_max_bytes()
50    }
51
52    fn push(&mut self, item: Self::Input) -> PushResult<Self::Input> {
53        if self.num_items() >= self.max_events {
54            PushResult::Overflow(item)
55        } else {
56            let max_events = self.max_events;
57            self.metrics
58                .get_or_insert_with(|| {
59                    MetricSet::new(MetricSetSettings {
60                        max_events: Some(max_events),
61                        ..Default::default()
62                    })
63                })
64                .insert_update(item);
65            PushResult::Ok(self.num_items() >= self.max_events)
66        }
67    }
68
69    fn is_empty(&self) -> bool {
70        self.num_items() == 0
71    }
72
73    fn fresh(&self) -> Self {
74        Self::with_capacity(self.max_events)
75    }
76
77    fn finish(self) -> Self::Output {
78        // Collect all of our metrics together, finalize them, and hand them back.
79        let mut finalized = self
80            .metrics
81            .map(MetricSet::into_metrics)
82            .unwrap_or_default();
83        finalized.iter_mut().for_each(finalize_metric);
84        finalized
85    }
86
87    fn num_items(&self) -> usize {
88        self.metrics
89            .as_ref()
90            .map(|metrics| metrics.len())
91            .unwrap_or(0)
92    }
93}
94
95fn finalize_metric(metric: &mut Metric) {
96    if let MetricValue::Distribution { samples, .. } = metric.data_mut().value_mut() {
97        let compressed_samples = compress_distribution(samples);
98        *samples = compressed_samples;
99    }
100}
101
102pub fn compress_distribution(samples: &mut Vec<Sample>) -> Vec<Sample> {
103    if samples.is_empty() {
104        return Vec::new();
105    }
106
107    samples.sort_by(|a, b| a.value.total_cmp(&b.value));
108
109    let mut acc = Sample {
110        value: samples[0].value,
111        rate: 0,
112    };
113    let mut result = Vec::new();
114
115    for sample in samples {
116        if acc.value == sample.value {
117            acc.rate += sample.rate;
118        } else {
119            result.push(acc);
120            acc = *sample;
121        }
122    }
123    result.push(acc);
124
125    result
126}
127
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use similar_asserts::assert_eq;
    use vector_lib::{
        event::metric::{MetricKind, MetricKind::*, MetricValue, StatisticKind},
        metric_tags,
    };

    use super::*;
    use crate::{
        sinks::util::BatchSettings,
        test_util::metrics::{AbsoluteMetricNormalizer, IncrementalMetricNormalizer},
    };

    /// The batches produced by `rebuffer`, in the order they were flushed.
    type Buffer = Vec<Vec<Metric>>;

    /// Builds a counter named `counter-{num}` carrying the single tag `{tagstr} = "true"`.
    pub fn sample_counter(num: usize, tagstr: &str, kind: MetricKind, value: f64) -> Metric {
        Metric::new(
            format!("counter-{num}"),
            kind,
            MetricValue::Counter { value },
        )
        .with_tags(Some(metric_tags!(tagstr => "true")))
    }

    /// Builds an untagged gauge named `gauge-{num}`.
    pub fn sample_gauge(num: usize, kind: MetricKind, value: f64) -> Metric {
        Metric::new(format!("gauge-{num}"), kind, MetricValue::Gauge { value })
    }

    /// Builds a set named `set-{num}` whose members are the string forms of `values`.
    pub fn sample_set<T: ToString>(num: usize, kind: MetricKind, values: &[T]) -> Metric {
        Metric::new(
            format!("set-{num}"),
            kind,
            MetricValue::Set {
                values: values.iter().map(|s| s.to_string()).collect(),
            },
        )
    }

    /// Builds a histogram distribution named `dist-{num}` holding a single sample whose value is
    /// `num` and whose rate is `rate`.
    pub fn sample_distribution_histogram(num: u32, kind: MetricKind, rate: u32) -> Metric {
        Metric::new(
            format!("dist-{num}"),
            kind,
            MetricValue::Distribution {
                samples: vector_lib::samples![num as f64 => rate],
                statistic: StatisticKind::Histogram,
            },
        )
    }

    /// Builds an aggregated histogram named `buckets-{num}` with three buckets at bounds `1`,
    /// `2^bpower` and `4^bpower`. The buckets hold `cfactor`, `2 * cfactor` and `4 * cfactor`
    /// counts respectively, and the total count is `7 * cfactor`.
    pub fn sample_aggregated_histogram(
        num: usize,
        kind: MetricKind,
        bpower: f64,
        cfactor: u64,
        sum: f64,
    ) -> Metric {
        Metric::new(
            format!("buckets-{num}"),
            kind,
            MetricValue::AggregatedHistogram {
                buckets: vector_lib::buckets![
                    1.0 => cfactor,
                    bpower.exp2() => cfactor * 2,
                    4.0f64.powf(bpower) => cfactor * 4
                ],
                count: 7 * cfactor,
                sum,
            },
        )
    }

    /// Builds an aggregated summary named `quantiles-{num}`. Its quantiles, count and sum are all
    /// derived from `factor`.
    pub fn sample_aggregated_summary(num: u32, kind: MetricKind, factor: f64) -> Metric {
        Metric::new(
            format!("quantiles-{num}"),
            kind,
            MetricValue::AggregatedSummary {
                quantiles: vector_lib::quantiles![
                    0.0 => factor,
                    0.5 => factor * 2.0,
                    1.0 => factor * 4.0
                ],
                count: factor as u64 * 10,
                sum: factor * 7.0,
            },
        )
    }

    /// Feeds `metrics` through a `MetricNormalizer<State>` into a `MetricsBuffer` limited to 6
    /// events, and returns every batch produced.
    ///
    /// Metrics for which the normalizer returns `None` are skipped. A batch is flushed as soon as
    /// `push` reports it full, and any non-empty remainder is flushed at the end. Each batch is
    /// then sorted by its `Debug` representation, so assertions do not depend on the buffer's
    /// output order.
    ///
    /// # Panics
    ///
    /// Panics if `push` ever reports an overflow, which would mean a full batch was not flushed.
    fn rebuffer<State: MetricNormalize + Default>(metrics: Vec<Metric>) -> Buffer {
        let mut batch_settings = BatchSettings::default();
        // `MetricsBuffer::new` only reads `events`; the byte limit has no effect here.
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 6;

        let mut normalizer = MetricNormalizer::<State>::default();
        let mut buffer = MetricsBuffer::new(batch_settings.size);
        let mut result = vec![];

        for metric in metrics {
            if let Some(event) = normalizer.normalize(metric) {
                match buffer.push(event) {
                    PushResult::Overflow(_) => panic!("overflowed too early"),
                    PushResult::Ok(true) => {
                        let batch =
                            std::mem::replace(&mut buffer, MetricsBuffer::new(batch_settings.size));
                        result.push(batch.finish());
                    }
                    PushResult::Ok(false) => (),
                }
            }
        }

        if !buffer.is_empty() {
            result.push(buffer.finish())
        }

        // Sort each batch to provide a predictable result ordering
        result
            .into_iter()
            .map(|mut batch| {
                batch.sort_by_key(|k| format!("{k:?}"));
                batch
            })
            .collect()
    }

    fn rebuffer_incremental_counters<State: MetricNormalize + Default>() -> Buffer {
        let mut events = Vec::new();
        for i in 0..4 {
            // counter-0 (production) is pushed 4 times here, and once more in the last loop
            events.push(sample_counter(0, "production", Incremental, i as f64));
        }

        for i in 0..4 {
            // four new staging series; together with counter-0 production the buffer now holds
            // 5 of its 6 entries
            events.push(sample_counter(i, "staging", Incremental, i as f64));
        }

        for i in 0..4 {
            // counter-0 adds to the existing production entry and counter-1 fills the sixth slot,
            // flushing the first batch; counter-2 and -3 start the second batch
            events.push(sample_counter(i, "production", Incremental, i as f64));
        }

        rebuffer::<State>(events)
    }

    #[test]
    fn abs_buffer_incremental_counters() {
        let buffer = rebuffer_incremental_counters::<AbsoluteMetricNormalizer>();

        // counter-0 production: 0 + 1 + 2 + 3 (first loop) + 0 (last loop) = 6
        assert_eq!(
            buffer[0],
            [
                sample_counter(0, "production", Absolute, 6.0),
                sample_counter(0, "staging", Absolute, 0.0),
                sample_counter(1, "production", Absolute, 1.0),
                sample_counter(1, "staging", Absolute, 1.0),
                sample_counter(2, "staging", Absolute, 2.0),
                sample_counter(3, "staging", Absolute, 3.0),
            ]
        );

        assert_eq!(
            buffer[1],
            [
                sample_counter(2, "production", Absolute, 2.0),
                sample_counter(3, "production", Absolute, 3.0),
            ]
        );

        assert_eq!(buffer.len(), 2);
    }

    #[test]
    fn inc_buffer_incremental_counters() {
        let buffer = rebuffer_incremental_counters::<IncrementalMetricNormalizer>();

        assert_eq!(
            buffer[0],
            [
                sample_counter(0, "production", Incremental, 6.0),
                sample_counter(0, "staging", Incremental, 0.0),
                sample_counter(1, "production", Incremental, 1.0),
                sample_counter(1, "staging", Incremental, 1.0),
                sample_counter(2, "staging", Incremental, 2.0),
                sample_counter(3, "staging", Incremental, 3.0),
            ]
        );

        assert_eq!(
            buffer[1],
            [
                sample_counter(2, "production", Incremental, 2.0),
                sample_counter(3, "production", Incremental, 3.0),
            ]
        );

        assert_eq!(buffer.len(), 2);
    }

    fn rebuffer_absolute_counters<State: MetricNormalize + Default>() -> Buffer {
        let mut events = Vec::new();
        // counter-0 and -1 only emitted once
        // counter-2 and -3 emitted twice
        // counter-4 and -5 emitted once
        for i in 0..4 {
            events.push(sample_counter(i, "production", Absolute, i as f64));
        }

        for i in 2..6 {
            events.push(sample_counter(i, "production", Absolute, i as f64 * 3.0));
        }

        rebuffer::<State>(events)
    }

    #[test]
    fn abs_buffer_absolute_counters() {
        let buffer = rebuffer_absolute_counters::<AbsoluteMetricNormalizer>();

        // The last absolute value seen for each series is kept.
        assert_eq!(
            buffer[0],
            [
                sample_counter(0, "production", Absolute, 0.0),
                sample_counter(1, "production", Absolute, 1.0),
                sample_counter(2, "production", Absolute, 6.0),
                sample_counter(3, "production", Absolute, 9.0),
                sample_counter(4, "production", Absolute, 12.0),
                sample_counter(5, "production", Absolute, 15.0),
            ]
        );

        assert_eq!(buffer.len(), 1);
    }

    #[test]
    fn inc_buffer_absolute_counters() {
        let buffer = rebuffer_absolute_counters::<IncrementalMetricNormalizer>();

        // Only counters seen twice produce a delta: counter-2 is 6 - 2, counter-3 is 9 - 3.
        assert_eq!(
            buffer[0],
            [
                sample_counter(2, "production", Incremental, 4.0),
                sample_counter(3, "production", Incremental, 6.0),
            ]
        );

        assert_eq!(buffer.len(), 1);
    }

    fn rebuffer_incremental_gauges<State: MetricNormalize + Default>() -> Buffer {
        let mut events = Vec::new();
        // gauge-1 emitted once
        // gauge-2 through -4 are emitted twice
        // gauge-5 emitted once
        for i in 1..5 {
            events.push(sample_gauge(i, Incremental, i as f64));
        }

        for i in 2..6 {
            events.push(sample_gauge(i, Incremental, i as f64));
        }

        rebuffer::<State>(events)
    }

    #[test]
    fn abs_buffer_incremental_gauges() {
        let buffer = rebuffer_incremental_gauges::<AbsoluteMetricNormalizer>();

        // Incremental values for the same gauge are summed (gauge-2: 2 + 2 = 4, ...).
        assert_eq!(
            buffer[0],
            [
                sample_gauge(1, Absolute, 1.0),
                sample_gauge(2, Absolute, 4.0),
                sample_gauge(3, Absolute, 6.0),
                sample_gauge(4, Absolute, 8.0),
                sample_gauge(5, Absolute, 5.0),
            ]
        );

        assert_eq!(buffer.len(), 1);
    }

    #[test]
    fn inc_buffer_incremental_gauges() {
        let buffer = rebuffer_incremental_gauges::<IncrementalMetricNormalizer>();

        assert_eq!(
            buffer[0],
            [
                sample_gauge(1, Incremental, 1.0),
                sample_gauge(2, Incremental, 4.0),
                sample_gauge(3, Incremental, 6.0),
                sample_gauge(4, Incremental, 8.0),
                sample_gauge(5, Incremental, 5.0),
            ]
        );

        assert_eq!(buffer.len(), 1);
    }

    fn rebuffer_absolute_gauges<State: MetricNormalize + Default>() -> Buffer {
        let mut events = Vec::new();
        // gauge-2 emitted once
        // gauge-3 and -4 emitted twice
        // gauge-5 emitted once
        for i in 2..5 {
            events.push(sample_gauge(i, Absolute, i as f64 * 2.0));
        }

        for i in 3..6 {
            events.push(sample_gauge(i, Absolute, i as f64 * 10.0));
        }

        rebuffer::<State>(events)
    }

    #[test]
    fn abs_buffer_absolute_gauges() {
        let buffer = rebuffer_absolute_gauges::<AbsoluteMetricNormalizer>();

        // The last absolute value seen for each gauge is kept.
        assert_eq!(
            buffer[0],
            [
                sample_gauge(2, Absolute, 4.0),
                sample_gauge(3, Absolute, 30.0),
                sample_gauge(4, Absolute, 40.0),
                sample_gauge(5, Absolute, 50.0),
            ]
        );

        assert_eq!(buffer.len(), 1);
    }

    #[test]
    fn inc_buffer_absolute_gauges() {
        let buffer = rebuffer_absolute_gauges::<IncrementalMetricNormalizer>();

        // Only gauges seen twice produce a delta: gauge-3 is 30 - 6, gauge-4 is 40 - 8.
        assert_eq!(
            buffer[0],
            [
                sample_gauge(3, Incremental, 24.0),
                sample_gauge(4, Incremental, 32.0),
            ]
        );

        assert_eq!(buffer.len(), 1);
    }

    fn rebuffer_incremental_sets<State: MetricNormalize + Default>() -> Buffer {
        let mut events = Vec::new();
        // set-0 emitted 8 times with 4 different values
        // set-1 emitted once with 4 values
        for i in 0..4 {
            events.push(sample_set(0, Incremental, &[i]));
        }

        for i in 0..4 {
            events.push(sample_set(0, Incremental, &[i]));
        }

        events.push(sample_set(1, Incremental, &[1, 2, 3, 4]));

        rebuffer::<State>(events)
    }

    #[test]
    fn abs_buffer_incremental_sets() {
        let buffer = rebuffer_incremental_sets::<AbsoluteMetricNormalizer>();

        assert_eq!(
            buffer[0],
            [
                sample_set(0, Absolute, &[0, 1, 2, 3]),
                sample_set(1, Absolute, &[1, 2, 3, 4]),
            ]
        );

        assert_eq!(buffer.len(), 1);
    }

    #[test]
    fn inc_buffer_incremental_sets() {
        let buffer = rebuffer_incremental_sets::<IncrementalMetricNormalizer>();

        assert_eq!(
            buffer[0],
            [
                sample_set(0, Incremental, &[0, 1, 2, 3]),
                sample_set(1, Incremental, &[1, 2, 3, 4]),
            ]
        );

        assert_eq!(buffer.len(), 1);
    }

    fn rebuffer_incremental_distributions<State: MetricNormalize + Default>() -> Buffer {
        let mut events = Vec::new();
        // dist-2 pushed 4 times here, then dist-2 through -5 once each below
        for _ in 2..6 {
            events.push(sample_distribution_histogram(2, Incremental, 10));
        }

        for i in 2..6 {
            events.push(sample_distribution_histogram(i, Incremental, 10));
        }

        rebuffer::<State>(events)
    }

    #[test]
    fn abs_buffer_incremental_distributions() {
        let buffer = rebuffer_incremental_distributions::<AbsoluteMetricNormalizer>();

        // dist-2's five identical samples are compressed into one sample with rate 5 * 10.
        assert_eq!(
            buffer[0],
            [
                sample_distribution_histogram(2, Absolute, 50),
                sample_distribution_histogram(3, Absolute, 10),
                sample_distribution_histogram(4, Absolute, 10),
                sample_distribution_histogram(5, Absolute, 10),
            ]
        );

        assert_eq!(buffer.len(), 1);
    }

    #[test]
    fn inc_buffer_incremental_distributions() {
        let buffer = rebuffer_incremental_distributions::<IncrementalMetricNormalizer>();

        assert_eq!(
            buffer[0],
            [
                sample_distribution_histogram(2, Incremental, 50),
                sample_distribution_histogram(3, Incremental, 10),
                sample_distribution_histogram(4, Incremental, 10),
                sample_distribution_histogram(5, Incremental, 10),
            ]
        );

        assert_eq!(buffer.len(), 1);
    }

    #[test]
    fn compress_distributions() {
        let mut samples = vector_lib::samples![
            2.0 => 12,
            2.0 => 12,
            3.0 => 13,
            1.0 => 11,
            2.0 => 12,
            2.0 => 12,
            3.0 => 13
        ];

        // Output is sorted by value, and rates of equal values are summed (2.0: 4 * 12, 3.0: 2 * 13).
        assert_eq!(
            compress_distribution(&mut samples),
            vector_lib::samples![1.0 => 11, 2.0 => 48, 3.0 => 26]
        );
    }

    #[test]
    fn compress_distributions_doesnt_panic() {
        let to_float = |v: i32| -> f64 { v as f64 };

        // Input is 20..=16, NaN, 15..=0 (reversed); sorting must not panic on the NaN.
        let mut samples = (0..=15)
            .map(to_float)
            .chain(std::iter::once(f64::NAN))
            .chain((16..=20).map(to_float))
            .rev()
            .map(|value| Sample { value, rate: 1 })
            .collect_vec();

        // The NaN sample ends up after every finite value.
        assert_eq!(
            compress_distribution(&mut samples),
            (0..=20)
                .map(to_float)
                .chain(std::iter::once(f64::NAN))
                .map(|value| Sample { value, rate: 1 })
                .collect_vec()
        );
    }

    fn rebuffer_absolute_aggregated_histograms<State: MetricNormalize + Default>() -> Buffer {
        let mut events = Vec::new();
        // buckets-2 pushed 3 times with identical values here, then buckets-2 through -4 once
        // each below with values scaled by their index
        for _ in 2..5 {
            events.push(sample_aggregated_histogram(2, Absolute, 1.0, 1, 10.0));
        }

        for i in 2..5 {
            events.push(sample_aggregated_histogram(
                i,
                Absolute,
                1.0,
                i as u64,
                i as f64 * 10.0,
            ));
        }

        rebuffer::<State>(events)
    }

    #[test]
    fn abs_buffer_absolute_aggregated_histograms() {
        let buffer = rebuffer_absolute_aggregated_histograms::<AbsoluteMetricNormalizer>();

        // The last absolute histogram seen for each series is kept.
        assert_eq!(
            buffer[0],
            [
                sample_aggregated_histogram(2, Absolute, 1.0, 2, 20.0),
                sample_aggregated_histogram(3, Absolute, 1.0, 3, 30.0),
                sample_aggregated_histogram(4, Absolute, 1.0, 4, 40.0),
            ]
        );

        assert_eq!(buffer.len(), 1);
    }

    #[test]
    fn inc_buffer_absolute_aggregated_histograms() {
        let buffer = rebuffer_absolute_aggregated_histograms::<IncrementalMetricNormalizer>();

        // Only buckets-2 is seen more than once, so it is the only series with a delta.
        assert_eq!(
            buffer[0],
            [sample_aggregated_histogram(2, Incremental, 1.0, 1, 10.0)]
        );

        assert_eq!(buffer.len(), 1);
    }

    fn rebuffer_incremental_aggregated_histograms<State: MetricNormalize + Default>() -> Buffer {
        // The first histogram uses different bucket bounds (bpower 1.0) than the rest (bpower 2.0).
        let mut events = vec![sample_aggregated_histogram(2, Incremental, 1.0, 1, 10.0)];

        for i in 1..4 {
            events.push(sample_aggregated_histogram(2, Incremental, 2.0, i, 10.0));
        }

        rebuffer::<State>(events)
    }

    #[test]
    fn abs_buffer_incremental_aggregated_histograms() {
        let buffer = rebuffer_incremental_aggregated_histograms::<AbsoluteMetricNormalizer>();

        // Only the bpower 2.0 histograms remain: count factors 1 + 2 + 3 and sums 3 * 10.
        assert_eq!(
            buffer[0],
            [sample_aggregated_histogram(2, Absolute, 2.0, 6, 30.0)]
        );

        assert_eq!(buffer.len(), 1);
    }

    #[test]
    fn inc_buffer_incremental_aggregated_histograms() {
        let buffer = rebuffer_incremental_aggregated_histograms::<IncrementalMetricNormalizer>();

        assert_eq!(
            buffer[0],
            [sample_aggregated_histogram(2, Incremental, 2.0, 6, 30.0)]
        );

        assert_eq!(buffer.len(), 1);
    }

    fn rebuffer_aggregated_summaries<State: MetricNormalize + Default>() -> Buffer {
        let mut events = Vec::new();
        // quantiles-2 and -3 are each pushed twice, the second time with factor + 1
        for factor in 0..2 {
            for num in 2..4 {
                events.push(sample_aggregated_summary(
                    num,
                    Absolute,
                    (factor + num) as f64,
                ));
            }
        }

        rebuffer::<State>(events)
    }

    #[test]
    fn abs_buffer_aggregated_summaries() {
        let buffer = rebuffer_aggregated_summaries::<AbsoluteMetricNormalizer>();

        // The last absolute summary seen for each series is kept.
        assert_eq!(
            buffer[0],
            [
                sample_aggregated_summary(2, Absolute, 3.0),
                sample_aggregated_summary(3, Absolute, 4.0),
            ]
        );

        assert_eq!(buffer.len(), 1);
    }

    #[test]
    fn inc_buffer_aggregated_summaries() {
        let buffer = rebuffer_aggregated_summaries::<IncrementalMetricNormalizer>();

        // Since aggregated summaries cannot be added, they don't work
        // as incremental metrics and this results in an empty buffer.
        assert_eq!(buffer.len(), 0);
    }
}