1use std::cmp::Ordering;
2
3use vector_lib::event::metric::{Metric, MetricValue, Sample};
4
5use crate::sinks::util::{
6 batch::{Batch, BatchConfig, BatchError, BatchSize, PushResult},
7 Merged, SinkBatchSettings,
8};
9
10mod normalize;
11pub use self::normalize::*;
12
13mod split;
14pub use self::split::*;
15
16pub struct MetricsBuffer {
26 metrics: Option<MetricSet>,
27 max_events: usize,
28}
29
30impl MetricsBuffer {
31 pub const fn new(settings: BatchSize<Self>) -> Self {
33 Self::with_capacity(settings.events)
34 }
35
36 const fn with_capacity(max_events: usize) -> Self {
37 Self {
38 metrics: None,
39 max_events,
40 }
41 }
42}
43
44impl Batch for MetricsBuffer {
45 type Input = Metric;
46 type Output = Vec<Metric>;
47
48 fn get_settings_defaults<D: SinkBatchSettings + Clone>(
49 config: BatchConfig<D, Merged>,
50 ) -> Result<BatchConfig<D, Merged>, BatchError> {
51 config.disallow_max_bytes()
52 }
53
54 fn push(&mut self, item: Self::Input) -> PushResult<Self::Input> {
55 if self.num_items() >= self.max_events {
56 PushResult::Overflow(item)
57 } else {
58 let max_events = self.max_events;
59 self.metrics
60 .get_or_insert_with(|| MetricSet::with_capacity(max_events))
61 .insert_update(item);
62 PushResult::Ok(self.num_items() >= self.max_events)
63 }
64 }
65
66 fn is_empty(&self) -> bool {
67 self.num_items() == 0
68 }
69
70 fn fresh(&self) -> Self {
71 Self::with_capacity(self.max_events)
72 }
73
74 fn finish(self) -> Self::Output {
75 let mut finalized = self
77 .metrics
78 .map(MetricSet::into_metrics)
79 .unwrap_or_default();
80 finalized.iter_mut().for_each(finalize_metric);
81 finalized
82 }
83
84 fn num_items(&self) -> usize {
85 self.metrics
86 .as_ref()
87 .map(|metrics| metrics.len())
88 .unwrap_or(0)
89 }
90}
91
92fn finalize_metric(metric: &mut Metric) {
93 if let MetricValue::Distribution { samples, .. } = metric.data_mut().value_mut() {
94 let compressed_samples = compress_distribution(samples);
95 *samples = compressed_samples;
96 }
97}
98
99pub fn compress_distribution(samples: &mut Vec<Sample>) -> Vec<Sample> {
100 if samples.is_empty() {
101 return Vec::new();
102 }
103
104 samples.sort_by(|a, b| a.value.partial_cmp(&b.value).unwrap_or(Ordering::Equal));
105
106 let mut acc = Sample {
107 value: samples[0].value,
108 rate: 0,
109 };
110 let mut result = Vec::new();
111
112 for sample in samples {
113 if acc.value == sample.value {
114 acc.rate += sample.rate;
115 } else {
116 result.push(acc);
117 acc = *sample;
118 }
119 }
120 result.push(acc);
121
122 result
123}
124
#[cfg(test)]
mod tests {
    use similar_asserts::assert_eq;
    use vector_lib::event::metric::{MetricKind, MetricKind::*, MetricValue, StatisticKind};
    use vector_lib::metric_tags;

    use super::*;
    use crate::{
        sinks::util::BatchSettings,
        test_util::metrics::{AbsoluteMetricNormalizer, IncrementalMetricNormalizer},
    };

    /// Finished batches, in the order in which they were produced.
    type Buffer = Vec<Vec<Metric>>;

    /// Builds a counter named `counter-{num}` carrying the single tag `tagstr => "true"`.
    pub fn sample_counter(num: usize, tagstr: &str, kind: MetricKind, value: f64) -> Metric {
        let name = format!("counter-{num}");
        let tags = metric_tags!(tagstr => "true");
        Metric::new(name, kind, MetricValue::Counter { value }).with_tags(Some(tags))
    }

    /// Builds an untagged gauge named `gauge-{num}`.
    pub fn sample_gauge(num: usize, kind: MetricKind, value: f64) -> Metric {
        let name = format!("gauge-{num}");
        Metric::new(name, kind, MetricValue::Gauge { value })
    }

    /// Builds a set named `set-{num}` whose members are the stringified `values`.
    pub fn sample_set<T: ToString>(num: usize, kind: MetricKind, values: &[T]) -> Metric {
        let values = values.iter().map(ToString::to_string).collect();
        Metric::new(format!("set-{num}"), kind, MetricValue::Set { values })
    }

    /// Builds a histogram distribution named `dist-{num}` with one sample of
    /// value `num` and the given `rate`.
    pub fn sample_distribution_histogram(num: u32, kind: MetricKind, rate: u32) -> Metric {
        let value = MetricValue::Distribution {
            samples: vector_lib::samples![num as f64 => rate],
            statistic: StatisticKind::Histogram,
        };
        Metric::new(format!("dist-{num}"), kind, value)
    }

    /// Builds an aggregated histogram named `buckets-{num}` with three buckets
    /// whose counts are `cfactor`, `2 * cfactor` and `4 * cfactor` (so the total
    /// count is `7 * cfactor`) and whose upper limits are `1`, `2^bpower` and
    /// `4^bpower`.
    pub fn sample_aggregated_histogram(
        num: usize,
        kind: MetricKind,
        bpower: f64,
        cfactor: u64,
        sum: f64,
    ) -> Metric {
        let buckets = vector_lib::buckets![
            1.0 => cfactor,
            bpower.exp2() => cfactor * 2,
            4.0f64.powf(bpower) => cfactor * 4
        ];
        let value = MetricValue::AggregatedHistogram {
            buckets,
            count: 7 * cfactor,
            sum,
        };
        Metric::new(format!("buckets-{num}"), kind, value)
    }

    /// Builds an aggregated summary named `quantiles-{num}` with quantiles at
    /// 0.0, 0.5 and 1.0 scaled by `factor`.
    pub fn sample_aggregated_summary(num: u32, kind: MetricKind, factor: f64) -> Metric {
        let quantiles = vector_lib::quantiles![
            0.0 => factor,
            0.5 => factor * 2.0,
            1.0 => factor * 4.0
        ];
        let value = MetricValue::AggregatedSummary {
            quantiles,
            count: factor as u64 * 10,
            sum: factor * 7.0,
        };
        Metric::new(format!("quantiles-{num}"), kind, value)
    }

    /// Runs `metrics` through a `MetricNormalizer<State>` and a `MetricsBuffer`
    /// limited to six events, collecting every finished batch.
    ///
    /// Each batch is sorted by the `Debug` rendering of its metrics so that the
    /// assertions below can rely on a predictable order.
    fn rebuffer<State: MetricNormalize + Default>(metrics: Vec<Metric>) -> Buffer {
        let mut settings = BatchSettings::default();
        settings.size.bytes = 9999;
        settings.size.events = 6;

        let mut normalizer = MetricNormalizer::<State>::default();
        let mut current = MetricsBuffer::new(settings.size);
        let mut batches: Buffer = Vec::new();

        // Metrics for which the normalizer yields nothing never reach the buffer.
        let normalized = metrics
            .into_iter()
            .filter_map(|metric| normalizer.normalize(metric));

        for event in normalized {
            match current.push(event) {
                PushResult::Ok(false) => (),
                PushResult::Ok(true) => {
                    // The buffer just filled up: swap in a fresh one and flush.
                    let full = std::mem::replace(&mut current, MetricsBuffer::new(settings.size));
                    batches.push(full.finish());
                }
                PushResult::Overflow(_) => panic!("overflowed too early"),
            }
        }

        if !current.is_empty() {
            batches.push(current.finish());
        }

        for batch in &mut batches {
            batch.sort_by_key(|metric| format!("{metric:?}"));
        }
        batches
    }

    /// Input: `counter-0`/production four times (values 0..=3), then
    /// `counter-0..=3`/staging, then `counter-0..=3`/production.
    fn rebuffer_incremental_counters<State: MetricNormalize + Default>() -> Buffer {
        let mut input: Vec<Metric> = Vec::new();
        input.extend((0..4).map(|i| sample_counter(0, "production", Incremental, i as f64)));
        input.extend((0..4).map(|i| sample_counter(i, "staging", Incremental, i as f64)));
        input.extend((0..4).map(|i| sample_counter(i, "production", Incremental, i as f64)));

        rebuffer::<State>(input)
    }

    /// Repeated incremental counters are summed, and the seventh distinct
    /// series spills into a second batch.
    #[test]
    fn abs_buffer_incremental_counters() {
        let batches = rebuffer_incremental_counters::<AbsoluteMetricNormalizer>();

        assert_eq!(
            batches[0],
            [
                sample_counter(0, "production", Absolute, 6.0),
                sample_counter(0, "staging", Absolute, 0.0),
                sample_counter(1, "production", Absolute, 1.0),
                sample_counter(1, "staging", Absolute, 1.0),
                sample_counter(2, "staging", Absolute, 2.0),
                sample_counter(3, "staging", Absolute, 3.0),
            ]
        );

        assert_eq!(
            batches[1],
            [
                sample_counter(2, "production", Absolute, 2.0),
                sample_counter(3, "production", Absolute, 3.0),
            ]
        );

        assert_eq!(batches.len(), 2);
    }

    /// Same scenario as the absolute variant, with the output kept incremental.
    #[test]
    fn inc_buffer_incremental_counters() {
        let batches = rebuffer_incremental_counters::<IncrementalMetricNormalizer>();

        assert_eq!(
            batches[0],
            [
                sample_counter(0, "production", Incremental, 6.0),
                sample_counter(0, "staging", Incremental, 0.0),
                sample_counter(1, "production", Incremental, 1.0),
                sample_counter(1, "staging", Incremental, 1.0),
                sample_counter(2, "staging", Incremental, 2.0),
                sample_counter(3, "staging", Incremental, 3.0),
            ]
        );

        assert_eq!(
            batches[1],
            [
                sample_counter(2, "production", Incremental, 2.0),
                sample_counter(3, "production", Incremental, 3.0),
            ]
        );

        assert_eq!(batches.len(), 2);
    }

    /// Input: absolute `counter-0..=3` with value `i`, then absolute
    /// `counter-2..=5` with value `3 * i`.
    fn rebuffer_absolute_counters<State: MetricNormalize + Default>() -> Buffer {
        let mut input: Vec<Metric> = Vec::new();
        input.extend((0..4).map(|i| sample_counter(i, "production", Absolute, i as f64)));
        input.extend((2..6).map(|i| sample_counter(i, "production", Absolute, i as f64 * 3.0)));

        rebuffer::<State>(input)
    }

    /// For absolute counters the latest value of each series wins.
    #[test]
    fn abs_buffer_absolute_counters() {
        let batches = rebuffer_absolute_counters::<AbsoluteMetricNormalizer>();

        assert_eq!(
            batches[0],
            [
                sample_counter(0, "production", Absolute, 0.0),
                sample_counter(1, "production", Absolute, 1.0),
                sample_counter(2, "production", Absolute, 6.0),
                sample_counter(3, "production", Absolute, 9.0),
                sample_counter(4, "production", Absolute, 12.0),
                sample_counter(5, "production", Absolute, 15.0),
            ]
        );

        assert_eq!(batches.len(), 1);
    }

    /// Only the two series observed twice (`counter-2` and `counter-3`) are
    /// expected, carrying the difference between their two observations.
    #[test]
    fn inc_buffer_absolute_counters() {
        let batches = rebuffer_absolute_counters::<IncrementalMetricNormalizer>();

        assert_eq!(
            batches[0],
            [
                sample_counter(2, "production", Incremental, 4.0),
                sample_counter(3, "production", Incremental, 6.0),
            ]
        );

        assert_eq!(batches.len(), 1);
    }

    /// Input: incremental `gauge-1..=4` followed by incremental `gauge-2..=5`,
    /// each with value `i`.
    fn rebuffer_incremental_gauges<State: MetricNormalize + Default>() -> Buffer {
        let mut input: Vec<Metric> = Vec::new();
        input.extend((1..5).map(|i| sample_gauge(i, Incremental, i as f64)));
        input.extend((2..6).map(|i| sample_gauge(i, Incremental, i as f64)));

        rebuffer::<State>(input)
    }

    /// Gauges seen twice (`gauge-2..=4`) end up with the sum of both increments.
    #[test]
    fn abs_buffer_incremental_gauges() {
        let batches = rebuffer_incremental_gauges::<AbsoluteMetricNormalizer>();

        assert_eq!(
            batches[0],
            [
                sample_gauge(1, Absolute, 1.0),
                sample_gauge(2, Absolute, 4.0),
                sample_gauge(3, Absolute, 6.0),
                sample_gauge(4, Absolute, 8.0),
                sample_gauge(5, Absolute, 5.0),
            ]
        );

        assert_eq!(batches.len(), 1);
    }

    /// Same scenario as the absolute variant, with the output kept incremental.
    #[test]
    fn inc_buffer_incremental_gauges() {
        let batches = rebuffer_incremental_gauges::<IncrementalMetricNormalizer>();

        assert_eq!(
            batches[0],
            [
                sample_gauge(1, Incremental, 1.0),
                sample_gauge(2, Incremental, 4.0),
                sample_gauge(3, Incremental, 6.0),
                sample_gauge(4, Incremental, 8.0),
                sample_gauge(5, Incremental, 5.0),
            ]
        );

        assert_eq!(batches.len(), 1);
    }

    /// Input: absolute `gauge-2..=4` with value `2 * i`, then absolute
    /// `gauge-3..=5` with value `10 * i`.
    fn rebuffer_absolute_gauges<State: MetricNormalize + Default>() -> Buffer {
        let mut input: Vec<Metric> = Vec::new();
        input.extend((2..5).map(|i| sample_gauge(i, Absolute, i as f64 * 2.0)));
        input.extend((3..6).map(|i| sample_gauge(i, Absolute, i as f64 * 10.0)));

        rebuffer::<State>(input)
    }

    /// For absolute gauges the latest value of each series wins.
    #[test]
    fn abs_buffer_absolute_gauges() {
        let batches = rebuffer_absolute_gauges::<AbsoluteMetricNormalizer>();

        assert_eq!(
            batches[0],
            [
                sample_gauge(2, Absolute, 4.0),
                sample_gauge(3, Absolute, 30.0),
                sample_gauge(4, Absolute, 40.0),
                sample_gauge(5, Absolute, 50.0),
            ]
        );

        assert_eq!(batches.len(), 1);
    }

    /// Only the two gauges observed twice (`gauge-3` and `gauge-4`) are
    /// expected, carrying the difference between their two observations.
    #[test]
    fn inc_buffer_absolute_gauges() {
        let batches = rebuffer_absolute_gauges::<IncrementalMetricNormalizer>();

        assert_eq!(
            batches[0],
            [
                sample_gauge(3, Incremental, 24.0),
                sample_gauge(4, Incremental, 32.0),
            ]
        );

        assert_eq!(batches.len(), 1);
    }

    /// Input: `set-0` receives the single members 0..=3 twice over, then
    /// `set-1` receives `[1, 2, 3, 4]` in one go.
    fn rebuffer_incremental_sets<State: MetricNormalize + Default>() -> Buffer {
        let mut input: Vec<Metric> = Vec::new();
        input.extend((0..4).map(|i| sample_set(0, Incremental, &[i])));
        input.extend((0..4).map(|i| sample_set(0, Incremental, &[i])));
        input.push(sample_set(1, Incremental, &[1, 2, 3, 4]));

        rebuffer::<State>(input)
    }

    /// Set members are unioned; duplicates do not show up twice.
    #[test]
    fn abs_buffer_incremental_sets() {
        let batches = rebuffer_incremental_sets::<AbsoluteMetricNormalizer>();

        assert_eq!(
            batches[0],
            [
                sample_set(0, Absolute, &[0, 1, 2, 3]),
                sample_set(1, Absolute, &[1, 2, 3, 4]),
            ]
        );

        assert_eq!(batches.len(), 1);
    }

    /// Same scenario as the absolute variant, with the output kept incremental.
    #[test]
    fn inc_buffer_incremental_sets() {
        let batches = rebuffer_incremental_sets::<IncrementalMetricNormalizer>();

        assert_eq!(
            batches[0],
            [
                sample_set(0, Incremental, &[0, 1, 2, 3]),
                sample_set(1, Incremental, &[1, 2, 3, 4]),
            ]
        );

        assert_eq!(batches.len(), 1);
    }

    /// Input: `dist-2` four times, then `dist-2..=5` once each, all with rate 10.
    fn rebuffer_incremental_distributions<State: MetricNormalize + Default>() -> Buffer {
        let mut input: Vec<Metric> = Vec::new();
        input.extend((2..6).map(|_| sample_distribution_histogram(2, Incremental, 10)));
        input.extend((2..6).map(|i| sample_distribution_histogram(i, Incremental, 10)));

        rebuffer::<State>(input)
    }

    /// The five identical `dist-2` samples collapse into one with rate 50.
    #[test]
    fn abs_buffer_incremental_distributions() {
        let batches = rebuffer_incremental_distributions::<AbsoluteMetricNormalizer>();

        assert_eq!(
            batches[0],
            [
                sample_distribution_histogram(2, Absolute, 50),
                sample_distribution_histogram(3, Absolute, 10),
                sample_distribution_histogram(4, Absolute, 10),
                sample_distribution_histogram(5, Absolute, 10),
            ]
        );

        assert_eq!(batches.len(), 1);
    }

    /// Same scenario as the absolute variant, with the output kept incremental.
    #[test]
    fn inc_buffer_incremental_distributions() {
        let batches = rebuffer_incremental_distributions::<IncrementalMetricNormalizer>();

        assert_eq!(
            batches[0],
            [
                sample_distribution_histogram(2, Incremental, 50),
                sample_distribution_histogram(3, Incremental, 10),
                sample_distribution_histogram(4, Incremental, 10),
                sample_distribution_histogram(5, Incremental, 10),
            ]
        );

        assert_eq!(batches.len(), 1);
    }

    /// Unsorted samples with repeated values come back sorted, with the rates
    /// of equal values added up.
    #[test]
    fn compress_distributions() {
        let mut input = vector_lib::samples![
            2.0 => 12,
            2.0 => 12,
            3.0 => 13,
            1.0 => 11,
            2.0 => 12,
            2.0 => 12,
            3.0 => 13
        ];

        assert_eq!(
            compress_distribution(&mut input),
            vector_lib::samples![1.0 => 11, 2.0 => 48, 3.0 => 26]
        );
    }

    /// Input: `buckets-2` three times with `cfactor` 1 and sum 10, then
    /// `buckets-2..=4` with `cfactor` `i` and sum `10 * i`.
    fn rebuffer_absolute_aggregated_histograms<State: MetricNormalize + Default>() -> Buffer {
        let mut input: Vec<Metric> = Vec::new();
        input.extend((2..5).map(|_| sample_aggregated_histogram(2, Absolute, 1.0, 1, 10.0)));
        input.extend(
            (2..5).map(|i| sample_aggregated_histogram(i, Absolute, 1.0, i as u64, i as f64 * 10.0)),
        );

        rebuffer::<State>(input)
    }

    /// For absolute histograms the latest value of each series wins.
    #[test]
    fn abs_buffer_absolute_aggregated_histograms() {
        let batches = rebuffer_absolute_aggregated_histograms::<AbsoluteMetricNormalizer>();

        assert_eq!(
            batches[0],
            [
                sample_aggregated_histogram(2, Absolute, 1.0, 2, 20.0),
                sample_aggregated_histogram(3, Absolute, 1.0, 3, 30.0),
                sample_aggregated_histogram(4, Absolute, 1.0, 4, 40.0),
            ]
        );

        assert_eq!(batches.len(), 1);
    }

    /// Only `buckets-2` is expected, carrying the change between its first and
    /// last observation.
    #[test]
    fn inc_buffer_absolute_aggregated_histograms() {
        let batches = rebuffer_absolute_aggregated_histograms::<IncrementalMetricNormalizer>();

        assert_eq!(
            batches[0],
            [sample_aggregated_histogram(2, Incremental, 1.0, 1, 10.0)]
        );

        assert_eq!(batches.len(), 1);
    }

    /// Input: one `buckets-2` with `bpower` 1.0, followed by three more with
    /// `bpower` 2.0 and `cfactor` 1..=3.
    fn rebuffer_incremental_aggregated_histograms<State: MetricNormalize + Default>() -> Buffer {
        let mut input = vec![sample_aggregated_histogram(2, Incremental, 1.0, 1, 10.0)];
        input.extend((1..4).map(|i| sample_aggregated_histogram(2, Incremental, 2.0, i, 10.0)));

        rebuffer::<State>(input)
    }

    /// The expected result holds only the three `bpower` 2.0 histograms added
    /// together (`cfactor` 6, sum 30).
    #[test]
    fn abs_buffer_incremental_aggregated_histograms() {
        let batches = rebuffer_incremental_aggregated_histograms::<AbsoluteMetricNormalizer>();

        assert_eq!(
            batches[0],
            [sample_aggregated_histogram(2, Absolute, 2.0, 6, 30.0)]
        );

        assert_eq!(batches.len(), 1);
    }

    /// Same scenario as the absolute variant, with the output kept incremental.
    #[test]
    fn inc_buffer_incremental_aggregated_histograms() {
        let batches = rebuffer_incremental_aggregated_histograms::<IncrementalMetricNormalizer>();

        assert_eq!(
            batches[0],
            [sample_aggregated_histogram(2, Incremental, 2.0, 6, 30.0)]
        );

        assert_eq!(batches.len(), 1);
    }

    /// Input: absolute `quantiles-2` and `quantiles-3`, each observed twice
    /// (first with factor `num`, then with factor `num + 1`).
    fn rebuffer_aggregated_summaries<State: MetricNormalize + Default>() -> Buffer {
        let mut input: Vec<Metric> = Vec::new();
        for factor in 0..2 {
            for num in 2..4 {
                let summary = sample_aggregated_summary(num, Absolute, (factor + num) as f64);
                input.push(summary);
            }
        }

        rebuffer::<State>(input)
    }

    /// For absolute summaries the latest value of each series wins.
    #[test]
    fn abs_buffer_aggregated_summaries() {
        let batches = rebuffer_aggregated_summaries::<AbsoluteMetricNormalizer>();

        assert_eq!(
            batches[0],
            [
                sample_aggregated_summary(2, Absolute, 3.0),
                sample_aggregated_summary(3, Absolute, 4.0),
            ]
        );

        assert_eq!(batches.len(), 1);
    }

    /// With the incremental normalizer no batch at all is expected for
    /// aggregated summaries.
    #[test]
    fn inc_buffer_aggregated_summaries() {
        let batches = rebuffer_aggregated_summaries::<IncrementalMetricNormalizer>();

        assert_eq!(batches.len(), 0);
    }
}