1use vector_lib::event::metric::{Metric, MetricValue, Sample};
2
3use crate::sinks::util::{
4 Merged, SinkBatchSettings,
5 batch::{Batch, BatchConfig, BatchError, BatchSize, PushResult},
6};
7
8mod normalize;
9pub use self::normalize::*;
10
11mod split;
12pub use self::split::*;
13
14pub struct MetricsBuffer {
24 metrics: Option<MetricSet>,
25 max_events: usize,
26}
27
28impl MetricsBuffer {
29 pub const fn new(settings: BatchSize<Self>) -> Self {
31 Self::with_capacity(settings.events)
32 }
33
34 const fn with_capacity(max_events: usize) -> Self {
35 Self {
36 metrics: None,
37 max_events,
38 }
39 }
40}
41
42impl Batch for MetricsBuffer {
43 type Input = Metric;
44 type Output = Vec<Metric>;
45
46 fn get_settings_defaults<D: SinkBatchSettings + Clone>(
47 config: BatchConfig<D, Merged>,
48 ) -> Result<BatchConfig<D, Merged>, BatchError> {
49 config.disallow_max_bytes()
50 }
51
52 fn push(&mut self, item: Self::Input) -> PushResult<Self::Input> {
53 if self.num_items() >= self.max_events {
54 PushResult::Overflow(item)
55 } else {
56 let max_events = self.max_events;
57 self.metrics
58 .get_or_insert_with(|| {
59 MetricSet::new(MetricSetSettings {
60 max_events: Some(max_events),
61 ..Default::default()
62 })
63 })
64 .insert_update(item);
65 PushResult::Ok(self.num_items() >= self.max_events)
66 }
67 }
68
69 fn is_empty(&self) -> bool {
70 self.num_items() == 0
71 }
72
73 fn fresh(&self) -> Self {
74 Self::with_capacity(self.max_events)
75 }
76
77 fn finish(self) -> Self::Output {
78 let mut finalized = self
80 .metrics
81 .map(MetricSet::into_metrics)
82 .unwrap_or_default();
83 finalized.iter_mut().for_each(finalize_metric);
84 finalized
85 }
86
87 fn num_items(&self) -> usize {
88 self.metrics
89 .as_ref()
90 .map(|metrics| metrics.len())
91 .unwrap_or(0)
92 }
93}
94
95fn finalize_metric(metric: &mut Metric) {
96 if let MetricValue::Distribution { samples, .. } = metric.data_mut().value_mut() {
97 let compressed_samples = compress_distribution(samples);
98 *samples = compressed_samples;
99 }
100}
101
102pub fn compress_distribution(samples: &mut Vec<Sample>) -> Vec<Sample> {
103 if samples.is_empty() {
104 return Vec::new();
105 }
106
107 samples.sort_by(|a, b| a.value.total_cmp(&b.value));
108
109 let mut acc = Sample {
110 value: samples[0].value,
111 rate: 0,
112 };
113 let mut result = Vec::new();
114
115 for sample in samples {
116 if acc.value == sample.value {
117 acc.rate += sample.rate;
118 } else {
119 result.push(acc);
120 acc = *sample;
121 }
122 }
123 result.push(acc);
124
125 result
126}
127
128#[cfg(test)]
129mod tests {
130 use itertools::Itertools;
131 use similar_asserts::assert_eq;
132 use vector_lib::{
133 event::metric::{MetricKind, MetricKind::*, MetricValue, StatisticKind},
134 metric_tags,
135 };
136
137 use super::*;
138 use crate::{
139 sinks::util::BatchSettings,
140 test_util::metrics::{AbsoluteMetricNormalizer, IncrementalMetricNormalizer},
141 };
142
143 type Buffer = Vec<Vec<Metric>>;
144
145 pub fn sample_counter(num: usize, tagstr: &str, kind: MetricKind, value: f64) -> Metric {
146 Metric::new(
147 format!("counter-{num}"),
148 kind,
149 MetricValue::Counter { value },
150 )
151 .with_tags(Some(metric_tags!(tagstr => "true")))
152 }
153
154 pub fn sample_gauge(num: usize, kind: MetricKind, value: f64) -> Metric {
155 Metric::new(format!("gauge-{num}"), kind, MetricValue::Gauge { value })
156 }
157
158 pub fn sample_set<T: ToString>(num: usize, kind: MetricKind, values: &[T]) -> Metric {
159 Metric::new(
160 format!("set-{num}"),
161 kind,
162 MetricValue::Set {
163 values: values.iter().map(|s| s.to_string()).collect(),
164 },
165 )
166 }
167
168 pub fn sample_distribution_histogram(num: u32, kind: MetricKind, rate: u32) -> Metric {
169 Metric::new(
170 format!("dist-{num}"),
171 kind,
172 MetricValue::Distribution {
173 samples: vector_lib::samples![num as f64 => rate],
174 statistic: StatisticKind::Histogram,
175 },
176 )
177 }
178
179 pub fn sample_aggregated_histogram(
180 num: usize,
181 kind: MetricKind,
182 bpower: f64,
183 cfactor: u64,
184 sum: f64,
185 ) -> Metric {
186 Metric::new(
187 format!("buckets-{num}"),
188 kind,
189 MetricValue::AggregatedHistogram {
190 buckets: vector_lib::buckets![
191 1.0 => cfactor,
192 bpower.exp2() => cfactor * 2,
193 4.0f64.powf(bpower) => cfactor * 4
194 ],
195 count: 7 * cfactor,
196 sum,
197 },
198 )
199 }
200
201 pub fn sample_aggregated_summary(num: u32, kind: MetricKind, factor: f64) -> Metric {
202 Metric::new(
203 format!("quantiles-{num}"),
204 kind,
205 MetricValue::AggregatedSummary {
206 quantiles: vector_lib::quantiles![
207 0.0 => factor,
208 0.5 => factor * 2.0,
209 1.0 => factor * 4.0
210 ],
211 count: factor as u64 * 10,
212 sum: factor * 7.0,
213 },
214 )
215 }
216
217 fn rebuffer<State: MetricNormalize + Default>(metrics: Vec<Metric>) -> Buffer {
218 let mut batch_settings = BatchSettings::default();
219 batch_settings.size.bytes = 9999;
220 batch_settings.size.events = 6;
221
222 let mut normalizer = MetricNormalizer::<State>::default();
223 let mut buffer = MetricsBuffer::new(batch_settings.size);
224 let mut result = vec![];
225
226 for metric in metrics {
227 if let Some(event) = normalizer.normalize(metric) {
228 match buffer.push(event) {
229 PushResult::Overflow(_) => panic!("overflowed too early"),
230 PushResult::Ok(true) => {
231 let batch =
232 std::mem::replace(&mut buffer, MetricsBuffer::new(batch_settings.size));
233 result.push(batch.finish());
234 }
235 PushResult::Ok(false) => (),
236 }
237 }
238 }
239
240 if !buffer.is_empty() {
241 result.push(buffer.finish())
242 }
243
244 result
246 .into_iter()
247 .map(|mut batch| {
248 batch.sort_by_key(|k| format!("{k:?}"));
249 batch
250 })
251 .collect()
252 }
253
254 fn rebuffer_incremental_counters<State: MetricNormalize + Default>() -> Buffer {
255 let mut events = Vec::new();
256 for i in 0..4 {
257 events.push(sample_counter(0, "production", Incremental, i as f64));
259 }
260
261 for i in 0..4 {
262 events.push(sample_counter(i, "staging", Incremental, i as f64));
264 }
265
266 for i in 0..4 {
267 events.push(sample_counter(i, "production", Incremental, i as f64));
269 }
270
271 rebuffer::<State>(events)
272 }
273
274 #[test]
275 fn abs_buffer_incremental_counters() {
276 let buffer = rebuffer_incremental_counters::<AbsoluteMetricNormalizer>();
277
278 assert_eq!(
279 buffer[0],
280 [
281 sample_counter(0, "production", Absolute, 6.0),
282 sample_counter(0, "staging", Absolute, 0.0),
283 sample_counter(1, "production", Absolute, 1.0),
284 sample_counter(1, "staging", Absolute, 1.0),
285 sample_counter(2, "staging", Absolute, 2.0),
286 sample_counter(3, "staging", Absolute, 3.0),
287 ]
288 );
289
290 assert_eq!(
291 buffer[1],
292 [
293 sample_counter(2, "production", Absolute, 2.0),
294 sample_counter(3, "production", Absolute, 3.0),
295 ]
296 );
297
298 assert_eq!(buffer.len(), 2);
299 }
300
301 #[test]
302 fn inc_buffer_incremental_counters() {
303 let buffer = rebuffer_incremental_counters::<IncrementalMetricNormalizer>();
304
305 assert_eq!(
306 buffer[0],
307 [
308 sample_counter(0, "production", Incremental, 6.0),
309 sample_counter(0, "staging", Incremental, 0.0),
310 sample_counter(1, "production", Incremental, 1.0),
311 sample_counter(1, "staging", Incremental, 1.0),
312 sample_counter(2, "staging", Incremental, 2.0),
313 sample_counter(3, "staging", Incremental, 3.0),
314 ]
315 );
316
317 assert_eq!(
318 buffer[1],
319 [
320 sample_counter(2, "production", Incremental, 2.0),
321 sample_counter(3, "production", Incremental, 3.0),
322 ]
323 );
324
325 assert_eq!(buffer.len(), 2);
326 }
327
328 fn rebuffer_absolute_counters<State: MetricNormalize + Default>() -> Buffer {
329 let mut events = Vec::new();
330 for i in 0..4 {
334 events.push(sample_counter(i, "production", Absolute, i as f64));
335 }
336
337 for i in 2..6 {
338 events.push(sample_counter(i, "production", Absolute, i as f64 * 3.0));
339 }
340
341 rebuffer::<State>(events)
342 }
343
344 #[test]
345 fn abs_buffer_absolute_counters() {
346 let buffer = rebuffer_absolute_counters::<AbsoluteMetricNormalizer>();
347
348 assert_eq!(
349 buffer[0],
350 [
351 sample_counter(0, "production", Absolute, 0.0),
352 sample_counter(1, "production", Absolute, 1.0),
353 sample_counter(2, "production", Absolute, 6.0),
354 sample_counter(3, "production", Absolute, 9.0),
355 sample_counter(4, "production", Absolute, 12.0),
356 sample_counter(5, "production", Absolute, 15.0),
357 ]
358 );
359
360 assert_eq!(buffer.len(), 1);
361 }
362
363 #[test]
364 fn inc_buffer_absolute_counters() {
365 let buffer = rebuffer_absolute_counters::<IncrementalMetricNormalizer>();
366
367 assert_eq!(
368 buffer[0],
369 [
370 sample_counter(2, "production", Incremental, 4.0),
371 sample_counter(3, "production", Incremental, 6.0),
372 ]
373 );
374
375 assert_eq!(buffer.len(), 1);
376 }
377
378 fn rebuffer_incremental_gauges<State: MetricNormalize + Default>() -> Buffer {
379 let mut events = Vec::new();
380 for i in 1..5 {
384 events.push(sample_gauge(i, Incremental, i as f64));
385 }
386
387 for i in 2..6 {
388 events.push(sample_gauge(i, Incremental, i as f64));
389 }
390
391 rebuffer::<State>(events)
392 }
393
394 #[test]
395 fn abs_buffer_incremental_gauges() {
396 let buffer = rebuffer_incremental_gauges::<AbsoluteMetricNormalizer>();
397
398 assert_eq!(
399 buffer[0],
400 [
401 sample_gauge(1, Absolute, 1.0),
402 sample_gauge(2, Absolute, 4.0),
403 sample_gauge(3, Absolute, 6.0),
404 sample_gauge(4, Absolute, 8.0),
405 sample_gauge(5, Absolute, 5.0),
406 ]
407 );
408
409 assert_eq!(buffer.len(), 1);
410 }
411
412 #[test]
413 fn inc_buffer_incremental_gauges() {
414 let buffer = rebuffer_incremental_gauges::<IncrementalMetricNormalizer>();
415
416 assert_eq!(
417 buffer[0],
418 [
419 sample_gauge(1, Incremental, 1.0),
420 sample_gauge(2, Incremental, 4.0),
421 sample_gauge(3, Incremental, 6.0),
422 sample_gauge(4, Incremental, 8.0),
423 sample_gauge(5, Incremental, 5.0),
424 ]
425 );
426
427 assert_eq!(buffer.len(), 1);
428 }
429
430 fn rebuffer_absolute_gauges<State: MetricNormalize + Default>() -> Buffer {
431 let mut events = Vec::new();
432 for i in 2..5 {
436 events.push(sample_gauge(i, Absolute, i as f64 * 2.0));
437 }
438
439 for i in 3..6 {
440 events.push(sample_gauge(i, Absolute, i as f64 * 10.0));
441 }
442
443 rebuffer::<State>(events)
444 }
445
446 #[test]
447 fn abs_buffer_absolute_gauges() {
448 let buffer = rebuffer_absolute_gauges::<AbsoluteMetricNormalizer>();
449
450 assert_eq!(
451 buffer[0],
452 [
453 sample_gauge(2, Absolute, 4.0),
454 sample_gauge(3, Absolute, 30.0),
455 sample_gauge(4, Absolute, 40.0),
456 sample_gauge(5, Absolute, 50.0),
457 ]
458 );
459
460 assert_eq!(buffer.len(), 1);
461 }
462
463 #[test]
464 fn inc_buffer_absolute_gauges() {
465 let buffer = rebuffer_absolute_gauges::<IncrementalMetricNormalizer>();
466
467 assert_eq!(
468 buffer[0],
469 [
470 sample_gauge(3, Incremental, 24.0),
471 sample_gauge(4, Incremental, 32.0),
472 ]
473 );
474
475 assert_eq!(buffer.len(), 1);
476 }
477
478 fn rebuffer_incremental_sets<State: MetricNormalize + Default>() -> Buffer {
479 let mut events = Vec::new();
480 for i in 0..4 {
483 events.push(sample_set(0, Incremental, &[i]));
484 }
485
486 for i in 0..4 {
487 events.push(sample_set(0, Incremental, &[i]));
488 }
489
490 events.push(sample_set(1, Incremental, &[1, 2, 3, 4]));
491
492 rebuffer::<State>(events)
493 }
494
495 #[test]
496 fn abs_buffer_incremental_sets() {
497 let buffer = rebuffer_incremental_sets::<AbsoluteMetricNormalizer>();
498
499 assert_eq!(
500 buffer[0],
501 [
502 sample_set(0, Absolute, &[0, 1, 2, 3]),
503 sample_set(1, Absolute, &[1, 2, 3, 4]),
504 ]
505 );
506
507 assert_eq!(buffer.len(), 1);
508 }
509
510 #[test]
511 fn inc_buffer_incremental_sets() {
512 let buffer = rebuffer_incremental_sets::<IncrementalMetricNormalizer>();
513
514 assert_eq!(
515 buffer[0],
516 [
517 sample_set(0, Incremental, &[0, 1, 2, 3]),
518 sample_set(1, Incremental, &[1, 2, 3, 4]),
519 ]
520 );
521
522 assert_eq!(buffer.len(), 1);
523 }
524
525 fn rebuffer_incremental_distributions<State: MetricNormalize + Default>() -> Buffer {
526 let mut events = Vec::new();
527 for _ in 2..6 {
528 events.push(sample_distribution_histogram(2, Incremental, 10));
529 }
530
531 for i in 2..6 {
532 events.push(sample_distribution_histogram(i, Incremental, 10));
533 }
534
535 rebuffer::<State>(events)
536 }
537
538 #[test]
539 fn abs_buffer_incremental_distributions() {
540 let buffer = rebuffer_incremental_distributions::<AbsoluteMetricNormalizer>();
541
542 assert_eq!(
543 buffer[0],
544 [
545 sample_distribution_histogram(2, Absolute, 50),
546 sample_distribution_histogram(3, Absolute, 10),
547 sample_distribution_histogram(4, Absolute, 10),
548 sample_distribution_histogram(5, Absolute, 10),
549 ]
550 );
551
552 assert_eq!(buffer.len(), 1);
553 }
554
555 #[test]
556 fn inc_buffer_incremental_distributions() {
557 let buffer = rebuffer_incremental_distributions::<IncrementalMetricNormalizer>();
558
559 assert_eq!(
560 buffer[0],
561 [
562 sample_distribution_histogram(2, Incremental, 50),
563 sample_distribution_histogram(3, Incremental, 10),
564 sample_distribution_histogram(4, Incremental, 10),
565 sample_distribution_histogram(5, Incremental, 10),
566 ]
567 );
568
569 assert_eq!(buffer.len(), 1);
570 }
571
572 #[test]
573 fn compress_distributions() {
574 let mut samples = vector_lib::samples![
575 2.0 => 12,
576 2.0 => 12,
577 3.0 => 13,
578 1.0 => 11,
579 2.0 => 12,
580 2.0 => 12,
581 3.0 => 13
582 ];
583
584 assert_eq!(
585 compress_distribution(&mut samples),
586 vector_lib::samples![1.0 => 11, 2.0 => 48, 3.0 => 26]
587 );
588 }
589
590 #[test]
591 fn compress_distributions_doesnt_panic() {
592 let to_float = |v: i32| -> f64 { v as f64 };
593
594 let mut samples = (0..=15)
595 .map(to_float)
596 .chain(std::iter::once(f64::NAN))
597 .chain((16..=20).map(to_float))
598 .rev()
599 .map(|value| Sample { value, rate: 1 })
600 .collect_vec();
601
602 assert_eq!(
603 compress_distribution(&mut samples),
604 (0..=20)
605 .map(to_float)
606 .chain(std::iter::once(f64::NAN))
607 .map(|value| Sample { value, rate: 1 })
608 .collect_vec()
609 );
610 }
611
612 fn rebuffer_absolute_aggregated_histograms<State: MetricNormalize + Default>() -> Buffer {
613 let mut events = Vec::new();
614 for _ in 2..5 {
615 events.push(sample_aggregated_histogram(2, Absolute, 1.0, 1, 10.0));
616 }
617
618 for i in 2..5 {
619 events.push(sample_aggregated_histogram(
620 i,
621 Absolute,
622 1.0,
623 i as u64,
624 i as f64 * 10.0,
625 ));
626 }
627
628 rebuffer::<State>(events)
629 }
630
631 #[test]
632 fn abs_buffer_absolute_aggregated_histograms() {
633 let buffer = rebuffer_absolute_aggregated_histograms::<AbsoluteMetricNormalizer>();
634
635 assert_eq!(
636 buffer[0],
637 [
638 sample_aggregated_histogram(2, Absolute, 1.0, 2, 20.0),
639 sample_aggregated_histogram(3, Absolute, 1.0, 3, 30.0),
640 sample_aggregated_histogram(4, Absolute, 1.0, 4, 40.0),
641 ]
642 );
643
644 assert_eq!(buffer.len(), 1);
645 }
646
647 #[test]
648 fn inc_buffer_absolute_aggregated_histograms() {
649 let buffer = rebuffer_absolute_aggregated_histograms::<IncrementalMetricNormalizer>();
650
651 assert_eq!(
652 buffer[0],
653 [sample_aggregated_histogram(2, Incremental, 1.0, 1, 10.0)]
654 );
655
656 assert_eq!(buffer.len(), 1);
657 }
658
659 fn rebuffer_incremental_aggregated_histograms<State: MetricNormalize + Default>() -> Buffer {
660 let mut events = vec![sample_aggregated_histogram(2, Incremental, 1.0, 1, 10.0)];
661
662 for i in 1..4 {
663 events.push(sample_aggregated_histogram(2, Incremental, 2.0, i, 10.0));
664 }
665
666 rebuffer::<State>(events)
667 }
668
669 #[test]
670 fn abs_buffer_incremental_aggregated_histograms() {
671 let buffer = rebuffer_incremental_aggregated_histograms::<AbsoluteMetricNormalizer>();
672
673 assert_eq!(
674 buffer[0],
675 [sample_aggregated_histogram(2, Absolute, 2.0, 6, 30.0)]
676 );
677
678 assert_eq!(buffer.len(), 1);
679 }
680
681 #[test]
682 fn inc_buffer_incremental_aggregated_histograms() {
683 let buffer = rebuffer_incremental_aggregated_histograms::<IncrementalMetricNormalizer>();
684
685 assert_eq!(
686 buffer[0],
687 [sample_aggregated_histogram(2, Incremental, 2.0, 6, 30.0)]
688 );
689
690 assert_eq!(buffer.len(), 1);
691 }
692
693 fn rebuffer_aggregated_summaries<State: MetricNormalize + Default>() -> Buffer {
694 let mut events = Vec::new();
695 for factor in 0..2 {
696 for num in 2..4 {
697 events.push(sample_aggregated_summary(
698 num,
699 Absolute,
700 (factor + num) as f64,
701 ));
702 }
703 }
704
705 rebuffer::<State>(events)
706 }
707
708 #[test]
709 fn abs_buffer_aggregated_summaries() {
710 let buffer = rebuffer_aggregated_summaries::<AbsoluteMetricNormalizer>();
711
712 assert_eq!(
713 buffer[0],
714 [
715 sample_aggregated_summary(2, Absolute, 3.0),
716 sample_aggregated_summary(3, Absolute, 4.0),
717 ]
718 );
719
720 assert_eq!(buffer.len(), 1);
721 }
722
723 #[test]
724 fn inc_buffer_aggregated_summaries() {
725 let buffer = rebuffer_aggregated_summaries::<IncrementalMetricNormalizer>();
726
727 assert_eq!(buffer.len(), 0);
730 }
731}