vector/sinks/datadog/traces/apm_stats/bucket.rs

use std::collections::BTreeMap;

use prost::Message;

use super::{
    aggregation::{AggregationKey, PayloadAggregationKey},
    ddsketch_full, ClientGroupedStats, ClientStatsBucket,
};
use crate::{event::ObjectMap, event::Value, metrics::AgentDDSketch};

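/// Statistics accumulated for a single `AggregationKey`: hit/error/duration counters plus one
/// latency sketch for successful spans and one for errored spans. Counters are kept as `f64`
/// because each span contributes with a (possibly fractional) weight rather than a plain count.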
pub(crate) struct GroupedStats {
    hits: f64,
    top_level_hits: f64,
    errors: f64,
    duration: f64,
    ok_distribution: AgentDDSketch,
    err_distribution: AgentDDSketch,
}

impl GroupedStats {
    fn new() -> Self {
        GroupedStats {
            hits: 0.0,
            top_level_hits: 0.0,
            errors: 0.0,
            duration: 0.0,
            ok_distribution: AgentDDSketch::with_agent_defaults(),
            err_distribution: AgentDDSketch::with_agent_defaults(),
        }
    }

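    /// Render the aggregated values as a protobuf `ClientGroupedStats` message: weighted
    /// counters are rounded to integers and both latency distributions are serialized to the
    /// `DDSketch` wire format.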
    fn export(&self, key: &AggregationKey) -> ClientGroupedStats {
        ClientGroupedStats {
            service: key.bucket_key.service.clone(),
            name: key.bucket_key.name.clone(),
            resource: key.bucket_key.resource.clone(),
            http_status_code: key.bucket_key.status_code,
            r#type: key.bucket_key.ty.clone(),
            db_type: "".to_string(),
            hits: self.hits.round() as u64,
            errors: self.errors.round() as u64,
            duration: self.duration.round() as u64,
            ok_summary: encode_sketch(&self.ok_distribution),
            error_summary: encode_sketch(&self.err_distribution),
            synthetics: key.bucket_key.synthetics,
            top_level_hits: self.top_level_hits.round() as u64,
        }
    }
}

/// Convert the agent sketch variant to the protobuf representation defined in
/// ./proto/dd_sketch_full.proto.
fn encode_sketch(agent_sketch: &AgentDDSketch) -> Vec<u8> {
    // AgentDDSketch partitions the set of real numbers into intervals of the form
    // [gamma^n, gamma^(n+1)), each of which is mapped to a single integer bin index.
    let index_mapping = ddsketch_full::IndexMapping {
        // This is the gamma value used to build the aforementioned partition scheme.
        gamma: agent_sketch.gamma(),
        // This offset is applied to the powers of gamma to adjust sketch accuracy.
        index_offset: agent_sketch.bin_index_offset() as f64,
        // Interpolation::None is used because no interpolation is needed with the
        // aforementioned partition scheme.
        interpolation: ddsketch_full::index_mapping::Interpolation::None as i32,
    };

    // `zeroes` is the number of values that fell around zero, based on the sketch's local
    // accuracy. The positive and negative stores hold positive and negative values respectively,
    // using the exact same mechanism.
    let (positives, negatives, zeroes) = convert_stores(agent_sketch);
    let positives_store = ddsketch_full::Store {
        bin_counts: positives,
        // The contiguous fields are left empty as they are not used with Interpolation::None.
        contiguous_bin_counts: Vec::new(),
        contiguous_bin_index_offset: 0,
    };
    let negatives_store = ddsketch_full::Store {
        bin_counts: negatives,
        // The contiguous fields are left empty as they are not used with Interpolation::None.
        contiguous_bin_counts: Vec::new(),
        contiguous_bin_index_offset: 0,
    };
    ddsketch_full::DdSketch {
        mapping: Some(index_mapping),
        positive_values: Some(positives_store),
        negative_values: Some(negatives_store),
        zero_count: zeroes,
    }
    .encode_to_vec()
}

/// Split negative and positive values from an AgentDDSketch and extract the number of values
/// that were accounted for as 0.0.
fn convert_stores(agent_sketch: &AgentDDSketch) -> (BTreeMap<i32, f64>, BTreeMap<i32, f64>, f64) {
    let mut positives = BTreeMap::<i32, f64>::new();
    let mut negatives = BTreeMap::<i32, f64>::new();
    let mut zeroes = 0.0;
    let bin_map = agent_sketch.bin_map();
    bin_map
        .keys
        .into_iter()
        .zip(bin_map.counts)
        .for_each(|(k, n)| {
            match k.signum() {
                // Key 0 is the single bucket holding values that collapsed to zero.
                0 => zeroes = n as f64,
                1 => {
                    positives.insert(k as i32, n as f64);
                }
                // Negative keys are negated so both stores use positive bin indexes.
                -1 => {
                    negatives.insert((-k) as i32, n as f64);
                }
                _ => {}
            };
        });
    (positives, negatives, zeroes)
}

/// Stores statistics for the various `AggregationKey`s seen in a given time window.
pub(crate) struct Bucket {
    pub(crate) start: u64,
    pub(crate) duration: u64,
    pub(crate) data: BTreeMap<AggregationKey, GroupedStats>,
}

impl Bucket {
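    /// Export the bucket, regrouping the per-`AggregationKey` statistics into one
    /// `ClientStatsBucket` per `PayloadAggregationKey`.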
    pub(crate) fn export(&self) -> BTreeMap<PayloadAggregationKey, ClientStatsBucket> {
        let mut m = BTreeMap::<PayloadAggregationKey, ClientStatsBucket>::new();
        self.data.iter().for_each(|(k, v)| {
            let b = v.export(k);
            match m.get_mut(&k.payload_key) {
                None => {
                    let sb = ClientStatsBucket {
                        start: self.start,
                        duration: self.duration,
                        agent_time_shift: 0,
                        stats: vec![b],
                    };
                    m.insert(k.payload_key.clone(), sb);
                }
                Some(s) => {
                    s.stats.push(b);
                }
            };
        });
        m
    }

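    /// Add a span to the bucket: statistics for its `AggregationKey` are updated in place, or
    /// created first if this is the first span seen for that key.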
    pub(crate) fn add(
        &mut self,
        span: &ObjectMap,
        weight: f64,
        is_top: bool,
        aggkey: AggregationKey,
    ) {
        match self.data.get_mut(&aggkey) {
            Some(gs) => Bucket::update(span, weight, is_top, gs),
            None => {
                let mut gs = GroupedStats::new();
                Bucket::update(span, weight, is_top, &mut gs);
                self.data.insert(aggkey, gs);
            }
        }
    }

    /// Update a bucket with a new span. Computed statistics include the number of hits and the
    /// distribution of execution times, with errored spans and error-free spans tracked in
    /// separate sketches.
    fn update(span: &ObjectMap, weight: f64, is_top: bool, gs: &mut GroupedStats) {
        if is_top {
            gs.top_level_hits += weight;
        }
        gs.hits += weight;
        // A non-zero `error` field marks the span as errored.
        let error = match span.get("error") {
            Some(Value::Integer(val)) => *val,
            None => 0,
            _ => panic!("`error` should be an i64"),
        };
        if error != 0 {
            gs.errors += weight;
        }
        let duration = match span.get("duration") {
            Some(Value::Integer(val)) => *val,
            None => 0,
            _ => panic!("`duration` should be an i64"),
        };
        gs.duration += (duration as f64) * weight;
        if error != 0 {
            gs.err_distribution.insert(duration as f64)
        } else {
            gs.ok_distribution.insert(duration as f64)
        }
    }
}
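
// A minimal sanity check for the sketch conversion above. This is an illustrative sketch, not
// part of the original module: it only relies on `AgentDDSketch::with_agent_defaults`/`insert`
// (used elsewhere in this file) and `prost::Message::decode`, and simply asserts that
// `encode_sketch` produces a decodable protobuf payload.
#[cfg(test)]
mod encode_sketch_example {
    use super::*;

    #[test]
    fn encodes_a_decodable_ddsketch() {
        let mut sketch = AgentDDSketch::with_agent_defaults();
        for value in [1.0, 2.0, 3.0, 250.0] {
            sketch.insert(value);
        }

        let payload = encode_sketch(&sketch);
        let decoded =
            ddsketch_full::DdSketch::decode(payload.as_slice()).expect("payload should decode");

        assert!(decoded.mapping.is_some());
        assert!(decoded.positive_values.is_some());
    }
}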