vector/sinks/datadog/traces/apm_stats/
bucket.rs1use std::collections::BTreeMap;
2
3use prost::Message;
4
5use super::{
6 aggregation::{AggregationKey, PayloadAggregationKey},
7 ddsketch_full, ClientGroupedStats, ClientStatsBucket,
8};
9use crate::{event::ObjectMap, event::Value, metrics::AgentDDSketch};
10
11pub(crate) struct GroupedStats {
12 hits: f64,
13 top_level_hits: f64,
14 errors: f64,
15 duration: f64,
16 ok_distribution: AgentDDSketch,
17 err_distribution: AgentDDSketch,
18}
19
20impl GroupedStats {
21 fn new() -> Self {
22 GroupedStats {
23 hits: 0.0,
24 top_level_hits: 0.0,
25 errors: 0.0,
26 duration: 0.0,
27 ok_distribution: AgentDDSketch::with_agent_defaults(),
28 err_distribution: AgentDDSketch::with_agent_defaults(),
29 }
30 }
31
32 fn export(&self, key: &AggregationKey) -> ClientGroupedStats {
33 ClientGroupedStats {
34 service: key.bucket_key.service.clone(),
35 name: key.bucket_key.name.clone(),
36 resource: key.bucket_key.resource.clone(),
37 http_status_code: key.bucket_key.status_code,
38 r#type: key.bucket_key.ty.clone(),
39 db_type: "".to_string(),
40 hits: self.hits.round() as u64,
41 errors: self.errors.round() as u64,
42 duration: self.duration.round() as u64,
43 ok_summary: encode_sketch(&self.ok_distribution),
44 error_summary: encode_sketch(&self.err_distribution),
45 synthetics: key.bucket_key.synthetics,
46 top_level_hits: self.top_level_hits.round() as u64,
47 }
48 }
49}
50
51fn encode_sketch(agent_sketch: &AgentDDSketch) -> Vec<u8> {
53 let index_mapping = ddsketch_full::IndexMapping {
55 gamma: agent_sketch.gamma(),
57 index_offset: agent_sketch.bin_index_offset() as f64,
59 interpolation: ddsketch_full::index_mapping::Interpolation::None as i32,
62 };
63
64 let (positives, negatives, zeroes) = convert_stores(agent_sketch);
68 let positives_store = ddsketch_full::Store {
69 bin_counts: positives,
70 contiguous_bin_counts: Vec::new(), contiguous_bin_index_offset: 0, };
73 let negatives_store = ddsketch_full::Store {
74 bin_counts: negatives,
75 contiguous_bin_counts: Vec::new(), contiguous_bin_index_offset: 0, };
78 ddsketch_full::DdSketch {
79 mapping: Some(index_mapping),
80 positive_values: Some(positives_store),
81 negative_values: Some(negatives_store),
82 zero_count: zeroes,
83 }
84 .encode_to_vec()
85}
86
87fn convert_stores(agent_sketch: &AgentDDSketch) -> (BTreeMap<i32, f64>, BTreeMap<i32, f64>, f64) {
90 let mut positives = BTreeMap::<i32, f64>::new();
91 let mut negatives = BTreeMap::<i32, f64>::new();
92 let mut zeroes = 0.0;
93 let bin_map = agent_sketch.bin_map();
94 bin_map
95 .keys
96 .into_iter()
97 .zip(bin_map.counts)
98 .for_each(|(k, n)| {
99 match k.signum() {
100 0 => zeroes = n as f64,
101 1 => {
102 positives.insert(k as i32, n as f64);
103 }
104 -1 => {
105 negatives.insert((-k) as i32, n as f64);
106 }
107 _ => {}
108 };
109 });
110 (positives, negatives, zeroes)
111}
112
113pub(crate) struct Bucket {
115 pub(crate) start: u64,
116 pub(crate) duration: u64,
117 pub(crate) data: BTreeMap<AggregationKey, GroupedStats>,
118}
119
120impl Bucket {
121 pub(crate) fn export(&self) -> BTreeMap<PayloadAggregationKey, ClientStatsBucket> {
122 let mut m = BTreeMap::<PayloadAggregationKey, ClientStatsBucket>::new();
123 self.data.iter().for_each(|(k, v)| {
124 let b = v.export(k);
125 match m.get_mut(&k.payload_key) {
126 None => {
127 let sb = ClientStatsBucket {
128 start: self.start,
129 duration: self.duration,
130 agent_time_shift: 0,
131 stats: vec![b],
132 };
133 m.insert(k.payload_key.clone(), sb);
134 }
135 Some(s) => {
136 s.stats.push(b);
137 }
138 };
139 });
140 m
141 }
142
143 pub(crate) fn add(
144 &mut self,
145 span: &ObjectMap,
146 weight: f64,
147 is_top: bool,
148 aggkey: AggregationKey,
149 ) {
150 match self.data.get_mut(&aggkey) {
151 Some(gs) => Bucket::update(span, weight, is_top, gs),
152 None => {
153 let mut gs = GroupedStats::new();
154 Bucket::update(span, weight, is_top, &mut gs);
155 self.data.insert(aggkey, gs);
156 }
157 }
158 }
159
160 fn update(span: &ObjectMap, weight: f64, is_top: bool, gs: &mut GroupedStats) {
163 is_top.then(|| {
164 gs.top_level_hits += weight;
165 });
166 gs.hits += weight;
167 let error = match span.get("error") {
168 Some(Value::Integer(val)) => *val,
169 None => 0,
170 _ => panic!("`error` should be an i64"),
171 };
172 if error != 0 {
173 gs.errors += weight;
174 }
175 let duration = match span.get("duration") {
176 Some(Value::Integer(val)) => *val,
177 None => 0,
178 _ => panic!("`duration` should be an i64"),
179 };
180 gs.duration += (duration as f64) * weight;
181 if error != 0 {
182 gs.err_distribution.insert(duration as f64)
183 } else {
184 gs.ok_distribution.insert(duration as f64)
185 }
186 }
187}