vector_core/metrics/
ddsketch.rs

1use std::{
2    cmp::{self, Ordering},
3    mem,
4};
5
6use ordered_float::OrderedFloat;
7use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
8use serde_with::{serde_as, DeserializeAs, SerializeAs};
9use snafu::Snafu;
10use vector_common::byte_size_of::ByteSizeOf;
11use vector_config::configurable_component;
12
13use crate::{
14    event::{metric::Bucket, Metric, MetricValue},
15    float_eq,
16};
17
18const AGENT_DEFAULT_BIN_LIMIT: u16 = 4096;
19const AGENT_DEFAULT_EPS: f64 = 1.0 / 128.0;
20const AGENT_DEFAULT_MIN_VALUE: f64 = 1.0e-9;
21
22const UV_INF: i16 = i16::MAX;
23const MAX_KEY: i16 = UV_INF;
24
25const INITIAL_BINS: u16 = 128;
26const MAX_BIN_WIDTH: u16 = u16::MAX;
27
28#[inline]
29fn log_gamma(gamma_ln: f64, v: f64) -> f64 {
30    v.ln() / gamma_ln
31}
32
33#[inline]
34fn pow_gamma(gamma_v: f64, y: f64) -> f64 {
35    gamma_v.powf(y)
36}
37
38#[inline]
39fn lower_bound(gamma_v: f64, bias: i32, k: i16) -> f64 {
40    if k < 0 {
41        return -lower_bound(gamma_v, bias, -k);
42    }
43
44    if k == MAX_KEY {
45        return f64::INFINITY;
46    }
47
48    if k == 0 {
49        return 0.0;
50    }
51
52    pow_gamma(gamma_v, f64::from(i32::from(k) - bias))
53}
54
55#[derive(Debug, Snafu)]
56pub enum MergeError {
57    #[snafu(display("cannot merge two sketches with mismatched configuration parameters"))]
58    MismatchedConfigs,
59}
60
61#[derive(Copy, Clone, Debug, PartialEq)]
62pub struct Config {
63    bin_limit: u16,
64    // gamma_ln is the natural log of gamma_v, used to speed up calculating log base gamma.
65    gamma_v: f64,
66    gamma_ln: f64,
67    // Min and max values representable by a sketch with these params.
68    //
69    // key(x) =
70    //    0 : -min > x < min
71    //    1 : x == min
72    //   -1 : x == -min
73    // +Inf : x > max
74    // -Inf : x < -max.
75    norm_min: f64,
76    // Bias of the exponent, used to ensure key(x) >= 1.
77    norm_bias: i32,
78}
79
80impl Config {
81    #[allow(clippy::cast_possible_truncation)]
82    pub(self) fn new(mut eps: f64, min_value: f64, bin_limit: u16) -> Self {
83        assert!(eps > 0.0 && eps < 1.0, "eps must be between 0.0 and 1.0");
84        assert!(min_value > 0.0, "min value must be greater than 0.0");
85        assert!(bin_limit > 0, "bin limit must be greater than 0");
86
87        eps *= 2.0;
88        let gamma_v = 1.0 + eps;
89        let gamma_ln = eps.ln_1p();
90
91        // SAFETY: We expect `log_gamma` to return a value between -2^16 and 2^16, so it will always
92        // fit in an i32.
93        let norm_eff_min = log_gamma(gamma_ln, min_value).floor() as i32;
94        let norm_bias = -norm_eff_min + 1;
95
96        let norm_min = lower_bound(gamma_v, norm_bias, 1);
97
98        assert!(
99            norm_min <= min_value,
100            "norm min should not exceed min_value"
101        );
102
103        Self {
104            bin_limit,
105            gamma_v,
106            gamma_ln,
107            norm_min,
108            norm_bias,
109        }
110    }
111
112    #[allow(dead_code)]
113    pub(crate) fn relative_accuracy(&self) -> f64 {
114        // Only used for unit tests, hence the allow.
115        (self.gamma_v - 1.0) / 2.0
116    }
117
118    /// Gets the value lower bound of the bin at the given key.
119    pub fn bin_lower_bound(&self, k: i16) -> f64 {
120        lower_bound(self.gamma_v, self.norm_bias, k)
121    }
122
123    /// Gets the key for the given value.
124    ///
125    /// The key corresponds to the bin where this value would be represented. The value returned here
126    /// is such that: γ^k <= v < γ^(k+1).
127    #[allow(clippy::cast_possible_truncation)]
128    pub fn key(&self, v: f64) -> i16 {
129        if v < 0.0 {
130            return -self.key(-v);
131        }
132
133        if v == 0.0 || (v > 0.0 && v < self.norm_min) || (v < 0.0 && v > -self.norm_min) {
134            return 0;
135        }
136
137        // SAFETY: `rounded` is intentionally meant to be a whole integer, and additionally, based
138        // on our target gamma ln, we expect `log_gamma` to return a value between -2^16 and 2^16,
139        // so it will always fit in an i32.
140        let rounded = round_to_even(self.log_gamma(v)) as i32;
141        let key = rounded.wrapping_add(self.norm_bias);
142
143        // SAFETY: Our upper bound of POS_INF_KEY is i16, and our lower bound is simply one, so
144        // there is no risk of truncation via conversion.
145        key.clamp(1, i32::from(MAX_KEY)) as i16
146    }
147
148    pub fn log_gamma(&self, v: f64) -> f64 {
149        log_gamma(self.gamma_ln, v)
150    }
151}
152
153impl Default for Config {
154    fn default() -> Self {
155        Config::new(
156            AGENT_DEFAULT_EPS,
157            AGENT_DEFAULT_MIN_VALUE,
158            AGENT_DEFAULT_BIN_LIMIT,
159        )
160    }
161}
162
163/// A sketch bin.
164#[derive(Clone, Copy, Debug, Eq, PartialEq)]
165pub struct Bin {
166    /// The bin index.
167    k: i16,
168
169    /// The number of observations within the bin.
170    n: u16,
171}
172
173impl Bin {
174    #[allow(clippy::cast_possible_truncation)]
175    fn increment(&mut self, n: u32) -> u32 {
176        let next = n + u32::from(self.n);
177        if next > u32::from(MAX_BIN_WIDTH) {
178            self.n = MAX_BIN_WIDTH;
179            return next - u32::from(MAX_BIN_WIDTH);
180        }
181
182        // SAFETY: We already know `next` is less than or equal to `MAX_BIN_WIDTH` if we got here, and
183        // `MAX_BIN_WIDTH` is u16, so next can't possibly be larger than a u16.
184        self.n = next as u16;
185        0
186    }
187}
188
189/// [DDSketch][ddsketch] implementation based on the [Datadog Agent][ddagent].
190///
191/// This implementation is subtly different from the open-source implementations of `DDSketch`, as
192/// Datadog made some slight tweaks to configuration values and in-memory layout to optimize it for
193/// insertion performance within the agent.
194///
195/// We've mimicked the agent version of `DDSketch` here in order to support a future where we can
196/// take sketches shipped by the agent, handle them internally, merge them, and so on, without any
197/// loss of accuracy, eventually forwarding them to Datadog ourselves.
198///
199/// As such, this implementation is constrained in the same ways: the configuration parameters
200/// cannot be changed, the collapsing strategy is fixed, and we support a limited number of methods
201/// for inserting into the sketch.
202///
203/// Importantly, we have a special function, again taken from the agent version, to allow us to
204/// interpolate histograms, specifically our own aggregated histograms, into a sketch so that we can
205/// emit useful default quantiles, rather than having to ship the buckets -- upper bound and count
206/// -- to a downstream system that might have no native way to do the same thing, basically
207/// providing no value as they have no way to render useful data from them.
208///
209/// [ddsketch]: https://www.vldb.org/pvldb/vol12/p2195-masson.pdf
210/// [ddagent]: https://github.com/DataDog/datadog-agent
211#[serde_as]
212#[configurable_component]
213#[derive(Clone, Debug)]
214pub struct AgentDDSketch {
215    #[serde(skip)]
216    config: Config,
217
218    /// The bins within the sketch.
219    #[serde_as(as = "BinMap")]
220    bins: Vec<Bin>,
221
222    /// The number of observations within the sketch.
223    count: u32,
224
225    /// The minimum value of all observations within the sketch.
226    min: f64,
227
228    /// The maximum value of all observations within the sketch.
229    max: f64,
230
231    /// The sum of all observations within the sketch.
232    sum: f64,
233
234    /// The average value of all observations within the sketch.
235    avg: f64,
236}
237
238impl AgentDDSketch {
239    /// Creates a new `AgentDDSketch` based on a configuration that is identical to the one used by
240    /// the Datadog agent itself.
241    pub fn with_agent_defaults() -> Self {
242        let config = Config::default();
243        let initial_bins = cmp::max(INITIAL_BINS, config.bin_limit) as usize;
244
245        Self {
246            config,
247            bins: Vec::with_capacity(initial_bins),
248            count: 0,
249            min: f64::MAX,
250            max: f64::MIN,
251            sum: 0.0,
252            avg: 0.0,
253        }
254    }
255
256    /// Creates a new `AgentDDSketch` based on the given inputs.
257    ///
258    /// This is _only_ useful for constructing a sketch from the raw components when the sketch has
259    /// passed through the transform boundary into Lua/VRL and needs to be reconstructed.
260    ///
261    /// This is a light code smell, as our intention is to rigorously mediate access and mutation of
262    /// a sketch through `AgentDDSketch` and the provided methods.
263    pub fn from_raw(
264        count: u32,
265        min: f64,
266        max: f64,
267        sum: f64,
268        avg: f64,
269        keys: &[i16],
270        counts: &[u16],
271    ) -> Option<AgentDDSketch> {
272        let bin_map = BinMap {
273            keys: keys.into(),
274            counts: counts.into(),
275        };
276        bin_map.into_bins().map(|bins| Self {
277            config: Config::default(),
278            bins,
279            count,
280            min,
281            max,
282            sum,
283            avg,
284        })
285    }
286
287    pub fn gamma(&self) -> f64 {
288        self.config.gamma_v
289    }
290
291    pub fn bin_index_offset(&self) -> i32 {
292        self.config.norm_bias
293    }
294
295    #[allow(dead_code)]
296    fn bin_count(&self) -> usize {
297        self.bins.len()
298    }
299
300    pub fn bins(&self) -> &[Bin] {
301        &self.bins
302    }
303
304    pub fn bin_map(&self) -> BinMap {
305        BinMap::from_bins(&self.bins)
306    }
307
308    pub fn config(&self) -> &Config {
309        &self.config
310    }
311
312    /// Whether or not this sketch is empty.
313    pub fn is_empty(&self) -> bool {
314        self.count == 0
315    }
316
317    /// Number of samples currently represented by this sketch.
318    pub fn count(&self) -> u32 {
319        self.count
320    }
321
322    /// Minimum value seen by this sketch.
323    ///
324    /// Returns `None` if the sketch is empty.
325    pub fn min(&self) -> Option<f64> {
326        if self.is_empty() {
327            None
328        } else {
329            Some(self.min)
330        }
331    }
332
333    /// Maximum value seen by this sketch.
334    ///
335    /// Returns `None` if the sketch is empty.
336    pub fn max(&self) -> Option<f64> {
337        if self.is_empty() {
338            None
339        } else {
340            Some(self.max)
341        }
342    }
343
344    /// Sum of all values seen by this sketch.
345    ///
346    /// Returns `None` if the sketch is empty.
347    pub fn sum(&self) -> Option<f64> {
348        if self.is_empty() {
349            None
350        } else {
351            Some(self.sum)
352        }
353    }
354
355    /// Average value seen by this sketch.
356    ///
357    /// Returns `None` if the sketch is empty.
358    pub fn avg(&self) -> Option<f64> {
359        if self.is_empty() {
360            None
361        } else {
362            Some(self.avg)
363        }
364    }
365
366    /// Clears the sketch, removing all bins and resetting all statistics.
367    pub fn clear(&mut self) {
368        self.count = 0;
369        self.min = f64::MAX;
370        self.max = f64::MIN;
371        self.avg = 0.0;
372        self.sum = 0.0;
373        self.bins.clear();
374    }
375
376    fn adjust_basic_stats(&mut self, v: f64, n: u32) {
377        if v < self.min {
378            self.min = v;
379        }
380
381        if v > self.max {
382            self.max = v;
383        }
384
385        self.count += n;
386        self.sum += v * f64::from(n);
387
388        if n == 1 {
389            self.avg += (v - self.avg) / f64::from(self.count);
390        } else {
391            // TODO: From the Agent source code, this method apparently loses precision when the
392            // two averages -- v and self.avg -- are close.  Is there a better approach?
393            self.avg = self.avg + (v - self.avg) * f64::from(n) / f64::from(self.count);
394        }
395    }
396
397    fn insert_key_counts(&mut self, mut counts: Vec<(i16, u32)>) {
398        // Counts need to be sorted by key.
399        counts.sort_unstable_by(|(k1, _), (k2, _)| k1.cmp(k2));
400
401        let mut temp = Vec::new();
402
403        let mut bins_idx = 0;
404        let mut key_idx = 0;
405        let bins_len = self.bins.len();
406        let counts_len = counts.len();
407
408        // PERF TODO: there's probably a fast path to be had where could check if all if the counts
409        // have existing bins that aren't yet full, and we just update them directly, although we'd
410        // still be doing a linear scan to find them since keys aren't 1:1 with their position in
411        // `self.bins` but using this method just to update one or two bins is clearly suboptimal
412        // and we wouldn't really want to scan them all just to have to back out and actually do the
413        // non-fast path.. maybe a first pass could be checking if the first/last key falls within
414        // our known min/max key, and if it doesn't, then we know we have to go through the non-fast
415        // path, and if it passes, we do the scan to see if we can just update bins directly?
416        while bins_idx < bins_len && key_idx < counts_len {
417            let bin = self.bins[bins_idx];
418            let vk = counts[key_idx].0;
419            let kn = counts[key_idx].1;
420
421            match bin.k.cmp(&vk) {
422                Ordering::Greater => {
423                    generate_bins(&mut temp, vk, kn);
424                    key_idx += 1;
425                }
426                Ordering::Less => {
427                    temp.push(bin);
428                    bins_idx += 1;
429                }
430                Ordering::Equal => {
431                    generate_bins(&mut temp, bin.k, u32::from(bin.n) + kn);
432                    bins_idx += 1;
433                    key_idx += 1;
434                }
435            }
436        }
437
438        temp.extend_from_slice(&self.bins[bins_idx..]);
439
440        while key_idx < counts_len {
441            let vk = counts[key_idx].0;
442            let kn = counts[key_idx].1;
443            generate_bins(&mut temp, vk, kn);
444            key_idx += 1;
445        }
446
447        trim_left(&mut temp, self.config.bin_limit);
448
449        // PERF TODO: This is where we might do a mem::swap instead so that we could shove the
450        // bin vector into an object pool but I'm not sure this actually matters at the moment.
451        self.bins = temp;
452    }
453
454    fn insert_keys(&mut self, mut keys: Vec<i16>) {
455        // Updating more than 4 billion keys would be very very weird and likely indicative of
456        // something horribly broken.
457        assert!(
458            keys.len()
459                <= TryInto::<usize>::try_into(u32::MAX).expect("we don't support 16-bit systems")
460        );
461
462        keys.sort_unstable();
463
464        let mut temp = Vec::new();
465
466        let mut bins_idx = 0;
467        let mut key_idx = 0;
468        let bins_len = self.bins.len();
469        let keys_len = keys.len();
470
471        // PERF TODO: there's probably a fast path to be had where could check if all if the counts
472        // have existing bins that aren't yet full, and we just update them directly, although we'd
473        // still be doing a linear scan to find them since keys aren't 1:1 with their position in
474        // `self.bins` but using this method just to update one or two bins is clearly suboptimal
475        // and we wouldn't really want to scan them all just to have to back out and actually do the
476        // non-fast path.. maybe a first pass could be checking if the first/last key falls within
477        // our known min/max key, and if it doesn't, then we know we have to go through the non-fast
478        // path, and if it passes, we do the scan to see if we can just update bins directly?
479        while bins_idx < bins_len && key_idx < keys_len {
480            let bin = self.bins[bins_idx];
481            let vk = keys[key_idx];
482
483            match bin.k.cmp(&vk) {
484                Ordering::Greater => {
485                    let kn = buf_count_leading_equal(&keys, key_idx);
486                    generate_bins(&mut temp, vk, kn);
487                    key_idx += kn as usize;
488                }
489                Ordering::Less => {
490                    temp.push(bin);
491                    bins_idx += 1;
492                }
493                Ordering::Equal => {
494                    let kn = buf_count_leading_equal(&keys, key_idx);
495                    generate_bins(&mut temp, bin.k, u32::from(bin.n) + kn);
496                    bins_idx += 1;
497                    key_idx += kn as usize;
498                }
499            }
500        }
501
502        temp.extend_from_slice(&self.bins[bins_idx..]);
503
504        while key_idx < keys_len {
505            let vk = keys[key_idx];
506            let kn = buf_count_leading_equal(&keys, key_idx);
507            generate_bins(&mut temp, vk, kn);
508            key_idx += kn as usize;
509        }
510
511        trim_left(&mut temp, self.config.bin_limit);
512
513        // PERF TODO: This is where we might do a mem::swap instead so that we could shove the
514        // bin vector into an object pool but I'm not sure this actually matters at the moment.
515        self.bins = temp;
516    }
517
518    pub fn insert(&mut self, v: f64) {
519        // TODO: this should return a result that makes sure we have enough room to actually add 1
520        // more sample without hitting `self.config.max_count()`
521        self.adjust_basic_stats(v, 1);
522
523        let key = self.config.key(v);
524        self.insert_keys(vec![key]);
525    }
526
527    pub fn insert_many(&mut self, vs: &[f64]) {
528        // TODO: this should return a result that makes sure we have enough room to actually add 1
529        // more sample without hitting `self.config.max_count()`
530        let mut keys = Vec::with_capacity(vs.len());
531        for v in vs {
532            self.adjust_basic_stats(*v, 1);
533            keys.push(self.config.key(*v));
534        }
535        self.insert_keys(keys);
536    }
537
538    pub fn insert_n(&mut self, v: f64, n: u32) {
539        // TODO: this should return a result that makes sure we have enough room to actually add N
540        // more samples without hitting `self.config.max_count()`
541        self.adjust_basic_stats(v, n);
542
543        let key = self.config.key(v);
544        self.insert_key_counts(vec![(key, n)]);
545    }
546
547    fn insert_interpolate_bucket(&mut self, lower: f64, upper: f64, count: u32) {
548        // Find the keys for the bins where the lower bound and upper bound would end up, and
549        // collect all of the keys in between, inclusive.
550        let lower_key = self.config.key(lower);
551        let upper_key = self.config.key(upper);
552        let keys = (lower_key..=upper_key).collect::<Vec<_>>();
553
554        let mut key_counts = Vec::new();
555        let mut remaining_count = count;
556        let distance = upper - lower;
557        let mut start_idx = 0;
558        let mut end_idx = 1;
559        let mut lower_bound = self.config.bin_lower_bound(keys[start_idx]);
560        let mut remainder = 0.0;
561
562        while end_idx < keys.len() && remaining_count > 0 {
563            // For each key, map the total distance between the input lower/upper bound against the sketch
564            // lower/upper bound for the current sketch bin, which tells us how much of the input
565            // count to apply to the current sketch bin.
566            let upper_bound = self.config.bin_lower_bound(keys[end_idx]);
567            let fkn = ((upper_bound - lower_bound) / distance) * f64::from(count);
568            if fkn > 1.0 {
569                remainder += fkn - fkn.trunc();
570            }
571
572            // SAFETY: This integer cast is intentional: we want to get the non-fractional part, as
573            // we've captured the fractional part in the above conditional.
574            #[allow(clippy::cast_possible_truncation)]
575            let mut kn = fkn as u32;
576            if remainder > 1.0 {
577                kn += 1;
578                remainder -= 1.0;
579            }
580
581            if kn > 0 {
582                if kn > remaining_count {
583                    kn = remaining_count;
584                }
585
586                self.adjust_basic_stats(lower_bound, kn);
587                key_counts.push((keys[start_idx], kn));
588
589                remaining_count -= kn;
590                start_idx = end_idx;
591                lower_bound = upper_bound;
592            }
593
594            end_idx += 1;
595        }
596
597        if remaining_count > 0 {
598            let last_key = keys[start_idx];
599            lower_bound = self.config.bin_lower_bound(last_key);
600            self.adjust_basic_stats(lower_bound, remaining_count);
601            key_counts.push((last_key, remaining_count));
602        }
603
604        self.insert_key_counts(key_counts);
605    }
606
607    /// ## Errors
608    ///
609    /// Returns an error if a bucket size is greater that `u32::MAX`.
610    pub fn insert_interpolate_buckets(
611        &mut self,
612        mut buckets: Vec<Bucket>,
613    ) -> Result<(), &'static str> {
614        // Buckets need to be sorted from lowest to highest so that we can properly calculate the
615        // rolling lower/upper bounds.
616        buckets.sort_by(|a, b| {
617            let oa = OrderedFloat(a.upper_limit);
618            let ob = OrderedFloat(b.upper_limit);
619
620            oa.cmp(&ob)
621        });
622
623        let mut lower = f64::NEG_INFINITY;
624
625        if buckets
626            .iter()
627            .any(|bucket| bucket.count > u64::from(u32::MAX))
628        {
629            return Err("bucket size greater than u32::MAX");
630        }
631
632        for bucket in buckets {
633            let mut upper = bucket.upper_limit;
634            if upper.is_sign_positive() && upper.is_infinite() {
635                upper = lower;
636            } else if lower.is_sign_negative() && lower.is_infinite() {
637                lower = upper;
638            }
639
640            // Each bucket should only have the values that fit within that bucket, which is
641            // generally enforced at the source level by converting from cumulative buckets, or
642            // enforced by the internal structures that hold bucketed data i.e. Vector's internal
643            // `Histogram` data structure used for collecting histograms from `metrics`.
644            let count = u32::try_from(bucket.count)
645                .unwrap_or_else(|_| unreachable!("count range has already been checked."));
646
647            self.insert_interpolate_bucket(lower, upper, count);
648            lower = bucket.upper_limit;
649        }
650
651        Ok(())
652    }
653
654    /// Adds a bin directly into the sketch.
655    ///
656    /// Used only for unit testing so that we can create a sketch with an exact layout, which allows
657    /// testing around the resulting bins when feeding in specific values, as well as generating
658    /// explicitly bad layouts for testing.
659    #[allow(dead_code)]
660    pub(crate) fn insert_raw_bin(&mut self, k: i16, n: u16) {
661        let v = self.config.bin_lower_bound(k);
662        self.adjust_basic_stats(v, u32::from(n));
663        self.bins.push(Bin { k, n });
664    }
665
666    pub fn quantile(&self, q: f64) -> Option<f64> {
667        if self.count == 0 {
668            return None;
669        }
670
671        if q <= 0.0 {
672            return Some(self.min);
673        }
674
675        if q >= 1.0 {
676            return Some(self.max);
677        }
678
679        let mut n = 0.0;
680        let mut estimated = None;
681        let wanted_rank = rank(self.count, q);
682
683        for (i, bin) in self.bins.iter().enumerate() {
684            n += f64::from(bin.n);
685            if n <= wanted_rank {
686                continue;
687            }
688
689            let weight = (n - wanted_rank) / f64::from(bin.n);
690            let mut v_low = self.config.bin_lower_bound(bin.k);
691            let mut v_high = v_low * self.config.gamma_v;
692
693            if i == self.bins.len() {
694                v_high = self.max;
695            } else if i == 0 {
696                v_low = self.min;
697            }
698
699            estimated = Some(v_low * weight + v_high * (1.0 - weight));
700            break;
701        }
702
703        estimated
704            .map(|v| v.clamp(self.min, self.max))
705            .or(Some(f64::NAN))
706    }
707
708    /// Merges another sketch into this sketch, without a loss of accuracy.
709    ///
710    /// All samples present in the other sketch will be correctly represented in this sketch, and
711    /// summary statistics such as the sum, average, count, min, and max, will represent the sum of
712    /// samples from both sketches.
713    ///
714    /// ## Errors
715    ///
716    /// If there is an error while merging the two sketches together, an error variant will be
717    /// returned that describes the issue.
718    pub fn merge(&mut self, other: &AgentDDSketch) -> Result<(), MergeError> {
719        if self.config != other.config {
720            return Err(MergeError::MismatchedConfigs);
721        }
722
723        // Merge the basic statistics together.
724        self.count += other.count;
725        if other.max > self.max {
726            self.max = other.max;
727        }
728        if other.min < self.min {
729            self.min = other.min;
730        }
731        self.sum += other.sum;
732        self.avg =
733            self.avg + (other.avg - self.avg) * f64::from(other.count) / f64::from(self.count);
734
735        // Now merge the bins.
736        let mut temp = Vec::new();
737
738        let mut bins_idx = 0;
739        for other_bin in &other.bins {
740            let start = bins_idx;
741            while bins_idx < self.bins.len() && self.bins[bins_idx].k < other_bin.k {
742                bins_idx += 1;
743            }
744
745            temp.extend_from_slice(&self.bins[start..bins_idx]);
746
747            if bins_idx >= self.bins.len() || self.bins[bins_idx].k > other_bin.k {
748                temp.push(*other_bin);
749            } else if self.bins[bins_idx].k == other_bin.k {
750                generate_bins(
751                    &mut temp,
752                    other_bin.k,
753                    u32::from(other_bin.n) + u32::from(self.bins[bins_idx].n),
754                );
755                bins_idx += 1;
756            }
757        }
758
759        temp.extend_from_slice(&self.bins[bins_idx..]);
760        trim_left(&mut temp, self.config.bin_limit);
761
762        self.bins = temp;
763
764        Ok(())
765    }
766
767    /// Converts a `Metric` to a sketch representation, if possible, using `AgentDDSketch`.
768    ///
769    /// For certain types of metric values, such as distributions or aggregated histograms, we can
770    /// easily convert them to a sketch-based representation.  Rather than push the logic of how to
771    /// do that up to callers that wish to use a sketch-based representation, we bundle it here as a
772    /// free function on `AgentDDSketch` itself.
773    ///
774    /// If the metric value cannot be represented as a sketch -- essentially, everything that isn't
775    /// a distribution or aggregated histogram -- then the metric is passed back unmodified.  All
776    /// existing metadata -- series name, tags, timestamp, etc -- is left unmodified, even if the
777    /// metric is converted to a sketch internally.
778    ///
779    /// ## Errors
780    ///
781    /// Returns an error if a bucket size is greater that `u32::MAX`.
782    pub fn transform_to_sketch(mut metric: Metric) -> Result<Metric, &'static str> {
783        let sketch = match metric.data_mut().value_mut() {
784            MetricValue::Distribution { samples, .. } => {
785                let mut sketch = AgentDDSketch::with_agent_defaults();
786                for sample in samples {
787                    sketch.insert_n(sample.value, sample.rate);
788                }
789                Some(sketch)
790            }
791            MetricValue::AggregatedHistogram { buckets, .. } => {
792                let delta_buckets = mem::take(buckets);
793                let mut sketch = AgentDDSketch::with_agent_defaults();
794                sketch.insert_interpolate_buckets(delta_buckets)?;
795                Some(sketch)
796            }
797            // We can't convert from any other metric value.
798            _ => None,
799        };
800
801        match sketch {
802            // Metric was not able to be converted to a sketch, so pass it back.
803            None => Ok(metric),
804            // Metric was able to be converted to a sketch, so adjust the value.
805            Some(sketch) => Ok(metric.with_value(sketch.into())),
806        }
807    }
808}
809
810impl PartialEq for AgentDDSketch {
811    fn eq(&self, other: &Self) -> bool {
812        // We skip checking the configuration because we don't allow creating configurations by
813        // hand, and it's always locked to the constants used by the Datadog Agent.  We only check
814        // the configuration equality manually in `AgentDDSketch::merge`, to protect ourselves in
815        // the future if different configurations become allowed.
816        //
817        // Additionally, we also use floating-point-specific relative comparisons for sum/avg
818        // because they can be minimally different between sketches purely due to floating-point
819        // behavior, despite being fed the same exact data in terms of recorded samples.
820        self.count == other.count
821            && float_eq(self.min, other.min)
822            && float_eq(self.max, other.max)
823            && float_eq(self.sum, other.sum)
824            && float_eq(self.avg, other.avg)
825            && self.bins == other.bins
826    }
827}
828
829impl Eq for AgentDDSketch {}
830
831impl ByteSizeOf for AgentDDSketch {
832    fn allocated_bytes(&self) -> usize {
833        self.bins.len() * mem::size_of::<Bin>()
834    }
835}
836
837/// A split representation of sketch bins.
838///
839/// While internally we depend on a vector of bins, the serialized form used in Protocol Buffers
840/// splits the bins -- which contain their own key and bin count -- into two separate vectors, one
841/// for the keys and one for the bin counts.
842///
843/// This type provides the glue to go from our internal representation to the Protocol
844/// Buffers-specific representation.
845#[configurable_component]
846#[derive(Clone, Debug)]
847pub struct BinMap {
848    /// The bin keys.
849    #[serde(rename = "k")]
850    pub keys: Vec<i16>,
851
852    /// The bin counts.
853    #[serde(rename = "n")]
854    pub counts: Vec<u16>,
855}
856
857impl BinMap {
858    pub fn from_bins<B>(bins: B) -> BinMap
859    where
860        B: AsRef<[Bin]>,
861    {
862        let (keys, counts) =
863            bins.as_ref()
864                .iter()
865                .fold((Vec::new(), Vec::new()), |(mut keys, mut counts), bin| {
866                    keys.push(bin.k);
867                    counts.push(bin.n);
868
869                    (keys, counts)
870                });
871
872        BinMap { keys, counts }
873    }
874
875    pub fn into_parts(self) -> (Vec<i16>, Vec<u16>) {
876        (self.keys, self.counts)
877    }
878
879    pub fn into_bins(self) -> Option<Vec<Bin>> {
880        if self.keys.len() == self.counts.len() {
881            Some(
882                self.keys
883                    .into_iter()
884                    .zip(self.counts)
885                    .map(|(k, n)| Bin { k, n })
886                    .collect(),
887            )
888        } else {
889            None
890        }
891    }
892}
893
894impl SerializeAs<Vec<Bin>> for BinMap {
895    fn serialize_as<S>(source: &Vec<Bin>, serializer: S) -> Result<S::Ok, S::Error>
896    where
897        S: Serializer,
898    {
899        let bin_map = BinMap::from_bins(source);
900        bin_map.serialize(serializer)
901    }
902}
903
904impl<'de> DeserializeAs<'de, Vec<Bin>> for BinMap {
905    fn deserialize_as<D>(deserializer: D) -> Result<Vec<Bin>, D::Error>
906    where
907        D: Deserializer<'de>,
908    {
909        let bin_map: BinMap = Deserialize::deserialize(deserializer)?;
910        bin_map
911            .into_bins()
912            .ok_or("keys and counts must match in length")
913            .map_err(D::Error::custom)
914    }
915}
916
917fn rank(count: u32, q: f64) -> f64 {
918    round_to_even(q * f64::from(count - 1))
919}
920
921#[allow(clippy::cast_possible_truncation)]
922fn buf_count_leading_equal(keys: &[i16], start_idx: usize) -> u32 {
923    if start_idx == keys.len() - 1 {
924        return 1;
925    }
926
927    let mut idx = start_idx;
928    while idx < keys.len() && keys[idx] == keys[start_idx] {
929        idx += 1;
930    }
931
932    // SAFETY: We limit the size of the vector (used to provide the slice given to us here) to be no
933    // larger than 2^32, so we can't exceed u32 here.
934    (idx - start_idx) as u32
935}
936
937fn trim_left(bins: &mut Vec<Bin>, bin_limit: u16) {
938    // We won't ever support Vector running on anything other than a 32-bit platform and above, I
939    // imagine, so this should always be safe.
940    let bin_limit = bin_limit as usize;
941    if bin_limit == 0 || bins.len() < bin_limit {
942        return;
943    }
944
945    let num_to_remove = bins.len() - bin_limit;
946    let mut missing = 0;
947    let mut overflow = Vec::new();
948
949    for bin in bins.iter().take(num_to_remove) {
950        missing += u32::from(bin.n);
951
952        if missing > u32::from(MAX_BIN_WIDTH) {
953            overflow.push(Bin {
954                k: bin.k,
955                n: MAX_BIN_WIDTH,
956            });
957
958            missing -= u32::from(MAX_BIN_WIDTH);
959        }
960    }
961
962    let bin_remove = &mut bins[num_to_remove];
963    missing = bin_remove.increment(missing);
964    if missing > 0 {
965        generate_bins(&mut overflow, bin_remove.k, missing);
966    }
967
968    let overflow_len = overflow.len();
969    let (_, bins_end) = bins.split_at(num_to_remove);
970    overflow.extend_from_slice(bins_end);
971
972    // I still don't yet understand how this works, since you'd think bin limit should be the
973    // overall limit of the number of bins, but we're allowing more than that.. :thinkies:
974    overflow.truncate(bin_limit + overflow_len);
975
976    mem::swap(bins, &mut overflow);
977}
978
979#[allow(clippy::cast_possible_truncation)]
980fn generate_bins(bins: &mut Vec<Bin>, k: i16, n: u32) {
981    if n < u32::from(MAX_BIN_WIDTH) {
982        // SAFETY: Cannot truncate `n`, as it's less than a u16 value.
983        bins.push(Bin { k, n: n as u16 });
984    } else {
985        let overflow = n % u32::from(MAX_BIN_WIDTH);
986        if overflow != 0 {
987            bins.push(Bin {
988                k,
989                // SAFETY: Cannot truncate `overflow`, as it's modulo'd by a u16 value.
990                n: overflow as u16,
991            });
992        }
993
994        for _ in 0..(n / u32::from(MAX_BIN_WIDTH)) {
995            bins.push(Bin {
996                k,
997                n: MAX_BIN_WIDTH,
998            });
999        }
1000    }
1001}
1002
1003#[allow(clippy::cast_possible_truncation)]
1004#[inline]
1005fn capped_u64_shift(shift: u64) -> u32 {
1006    if shift >= 64 {
1007        u32::MAX
1008    } else {
1009        // SAFETY: There's no way that we end up truncating `shift`, since we cap it to 64 above.
1010        shift as u32
1011    }
1012}
1013
1014fn round_to_even(v: f64) -> f64 {
1015    // Taken from Go: src/math/floor.go
1016    //
1017    // Copyright (c) 2009 The Go Authors. All rights reserved.
1018    //
1019    // Redistribution and use in source and binary forms, with or without
1020    // modification, are permitted provided that the following conditions are
1021    // met:
1022    //
1023    //    * Redistributions of source code must retain the above copyright
1024    // notice, this list of conditions and the following disclaimer.
1025    //    * Redistributions in binary form must reproduce the above
1026    // copyright notice, this list of conditions and the following disclaimer
1027    // in the documentation and/or other materials provided with the
1028    // distribution.
1029    //    * Neither the name of Google Inc. nor the names of its
1030    // contributors may be used to endorse or promote products derived from
1031    // this software without specific prior written permission.
1032    //
1033    // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
1034    // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
1035    // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
1036    // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
1037    // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
1038    // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
1039    // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
1040    // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
1041    // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
1042    // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
1043    // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
1044
1045    // HEAR YE: There's a non-zero chance that we could rewrite this function in a way that is far
1046    // more Rust like, rather than porting over the particulars of how Go works, but we're
1047    // aiming for compatibility with a Go implementation, and so we've ported this over as
1048    // faithfully as possible.  With that said, here are the specifics we're dealing with:
1049    // - in Go, subtraction of unsigned numbers implicitly wraps around i.e. 1u64 - u64::MAX == 2
1050    // - in Go, right shifts are capped at the bitwidth of the left operand, which means that if
1051    //   you shift a 64-bit unsigned integers by 65 or above (some_u64 >> 65), instead of being told that
1052    //   you're doing something wrong, Go just caps the shift amount, and you end up with 0,
1053    //   whether you shift by 64 or by 9 million
1054    // - in Rust, it's not possible to directly do `some_u64 >> 64` even, as this would be
1055    //   considered an overflow
1056    // - in Rust, there are methods on unsigned integer primitives that allow for safely
1057    //   shifting by amounts greater than the bitwidth of the primitive type, although they mask off
1058    //   the bits in the shift amount that are higher than the bitwidth i.e. shift values above 64,
1059    //   for shifting a u64, are masked such that the resulting shift amount is 0, effectively
1060    //   not shifting the operand at all
1061    //
1062    // With all of this in mind, the code below compensates for that by doing wrapped
1063    // subtraction and shifting, which is straightforward, but also by utilizing
1064    // `capped_u64_shift` to approximate the behavior Go has when shifting by amounts larger
1065    // than the bitwidth of the left operand.
1066    //
1067    // I'm really proud of myself for reverse engineering this, but I sincerely hope we can
1068    // flush it down the toilet in the near future for something vastly simpler.
1069    const MASK: u64 = 0x7ff;
1070    const BIAS: u64 = 1023;
1071    const SHIFT: u64 = 64 - 11 - 1;
1072    const SIGN_MASK: u64 = 1 << 63;
1073    const FRAC_MASK: u64 = (1 << SHIFT) - 1;
1074    #[allow(clippy::unreadable_literal)]
1075    const UV_ONE: u64 = 0x3FF0000000000000;
1076    const HALF_MINUS_ULP: u64 = (1 << (SHIFT - 1)) - 1;
1077
1078    let mut bits = v.to_bits();
1079    let mut e = (bits >> SHIFT) & MASK;
1080    if e >= BIAS {
1081        e = e.wrapping_sub(BIAS);
1082        let shift_amount = SHIFT.wrapping_sub(e);
1083        let shifted = bits.wrapping_shr(capped_u64_shift(shift_amount));
1084        let plus_ulp = HALF_MINUS_ULP + (shifted & 1);
1085        bits += plus_ulp.wrapping_shr(capped_u64_shift(e));
1086        bits &= !(FRAC_MASK.wrapping_shr(capped_u64_shift(e)));
1087    } else if e == BIAS - 1 && bits & FRAC_MASK != 0 {
1088        // Round 0.5 < abs(x) < 1.
1089        bits = bits & SIGN_MASK | UV_ONE; // +-1
1090    } else {
1091        // Round abs(x) <= 0.5 including denormals.
1092        bits &= SIGN_MASK; // +-0
1093    }
1094    f64::from_bits(bits)
1095}
1096
1097#[cfg(test)]
1098mod tests {
1099    use super::{round_to_even, AgentDDSketch, Config, AGENT_DEFAULT_EPS, MAX_KEY};
1100    use crate::event::metric::Bucket;
1101
1102    const FLOATING_POINT_ACCEPTABLE_ERROR: f64 = 1.0e-10;
1103
1104    #[cfg(ddsketch_extended)]
1105    fn generate_pareto_distribution() -> Vec<OrderedFloat<f64>> {
1106        use ordered_float::OrderedFloat;
1107        use rand::rng;
1108        use rand_distr::{Distribution, Pareto};
1109
1110        // Generate a set of samples that roughly correspond to the latency of a typical web
1111        // service, in microseconds, with a gamma distribution: big hump at the beginning with a
1112        // long tail.  We limit this so the samples represent latencies that bottom out at 15
1113        // milliseconds and tail off all the way up to 10 seconds.
1114        //let distribution = Gamma::new(1.2, 100.0).unwrap();
1115        let distribution = Pareto::new(1.0, 1.0).expect("pareto distribution should be valid");
1116        let mut samples = distribution
1117            .sample_iter(rng())
1118            // Scale by 10,000 to get microseconds.
1119            .map(|n| n * 10_000.0)
1120            .filter(|n| *n > 15_000.0 && *n < 10_000_000.0)
1121            .map(OrderedFloat)
1122            .take(1000)
1123            .collect::<Vec<_>>();
1124
1125        // Sort smallest to largest.
1126        samples.sort();
1127
1128        samples
1129    }
1130
1131    #[test]
1132    fn test_ddsketch_config_key_lower_bound_identity() {
1133        let config = Config::default();
1134        for k in (-MAX_KEY + 1)..MAX_KEY {
1135            assert_eq!(k, config.key(config.bin_lower_bound(k)));
1136        }
1137    }
1138
1139    #[test]
1140    fn test_ddsketch_basic() {
1141        let mut sketch = AgentDDSketch::with_agent_defaults();
1142        assert!(sketch.is_empty());
1143        assert_eq!(sketch.count(), 0);
1144        assert_eq!(sketch.min(), None);
1145        assert_eq!(sketch.max(), None);
1146        assert_eq!(sketch.sum(), None);
1147        assert_eq!(sketch.avg(), None);
1148
1149        sketch.insert(3.15);
1150        assert!(!sketch.is_empty());
1151        assert_eq!(sketch.count(), 1);
1152        assert_eq!(sketch.min(), Some(3.15));
1153        assert_eq!(sketch.max(), Some(3.15));
1154        assert_eq!(sketch.sum(), Some(3.15));
1155        assert_eq!(sketch.avg(), Some(3.15));
1156
1157        sketch.insert(2.28);
1158        assert!(!sketch.is_empty());
1159        assert_eq!(sketch.count(), 2);
1160        assert_eq!(sketch.min(), Some(2.28));
1161        assert_eq!(sketch.max(), Some(3.15));
1162        assert_eq!(sketch.sum(), Some(5.43));
1163        assert_eq!(sketch.avg(), Some(2.715));
1164    }
1165
1166    #[test]
1167    fn test_ddsketch_clear() {
1168        let sketch1 = AgentDDSketch::with_agent_defaults();
1169        let mut sketch2 = AgentDDSketch::with_agent_defaults();
1170
1171        assert_eq!(sketch1, sketch2);
1172        assert!(sketch1.is_empty());
1173        assert!(sketch2.is_empty());
1174
1175        sketch2.insert(3.15);
1176        assert_ne!(sketch1, sketch2);
1177        assert!(!sketch2.is_empty());
1178
1179        sketch2.clear();
1180        assert_eq!(sketch1, sketch2);
1181        assert!(sketch2.is_empty());
1182    }
1183
1184    #[test]
1185    fn test_ddsketch_neg_to_pos() {
1186        // This gives us 10k values because otherwise this test runs really slow in debug mode.
1187        let start = -1.0;
1188        let end = 1.0;
1189        let delta = 0.0002;
1190
1191        let mut sketch = AgentDDSketch::with_agent_defaults();
1192
1193        let mut v = start;
1194        while v <= end {
1195            sketch.insert(v);
1196
1197            v += delta;
1198        }
1199
1200        let min = sketch.quantile(0.0).expect("should have value");
1201        let median = sketch.quantile(0.5).expect("should have value");
1202        let max = sketch.quantile(1.0).expect("should have value");
1203
1204        assert_eq!(start, min);
1205        assert!(median.abs() < FLOATING_POINT_ACCEPTABLE_ERROR);
1206        assert!((end - max).abs() < FLOATING_POINT_ACCEPTABLE_ERROR);
1207    }
1208
1209    #[test]
1210    fn test_out_of_range_buckets_error() {
1211        let mut sketch = AgentDDSketch::with_agent_defaults();
1212
1213        let buckets = vec![
1214            Bucket {
1215                upper_limit: 5.4,
1216                count: 32,
1217            },
1218            Bucket {
1219                upper_limit: 5.8,
1220                count: u64::from(u32::MAX) + 1,
1221            },
1222            Bucket {
1223                upper_limit: 9.2,
1224                count: 320,
1225            },
1226        ];
1227
1228        assert_eq!(
1229            Err("bucket size greater than u32::MAX"),
1230            sketch.insert_interpolate_buckets(buckets)
1231        );
1232
1233        // Assert the sketch remains unchanged.
1234        assert_eq!(sketch, AgentDDSketch::with_agent_defaults());
1235    }
1236
1237    #[test]
1238    fn test_merge() {
1239        let mut all_values = AgentDDSketch::with_agent_defaults();
1240        let mut odd_values = AgentDDSketch::with_agent_defaults();
1241        let mut even_values = AgentDDSketch::with_agent_defaults();
1242        let mut all_values_many = AgentDDSketch::with_agent_defaults();
1243
1244        let mut values = Vec::new();
1245        for i in -50..=50 {
1246            let v = f64::from(i);
1247
1248            all_values.insert(v);
1249
1250            if i & 1 == 0 {
1251                odd_values.insert(v);
1252            } else {
1253                even_values.insert(v);
1254            }
1255
1256            values.push(v);
1257        }
1258
1259        all_values_many.insert_many(&values);
1260
1261        assert!(odd_values.merge(&even_values).is_ok());
1262        let merged_values = odd_values;
1263
1264        // Number of bins should be equal to the number of values we inserted.
1265        assert_eq!(all_values.bin_count(), values.len());
1266
1267        // Values at both ends of the quantile range should be equal.
1268        let low_end = all_values
1269            .quantile(0.01)
1270            .expect("should have estimated value");
1271        let high_end = all_values
1272            .quantile(0.99)
1273            .expect("should have estimated value");
1274        assert_eq!(high_end, -low_end);
1275
1276        let target_bin_count = all_values.bin_count();
1277        for sketch in &[all_values, all_values_many, merged_values] {
1278            assert_eq!(sketch.quantile(0.5), Some(0.0));
1279            assert_eq!(sketch.quantile(0.0), Some(-50.0));
1280            assert_eq!(sketch.quantile(1.0), Some(50.0));
1281
1282            for p in 0..50 {
1283                let q = f64::from(p) / 100.0;
1284                let positive = sketch
1285                    .quantile(q + 0.5)
1286                    .expect("should have estimated value");
1287                let negative = -sketch
1288                    .quantile(0.5 - q)
1289                    .expect("should have estimated value");
1290
1291                assert!(
1292                    (positive - negative).abs() <= 1.0e-6,
1293                    "positive vs negative difference too great ({positive} vs {negative})",
1294                );
1295            }
1296
1297            assert_eq!(target_bin_count, sketch.bin_count());
1298        }
1299    }
1300
1301    #[test]
1302    fn test_merge_different_configs() {
1303        let mut first = AgentDDSketch::with_agent_defaults();
1304        let mut second = AgentDDSketch::with_agent_defaults();
1305
1306        // Subtly tweak the config of the second sketch to ensure that merging fails.
1307        second.config.norm_bias += 1;
1308
1309        assert!(first.merge(&second).is_err());
1310    }
1311
1312    #[test]
1313    #[cfg(ddsketch_extended)]
1314    fn test_ddsketch_pareto_distribution() {
1315        use ndarray::{Array, Axis};
1316        use ndarray_stats::{interpolate::Midpoint, QuantileExt};
1317        use noisy_float::prelude::N64;
1318
1319        // NOTE: This test unexpectedly fails to meet the relative accuracy guarantees when checking
1320        // the samples against quantiles pulled via `ndarray_stats`.  When feeding the same samples
1321        // to the actual DDSketch implementation in datadog-agent, we get identical results at each
1322        // quantile. This doesn't make a huge amount of sense to me, since we have a unit test that
1323        // verifies the relative accuracy of the configuration itself, which should only fail to be
1324        // met if we hit the bin limit and bins have to be collapsed.
1325        //
1326        // We're keeping it here as a reminder of the seemingly practical difference in accuracy
1327        // vs deriving the quantiles of the sample sets directly.
1328
1329        // We generate a straightforward Pareto distribution to simulate web request latencies.
1330        let samples = generate_pareto_distribution();
1331
1332        // Prepare our data for querying.
1333        let mut sketch = AgentDDSketch::with_agent_defaults();
1334
1335        let relative_accuracy = AGENT_DEFAULT_EPS;
1336        for sample in &samples {
1337            sketch.insert(sample.into_inner());
1338        }
1339
1340        let mut array = Array::from_iter(samples);
1341
1342        // Now check the estimated quantile via `AgentDDSketch` vs the true quantile via `ndarray`.
1343        //
1344        // TODO: what's a reasonable quantile to start from? from testing the actual agent code, it
1345        // seems like <p50 is gonna be rough no matter what, which I think is expected but also not great?
1346        for p in 1..=100 {
1347            let q = p as f64 / 100.0;
1348            let x = sketch.quantile(q);
1349            assert!(x.is_some());
1350
1351            let estimated = x.unwrap();
1352            let actual = array
1353                .quantile_axis_mut(Axis(0), N64::unchecked_new(q), &Midpoint)
1354                .expect("quantile should be in range")
1355                .get(())
1356                .expect("quantile value should be present")
1357                .clone()
1358                .into_inner();
1359
1360            let _err = (estimated - actual).abs() / actual;
1361            assert!(err <= relative_accuracy,
1362				"relative accuracy out of bounds: q={}, estimate={}, actual={}, target-rel-acc={}, actual-rel-acc={}, bin-count={}",
1363				q, estimated, actual, relative_accuracy, err, sketch.bin_count());
1364        }
1365    }
1366
1367    #[test]
1368    fn test_relative_accuracy_fast() {
1369        // These values are based on the agent's unit tests for asserting relative accuracy of the
1370        // DDSketch implementation.  Notably, it does not seem to test the full extent of values
1371        // that the open-source implementations do, but then again... all we care about is parity
1372        // with the agent version so we can pass them through.
1373        //
1374        // Another noteworthy thing: it seems that they don't test from the actual targeted minimum
1375        // value, which is 1.0e-9, which would give nanosecond granularity vs just microsecond
1376        // granularity.
1377        let config = Config::default();
1378        let min_value = 1.0;
1379        // We don't care about precision loss, just consistency.
1380        #[allow(clippy::cast_possible_truncation)]
1381        let max_value = config.gamma_v.powf(5.0) as f32;
1382
1383        test_relative_accuracy(config, AGENT_DEFAULT_EPS, min_value, max_value);
1384    }
1385
1386    #[test]
1387    #[cfg(ddsketch_extended)]
1388    fn test_relative_accuracy_slow() {
1389        // These values are based on the agent's unit tests for asserting relative accuracy of the
1390        // DDSketch implementation.  Notably, it does not seem to test the full extent of values
1391        // that the open-source implementations do, but then again... all we care about is parity
1392        // with the agent version so we can pass them through.
1393        //
1394        // Another noteworthy thing: it seems that they don't test from the actual targeted minimum
1395        // value, which is 1.0e-9, which would give nanosecond granularity vs just microsecond
1396        // granularity.
1397        //
1398        // This test uses a far larger range of values, and takes 60-70 seconds, hence why we've
1399        // guarded it here behind a cfg flag.
1400        let config = Config::default();
1401        let min_value = 1.0e-6;
1402        let max_value = i64::MAX as f32;
1403
1404        test_relative_accuracy(config, AGENT_DEFAULT_EPS, min_value, max_value)
1405    }
1406
1407    fn parse_sketch_from_string_bins(layout: &str) -> AgentDDSketch {
1408        layout
1409            .split(' ')
1410            .map(|pair| pair.split(':').map(ToOwned::to_owned).collect::<Vec<_>>())
1411            .fold(
1412                AgentDDSketch::with_agent_defaults(),
1413                |mut sketch, mut kn| {
1414                    let k = kn.remove(0).parse::<i16>().unwrap();
1415                    let n = kn.remove(0).parse::<u16>().unwrap();
1416
1417                    sketch.insert_raw_bin(k, n);
1418                    sketch
1419                },
1420            )
1421    }
1422
1423    fn compare_sketches(actual: &AgentDDSketch, expected: &AgentDDSketch, allowed_err: f64) {
1424        let actual_sum = actual.sum().unwrap();
1425        let expected_sum = expected.sum().unwrap();
1426        let actual_avg = actual.avg().unwrap();
1427        let expected_avg = expected.avg().unwrap();
1428        let sum_delta = (actual_sum - expected_sum).abs();
1429        let avg_delta = (actual_avg - expected_avg).abs();
1430        assert!(sum_delta <= allowed_err);
1431        assert!(avg_delta <= allowed_err);
1432        assert_eq!(actual.min(), expected.min());
1433        assert_eq!(actual.max(), expected.max());
1434        assert_eq!(actual.count(), expected.count());
1435        assert_eq!(actual.bins(), expected.bins());
1436    }
1437
1438    #[test]
1439    fn test_histogram_interpolation_agent_similarity() {
1440        #[derive(Clone)]
1441        struct Case {
1442            lower: f64,
1443            upper: f64,
1444            count: u32,
1445            allowed_err: f64,
1446            expected: &'static str,
1447        }
1448
1449        let check_result = |actual: &AgentDDSketch, case: &Case| {
1450            let expected = parse_sketch_from_string_bins(case.expected);
1451
1452            assert_eq!(expected.count(), case.count);
1453            assert_eq!(actual.count(), case.count);
1454            assert_eq!(actual.bins(), expected.bins());
1455            compare_sketches(actual, &expected, case.allowed_err);
1456
1457            let actual_count: u32 = actual.bins.iter().map(|b| u32::from(b.n)).sum();
1458            assert_eq!(actual_count, case.count);
1459        };
1460
1461        let cases = &[
1462            Case { lower: 0.0, upper: 10.0, count: 2, allowed_err: 0.0, expected: "0:1 1442:1" },
1463            Case { lower: 10.0, upper: 20.0, count: 4,  allowed_err: 0.0, expected: "1487:1 1502:1 1514:1 1524:1" },
1464		    Case { lower: -10.0, upper: 10.0, count: 4, allowed_err: 0.0, expected: "-1487:1 -1442:1 -1067:1 1442:1"},
1465            Case { lower: 0.0, upper: 10.0, count: 100, allowed_err: 0.0, expected: "0:1 1190:1 1235:1 1261:1 1280:1 1295:1 1307:1 1317:1 1326:1 1334:1 1341:1 1347:1 1353:1 1358:1 1363:1 1368:1 1372:2 1376:1 1380:1 1384:1 1388:1 1391:1 1394:1 1397:2 1400:1 1403:1 1406:2 1409:1 1412:1 1415:2 1417:1 1419:1 1421:1 1423:1 1425:1 1427:1 1429:2 1431:1 1433:1 1435:2 1437:1 1439:2 1441:1 1443:2 1445:2 1447:1 1449:2 1451:2 1453:2 1455:2 1457:2 1459:1 1460:1 1461:1 1462:1 1463:1 1464:1 1465:1 1466:1 1467:2 1468:1 1469:1 1470:1 1471:1 1472:2 1473:1 1474:1 1475:1 1476:2 1477:1 1478:2 1479:1 1480:1 1481:2 1482:1 1483:2 1484:1 1485:2 1486:1" },
1466		    Case { lower: 1_000.0, upper: 100_000.0, count: 1_000_000 - 1, allowed_err: 0.0, expected: "1784:158 1785:162 1786:164 1787:166 1788:170 1789:171 1790:175 1791:177 1792:180 1793:183 1794:185 1795:189 1796:191 1797:195 1798:197 1799:201 1800:203 1801:207 1802:210 1803:214 1804:217 1805:220 1806:223 1807:227 1808:231 1809:234 1810:238 1811:242 1812:245 1813:249 1814:253 1815:257 1816:261 1817:265 1818:270 1819:273 1820:278 1821:282 1822:287 1823:291 1824:295 1825:300 1826:305 1827:310 1828:314 1829:320 1830:324 1831:329 1832:335 1833:340 1834:345 1835:350 1836:356 1837:362 1838:367 1839:373 1840:379 1841:384 1842:391 1843:397 1844:403 1845:409 1846:416 1847:422 1848:429 1849:435 1850:442 1851:449 1852:457 1853:463 1854:470 1855:478 1856:486 1857:493 1858:500 1859:509 1860:516 1861:525 1862:532 1863:541 1864:550 1865:558 1866:567 1867:575 1868:585 1869:594 1870:603 1871:612 1872:622 1873:632 1874:642 1875:651 1876:662 1877:672 1878:683 1879:693 1880:704 1881:716 1882:726 1883:738 1884:749 1885:761 1886:773 1887:785 1888:797 1889:809 1890:823 1891:835 1892:848 1893:861 1894:875 1895:889 1896:902 1897:917 1898:931 1899:945 1900:960 1901:975 1902:991 1903:1006 1904:1021 1905:1038 1906:1053 1907:1071 1908:1087 1909:1104 1910:1121 1911:1138 1912:1157 1913:1175 1914:1192 1915:1212 1916:1231 1917:1249 1918:1269 1919:1290 1920:1309 1921:1329 1922:1351 1923:1371 1924:1393 1925:1415 1926:1437 1927:1459 1928:1482 1929:1506 1930:1529 1931:1552 1932:1577 1933:1602 1934:1626 1935:1652 1936:1678 1937:1704 1938:1731 1939:1758 1940:1785 1941:1813 1942:1841 1943:1870 1944:1900 1945:1929 1946:1959 1947:1990 1948:2021 1949:2052 1950:2085 1951:2117 1952:2150 1953:2184 1954:2218 1955:2253 1956:2287 1957:2324 1958:2360 1959:2396 1960:2435 1961:2472 1962:2511 1963:2550 1964:2589 1965:2631 1966:2671 1967:2714 1968:2755 1969:2799 1970:2842 1971:2887 1972:2932 1973:2978 1974:3024 1975:3071 1976:3120 1977:3168 1978:3218 1979:3268 1980:3319 1981:3371 1982:3423 1983:3477 1984:3532 1985:3586 1986:3643 1987:3700 1988:3757 1989:3816 1990:3876 1991:3936 1992:3998 1993:4060 1994:4124 1995:4188 1996:4253 1997:4320 1998:4388 1999:4456 2000:4526 2001:4596 2002:4668 2003:4741 2004:4816 2005:4890 2006:4967 2007:5044 2008:5124 2009:5203 2010:5285 2011:5367 2012:5451 2013:5536 2014:5623 2015:5711 2016:5800 2017:5890 2018:5983 2019:6076 2020:6171 2021:6267 2022:6365 2023:6465 2024:6566 2025:6668 2026:6773 2027:6878 2028:6986 2029:7095 2030:7206 2031:7318 2032:7433 2033:7549 2034:7667 2035:7786 2036:7909 2037:8032 2038:8157 2039:8285 2040:8414 2041:8546 2042:8679 2043:8815 2044:8953 2045:9092 2046:9235 2047:9379 2048:9525 2049:9675 2050:9825 2051:9979 2052:10135 2053:10293 2054:10454 2055:10618 2056:10783 2057:10952 2058:11123 2059:11297 2060:11473 2061:11653 2062:11834 2063:12020 2064:12207 2065:12398 2066:12592 2067:12788 2068:12989 2069:13191 2070:13397 2071:13607 2072:13819 2073:14036 2074:14254 2075:14478 2076:14703 2077:14933 2078:15167 2079:15403 2080:8942" },
1467		    Case { lower: 1_000.0, upper: 10_000.0, count: 10_000_000 - 1, allowed_err: 0.00001, expected: "1784:17485 1785:17758 1786:18035 1787:18318 1788:18604 1789:18894 1790:19190 1791:19489 1792:19794 1793:20103 1794:20418 1795:20736 1796:21061 1797:21389 1798:21724 1799:22063 1800:22408 1801:22758 1802:23113 1803:23475 1804:23841 1805:24215 1806:24592 1807:24977 1808:25366 1809:25764 1810:26165 1811:26575 1812:26990 1813:27412 1814:27839 1815:28275 1816:28717 1817:29165 1818:29622 1819:30083 1820:30554 1821:31032 1822:31516 1823:32009 1824:32509 1825:33016 1826:33533 1827:34057 1828:34589 1829:35129 1830:35678 1831:36235 1832:36802 1833:37377 1834:37961 1835:38554 1836:39156 1837:39768 1838:40390 1839:41020 1840:41662 1841:42312 1842:42974 1843:43645 1844:44327 1845:45020 1846:45723 1847:46438 1848:47163 1849:47900 1850:48648 1851:49409 1852:50181 1853:50964 1854:51761 1855:52570 1856:53391 1857:54226 1858:55072 1859:55934 1860:56807 1861:57695 1862:58596 1863:59512 1864:60441 1865:61387 1866:62345 1867:63319 1868:64309 1869:65314 1870:799 1870:65535 1871:1835 1871:65535 1872:2889 1872:65535 1873:3957 1873:65535 1874:5043 1874:65535 1875:6146 1875:65535 1876:7266 1876:65535 1877:8404 1877:65535 1878:9559 1878:65535 1879:10732 1879:65535 1880:11923 1880:65535 1881:13135 1881:65535 1882:14363 1882:65535 1883:15612 1883:65535 1884:16879 1884:65535 1885:18168 1885:65535 1886:19475 1886:65535 1887:20803 1887:65535 1888:22153 1888:65535 1889:23523 1889:65535 1890:24914 1890:65535 1891:26327 1891:65535 1892:27763 1892:65535 1893:29221 1893:65535 1894:30701 1894:65535 1895:32205 1895:65535 1896:33732 1896:65535 1897:35283 1897:65535 1898:36858 1898:65535 1899:38458 1899:65535 1900:40084 1900:65535 1901:41733 1901:65535 1902:43409 1902:65535 1903:45112 1903:65535 1904:46841 1904:65535 1905:48596 1905:65535 1906:50380 1906:65535 1907:52191 1907:65535 1908:54030 1908:65535 1909:55899 1909:65535 1910:57796 1910:65535 1911:59723 1911:65535 1912:61680 1912:65535 1913:63668 1913:65535 1914:152 1914:65535 1914:65535 1915:2202 1915:65535 1915:65535 1916:4285 1916:65535 1916:65535 1917:6399 1917:65535 1917:65535 1918:8547 1918:65535 1918:65535 1919:10729 1919:65535 1919:65535 1920:12945 1920:65535 1920:65535 1921:15195 1921:65535 1921:65535 1922:17480 1922:65535 1922:65535 1923:19801 1923:65535 1923:65535 1924:22158 1924:65535 1924:65535 1925:24553 1925:65535 1925:65535 1926:26985 1926:65535 1926:65535 1927:29453 1927:65535 1927:65535 1928:31963 1928:65535 1928:65535 1929:34509 1929:65535 1929:65535 1930:37097 1930:65535 1930:65535 1931:39724 1931:65535 1931:65535 1932:17411"},
1468        ];
1469
1470        let double_insert_cases = &[
1471            Case { lower: 1_000.0, upper: 10_000.0, count: 10_000_000 - 1, allowed_err: 0.0002, expected: "1784:34970 1785:35516 1786:36070 1787:36636 1788:37208 1789:37788 1790:38380 1791:38978 1792:39588 1793:40206 1794:40836 1795:41472 1796:42122 1797:42778 1798:43448 1799:44126 1800:44816 1801:45516 1802:46226 1803:46950 1804:47682 1805:48430 1806:49184 1807:49954 1808:50732 1809:51528 1810:52330 1811:53150 1812:53980 1813:54824 1814:55678 1815:56550 1816:57434 1817:58330 1818:59244 1819:60166 1820:61108 1821:62064 1822:63032 1823:64018 1824:65018 1825:497 1825:65535 1826:1531 1826:65535 1827:2579 1827:65535 1828:3643 1828:65535 1829:4723 1829:65535 1830:5821 1830:65535 1831:6935 1831:65535 1832:8069 1832:65535 1833:9219 1833:65535 1834:10387 1834:65535 1835:11573 1835:65535 1836:12777 1836:65535 1837:14001 1837:65535 1838:15245 1838:65535 1839:16505 1839:65535 1840:17789 1840:65535 1841:19089 1841:65535 1842:20413 1842:65535 1843:21755 1843:65535 1844:23119 1844:65535 1845:24505 1845:65535 1846:25911 1846:65535 1847:27341 1847:65535 1848:28791 1848:65535 1849:30265 1849:65535 1850:31761 1850:65535 1851:33283 1851:65535 1852:34827 1852:65535 1853:36393 1853:65535 1854:37987 1854:65535 1855:39605 1855:65535 1856:41247 1856:65535 1857:42917 1857:65535 1858:44609 1858:65535 1859:46333 1859:65535 1860:48079 1860:65535 1861:49855 1861:65535 1862:51657 1862:65535 1863:53489 1863:65535 1864:55347 1864:65535 1865:57239 1865:65535 1866:59155 1866:65535 1867:61103 1867:65535 1868:63083 1868:65535 1869:65093 1869:65535 1870:1598 1870:65535 1870:65535 1871:3670 1871:65535 1871:65535 1872:5778 1872:65535 1872:65535 1873:7914 1873:65535 1873:65535 1874:10086 1874:65535 1874:65535 1875:12292 1875:65535 1875:65535 1876:14532 1876:65535 1876:65535 1877:16808 1877:65535 1877:65535 1878:19118 1878:65535 1878:65535 1879:21464 1879:65535 1879:65535 1880:23846 1880:65535 1880:65535 1881:26270 1881:65535 1881:65535 1882:28726 1882:65535 1882:65535 1883:31224 1883:65535 1883:65535 1884:33758 1884:65535 1884:65535 1885:36336 1885:65535 1885:65535 1886:38950 1886:65535 1886:65535 1887:41606 1887:65535 1887:65535 1888:44306 1888:65535 1888:65535 1889:47046 1889:65535 1889:65535 1890:49828 1890:65535 1890:65535 1891:52654 1891:65535 1891:65535 1892:55526 1892:65535 1892:65535 1893:58442 1893:65535 1893:65535 1894:61402 1894:65535 1894:65535 1895:64410 1895:65535 1895:65535 1896:1929 1896:65535 1896:65535 1896:65535 1897:5031 1897:65535 1897:65535 1897:65535 1898:8181 1898:65535 1898:65535 1898:65535 1899:11381 1899:65535 1899:65535 1899:65535 1900:14633 1900:65535 1900:65535 1900:65535 1901:17931 1901:65535 1901:65535 1901:65535 1902:21283 1902:65535 1902:65535 1902:65535 1903:24689 1903:65535 1903:65535 1903:65535 1904:28147 1904:65535 1904:65535 1904:65535 1905:31657 1905:65535 1905:65535 1905:65535 1906:35225 1906:65535 1906:65535 1906:65535 1907:38847 1907:65535 1907:65535 1907:65535 1908:42525 1908:65535 1908:65535 1908:65535 1909:46263 1909:65535 1909:65535 1909:65535 1910:50057 1910:65535 1910:65535 1910:65535 1911:53911 1911:65535 1911:65535 1911:65535 1912:57825 1912:65535 1912:65535 1912:65535 1913:61801 1913:65535 1913:65535 1913:65535 1914:304 1914:65535 1914:65535 1914:65535 1914:65535 1915:4404 1915:65535 1915:65535 1915:65535 1915:65535 1916:8570 1916:65535 1916:65535 1916:65535 1916:65535 1917:12798 1917:65535 1917:65535 1917:65535 1917:65535 1918:17094 1918:65535 1918:65535 1918:65535 1918:65535 1919:21458 1919:65535 1919:65535 1919:65535 1919:65535 1920:25890 1920:65535 1920:65535 1920:65535 1920:65535 1921:30390 1921:65535 1921:65535 1921:65535 1921:65535 1922:34960 1922:65535 1922:65535 1922:65535 1922:65535 1923:39602 1923:65535 1923:65535 1923:65535 1923:65535 1924:44316 1924:65535 1924:65535 1924:65535 1924:65535 1925:49106 1925:65535 1925:65535 1925:65535 1925:65535 1926:53970 1926:65535 1926:65535 1926:65535 1926:65535 1927:58906 1927:65535 1927:65535 1927:65535 1927:65535 1928:63926 1928:65535 1928:65535 1928:65535 1928:65535 1929:3483 1929:65535 1929:65535 1929:65535 1929:65535 1929:65535 1930:8659 1930:65535 1930:65535 1930:65535 1930:65535 1930:65535 1931:13913 1931:65535 1931:65535 1931:65535 1931:65535 1931:65535 1932:34822" },
1472        ];
1473
1474        for case in cases {
1475            let mut sketch = AgentDDSketch::with_agent_defaults();
1476            assert!(sketch.is_empty());
1477
1478            sketch.insert_interpolate_bucket(case.lower, case.upper, case.count);
1479            check_result(&sketch, case);
1480        }
1481
1482        for case in double_insert_cases {
1483            let mut sketch = AgentDDSketch::with_agent_defaults();
1484            assert!(sketch.is_empty());
1485
1486            sketch.insert_interpolate_bucket(case.lower, case.upper, case.count);
1487            sketch.insert_interpolate_bucket(case.lower, case.upper, case.count);
1488
1489            let mut case = case.clone();
1490            case.count *= 2;
1491            check_result(&sketch, &case);
1492        }
1493    }
1494
1495    fn test_relative_accuracy(config: Config, rel_acc: f64, min_value: f32, max_value: f32) {
1496        let max_observed_rel_acc = check_max_relative_accuracy(config, min_value, max_value);
1497        assert!(
1498            max_observed_rel_acc <= rel_acc + FLOATING_POINT_ACCEPTABLE_ERROR,
1499            "observed out of bound max relative acc: {max_observed_rel_acc}, target rel acc={rel_acc}",
1500        );
1501    }
1502
1503    fn compute_relative_accuracy(target: f64, actual: f64) -> f64 {
1504        assert!(
1505            !(target < 0.0 || actual < 0.0),
1506            "expected/actual values must be greater than 0.0; target={target}, actual={actual}",
1507        );
1508
1509        if target == actual {
1510            0.0
1511        } else if target == 0.0 {
1512            if actual == 0.0 {
1513                0.0
1514            } else {
1515                f64::INFINITY
1516            }
1517        } else if actual < target {
1518            (target - actual) / target
1519        } else {
1520            (actual - target) / target
1521        }
1522    }
1523
1524    fn check_max_relative_accuracy(config: Config, min_value: f32, max_value: f32) -> f64 {
1525        assert!(
1526            min_value < max_value,
1527            "min_value must be less than max_value"
1528        );
1529
1530        let mut v = min_value;
1531        let mut max_relative_acc = 0.0;
1532        while v < max_value {
1533            let target = f64::from(v);
1534            let actual = config.bin_lower_bound(config.key(target));
1535
1536            let relative_acc = compute_relative_accuracy(target, actual);
1537            if relative_acc > max_relative_acc {
1538                max_relative_acc = relative_acc;
1539            }
1540
1541            v = f32::from_bits(v.to_bits() + 1);
1542        }
1543
1544        // Final iteration to make sure we hit the highest value.
1545        let actual = config.bin_lower_bound(config.key(f64::from(max_value)));
1546        let relative_acc = compute_relative_accuracy(f64::from(max_value), actual);
1547        if relative_acc > max_relative_acc {
1548            max_relative_acc = relative_acc;
1549        }
1550
1551        max_relative_acc
1552    }
1553
1554    #[test]
1555    fn test_round_to_even() {
1556        let alike = |a: f64, b: f64| -> bool {
1557            if a.is_nan() && b.is_nan() {
1558                true
1559            } else if a == b {
1560                a.is_sign_positive() == b.is_sign_positive()
1561            } else {
1562                false
1563            }
1564        };
1565
1566        #[allow(clippy::unreadable_literal)]
1567        let test_cases = &[
1568            (f64::NAN, f64::NAN),
1569            (0.5000000000000001, 1.0), // 0.5+epsilon
1570            (0.0, 0.0),
1571            (1.390671161567e-309, 0.0), // denormal
1572            (0.49999999999999994, 0.0), // 0.5-epsilon
1573            (0.5, 0.0),
1574            (-1.5, -2.0),
1575            (-2.5, -2.0),
1576            (f64::INFINITY, f64::INFINITY),
1577            (2251799813685249.5, 2251799813685250.0), // 1 bit fraction
1578            (2251799813685250.5, 2251799813685250.0),
1579            (4503599627370495.5, 4503599627370496.0), // 1 bit fraction, rounding to 0 bit fraction
1580            (4503599627370497.0, 4503599627370497.0), // large integer
1581        ];
1582
1583        for (input, expected) in test_cases {
1584            let actual = round_to_even(*input);
1585            assert!(
1586                alike(actual, *expected),
1587                "input -> {}, expected {}, got {}",
1588                *input,
1589                *expected,
1590                actual
1591            );
1592        }
1593    }
1594}