vector/sources/datadog_agent/
traces.rs1use std::{collections::BTreeMap, sync::Arc};
2
3use bytes::Bytes;
4use chrono::{TimeZone, Utc};
5use futures::future;
6use http::StatusCode;
7use ordered_float::NotNan;
8use prost::Message;
9use vector_lib::{
10 EstimatedJsonEncodedSizeOf,
11 internal_event::{CountByteSize, InternalEventHandle as _},
12};
13use vrl::event_path;
14use warp::{Filter, Rejection, Reply, filters::BoxedFilter, path, path::FullPath, reply::Response};
15
16use super::{ApiKeyQueryParams, DatadogAgentSource, RequestHandler, ddtrace_proto};
17use crate::{
18 common::http::ErrorMessage,
19 event::{Event, ObjectMap, TraceEvent, Value},
20};
21
22pub(super) fn build_warp_filter(
23 handler: RequestHandler,
24 source: DatadogAgentSource,
25) -> BoxedFilter<(Response,)> {
26 build_trace_filter(handler, source)
27 .or(build_stats_filter())
28 .unify()
29 .boxed()
30}
31
32fn build_trace_filter(
33 handler: RequestHandler,
34 source: DatadogAgentSource,
35) -> BoxedFilter<(Response,)> {
36 warp::post()
37 .and(path!("api" / "v0.2" / "traces" / ..))
38 .and(warp::path::full())
39 .and(warp::header::optional::<String>("content-encoding"))
40 .and(warp::header::optional::<String>("dd-api-key"))
41 .and(warp::header::optional::<String>(
42 "X-Datadog-Reported-Languages",
43 ))
44 .and(warp::query::<ApiKeyQueryParams>())
45 .and(warp::body::bytes())
46 .and_then({
47 move |path: FullPath,
48 encoding_header: Option<String>,
49 api_token: Option<String>,
50 reported_language: Option<String>,
51 query_params: ApiKeyQueryParams,
52 body: Bytes| {
53 let events = source
54 .decode(&encoding_header, body, path.as_str())
55 .and_then(|body| {
56 handle_dd_trace_payload(
57 body,
58 source.api_key_extractor.extract(
59 path.as_str(),
60 api_token,
61 query_params.dd_api_key,
62 ),
63 reported_language.as_ref(),
64 &source,
65 )
66 .map_err(|error| {
67 ErrorMessage::new(
68 StatusCode::UNPROCESSABLE_ENTITY,
69 format!("Error decoding Datadog traces: {error:?}"),
70 )
71 })
72 });
73 handler.clone().handle_request(events, super::TRACES)
74 }
75 })
76 .boxed()
77}
78
79fn build_stats_filter() -> BoxedFilter<(Response,)> {
80 warp::post()
81 .and(path!("api" / "v0.2" / "stats" / ..))
82 .and_then(|| {
83 let response: Result<Response, Rejection> = Ok(warp::reply().into_response());
86 future::ready(response)
87 })
88 .boxed()
89}
90
91fn handle_dd_trace_payload(
92 frame: Bytes,
93 api_key: Option<Arc<str>>,
94 lang: Option<&String>,
95 source: &DatadogAgentSource,
96) -> crate::Result<Vec<Event>> {
97 let decoded_payload = ddtrace_proto::TracePayload::decode(frame)?;
98 if decoded_payload.tracer_payloads.is_empty() {
99 debug!("Older trace payload decoded.");
100 handle_dd_trace_payload_v0(decoded_payload, api_key, lang, source)
101 } else {
102 debug!("Newer trace payload decoded.");
103 handle_dd_trace_payload_v1(decoded_payload, api_key, source)
104 }
105}
106
107fn handle_dd_trace_payload_v1(
109 decoded_payload: ddtrace_proto::TracePayload,
110 api_key: Option<Arc<str>>,
111 source: &DatadogAgentSource,
112) -> crate::Result<Vec<Event>> {
113 let env = decoded_payload.env;
114 let hostname = decoded_payload.host_name;
115 let agent_version = decoded_payload.agent_version;
116 let target_tps = decoded_payload.target_tps;
117 let error_tps = decoded_payload.error_tps;
118 let tags = convert_tags(decoded_payload.tags);
119
120 let trace_events: Vec<TraceEvent> = decoded_payload
121 .tracer_payloads
122 .into_iter()
123 .flat_map(convert_dd_tracer_payload)
124 .collect();
125
126 source.events_received.emit(CountByteSize(
127 trace_events.len(),
128 trace_events.estimated_json_encoded_size_of(),
129 ));
130
131 let enriched_events = trace_events
132 .into_iter()
133 .map(|mut trace_event| {
134 if let Some(k) = &api_key {
135 trace_event
136 .metadata_mut()
137 .set_datadog_api_key(Arc::clone(k));
138 }
139 trace_event.insert(
140 &source.log_schema_source_type_key,
141 Bytes::from("datadog_agent"),
142 );
143 trace_event.insert(event_path!("payload_version"), "v2".to_string());
144 trace_event.insert(&source.log_schema_host_key, hostname.clone());
145 trace_event.insert(event_path!("env"), env.clone());
146 trace_event.insert(event_path!("agent_version"), agent_version.clone());
147 trace_event.insert(
148 event_path!("target_tps"),
149 Value::Float(NotNan::new(target_tps).expect("target_tps cannot be Nan")),
150 );
151 trace_event.insert(
152 event_path!("error_tps"),
153 Value::Float(NotNan::new(error_tps).expect("error_tps cannot be Nan")),
154 );
155 if let Some(Value::Object(span_tags)) = trace_event.get_mut(event_path!("tags")) {
156 span_tags.extend(tags.clone());
157 } else {
158 trace_event.insert(event_path!("tags"), Value::from(tags.clone()));
159 }
160 Event::Trace(trace_event)
161 })
162 .collect();
163 Ok(enriched_events)
164}
165
166fn convert_dd_tracer_payload(payload: ddtrace_proto::TracerPayload) -> Vec<TraceEvent> {
167 let tags = convert_tags(payload.tags);
168 payload
169 .chunks
170 .into_iter()
171 .map(|trace| {
172 let mut trace_event = TraceEvent::default();
173 trace_event.insert(event_path!("priority"), trace.priority as i64);
174 trace_event.insert(event_path!("origin"), trace.origin);
175 trace_event.insert(event_path!("dropped"), trace.dropped_trace);
176 let mut trace_tags = convert_tags(trace.tags);
177 trace_tags.extend(tags.clone());
178 trace_event.insert(event_path!("tags"), Value::from(trace_tags));
179
180 trace_event.insert(
181 event_path!("spans"),
182 trace
183 .spans
184 .into_iter()
185 .map(|s| Value::from(convert_span(s)))
186 .collect::<Vec<Value>>(),
187 );
188
189 trace_event.insert(event_path!("container_id"), payload.container_id.clone());
190 trace_event.insert(event_path!("language_name"), payload.language_name.clone());
191 trace_event.insert(
192 event_path!("language_version"),
193 payload.language_version.clone(),
194 );
195 trace_event.insert(
196 event_path!("tracer_version"),
197 payload.tracer_version.clone(),
198 );
199 trace_event.insert(event_path!("runtime_id"), payload.runtime_id.clone());
200 trace_event.insert(event_path!("app_version"), payload.app_version.clone());
201 trace_event
202 })
203 .collect()
204}
205
206fn handle_dd_trace_payload_v0(
208 decoded_payload: ddtrace_proto::TracePayload,
209 api_key: Option<Arc<str>>,
210 lang: Option<&String>,
211 source: &DatadogAgentSource,
212) -> crate::Result<Vec<Event>> {
213 let env = decoded_payload.env;
214 let hostname = decoded_payload.host_name;
215
216 let trace_events: Vec<TraceEvent> =
217 decoded_payload
219 .traces
220 .into_iter()
221 .map(|dd_trace| {
222 let mut trace_event = TraceEvent::default();
223
224 trace_event.insert(event_path!("trace_id"), dd_trace.trace_id as i64);
228 trace_event.insert(event_path!("start_time"), Utc.timestamp_nanos(dd_trace.start_time));
229 trace_event.insert(event_path!("end_time"), Utc.timestamp_nanos(dd_trace.end_time));
230 trace_event.insert(
231 event_path!("spans"),
232 dd_trace
233 .spans
234 .into_iter()
235 .map(|s| Value::from(convert_span(s)))
236 .collect::<Vec<Value>>(),
237 );
238 trace_event
239 })
240 .chain(decoded_payload.transactions.into_iter().map(|s| {
242 let mut trace_event = TraceEvent::default();
243 trace_event.insert(event_path!("spans"), vec![Value::from(convert_span(s))]);
244 trace_event.insert(event_path!("dropped"), true);
245 trace_event
246 })).collect();
247
248 source.events_received.emit(CountByteSize(
249 trace_events.len(),
250 trace_events.estimated_json_encoded_size_of(),
251 ));
252
253 let enriched_events = trace_events
254 .into_iter()
255 .map(|mut trace_event| {
256 if let Some(k) = &api_key {
257 trace_event
258 .metadata_mut()
259 .set_datadog_api_key(Arc::clone(k));
260 }
261 if let Some(lang) = lang {
262 trace_event.insert(event_path!("language_name"), lang.clone());
263 }
264 trace_event.insert(
265 &source.log_schema_source_type_key,
266 Bytes::from("datadog_agent"),
267 );
268 trace_event.insert(event_path!("payload_version"), "v1".to_string());
269 trace_event.insert(&source.log_schema_host_key, hostname.clone());
270 trace_event.insert(event_path!("env"), env.clone());
271 Event::Trace(trace_event)
272 })
273 .collect();
274
275 Ok(enriched_events)
276}
277
278fn convert_span(dd_span: ddtrace_proto::Span) -> ObjectMap {
279 let mut span = ObjectMap::new();
280 span.insert("service".into(), Value::from(dd_span.service));
281 span.insert("name".into(), Value::from(dd_span.name));
282
283 span.insert("resource".into(), Value::from(dd_span.resource));
284
285 span.insert("trace_id".into(), Value::from(dd_span.trace_id as i64));
289 span.insert("span_id".into(), Value::from(dd_span.span_id as i64));
290 span.insert("parent_id".into(), Value::from(dd_span.parent_id as i64));
291 span.insert(
292 "start".into(),
293 Value::from(Utc.timestamp_nanos(dd_span.start)),
294 );
295 span.insert("duration".into(), Value::from(dd_span.duration));
296 span.insert("error".into(), Value::from(dd_span.error as i64));
297 span.insert("meta".into(), Value::from(convert_tags(dd_span.meta)));
298 span.insert(
299 "metrics".into(),
300 Value::from(
301 dd_span
302 .metrics
303 .into_iter()
304 .map(|(k, v)| {
305 (
306 k.into(),
307 NotNan::new(v).map(Value::Float).unwrap_or(Value::Null),
308 )
309 })
310 .collect::<ObjectMap>(),
311 ),
312 );
313 span.insert("type".into(), Value::from(dd_span.r#type));
314 span.insert(
315 "meta_struct".into(),
316 Value::from(
317 dd_span
318 .meta_struct
319 .into_iter()
320 .map(|(k, v)| (k.into(), Value::from(bytes::Bytes::from(v))))
321 .collect::<ObjectMap>(),
322 ),
323 );
324
325 span
326}
327
328fn convert_tags(original_map: BTreeMap<String, String>) -> ObjectMap {
329 original_map
330 .into_iter()
331 .map(|(k, v)| (k.into(), Value::from(v)))
332 .collect::<ObjectMap>()
333}