use std::cmp::Ordering;
use vector_lib::event::metric::{Metric, MetricValue, Sample};
use crate::sinks::util::{
batch::{Batch, BatchConfig, BatchError, BatchSize, PushResult},
Merged, SinkBatchSettings,
};
mod normalize;
pub use self::normalize::*;
mod split;
pub use self::split::*;
pub struct MetricsBuffer {
metrics: Option<MetricSet>,
max_events: usize,
}
impl MetricsBuffer {
pub const fn new(settings: BatchSize<Self>) -> Self {
Self::with_capacity(settings.events)
}
const fn with_capacity(max_events: usize) -> Self {
Self {
metrics: None,
max_events,
}
}
}
impl Batch for MetricsBuffer {
type Input = Metric;
type Output = Vec<Metric>;
fn get_settings_defaults<D: SinkBatchSettings + Clone>(
config: BatchConfig<D, Merged>,
) -> Result<BatchConfig<D, Merged>, BatchError> {
config.disallow_max_bytes()
}
fn push(&mut self, item: Self::Input) -> PushResult<Self::Input> {
if self.num_items() >= self.max_events {
PushResult::Overflow(item)
} else {
let max_events = self.max_events;
self.metrics
.get_or_insert_with(|| MetricSet::with_capacity(max_events))
.insert_update(item);
PushResult::Ok(self.num_items() >= self.max_events)
}
}
fn is_empty(&self) -> bool {
self.num_items() == 0
}
fn fresh(&self) -> Self {
Self::with_capacity(self.max_events)
}
fn finish(self) -> Self::Output {
let mut finalized = self
.metrics
.map(MetricSet::into_metrics)
.unwrap_or_default();
finalized.iter_mut().for_each(finalize_metric);
finalized
}
fn num_items(&self) -> usize {
self.metrics
.as_ref()
.map(|metrics| metrics.len())
.unwrap_or(0)
}
}
fn finalize_metric(metric: &mut Metric) {
if let MetricValue::Distribution { samples, .. } = metric.data_mut().value_mut() {
let compressed_samples = compress_distribution(samples);
*samples = compressed_samples;
}
}
pub fn compress_distribution(samples: &mut Vec<Sample>) -> Vec<Sample> {
if samples.is_empty() {
return Vec::new();
}
samples.sort_by(|a, b| a.value.partial_cmp(&b.value).unwrap_or(Ordering::Equal));
let mut acc = Sample {
value: samples[0].value,
rate: 0,
};
let mut result = Vec::new();
for sample in samples {
if acc.value == sample.value {
acc.rate += sample.rate;
} else {
result.push(acc);
acc = *sample;
}
}
result.push(acc);
result
}
#[cfg(test)]
mod tests {
use similar_asserts::assert_eq;
use vector_lib::event::metric::{MetricKind, MetricKind::*, MetricValue, StatisticKind};
use vector_lib::metric_tags;
use super::*;
use crate::{
sinks::util::BatchSettings,
test_util::metrics::{AbsoluteMetricNormalizer, IncrementalMetricNormalizer},
};
type Buffer = Vec<Vec<Metric>>;
pub fn sample_counter(num: usize, tagstr: &str, kind: MetricKind, value: f64) -> Metric {
Metric::new(
format!("counter-{}", num),
kind,
MetricValue::Counter { value },
)
.with_tags(Some(metric_tags!(tagstr => "true")))
}
pub fn sample_gauge(num: usize, kind: MetricKind, value: f64) -> Metric {
Metric::new(format!("gauge-{}", num), kind, MetricValue::Gauge { value })
}
pub fn sample_set<T: ToString>(num: usize, kind: MetricKind, values: &[T]) -> Metric {
Metric::new(
format!("set-{}", num),
kind,
MetricValue::Set {
values: values.iter().map(|s| s.to_string()).collect(),
},
)
}
pub fn sample_distribution_histogram(num: u32, kind: MetricKind, rate: u32) -> Metric {
Metric::new(
format!("dist-{}", num),
kind,
MetricValue::Distribution {
samples: vector_lib::samples![num as f64 => rate],
statistic: StatisticKind::Histogram,
},
)
}
pub fn sample_aggregated_histogram(
num: usize,
kind: MetricKind,
bpower: f64,
cfactor: u64,
sum: f64,
) -> Metric {
Metric::new(
format!("buckets-{}", num),
kind,
MetricValue::AggregatedHistogram {
buckets: vector_lib::buckets![
1.0 => cfactor,
bpower.exp2() => cfactor * 2,
4.0f64.powf(bpower) => cfactor * 4
],
count: 7 * cfactor,
sum,
},
)
}
pub fn sample_aggregated_summary(num: u32, kind: MetricKind, factor: f64) -> Metric {
Metric::new(
format!("quantiles-{}", num),
kind,
MetricValue::AggregatedSummary {
quantiles: vector_lib::quantiles![
0.0 => factor,
0.5 => factor * 2.0,
1.0 => factor * 4.0
],
count: factor as u64 * 10,
sum: factor * 7.0,
},
)
}
fn rebuffer<State: MetricNormalize + Default>(metrics: Vec<Metric>) -> Buffer {
let mut batch_settings = BatchSettings::default();
batch_settings.size.bytes = 9999;
batch_settings.size.events = 6;
let mut normalizer = MetricNormalizer::<State>::default();
let mut buffer = MetricsBuffer::new(batch_settings.size);
let mut result = vec![];
for metric in metrics {
if let Some(event) = normalizer.normalize(metric) {
match buffer.push(event) {
PushResult::Overflow(_) => panic!("overflowed too early"),
PushResult::Ok(true) => {
let batch =
std::mem::replace(&mut buffer, MetricsBuffer::new(batch_settings.size));
result.push(batch.finish());
}
PushResult::Ok(false) => (),
}
}
}
if !buffer.is_empty() {
result.push(buffer.finish())
}
result
.into_iter()
.map(|mut batch| {
batch.sort_by_key(|k| format!("{:?}", k));
batch
})
.collect()
}
fn rebuffer_incremental_counters<State: MetricNormalize + Default>() -> Buffer {
let mut events = Vec::new();
for i in 0..4 {
events.push(sample_counter(0, "production", Incremental, i as f64));
}
for i in 0..4 {
events.push(sample_counter(i, "staging", Incremental, i as f64));
}
for i in 0..4 {
events.push(sample_counter(i, "production", Incremental, i as f64));
}
rebuffer::<State>(events)
}
#[test]
fn abs_buffer_incremental_counters() {
let buffer = rebuffer_incremental_counters::<AbsoluteMetricNormalizer>();
assert_eq!(
buffer[0],
[
sample_counter(0, "production", Absolute, 6.0),
sample_counter(0, "staging", Absolute, 0.0),
sample_counter(1, "production", Absolute, 1.0),
sample_counter(1, "staging", Absolute, 1.0),
sample_counter(2, "staging", Absolute, 2.0),
sample_counter(3, "staging", Absolute, 3.0),
]
);
assert_eq!(
buffer[1],
[
sample_counter(2, "production", Absolute, 2.0),
sample_counter(3, "production", Absolute, 3.0),
]
);
assert_eq!(buffer.len(), 2);
}
#[test]
fn inc_buffer_incremental_counters() {
let buffer = rebuffer_incremental_counters::<IncrementalMetricNormalizer>();
assert_eq!(
buffer[0],
[
sample_counter(0, "production", Incremental, 6.0),
sample_counter(0, "staging", Incremental, 0.0),
sample_counter(1, "production", Incremental, 1.0),
sample_counter(1, "staging", Incremental, 1.0),
sample_counter(2, "staging", Incremental, 2.0),
sample_counter(3, "staging", Incremental, 3.0),
]
);
assert_eq!(
buffer[1],
[
sample_counter(2, "production", Incremental, 2.0),
sample_counter(3, "production", Incremental, 3.0),
]
);
assert_eq!(buffer.len(), 2);
}
fn rebuffer_absolute_counters<State: MetricNormalize + Default>() -> Buffer {
let mut events = Vec::new();
for i in 0..4 {
events.push(sample_counter(i, "production", Absolute, i as f64));
}
for i in 2..6 {
events.push(sample_counter(i, "production", Absolute, i as f64 * 3.0));
}
rebuffer::<State>(events)
}
#[test]
fn abs_buffer_absolute_counters() {
let buffer = rebuffer_absolute_counters::<AbsoluteMetricNormalizer>();
assert_eq!(
buffer[0],
[
sample_counter(0, "production", Absolute, 0.0),
sample_counter(1, "production", Absolute, 1.0),
sample_counter(2, "production", Absolute, 6.0),
sample_counter(3, "production", Absolute, 9.0),
sample_counter(4, "production", Absolute, 12.0),
sample_counter(5, "production", Absolute, 15.0),
]
);
assert_eq!(buffer.len(), 1);
}
#[test]
fn inc_buffer_absolute_counters() {
let buffer = rebuffer_absolute_counters::<IncrementalMetricNormalizer>();
assert_eq!(
buffer[0],
[
sample_counter(2, "production", Incremental, 4.0),
sample_counter(3, "production", Incremental, 6.0),
]
);
assert_eq!(buffer.len(), 1);
}
fn rebuffer_incremental_gauges<State: MetricNormalize + Default>() -> Buffer {
let mut events = Vec::new();
for i in 1..5 {
events.push(sample_gauge(i, Incremental, i as f64));
}
for i in 2..6 {
events.push(sample_gauge(i, Incremental, i as f64));
}
rebuffer::<State>(events)
}
#[test]
fn abs_buffer_incremental_gauges() {
let buffer = rebuffer_incremental_gauges::<AbsoluteMetricNormalizer>();
assert_eq!(
buffer[0],
[
sample_gauge(1, Absolute, 1.0),
sample_gauge(2, Absolute, 4.0),
sample_gauge(3, Absolute, 6.0),
sample_gauge(4, Absolute, 8.0),
sample_gauge(5, Absolute, 5.0),
]
);
assert_eq!(buffer.len(), 1);
}
#[test]
fn inc_buffer_incremental_gauges() {
let buffer = rebuffer_incremental_gauges::<IncrementalMetricNormalizer>();
assert_eq!(
buffer[0],
[
sample_gauge(1, Incremental, 1.0),
sample_gauge(2, Incremental, 4.0),
sample_gauge(3, Incremental, 6.0),
sample_gauge(4, Incremental, 8.0),
sample_gauge(5, Incremental, 5.0),
]
);
assert_eq!(buffer.len(), 1);
}
fn rebuffer_absolute_gauges<State: MetricNormalize + Default>() -> Buffer {
let mut events = Vec::new();
for i in 2..5 {
events.push(sample_gauge(i, Absolute, i as f64 * 2.0));
}
for i in 3..6 {
events.push(sample_gauge(i, Absolute, i as f64 * 10.0));
}
rebuffer::<State>(events)
}
#[test]
fn abs_buffer_absolute_gauges() {
let buffer = rebuffer_absolute_gauges::<AbsoluteMetricNormalizer>();
assert_eq!(
buffer[0],
[
sample_gauge(2, Absolute, 4.0),
sample_gauge(3, Absolute, 30.0),
sample_gauge(4, Absolute, 40.0),
sample_gauge(5, Absolute, 50.0),
]
);
assert_eq!(buffer.len(), 1);
}
#[test]
fn inc_buffer_absolute_gauges() {
let buffer = rebuffer_absolute_gauges::<IncrementalMetricNormalizer>();
assert_eq!(
buffer[0],
[
sample_gauge(3, Incremental, 24.0),
sample_gauge(4, Incremental, 32.0),
]
);
assert_eq!(buffer.len(), 1);
}
fn rebuffer_incremental_sets<State: MetricNormalize + Default>() -> Buffer {
let mut events = Vec::new();
for i in 0..4 {
events.push(sample_set(0, Incremental, &[i]));
}
for i in 0..4 {
events.push(sample_set(0, Incremental, &[i]));
}
events.push(sample_set(1, Incremental, &[1, 2, 3, 4]));
rebuffer::<State>(events)
}
#[test]
fn abs_buffer_incremental_sets() {
let buffer = rebuffer_incremental_sets::<AbsoluteMetricNormalizer>();
assert_eq!(
buffer[0],
[
sample_set(0, Absolute, &[0, 1, 2, 3]),
sample_set(1, Absolute, &[1, 2, 3, 4]),
]
);
assert_eq!(buffer.len(), 1);
}
#[test]
fn inc_buffer_incremental_sets() {
let buffer = rebuffer_incremental_sets::<IncrementalMetricNormalizer>();
assert_eq!(
buffer[0],
[
sample_set(0, Incremental, &[0, 1, 2, 3]),
sample_set(1, Incremental, &[1, 2, 3, 4]),
]
);
assert_eq!(buffer.len(), 1);
}
fn rebuffer_incremental_distributions<State: MetricNormalize + Default>() -> Buffer {
let mut events = Vec::new();
for _ in 2..6 {
events.push(sample_distribution_histogram(2, Incremental, 10));
}
for i in 2..6 {
events.push(sample_distribution_histogram(i, Incremental, 10));
}
rebuffer::<State>(events)
}
#[test]
fn abs_buffer_incremental_distributions() {
let buffer = rebuffer_incremental_distributions::<AbsoluteMetricNormalizer>();
assert_eq!(
buffer[0],
[
sample_distribution_histogram(2, Absolute, 50),
sample_distribution_histogram(3, Absolute, 10),
sample_distribution_histogram(4, Absolute, 10),
sample_distribution_histogram(5, Absolute, 10),
]
);
assert_eq!(buffer.len(), 1);
}
#[test]
fn inc_buffer_incremental_distributions() {
let buffer = rebuffer_incremental_distributions::<IncrementalMetricNormalizer>();
assert_eq!(
buffer[0],
[
sample_distribution_histogram(2, Incremental, 50),
sample_distribution_histogram(3, Incremental, 10),
sample_distribution_histogram(4, Incremental, 10),
sample_distribution_histogram(5, Incremental, 10),
]
);
assert_eq!(buffer.len(), 1);
}
#[test]
fn compress_distributions() {
let mut samples = vector_lib::samples![
2.0 => 12,
2.0 => 12,
3.0 => 13,
1.0 => 11,
2.0 => 12,
2.0 => 12,
3.0 => 13
];
assert_eq!(
compress_distribution(&mut samples),
vector_lib::samples![1.0 => 11, 2.0 => 48, 3.0 => 26]
);
}
fn rebuffer_absolute_aggregated_histograms<State: MetricNormalize + Default>() -> Buffer {
let mut events = Vec::new();
for _ in 2..5 {
events.push(sample_aggregated_histogram(2, Absolute, 1.0, 1, 10.0));
}
for i in 2..5 {
events.push(sample_aggregated_histogram(
i,
Absolute,
1.0,
i as u64,
i as f64 * 10.0,
));
}
rebuffer::<State>(events)
}
#[test]
fn abs_buffer_absolute_aggregated_histograms() {
let buffer = rebuffer_absolute_aggregated_histograms::<AbsoluteMetricNormalizer>();
assert_eq!(
buffer[0],
[
sample_aggregated_histogram(2, Absolute, 1.0, 2, 20.0),
sample_aggregated_histogram(3, Absolute, 1.0, 3, 30.0),
sample_aggregated_histogram(4, Absolute, 1.0, 4, 40.0),
]
);
assert_eq!(buffer.len(), 1);
}
#[test]
fn inc_buffer_absolute_aggregated_histograms() {
let buffer = rebuffer_absolute_aggregated_histograms::<IncrementalMetricNormalizer>();
assert_eq!(
buffer[0],
[sample_aggregated_histogram(2, Incremental, 1.0, 1, 10.0)]
);
assert_eq!(buffer.len(), 1);
}
fn rebuffer_incremental_aggregated_histograms<State: MetricNormalize + Default>() -> Buffer {
let mut events = vec![sample_aggregated_histogram(2, Incremental, 1.0, 1, 10.0)];
for i in 1..4 {
events.push(sample_aggregated_histogram(2, Incremental, 2.0, i, 10.0));
}
rebuffer::<State>(events)
}
#[test]
fn abs_buffer_incremental_aggregated_histograms() {
let buffer = rebuffer_incremental_aggregated_histograms::<AbsoluteMetricNormalizer>();
assert_eq!(
buffer[0],
[sample_aggregated_histogram(2, Absolute, 2.0, 6, 30.0)]
);
assert_eq!(buffer.len(), 1);
}
#[test]
fn inc_buffer_incremental_aggregated_histograms() {
let buffer = rebuffer_incremental_aggregated_histograms::<IncrementalMetricNormalizer>();
assert_eq!(
buffer[0],
[sample_aggregated_histogram(2, Incremental, 2.0, 6, 30.0)]
);
assert_eq!(buffer.len(), 1);
}
fn rebuffer_aggregated_summaries<State: MetricNormalize + Default>() -> Buffer {
let mut events = Vec::new();
for factor in 0..2 {
for num in 2..4 {
events.push(sample_aggregated_summary(
num,
Absolute,
(factor + num) as f64,
));
}
}
rebuffer::<State>(events)
}
#[test]
fn abs_buffer_aggregated_summaries() {
let buffer = rebuffer_aggregated_summaries::<AbsoluteMetricNormalizer>();
assert_eq!(
buffer[0],
[
sample_aggregated_summary(2, Absolute, 3.0),
sample_aggregated_summary(3, Absolute, 4.0),
]
);
assert_eq!(buffer.len(), 1);
}
#[test]
fn inc_buffer_aggregated_summaries() {
let buffer = rebuffer_aggregated_summaries::<IncrementalMetricNormalizer>();
assert_eq!(buffer.len(), 0);
}
}