1use std::sync::{
2 atomic::{AtomicU32, Ordering},
3 Arc,
4};
5
6use metrics::{atomics::AtomicU64, GaugeFn, HistogramFn};
7use metrics_util::registry::Storage;
8
9use crate::event::{metric::Bucket, MetricValue};
10
11pub(super) struct VectorStorage;
12
13impl<K> Storage<K> for VectorStorage {
14 type Counter = Arc<AtomicU64>;
15 type Gauge = Arc<AtomicF64>;
16 type Histogram = Arc<Histogram>;
17
18 fn counter(&self, _: &K) -> Self::Counter {
19 Arc::new(AtomicU64::new(0))
20 }
21
22 fn gauge(&self, _: &K) -> Self::Gauge {
23 Arc::new(AtomicF64::new(0.0))
24 }
25
26 fn histogram(&self, _: &K) -> Self::Histogram {
27 Arc::new(Histogram::new())
28 }
29}
30
31#[derive(Debug)]
32pub(super) struct AtomicF64 {
33 inner: AtomicU64,
34}
35
36impl AtomicF64 {
37 fn new(init: f64) -> Self {
38 Self {
39 inner: AtomicU64::new(init.to_bits()),
40 }
41 }
42
43 fn fetch_update(
44 &self,
45 set_order: Ordering,
46 fetch_order: Ordering,
47 mut f: impl FnMut(f64) -> f64,
48 ) {
49 self.inner
50 .fetch_update(set_order, fetch_order, |x| {
51 Some(f(f64::from_bits(x)).to_bits())
52 })
53 .expect("Cannot fail");
54 }
55
56 pub(super) fn load(&self, order: Ordering) -> f64 {
57 f64::from_bits(self.inner.load(order))
58 }
59}
60
61impl GaugeFn for AtomicF64 {
62 fn increment(&self, amount: f64) {
63 self.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| value + amount);
64 }
65
66 fn decrement(&self, amount: f64) {
67 self.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| value - amount);
68 }
69
70 fn set(&self, value: f64) {
71 self.inner.store(f64::to_bits(value), Ordering::Relaxed);
72 }
73}
74
75#[derive(Debug)]
76pub(super) struct Histogram {
77 buckets: Box<[(f64, AtomicU32); 20]>,
78 count: AtomicU64,
79 sum: AtomicF64,
80}
81
82impl Histogram {
83 const MIN_BUCKET: f64 = 0.015_625; const MIN_BUCKET_EXP: f64 = -6.0;
85 const BUCKETS: usize = 20;
86
87 pub(crate) fn new() -> Self {
88 let buckets = Box::new([
98 ((-6_f64).exp2(), AtomicU32::new(0)),
99 ((-5_f64).exp2(), AtomicU32::new(0)),
100 ((-4_f64).exp2(), AtomicU32::new(0)),
101 ((-3_f64).exp2(), AtomicU32::new(0)),
102 ((-2_f64).exp2(), AtomicU32::new(0)),
103 ((-1_f64).exp2(), AtomicU32::new(0)),
104 (0_f64.exp2(), AtomicU32::new(0)),
105 (1_f64.exp2(), AtomicU32::new(0)),
106 (2_f64.exp2(), AtomicU32::new(0)),
107 (3_f64.exp2(), AtomicU32::new(0)),
108 (4_f64.exp2(), AtomicU32::new(0)),
109 (5_f64.exp2(), AtomicU32::new(0)),
110 (6_f64.exp2(), AtomicU32::new(0)),
111 (7_f64.exp2(), AtomicU32::new(0)),
112 (8_f64.exp2(), AtomicU32::new(0)),
113 (9_f64.exp2(), AtomicU32::new(0)),
114 (10_f64.exp2(), AtomicU32::new(0)),
115 (11_f64.exp2(), AtomicU32::new(0)),
116 (12_f64.exp2(), AtomicU32::new(0)),
117 (f64::INFINITY, AtomicU32::new(0)),
118 ]);
119 Self {
120 buckets,
121 count: AtomicU64::new(0),
122 sum: AtomicF64::new(0.0),
123 }
124 }
125
126 pub(self) fn bucket_index(value: f64) -> usize {
127 let log = value.max(Self::MIN_BUCKET).log2().ceil();
130 #[allow(clippy::cast_possible_truncation)] let index = (log - Self::MIN_BUCKET_EXP) as usize;
134 index.min(Self::BUCKETS - 1)
136 }
137
138 pub(super) fn count(&self) -> u64 {
139 self.count.load(Ordering::Relaxed)
140 }
141
142 pub(super) fn sum(&self) -> f64 {
143 self.sum.load(Ordering::Relaxed)
144 }
145
146 fn buckets(&self) -> Vec<Bucket> {
147 self.buckets
148 .iter()
149 .map(|(upper_limit, count)| Bucket {
150 upper_limit: *upper_limit,
151 count: u64::from(count.load(Ordering::Relaxed)),
152 })
153 .collect()
154 }
155
156 pub(super) fn make_metric(&self) -> MetricValue {
157 MetricValue::AggregatedHistogram {
158 buckets: self.buckets(),
159 count: self.count(),
160 sum: self.sum(),
161 }
162 }
163}
164
165impl HistogramFn for Histogram {
166 fn record(&self, value: f64) {
167 let index = Self::bucket_index(value);
168 self.buckets[index].1.fetch_add(1, Ordering::Relaxed);
169 self.count.fetch_add(1, Ordering::Relaxed);
170 self.sum
171 .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |cur| cur + value);
172 }
173}
174
#[cfg(test)]
mod test {
    use metrics::HistogramFn;
    use quickcheck::{QuickCheck, TestResult};

    use super::Histogram;

    /// Approximate floating-point equality.
    ///
    /// Exactly equal inputs always match. When either input is zero, or the
    /// difference is below the smallest positive normal `f64`, an absolute
    /// threshold is applied; otherwise the difference is judged relative to
    /// the combined magnitude of the inputs.
    fn nearly_equal(a: f64, b: f64) -> bool {
        // Exact match; this also covers two equal infinities.
        if a == b {
            return true;
        }

        let diff = (a - b).abs();
        let near_zero = a == 0.0 || b == 0.0 || diff < f64::MIN_POSITIVE;
        if near_zero {
            // A relative error is not meaningful this close to zero.
            return diff < (f64::EPSILON * f64::MIN_POSITIVE);
        }

        // Relative error, with the denominator capped so it cannot overflow.
        let magnitude = (a.abs() + b.abs()).min(f64::MAX);
        (diff / magnitude) < f64::EPSILON
    }

    /// Property test: every finite value lands in the tightest bucket that
    /// contains it, and the count and sum track a simple reference model.
    #[test]
    // The property function is handed to QuickCheck as `fn(Vec<f64>)`, so it
    // has to take its input by value.
    #[allow(clippy::needless_pass_by_value)]
    fn histogram() {
        fn inner(values: Vec<f64>) -> TestResult {
            let sut = Histogram::new();
            let mut model_count: u64 = 0;
            let mut model_sum: f64 = 0.0;

            // Infinite and NaN inputs are skipped entirely.
            for value in values.into_iter().filter(|value| value.is_finite()) {
                let index = Histogram::bucket_index(value);

                let upper_limit = sut.buckets[index].0;
                assert!(
                    value <= upper_limit,
                    "Value {} is not less than the upper limit {}.",
                    value,
                    upper_limit
                );
                if let Some(previous) = index.checked_sub(1) {
                    let previous_limit = sut.buckets[previous].0;
                    assert!(
                        value > previous_limit,
                        "Value {} is not greater than the previous upper limit {}.",
                        value,
                        previous_limit
                    );
                }

                sut.record(value);
                model_count = model_count.wrapping_add(1);
                model_sum += value;

                assert_eq!(sut.count(), model_count);
                assert!(nearly_equal(sut.sum(), model_sum));
            }

            TestResult::passed()
        }

        let property = inner as fn(Vec<f64>) -> TestResult;
        QuickCheck::new()
            .tests(1_000)
            .max_tests(2_000)
            .quickcheck(property);
    }
}