vector/sinks/datadog/traces/apm_stats/bucket.rs

1use std::collections::BTreeMap;
2
3use prost::Message;
4
5use super::{
6    ClientGroupedStats, ClientStatsBucket,
7    aggregation::{AggregationKey, PayloadAggregationKey},
8    ddsketch_full,
9};
10use crate::{
11    event::{ObjectMap, Value},
12    metrics::AgentDDSketch,
13};
14
15pub(crate) struct GroupedStats {
16    hits: f64,
17    top_level_hits: f64,
18    errors: f64,
19    duration: f64,
20    ok_distribution: AgentDDSketch,
21    err_distribution: AgentDDSketch,
22}
23
24impl GroupedStats {
25    fn new() -> Self {
26        GroupedStats {
27            hits: 0.0,
28            top_level_hits: 0.0,
29            errors: 0.0,
30            duration: 0.0,
31            ok_distribution: AgentDDSketch::with_agent_defaults(),
32            err_distribution: AgentDDSketch::with_agent_defaults(),
33        }
34    }
35
36    fn export(&self, key: &AggregationKey) -> ClientGroupedStats {
37        ClientGroupedStats {
38            service: key.bucket_key.service.clone(),
39            name: key.bucket_key.name.clone(),
40            resource: key.bucket_key.resource.clone(),
41            http_status_code: key.bucket_key.status_code,
42            r#type: key.bucket_key.ty.clone(),
43            db_type: "".to_string(),
44            hits: self.hits.round() as u64,
45            errors: self.errors.round() as u64,
46            duration: self.duration.round() as u64,
47            ok_summary: encode_sketch(&self.ok_distribution),
48            error_summary: encode_sketch(&self.err_distribution),
49            synthetics: key.bucket_key.synthetics,
50            top_level_hits: self.top_level_hits.round() as u64,
51        }
52    }
53}
54
55/// Convert agent sketch variant to ./proto/dd_sketch_full.proto
56fn encode_sketch(agent_sketch: &AgentDDSketch) -> Vec<u8> {
57    // AgentDDSketch partitions the set of real numbers into intervals like [gamma^(n), gamma^(n+1)[,
58    let index_mapping = ddsketch_full::IndexMapping {
59        // This is the gamma value used to build the aforementioned partition scheme
60        gamma: agent_sketch.gamma(),
61        // This offset is applied to the powers of gamma to adjust sketch accuracy
62        index_offset: agent_sketch.bin_index_offset() as f64,
63        // Interpolation::None is the interpolation type as there is no interpolation when using the
64        // aforementioned partition scheme
65        interpolation: ddsketch_full::index_mapping::Interpolation::None as i32,
66    };
67
68    // zeroes depicts the number of values that fell around zero based on the sketch local accuracy
69    // positives and negatives stores are respectively storing positive and negative values using the
70    // exact same mechanism.
71    let (positives, negatives, zeroes) = convert_stores(agent_sketch);
72    let positives_store = ddsketch_full::Store {
73        bin_counts: positives,
74        contiguous_bin_counts: Vec::new(), // Empty as this not used for the current interpolation (Interpolation::None)
75        contiguous_bin_index_offset: 0, // Empty as this not used for the current interpolation (Interpolation::None)
76    };
77    let negatives_store = ddsketch_full::Store {
78        bin_counts: negatives,
79        contiguous_bin_counts: Vec::new(), // Empty as this not used for the current interpolation (Interpolation::None)
80        contiguous_bin_index_offset: 0, // Empty as this not used for the current interpolation (Interpolation::None)
81    };
82    ddsketch_full::DdSketch {
83        mapping: Some(index_mapping),
84        positive_values: Some(positives_store),
85        negative_values: Some(negatives_store),
86        zero_count: zeroes,
87    }
88    .encode_to_vec()
89}
90
91/// Split negative and positive values from an AgentDDSketch, also extract the number of values
92/// that were accounted as 0.0.
93fn convert_stores(agent_sketch: &AgentDDSketch) -> (BTreeMap<i32, f64>, BTreeMap<i32, f64>, f64) {
94    let mut positives = BTreeMap::<i32, f64>::new();
95    let mut negatives = BTreeMap::<i32, f64>::new();
96    let mut zeroes = 0.0;
97    let bin_map = agent_sketch.bin_map();
98    bin_map
99        .keys
100        .into_iter()
101        .zip(bin_map.counts)
102        .for_each(|(k, n)| {
103            match k.signum() {
104                0 => zeroes = n as f64,
105                1 => {
106                    positives.insert(k as i32, n as f64);
107                }
108                -1 => {
109                    negatives.insert((-k) as i32, n as f64);
110                }
111                _ => {}
112            };
113        });
114    (positives, negatives, zeroes)
115}
116
117/// Stores statistics for various `AggregationKey` in a given time window
118pub(crate) struct Bucket {
119    pub(crate) start: u64,
120    pub(crate) duration: u64,
121    pub(crate) data: BTreeMap<AggregationKey, GroupedStats>,
122}
123
124impl Bucket {
125    pub(crate) fn export(&self) -> BTreeMap<PayloadAggregationKey, ClientStatsBucket> {
126        let mut m = BTreeMap::<PayloadAggregationKey, ClientStatsBucket>::new();
127        self.data.iter().for_each(|(k, v)| {
128            let b = v.export(k);
129            match m.get_mut(&k.payload_key) {
130                None => {
131                    let sb = ClientStatsBucket {
132                        start: self.start,
133                        duration: self.duration,
134                        agent_time_shift: 0,
135                        stats: vec![b],
136                    };
137                    m.insert(k.payload_key.clone(), sb);
138                }
139                Some(s) => {
140                    s.stats.push(b);
141                }
142            };
143        });
144        m
145    }
146
147    pub(crate) fn add(
148        &mut self,
149        span: &ObjectMap,
150        weight: f64,
151        is_top: bool,
152        aggkey: AggregationKey,
153    ) {
154        match self.data.get_mut(&aggkey) {
155            Some(gs) => Bucket::update(span, weight, is_top, gs),
156            None => {
157                let mut gs = GroupedStats::new();
158                Bucket::update(span, weight, is_top, &mut gs);
159                self.data.insert(aggkey, gs);
160            }
161        }
162    }
163
164    /// Update a bucket with a new span. Computed statistics include the number of hits and the actual distribution of
165    /// execution time, with isolated measurements for spans flagged as errored and spans without error.
166    fn update(span: &ObjectMap, weight: f64, is_top: bool, gs: &mut GroupedStats) {
167        is_top.then(|| {
168            gs.top_level_hits += weight;
169        });
170        gs.hits += weight;
171        let error = match span.get("error") {
172            Some(Value::Integer(val)) => *val,
173            None => 0,
174            _ => panic!("`error` should be an i64"),
175        };
176        if error != 0 {
177            gs.errors += weight;
178        }
179        let duration = match span.get("duration") {
180            Some(Value::Integer(val)) => *val,
181            None => 0,
182            _ => panic!("`duration` should be an i64"),
183        };
184        gs.duration += (duration as f64) * weight;
185        if error != 0 {
186            gs.err_distribution.insert(duration as f64)
187        } else {
188            gs.ok_distribution.insert(duration as f64)
189        }
190    }
191}