vector/sinks/datadog/traces/apm_stats/
aggregation.rs

1use std::{collections::BTreeMap, sync::Arc};
2
3use chrono::Utc;
4use vrl::event_path;
5
6use super::{
7    bucket::Bucket, ClientStatsBucket, ClientStatsPayload, PartitionKey,
8    BUCKET_DURATION_NANOSECONDS,
9};
10use crate::event::{ObjectMap, TraceEvent, Value};
11
/// Metrics key marking a span that should be measured (see `is_measured`).
const MEASURED_KEY: &str = "_dd.measured";
/// Metrics key marking a span as a partial snapshot of a long-running span (see `is_partial_snapshot`).
const PARTIAL_VERSION_KEY: &str = "_dd.partial_version";
/// Key in the span's `meta` map holding the HTTP status code.
const TAG_STATUS_CODE: &str = "http.status_code";
/// Prefix of the trace `origin` field identifying synthetics traffic.
const TAG_SYNTHETICS: &str = "synthetics";
/// Metrics key marking a span as top-level (see `has_top_level`).
const TOP_LEVEL_KEY: &str = "_top_level";

/// The number of bucket durations to keep in memory before flushing them.
const BUCKET_WINDOW_LEN: u64 = 2;
20
/// Composite key under which span statistics are aggregated.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct AggregationKey {
    /// Groups stats that belong to the same `ClientStatsPayload`.
    pub(crate) payload_key: PayloadAggregationKey,
    /// Distinguishes stats entries within a single bucket.
    pub(crate) bucket_key: BucketAggregationKey,
}
26
27impl AggregationKey {
28    fn new_aggregation_from_span(
29        span: &ObjectMap,
30        payload_key: PayloadAggregationKey,
31        synthetics: bool,
32    ) -> Self {
33        AggregationKey {
34            payload_key: payload_key.with_span_context(span),
35            bucket_key: BucketAggregationKey {
36                service: span
37                    .get("service")
38                    .map(|v| v.to_string_lossy().into_owned())
39                    .unwrap_or_default(),
40                name: span
41                    .get("name")
42                    .map(|v| v.to_string_lossy().into_owned())
43                    .unwrap_or_default(),
44                resource: span
45                    .get("resource")
46                    .map(|v| v.to_string_lossy().into_owned())
47                    .unwrap_or_default(),
48                ty: span
49                    .get("type")
50                    .map(|v| v.to_string_lossy().into_owned())
51                    .unwrap_or_default(),
52                status_code: span
53                    .get("meta")
54                    .and_then(|m| m.as_object())
55                    .and_then(|m| m.get(TAG_STATUS_CODE))
56                    // the meta field is supposed to be a string/string map
57                    .and_then(|s| s.to_string_lossy().parse::<u32>().ok())
58                    .unwrap_or_default(),
59                synthetics,
60            },
61        }
62    }
63}
64
/// The portion of the aggregation key shared by every stats bucket in a
/// single `ClientStatsPayload`.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct PayloadAggregationKey {
    pub(crate) env: String,
    pub(crate) hostname: String,
    pub(crate) version: String,
    pub(crate) container_id: String,
}
72
73impl PayloadAggregationKey {
74    fn with_span_context(self, span: &ObjectMap) -> Self {
75        PayloadAggregationKey {
76            env: span
77                .get("meta")
78                .and_then(|m| m.as_object())
79                .and_then(|m| m.get("env"))
80                .map(|s| s.to_string_lossy().into_owned())
81                .unwrap_or(self.env),
82            hostname: self.hostname,
83            version: self.version,
84            container_id: self.container_id,
85        }
86    }
87}
88
/// The portion of the aggregation key derived from an individual span.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub(crate) struct BucketAggregationKey {
    /// Span `service` field.
    pub(crate) service: String,
    /// Span `name` field.
    pub(crate) name: String,
    /// Span `resource` field.
    pub(crate) resource: String,
    /// Span `type` field.
    pub(crate) ty: String,
    /// HTTP status code read from the span's `meta` map (0 when absent).
    pub(crate) status_code: u32,
    /// True when the trace originated from synthetics.
    pub(crate) synthetics: bool,
}
98
/// Aggregates APM statistics from trace events into 10 second time buckets,
/// mirroring the Datadog trace-agent's stats concentrator.
pub struct Aggregator {
    /// The key represents the timestamp (in nanoseconds) of the beginning of the time window (that lasts 10 seconds) on
    /// which the associated bucket will calculate statistics.
    buckets: BTreeMap<u64, Bucket>,

    /// The oldest timestamp we will allow for the current time bucket.
    oldest_timestamp: u64,

    /// Env associated with the Agent.
    agent_env: Option<String>,

    /// Hostname associated with the Agent.
    agent_hostname: Option<String>,

    /// Version associated with the Agent.
    agent_version: Option<String>,

    /// API key associated with the Agent.
    api_key: Option<Arc<str>>,

    /// Default API key to use if api_key not set.
    default_api_key: Arc<str>,
}
122
123impl Aggregator {
124    pub fn new(default_api_key: Arc<str>) -> Self {
125        Self {
126            buckets: BTreeMap::new(),
127            oldest_timestamp: align_timestamp(
128                Utc::now()
129                    .timestamp_nanos_opt()
130                    .expect("Timestamp out of range") as u64,
131            ),
132            default_api_key,
133            // We can't know the below fields until have received a trace event
134            agent_env: None,
135            agent_hostname: None,
136            agent_version: None,
137            api_key: None,
138        }
139    }
140
141    /// Updates cached properties from the Agent.
142    pub(crate) fn update_agent_properties(&mut self, partition_key: &PartitionKey) {
143        if self.agent_env.is_none() {
144            if let Some(env) = &partition_key.env {
145                self.agent_env = Some(env.clone());
146            }
147        }
148        if self.agent_hostname.is_none() {
149            if let Some(hostname) = &partition_key.hostname {
150                self.agent_hostname = Some(hostname.clone());
151            }
152        }
153        if self.agent_version.is_none() {
154            if let Some(version) = &partition_key.agent_version {
155                self.agent_version = Some(version.clone());
156            }
157        }
158        if self.api_key.is_none() {
159            if let Some(api_key) = &partition_key.api_key {
160                self.api_key = Some(Arc::<str>::clone(api_key));
161            }
162        }
163    }
164
165    pub(crate) fn get_agent_env(&self) -> String {
166        self.agent_env.clone().unwrap_or_default()
167    }
168
169    pub(crate) fn get_agent_hostname(&self) -> String {
170        self.agent_hostname.clone().unwrap_or_default()
171    }
172
173    pub(crate) fn get_agent_version(&self) -> String {
174        self.agent_version.clone().unwrap_or_default()
175    }
176
177    pub(crate) fn get_api_key(&self) -> Arc<str> {
178        self.api_key
179            .clone()
180            .unwrap_or_else(|| Arc::clone(&self.default_api_key))
181    }
182
183    /// Iterates over a trace's constituting spans and upon matching conditions it updates statistics (mostly using the top level span).
184    pub(crate) fn handle_trace(&mut self, partition_key: &PartitionKey, trace: &TraceEvent) {
185        // Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/concentrator.go#L148-L184
186
187        let spans = match trace.get(event_path!("spans")) {
188            Some(Value::Array(v)) => v.iter().filter_map(|s| s.as_object()).collect(),
189            _ => vec![],
190        };
191
192        let weight = super::weight::extract_weight_from_root_span(&spans);
193        let payload_aggkey = PayloadAggregationKey {
194            env: partition_key.env.clone().unwrap_or_default(),
195            hostname: partition_key.hostname.clone().unwrap_or_default(),
196            version: trace
197                .get(event_path!("app_version"))
198                .map(|v| v.to_string_lossy().into_owned())
199                .unwrap_or_default(),
200            container_id: trace
201                .get(event_path!("container_id"))
202                .map(|v| v.to_string_lossy().into_owned())
203                .unwrap_or_default(),
204        };
205        let synthetics = trace
206            .get(event_path!("origin"))
207            .map(|v| v.to_string_lossy().starts_with(TAG_SYNTHETICS))
208            .unwrap_or(false);
209
210        spans.iter().for_each(|span| {
211            let is_top = has_top_level(span);
212            if !(is_top || is_measured(span) || is_partial_snapshot(span)) {
213                return;
214            }
215
216            self.handle_span(span, weight, is_top, synthetics, payload_aggkey.clone());
217        });
218    }
219
220    /// Aggregates statistics per key over 10 seconds windows.
221    /// The key is constructed from various span/trace properties (see `AggregationKey`).
222    fn handle_span(
223        &mut self,
224        span: &ObjectMap,
225        weight: f64,
226        is_top: bool,
227        synthetics: bool,
228        payload_aggkey: PayloadAggregationKey,
229    ) {
230        // Based on: https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/statsraw.go#L147-L182
231
232        let aggkey = AggregationKey::new_aggregation_from_span(span, payload_aggkey, synthetics);
233
234        let start = match span.get("start") {
235            Some(Value::Timestamp(val)) => {
236                val.timestamp_nanos_opt().expect("Timestamp out of range") as u64
237            }
238            _ => Utc::now()
239                .timestamp_nanos_opt()
240                .expect("Timestamp out of range") as u64,
241        };
242
243        let duration = match span.get("duration") {
244            Some(Value::Integer(val)) => *val as u64,
245            None => 0,
246            _ => panic!("`duration` should be an i64"),
247        };
248
249        let end = start + duration;
250
251        // 10 second bucket window
252        let mut btime = align_timestamp(end);
253
254        // If too far in the past, use the oldest-allowed time bucket instead
255        if btime < self.oldest_timestamp {
256            btime = self.oldest_timestamp
257        }
258
259        match self.buckets.get_mut(&btime) {
260            Some(b) => {
261                b.add(span, weight, is_top, aggkey);
262            }
263            None => {
264                let mut b = Bucket {
265                    start: btime,
266                    duration: BUCKET_DURATION_NANOSECONDS,
267                    data: BTreeMap::new(),
268                };
269                b.add(span, weight, is_top, aggkey);
270
271                debug!("Created {} start_time bucket.", btime);
272                self.buckets.insert(btime, b);
273            }
274        }
275    }
276
277    /// Flushes the bucket cache.
278    /// We cache and can compute stats only for the last `BUCKET_WINDOW_LEN * BUCKET_DURATION_NANOSECONDS` and after such time,
279    /// buckets are then flushed. This only applies to past buckets. Stats buckets in the future are cached with no restriction.
280    ///
281    /// # Arguments
282    ///
283    /// * `force` - If true, all cached buckets are flushed.
284    pub(crate) fn flush(&mut self, force: bool) -> Vec<ClientStatsPayload> {
285        // Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/concentrator.go#L38-L41
286        // , and https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/concentrator.go#L195-L207
287
288        let now = Utc::now()
289            .timestamp_nanos_opt()
290            .expect("Timestamp out of range") as u64;
291
292        let flush_cutoff_time = if force {
293            // flush all the remaining buckets (the Vector process is exiting)
294            now
295        } else {
296            // maintain two buckets in the cache during normal operation
297            now - (BUCKET_DURATION_NANOSECONDS * BUCKET_WINDOW_LEN)
298        };
299
300        let client_stats_payloads = self.get_client_stats_payloads(flush_cutoff_time);
301
302        // update the oldest_timestamp allowed, to prevent having stats for an already flushed
303        // bucket
304        let new_oldest_ts =
305            align_timestamp(now) - ((BUCKET_WINDOW_LEN - 1) * BUCKET_DURATION_NANOSECONDS);
306
307        if new_oldest_ts > self.oldest_timestamp {
308            debug!("Updated oldest_timestamp to {}.", new_oldest_ts);
309            self.oldest_timestamp = new_oldest_ts;
310        }
311
312        client_stats_payloads
313    }
314
315    /// Builds the array of ClientStatsPayloads will be sent out as part of the StatsPayload.
316    ///
317    /// # Arguments
318    ///
319    /// * `flush_cutoff_time` - Timestamp in nanos to use to determine what buckets to keep in the cache and which to export.
320    fn get_client_stats_payloads(&mut self, flush_cutoff_time: u64) -> Vec<ClientStatsPayload> {
321        let client_stats_buckets = self.export_buckets(flush_cutoff_time);
322
323        client_stats_buckets
324            .into_iter()
325            .map(|(payload_aggkey, csb)| {
326                ClientStatsPayload {
327                    env: payload_aggkey.env,
328                    hostname: payload_aggkey.hostname,
329                    container_id: payload_aggkey.container_id,
330                    version: payload_aggkey.version,
331                    stats: csb,
332                    // All the following fields are left unset by the trace-agent:
333                    // https://github.com/DataDog/datadog-agent/blob/42e72dd/pkg/trace/stats/concentrator.go#L216-L227
334                    service: "".to_string(),
335                    agent_aggregation: "".to_string(),
336                    sequence: 0,
337                    runtime_id: "".to_string(),
338                    lang: "".to_string(),
339                    tracer_version: "".to_string(),
340                    tags: vec![],
341                }
342            })
343            .collect::<Vec<ClientStatsPayload>>()
344    }
345
346    /// Exports the buckets that began before `flush_cutoff_time` and purges them from the cache.
347    ///
348    /// # Arguments
349    ///
350    /// * `flush_cutoff_time` - Timestamp in nanos to use to determine what buckets to keep in the cache and which to export.
351    fn export_buckets(
352        &mut self,
353        flush_cutoff_time: u64,
354    ) -> BTreeMap<PayloadAggregationKey, Vec<ClientStatsBucket>> {
355        let mut m = BTreeMap::<PayloadAggregationKey, Vec<ClientStatsBucket>>::new();
356
357        self.buckets.retain(|&bucket_start, bucket| {
358            let retain = bucket_start > flush_cutoff_time;
359
360            if !retain {
361                debug!("Flushing {} start_time bucket.", bucket_start);
362
363                bucket.export().into_iter().for_each(|(payload_key, csb)| {
364                    match m.get_mut(&payload_key) {
365                        None => {
366                            m.insert(payload_key.clone(), vec![csb]);
367                        }
368                        Some(s) => {
369                            s.push(csb);
370                        }
371                    };
372                })
373            }
374            retain
375        });
376
377        m
378    }
379}
380
381/// Returns the provided timestamp truncated to the bucket size.
382/// This is the start time of the time bucket in which such timestamp falls.
383const fn align_timestamp(start: u64) -> u64 {
384    // Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/concentrator.go#L232-L234
385    start - (start % BUCKET_DURATION_NANOSECONDS)
386}
387
388/// Assumes that all metrics are all encoded as Value::Float.
389/// Return the f64 of the specified key or None of key not present.
390fn get_metric_value_float(span: &ObjectMap, key: &str) -> Option<f64> {
391    span.get("metrics")
392        .and_then(|m| m.as_object())
393        .map(|m| match m.get(key) {
394            Some(Value::Float(f)) => Some(f.into_inner()),
395            None => None,
396            _ => panic!("`metric` values should be all be f64"),
397        })
398        .unwrap_or(None)
399}
400
401/// Returns true if the value of this metric is equal to 1.0
402fn metric_value_is_1(span: &ObjectMap, key: &str) -> bool {
403    match get_metric_value_float(span, key) {
404        Some(f) => f == 1.0,
405        None => false,
406    }
407}
408
/// Returns true if span is top-level.
fn has_top_level(span: &ObjectMap) -> bool {
    // Based on: https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/traceutil/span.go#L28-L31

    // Top-level spans carry the `_top_level` metric set to 1.0.
    metric_value_is_1(span, TOP_LEVEL_KEY)
}
415
/// Returns true if a span should be measured (i.e. it should get trace metrics calculated).
fn is_measured(span: &ObjectMap) -> bool {
    // Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/traceutil/span.go#L40-L43

    // Measured spans carry the `_dd.measured` metric set to 1.0.
    metric_value_is_1(span, MEASURED_KEY)
}
422
423/// Returns true if the span is a partial snapshot.
424/// These types of spans are partial images of long-running spans.
425/// When incomplete, a partial snapshot has a metric _dd.partial_version which is a positive integer.
426/// The metric usually increases each time a new version of the same span is sent by the tracer
427fn is_partial_snapshot(span: &ObjectMap) -> bool {
428    // Based on: https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/traceutil/span.go#L49-L52
429
430    match get_metric_value_float(span, PARTIAL_VERSION_KEY) {
431        Some(f) => f >= 0.0,
432        None => false,
433    }
434}