vector/sources/datadog_agent/
traces.rs1use std::{collections::BTreeMap, sync::Arc};
2
3use bytes::Bytes;
4use chrono::{TimeZone, Utc};
5use futures::future;
6use http::StatusCode;
7use ordered_float::NotNan;
8use prost::Message;
9use vector_lib::{
10 EstimatedJsonEncodedSizeOf,
11 internal_event::{CountByteSize, InternalEventHandle as _},
12};
13use vrl::event_path;
14use warp::{Filter, Rejection, Reply, filters::BoxedFilter, path, path::FullPath, reply::Response};
15
16use crate::{
17 SourceSender,
18 common::http::ErrorMessage,
19 event::{Event, ObjectMap, TraceEvent, Value},
20 sources::datadog_agent::{
21 ApiKeyQueryParams, DatadogAgentSource, ddtrace_proto, handle_request,
22 },
23};
24
25pub(crate) fn build_warp_filter(
26 acknowledgements: bool,
27 multiple_outputs: bool,
28 out: SourceSender,
29 source: DatadogAgentSource,
30) -> BoxedFilter<(Response,)> {
31 build_trace_filter(acknowledgements, multiple_outputs, out, source)
32 .or(build_stats_filter())
33 .unify()
34 .boxed()
35}
36
37fn build_trace_filter(
38 acknowledgements: bool,
39 multiple_outputs: bool,
40 out: SourceSender,
41 source: DatadogAgentSource,
42) -> BoxedFilter<(Response,)> {
43 warp::post()
44 .and(path!("api" / "v0.2" / "traces" / ..))
45 .and(warp::path::full())
46 .and(warp::header::optional::<String>("content-encoding"))
47 .and(warp::header::optional::<String>("dd-api-key"))
48 .and(warp::header::optional::<String>(
49 "X-Datadog-Reported-Languages",
50 ))
51 .and(warp::query::<ApiKeyQueryParams>())
52 .and(warp::body::bytes())
53 .and_then(
54 move |path: FullPath,
55 encoding_header: Option<String>,
56 api_token: Option<String>,
57 reported_language: Option<String>,
58 query_params: ApiKeyQueryParams,
59 body: Bytes| {
60 let events = source
61 .decode(&encoding_header, body, path.as_str())
62 .and_then(|body| {
63 handle_dd_trace_payload(
64 body,
65 source.api_key_extractor.extract(
66 path.as_str(),
67 api_token,
68 query_params.dd_api_key,
69 ),
70 reported_language.as_ref(),
71 &source,
72 )
73 .map_err(|error| {
74 ErrorMessage::new(
75 StatusCode::UNPROCESSABLE_ENTITY,
76 format!("Error decoding Datadog traces: {error:?}"),
77 )
78 })
79 });
80 let output = multiple_outputs.then_some(super::TRACES);
81 handle_request(events, acknowledgements, out.clone(), output)
82 },
83 )
84 .boxed()
85}
86
87fn build_stats_filter() -> BoxedFilter<(Response,)> {
88 warp::post()
89 .and(path!("api" / "v0.2" / "stats" / ..))
90 .and_then(|| {
91 let response: Result<Response, Rejection> = Ok(warp::reply().into_response());
94 future::ready(response)
95 })
96 .boxed()
97}
98
99fn handle_dd_trace_payload(
100 frame: Bytes,
101 api_key: Option<Arc<str>>,
102 lang: Option<&String>,
103 source: &DatadogAgentSource,
104) -> crate::Result<Vec<Event>> {
105 let decoded_payload = ddtrace_proto::TracePayload::decode(frame)?;
106 if decoded_payload.tracer_payloads.is_empty() {
107 debug!("Older trace payload decoded.");
108 handle_dd_trace_payload_v0(decoded_payload, api_key, lang, source)
109 } else {
110 debug!("Newer trace payload decoded.");
111 handle_dd_trace_payload_v1(decoded_payload, api_key, source)
112 }
113}
114
115fn handle_dd_trace_payload_v1(
117 decoded_payload: ddtrace_proto::TracePayload,
118 api_key: Option<Arc<str>>,
119 source: &DatadogAgentSource,
120) -> crate::Result<Vec<Event>> {
121 let env = decoded_payload.env;
122 let hostname = decoded_payload.host_name;
123 let agent_version = decoded_payload.agent_version;
124 let target_tps = decoded_payload.target_tps;
125 let error_tps = decoded_payload.error_tps;
126 let tags = convert_tags(decoded_payload.tags);
127
128 let trace_events: Vec<TraceEvent> = decoded_payload
129 .tracer_payloads
130 .into_iter()
131 .flat_map(convert_dd_tracer_payload)
132 .collect();
133
134 source.events_received.emit(CountByteSize(
135 trace_events.len(),
136 trace_events.estimated_json_encoded_size_of(),
137 ));
138
139 let enriched_events = trace_events
140 .into_iter()
141 .map(|mut trace_event| {
142 if let Some(k) = &api_key {
143 trace_event
144 .metadata_mut()
145 .set_datadog_api_key(Arc::clone(k));
146 }
147 trace_event.insert(
148 &source.log_schema_source_type_key,
149 Bytes::from("datadog_agent"),
150 );
151 trace_event.insert(event_path!("payload_version"), "v2".to_string());
152 trace_event.insert(&source.log_schema_host_key, hostname.clone());
153 trace_event.insert(event_path!("env"), env.clone());
154 trace_event.insert(event_path!("agent_version"), agent_version.clone());
155 trace_event.insert(
156 event_path!("target_tps"),
157 Value::Float(NotNan::new(target_tps).expect("target_tps cannot be Nan")),
158 );
159 trace_event.insert(
160 event_path!("error_tps"),
161 Value::Float(NotNan::new(error_tps).expect("error_tps cannot be Nan")),
162 );
163 if let Some(Value::Object(span_tags)) = trace_event.get_mut(event_path!("tags")) {
164 span_tags.extend(tags.clone());
165 } else {
166 trace_event.insert(event_path!("tags"), Value::from(tags.clone()));
167 }
168 Event::Trace(trace_event)
169 })
170 .collect();
171 Ok(enriched_events)
172}
173
174fn convert_dd_tracer_payload(payload: ddtrace_proto::TracerPayload) -> Vec<TraceEvent> {
175 let tags = convert_tags(payload.tags);
176 payload
177 .chunks
178 .into_iter()
179 .map(|trace| {
180 let mut trace_event = TraceEvent::default();
181 trace_event.insert(event_path!("priority"), trace.priority as i64);
182 trace_event.insert(event_path!("origin"), trace.origin);
183 trace_event.insert(event_path!("dropped"), trace.dropped_trace);
184 let mut trace_tags = convert_tags(trace.tags);
185 trace_tags.extend(tags.clone());
186 trace_event.insert(event_path!("tags"), Value::from(trace_tags));
187
188 trace_event.insert(
189 event_path!("spans"),
190 trace
191 .spans
192 .into_iter()
193 .map(|s| Value::from(convert_span(s)))
194 .collect::<Vec<Value>>(),
195 );
196
197 trace_event.insert(event_path!("container_id"), payload.container_id.clone());
198 trace_event.insert(event_path!("language_name"), payload.language_name.clone());
199 trace_event.insert(
200 event_path!("language_version"),
201 payload.language_version.clone(),
202 );
203 trace_event.insert(
204 event_path!("tracer_version"),
205 payload.tracer_version.clone(),
206 );
207 trace_event.insert(event_path!("runtime_id"), payload.runtime_id.clone());
208 trace_event.insert(event_path!("app_version"), payload.app_version.clone());
209 trace_event
210 })
211 .collect()
212}
213
214fn handle_dd_trace_payload_v0(
216 decoded_payload: ddtrace_proto::TracePayload,
217 api_key: Option<Arc<str>>,
218 lang: Option<&String>,
219 source: &DatadogAgentSource,
220) -> crate::Result<Vec<Event>> {
221 let env = decoded_payload.env;
222 let hostname = decoded_payload.host_name;
223
224 let trace_events: Vec<TraceEvent> =
225 decoded_payload
227 .traces
228 .into_iter()
229 .map(|dd_trace| {
230 let mut trace_event = TraceEvent::default();
231
232 trace_event.insert(event_path!("trace_id"), dd_trace.trace_id as i64);
236 trace_event.insert(event_path!("start_time"), Utc.timestamp_nanos(dd_trace.start_time));
237 trace_event.insert(event_path!("end_time"), Utc.timestamp_nanos(dd_trace.end_time));
238 trace_event.insert(
239 event_path!("spans"),
240 dd_trace
241 .spans
242 .into_iter()
243 .map(|s| Value::from(convert_span(s)))
244 .collect::<Vec<Value>>(),
245 );
246 trace_event
247 })
248 .chain(decoded_payload.transactions.into_iter().map(|s| {
250 let mut trace_event = TraceEvent::default();
251 trace_event.insert(event_path!("spans"), vec![Value::from(convert_span(s))]);
252 trace_event.insert(event_path!("dropped"), true);
253 trace_event
254 })).collect();
255
256 source.events_received.emit(CountByteSize(
257 trace_events.len(),
258 trace_events.estimated_json_encoded_size_of(),
259 ));
260
261 let enriched_events = trace_events
262 .into_iter()
263 .map(|mut trace_event| {
264 if let Some(k) = &api_key {
265 trace_event
266 .metadata_mut()
267 .set_datadog_api_key(Arc::clone(k));
268 }
269 if let Some(lang) = lang {
270 trace_event.insert(event_path!("language_name"), lang.clone());
271 }
272 trace_event.insert(
273 &source.log_schema_source_type_key,
274 Bytes::from("datadog_agent"),
275 );
276 trace_event.insert(event_path!("payload_version"), "v1".to_string());
277 trace_event.insert(&source.log_schema_host_key, hostname.clone());
278 trace_event.insert(event_path!("env"), env.clone());
279 Event::Trace(trace_event)
280 })
281 .collect();
282
283 Ok(enriched_events)
284}
285
286fn convert_span(dd_span: ddtrace_proto::Span) -> ObjectMap {
287 let mut span = ObjectMap::new();
288 span.insert("service".into(), Value::from(dd_span.service));
289 span.insert("name".into(), Value::from(dd_span.name));
290
291 span.insert("resource".into(), Value::from(dd_span.resource));
292
293 span.insert("trace_id".into(), Value::from(dd_span.trace_id as i64));
297 span.insert("span_id".into(), Value::from(dd_span.span_id as i64));
298 span.insert("parent_id".into(), Value::from(dd_span.parent_id as i64));
299 span.insert(
300 "start".into(),
301 Value::from(Utc.timestamp_nanos(dd_span.start)),
302 );
303 span.insert("duration".into(), Value::from(dd_span.duration));
304 span.insert("error".into(), Value::from(dd_span.error as i64));
305 span.insert("meta".into(), Value::from(convert_tags(dd_span.meta)));
306 span.insert(
307 "metrics".into(),
308 Value::from(
309 dd_span
310 .metrics
311 .into_iter()
312 .map(|(k, v)| {
313 (
314 k.into(),
315 NotNan::new(v).map(Value::Float).unwrap_or(Value::Null),
316 )
317 })
318 .collect::<ObjectMap>(),
319 ),
320 );
321 span.insert("type".into(), Value::from(dd_span.r#type));
322 span.insert(
323 "meta_struct".into(),
324 Value::from(
325 dd_span
326 .meta_struct
327 .into_iter()
328 .map(|(k, v)| (k.into(), Value::from(bytes::Bytes::from(v))))
329 .collect::<ObjectMap>(),
330 ),
331 );
332
333 span
334}
335
336fn convert_tags(original_map: BTreeMap<String, String>) -> ObjectMap {
337 original_map
338 .into_iter()
339 .map(|(k, v)| (k.into(), Value::from(v)))
340 .collect::<ObjectMap>()
341}