vector/sinks/util/statistic.rs

1use snafu::Snafu;
2
3use crate::event::metric::Sample;
4
5#[derive(Debug, Snafu)]
6pub enum ValidationError {
7    #[snafu(display("Quantiles must be in range [0.0,1.0]"))]
8    QuantileOutOfRange,
9}
10
/// Summary of a weighted distribution, as produced by
/// `DistributionStatistic::from_samples`.
#[derive(Debug)]
pub struct DistributionStatistic {
    /// Smallest observed value.
    pub min: f64,
    /// Largest observed value.
    pub max: f64,
    /// Value at the 0.5 quantile.
    pub median: f64,
    /// `sum / count`.
    pub avg: f64,
    /// Sum of every value weighted by its rate.
    pub sum: f64,
    /// Total number of observations (sum of all rates).
    pub count: u64,
    /// Requested quantiles paired with their values, as `(quantile, value)`.
    pub quantiles: Vec<(f64, f64)>,
}
22
23impl DistributionStatistic {
24    pub fn from_samples(source: &[Sample], quantiles: &[f64]) -> Option<Self> {
25        let mut bins = source
26            .iter()
27            .filter(|sample| sample.rate > 0)
28            .copied()
29            .collect::<Vec<_>>();
30
31        match bins.len() {
32            0 => None,
33            1 => Some({
34                let val = bins[0].value;
35                let count = bins[0].rate;
36                Self {
37                    min: val,
38                    max: val,
39                    median: val,
40                    avg: val,
41                    sum: val * count as f64,
42                    count: count as u64,
43                    quantiles: quantiles.iter().map(|&p| (p, val)).collect(),
44                }
45            }),
46            _ => Some({
47                bins.sort_unstable_by(|a, b| a.value.total_cmp(&b.value));
48
49                let min = bins.first().unwrap().value;
50                let max = bins.last().unwrap().value;
51                let sum = bins
52                    .iter()
53                    .map(|sample| sample.value * sample.rate as f64)
54                    .sum::<f64>();
55
56                for i in 1..bins.len() {
57                    bins[i].rate += bins[i - 1].rate;
58                }
59
60                let count = bins.last().unwrap().rate;
61                let avg = sum / count as f64;
62
63                let median = find_quantile(&bins, 0.5);
64                let quantiles = quantiles
65                    .iter()
66                    .map(|&p| (p, find_quantile(&bins, p)))
67                    .collect();
68
69                Self {
70                    min,
71                    max,
72                    median,
73                    avg,
74                    sum,
75                    count: count as u64,
76                    quantiles,
77                }
78            }),
79        }
80    }
81}
82
83/// `bins` is a cumulative histogram
84/// We are using R-3 (without choosing the even integer in the case of a tie),
85/// it might be preferable to use a more common function, such as R-7.
86///
87/// List of quantile functions:
88/// <https://en.wikipedia.org/wiki/Quantile#Estimating_quantiles_from_a_sample>
89fn find_quantile(bins: &[Sample], p: f64) -> f64 {
90    let count = bins.last().expect("bins is empty").rate;
91    find_sample(bins, (p * count as f64).round() as u32)
92}
93
94/// `bins` is a cumulative histogram
95/// Return the i-th smallest value,
96/// i starts from 1 (i == 1 mean the smallest value).
97/// i == 0 is equivalent to i == 1.
98fn find_sample(bins: &[Sample], i: u32) -> f64 {
99    let index = match bins.binary_search_by_key(&i, |sample| sample.rate) {
100        Ok(index) => index,
101        Err(index) => index,
102    };
103    bins[index].value
104}
105
106pub fn validate_quantiles(quantiles: &[f64]) -> Result<(), ValidationError> {
107    if quantiles
108        .iter()
109        .all(|&quantile| (0.0..=1.0).contains(&quantile))
110    {
111        Ok(())
112    } else {
113        Err(ValidationError::QuantileOutOfRange)
114    }
115}
116
#[cfg(test)]
mod test {
    use super::*;

    /// Test-only exact equality: every field must match, including the length
    /// and order of `quantiles`.
    impl PartialEq<Self> for DistributionStatistic {
        fn eq(&self, other: &Self) -> bool {
            self.min == other.min
                && self.max == other.max
                && self.median == other.median
                && self.avg == other.avg
                && self.sum == other.sum
                && self.count == other.count
                // Compare the whole vectors. Zipping them would stop at the shorter
                // one and treat quantile lists of different lengths as equal.
                && self.quantiles == other.quantiles
        }
    }

    impl Eq for DistributionStatistic {}

    /// Builds samples from `(value, rate)` pairs.
    fn samples(v: &[(f64, u32)]) -> Vec<Sample> {
        v.iter()
            .map(|&(value, rate)| Sample { value, rate })
            .collect()
    }

    #[test]
    fn test_distribution() {
        // should return None on empty input
        assert_eq!(DistributionStatistic::from_samples(&[], &[0.5]), None);
        assert_eq!(
            DistributionStatistic::from_samples(&samples(&[(0.0, 0)]), &[0.5]),
            None
        );

        // test len == 1 case
        assert_eq!(
            DistributionStatistic::from_samples(&samples(&[(0.9, 100)]), &[0.5],).unwrap(),
            DistributionStatistic {
                min: 0.9,
                max: 0.9,
                median: 0.9,
                avg: 0.9,
                sum: 90.0,
                count: 100,
                quantiles: vec![(0.5, 0.9)],
            }
        );

        assert_eq!(
            DistributionStatistic::from_samples(
                &samples(&[(1.0, 1), (2.0, 1), (3.0, 1), (4.0, 1), (5.0, 1)]),
                &[]
            )
            .unwrap(),
            DistributionStatistic {
                min: 1.0,
                max: 5.0,
                median: 3.0,
                avg: 3.0,
                sum: 15.0,
                count: 5,
                quantiles: Vec::new(),
            }
        );

        assert_eq!(
            DistributionStatistic::from_samples(
                &samples(&[(1.0, 1), (2.0, 1), (4.0, 1), (3.0, 1)]),
                &[0.0, 1.0, 0.9]
            )
            .unwrap(),
            DistributionStatistic {
                min: 1.0,
                max: 4.0,
                median: 2.0,
                avg: 2.5,
                sum: 10.0,
                count: 4,
                quantiles: vec![(0.0, 1.0), (1.0, 4.0), (0.9, 4.0)],
            }
        );

        assert_eq!(
            DistributionStatistic::from_samples(
                &samples(&[(1.0, 2), (2.0, 1), (3.0, 4), (4.0, 3)]),
                &[0.75, 0.3, 0.31, 0.29, 0.24],
            )
            .unwrap(),
            DistributionStatistic {
                min: 1.0,
                max: 4.0,
                median: 3.0,
                avg: 2.8,
                sum: 28.0,
                count: 10,
                quantiles: vec![
                    (0.75, 4.0),
                    (0.3, 2.0),
                    (0.31, 2.0),
                    (0.29, 2.0),
                    (0.24, 1.0)
                ],
            }
        );
    }

    #[test]
    fn sort_unstable_doesnt_panic() {
        let to_float = |v: i32| -> f64 { v as f64 };

        let v: Vec<f64> = (0..=15)
            .map(to_float)
            .chain(std::iter::once(f64::NAN))
            .chain((16..=20).map(to_float))
            .rev()
            .collect();

        // For <20 items the internal sort implementation is different and doesn't panic
        assert!(v.len() > 20);

        // Every value gets rate 1.
        let s: Vec<(f64, u32)> = v.into_iter().map(|value| (value, 1)).collect();
        DistributionStatistic::from_samples(&samples(&s), &[0.0, 1.0]);
    }
}
242        DistributionStatistic::from_samples(&samples(&s), &[0.0, 1.0]);
243    }
244}