vector/sinks/datadog/traces/apm_stats/
mod.rs

//! APM stats
//!
//! This module contains the logic for computing APM statistics based on the incoming trace
//! events this sink receives. It is modelled closely on the trace-agent component of the
//! Datadog Agent, and sends out `StatsPayload` packets formatted and aggregated by the same
//! algorithm, at ten-second intervals, independently of the sink's trace payloads.
7
8use std::sync::{Arc, Mutex};
9
10use serde::{Deserialize, Serialize};
11use vector_lib::event::TraceEvent;
12
13pub use self::aggregation::Aggregator;
14pub use self::flusher::flush_apm_stats_thread;
15
16pub(crate) use super::config::{DatadogTracesEndpoint, DatadogTracesEndpointConfiguration};
17pub(crate) use super::request_builder::{build_request, DDTracesMetadata, RequestBuilderError};
18pub(crate) use super::sink::PartitionKey;
19
20mod aggregation;
21mod bucket;
22mod flusher;
23mod weight;
24
25#[cfg(all(test, feature = "datadog-traces-integration-tests"))]
26mod integration_tests;
27
/// The duration of time, in nanoseconds, that a single stats bucket covers.
///
/// The value is ten seconds expressed in nanoseconds.
pub(crate) const BUCKET_DURATION_NANOSECONDS: u64 = 10_000_000_000;
30
31#[allow(warnings, clippy::pedantic, clippy::nursery)]
32pub(crate) mod ddsketch_full {
33    include!(concat!(env!("OUT_DIR"), "/ddsketch_full.rs"));
34}
35
36// On the agent side APM Stats payload are encoded into the messagepack format using this
37// go code https://github.com/DataDog/datadog-agent/blob/b5bed4d/pkg/trace/pb/stats_gen.go.
38// Note that this code is generated from code itself generate from this .proto file
39// https://github.com/DataDog/datadog-agent/blob/dc2f202/pkg/trace/pb/stats.proto.
40// All the subsequent struct are dedicated to be used with rmp_serde and the fields names
41// exactly match the ones of the go code.
42#[derive(Debug, PartialEq, Deserialize, Serialize)]
43#[serde(rename_all = "PascalCase")]
44pub(crate) struct StatsPayload {
45    pub(crate) agent_hostname: String,
46    pub(crate) agent_env: String,
47    pub(crate) stats: Vec<ClientStatsPayload>,
48    pub(crate) agent_version: String,
49    pub(crate) client_computed: bool,
50}
51
52#[derive(Debug, PartialEq, Deserialize, Serialize)]
53#[serde(rename_all = "PascalCase")]
54pub(crate) struct ClientStatsPayload {
55    pub(crate) hostname: String,
56    pub(crate) env: String,
57    pub(crate) version: String,
58    pub(crate) stats: Vec<ClientStatsBucket>,
59    pub(crate) lang: String,
60    pub(crate) tracer_version: String,
61    #[serde(rename = "RuntimeID")]
62    pub(crate) runtime_id: String,
63    pub(crate) sequence: u64,
64    pub(crate) agent_aggregation: String,
65    pub(crate) service: String,
66    #[serde(rename = "ContainerID")]
67    pub(crate) container_id: String,
68    pub(crate) tags: Vec<String>,
69}
70
71#[derive(Debug, PartialEq, Deserialize, Serialize)]
72#[serde(rename_all = "PascalCase")]
73pub(crate) struct ClientStatsBucket {
74    pub(crate) start: u64,
75    pub(crate) duration: u64,
76    pub(crate) stats: Vec<ClientGroupedStats>,
77    pub(crate) agent_time_shift: i64,
78}
79
80#[derive(Debug, PartialEq, Deserialize, Serialize)]
81#[serde(rename_all = "PascalCase")]
82pub(crate) struct ClientGroupedStats {
83    pub(crate) service: String,
84    pub(crate) name: String,
85    pub(crate) resource: String,
86    #[serde(rename = "HTTPStatusCode")]
87    pub(crate) http_status_code: u32,
88    pub(crate) r#type: String,
89    #[serde(rename = "DBType")]
90    pub(crate) db_type: String,
91    pub(crate) hits: u64,
92    pub(crate) errors: u64,
93    pub(crate) duration: u64,
94    #[serde(with = "serde_bytes")]
95    pub(crate) ok_summary: Vec<u8>,
96    #[serde(with = "serde_bytes")]
97    pub(crate) error_summary: Vec<u8>,
98    pub(crate) synthetics: bool,
99    pub(crate) top_level_hits: u64,
100}
101
102/// Computes APM stats from the incoming trace events.
103///
104/// # arguments
105///
106/// * `key`           - PartitionKey associated with this set of trace events.
107/// * `aggregator`    - Aggregator to use in computing and caching APM stats buckets.
108/// * `trace_events`  - Newly received trace events to process.
109pub(crate) fn compute_apm_stats(
110    key: &PartitionKey,
111    aggregator: Arc<Mutex<Aggregator>>,
112    trace_events: &[TraceEvent],
113) {
114    let mut aggregator = aggregator.lock().unwrap();
115
116    // store properties that are available only at runtime
117    aggregator.update_agent_properties(key);
118
119    // process the incoming traces
120    trace_events
121        .iter()
122        .for_each(|t| aggregator.handle_trace(key, t));
123}