vector/sources/datadog_agent/
traces.rs

1use std::{collections::BTreeMap, sync::Arc};
2
3use bytes::Bytes;
4use chrono::{TimeZone, Utc};
5use futures::future;
6use http::StatusCode;
7use ordered_float::NotNan;
8use prost::Message;
9use vector_lib::{
10    EstimatedJsonEncodedSizeOf,
11    internal_event::{CountByteSize, InternalEventHandle as _},
12};
13use vrl::event_path;
14use warp::{Filter, Rejection, Reply, filters::BoxedFilter, path, path::FullPath, reply::Response};
15
16use crate::{
17    SourceSender,
18    common::http::ErrorMessage,
19    event::{Event, ObjectMap, TraceEvent, Value},
20    sources::datadog_agent::{
21        ApiKeyQueryParams, DatadogAgentSource, ddtrace_proto, handle_request,
22    },
23};
24
25pub(crate) fn build_warp_filter(
26    acknowledgements: bool,
27    multiple_outputs: bool,
28    out: SourceSender,
29    source: DatadogAgentSource,
30) -> BoxedFilter<(Response,)> {
31    build_trace_filter(acknowledgements, multiple_outputs, out, source)
32        .or(build_stats_filter())
33        .unify()
34        .boxed()
35}
36
37fn build_trace_filter(
38    acknowledgements: bool,
39    multiple_outputs: bool,
40    out: SourceSender,
41    source: DatadogAgentSource,
42) -> BoxedFilter<(Response,)> {
43    warp::post()
44        .and(path!("api" / "v0.2" / "traces" / ..))
45        .and(warp::path::full())
46        .and(warp::header::optional::<String>("content-encoding"))
47        .and(warp::header::optional::<String>("dd-api-key"))
48        .and(warp::header::optional::<String>(
49            "X-Datadog-Reported-Languages",
50        ))
51        .and(warp::query::<ApiKeyQueryParams>())
52        .and(warp::body::bytes())
53        .and_then(
54            move |path: FullPath,
55                  encoding_header: Option<String>,
56                  api_token: Option<String>,
57                  reported_language: Option<String>,
58                  query_params: ApiKeyQueryParams,
59                  body: Bytes| {
60                let events = source
61                    .decode(&encoding_header, body, path.as_str())
62                    .and_then(|body| {
63                        handle_dd_trace_payload(
64                            body,
65                            source.api_key_extractor.extract(
66                                path.as_str(),
67                                api_token,
68                                query_params.dd_api_key,
69                            ),
70                            reported_language.as_ref(),
71                            &source,
72                        )
73                        .map_err(|error| {
74                            ErrorMessage::new(
75                                StatusCode::UNPROCESSABLE_ENTITY,
76                                format!("Error decoding Datadog traces: {error:?}"),
77                            )
78                        })
79                    });
80                let output = multiple_outputs.then_some(super::TRACES);
81                handle_request(events, acknowledgements, out.clone(), output)
82            },
83        )
84        .boxed()
85}
86
87fn build_stats_filter() -> BoxedFilter<(Response,)> {
88    warp::post()
89        .and(path!("api" / "v0.2" / "stats" / ..))
90        .and_then(|| {
91            // APM stats are discarded on purpose, they will be computed in the `datadog_traces` sink
92            // thus we simply reply with a 200/OK response.
93            let response: Result<Response, Rejection> = Ok(warp::reply().into_response());
94            future::ready(response)
95        })
96        .boxed()
97}
98
99fn handle_dd_trace_payload(
100    frame: Bytes,
101    api_key: Option<Arc<str>>,
102    lang: Option<&String>,
103    source: &DatadogAgentSource,
104) -> crate::Result<Vec<Event>> {
105    let decoded_payload = ddtrace_proto::TracePayload::decode(frame)?;
106    if decoded_payload.tracer_payloads.is_empty() {
107        debug!("Older trace payload decoded.");
108        handle_dd_trace_payload_v0(decoded_payload, api_key, lang, source)
109    } else {
110        debug!("Newer trace payload decoded.");
111        handle_dd_trace_payload_v1(decoded_payload, api_key, source)
112    }
113}
114
115/// Decode Datadog newer protobuf schema
116fn handle_dd_trace_payload_v1(
117    decoded_payload: ddtrace_proto::TracePayload,
118    api_key: Option<Arc<str>>,
119    source: &DatadogAgentSource,
120) -> crate::Result<Vec<Event>> {
121    let env = decoded_payload.env;
122    let hostname = decoded_payload.host_name;
123    let agent_version = decoded_payload.agent_version;
124    let target_tps = decoded_payload.target_tps;
125    let error_tps = decoded_payload.error_tps;
126    let tags = convert_tags(decoded_payload.tags);
127
128    let trace_events: Vec<TraceEvent> = decoded_payload
129        .tracer_payloads
130        .into_iter()
131        .flat_map(convert_dd_tracer_payload)
132        .collect();
133
134    source.events_received.emit(CountByteSize(
135        trace_events.len(),
136        trace_events.estimated_json_encoded_size_of(),
137    ));
138
139    let enriched_events = trace_events
140        .into_iter()
141        .map(|mut trace_event| {
142            if let Some(k) = &api_key {
143                trace_event
144                    .metadata_mut()
145                    .set_datadog_api_key(Arc::clone(k));
146            }
147            trace_event.insert(
148                &source.log_schema_source_type_key,
149                Bytes::from("datadog_agent"),
150            );
151            trace_event.insert(event_path!("payload_version"), "v2".to_string());
152            trace_event.insert(&source.log_schema_host_key, hostname.clone());
153            trace_event.insert(event_path!("env"), env.clone());
154            trace_event.insert(event_path!("agent_version"), agent_version.clone());
155            trace_event.insert(
156                event_path!("target_tps"),
157                Value::Float(NotNan::new(target_tps).expect("target_tps cannot be Nan")),
158            );
159            trace_event.insert(
160                event_path!("error_tps"),
161                Value::Float(NotNan::new(error_tps).expect("error_tps cannot be Nan")),
162            );
163            if let Some(Value::Object(span_tags)) = trace_event.get_mut(event_path!("tags")) {
164                span_tags.extend(tags.clone());
165            } else {
166                trace_event.insert(event_path!("tags"), Value::from(tags.clone()));
167            }
168            Event::Trace(trace_event)
169        })
170        .collect();
171    Ok(enriched_events)
172}
173
174fn convert_dd_tracer_payload(payload: ddtrace_proto::TracerPayload) -> Vec<TraceEvent> {
175    let tags = convert_tags(payload.tags);
176    payload
177        .chunks
178        .into_iter()
179        .map(|trace| {
180            let mut trace_event = TraceEvent::default();
181            trace_event.insert(event_path!("priority"), trace.priority as i64);
182            trace_event.insert(event_path!("origin"), trace.origin);
183            trace_event.insert(event_path!("dropped"), trace.dropped_trace);
184            let mut trace_tags = convert_tags(trace.tags);
185            trace_tags.extend(tags.clone());
186            trace_event.insert(event_path!("tags"), Value::from(trace_tags));
187
188            trace_event.insert(
189                event_path!("spans"),
190                trace
191                    .spans
192                    .into_iter()
193                    .map(|s| Value::from(convert_span(s)))
194                    .collect::<Vec<Value>>(),
195            );
196
197            trace_event.insert(event_path!("container_id"), payload.container_id.clone());
198            trace_event.insert(event_path!("language_name"), payload.language_name.clone());
199            trace_event.insert(
200                event_path!("language_version"),
201                payload.language_version.clone(),
202            );
203            trace_event.insert(
204                event_path!("tracer_version"),
205                payload.tracer_version.clone(),
206            );
207            trace_event.insert(event_path!("runtime_id"), payload.runtime_id.clone());
208            trace_event.insert(event_path!("app_version"), payload.app_version.clone());
209            trace_event
210        })
211        .collect()
212}
213
214// Decode Datadog older protobuf schema
215fn handle_dd_trace_payload_v0(
216    decoded_payload: ddtrace_proto::TracePayload,
217    api_key: Option<Arc<str>>,
218    lang: Option<&String>,
219    source: &DatadogAgentSource,
220) -> crate::Result<Vec<Event>> {
221    let env = decoded_payload.env;
222    let hostname = decoded_payload.host_name;
223
224    let trace_events: Vec<TraceEvent> =
225    // Each traces is mapped to one event...
226    decoded_payload
227        .traces
228        .into_iter()
229        .map(|dd_trace| {
230            let mut trace_event = TraceEvent::default();
231
232            // TODO trace_id is being forced into an i64 but
233            // the incoming payload is u64. This is a bug and needs to be fixed per:
234            // https://github.com/vectordotdev/vector/issues/14687
235            trace_event.insert(event_path!("trace_id"), dd_trace.trace_id as i64);
236            trace_event.insert(event_path!("start_time"), Utc.timestamp_nanos(dd_trace.start_time));
237            trace_event.insert(event_path!("end_time"), Utc.timestamp_nanos(dd_trace.end_time));
238            trace_event.insert(
239                event_path!("spans"),
240                dd_trace
241                    .spans
242                    .into_iter()
243                    .map(|s| Value::from(convert_span(s)))
244                    .collect::<Vec<Value>>(),
245            );
246            trace_event
247        })
248        //... and each APM event is also mapped into its own event
249        .chain(decoded_payload.transactions.into_iter().map(|s| {
250            let mut trace_event = TraceEvent::default();
251            trace_event.insert(event_path!("spans"), vec![Value::from(convert_span(s))]);
252            trace_event.insert(event_path!("dropped"), true);
253            trace_event
254        })).collect();
255
256    source.events_received.emit(CountByteSize(
257        trace_events.len(),
258        trace_events.estimated_json_encoded_size_of(),
259    ));
260
261    let enriched_events = trace_events
262        .into_iter()
263        .map(|mut trace_event| {
264            if let Some(k) = &api_key {
265                trace_event
266                    .metadata_mut()
267                    .set_datadog_api_key(Arc::clone(k));
268            }
269            if let Some(lang) = lang {
270                trace_event.insert(event_path!("language_name"), lang.clone());
271            }
272            trace_event.insert(
273                &source.log_schema_source_type_key,
274                Bytes::from("datadog_agent"),
275            );
276            trace_event.insert(event_path!("payload_version"), "v1".to_string());
277            trace_event.insert(&source.log_schema_host_key, hostname.clone());
278            trace_event.insert(event_path!("env"), env.clone());
279            Event::Trace(trace_event)
280        })
281        .collect();
282
283    Ok(enriched_events)
284}
285
286fn convert_span(dd_span: ddtrace_proto::Span) -> ObjectMap {
287    let mut span = ObjectMap::new();
288    span.insert("service".into(), Value::from(dd_span.service));
289    span.insert("name".into(), Value::from(dd_span.name));
290
291    span.insert("resource".into(), Value::from(dd_span.resource));
292
293    // TODO trace_id, span_id and parent_id are being forced into an i64 but
294    // the incoming payload is u64. This is a bug and needs to be fixed per:
295    // https://github.com/vectordotdev/vector/issues/14687
296    span.insert("trace_id".into(), Value::from(dd_span.trace_id as i64));
297    span.insert("span_id".into(), Value::from(dd_span.span_id as i64));
298    span.insert("parent_id".into(), Value::from(dd_span.parent_id as i64));
299    span.insert(
300        "start".into(),
301        Value::from(Utc.timestamp_nanos(dd_span.start)),
302    );
303    span.insert("duration".into(), Value::from(dd_span.duration));
304    span.insert("error".into(), Value::from(dd_span.error as i64));
305    span.insert("meta".into(), Value::from(convert_tags(dd_span.meta)));
306    span.insert(
307        "metrics".into(),
308        Value::from(
309            dd_span
310                .metrics
311                .into_iter()
312                .map(|(k, v)| {
313                    (
314                        k.into(),
315                        NotNan::new(v).map(Value::Float).unwrap_or(Value::Null),
316                    )
317                })
318                .collect::<ObjectMap>(),
319        ),
320    );
321    span.insert("type".into(), Value::from(dd_span.r#type));
322    span.insert(
323        "meta_struct".into(),
324        Value::from(
325            dd_span
326                .meta_struct
327                .into_iter()
328                .map(|(k, v)| (k.into(), Value::from(bytes::Bytes::from(v))))
329                .collect::<ObjectMap>(),
330        ),
331    );
332
333    span
334}
335
336fn convert_tags(original_map: BTreeMap<String, String>) -> ObjectMap {
337    original_map
338        .into_iter()
339        .map(|(k, v)| (k.into(), Value::from(v)))
340        .collect::<ObjectMap>()
341}