vector/sinks/datadog/traces/apm_stats/aggregation.rs

use std::{collections::BTreeMap, sync::Arc};

use chrono::Utc;
use vrl::event_path;

use super::{
    bucket::Bucket, ClientStatsBucket, ClientStatsPayload, PartitionKey,
    BUCKET_DURATION_NANOSECONDS,
};
use crate::event::{ObjectMap, TraceEvent, Value};

const MEASURED_KEY: &str = "_dd.measured";
const PARTIAL_VERSION_KEY: &str = "_dd.partial_version";
const TAG_STATUS_CODE: &str = "http.status_code";
const TAG_SYNTHETICS: &str = "synthetics";
const TOP_LEVEL_KEY: &str = "_top_level";

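/// Number of bucket durations a bucket stays in memory before a normal (non-forced) flush exports it.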
const BUCKET_WINDOW_LEN: u64 = 2;

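/// Full key a span's stats are aggregated under: payload-level dimensions plus per-bucket span dimensions.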
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct AggregationKey {
    pub(crate) payload_key: PayloadAggregationKey,
    pub(crate) bucket_key: BucketAggregationKey,
}

impl AggregationKey {
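    /// Builds the aggregation key for a span, falling back to empty or zero values for missing span fields.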
    fn new_aggregation_from_span(
        span: &ObjectMap,
        payload_key: PayloadAggregationKey,
        synthetics: bool,
    ) -> Self {
        AggregationKey {
            payload_key: payload_key.with_span_context(span),
            bucket_key: BucketAggregationKey {
                service: span
                    .get("service")
                    .map(|v| v.to_string_lossy().into_owned())
                    .unwrap_or_default(),
                name: span
                    .get("name")
                    .map(|v| v.to_string_lossy().into_owned())
                    .unwrap_or_default(),
                resource: span
                    .get("resource")
                    .map(|v| v.to_string_lossy().into_owned())
                    .unwrap_or_default(),
                ty: span
                    .get("type")
                    .map(|v| v.to_string_lossy().into_owned())
                    .unwrap_or_default(),
                status_code: span
                    .get("meta")
                    .and_then(|m| m.as_object())
                    .and_then(|m| m.get(TAG_STATUS_CODE))
                    .and_then(|s| s.to_string_lossy().parse::<u32>().ok())
                    .unwrap_or_default(),
                synthetics,
            },
        }
    }
}

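/// Aggregation dimensions that apply to an entire stats payload (env, hostname, version, container).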
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct PayloadAggregationKey {
    pub(crate) env: String,
    pub(crate) hostname: String,
    pub(crate) version: String,
    pub(crate) container_id: String,
}

impl PayloadAggregationKey {
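    /// Returns the key with `env` overridden by the span's `meta.env` tag, when present.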
    fn with_span_context(self, span: &ObjectMap) -> Self {
        PayloadAggregationKey {
            env: span
                .get("meta")
                .and_then(|m| m.as_object())
                .and_then(|m| m.get("env"))
                .map(|s| s.to_string_lossy().into_owned())
                .unwrap_or(self.env),
            hostname: self.hostname,
            version: self.version,
            container_id: self.container_id,
        }
    }
}

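/// Aggregation dimensions scoped to a single stats bucket entry.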
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct BucketAggregationKey {
    pub(crate) service: String,
    pub(crate) name: String,
    pub(crate) resource: String,
    pub(crate) ty: String,
    pub(crate) status_code: u32,
    pub(crate) synthetics: bool,
}

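/// Aggregates APM stats from incoming trace events into time-aligned buckets.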
pub struct Aggregator {
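    /// Bucketed stats, keyed by each bucket's aligned start timestamp in nanoseconds.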
    buckets: BTreeMap<u64, Bucket>,

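    /// Start timestamp of the oldest bucket spans may still be added to; older spans are clamped into this bucket.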
    oldest_timestamp: u64,

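    /// Agent environment, hostname, and version, each cached from the first partition key that provides it.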
    agent_env: Option<String>,

    agent_hostname: Option<String>,

    agent_version: Option<String>,

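    /// API key cached from the first partition key that carried one.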
    api_key: Option<Arc<str>>,

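    /// API key to fall back to when none was seen on the incoming traces.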
    default_api_key: Arc<str>,
}

impl Aggregator {
    pub fn new(default_api_key: Arc<str>) -> Self {
        Self {
            buckets: BTreeMap::new(),
            oldest_timestamp: align_timestamp(
                Utc::now()
                    .timestamp_nanos_opt()
                    .expect("Timestamp out of range") as u64,
            ),
            default_api_key,
            agent_env: None,
            agent_hostname: None,
            agent_version: None,
            api_key: None,
        }
    }

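    /// Caches agent metadata and the API key from a partition key, keeping the first value seen for each field.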
    pub(crate) fn update_agent_properties(&mut self, partition_key: &PartitionKey) {
        if self.agent_env.is_none() {
            if let Some(env) = &partition_key.env {
                self.agent_env = Some(env.clone());
            }
        }
        if self.agent_hostname.is_none() {
            if let Some(hostname) = &partition_key.hostname {
                self.agent_hostname = Some(hostname.clone());
            }
        }
        if self.agent_version.is_none() {
            if let Some(version) = &partition_key.agent_version {
                self.agent_version = Some(version.clone());
            }
        }
        if self.api_key.is_none() {
            if let Some(api_key) = &partition_key.api_key {
                self.api_key = Some(Arc::<str>::clone(api_key));
            }
        }
    }

    pub(crate) fn get_agent_env(&self) -> String {
        self.agent_env.clone().unwrap_or_default()
    }

    pub(crate) fn get_agent_hostname(&self) -> String {
        self.agent_hostname.clone().unwrap_or_default()
    }

    pub(crate) fn get_agent_version(&self) -> String {
        self.agent_version.clone().unwrap_or_default()
    }

    pub(crate) fn get_api_key(&self) -> Arc<str> {
        self.api_key
            .clone()
            .unwrap_or_else(|| Arc::clone(&self.default_api_key))
    }

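    /// Computes stats for the eligible spans of a trace: top-level spans, spans marked `_dd.measured`, and partial snapshots.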
    pub(crate) fn handle_trace(&mut self, partition_key: &PartitionKey, trace: &TraceEvent) {
        let spans = match trace.get(event_path!("spans")) {
            Some(Value::Array(v)) => v.iter().filter_map(|s| s.as_object()).collect(),
            _ => vec![],
        };

        let weight = super::weight::extract_weight_from_root_span(&spans);
        let payload_aggkey = PayloadAggregationKey {
            env: partition_key.env.clone().unwrap_or_default(),
            hostname: partition_key.hostname.clone().unwrap_or_default(),
            version: trace
                .get(event_path!("app_version"))
                .map(|v| v.to_string_lossy().into_owned())
                .unwrap_or_default(),
            container_id: trace
                .get(event_path!("container_id"))
                .map(|v| v.to_string_lossy().into_owned())
                .unwrap_or_default(),
        };
        let synthetics = trace
            .get(event_path!("origin"))
            .map(|v| v.to_string_lossy().starts_with(TAG_SYNTHETICS))
            .unwrap_or(false);

        spans.iter().for_each(|span| {
            let is_top = has_top_level(span);
            if !(is_top || is_measured(span) || is_partial_snapshot(span)) {
                return;
            }

            self.handle_span(span, weight, is_top, synthetics, payload_aggkey.clone());
        });
    }

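    /// Adds a single span's stats to the bucket covering the span's end time, clamped to `oldest_timestamp`.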
    fn handle_span(
        &mut self,
        span: &ObjectMap,
        weight: f64,
        is_top: bool,
        synthetics: bool,
        payload_aggkey: PayloadAggregationKey,
    ) {
        let aggkey = AggregationKey::new_aggregation_from_span(span, payload_aggkey, synthetics);

        let start = match span.get("start") {
            Some(Value::Timestamp(val)) => {
                val.timestamp_nanos_opt().expect("Timestamp out of range") as u64
            }
            _ => Utc::now()
                .timestamp_nanos_opt()
                .expect("Timestamp out of range") as u64,
        };

        let duration = match span.get("duration") {
            Some(Value::Integer(val)) => *val as u64,
            None => 0,
            _ => panic!("`duration` should be an i64"),
        };

        let end = start + duration;

        let mut btime = align_timestamp(end);

        if btime < self.oldest_timestamp {
            btime = self.oldest_timestamp
        }

        match self.buckets.get_mut(&btime) {
            Some(b) => {
                b.add(span, weight, is_top, aggkey);
            }
            None => {
                let mut b = Bucket {
                    start: btime,
                    duration: BUCKET_DURATION_NANOSECONDS,
                    data: BTreeMap::new(),
                };
                b.add(span, weight, is_top, aggkey);

                debug!("Created {} start_time bucket.", btime);
                self.buckets.insert(btime, b);
            }
        }
    }

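    /// Flushes buckets older than the bucket window into client stats payloads and advances `oldest_timestamp`; with `force` set, all buckets are flushed.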
    pub(crate) fn flush(&mut self, force: bool) -> Vec<ClientStatsPayload> {
        let now = Utc::now()
            .timestamp_nanos_opt()
            .expect("Timestamp out of range") as u64;

        let flush_cutoff_time = if force {
            now
        } else {
            now - (BUCKET_DURATION_NANOSECONDS * BUCKET_WINDOW_LEN)
        };

        let client_stats_payloads = self.get_client_stats_payloads(flush_cutoff_time);

        let new_oldest_ts =
            align_timestamp(now) - ((BUCKET_WINDOW_LEN - 1) * BUCKET_DURATION_NANOSECONDS);

        if new_oldest_ts > self.oldest_timestamp {
            debug!("Updated oldest_timestamp to {}.", new_oldest_ts);
            self.oldest_timestamp = new_oldest_ts;
        }

        client_stats_payloads
    }

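    /// Wraps the exported buckets into one `ClientStatsPayload` per payload aggregation key.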
    fn get_client_stats_payloads(&mut self, flush_cutoff_time: u64) -> Vec<ClientStatsPayload> {
        let client_stats_buckets = self.export_buckets(flush_cutoff_time);

        client_stats_buckets
            .into_iter()
            .map(|(payload_aggkey, csb)| {
                ClientStatsPayload {
                    env: payload_aggkey.env,
                    hostname: payload_aggkey.hostname,
                    container_id: payload_aggkey.container_id,
                    version: payload_aggkey.version,
                    stats: csb,
                    service: "".to_string(),
                    agent_aggregation: "".to_string(),
                    sequence: 0,
                    runtime_id: "".to_string(),
                    lang: "".to_string(),
                    tracer_version: "".to_string(),
                    tags: vec![],
                }
            })
            .collect::<Vec<ClientStatsPayload>>()
    }

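    /// Removes buckets starting at or before `flush_cutoff_time` and returns their exported stats grouped by payload key.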
    fn export_buckets(
        &mut self,
        flush_cutoff_time: u64,
    ) -> BTreeMap<PayloadAggregationKey, Vec<ClientStatsBucket>> {
        let mut m = BTreeMap::<PayloadAggregationKey, Vec<ClientStatsBucket>>::new();

        self.buckets.retain(|&bucket_start, bucket| {
            let retain = bucket_start > flush_cutoff_time;

            if !retain {
                debug!("Flushing {} start_time bucket.", bucket_start);

                bucket.export().into_iter().for_each(|(payload_key, csb)| {
                    match m.get_mut(&payload_key) {
                        None => {
                            m.insert(payload_key.clone(), vec![csb]);
                        }
                        Some(s) => {
                            s.push(csb);
                        }
                    };
                })
            }
            retain
        });

        m
    }
}

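/// Aligns a timestamp down to the start of its bucket, i.e. to a multiple of `BUCKET_DURATION_NANOSECONDS`.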
const fn align_timestamp(start: u64) -> u64 {
    start - (start % BUCKET_DURATION_NANOSECONDS)
}

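/// Reads the float value stored under `metrics.<key>` on a span, if any; panics if a value is present but not a float.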
fn get_metric_value_float(span: &ObjectMap, key: &str) -> Option<f64> {
    span.get("metrics")
        .and_then(|m| m.as_object())
        .map(|m| match m.get(key) {
            Some(Value::Float(f)) => Some(f.into_inner()),
            None => None,
            _ => panic!("`metrics` values should all be f64"),
        })
        .unwrap_or(None)
}

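/// True if the span carries the given metric and its value is exactly 1.0.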
fn metric_value_is_1(span: &ObjectMap, key: &str) -> bool {
    match get_metric_value_float(span, key) {
        Some(f) => f == 1.0,
        None => false,
    }
}

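/// True if the span is marked as top level (`_top_level` == 1).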
fn has_top_level(span: &ObjectMap) -> bool {
    metric_value_is_1(span, TOP_LEVEL_KEY)
}

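/// True if the span was explicitly marked for measurement (`_dd.measured` == 1).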
fn is_measured(span: &ObjectMap) -> bool {
    metric_value_is_1(span, MEASURED_KEY)
}

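/// True if the span is a partial snapshot, i.e. it carries a non-negative `_dd.partial_version`.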
fn is_partial_snapshot(span: &ObjectMap) -> bool {
    match get_metric_value_float(span, PARTIAL_VERSION_KEY) {
        Some(f) => f >= 0.0,
        None => false,
    }
}