// vector/sinks/datadog/traces/request_builder.rs

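//! Request building for the Datadog traces sink: trace events are encoded
//! into size-bounded protobuf payloads, compressed, and wrapped into
//! `TraceApiRequest`s.
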
use std::{
    collections::BTreeMap,
    io::Write,
    num::NonZeroUsize,
    sync::{Arc, Mutex},
};

use bytes::Bytes;
use prost::Message;
use snafu::Snafu;
use vector_lib::event::{EventFinalizers, Finalizable};
use vector_lib::request_metadata::RequestMetadata;
use vrl::event_path;

use super::{
    apm_stats::{compute_apm_stats, Aggregator},
    config::{DatadogTracesEndpoint, DatadogTracesEndpointConfiguration},
    dd_proto,
    service::TraceApiRequest,
    sink::PartitionKey,
};
use crate::{
    event::{Event, ObjectMap, TraceEvent, Value},
    sinks::util::{
        metadata::RequestMetadataBuilder, Compression, Compressor, IncrementalRequestBuilder,
    },
};

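/// Errors that can occur while building a traces request payload. Each
/// variant records how many events were dropped as a result.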
#[derive(Debug, Snafu)]
pub enum RequestBuilderError {
    #[snafu(display(
        "Building an APM stats request payload failed ({}, {})",
        message,
        reason
    ))]
    FailedToBuild {
        message: &'static str,
        reason: String,
        dropped_events: u64,
    },

    #[allow(dead_code)]
    #[snafu(display("Unsupported endpoint ({})", reason))]
    UnsupportedEndpoint { reason: String, dropped_events: u64 },
}

impl RequestBuilderError {
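    /// Splits the error into its message, reason, and the number of dropped
    /// events it represents.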
    #[allow(clippy::missing_const_for_fn)] // a const fn cannot run destructors
    pub fn into_parts(self) -> (&'static str, String, u64) {
        match self {
            Self::FailedToBuild {
                message,
                reason,
                dropped_events,
            } => (message, reason, dropped_events),
            Self::UnsupportedEndpoint {
                reason,
                dropped_events,
            } => ("unsupported endpoint", reason, dropped_events),
        }
    }
}

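/// Incrementally builds compressed trace requests for the Datadog traces
/// endpoint, feeding APM statistics into the shared aggregator along the way.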
pub struct DatadogTracesRequestBuilder {
    api_key: Arc<str>,
    endpoint_configuration: DatadogTracesEndpointConfiguration,
    compression: Compression,
    max_size: usize,
    /// Contains the aggregated stats across a time window.
    stats_aggregator: Arc<Mutex<Aggregator>>,
}

impl DatadogTracesRequestBuilder {
    pub const fn new(
        api_key: Arc<str>,
        endpoint_configuration: DatadogTracesEndpointConfiguration,
        compression: Compression,
        max_size: usize,
        stats_aggregator: Arc<Mutex<Aggregator>>,
    ) -> Result<Self, RequestBuilderError> {
        Ok(Self {
            api_key,
            endpoint_configuration,
            compression,
            max_size,
            stats_aggregator,
        })
    }
}

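/// Datadog traces specific metadata, carried alongside the generic
/// `RequestMetadata` for each request.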
pub struct DDTracesMetadata {
    pub api_key: Arc<str>,
    pub endpoint: DatadogTracesEndpoint,
    pub finalizers: EventFinalizers,
    pub uncompressed_size: usize,
    pub content_type: String,
}

impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequestBuilder {
    type Metadata = (DDTracesMetadata, RequestMetadata);
    type Payload = Bytes;
    type Request = TraceApiRequest;
    type Error = RequestBuilderError;

    fn encode_events_incremental(
        &mut self,
        input: (PartitionKey, Vec<Event>),
    ) -> Vec<Result<(Self::Metadata, Self::Payload), Self::Error>> {
        let (key, events) = input;
        let trace_events = events
            .into_iter()
            .filter_map(|e| e.try_into_trace())
            .collect::<Vec<TraceEvent>>();

        // Compute APM stats from the incoming events. The stats payloads are sent out
        // separately from the sink framework by the `flush_apm_stats_thread()` thread.
        compute_apm_stats(&key, Arc::clone(&self.stats_aggregator), &trace_events);

        encode_traces(&key, trace_events, self.max_size)
            .into_iter()
            .map(|result| {
                result.and_then(|(payload, mut processed)| {
                    let uncompressed_size = payload.len();
                    let metadata = DDTracesMetadata {
                        api_key: key
                            .api_key
                            .clone()
                            .unwrap_or_else(|| Arc::clone(&self.api_key)),
                        endpoint: DatadogTracesEndpoint::Traces,
                        finalizers: processed.take_finalizers(),
                        uncompressed_size,
                        content_type: "application/x-protobuf".to_string(),
                    };

                    // Build the generic `RequestMetadata` from the processed events.
                    let builder = RequestMetadataBuilder::from_events(&processed);

                    let mut compressor = Compressor::from(self.compression);
                    match compressor.write_all(&payload) {
                        Ok(()) => {
                            let bytes = compressor.into_inner().freeze();

                            let bytes_len = NonZeroUsize::new(bytes.len())
                                .expect("payload should never be zero length");
                            let request_metadata = builder.with_request_size(bytes_len);

                            Ok(((metadata, request_metadata), bytes))
                        }
                        Err(e) => Err(RequestBuilderError::FailedToBuild {
                            message: "Payload compression failed.",
                            reason: e.to_string(),
                            dropped_events: processed.len() as u64,
                        }),
                    }
                })
            })
            .collect()
    }

    fn build_request(&mut self, metadata: Self::Metadata, payload: Self::Payload) -> Self::Request {
        build_request(
            metadata,
            payload,
            self.compression,
            &self.endpoint_configuration,
        )
    }
}

/// Builds the `TraceApiRequest` from inputs.
///
/// # Arguments
///
/// * `metadata`                 - Tuple of Datadog traces specific metadata and the generic `RequestMetadata`.
/// * `payload`                  - Compressed and encoded bytes to send.
/// * `compression`              - `Compression` used to set the Content-Encoding header.
/// * `endpoint_configuration`   - Endpoint configuration to use when creating the HTTP requests.
pub fn build_request(
    metadata: (DDTracesMetadata, RequestMetadata),
    payload: Bytes,
    compression: Compression,
    endpoint_configuration: &DatadogTracesEndpointConfiguration,
) -> TraceApiRequest {
    let (ddtraces_metadata, request_metadata) = metadata;
    let mut headers = BTreeMap::<String, String>::new();
    headers.insert("Content-Type".to_string(), ddtraces_metadata.content_type);
    headers.insert(
        "DD-API-KEY".to_string(),
        ddtraces_metadata.api_key.to_string(),
    );
    if let Some(ce) = compression.content_encoding() {
        headers.insert("Content-Encoding".to_string(), ce.to_string());
    }
    TraceApiRequest {
        body: payload,
        headers,
        finalizers: ddtraces_metadata.finalizers,
        uri: endpoint_configuration.get_uri_for_endpoint(ddtraces_metadata.endpoint),
        uncompressed_size: ddtraces_metadata.uncompressed_size,
        metadata: request_metadata,
    }
}

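/// Encodes trace events into one or more protobuf payloads, each no larger
/// than `max_size` once encoded.
///
/// Traces are packed greedily: when adding a trace would take the encoded
/// payload to or past `max_size`, the current payload is flushed and the
/// trace is retried against a fresh one. A single trace that exceeds the
/// limit on its own is dropped and reported as an error.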
fn encode_traces(
    key: &PartitionKey,
    trace_events: Vec<TraceEvent>,
    max_size: usize,
) -> Vec<Result<(Vec<u8>, Vec<TraceEvent>), RequestBuilderError>> {
    let mut results = Vec::new();
    let mut processed = Vec::new();
    let mut payload = build_empty_payload(key);

    for trace in trace_events {
        let mut proto = encode_trace(&trace);

        loop {
            payload.tracer_payloads.push(proto);
            if payload.encoded_len() >= max_size {
                // Take the trace back out so the payload stays under the limit.
                proto = payload.tracer_payloads.pop().expect("just pushed");
                if payload.tracer_payloads.is_empty() {
                    // This single trace exceeds the allowed payload size on its
                    // own, so it has to be dropped.
                    results.push(Err(RequestBuilderError::FailedToBuild {
                        message: "Dropped trace event",
                        reason: "Trace is larger than allowed payload size".into(),
                        dropped_events: 1,
                    }));

                    break;
                } else {
                    // Flush the current payload and retry this trace with a fresh one.
                    results.push(Ok((
                        payload.encode_to_vec(),
                        std::mem::take(&mut processed),
                    )));
                    payload = build_empty_payload(key);
                }
            } else {
                processed.push(trace);
                break;
            }
        }
    }
    results.push(Ok((
        payload.encode_to_vec(),
        std::mem::take(&mut processed),
    )));
    results
}

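/// Creates a `TracePayload` populated only with the partition-level fields,
/// ready to receive tracer payloads.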
fn build_empty_payload(key: &PartitionKey) -> dd_proto::TracePayload {
    dd_proto::TracePayload {
        host_name: key.hostname.clone().unwrap_or_default(),
        env: key.env.clone().unwrap_or_default(),
        traces: vec![],       // Field reserved for the older trace payloads
        transactions: vec![], // Field reserved for the older trace payloads
        tracer_payloads: vec![],
        // We only send tags at the Trace level
        tags: BTreeMap::new(),
        agent_version: key.agent_version.clone().unwrap_or_default(),
        target_tps: key.target_tps.map(|tps| tps as f64).unwrap_or_default(),
        error_tps: key.error_tps.map(|tps| tps as f64).unwrap_or_default(),
    }
}

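/// Converts a single `TraceEvent` into its protobuf representation, mapping
/// the top-level fields and wrapping the spans into a single `TraceChunk`.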
fn encode_trace(trace: &TraceEvent) -> dd_proto::TracerPayload {
    let tags = trace
        .get(event_path!("tags"))
        .and_then(|m| m.as_object())
        .map(|m| {
            m.iter()
                .map(|(k, v)| (k.to_string(), v.to_string_lossy().into_owned()))
                .collect::<BTreeMap<String, String>>()
        })
        .unwrap_or_default();

    let spans = match trace.get(event_path!("spans")) {
        Some(Value::Array(v)) => v
            .iter()
            .filter_map(|s| s.as_object().map(convert_span))
            .collect(),
        _ => vec![],
    };

    let chunk = dd_proto::TraceChunk {
        priority: trace
            .get(event_path!("priority"))
            .and_then(|v| v.as_integer().map(|v| v as i32))
            // This should not happen for Datadog originated traces, but in case this field is not populated
            // we default to 1 (https://github.com/DataDog/datadog-agent/blob/eac2327/pkg/trace/sampler/sampler.go#L54-L55),
            // which is what the Datadog trace-agent is doing for OTLP originated traces, as per
            // https://github.com/DataDog/datadog-agent/blob/3ea2eb4/pkg/trace/api/otlp.go#L309.
            .unwrap_or(1i32),
        origin: trace
            .get(event_path!("origin"))
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
        dropped_trace: trace
            .get(event_path!("dropped"))
            .and_then(|v| v.as_boolean())
            .unwrap_or(false),
        spans,
        tags: tags.clone(),
    };

    dd_proto::TracerPayload {
        container_id: trace
            .get(event_path!("container_id"))
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
        language_name: trace
            .get(event_path!("language_name"))
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
        language_version: trace
            .get(event_path!("language_version"))
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
        tracer_version: trace
            .get(event_path!("tracer_version"))
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
        runtime_id: trace
            .get(event_path!("runtime_id"))
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
        chunks: vec![chunk],
        tags,
        env: trace
            .get(event_path!("env"))
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
        hostname: trace
            .get(event_path!("hostname"))
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
        app_version: trace
            .get(event_path!("app_version"))
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
    }
}

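/// Converts one span, represented as an `ObjectMap`, into a protobuf `Span`,
/// defaulting any missing or mistyped fields.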
fn convert_span(span: &ObjectMap) -> dd_proto::Span {
    let trace_id = match span.get("trace_id") {
        Some(Value::Integer(val)) => *val,
        _ => 0,
    };
    let span_id = match span.get("span_id") {
        Some(Value::Integer(val)) => *val,
        _ => 0,
    };
    let parent_id = match span.get("parent_id") {
        Some(Value::Integer(val)) => *val,
        _ => 0,
    };
    let duration = match span.get("duration") {
        Some(Value::Integer(val)) => *val,
        _ => 0,
    };
    let error = match span.get("error") {
        Some(Value::Integer(val)) => *val,
        _ => 0,
    };
    let start = match span.get("start") {
        Some(Value::Timestamp(val)) => val.timestamp_nanos_opt().expect("Timestamp out of range"),
        _ => 0,
    };

    let meta = span
        .get("meta")
        .and_then(|m| m.as_object())
        .map(|m| {
            m.iter()
                .map(|(k, v)| (k.to_string(), v.to_string_lossy().into_owned()))
                .collect::<BTreeMap<String, String>>()
        })
        .unwrap_or_default();

    let meta_struct = span
        .get("meta_struct")
        .and_then(|m| m.as_object())
        .map(|m| {
            m.iter()
                .map(|(k, v)| (k.to_string(), v.coerce_to_bytes().into_iter().collect()))
                .collect::<BTreeMap<String, Vec<u8>>>()
        })
        .unwrap_or_default();

    let metrics = span
        .get("metrics")
        .and_then(|m| m.as_object())
        .map(|m| {
            m.iter()
                .filter_map(|(k, v)| {
                    if let Value::Float(f) = v {
                        Some((k.to_string(), f.into_inner()))
                    } else {
                        None
                    }
                })
                .collect::<BTreeMap<String, f64>>()
        })
        .unwrap_or_default();

    dd_proto::Span {
        service: span
            .get("service")
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
        name: span
            .get("name")
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
        resource: span
            .get("resource")
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
        r#type: span
            .get("type")
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
        trace_id: trace_id as u64,
        span_id: span_id as u64,
        parent_id: parent_id as u64,
        error: error as i32,
        start,
        duration,
        meta,
        metrics,
        meta_struct,
    }
}

#[cfg(test)]
mod test {
    use proptest::prelude::*;
    use vrl::event_path;

    use super::{encode_traces, PartitionKey};
    use crate::event::{LogEvent, TraceEvent};

    proptest! {
        #[test]
        fn successfully_encode_payloads_smaller_than_max_size(
            // 476 is the experimentally determined size that will fill a payload after encoding and overhead
            lengths in proptest::collection::vec(16usize..476, 1usize..256),
        ) {
            let max_size = 1024;

            let key = PartitionKey {
                api_key: Some("x".repeat(128).into()),
                env: Some("production".into()),
                hostname: Some("foo.bar.baz.local".into()),
                agent_version: Some("1.2.3.4.5".into()),
                target_tps: None,
                error_tps: None,
            };

            // We only care about the size of the incoming traces, so just populate a single tag field
            // that will be copied into the protobuf representation.
            let traces = lengths
                .into_iter()
                .map(|n| {
                    let mut log = LogEvent::default();
                    log.insert(event_path!("tags", "foo"), "x".repeat(n));
                    TraceEvent::from(log)
                })
                .collect();

            for result in encode_traces(&key, traces, max_size) {
                prop_assert!(result.is_ok());
                let (encoded, _processed) = result.unwrap();

                prop_assert!(
                    encoded.len() <= max_size,
                    "encoded len {} longer than max size {}",
                    encoded.len(),
                    max_size
                );
            }
        }
    }

    #[test]
    fn handles_too_large_events() {
        let max_size = 1024;
        // 476 is experimentally determined to be too big to fit into a <1024 byte proto
        let lengths = [128, 476, 128];

        let key = PartitionKey {
            api_key: Some("x".repeat(128).into()),
            env: Some("production".into()),
            hostname: Some("foo.bar.baz.local".into()),
            agent_version: Some("1.2.3.4.5".into()),
            target_tps: None,
            error_tps: None,
        };

        // We only care about the size of the incoming traces, so just populate a single tag field
        // that will be copied into the protobuf representation.
        let traces = lengths
            .into_iter()
            .map(|n| {
                let mut log = LogEvent::default();
                log.insert(event_path!("tags", "foo"), "x".repeat(n));
                TraceEvent::from(log)
            })
            .collect();

        let mut results = encode_traces(&key, traces, max_size);
        assert_eq!(3, results.len());

        match &mut results[..] {
            [Ok(one), Err(_two), Ok(three)] => {
                for (encoded, processed) in [one, three] {
                    assert_eq!(1, processed.len());
                    assert!(
                        encoded.len() <= max_size,
                        "encoded len {} longer than max size {}",
                        encoded.len(),
                        max_size
                    );
                }
            }
            _ => panic!(
                "unexpected output {:?}",
                results
                    .iter()
                    .map(|r| r.as_ref().map(|(_, p)| p.len()))
                    .collect::<Vec<_>>()
            ),
        }
    }
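
    // Illustrative sanity check (an addition, not part of the original suite):
    // encoding an empty batch still yields exactly one payload, produced by the
    // final flush in `encode_traces`, and that payload carries no processed events.
    #[test]
    fn encodes_empty_input_as_single_payload() {
        let key = PartitionKey {
            api_key: None,
            env: None,
            hostname: None,
            agent_version: None,
            target_tps: None,
            error_tps: None,
        };

        let results = encode_traces(&key, Vec::new(), 1024);
        assert_eq!(1, results.len());

        let (encoded, processed) = results[0].as_ref().expect("encoding should succeed");
        assert!(processed.is_empty());
        assert!(encoded.len() <= 1024);
    }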
}