1use std::sync::{
2 Arc,
3 atomic::{AtomicU32, Ordering},
4};
5
6use metrics::{HistogramFn, atomics::AtomicU64};
7use metrics_util::registry::Storage;
8use vector_common::atomic::AtomicF64;
9
10use crate::event::{MetricValue, metric::Bucket};
11
/// Metric storage backend for the `metrics_util` registry.
///
/// Its `Storage` implementation hands out `Arc`-wrapped atomics for counters
/// and gauges, and an `Arc`-wrapped [`Histogram`] for histograms.
pub(super) struct VectorStorage;
13
14impl<K> Storage<K> for VectorStorage {
15 type Counter = Arc<AtomicU64>;
16 type Gauge = Arc<AtomicF64>;
17 type Histogram = Arc<Histogram>;
18
19 fn counter(&self, _: &K) -> Self::Counter {
20 Arc::new(AtomicU64::new(0))
21 }
22
23 fn gauge(&self, _: &K) -> Self::Gauge {
24 Arc::new(AtomicF64::new(0.0))
25 }
26
27 fn histogram(&self, _: &K) -> Self::Histogram {
28 Arc::new(Histogram::new())
29 }
30}
31
/// A fixed-bucket histogram whose state is held entirely in atomics, so it
/// can be updated through a shared reference.
#[derive(Debug)]
pub(super) struct Histogram {
    /// One `(upper limit, number of recorded values)` pair per bucket. The
    /// array is boxed, so the struct itself holds only a pointer to it.
    buckets: Box<[(f64, AtomicU32); 26]>,
    /// Total number of values recorded.
    count: AtomicU64,
    /// Running sum of all values recorded.
    sum: AtomicF64,
}
38
39impl Histogram {
40 const MIN_BUCKET: f64 = 1.0 / (1 << 12) as f64; const MIN_BUCKET_EXP: f64 = -12.0;
42 const BUCKETS: usize = 26;
43
44 pub(crate) fn new() -> Self {
45 let buckets = Box::new([
55 (2.0f64.powi(-12), AtomicU32::new(0)),
56 (2.0f64.powi(-11), AtomicU32::new(0)),
57 (2.0f64.powi(-10), AtomicU32::new(0)),
58 (2.0f64.powi(-9), AtomicU32::new(0)),
59 (2.0f64.powi(-8), AtomicU32::new(0)),
60 (2.0f64.powi(-7), AtomicU32::new(0)),
61 (2.0f64.powi(-6), AtomicU32::new(0)),
62 (2.0f64.powi(-5), AtomicU32::new(0)),
63 (2.0f64.powi(-4), AtomicU32::new(0)),
64 (2.0f64.powi(-3), AtomicU32::new(0)),
65 (2.0f64.powi(-2), AtomicU32::new(0)),
66 (2.0f64.powi(-1), AtomicU32::new(0)),
67 (2.0f64.powi(0), AtomicU32::new(0)),
68 (2.0f64.powi(1), AtomicU32::new(0)),
69 (2.0f64.powi(2), AtomicU32::new(0)),
70 (2.0f64.powi(3), AtomicU32::new(0)),
71 (2.0f64.powi(4), AtomicU32::new(0)),
72 (2.0f64.powi(5), AtomicU32::new(0)),
73 (2.0f64.powi(6), AtomicU32::new(0)),
74 (2.0f64.powi(7), AtomicU32::new(0)),
75 (2.0f64.powi(8), AtomicU32::new(0)),
76 (2.0f64.powi(9), AtomicU32::new(0)),
77 (2.0f64.powi(10), AtomicU32::new(0)),
78 (2.0f64.powi(11), AtomicU32::new(0)),
79 (2.0f64.powi(12), AtomicU32::new(0)),
80 (f64::INFINITY, AtomicU32::new(0)),
81 ]);
82 Self {
83 buckets,
84 count: AtomicU64::new(0),
85 sum: AtomicF64::new(0.0),
86 }
87 }
88
89 pub(self) fn bucket_index(value: f64) -> usize {
90 let log = value.max(Self::MIN_BUCKET).log2().ceil();
93 #[allow(clippy::cast_possible_truncation)] let index = (log - Self::MIN_BUCKET_EXP) as usize;
97 index.min(Self::BUCKETS - 1)
99 }
100
101 pub(super) fn count(&self) -> u64 {
102 self.count.load(Ordering::Relaxed)
103 }
104
105 pub(super) fn sum(&self) -> f64 {
106 self.sum.load(Ordering::Relaxed)
107 }
108
109 fn buckets(&self) -> Vec<Bucket> {
110 self.buckets
111 .iter()
112 .map(|(upper_limit, count)| Bucket {
113 upper_limit: *upper_limit,
114 count: u64::from(count.load(Ordering::Relaxed)),
115 })
116 .collect()
117 }
118
119 pub(super) fn make_metric(&self) -> MetricValue {
120 MetricValue::AggregatedHistogram {
121 buckets: self.buckets(),
122 count: self.count(),
123 sum: self.sum(),
124 }
125 }
126}
127
128impl HistogramFn for Histogram {
129 fn record(&self, value: f64) {
130 let index = Self::bucket_index(value);
131 self.buckets[index].1.fetch_add(1, Ordering::Relaxed);
132 self.count.fetch_add(1, Ordering::Relaxed);
133 self.sum
134 .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |cur| cur + value);
135 }
136}
137
#[cfg(test)]
mod test {
    use metrics::HistogramFn;
    use quickcheck::{QuickCheck, TestResult};

    use super::Histogram;

    /// Approximate equality for floats: exact equality first, then an
    /// absolute tolerance when either operand is zero or the difference is
    /// below the smallest positive normal, otherwise a relative tolerance.
    fn nearly_equal(a: f64, b: f64) -> bool {
        // Exact match; this is also the only way two equal infinities compare.
        if a == b {
            return true;
        }

        let diff = (a - b).abs();

        // Relative error is meaningless next to zero, so use absolute error.
        if a == 0.0 || b == 0.0 || diff < f64::MIN_POSITIVE {
            return diff < (f64::EPSILON * f64::MIN_POSITIVE);
        }

        // Relative error, with the combined magnitude capped at `f64::MAX`.
        let magnitude = f64::min(a.abs() + b.abs(), f64::MAX);
        (diff / magnitude) < f64::EPSILON
    }

    /// Property test: every finite value lands in a bucket whose limits
    /// enclose it, and the histogram's count and sum track a simple model.
    #[test]
    #[allow(clippy::needless_pass_by_value)]
    fn histogram() {
        fn inner(values: Vec<f64>) -> TestResult {
            let sut = Histogram::new();
            let mut model_count: u64 = 0;
            let mut model_sum: f64 = 0.0;

            // Infinite and NaN inputs are skipped.
            for value in values.into_iter().filter(|candidate| candidate.is_finite()) {
                let index = Histogram::bucket_index(value);

                let upper = sut.buckets[index].0;
                assert!(
                    value <= upper,
                    "Value {} is not less than the upper limit {}.",
                    value,
                    upper
                );

                if index > 0 {
                    let previous_upper = sut.buckets[index - 1].0;
                    assert!(
                        value > previous_upper,
                        "Value {} is not greater than the previous upper limit {}.",
                        value,
                        previous_upper
                    );
                }

                sut.record(value);
                model_count = model_count.wrapping_add(1);
                model_sum += value;

                assert_eq!(sut.count(), model_count);
                assert!(nearly_equal(sut.sum(), model_sum));
            }

            TestResult::passed()
        }

        let property: fn(Vec<f64>) -> TestResult = inner;
        QuickCheck::new()
            .tests(1_000)
            .max_tests(2_000)
            .quickcheck(property);
    }
}