vector/sinks/datadog/traces/apm_stats/
bucket.rs1use std::collections::BTreeMap;
2
3use prost::Message;
4
5use super::{
6 ClientGroupedStats, ClientStatsBucket,
7 aggregation::{AggregationKey, PayloadAggregationKey},
8 ddsketch_full,
9};
10use crate::{
11 event::{ObjectMap, Value},
12 metrics::AgentDDSketch,
13};
14
15pub(crate) struct GroupedStats {
16 hits: f64,
17 top_level_hits: f64,
18 errors: f64,
19 duration: f64,
20 ok_distribution: AgentDDSketch,
21 err_distribution: AgentDDSketch,
22}
23
24impl GroupedStats {
25 fn new() -> Self {
26 GroupedStats {
27 hits: 0.0,
28 top_level_hits: 0.0,
29 errors: 0.0,
30 duration: 0.0,
31 ok_distribution: AgentDDSketch::with_agent_defaults(),
32 err_distribution: AgentDDSketch::with_agent_defaults(),
33 }
34 }
35
36 fn export(&self, key: &AggregationKey) -> ClientGroupedStats {
37 ClientGroupedStats {
38 service: key.bucket_key.service.clone(),
39 name: key.bucket_key.name.clone(),
40 resource: key.bucket_key.resource.clone(),
41 http_status_code: key.bucket_key.status_code,
42 r#type: key.bucket_key.ty.clone(),
43 db_type: "".to_string(),
44 hits: self.hits.round() as u64,
45 errors: self.errors.round() as u64,
46 duration: self.duration.round() as u64,
47 ok_summary: encode_sketch(&self.ok_distribution),
48 error_summary: encode_sketch(&self.err_distribution),
49 synthetics: key.bucket_key.synthetics,
50 top_level_hits: self.top_level_hits.round() as u64,
51 }
52 }
53}
54
55fn encode_sketch(agent_sketch: &AgentDDSketch) -> Vec<u8> {
57 let index_mapping = ddsketch_full::IndexMapping {
59 gamma: agent_sketch.gamma(),
61 index_offset: agent_sketch.bin_index_offset() as f64,
63 interpolation: ddsketch_full::index_mapping::Interpolation::None as i32,
66 };
67
68 let (positives, negatives, zeroes) = convert_stores(agent_sketch);
72 let positives_store = ddsketch_full::Store {
73 bin_counts: positives,
74 contiguous_bin_counts: Vec::new(), contiguous_bin_index_offset: 0, };
77 let negatives_store = ddsketch_full::Store {
78 bin_counts: negatives,
79 contiguous_bin_counts: Vec::new(), contiguous_bin_index_offset: 0, };
82 ddsketch_full::DdSketch {
83 mapping: Some(index_mapping),
84 positive_values: Some(positives_store),
85 negative_values: Some(negatives_store),
86 zero_count: zeroes,
87 }
88 .encode_to_vec()
89}
90
91fn convert_stores(agent_sketch: &AgentDDSketch) -> (BTreeMap<i32, f64>, BTreeMap<i32, f64>, f64) {
94 let mut positives = BTreeMap::<i32, f64>::new();
95 let mut negatives = BTreeMap::<i32, f64>::new();
96 let mut zeroes = 0.0;
97 let bin_map = agent_sketch.bin_map();
98 bin_map
99 .keys
100 .into_iter()
101 .zip(bin_map.counts)
102 .for_each(|(k, n)| {
103 match k.signum() {
104 0 => zeroes = n as f64,
105 1 => {
106 positives.insert(k as i32, n as f64);
107 }
108 -1 => {
109 negatives.insert((-k) as i32, n as f64);
110 }
111 _ => {}
112 };
113 });
114 (positives, negatives, zeroes)
115}
116
117pub(crate) struct Bucket {
119 pub(crate) start: u64,
120 pub(crate) duration: u64,
121 pub(crate) data: BTreeMap<AggregationKey, GroupedStats>,
122}
123
124impl Bucket {
125 pub(crate) fn export(&self) -> BTreeMap<PayloadAggregationKey, ClientStatsBucket> {
126 let mut m = BTreeMap::<PayloadAggregationKey, ClientStatsBucket>::new();
127 self.data.iter().for_each(|(k, v)| {
128 let b = v.export(k);
129 match m.get_mut(&k.payload_key) {
130 None => {
131 let sb = ClientStatsBucket {
132 start: self.start,
133 duration: self.duration,
134 agent_time_shift: 0,
135 stats: vec![b],
136 };
137 m.insert(k.payload_key.clone(), sb);
138 }
139 Some(s) => {
140 s.stats.push(b);
141 }
142 };
143 });
144 m
145 }
146
147 pub(crate) fn add(
148 &mut self,
149 span: &ObjectMap,
150 weight: f64,
151 is_top: bool,
152 aggkey: AggregationKey,
153 ) {
154 match self.data.get_mut(&aggkey) {
155 Some(gs) => Bucket::update(span, weight, is_top, gs),
156 None => {
157 let mut gs = GroupedStats::new();
158 Bucket::update(span, weight, is_top, &mut gs);
159 self.data.insert(aggkey, gs);
160 }
161 }
162 }
163
164 fn update(span: &ObjectMap, weight: f64, is_top: bool, gs: &mut GroupedStats) {
167 is_top.then(|| {
168 gs.top_level_hits += weight;
169 });
170 gs.hits += weight;
171 let error = match span.get("error") {
172 Some(Value::Integer(val)) => *val,
173 None => 0,
174 _ => panic!("`error` should be an i64"),
175 };
176 if error != 0 {
177 gs.errors += weight;
178 }
179 let duration = match span.get("duration") {
180 Some(Value::Integer(val)) => *val,
181 None => 0,
182 _ => panic!("`duration` should be an i64"),
183 };
184 gs.duration += (duration as f64) * weight;
185 if error != 0 {
186 gs.err_distribution.insert(duration as f64)
187 } else {
188 gs.ok_distribution.insert(duration as f64)
189 }
190 }
191}