vector/sinks/util/buffer/metrics/
mod.rs

1use std::cmp::Ordering;
2
3use vector_lib::event::metric::{Metric, MetricValue, Sample};
4
5use crate::sinks::util::{
6    batch::{Batch, BatchConfig, BatchError, BatchSize, PushResult},
7    Merged, SinkBatchSettings,
8};
9
10mod normalize;
11pub use self::normalize::*;
12
13mod split;
14pub use self::split::*;
15
/// The metrics buffer is a data structure for collecting a flow of data points into a batch.
///
/// Batching mostly means that we will aggregate away timestamp information, and apply metric-specific compression to
/// improve the performance of the pipeline. In particular, only the latest in a series of metrics is output, and
/// incremental metrics are summed into the output buffer. The buffer itself is not generic over a normalizer; any
/// conversion of metrics is presumably done by the caller before pushing (the tests in this module run each metric
/// through a `MetricNormalizer` first). Further, distribution metrics have their samples compressed with
/// `compress_distribution` when the batch is finished.
///
/// Note: This has been deprecated, please do not use when creating new Sinks.
pub struct MetricsBuffer {
    /// Metrics collected so far; stays `None` until the first item is accepted by `push`.
    metrics: Option<MetricSet>,
    /// Number of buffered entries at which `push` reports the batch as full and rejects further items.
    max_events: usize,
}
29
30impl MetricsBuffer {
31    /// Creates a new `MetricsBuffer` with the given batch settings.
32    pub const fn new(settings: BatchSize<Self>) -> Self {
33        Self::with_capacity(settings.events)
34    }
35
36    const fn with_capacity(max_events: usize) -> Self {
37        Self {
38            metrics: None,
39            max_events,
40        }
41    }
42}
43
44impl Batch for MetricsBuffer {
45    type Input = Metric;
46    type Output = Vec<Metric>;
47
48    fn get_settings_defaults<D: SinkBatchSettings + Clone>(
49        config: BatchConfig<D, Merged>,
50    ) -> Result<BatchConfig<D, Merged>, BatchError> {
51        config.disallow_max_bytes()
52    }
53
54    fn push(&mut self, item: Self::Input) -> PushResult<Self::Input> {
55        if self.num_items() >= self.max_events {
56            PushResult::Overflow(item)
57        } else {
58            let max_events = self.max_events;
59            self.metrics
60                .get_or_insert_with(|| MetricSet::with_capacity(max_events))
61                .insert_update(item);
62            PushResult::Ok(self.num_items() >= self.max_events)
63        }
64    }
65
66    fn is_empty(&self) -> bool {
67        self.num_items() == 0
68    }
69
70    fn fresh(&self) -> Self {
71        Self::with_capacity(self.max_events)
72    }
73
74    fn finish(self) -> Self::Output {
75        // Collect all of our metrics together, finalize them, and hand them back.
76        let mut finalized = self
77            .metrics
78            .map(MetricSet::into_metrics)
79            .unwrap_or_default();
80        finalized.iter_mut().for_each(finalize_metric);
81        finalized
82    }
83
84    fn num_items(&self) -> usize {
85        self.metrics
86            .as_ref()
87            .map(|metrics| metrics.len())
88            .unwrap_or(0)
89    }
90}
91
92fn finalize_metric(metric: &mut Metric) {
93    if let MetricValue::Distribution { samples, .. } = metric.data_mut().value_mut() {
94        let compressed_samples = compress_distribution(samples);
95        *samples = compressed_samples;
96    }
97}
98
99pub fn compress_distribution(samples: &mut Vec<Sample>) -> Vec<Sample> {
100    if samples.is_empty() {
101        return Vec::new();
102    }
103
104    samples.sort_by(|a, b| a.value.partial_cmp(&b.value).unwrap_or(Ordering::Equal));
105
106    let mut acc = Sample {
107        value: samples[0].value,
108        rate: 0,
109    };
110    let mut result = Vec::new();
111
112    for sample in samples {
113        if acc.value == sample.value {
114            acc.rate += sample.rate;
115        } else {
116            result.push(acc);
117            acc = *sample;
118        }
119    }
120    result.push(acc);
121
122    result
123}
124
125#[cfg(test)]
126mod tests {
127    use similar_asserts::assert_eq;
128    use vector_lib::event::metric::{MetricKind, MetricKind::*, MetricValue, StatisticKind};
129    use vector_lib::metric_tags;
130
131    use super::*;
132    use crate::{
133        sinks::util::BatchSettings,
134        test_util::metrics::{AbsoluteMetricNormalizer, IncrementalMetricNormalizer},
135    };
136
137    type Buffer = Vec<Vec<Metric>>;
138
139    pub fn sample_counter(num: usize, tagstr: &str, kind: MetricKind, value: f64) -> Metric {
140        Metric::new(
141            format!("counter-{num}"),
142            kind,
143            MetricValue::Counter { value },
144        )
145        .with_tags(Some(metric_tags!(tagstr => "true")))
146    }
147
148    pub fn sample_gauge(num: usize, kind: MetricKind, value: f64) -> Metric {
149        Metric::new(format!("gauge-{num}"), kind, MetricValue::Gauge { value })
150    }
151
152    pub fn sample_set<T: ToString>(num: usize, kind: MetricKind, values: &[T]) -> Metric {
153        Metric::new(
154            format!("set-{num}"),
155            kind,
156            MetricValue::Set {
157                values: values.iter().map(|s| s.to_string()).collect(),
158            },
159        )
160    }
161
162    pub fn sample_distribution_histogram(num: u32, kind: MetricKind, rate: u32) -> Metric {
163        Metric::new(
164            format!("dist-{num}"),
165            kind,
166            MetricValue::Distribution {
167                samples: vector_lib::samples![num as f64 => rate],
168                statistic: StatisticKind::Histogram,
169            },
170        )
171    }
172
173    pub fn sample_aggregated_histogram(
174        num: usize,
175        kind: MetricKind,
176        bpower: f64,
177        cfactor: u64,
178        sum: f64,
179    ) -> Metric {
180        Metric::new(
181            format!("buckets-{num}"),
182            kind,
183            MetricValue::AggregatedHistogram {
184                buckets: vector_lib::buckets![
185                    1.0 => cfactor,
186                    bpower.exp2() => cfactor * 2,
187                    4.0f64.powf(bpower) => cfactor * 4
188                ],
189                count: 7 * cfactor,
190                sum,
191            },
192        )
193    }
194
195    pub fn sample_aggregated_summary(num: u32, kind: MetricKind, factor: f64) -> Metric {
196        Metric::new(
197            format!("quantiles-{num}"),
198            kind,
199            MetricValue::AggregatedSummary {
200                quantiles: vector_lib::quantiles![
201                    0.0 => factor,
202                    0.5 => factor * 2.0,
203                    1.0 => factor * 4.0
204                ],
205                count: factor as u64 * 10,
206                sum: factor * 7.0,
207            },
208        )
209    }
210
211    fn rebuffer<State: MetricNormalize + Default>(metrics: Vec<Metric>) -> Buffer {
212        let mut batch_settings = BatchSettings::default();
213        batch_settings.size.bytes = 9999;
214        batch_settings.size.events = 6;
215
216        let mut normalizer = MetricNormalizer::<State>::default();
217        let mut buffer = MetricsBuffer::new(batch_settings.size);
218        let mut result = vec![];
219
220        for metric in metrics {
221            if let Some(event) = normalizer.normalize(metric) {
222                match buffer.push(event) {
223                    PushResult::Overflow(_) => panic!("overflowed too early"),
224                    PushResult::Ok(true) => {
225                        let batch =
226                            std::mem::replace(&mut buffer, MetricsBuffer::new(batch_settings.size));
227                        result.push(batch.finish());
228                    }
229                    PushResult::Ok(false) => (),
230                }
231            }
232        }
233
234        if !buffer.is_empty() {
235            result.push(buffer.finish())
236        }
237
238        // Sort each batch to provide a predictable result ordering
239        result
240            .into_iter()
241            .map(|mut batch| {
242                batch.sort_by_key(|k| format!("{k:?}"));
243                batch
244            })
245            .collect()
246    }
247
248    fn rebuffer_incremental_counters<State: MetricNormalize + Default>() -> Buffer {
249        let mut events = Vec::new();
250        for i in 0..4 {
251            // counter-0 is repeated 5 times
252            events.push(sample_counter(0, "production", Incremental, i as f64));
253        }
254
255        for i in 0..4 {
256            // these counters cause a buffer flush
257            events.push(sample_counter(i, "staging", Incremental, i as f64));
258        }
259
260        for i in 0..4 {
261            // counter-0 increments the previous buffer, the rest are new
262            events.push(sample_counter(i, "production", Incremental, i as f64));
263        }
264
265        rebuffer::<State>(events)
266    }
267
268    #[test]
269    fn abs_buffer_incremental_counters() {
270        let buffer = rebuffer_incremental_counters::<AbsoluteMetricNormalizer>();
271
272        assert_eq!(
273            buffer[0],
274            [
275                sample_counter(0, "production", Absolute, 6.0),
276                sample_counter(0, "staging", Absolute, 0.0),
277                sample_counter(1, "production", Absolute, 1.0),
278                sample_counter(1, "staging", Absolute, 1.0),
279                sample_counter(2, "staging", Absolute, 2.0),
280                sample_counter(3, "staging", Absolute, 3.0),
281            ]
282        );
283
284        assert_eq!(
285            buffer[1],
286            [
287                sample_counter(2, "production", Absolute, 2.0),
288                sample_counter(3, "production", Absolute, 3.0),
289            ]
290        );
291
292        assert_eq!(buffer.len(), 2);
293    }
294
295    #[test]
296    fn inc_buffer_incremental_counters() {
297        let buffer = rebuffer_incremental_counters::<IncrementalMetricNormalizer>();
298
299        assert_eq!(
300            buffer[0],
301            [
302                sample_counter(0, "production", Incremental, 6.0),
303                sample_counter(0, "staging", Incremental, 0.0),
304                sample_counter(1, "production", Incremental, 1.0),
305                sample_counter(1, "staging", Incremental, 1.0),
306                sample_counter(2, "staging", Incremental, 2.0),
307                sample_counter(3, "staging", Incremental, 3.0),
308            ]
309        );
310
311        assert_eq!(
312            buffer[1],
313            [
314                sample_counter(2, "production", Incremental, 2.0),
315                sample_counter(3, "production", Incremental, 3.0),
316            ]
317        );
318
319        assert_eq!(buffer.len(), 2);
320    }
321
322    fn rebuffer_absolute_counters<State: MetricNormalize + Default>() -> Buffer {
323        let mut events = Vec::new();
324        // counter-0 and -1 only emitted once
325        // counter-2 and -3 emitted twice
326        // counter-4 and -5 emitted once
327        for i in 0..4 {
328            events.push(sample_counter(i, "production", Absolute, i as f64));
329        }
330
331        for i in 2..6 {
332            events.push(sample_counter(i, "production", Absolute, i as f64 * 3.0));
333        }
334
335        rebuffer::<State>(events)
336    }
337
338    #[test]
339    fn abs_buffer_absolute_counters() {
340        let buffer = rebuffer_absolute_counters::<AbsoluteMetricNormalizer>();
341
342        assert_eq!(
343            buffer[0],
344            [
345                sample_counter(0, "production", Absolute, 0.0),
346                sample_counter(1, "production", Absolute, 1.0),
347                sample_counter(2, "production", Absolute, 6.0),
348                sample_counter(3, "production", Absolute, 9.0),
349                sample_counter(4, "production", Absolute, 12.0),
350                sample_counter(5, "production", Absolute, 15.0),
351            ]
352        );
353
354        assert_eq!(buffer.len(), 1);
355    }
356
357    #[test]
358    fn inc_buffer_absolute_counters() {
359        let buffer = rebuffer_absolute_counters::<IncrementalMetricNormalizer>();
360
361        assert_eq!(
362            buffer[0],
363            [
364                sample_counter(2, "production", Incremental, 4.0),
365                sample_counter(3, "production", Incremental, 6.0),
366            ]
367        );
368
369        assert_eq!(buffer.len(), 1);
370    }
371
372    fn rebuffer_incremental_gauges<State: MetricNormalize + Default>() -> Buffer {
373        let mut events = Vec::new();
374        // gauge-1 emitted once
375        // gauge-2 through -4 are emitted twice
376        // gauge-5 emitted once
377        for i in 1..5 {
378            events.push(sample_gauge(i, Incremental, i as f64));
379        }
380
381        for i in 2..6 {
382            events.push(sample_gauge(i, Incremental, i as f64));
383        }
384
385        rebuffer::<State>(events)
386    }
387
388    #[test]
389    fn abs_buffer_incremental_gauges() {
390        let buffer = rebuffer_incremental_gauges::<AbsoluteMetricNormalizer>();
391
392        assert_eq!(
393            buffer[0],
394            [
395                sample_gauge(1, Absolute, 1.0),
396                sample_gauge(2, Absolute, 4.0),
397                sample_gauge(3, Absolute, 6.0),
398                sample_gauge(4, Absolute, 8.0),
399                sample_gauge(5, Absolute, 5.0),
400            ]
401        );
402
403        assert_eq!(buffer.len(), 1);
404    }
405
406    #[test]
407    fn inc_buffer_incremental_gauges() {
408        let buffer = rebuffer_incremental_gauges::<IncrementalMetricNormalizer>();
409
410        assert_eq!(
411            buffer[0],
412            [
413                sample_gauge(1, Incremental, 1.0),
414                sample_gauge(2, Incremental, 4.0),
415                sample_gauge(3, Incremental, 6.0),
416                sample_gauge(4, Incremental, 8.0),
417                sample_gauge(5, Incremental, 5.0),
418            ]
419        );
420
421        assert_eq!(buffer.len(), 1);
422    }
423
424    fn rebuffer_absolute_gauges<State: MetricNormalize + Default>() -> Buffer {
425        let mut events = Vec::new();
426        // gauge-2 emitted once
427        // gauge-3 and -4 emitted twice
428        // gauge-5 emitted once
429        for i in 2..5 {
430            events.push(sample_gauge(i, Absolute, i as f64 * 2.0));
431        }
432
433        for i in 3..6 {
434            events.push(sample_gauge(i, Absolute, i as f64 * 10.0));
435        }
436
437        rebuffer::<State>(events)
438    }
439
440    #[test]
441    fn abs_buffer_absolute_gauges() {
442        let buffer = rebuffer_absolute_gauges::<AbsoluteMetricNormalizer>();
443
444        assert_eq!(
445            buffer[0],
446            [
447                sample_gauge(2, Absolute, 4.0),
448                sample_gauge(3, Absolute, 30.0),
449                sample_gauge(4, Absolute, 40.0),
450                sample_gauge(5, Absolute, 50.0),
451            ]
452        );
453
454        assert_eq!(buffer.len(), 1);
455    }
456
457    #[test]
458    fn inc_buffer_absolute_gauges() {
459        let buffer = rebuffer_absolute_gauges::<IncrementalMetricNormalizer>();
460
461        assert_eq!(
462            buffer[0],
463            [
464                sample_gauge(3, Incremental, 24.0),
465                sample_gauge(4, Incremental, 32.0),
466            ]
467        );
468
469        assert_eq!(buffer.len(), 1);
470    }
471
472    fn rebuffer_incremental_sets<State: MetricNormalize + Default>() -> Buffer {
473        let mut events = Vec::new();
474        // set-0 emitted 8 times with 4 different values
475        // set-1 emitted once with 4 values
476        for i in 0..4 {
477            events.push(sample_set(0, Incremental, &[i]));
478        }
479
480        for i in 0..4 {
481            events.push(sample_set(0, Incremental, &[i]));
482        }
483
484        events.push(sample_set(1, Incremental, &[1, 2, 3, 4]));
485
486        rebuffer::<State>(events)
487    }
488
489    #[test]
490    fn abs_buffer_incremental_sets() {
491        let buffer = rebuffer_incremental_sets::<AbsoluteMetricNormalizer>();
492
493        assert_eq!(
494            buffer[0],
495            [
496                sample_set(0, Absolute, &[0, 1, 2, 3]),
497                sample_set(1, Absolute, &[1, 2, 3, 4]),
498            ]
499        );
500
501        assert_eq!(buffer.len(), 1);
502    }
503
504    #[test]
505    fn inc_buffer_incremental_sets() {
506        let buffer = rebuffer_incremental_sets::<IncrementalMetricNormalizer>();
507
508        assert_eq!(
509            buffer[0],
510            [
511                sample_set(0, Incremental, &[0, 1, 2, 3]),
512                sample_set(1, Incremental, &[1, 2, 3, 4]),
513            ]
514        );
515
516        assert_eq!(buffer.len(), 1);
517    }
518
519    fn rebuffer_incremental_distributions<State: MetricNormalize + Default>() -> Buffer {
520        let mut events = Vec::new();
521        for _ in 2..6 {
522            events.push(sample_distribution_histogram(2, Incremental, 10));
523        }
524
525        for i in 2..6 {
526            events.push(sample_distribution_histogram(i, Incremental, 10));
527        }
528
529        rebuffer::<State>(events)
530    }
531
532    #[test]
533    fn abs_buffer_incremental_distributions() {
534        let buffer = rebuffer_incremental_distributions::<AbsoluteMetricNormalizer>();
535
536        assert_eq!(
537            buffer[0],
538            [
539                sample_distribution_histogram(2, Absolute, 50),
540                sample_distribution_histogram(3, Absolute, 10),
541                sample_distribution_histogram(4, Absolute, 10),
542                sample_distribution_histogram(5, Absolute, 10),
543            ]
544        );
545
546        assert_eq!(buffer.len(), 1);
547    }
548
549    #[test]
550    fn inc_buffer_incremental_distributions() {
551        let buffer = rebuffer_incremental_distributions::<IncrementalMetricNormalizer>();
552
553        assert_eq!(
554            buffer[0],
555            [
556                sample_distribution_histogram(2, Incremental, 50),
557                sample_distribution_histogram(3, Incremental, 10),
558                sample_distribution_histogram(4, Incremental, 10),
559                sample_distribution_histogram(5, Incremental, 10),
560            ]
561        );
562
563        assert_eq!(buffer.len(), 1);
564    }
565
566    #[test]
567    fn compress_distributions() {
568        let mut samples = vector_lib::samples![
569            2.0 => 12,
570            2.0 => 12,
571            3.0 => 13,
572            1.0 => 11,
573            2.0 => 12,
574            2.0 => 12,
575            3.0 => 13
576        ];
577
578        assert_eq!(
579            compress_distribution(&mut samples),
580            vector_lib::samples![1.0 => 11, 2.0 => 48, 3.0 => 26]
581        );
582    }
583
584    fn rebuffer_absolute_aggregated_histograms<State: MetricNormalize + Default>() -> Buffer {
585        let mut events = Vec::new();
586        for _ in 2..5 {
587            events.push(sample_aggregated_histogram(2, Absolute, 1.0, 1, 10.0));
588        }
589
590        for i in 2..5 {
591            events.push(sample_aggregated_histogram(
592                i,
593                Absolute,
594                1.0,
595                i as u64,
596                i as f64 * 10.0,
597            ));
598        }
599
600        rebuffer::<State>(events)
601    }
602
603    #[test]
604    fn abs_buffer_absolute_aggregated_histograms() {
605        let buffer = rebuffer_absolute_aggregated_histograms::<AbsoluteMetricNormalizer>();
606
607        assert_eq!(
608            buffer[0],
609            [
610                sample_aggregated_histogram(2, Absolute, 1.0, 2, 20.0),
611                sample_aggregated_histogram(3, Absolute, 1.0, 3, 30.0),
612                sample_aggregated_histogram(4, Absolute, 1.0, 4, 40.0),
613            ]
614        );
615
616        assert_eq!(buffer.len(), 1);
617    }
618
619    #[test]
620    fn inc_buffer_absolute_aggregated_histograms() {
621        let buffer = rebuffer_absolute_aggregated_histograms::<IncrementalMetricNormalizer>();
622
623        assert_eq!(
624            buffer[0],
625            [sample_aggregated_histogram(2, Incremental, 1.0, 1, 10.0)]
626        );
627
628        assert_eq!(buffer.len(), 1);
629    }
630
631    fn rebuffer_incremental_aggregated_histograms<State: MetricNormalize + Default>() -> Buffer {
632        let mut events = vec![sample_aggregated_histogram(2, Incremental, 1.0, 1, 10.0)];
633
634        for i in 1..4 {
635            events.push(sample_aggregated_histogram(2, Incremental, 2.0, i, 10.0));
636        }
637
638        rebuffer::<State>(events)
639    }
640
641    #[test]
642    fn abs_buffer_incremental_aggregated_histograms() {
643        let buffer = rebuffer_incremental_aggregated_histograms::<AbsoluteMetricNormalizer>();
644
645        assert_eq!(
646            buffer[0],
647            [sample_aggregated_histogram(2, Absolute, 2.0, 6, 30.0)]
648        );
649
650        assert_eq!(buffer.len(), 1);
651    }
652
653    #[test]
654    fn inc_buffer_incremental_aggregated_histograms() {
655        let buffer = rebuffer_incremental_aggregated_histograms::<IncrementalMetricNormalizer>();
656
657        assert_eq!(
658            buffer[0],
659            [sample_aggregated_histogram(2, Incremental, 2.0, 6, 30.0)]
660        );
661
662        assert_eq!(buffer.len(), 1);
663    }
664
665    fn rebuffer_aggregated_summaries<State: MetricNormalize + Default>() -> Buffer {
666        let mut events = Vec::new();
667        for factor in 0..2 {
668            for num in 2..4 {
669                events.push(sample_aggregated_summary(
670                    num,
671                    Absolute,
672                    (factor + num) as f64,
673                ));
674            }
675        }
676
677        rebuffer::<State>(events)
678    }
679
680    #[test]
681    fn abs_buffer_aggregated_summaries() {
682        let buffer = rebuffer_aggregated_summaries::<AbsoluteMetricNormalizer>();
683
684        assert_eq!(
685            buffer[0],
686            [
687                sample_aggregated_summary(2, Absolute, 3.0),
688                sample_aggregated_summary(3, Absolute, 4.0),
689            ]
690        );
691
692        assert_eq!(buffer.len(), 1);
693    }
694
695    #[test]
696    fn inc_buffer_aggregated_summaries() {
697        let buffer = rebuffer_aggregated_summaries::<IncrementalMetricNormalizer>();
698
699        // Since aggregated summaries cannot be added, they don't work
700        // as incremental metrics and this results in an empty buffer.
701        assert_eq!(buffer.len(), 0);
702    }
703}