vector/sources/datadog_agent/traces.rs

1use std::{collections::BTreeMap, sync::Arc};
2
3use bytes::Bytes;
4use chrono::{TimeZone, Utc};
5use futures::future;
6use http::StatusCode;
7use ordered_float::NotNan;
8use prost::Message;
9use vrl::event_path;
10use warp::{filters::BoxedFilter, path, path::FullPath, reply::Response, Filter, Rejection, Reply};
11
12use vector_lib::internal_event::{CountByteSize, InternalEventHandle as _};
13use vector_lib::EstimatedJsonEncodedSizeOf;
14
15use crate::common::http::ErrorMessage;
16use crate::{
17    event::{Event, ObjectMap, TraceEvent, Value},
18    sources::datadog_agent::{
19        ddtrace_proto, handle_request, ApiKeyQueryParams, DatadogAgentSource,
20    },
21    SourceSender,
22};
23
24pub(crate) fn build_warp_filter(
25    acknowledgements: bool,
26    multiple_outputs: bool,
27    out: SourceSender,
28    source: DatadogAgentSource,
29) -> BoxedFilter<(Response,)> {
30    build_trace_filter(acknowledgements, multiple_outputs, out, source)
31        .or(build_stats_filter())
32        .unify()
33        .boxed()
34}
35
36fn build_trace_filter(
37    acknowledgements: bool,
38    multiple_outputs: bool,
39    out: SourceSender,
40    source: DatadogAgentSource,
41) -> BoxedFilter<(Response,)> {
42    warp::post()
43        .and(path!("api" / "v0.2" / "traces" / ..))
44        .and(warp::path::full())
45        .and(warp::header::optional::<String>("content-encoding"))
46        .and(warp::header::optional::<String>("dd-api-key"))
47        .and(warp::header::optional::<String>(
48            "X-Datadog-Reported-Languages",
49        ))
50        .and(warp::query::<ApiKeyQueryParams>())
51        .and(warp::body::bytes())
52        .and_then(
53            move |path: FullPath,
54                  encoding_header: Option<String>,
55                  api_token: Option<String>,
56                  reported_language: Option<String>,
57                  query_params: ApiKeyQueryParams,
58                  body: Bytes| {
59                let events = source
60                    .decode(&encoding_header, body, path.as_str())
61                    .and_then(|body| {
62                        handle_dd_trace_payload(
63                            body,
64                            source.api_key_extractor.extract(
65                                path.as_str(),
66                                api_token,
67                                query_params.dd_api_key,
68                            ),
69                            reported_language.as_ref(),
70                            &source,
71                        )
72                        .map_err(|error| {
73                            ErrorMessage::new(
74                                StatusCode::UNPROCESSABLE_ENTITY,
75                                format!("Error decoding Datadog traces: {error:?}"),
76                            )
77                        })
78                    });
79                let output = multiple_outputs.then_some(super::TRACES);
80                handle_request(events, acknowledgements, out.clone(), output)
81            },
82        )
83        .boxed()
84}
85
86fn build_stats_filter() -> BoxedFilter<(Response,)> {
87    warp::post()
88        .and(path!("api" / "v0.2" / "stats" / ..))
89        .and_then(|| {
90            // APM stats are discarded on purpose, they will be computed in the `datadog_traces` sink
91            // thus we simply reply with a 200/OK response.
92            let response: Result<Response, Rejection> = Ok(warp::reply().into_response());
93            future::ready(response)
94        })
95        .boxed()
96}
97
98fn handle_dd_trace_payload(
99    frame: Bytes,
100    api_key: Option<Arc<str>>,
101    lang: Option<&String>,
102    source: &DatadogAgentSource,
103) -> crate::Result<Vec<Event>> {
104    let decoded_payload = ddtrace_proto::TracePayload::decode(frame)?;
105    if decoded_payload.tracer_payloads.is_empty() {
106        debug!("Older trace payload decoded.");
107        handle_dd_trace_payload_v0(decoded_payload, api_key, lang, source)
108    } else {
109        debug!("Newer trace payload decoded.");
110        handle_dd_trace_payload_v1(decoded_payload, api_key, source)
111    }
112}
113
114/// Decode Datadog newer protobuf schema
115fn handle_dd_trace_payload_v1(
116    decoded_payload: ddtrace_proto::TracePayload,
117    api_key: Option<Arc<str>>,
118    source: &DatadogAgentSource,
119) -> crate::Result<Vec<Event>> {
120    let env = decoded_payload.env;
121    let hostname = decoded_payload.host_name;
122    let agent_version = decoded_payload.agent_version;
123    let target_tps = decoded_payload.target_tps;
124    let error_tps = decoded_payload.error_tps;
125    let tags = convert_tags(decoded_payload.tags);
126
127    let trace_events: Vec<TraceEvent> = decoded_payload
128        .tracer_payloads
129        .into_iter()
130        .flat_map(convert_dd_tracer_payload)
131        .collect();
132
133    source.events_received.emit(CountByteSize(
134        trace_events.len(),
135        trace_events.estimated_json_encoded_size_of(),
136    ));
137
138    let enriched_events = trace_events
139        .into_iter()
140        .map(|mut trace_event| {
141            if let Some(k) = &api_key {
142                trace_event
143                    .metadata_mut()
144                    .set_datadog_api_key(Arc::clone(k));
145            }
146            trace_event.insert(
147                &source.log_schema_source_type_key,
148                Bytes::from("datadog_agent"),
149            );
150            trace_event.insert(event_path!("payload_version"), "v2".to_string());
151            trace_event.insert(&source.log_schema_host_key, hostname.clone());
152            trace_event.insert(event_path!("env"), env.clone());
153            trace_event.insert(event_path!("agent_version"), agent_version.clone());
154            trace_event.insert(
155                event_path!("target_tps"),
156                Value::Float(NotNan::new(target_tps).expect("target_tps cannot be Nan")),
157            );
158            trace_event.insert(
159                event_path!("error_tps"),
160                Value::Float(NotNan::new(error_tps).expect("error_tps cannot be Nan")),
161            );
162            if let Some(Value::Object(span_tags)) = trace_event.get_mut(event_path!("tags")) {
163                span_tags.extend(tags.clone());
164            } else {
165                trace_event.insert(event_path!("tags"), Value::from(tags.clone()));
166            }
167            Event::Trace(trace_event)
168        })
169        .collect();
170    Ok(enriched_events)
171}
172
173fn convert_dd_tracer_payload(payload: ddtrace_proto::TracerPayload) -> Vec<TraceEvent> {
174    let tags = convert_tags(payload.tags);
175    payload
176        .chunks
177        .into_iter()
178        .map(|trace| {
179            let mut trace_event = TraceEvent::default();
180            trace_event.insert(event_path!("priority"), trace.priority as i64);
181            trace_event.insert(event_path!("origin"), trace.origin);
182            trace_event.insert(event_path!("dropped"), trace.dropped_trace);
183            let mut trace_tags = convert_tags(trace.tags);
184            trace_tags.extend(tags.clone());
185            trace_event.insert(event_path!("tags"), Value::from(trace_tags));
186
187            trace_event.insert(
188                event_path!("spans"),
189                trace
190                    .spans
191                    .into_iter()
192                    .map(|s| Value::from(convert_span(s)))
193                    .collect::<Vec<Value>>(),
194            );
195
196            trace_event.insert(event_path!("container_id"), payload.container_id.clone());
197            trace_event.insert(event_path!("language_name"), payload.language_name.clone());
198            trace_event.insert(
199                event_path!("language_version"),
200                payload.language_version.clone(),
201            );
202            trace_event.insert(
203                event_path!("tracer_version"),
204                payload.tracer_version.clone(),
205            );
206            trace_event.insert(event_path!("runtime_id"), payload.runtime_id.clone());
207            trace_event.insert(event_path!("app_version"), payload.app_version.clone());
208            trace_event
209        })
210        .collect()
211}
212
213// Decode Datadog older protobuf schema
214fn handle_dd_trace_payload_v0(
215    decoded_payload: ddtrace_proto::TracePayload,
216    api_key: Option<Arc<str>>,
217    lang: Option<&String>,
218    source: &DatadogAgentSource,
219) -> crate::Result<Vec<Event>> {
220    let env = decoded_payload.env;
221    let hostname = decoded_payload.host_name;
222
223    let trace_events: Vec<TraceEvent> =
224    // Each traces is mapped to one event...
225    decoded_payload
226        .traces
227        .into_iter()
228        .map(|dd_trace| {
229            let mut trace_event = TraceEvent::default();
230
231            // TODO trace_id is being forced into an i64 but
232            // the incoming payload is u64. This is a bug and needs to be fixed per:
233            // https://github.com/vectordotdev/vector/issues/14687
234            trace_event.insert(event_path!("trace_id"), dd_trace.trace_id as i64);
235            trace_event.insert(event_path!("start_time"), Utc.timestamp_nanos(dd_trace.start_time));
236            trace_event.insert(event_path!("end_time"), Utc.timestamp_nanos(dd_trace.end_time));
237            trace_event.insert(
238                event_path!("spans"),
239                dd_trace
240                    .spans
241                    .into_iter()
242                    .map(|s| Value::from(convert_span(s)))
243                    .collect::<Vec<Value>>(),
244            );
245            trace_event
246        })
247        //... and each APM event is also mapped into its own event
248        .chain(decoded_payload.transactions.into_iter().map(|s| {
249            let mut trace_event = TraceEvent::default();
250            trace_event.insert(event_path!("spans"), vec![Value::from(convert_span(s))]);
251            trace_event.insert(event_path!("dropped"), true);
252            trace_event
253        })).collect();
254
255    source.events_received.emit(CountByteSize(
256        trace_events.len(),
257        trace_events.estimated_json_encoded_size_of(),
258    ));
259
260    let enriched_events = trace_events
261        .into_iter()
262        .map(|mut trace_event| {
263            if let Some(k) = &api_key {
264                trace_event
265                    .metadata_mut()
266                    .set_datadog_api_key(Arc::clone(k));
267            }
268            if let Some(lang) = lang {
269                trace_event.insert(event_path!("language_name"), lang.clone());
270            }
271            trace_event.insert(
272                &source.log_schema_source_type_key,
273                Bytes::from("datadog_agent"),
274            );
275            trace_event.insert(event_path!("payload_version"), "v1".to_string());
276            trace_event.insert(&source.log_schema_host_key, hostname.clone());
277            trace_event.insert(event_path!("env"), env.clone());
278            Event::Trace(trace_event)
279        })
280        .collect();
281
282    Ok(enriched_events)
283}
284
285fn convert_span(dd_span: ddtrace_proto::Span) -> ObjectMap {
286    let mut span = ObjectMap::new();
287    span.insert("service".into(), Value::from(dd_span.service));
288    span.insert("name".into(), Value::from(dd_span.name));
289
290    span.insert("resource".into(), Value::from(dd_span.resource));
291
292    // TODO trace_id, span_id and parent_id are being forced into an i64 but
293    // the incoming payload is u64. This is a bug and needs to be fixed per:
294    // https://github.com/vectordotdev/vector/issues/14687
295    span.insert("trace_id".into(), Value::from(dd_span.trace_id as i64));
296    span.insert("span_id".into(), Value::from(dd_span.span_id as i64));
297    span.insert("parent_id".into(), Value::from(dd_span.parent_id as i64));
298    span.insert(
299        "start".into(),
300        Value::from(Utc.timestamp_nanos(dd_span.start)),
301    );
302    span.insert("duration".into(), Value::from(dd_span.duration));
303    span.insert("error".into(), Value::from(dd_span.error as i64));
304    span.insert("meta".into(), Value::from(convert_tags(dd_span.meta)));
305    span.insert(
306        "metrics".into(),
307        Value::from(
308            dd_span
309                .metrics
310                .into_iter()
311                .map(|(k, v)| {
312                    (
313                        k.into(),
314                        NotNan::new(v).map(Value::Float).unwrap_or(Value::Null),
315                    )
316                })
317                .collect::<ObjectMap>(),
318        ),
319    );
320    span.insert("type".into(), Value::from(dd_span.r#type));
321    span.insert(
322        "meta_struct".into(),
323        Value::from(
324            dd_span
325                .meta_struct
326                .into_iter()
327                .map(|(k, v)| (k.into(), Value::from(bytes::Bytes::from(v))))
328                .collect::<ObjectMap>(),
329        ),
330    );
331
332    span
333}
334
335fn convert_tags(original_map: BTreeMap<String, String>) -> ObjectMap {
336    original_map
337        .into_iter()
338        .map(|(k, v)| (k.into(), Value::from(v)))
339        .collect::<ObjectMap>()
340}