vector/sinks/datadog/traces/apm_stats/
mod.rs1use std::sync::{Arc, Mutex};
9
10use serde::{Deserialize, Serialize};
11use vector_lib::event::TraceEvent;
12
13pub use self::aggregation::Aggregator;
14pub use self::flusher::flush_apm_stats_thread;
15
16pub(crate) use super::config::{DatadogTracesEndpoint, DatadogTracesEndpointConfiguration};
17pub(crate) use super::request_builder::{build_request, DDTracesMetadata, RequestBuilderError};
18pub(crate) use super::sink::PartitionKey;
19
20mod aggregation;
21mod bucket;
22mod flusher;
23mod weight;
24
25#[cfg(all(test, feature = "datadog-traces-integration-tests"))]
26mod integration_tests;
27
28pub(crate) const BUCKET_DURATION_NANOSECONDS: u64 = 10_000_000_000;
30
31#[allow(warnings, clippy::pedantic, clippy::nursery)]
32pub(crate) mod ddsketch_full {
33 include!(concat!(env!("OUT_DIR"), "/ddsketch_full.rs"));
34}
35
36#[derive(Debug, PartialEq, Deserialize, Serialize)]
43#[serde(rename_all = "PascalCase")]
44pub(crate) struct StatsPayload {
45 pub(crate) agent_hostname: String,
46 pub(crate) agent_env: String,
47 pub(crate) stats: Vec<ClientStatsPayload>,
48 pub(crate) agent_version: String,
49 pub(crate) client_computed: bool,
50}
51
52#[derive(Debug, PartialEq, Deserialize, Serialize)]
53#[serde(rename_all = "PascalCase")]
54pub(crate) struct ClientStatsPayload {
55 pub(crate) hostname: String,
56 pub(crate) env: String,
57 pub(crate) version: String,
58 pub(crate) stats: Vec<ClientStatsBucket>,
59 pub(crate) lang: String,
60 pub(crate) tracer_version: String,
61 #[serde(rename = "RuntimeID")]
62 pub(crate) runtime_id: String,
63 pub(crate) sequence: u64,
64 pub(crate) agent_aggregation: String,
65 pub(crate) service: String,
66 #[serde(rename = "ContainerID")]
67 pub(crate) container_id: String,
68 pub(crate) tags: Vec<String>,
69}
70
71#[derive(Debug, PartialEq, Deserialize, Serialize)]
72#[serde(rename_all = "PascalCase")]
73pub(crate) struct ClientStatsBucket {
74 pub(crate) start: u64,
75 pub(crate) duration: u64,
76 pub(crate) stats: Vec<ClientGroupedStats>,
77 pub(crate) agent_time_shift: i64,
78}
79
80#[derive(Debug, PartialEq, Deserialize, Serialize)]
81#[serde(rename_all = "PascalCase")]
82pub(crate) struct ClientGroupedStats {
83 pub(crate) service: String,
84 pub(crate) name: String,
85 pub(crate) resource: String,
86 #[serde(rename = "HTTPStatusCode")]
87 pub(crate) http_status_code: u32,
88 pub(crate) r#type: String,
89 #[serde(rename = "DBType")]
90 pub(crate) db_type: String,
91 pub(crate) hits: u64,
92 pub(crate) errors: u64,
93 pub(crate) duration: u64,
94 #[serde(with = "serde_bytes")]
95 pub(crate) ok_summary: Vec<u8>,
96 #[serde(with = "serde_bytes")]
97 pub(crate) error_summary: Vec<u8>,
98 pub(crate) synthetics: bool,
99 pub(crate) top_level_hits: u64,
100}
101
102pub(crate) fn compute_apm_stats(
110 key: &PartitionKey,
111 aggregator: Arc<Mutex<Aggregator>>,
112 trace_events: &[TraceEvent],
113) {
114 let mut aggregator = aggregator.lock().unwrap();
115
116 aggregator.update_agent_properties(key);
118
119 trace_events
121 .iter()
122 .for_each(|t| aggregator.handle_trace(key, t));
123}