vector/sources/datadog_agent/
traces.rs1use std::{collections::BTreeMap, sync::Arc};
2
3use bytes::Bytes;
4use chrono::{TimeZone, Utc};
5use futures::future;
6use http::StatusCode;
7use ordered_float::NotNan;
8use prost::Message;
9use vrl::event_path;
10use warp::{filters::BoxedFilter, path, path::FullPath, reply::Response, Filter, Rejection, Reply};
11
12use vector_lib::internal_event::{CountByteSize, InternalEventHandle as _};
13use vector_lib::EstimatedJsonEncodedSizeOf;
14
15use crate::common::http::ErrorMessage;
16use crate::{
17 event::{Event, ObjectMap, TraceEvent, Value},
18 sources::datadog_agent::{
19 ddtrace_proto, handle_request, ApiKeyQueryParams, DatadogAgentSource,
20 },
21 SourceSender,
22};
23
24pub(crate) fn build_warp_filter(
25 acknowledgements: bool,
26 multiple_outputs: bool,
27 out: SourceSender,
28 source: DatadogAgentSource,
29) -> BoxedFilter<(Response,)> {
30 build_trace_filter(acknowledgements, multiple_outputs, out, source)
31 .or(build_stats_filter())
32 .unify()
33 .boxed()
34}
35
36fn build_trace_filter(
37 acknowledgements: bool,
38 multiple_outputs: bool,
39 out: SourceSender,
40 source: DatadogAgentSource,
41) -> BoxedFilter<(Response,)> {
42 warp::post()
43 .and(path!("api" / "v0.2" / "traces" / ..))
44 .and(warp::path::full())
45 .and(warp::header::optional::<String>("content-encoding"))
46 .and(warp::header::optional::<String>("dd-api-key"))
47 .and(warp::header::optional::<String>(
48 "X-Datadog-Reported-Languages",
49 ))
50 .and(warp::query::<ApiKeyQueryParams>())
51 .and(warp::body::bytes())
52 .and_then(
53 move |path: FullPath,
54 encoding_header: Option<String>,
55 api_token: Option<String>,
56 reported_language: Option<String>,
57 query_params: ApiKeyQueryParams,
58 body: Bytes| {
59 let events = source
60 .decode(&encoding_header, body, path.as_str())
61 .and_then(|body| {
62 handle_dd_trace_payload(
63 body,
64 source.api_key_extractor.extract(
65 path.as_str(),
66 api_token,
67 query_params.dd_api_key,
68 ),
69 reported_language.as_ref(),
70 &source,
71 )
72 .map_err(|error| {
73 ErrorMessage::new(
74 StatusCode::UNPROCESSABLE_ENTITY,
75 format!("Error decoding Datadog traces: {error:?}"),
76 )
77 })
78 });
79 let output = multiple_outputs.then_some(super::TRACES);
80 handle_request(events, acknowledgements, out.clone(), output)
81 },
82 )
83 .boxed()
84}
85
86fn build_stats_filter() -> BoxedFilter<(Response,)> {
87 warp::post()
88 .and(path!("api" / "v0.2" / "stats" / ..))
89 .and_then(|| {
90 let response: Result<Response, Rejection> = Ok(warp::reply().into_response());
93 future::ready(response)
94 })
95 .boxed()
96}
97
98fn handle_dd_trace_payload(
99 frame: Bytes,
100 api_key: Option<Arc<str>>,
101 lang: Option<&String>,
102 source: &DatadogAgentSource,
103) -> crate::Result<Vec<Event>> {
104 let decoded_payload = ddtrace_proto::TracePayload::decode(frame)?;
105 if decoded_payload.tracer_payloads.is_empty() {
106 debug!("Older trace payload decoded.");
107 handle_dd_trace_payload_v0(decoded_payload, api_key, lang, source)
108 } else {
109 debug!("Newer trace payload decoded.");
110 handle_dd_trace_payload_v1(decoded_payload, api_key, source)
111 }
112}
113
114fn handle_dd_trace_payload_v1(
116 decoded_payload: ddtrace_proto::TracePayload,
117 api_key: Option<Arc<str>>,
118 source: &DatadogAgentSource,
119) -> crate::Result<Vec<Event>> {
120 let env = decoded_payload.env;
121 let hostname = decoded_payload.host_name;
122 let agent_version = decoded_payload.agent_version;
123 let target_tps = decoded_payload.target_tps;
124 let error_tps = decoded_payload.error_tps;
125 let tags = convert_tags(decoded_payload.tags);
126
127 let trace_events: Vec<TraceEvent> = decoded_payload
128 .tracer_payloads
129 .into_iter()
130 .flat_map(convert_dd_tracer_payload)
131 .collect();
132
133 source.events_received.emit(CountByteSize(
134 trace_events.len(),
135 trace_events.estimated_json_encoded_size_of(),
136 ));
137
138 let enriched_events = trace_events
139 .into_iter()
140 .map(|mut trace_event| {
141 if let Some(k) = &api_key {
142 trace_event
143 .metadata_mut()
144 .set_datadog_api_key(Arc::clone(k));
145 }
146 trace_event.insert(
147 &source.log_schema_source_type_key,
148 Bytes::from("datadog_agent"),
149 );
150 trace_event.insert(event_path!("payload_version"), "v2".to_string());
151 trace_event.insert(&source.log_schema_host_key, hostname.clone());
152 trace_event.insert(event_path!("env"), env.clone());
153 trace_event.insert(event_path!("agent_version"), agent_version.clone());
154 trace_event.insert(
155 event_path!("target_tps"),
156 Value::Float(NotNan::new(target_tps).expect("target_tps cannot be Nan")),
157 );
158 trace_event.insert(
159 event_path!("error_tps"),
160 Value::Float(NotNan::new(error_tps).expect("error_tps cannot be Nan")),
161 );
162 if let Some(Value::Object(span_tags)) = trace_event.get_mut(event_path!("tags")) {
163 span_tags.extend(tags.clone());
164 } else {
165 trace_event.insert(event_path!("tags"), Value::from(tags.clone()));
166 }
167 Event::Trace(trace_event)
168 })
169 .collect();
170 Ok(enriched_events)
171}
172
173fn convert_dd_tracer_payload(payload: ddtrace_proto::TracerPayload) -> Vec<TraceEvent> {
174 let tags = convert_tags(payload.tags);
175 payload
176 .chunks
177 .into_iter()
178 .map(|trace| {
179 let mut trace_event = TraceEvent::default();
180 trace_event.insert(event_path!("priority"), trace.priority as i64);
181 trace_event.insert(event_path!("origin"), trace.origin);
182 trace_event.insert(event_path!("dropped"), trace.dropped_trace);
183 let mut trace_tags = convert_tags(trace.tags);
184 trace_tags.extend(tags.clone());
185 trace_event.insert(event_path!("tags"), Value::from(trace_tags));
186
187 trace_event.insert(
188 event_path!("spans"),
189 trace
190 .spans
191 .into_iter()
192 .map(|s| Value::from(convert_span(s)))
193 .collect::<Vec<Value>>(),
194 );
195
196 trace_event.insert(event_path!("container_id"), payload.container_id.clone());
197 trace_event.insert(event_path!("language_name"), payload.language_name.clone());
198 trace_event.insert(
199 event_path!("language_version"),
200 payload.language_version.clone(),
201 );
202 trace_event.insert(
203 event_path!("tracer_version"),
204 payload.tracer_version.clone(),
205 );
206 trace_event.insert(event_path!("runtime_id"), payload.runtime_id.clone());
207 trace_event.insert(event_path!("app_version"), payload.app_version.clone());
208 trace_event
209 })
210 .collect()
211}
212
213fn handle_dd_trace_payload_v0(
215 decoded_payload: ddtrace_proto::TracePayload,
216 api_key: Option<Arc<str>>,
217 lang: Option<&String>,
218 source: &DatadogAgentSource,
219) -> crate::Result<Vec<Event>> {
220 let env = decoded_payload.env;
221 let hostname = decoded_payload.host_name;
222
223 let trace_events: Vec<TraceEvent> =
224 decoded_payload
226 .traces
227 .into_iter()
228 .map(|dd_trace| {
229 let mut trace_event = TraceEvent::default();
230
231 trace_event.insert(event_path!("trace_id"), dd_trace.trace_id as i64);
235 trace_event.insert(event_path!("start_time"), Utc.timestamp_nanos(dd_trace.start_time));
236 trace_event.insert(event_path!("end_time"), Utc.timestamp_nanos(dd_trace.end_time));
237 trace_event.insert(
238 event_path!("spans"),
239 dd_trace
240 .spans
241 .into_iter()
242 .map(|s| Value::from(convert_span(s)))
243 .collect::<Vec<Value>>(),
244 );
245 trace_event
246 })
247 .chain(decoded_payload.transactions.into_iter().map(|s| {
249 let mut trace_event = TraceEvent::default();
250 trace_event.insert(event_path!("spans"), vec![Value::from(convert_span(s))]);
251 trace_event.insert(event_path!("dropped"), true);
252 trace_event
253 })).collect();
254
255 source.events_received.emit(CountByteSize(
256 trace_events.len(),
257 trace_events.estimated_json_encoded_size_of(),
258 ));
259
260 let enriched_events = trace_events
261 .into_iter()
262 .map(|mut trace_event| {
263 if let Some(k) = &api_key {
264 trace_event
265 .metadata_mut()
266 .set_datadog_api_key(Arc::clone(k));
267 }
268 if let Some(lang) = lang {
269 trace_event.insert(event_path!("language_name"), lang.clone());
270 }
271 trace_event.insert(
272 &source.log_schema_source_type_key,
273 Bytes::from("datadog_agent"),
274 );
275 trace_event.insert(event_path!("payload_version"), "v1".to_string());
276 trace_event.insert(&source.log_schema_host_key, hostname.clone());
277 trace_event.insert(event_path!("env"), env.clone());
278 Event::Trace(trace_event)
279 })
280 .collect();
281
282 Ok(enriched_events)
283}
284
285fn convert_span(dd_span: ddtrace_proto::Span) -> ObjectMap {
286 let mut span = ObjectMap::new();
287 span.insert("service".into(), Value::from(dd_span.service));
288 span.insert("name".into(), Value::from(dd_span.name));
289
290 span.insert("resource".into(), Value::from(dd_span.resource));
291
292 span.insert("trace_id".into(), Value::from(dd_span.trace_id as i64));
296 span.insert("span_id".into(), Value::from(dd_span.span_id as i64));
297 span.insert("parent_id".into(), Value::from(dd_span.parent_id as i64));
298 span.insert(
299 "start".into(),
300 Value::from(Utc.timestamp_nanos(dd_span.start)),
301 );
302 span.insert("duration".into(), Value::from(dd_span.duration));
303 span.insert("error".into(), Value::from(dd_span.error as i64));
304 span.insert("meta".into(), Value::from(convert_tags(dd_span.meta)));
305 span.insert(
306 "metrics".into(),
307 Value::from(
308 dd_span
309 .metrics
310 .into_iter()
311 .map(|(k, v)| {
312 (
313 k.into(),
314 NotNan::new(v).map(Value::Float).unwrap_or(Value::Null),
315 )
316 })
317 .collect::<ObjectMap>(),
318 ),
319 );
320 span.insert("type".into(), Value::from(dd_span.r#type));
321 span.insert(
322 "meta_struct".into(),
323 Value::from(
324 dd_span
325 .meta_struct
326 .into_iter()
327 .map(|(k, v)| (k.into(), Value::from(bytes::Bytes::from(v))))
328 .collect::<ObjectMap>(),
329 ),
330 );
331
332 span
333}
334
335fn convert_tags(original_map: BTreeMap<String, String>) -> ObjectMap {
336 original_map
337 .into_iter()
338 .map(|(k, v)| (k.into(), Value::from(v)))
339 .collect::<ObjectMap>()
340}