vector_core/metrics/
storage.rs

1use std::sync::{
2    Arc,
3    atomic::{AtomicU32, Ordering},
4};
5
6use metrics::{HistogramFn, atomics::AtomicU64};
7use metrics_util::registry::Storage;
8use vector_common::atomic::AtomicF64;
9
10use crate::event::{MetricValue, metric::Bucket};
11
12pub(super) struct VectorStorage;
13
14impl<K> Storage<K> for VectorStorage {
15    type Counter = Arc<AtomicU64>;
16    type Gauge = Arc<AtomicF64>;
17    type Histogram = Arc<Histogram>;
18
19    fn counter(&self, _: &K) -> Self::Counter {
20        Arc::new(AtomicU64::new(0))
21    }
22
23    fn gauge(&self, _: &K) -> Self::Gauge {
24        Arc::new(AtomicF64::new(0.0))
25    }
26
27    fn histogram(&self, _: &K) -> Self::Histogram {
28        Arc::new(Histogram::new())
29    }
30}
31
/// Fixed-bucket histogram whose bucket counters, total count and sum are all atomics, so it can
/// be recorded into through a shared reference.
///
/// Each recorded value increments exactly one bucket (see `HistogramFn::record`), so bucket
/// counts are per-bucket rather than cumulative.
#[derive(Debug)]
pub(super) struct Histogram {
    /// `(upper_limit, count)` pairs in ascending order of upper limit, as built by
    /// `Histogram::new`.
    ///
    /// NOTE(review): the per-bucket counters are `AtomicU32`, and `fetch_add` on them wraps on
    /// overflow, while `count` below is 64-bit. A long-running, high-volume histogram could see
    /// a bucket count wrap while `count` keeps growing. Consider widening; confirm the intended
    /// lifetime and volume.
    buckets: Box<[(f64, AtomicU32); 26]>,
    /// Total number of recorded observations.
    count: AtomicU64,
    /// Running sum of all recorded values.
    sum: AtomicF64,
}
38
39impl Histogram {
40    const MIN_BUCKET: f64 = 1.0 / (1 << 12) as f64; // f64::powi() is not const yet
41    const MIN_BUCKET_EXP: f64 = -12.0;
42    const BUCKETS: usize = 26;
43
44    pub(crate) fn new() -> Self {
45        // Box to avoid having this large array inline to the structure, blowing
46        // out cache coherence.
47        //
48        // The sequence here is based on powers of two. Other sequences are more
49        // suitable for different distributions but since our present use case
50        // is mostly non-negative and measures smallish latencies we cluster
51        // around but never quite get to zero with an increasingly coarse
52        // long-tail. This also lets us find the right bucket to record into using simple
53        // constant-time math operations instead of a loop-and-compare construct.
54        let buckets = Box::new([
55            (2.0f64.powi(-12), AtomicU32::new(0)),
56            (2.0f64.powi(-11), AtomicU32::new(0)),
57            (2.0f64.powi(-10), AtomicU32::new(0)),
58            (2.0f64.powi(-9), AtomicU32::new(0)),
59            (2.0f64.powi(-8), AtomicU32::new(0)),
60            (2.0f64.powi(-7), AtomicU32::new(0)),
61            (2.0f64.powi(-6), AtomicU32::new(0)),
62            (2.0f64.powi(-5), AtomicU32::new(0)),
63            (2.0f64.powi(-4), AtomicU32::new(0)),
64            (2.0f64.powi(-3), AtomicU32::new(0)),
65            (2.0f64.powi(-2), AtomicU32::new(0)),
66            (2.0f64.powi(-1), AtomicU32::new(0)),
67            (2.0f64.powi(0), AtomicU32::new(0)),
68            (2.0f64.powi(1), AtomicU32::new(0)),
69            (2.0f64.powi(2), AtomicU32::new(0)),
70            (2.0f64.powi(3), AtomicU32::new(0)),
71            (2.0f64.powi(4), AtomicU32::new(0)),
72            (2.0f64.powi(5), AtomicU32::new(0)),
73            (2.0f64.powi(6), AtomicU32::new(0)),
74            (2.0f64.powi(7), AtomicU32::new(0)),
75            (2.0f64.powi(8), AtomicU32::new(0)),
76            (2.0f64.powi(9), AtomicU32::new(0)),
77            (2.0f64.powi(10), AtomicU32::new(0)),
78            (2.0f64.powi(11), AtomicU32::new(0)),
79            (2.0f64.powi(12), AtomicU32::new(0)),
80            (f64::INFINITY, AtomicU32::new(0)),
81        ]);
82        Self {
83            buckets,
84            count: AtomicU64::new(0),
85            sum: AtomicF64::new(0.0),
86        }
87    }
88
89    pub(self) fn bucket_index(value: f64) -> usize {
90        // The buckets are all powers of two, so compute the ceiling of the log_2 of the
91        // value. Apply a lower bound to prevent zero or negative values from blowing up the log.
92        let log = value.max(Self::MIN_BUCKET).log2().ceil();
93        // Offset it based on the minimum bucket's exponent. The result will be non-negative thanks
94        // to the `.max` above, so we can coerce it directly to `usize`.
95        #[allow(clippy::cast_possible_truncation)] // The log will always be smaller than `usize`.
96        let index = (log - Self::MIN_BUCKET_EXP) as usize;
97        // Now bound the value for values larger than the largest bucket.
98        index.min(Self::BUCKETS - 1)
99    }
100
101    pub(super) fn count(&self) -> u64 {
102        self.count.load(Ordering::Relaxed)
103    }
104
105    pub(super) fn sum(&self) -> f64 {
106        self.sum.load(Ordering::Relaxed)
107    }
108
109    fn buckets(&self) -> Vec<Bucket> {
110        self.buckets
111            .iter()
112            .map(|(upper_limit, count)| Bucket {
113                upper_limit: *upper_limit,
114                count: u64::from(count.load(Ordering::Relaxed)),
115            })
116            .collect()
117    }
118
119    pub(super) fn make_metric(&self) -> MetricValue {
120        MetricValue::AggregatedHistogram {
121            buckets: self.buckets(),
122            count: self.count(),
123            sum: self.sum(),
124        }
125    }
126}
127
128impl HistogramFn for Histogram {
129    fn record(&self, value: f64) {
130        let index = Self::bucket_index(value);
131        self.buckets[index].1.fetch_add(1, Ordering::Relaxed);
132        self.count.fetch_add(1, Ordering::Relaxed);
133        self.sum
134            .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |cur| cur + value);
135    }
136}
137
#[cfg(test)]
mod test {
    use metrics::HistogramFn;
    use quickcheck::{QuickCheck, TestResult};

    use super::Histogram;

    // Adapted from https://users.rust-lang.org/t/assert-eq-for-float-numbers/7034/4?u=blt
    /// Approximate float equality.
    ///
    /// Uses absolute error when either side is zero or the difference is subnormal, and relative
    /// error otherwise. Exactly equal values (including matching infinities) always compare equal.
    fn nearly_equal(a: f64, b: f64) -> bool {
        let abs_a = a.abs();
        let abs_b = b.abs();
        let diff = (a - b).abs();

        if a == b {
            // Handle infinities.
            true
        } else if a == 0.0 || b == 0.0 || diff < f64::MIN_POSITIVE {
            // One of a or b is zero (or both are extremely close to it,) use absolute error.
            diff < (f64::EPSILON * f64::MIN_POSITIVE)
        } else {
            // Use relative error.
            (diff / f64::min(abs_a + abs_b, f64::MAX)) < f64::EPSILON
        }
    }

    /// Deterministic checks of `bucket_index` at the bucket limits and for out-of-range inputs.
    #[test]
    fn bucket_index_edges() {
        let sut = Histogram::new();
        let last = Histogram::BUCKETS - 1;

        // Upper limits are inclusive: every finite limit maps onto its own bucket.
        for (index, (upper_limit, _)) in sut.buckets.iter().enumerate().take(last) {
            assert_eq!(Histogram::bucket_index(*upper_limit), index);
        }

        // Zero, negatives, and NaN are clamped into the first bucket.
        for value in [0.0, -0.0, -1.0, f64::MIN, f64::NAN] {
            assert_eq!(Histogram::bucket_index(value), 0);
        }

        // Anything above the largest finite limit lands in the final `+Inf` bucket.
        for value in [4096.5, 1e9, f64::MAX, f64::INFINITY] {
            assert_eq!(Histogram::bucket_index(value), last);
        }
    }

    /// Property test: every finite value lands in a bucket whose bounds contain it.
    /// After each `record`, `count` and `sum` must match a simple model.
    #[test]
    #[allow(clippy::needless_pass_by_value)] // `&[T]` does not implement `Arbitrary`
    fn histogram() {
        fn inner(values: Vec<f64>) -> TestResult {
            let sut = Histogram::new();
            let mut model_count: u64 = 0;
            let mut model_sum: f64 = 0.0;

            for value in values {
                if value.is_infinite() || value.is_nan() {
                    continue;
                }

                let index = Histogram::bucket_index(value);
                assert!(
                    value <= sut.buckets[index].0,
                    "Value {} exceeds the upper limit {}.",
                    value,
                    sut.buckets[index].0
                );
                if index > 0 {
                    assert!(
                        value > sut.buckets[index - 1].0,
                        "Value {} is not greater than the previous upper limit {}.",
                        value,
                        sut.buckets[index - 1].0
                    );
                }

                sut.record(value);
                model_count = model_count.wrapping_add(1);
                model_sum += value;

                assert_eq!(sut.count(), model_count);
                assert!(nearly_equal(sut.sum(), model_sum));
            }
            TestResult::passed()
        }

        QuickCheck::new()
            .tests(1_000)
            .max_tests(2_000)
            .quickcheck(inner as fn(Vec<f64>) -> TestResult);
    }
}