vector_common/stats/
ewma_gauge.rs

1use std::sync::{Arc, Mutex};
2use std::time::Instant;
3
4use metrics::Gauge;
5
6use super::{AtomicEwma, TimeEwma};
7
8/// Couples a [`Gauge`] with an [`AtomicEwma`] so gauge readings reflect the EWMA.
9#[derive(Clone, Debug)]
10pub struct EwmaGauge {
11    gauge: Gauge,
12    // Note that the `Gauge` internally is equivalent to an `Arc<AtomicF64>` so we need to use the
13    // same semantics for the EWMA calculation as well.
14    ewma: Arc<AtomicEwma>,
15}
16
17impl EwmaGauge {
18    #[must_use]
19    pub fn new(gauge: Gauge, alpha: Option<f64>) -> Self {
20        let alpha = alpha.unwrap_or(super::DEFAULT_EWMA_ALPHA);
21        let ewma = Arc::new(AtomicEwma::new(alpha));
22        Self { gauge, ewma }
23    }
24
25    /// Records a new value, updates the EWMA, and sets the gauge accordingly.
26    pub fn record(&self, value: f64) {
27        let average = self.ewma.update(value);
28        self.gauge.set(average);
29    }
30}
31
32/// Couples a [`Gauge`] with a [`TimeEwma`] so gauge readings reflect the EWMA. Since `TimeEwma` has
33/// an internal state consisting of multiple values, this gauge requires a mutex to protect the
34/// state update.
35#[derive(Clone, Debug)]
36pub struct TimeEwmaGauge {
37    gauge: Gauge,
38    ewma: Arc<Mutex<TimeEwma>>,
39}
40
41impl TimeEwmaGauge {
42    #[must_use]
43    pub fn new(gauge: Gauge, half_life_seconds: f64) -> Self {
44        let ewma = Arc::new(Mutex::new(TimeEwma::new(half_life_seconds)));
45        Self { gauge, ewma }
46    }
47
48    /// Records a new value, updates the EWMA, and sets the gauge accordingly.
49    ///
50    /// # Panics
51    ///
52    /// Panics if the EWMA mutex is poisoned.
53    pub fn record(&self, value: f64, reference: Instant) {
54        let mut ewma = self.ewma.lock().expect("time ewma gauge mutex poisoned");
55        let average = ewma.update(value, reference);
56        self.gauge.set(average);
57    }
58}