vector_core/metrics/
storage.rs

1use std::sync::{
2    atomic::{AtomicU32, Ordering},
3    Arc,
4};
5
6use metrics::{atomics::AtomicU64, GaugeFn, HistogramFn};
7use metrics_util::registry::Storage;
8
9use crate::event::{metric::Bucket, MetricValue};
10
11pub(super) struct VectorStorage;
12
13impl<K> Storage<K> for VectorStorage {
14    type Counter = Arc<AtomicU64>;
15    type Gauge = Arc<AtomicF64>;
16    type Histogram = Arc<Histogram>;
17
18    fn counter(&self, _: &K) -> Self::Counter {
19        Arc::new(AtomicU64::new(0))
20    }
21
22    fn gauge(&self, _: &K) -> Self::Gauge {
23        Arc::new(AtomicF64::new(0.0))
24    }
25
26    fn histogram(&self, _: &K) -> Self::Histogram {
27        Arc::new(Histogram::new())
28    }
29}
30
31#[derive(Debug)]
32pub(super) struct AtomicF64 {
33    inner: AtomicU64,
34}
35
36impl AtomicF64 {
37    fn new(init: f64) -> Self {
38        Self {
39            inner: AtomicU64::new(init.to_bits()),
40        }
41    }
42
43    fn fetch_update(
44        &self,
45        set_order: Ordering,
46        fetch_order: Ordering,
47        mut f: impl FnMut(f64) -> f64,
48    ) {
49        self.inner
50            .fetch_update(set_order, fetch_order, |x| {
51                Some(f(f64::from_bits(x)).to_bits())
52            })
53            .expect("Cannot fail");
54    }
55
56    pub(super) fn load(&self, order: Ordering) -> f64 {
57        f64::from_bits(self.inner.load(order))
58    }
59}
60
61impl GaugeFn for AtomicF64 {
62    fn increment(&self, amount: f64) {
63        self.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| value + amount);
64    }
65
66    fn decrement(&self, amount: f64) {
67        self.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |value| value - amount);
68    }
69
70    fn set(&self, value: f64) {
71        self.inner.store(f64::to_bits(value), Ordering::Relaxed);
72    }
73}
74
75#[derive(Debug)]
76pub(super) struct Histogram {
77    buckets: Box<[(f64, AtomicU32); 20]>,
78    count: AtomicU64,
79    sum: AtomicF64,
80}
81
82impl Histogram {
83    const MIN_BUCKET: f64 = 0.015_625; // (-6_f64).exp2() is not const yet
84    const MIN_BUCKET_EXP: f64 = -6.0;
85    const BUCKETS: usize = 20;
86
87    pub(crate) fn new() -> Self {
88        // Box to avoid having this large array inline to the structure, blowing
89        // out cache coherence.
90        //
91        // The sequence here is based on powers of two. Other sequences are more
92        // suitable for different distributions but since our present use case
93        // is mostly non-negative and measures smallish latencies we cluster
94        // around but never quite get to zero with an increasingly coarse
95        // long-tail. This also lets us find the right bucket to record into using simple
96        // constant-time math operations instead of a loop-and-compare construct.
97        let buckets = Box::new([
98            ((-6_f64).exp2(), AtomicU32::new(0)),
99            ((-5_f64).exp2(), AtomicU32::new(0)),
100            ((-4_f64).exp2(), AtomicU32::new(0)),
101            ((-3_f64).exp2(), AtomicU32::new(0)),
102            ((-2_f64).exp2(), AtomicU32::new(0)),
103            ((-1_f64).exp2(), AtomicU32::new(0)),
104            (0_f64.exp2(), AtomicU32::new(0)),
105            (1_f64.exp2(), AtomicU32::new(0)),
106            (2_f64.exp2(), AtomicU32::new(0)),
107            (3_f64.exp2(), AtomicU32::new(0)),
108            (4_f64.exp2(), AtomicU32::new(0)),
109            (5_f64.exp2(), AtomicU32::new(0)),
110            (6_f64.exp2(), AtomicU32::new(0)),
111            (7_f64.exp2(), AtomicU32::new(0)),
112            (8_f64.exp2(), AtomicU32::new(0)),
113            (9_f64.exp2(), AtomicU32::new(0)),
114            (10_f64.exp2(), AtomicU32::new(0)),
115            (11_f64.exp2(), AtomicU32::new(0)),
116            (12_f64.exp2(), AtomicU32::new(0)),
117            (f64::INFINITY, AtomicU32::new(0)),
118        ]);
119        Self {
120            buckets,
121            count: AtomicU64::new(0),
122            sum: AtomicF64::new(0.0),
123        }
124    }
125
126    pub(self) fn bucket_index(value: f64) -> usize {
127        // The buckets are all powers of two, so compute the ceiling of the log_2 of the
128        // value. Apply a lower bound to prevent zero or negative values from blowing up the log.
129        let log = value.max(Self::MIN_BUCKET).log2().ceil();
130        // Offset it based on the minimum bucket's exponent. The result will be non-negative thanks
131        // to the `.max` above, so we can coerce it directly to `usize`.
132        #[allow(clippy::cast_possible_truncation)] // The log will always be smaller than `usize`.
133        let index = (log - Self::MIN_BUCKET_EXP) as usize;
134        // Now bound the value for values larger than the largest bucket.
135        index.min(Self::BUCKETS - 1)
136    }
137
138    pub(super) fn count(&self) -> u64 {
139        self.count.load(Ordering::Relaxed)
140    }
141
142    pub(super) fn sum(&self) -> f64 {
143        self.sum.load(Ordering::Relaxed)
144    }
145
146    fn buckets(&self) -> Vec<Bucket> {
147        self.buckets
148            .iter()
149            .map(|(upper_limit, count)| Bucket {
150                upper_limit: *upper_limit,
151                count: u64::from(count.load(Ordering::Relaxed)),
152            })
153            .collect()
154    }
155
156    pub(super) fn make_metric(&self) -> MetricValue {
157        MetricValue::AggregatedHistogram {
158            buckets: self.buckets(),
159            count: self.count(),
160            sum: self.sum(),
161        }
162    }
163}
164
165impl HistogramFn for Histogram {
166    fn record(&self, value: f64) {
167        let index = Self::bucket_index(value);
168        self.buckets[index].1.fetch_add(1, Ordering::Relaxed);
169        self.count.fetch_add(1, Ordering::Relaxed);
170        self.sum
171            .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |cur| cur + value);
172    }
173}
174
#[cfg(test)]
mod test {
    use metrics::HistogramFn;
    use quickcheck::{QuickCheck, TestResult};

    use super::Histogram;

    /// Approximate float equality used to compare the histogram's sum with
    /// the model's sum.
    // Adapted from https://users.rust-lang.org/t/assert-eq-for-float-numbers/7034/4?u=blt
    fn nearly_equal(a: f64, b: f64) -> bool {
        // Exact equality first; this also handles equal infinities.
        if a == b {
            return true;
        }

        let diff = (a - b).abs();
        if a == 0.0 || b == 0.0 || diff < f64::MIN_POSITIVE {
            // One of a or b is zero (or both are extremely close to it), so
            // use the absolute error.
            return diff < (f64::EPSILON * f64::MIN_POSITIVE);
        }

        // Otherwise use the relative error, capping the scale so it stays finite.
        let scale = (a.abs() + b.abs()).min(f64::MAX);
        (diff / scale) < f64::EPSILON
    }

    /// Property test: for every finite input the chosen bucket brackets the
    /// value, and the histogram's count and sum track a simple model.
    #[test]
    #[allow(clippy::needless_pass_by_value)] // `&[T]` does not implement `Arbitrary`
    fn histogram() {
        fn inner(values: Vec<f64>) -> TestResult {
            let sut = Histogram::new();
            let mut model_count: u64 = 0;
            let mut model_sum: f64 = 0.0;

            // Infinite and NaN inputs are skipped entirely.
            for value in values.into_iter().filter(|candidate| candidate.is_finite()) {
                let index = Histogram::bucket_index(value);

                let upper_limit = sut.buckets[index].0;
                assert!(
                    value <= upper_limit,
                    "Value {} is not less than the upper limit {}.",
                    value,
                    upper_limit
                );
                if let Some(previous) = index.checked_sub(1) {
                    let lower_limit = sut.buckets[previous].0;
                    assert!(
                        value > lower_limit,
                        "Value {} is not greater than the previous upper limit {}.",
                        value,
                        lower_limit
                    );
                }

                sut.record(value);
                model_count = model_count.wrapping_add(1);
                model_sum += value;

                assert_eq!(sut.count(), model_count);
                assert!(nearly_equal(sut.sum(), model_sum));
            }
            TestResult::passed()
        }

        let property = inner as fn(Vec<f64>) -> TestResult;
        QuickCheck::new()
            .tests(1_000)
            .max_tests(2_000)
            .quickcheck(property);
    }
}