1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
//! APM stats
//!
//! This module contains the logic for computing APM statistics based on the incoming trace
//! events this sink receives. It is modelled closely to the trace-agent component of the
//! Datadog Agent, and sends out StatsPayload packets formatted and Aggregated by the same
//! algorithm, at ten second intervals, independently of the sink's trace payloads.

use std::sync::{Arc, Mutex};

use serde::{Deserialize, Serialize};
use vector_lib::event::TraceEvent;

pub use self::aggregation::Aggregator;
pub use self::flusher::flush_apm_stats_thread;

pub(crate) use super::config::{DatadogTracesEndpoint, DatadogTracesEndpointConfiguration};
pub(crate) use super::request_builder::{build_request, DDTracesMetadata, RequestBuilderError};
pub(crate) use super::sink::PartitionKey;

mod aggregation;
mod bucket;
mod flusher;
mod weight;

#[cfg(all(test, feature = "datadog-traces-integration-tests"))]
mod integration_tests;

/// The duration of time in nanoseconds that a bucket covers.
pub(crate) const BUCKET_DURATION_NANOSECONDS: u64 = 10_000_000_000;

#[allow(warnings, clippy::pedantic, clippy::nursery)]
pub(crate) mod ddsketch_full {
    include!(concat!(env!("OUT_DIR"), "/ddsketch_full.rs"));
}

// On the agent side APM Stats payload are encoded into the messagepack format using this
// go code https://github.com/DataDog/datadog-agent/blob/b5bed4d/pkg/trace/pb/stats_gen.go.
// Note that this code is generated from code itself generate from this .proto file
// https://github.com/DataDog/datadog-agent/blob/dc2f202/pkg/trace/pb/stats.proto.
// All the subsequent struct are dedicated to be used with rmp_serde and the fields names
// exactly match the ones of the go code.
#[derive(Debug, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase")]
pub(crate) struct StatsPayload {
    pub(crate) agent_hostname: String,
    pub(crate) agent_env: String,
    pub(crate) stats: Vec<ClientStatsPayload>,
    pub(crate) agent_version: String,
    pub(crate) client_computed: bool,
}

#[derive(Debug, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase")]
pub(crate) struct ClientStatsPayload {
    pub(crate) hostname: String,
    pub(crate) env: String,
    pub(crate) version: String,
    pub(crate) stats: Vec<ClientStatsBucket>,
    pub(crate) lang: String,
    pub(crate) tracer_version: String,
    #[serde(rename = "RuntimeID")]
    pub(crate) runtime_id: String,
    pub(crate) sequence: u64,
    pub(crate) agent_aggregation: String,
    pub(crate) service: String,
    #[serde(rename = "ContainerID")]
    pub(crate) container_id: String,
    pub(crate) tags: Vec<String>,
}

#[derive(Debug, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase")]
pub(crate) struct ClientStatsBucket {
    pub(crate) start: u64,
    pub(crate) duration: u64,
    pub(crate) stats: Vec<ClientGroupedStats>,
    pub(crate) agent_time_shift: i64,
}

#[derive(Debug, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "PascalCase")]
pub(crate) struct ClientGroupedStats {
    pub(crate) service: String,
    pub(crate) name: String,
    pub(crate) resource: String,
    #[serde(rename = "HTTPStatusCode")]
    pub(crate) http_status_code: u32,
    pub(crate) r#type: String,
    #[serde(rename = "DBType")]
    pub(crate) db_type: String,
    pub(crate) hits: u64,
    pub(crate) errors: u64,
    pub(crate) duration: u64,
    #[serde(with = "serde_bytes")]
    pub(crate) ok_summary: Vec<u8>,
    #[serde(with = "serde_bytes")]
    pub(crate) error_summary: Vec<u8>,
    pub(crate) synthetics: bool,
    pub(crate) top_level_hits: u64,
}

/// Computes APM stats from the incoming trace events.
///
/// # arguments
///
/// * `key`           - PartitionKey associated with this set of trace events.
/// * `aggregator`    - Aggregator to use in computing and caching APM stats buckets.
/// * `trace_events`  - Newly received trace events to process.
pub(crate) fn compute_apm_stats(
    key: &PartitionKey,
    aggregator: Arc<Mutex<Aggregator>>,
    trace_events: &[TraceEvent],
) {
    let mut aggregator = aggregator.lock().unwrap();

    // store properties that are available only at runtime
    aggregator.update_agent_properties(key);

    // process the incoming traces
    trace_events
        .iter()
        .for_each(|t| aggregator.handle_trace(key, t));
}