// vector/sources/datadog_agent/traces.rs

1use std::{collections::BTreeMap, sync::Arc};
2
3use bytes::Bytes;
4use chrono::{TimeZone, Utc};
5use futures::future;
6use http::StatusCode;
7use ordered_float::NotNan;
8use prost::Message;
9use vector_lib::{
10    EstimatedJsonEncodedSizeOf,
11    internal_event::{CountByteSize, InternalEventHandle as _},
12};
13use vrl::event_path;
14use warp::{Filter, Rejection, Reply, filters::BoxedFilter, path, path::FullPath, reply::Response};
15
16use super::{ApiKeyQueryParams, DatadogAgentSource, RequestHandler, ddtrace_proto};
17use crate::{
18    common::http::ErrorMessage,
19    event::{Event, ObjectMap, TraceEvent, Value},
20};
21
22pub(super) fn build_warp_filter(
23    handler: RequestHandler,
24    source: DatadogAgentSource,
25) -> BoxedFilter<(Response,)> {
26    build_trace_filter(handler, source)
27        .or(build_stats_filter())
28        .unify()
29        .boxed()
30}
31
32fn build_trace_filter(
33    handler: RequestHandler,
34    source: DatadogAgentSource,
35) -> BoxedFilter<(Response,)> {
36    warp::post()
37        .and(path!("api" / "v0.2" / "traces" / ..))
38        .and(warp::path::full())
39        .and(warp::header::optional::<String>("content-encoding"))
40        .and(warp::header::optional::<String>("dd-api-key"))
41        .and(warp::header::optional::<String>(
42            "X-Datadog-Reported-Languages",
43        ))
44        .and(warp::query::<ApiKeyQueryParams>())
45        .and(warp::body::bytes())
46        .and_then({
47            move |path: FullPath,
48                  encoding_header: Option<String>,
49                  api_token: Option<String>,
50                  reported_language: Option<String>,
51                  query_params: ApiKeyQueryParams,
52                  body: Bytes| {
53                let events = source
54                    .decode(&encoding_header, body, path.as_str())
55                    .and_then(|body| {
56                        handle_dd_trace_payload(
57                            body,
58                            source.api_key_extractor.extract(
59                                path.as_str(),
60                                api_token,
61                                query_params.dd_api_key,
62                            ),
63                            reported_language.as_ref(),
64                            &source,
65                        )
66                        .map_err(|error| {
67                            ErrorMessage::new(
68                                StatusCode::UNPROCESSABLE_ENTITY,
69                                format!("Error decoding Datadog traces: {error:?}"),
70                            )
71                        })
72                    });
73                handler.clone().handle_request(events, super::TRACES)
74            }
75        })
76        .boxed()
77}
78
79fn build_stats_filter() -> BoxedFilter<(Response,)> {
80    warp::post()
81        .and(path!("api" / "v0.2" / "stats" / ..))
82        .and_then(|| {
83            // APM stats are discarded on purpose, they will be computed in the `datadog_traces` sink
84            // thus we simply reply with a 200/OK response.
85            let response: Result<Response, Rejection> = Ok(warp::reply().into_response());
86            future::ready(response)
87        })
88        .boxed()
89}
90
91fn handle_dd_trace_payload(
92    frame: Bytes,
93    api_key: Option<Arc<str>>,
94    lang: Option<&String>,
95    source: &DatadogAgentSource,
96) -> crate::Result<Vec<Event>> {
97    let decoded_payload = ddtrace_proto::TracePayload::decode(frame)?;
98    if decoded_payload.tracer_payloads.is_empty() {
99        debug!("Older trace payload decoded.");
100        handle_dd_trace_payload_v0(decoded_payload, api_key, lang, source)
101    } else {
102        debug!("Newer trace payload decoded.");
103        handle_dd_trace_payload_v1(decoded_payload, api_key, source)
104    }
105}
106
107/// Decode Datadog newer protobuf schema
108fn handle_dd_trace_payload_v1(
109    decoded_payload: ddtrace_proto::TracePayload,
110    api_key: Option<Arc<str>>,
111    source: &DatadogAgentSource,
112) -> crate::Result<Vec<Event>> {
113    let env = decoded_payload.env;
114    let hostname = decoded_payload.host_name;
115    let agent_version = decoded_payload.agent_version;
116    let target_tps = decoded_payload.target_tps;
117    let error_tps = decoded_payload.error_tps;
118    let tags = convert_tags(decoded_payload.tags);
119
120    let trace_events: Vec<TraceEvent> = decoded_payload
121        .tracer_payloads
122        .into_iter()
123        .flat_map(convert_dd_tracer_payload)
124        .collect();
125
126    source.events_received.emit(CountByteSize(
127        trace_events.len(),
128        trace_events.estimated_json_encoded_size_of(),
129    ));
130
131    let enriched_events = trace_events
132        .into_iter()
133        .map(|mut trace_event| {
134            if let Some(k) = &api_key {
135                trace_event
136                    .metadata_mut()
137                    .set_datadog_api_key(Arc::clone(k));
138            }
139            trace_event.insert(
140                &source.log_schema_source_type_key,
141                Bytes::from("datadog_agent"),
142            );
143            trace_event.insert(event_path!("payload_version"), "v2".to_string());
144            trace_event.insert(&source.log_schema_host_key, hostname.clone());
145            trace_event.insert(event_path!("env"), env.clone());
146            trace_event.insert(event_path!("agent_version"), agent_version.clone());
147            trace_event.insert(
148                event_path!("target_tps"),
149                Value::Float(NotNan::new(target_tps).expect("target_tps cannot be Nan")),
150            );
151            trace_event.insert(
152                event_path!("error_tps"),
153                Value::Float(NotNan::new(error_tps).expect("error_tps cannot be Nan")),
154            );
155            if let Some(Value::Object(span_tags)) = trace_event.get_mut(event_path!("tags")) {
156                span_tags.extend(tags.clone());
157            } else {
158                trace_event.insert(event_path!("tags"), Value::from(tags.clone()));
159            }
160            Event::Trace(trace_event)
161        })
162        .collect();
163    Ok(enriched_events)
164}
165
166fn convert_dd_tracer_payload(payload: ddtrace_proto::TracerPayload) -> Vec<TraceEvent> {
167    let tags = convert_tags(payload.tags);
168    payload
169        .chunks
170        .into_iter()
171        .map(|trace| {
172            let mut trace_event = TraceEvent::default();
173            trace_event.insert(event_path!("priority"), trace.priority as i64);
174            trace_event.insert(event_path!("origin"), trace.origin);
175            trace_event.insert(event_path!("dropped"), trace.dropped_trace);
176            let mut trace_tags = convert_tags(trace.tags);
177            trace_tags.extend(tags.clone());
178            trace_event.insert(event_path!("tags"), Value::from(trace_tags));
179
180            trace_event.insert(
181                event_path!("spans"),
182                trace
183                    .spans
184                    .into_iter()
185                    .map(|s| Value::from(convert_span(s)))
186                    .collect::<Vec<Value>>(),
187            );
188
189            trace_event.insert(event_path!("container_id"), payload.container_id.clone());
190            trace_event.insert(event_path!("language_name"), payload.language_name.clone());
191            trace_event.insert(
192                event_path!("language_version"),
193                payload.language_version.clone(),
194            );
195            trace_event.insert(
196                event_path!("tracer_version"),
197                payload.tracer_version.clone(),
198            );
199            trace_event.insert(event_path!("runtime_id"), payload.runtime_id.clone());
200            trace_event.insert(event_path!("app_version"), payload.app_version.clone());
201            trace_event
202        })
203        .collect()
204}
205
206// Decode Datadog older protobuf schema
207fn handle_dd_trace_payload_v0(
208    decoded_payload: ddtrace_proto::TracePayload,
209    api_key: Option<Arc<str>>,
210    lang: Option<&String>,
211    source: &DatadogAgentSource,
212) -> crate::Result<Vec<Event>> {
213    let env = decoded_payload.env;
214    let hostname = decoded_payload.host_name;
215
216    let trace_events: Vec<TraceEvent> =
217    // Each traces is mapped to one event...
218    decoded_payload
219        .traces
220        .into_iter()
221        .map(|dd_trace| {
222            let mut trace_event = TraceEvent::default();
223
224            // TODO trace_id is being forced into an i64 but
225            // the incoming payload is u64. This is a bug and needs to be fixed per:
226            // https://github.com/vectordotdev/vector/issues/14687
227            trace_event.insert(event_path!("trace_id"), dd_trace.trace_id as i64);
228            trace_event.insert(event_path!("start_time"), Utc.timestamp_nanos(dd_trace.start_time));
229            trace_event.insert(event_path!("end_time"), Utc.timestamp_nanos(dd_trace.end_time));
230            trace_event.insert(
231                event_path!("spans"),
232                dd_trace
233                    .spans
234                    .into_iter()
235                    .map(|s| Value::from(convert_span(s)))
236                    .collect::<Vec<Value>>(),
237            );
238            trace_event
239        })
240        //... and each APM event is also mapped into its own event
241        .chain(decoded_payload.transactions.into_iter().map(|s| {
242            let mut trace_event = TraceEvent::default();
243            trace_event.insert(event_path!("spans"), vec![Value::from(convert_span(s))]);
244            trace_event.insert(event_path!("dropped"), true);
245            trace_event
246        })).collect();
247
248    source.events_received.emit(CountByteSize(
249        trace_events.len(),
250        trace_events.estimated_json_encoded_size_of(),
251    ));
252
253    let enriched_events = trace_events
254        .into_iter()
255        .map(|mut trace_event| {
256            if let Some(k) = &api_key {
257                trace_event
258                    .metadata_mut()
259                    .set_datadog_api_key(Arc::clone(k));
260            }
261            if let Some(lang) = lang {
262                trace_event.insert(event_path!("language_name"), lang.clone());
263            }
264            trace_event.insert(
265                &source.log_schema_source_type_key,
266                Bytes::from("datadog_agent"),
267            );
268            trace_event.insert(event_path!("payload_version"), "v1".to_string());
269            trace_event.insert(&source.log_schema_host_key, hostname.clone());
270            trace_event.insert(event_path!("env"), env.clone());
271            Event::Trace(trace_event)
272        })
273        .collect();
274
275    Ok(enriched_events)
276}
277
278fn convert_span(dd_span: ddtrace_proto::Span) -> ObjectMap {
279    let mut span = ObjectMap::new();
280    span.insert("service".into(), Value::from(dd_span.service));
281    span.insert("name".into(), Value::from(dd_span.name));
282
283    span.insert("resource".into(), Value::from(dd_span.resource));
284
285    // TODO trace_id, span_id and parent_id are being forced into an i64 but
286    // the incoming payload is u64. This is a bug and needs to be fixed per:
287    // https://github.com/vectordotdev/vector/issues/14687
288    span.insert("trace_id".into(), Value::from(dd_span.trace_id as i64));
289    span.insert("span_id".into(), Value::from(dd_span.span_id as i64));
290    span.insert("parent_id".into(), Value::from(dd_span.parent_id as i64));
291    span.insert(
292        "start".into(),
293        Value::from(Utc.timestamp_nanos(dd_span.start)),
294    );
295    span.insert("duration".into(), Value::from(dd_span.duration));
296    span.insert("error".into(), Value::from(dd_span.error as i64));
297    span.insert("meta".into(), Value::from(convert_tags(dd_span.meta)));
298    span.insert(
299        "metrics".into(),
300        Value::from(
301            dd_span
302                .metrics
303                .into_iter()
304                .map(|(k, v)| {
305                    (
306                        k.into(),
307                        NotNan::new(v).map(Value::Float).unwrap_or(Value::Null),
308                    )
309                })
310                .collect::<ObjectMap>(),
311        ),
312    );
313    span.insert("type".into(), Value::from(dd_span.r#type));
314    span.insert(
315        "meta_struct".into(),
316        Value::from(
317            dd_span
318                .meta_struct
319                .into_iter()
320                .map(|(k, v)| (k.into(), Value::from(bytes::Bytes::from(v))))
321                .collect::<ObjectMap>(),
322        ),
323    );
324
325    span
326}
327
328fn convert_tags(original_map: BTreeMap<String, String>) -> ObjectMap {
329    original_map
330        .into_iter()
331        .map(|(k, v)| (k.into(), Value::from(v)))
332        .collect::<ObjectMap>()
333}