vector/sinks/datadog/traces/
request_builder.rs

1use std::{
2    collections::BTreeMap,
3    io::Write,
4    num::NonZeroUsize,
5    sync::{Arc, Mutex},
6};
7
8use bytes::Bytes;
9use prost::Message;
10use snafu::Snafu;
11use vector_lib::{
12    event::{EventFinalizers, Finalizable},
13    request_metadata::RequestMetadata,
14};
15use vrl::event_path;
16
17use super::{
18    apm_stats::{Aggregator, compute_apm_stats},
19    config::{DatadogTracesEndpoint, DatadogTracesEndpointConfiguration},
20    dd_proto,
21    service::TraceApiRequest,
22    sink::PartitionKey,
23};
24use crate::{
25    event::{Event, ObjectMap, TraceEvent, Value},
26    sinks::util::{
27        Compression, Compressor, IncrementalRequestBuilder, metadata::RequestMetadataBuilder,
28    },
29};
30
/// Errors that can occur while building a traces request payload.
#[derive(Debug, Snafu)]
pub enum RequestBuilderError {
    /// Encoding or compressing a payload failed; `dropped_events` counts the
    /// events lost as a result.
    #[snafu(display(
        "Building an APM stats request payload failed ({}, {})",
        message,
        reason
    ))]
    FailedToBuild {
        message: &'static str,
        reason: String,
        dropped_events: u64,
    },

    /// The request targeted an endpoint this builder does not support.
    #[allow(dead_code)]
    #[snafu(display("Unsupported endpoint ({})", reason))]
    UnsupportedEndpoint { reason: String, dropped_events: u64 },
}
48
49impl RequestBuilderError {
50    #[allow(clippy::missing_const_for_fn)] // const cannot run destructor
51    pub fn into_parts(self) -> (&'static str, String, u64) {
52        match self {
53            Self::FailedToBuild {
54                message,
55                reason,
56                dropped_events,
57            } => (message, reason, dropped_events),
58            Self::UnsupportedEndpoint {
59                reason,
60                dropped_events,
61            } => ("unsupported endpoint", reason, dropped_events),
62        }
63    }
64}
65
/// Incremental request builder for the Datadog traces endpoint.
pub struct DatadogTracesRequestBuilder {
    // Sink-level API key, used when the partition key does not carry one.
    api_key: Arc<str>,
    // Resolves endpoint variants to concrete request URIs.
    endpoint_configuration: DatadogTracesEndpointConfiguration,
    // Compression applied to encoded payloads (and advertised via headers).
    compression: Compression,
    // Upper bound on the encoded (pre-compression) size of a single payload.
    max_size: usize,
    /// Contains the Aggregated stats across a time window.
    stats_aggregator: Arc<Mutex<Aggregator>>,
}
74
75impl DatadogTracesRequestBuilder {
76    pub const fn new(
77        api_key: Arc<str>,
78        endpoint_configuration: DatadogTracesEndpointConfiguration,
79        compression: Compression,
80        max_size: usize,
81        stats_aggregator: Arc<Mutex<Aggregator>>,
82    ) -> Result<Self, RequestBuilderError> {
83        Ok(Self {
84            api_key,
85            endpoint_configuration,
86            compression,
87            max_size,
88            stats_aggregator,
89        })
90    }
91}
92
/// Datadog-traces-specific request metadata, carried alongside the generic
/// `RequestMetadata` through the sink framework.
pub struct DDTracesMetadata {
    // API key sent in the `DD-API-KEY` request header.
    pub api_key: Arc<str>,
    // Which Datadog traces endpoint the request targets.
    pub endpoint: DatadogTracesEndpoint,
    // Finalizers taken from the events encoded into the payload.
    pub finalizers: EventFinalizers,
    // Size in bytes of the encoded payload before compression.
    pub uncompressed_size: usize,
    // Value for the `Content-Type` request header.
    pub content_type: String,
}
100
impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequestBuilder {
    type Metadata = (DDTracesMetadata, RequestMetadata);
    type Payload = Bytes;
    type Request = TraceApiRequest;
    type Error = RequestBuilderError;

    /// Encodes the partition's events into zero or more size-bounded,
    /// compressed payloads. Non-trace events are silently discarded by the
    /// `try_into_trace` filter. Each `Ok` entry pairs the request metadata
    /// with the compressed bytes; each `Err` entry reports events dropped
    /// during encoding or compression.
    fn encode_events_incremental(
        &mut self,
        input: (PartitionKey, Vec<Event>),
    ) -> Vec<Result<(Self::Metadata, Self::Payload), Self::Error>> {
        let (key, events) = input;
        let trace_events = events
            .into_iter()
            .filter_map(|e| e.try_into_trace())
            .collect::<Vec<TraceEvent>>();

        // Compute APM stats from the incoming events. The stats payloads are sent out
        // separately from the sink framework, by the thread `flush_apm_stats_thread()`
        compute_apm_stats(&key, Arc::clone(&self.stats_aggregator), &trace_events);

        encode_traces(&key, trace_events, self.max_size)
            .into_iter()
            .map(|result| {
                result.and_then(|(payload, mut processed)| {
                    let uncompressed_size = payload.len();
                    let metadata = DDTracesMetadata {
                        // Prefer the partition's API key; fall back to the
                        // sink-level key.
                        api_key: key
                            .api_key
                            .clone()
                            .unwrap_or_else(|| Arc::clone(&self.api_key)),
                        endpoint: DatadogTracesEndpoint::Traces,
                        finalizers: processed.take_finalizers(),
                        uncompressed_size,
                        content_type: "application/x-protobuf".to_string(),
                    };

                    // build RequestMetadata
                    let builder = RequestMetadataBuilder::from_events(&processed);

                    let mut compressor = Compressor::from(self.compression);
                    match compressor.write_all(&payload) {
                        Ok(()) => {
                            let bytes = compressor.into_inner().freeze();

                            // NOTE(review): relies on the compressed payload
                            // never being empty; an all-default partition key
                            // could encode to zero bytes — confirm upstream
                            // guarantees before relying on this invariant.
                            let bytes_len = NonZeroUsize::new(bytes.len())
                                .expect("payload should never be zero length");
                            let request_metadata = builder.with_request_size(bytes_len);

                            Ok(((metadata, request_metadata), bytes))
                        }
                        Err(e) => Err(RequestBuilderError::FailedToBuild {
                            message: "Payload compression failed.",
                            reason: e.to_string(),
                            dropped_events: processed.len() as u64,
                        }),
                    }
                })
            })
            .collect()
    }

    /// Builds the final HTTP request wrapper from encoded metadata + payload.
    fn build_request(&mut self, metadata: Self::Metadata, payload: Self::Payload) -> Self::Request {
        build_request(
            metadata,
            payload,
            self.compression,
            &self.endpoint_configuration,
        )
    }
}
171
172/// Builds the `TraceApiRequest` from inputs.
173///
174/// # Arguments
175///
176/// * `metadata`                 - Tuple of Datadog traces specific metadata and the generic `RequestMetadata`.
177/// * `payload`                  - Compressed and encoded bytes to send.
178/// * `compression`              - `Compression` used to reference the Content-Encoding header.
179/// * `endpoint_configuration`   - Endpoint configuration to use when creating the HTTP requests.
180pub fn build_request(
181    metadata: (DDTracesMetadata, RequestMetadata),
182    payload: Bytes,
183    compression: Compression,
184    endpoint_configuration: &DatadogTracesEndpointConfiguration,
185) -> TraceApiRequest {
186    let (ddtraces_metadata, request_metadata) = metadata;
187    let mut headers = BTreeMap::<String, String>::new();
188    headers.insert("Content-Type".to_string(), ddtraces_metadata.content_type);
189    headers.insert(
190        "DD-API-KEY".to_string(),
191        ddtraces_metadata.api_key.to_string(),
192    );
193    if let Some(ce) = compression.content_encoding() {
194        headers.insert("Content-Encoding".to_string(), ce.to_string());
195    }
196    TraceApiRequest {
197        body: payload,
198        headers,
199        finalizers: ddtraces_metadata.finalizers,
200        uri: endpoint_configuration.get_uri_for_endpoint(ddtraces_metadata.endpoint),
201        uncompressed_size: ddtraces_metadata.uncompressed_size,
202        metadata: request_metadata,
203    }
204}
205
/// Greedily packs trace events into protobuf `TracePayload`s, each strictly
/// smaller than `max_size` when encoded (pre-compression).
///
/// Returns, in order: one `Ok((encoded_bytes, source_events))` entry per
/// emitted payload — including a final payload holding the remainder, which
/// may contain only partition-level fields — plus one `Err` entry for every
/// individual trace whose encoding alone reaches `max_size` (that trace is
/// dropped).
fn encode_traces(
    key: &PartitionKey,
    trace_events: Vec<TraceEvent>,
    max_size: usize,
) -> Vec<Result<(Vec<u8>, Vec<TraceEvent>), RequestBuilderError>> {
    let mut results = Vec::new();
    // Source events backing the current `payload`, handed back to the caller
    // alongside the encoded bytes (for finalizers/metadata).
    let mut processed = Vec::new();
    let mut payload = build_empty_payload(key);

    for trace in trace_events {
        let mut proto = encode_trace(&trace);

        // Tentatively push the trace, then check the resulting encoded size.
        loop {
            payload.tracer_payloads.push(proto);
            if payload.encoded_len() >= max_size {
                // take it back out
                proto = payload.tracer_payloads.pop().expect("just pushed");
                if payload.tracer_payloads.is_empty() {
                    // this individual trace is too big
                    results.push(Err(RequestBuilderError::FailedToBuild {
                        message: "Dropped trace event",
                        reason: "Trace is larger than allowed payload size".into(),
                        dropped_events: 1,
                    }));

                    break;
                } else {
                    // Flush the current payload, then retry this trace against
                    // a fresh, empty payload on the next loop iteration.
                    results.push(Ok((
                        payload.encode_to_vec(),
                        std::mem::take(&mut processed),
                    )));
                    payload = build_empty_payload(key);
                }
            } else {
                processed.push(trace);
                break;
            }
        }
    }
    // Flush whatever remains; always emitted, even with no traces packed.
    results.push(Ok((
        payload.encode_to_vec(),
        std::mem::take(&mut processed),
    )));
    results
}
252
253fn build_empty_payload(key: &PartitionKey) -> dd_proto::TracePayload {
254    dd_proto::TracePayload {
255        host_name: key.hostname.clone().unwrap_or_default(),
256        env: key.env.clone().unwrap_or_default(),
257        traces: vec![],       // Field reserved for the older trace payloads
258        transactions: vec![], // Field reserved for the older trace payloads
259        tracer_payloads: vec![],
260        // We only send tags at the Trace level
261        tags: BTreeMap::new(),
262        agent_version: key.agent_version.clone().unwrap_or_default(),
263        target_tps: key.target_tps.map(|tps| tps as f64).unwrap_or_default(),
264        error_tps: key.error_tps.map(|tps| tps as f64).unwrap_or_default(),
265    }
266}
267
268fn encode_trace(trace: &TraceEvent) -> dd_proto::TracerPayload {
269    let tags = trace
270        .get(event_path!("tags"))
271        .and_then(|m| m.as_object())
272        .map(|m| {
273            m.iter()
274                .map(|(k, v)| (k.to_string(), v.to_string_lossy().into_owned()))
275                .collect::<BTreeMap<String, String>>()
276        })
277        .unwrap_or_default();
278
279    let spans = match trace.get(event_path!("spans")) {
280        Some(Value::Array(v)) => v
281            .iter()
282            .filter_map(|s| s.as_object().map(convert_span))
283            .collect(),
284        _ => vec![],
285    };
286
287    let chunk = dd_proto::TraceChunk {
288        priority: trace
289            .get(event_path!("priority"))
290            .and_then(|v| v.as_integer().map(|v| v as i32))
291            // This should not happen for Datadog originated traces, but in case this field is not populated
292            // we default to 1 (https://github.com/DataDog/datadog-agent/blob/eac2327/pkg/trace/sampler/sampler.go#L54-L55),
293            // which is what the Datadog trace-agent is doing for OTLP originated traces, as per
294            // https://github.com/DataDog/datadog-agent/blob/3ea2eb4/pkg/trace/api/otlp.go#L309.
295            .unwrap_or(1i32),
296        origin: trace
297            .get(event_path!("origin"))
298            .map(|v| v.to_string_lossy().into_owned())
299            .unwrap_or_default(),
300        dropped_trace: trace
301            .get(event_path!("dropped"))
302            .and_then(|v| v.as_boolean())
303            .unwrap_or(false),
304        spans,
305        tags: tags.clone(),
306    };
307
308    dd_proto::TracerPayload {
309        container_id: trace
310            .get(event_path!("container_id"))
311            .map(|v| v.to_string_lossy().into_owned())
312            .unwrap_or_default(),
313        language_name: trace
314            .get(event_path!("language_name"))
315            .map(|v| v.to_string_lossy().into_owned())
316            .unwrap_or_default(),
317        language_version: trace
318            .get(event_path!("language_version"))
319            .map(|v| v.to_string_lossy().into_owned())
320            .unwrap_or_default(),
321        tracer_version: trace
322            .get(event_path!("tracer_version"))
323            .map(|v| v.to_string_lossy().into_owned())
324            .unwrap_or_default(),
325        runtime_id: trace
326            .get(event_path!("runtime_id"))
327            .map(|v| v.to_string_lossy().into_owned())
328            .unwrap_or_default(),
329        chunks: vec![chunk],
330        tags,
331        env: trace
332            .get(event_path!("env"))
333            .map(|v| v.to_string_lossy().into_owned())
334            .unwrap_or_default(),
335        hostname: trace
336            .get(event_path!("hostname"))
337            .map(|v| v.to_string_lossy().into_owned())
338            .unwrap_or_default(),
339        app_version: trace
340            .get(event_path!("app_version"))
341            .map(|v| v.to_string_lossy().into_owned())
342            .unwrap_or_default(),
343    }
344}
345
346fn convert_span(span: &ObjectMap) -> dd_proto::Span {
347    let trace_id = match span.get("trace_id") {
348        Some(Value::Integer(val)) => *val,
349        _ => 0,
350    };
351    let span_id = match span.get("span_id") {
352        Some(Value::Integer(val)) => *val,
353        _ => 0,
354    };
355    let parent_id = match span.get("parent_id") {
356        Some(Value::Integer(val)) => *val,
357        _ => 0,
358    };
359    let duration = match span.get("duration") {
360        Some(Value::Integer(val)) => *val,
361        _ => 0,
362    };
363    let error = match span.get("error") {
364        Some(Value::Integer(val)) => *val,
365        _ => 0,
366    };
367    let start = match span.get("start") {
368        Some(Value::Timestamp(val)) => val.timestamp_nanos_opt().expect("Timestamp out of range"),
369        _ => 0,
370    };
371
372    let meta = span
373        .get("meta")
374        .and_then(|m| m.as_object())
375        .map(|m| {
376            m.iter()
377                .map(|(k, v)| (k.to_string(), v.to_string_lossy().into_owned()))
378                .collect::<BTreeMap<String, String>>()
379        })
380        .unwrap_or_default();
381
382    let meta_struct = span
383        .get("meta_struct")
384        .and_then(|m| m.as_object())
385        .map(|m| {
386            m.iter()
387                .map(|(k, v)| (k.to_string(), v.coerce_to_bytes().into_iter().collect()))
388                .collect::<BTreeMap<String, Vec<u8>>>()
389        })
390        .unwrap_or_default();
391
392    let metrics = span
393        .get("metrics")
394        .and_then(|m| m.as_object())
395        .map(|m| {
396            m.iter()
397                .filter_map(|(k, v)| {
398                    if let Value::Float(f) = v {
399                        Some((k.to_string(), f.into_inner()))
400                    } else {
401                        None
402                    }
403                })
404                .collect::<BTreeMap<String, f64>>()
405        })
406        .unwrap_or_default();
407
408    dd_proto::Span {
409        service: span
410            .get("service")
411            .map(|v| v.to_string_lossy().into_owned())
412            .unwrap_or_default(),
413        name: span
414            .get("name")
415            .map(|v| v.to_string_lossy().into_owned())
416            .unwrap_or_default(),
417        resource: span
418            .get("resource")
419            .map(|v| v.to_string_lossy().into_owned())
420            .unwrap_or_default(),
421        r#type: span
422            .get("type")
423            .map(|v| v.to_string_lossy().into_owned())
424            .unwrap_or_default(),
425        trace_id: trace_id as u64,
426        span_id: span_id as u64,
427        parent_id: parent_id as u64,
428        error: error as i32,
429        start,
430        duration,
431        meta,
432        metrics,
433        meta_struct,
434    }
435}
436
#[cfg(test)]
mod test {
    use proptest::prelude::*;
    use vrl::event_path;

    use super::{PartitionKey, encode_traces};
    use crate::event::{LogEvent, TraceEvent};

    proptest! {
        // Property: any mix of traces that individually fit must be packed
        // into payloads that all respect the size budget.
        #[test]
        fn successfully_encode_payloads_smaller_than_max_size(
            // 476 is the experimentally determined size that will fill a payload after encoding and overhead
            lengths in proptest::collection::vec(16usize..476, 1usize..256),
        ) {
            let max_size = 1024;

            let key = PartitionKey {
                api_key: Some("x".repeat(128).into()),
                env: Some("production".into()),
                hostname: Some("foo.bar.baz.local".into()),
                agent_version: Some("1.2.3.4.5".into()),
                target_tps: None,
                error_tps: None,
            };

            // We only care about the size of the incoming traces, so just populate a single tag field
            // that will be copied into the protobuf representation.
            let traces = lengths
                .into_iter()
                .map(|n| {
                    let mut log = LogEvent::default();
                    log.insert(event_path!("tags", "foo"), "x".repeat(n));
                    TraceEvent::from(log)
                })
                .collect();

            for result in encode_traces(&key, traces, max_size) {
                prop_assert!(result.is_ok());
                let (encoded, _processed) = result.unwrap();

                prop_assert!(
                    encoded.len() <= max_size,
                    "encoded len {} longer than max size {}",
                    encoded.len(),
                    max_size
                );
            }
        }
    }

    // An oversized trace must be dropped (reported as Err) without disturbing
    // the encoding of the traces around it.
    #[test]
    fn handles_too_large_events() {
        let max_size = 1024;
        // 476 is experimentally determined to be too big to fit into a <1024 byte proto
        let lengths = [128, 476, 128];

        let key = PartitionKey {
            api_key: Some("x".repeat(128).into()),
            env: Some("production".into()),
            hostname: Some("foo.bar.baz.local".into()),
            agent_version: Some("1.2.3.4.5".into()),
            target_tps: None,
            error_tps: None,
        };

        // We only care about the size of the incoming traces, so just populate a single tag field
        // that will be copied into the protobuf representation.
        let traces = lengths
            .into_iter()
            .map(|n| {
                let mut log = LogEvent::default();
                log.insert(event_path!("tags", "foo"), "x".repeat(n));
                TraceEvent::from(log)
            })
            .collect();

        let mut results = encode_traces(&key, traces, max_size);
        assert_eq!(3, results.len());

        // Expect: first trace flushed alone, second dropped, third flushed alone.
        match &mut results[..] {
            [Ok(one), Err(_two), Ok(three)] => {
                for (encoded, processed) in [one, three] {
                    assert_eq!(1, processed.len());
                    assert!(
                        encoded.len() <= max_size,
                        "encoded len {} longer than max size {}",
                        encoded.len(),
                        max_size
                    );
                }
            }
            _ => panic!(
                "unexpected output {:?}",
                results
                    .iter()
                    .map(|r| r.as_ref().map(|(_, p)| p.len()))
                    .collect::<Vec<_>>()
            ),
        }
    }
}
537}