1use std::{
2 collections::BTreeMap,
3 io::Write,
4 num::NonZeroUsize,
5 sync::{Arc, Mutex},
6};
7
8use bytes::Bytes;
9use prost::Message;
10use snafu::Snafu;
11use vector_lib::{
12 event::{EventFinalizers, Finalizable},
13 request_metadata::RequestMetadata,
14};
15use vrl::event_path;
16
17use super::{
18 apm_stats::{Aggregator, compute_apm_stats},
19 config::{DatadogTracesEndpoint, DatadogTracesEndpointConfiguration},
20 dd_proto,
21 service::TraceApiRequest,
22 sink::PartitionKey,
23};
24use crate::{
25 event::{Event, ObjectMap, TraceEvent, Value},
26 sinks::util::{
27 Compression, Compressor, IncrementalRequestBuilder, metadata::RequestMetadataBuilder,
28 },
29};
30
/// Errors produced while turning a batch of trace events into a request payload.
///
/// Every variant carries `dropped_events` — the number of events lost when the
/// failure occurred — so callers can account for the loss (see `into_parts`).
#[derive(Debug, Snafu)]
pub enum RequestBuilderError {
    /// Encoding or compressing a payload failed.
    // NOTE(review): the display text mentions "APM stats" but this variant is
    // also used for trace payload failures (see `encode_traces` /
    // `encode_events_incremental`) — consider generalizing the message.
    #[snafu(display(
        "Building an APM stats request payload failed ({}, {})",
        message,
        reason
    ))]
    FailedToBuild {
        // Static description of the failing stage.
        message: &'static str,
        // Detailed reason, e.g. the underlying error's text.
        reason: String,
        // Number of events dropped as a result of this failure.
        dropped_events: u64,
    },

    /// The request targeted an endpoint this builder does not support.
    #[allow(dead_code)]
    #[snafu(display("Unsupported endpoint ({})", reason))]
    UnsupportedEndpoint { reason: String, dropped_events: u64 },
}
48
49impl RequestBuilderError {
50 #[allow(clippy::missing_const_for_fn)] pub fn into_parts(self) -> (&'static str, String, u64) {
52 match self {
53 Self::FailedToBuild {
54 message,
55 reason,
56 dropped_events,
57 } => (message, reason, dropped_events),
58 Self::UnsupportedEndpoint {
59 reason,
60 dropped_events,
61 } => ("unsupported endpoint", reason, dropped_events),
62 }
63 }
64}
65
/// Incremental request builder for the Datadog traces endpoint.
///
/// Splits batches of trace events into protobuf payloads whose encoded size
/// stays below `max_size`, compressing each payload before dispatch.
pub struct DatadogTracesRequestBuilder {
    // Default API key, used when the partition key does not carry one.
    api_key: Arc<str>,
    // Maps logical endpoints to concrete request URIs.
    endpoint_configuration: DatadogTracesEndpointConfiguration,
    // Compression applied to each encoded payload.
    compression: Compression,
    // Upper bound (bytes) on the uncompressed encoded payload size.
    max_size: usize,
    // Shared aggregator fed with APM stats computed from each batch.
    stats_aggregator: Arc<Mutex<Aggregator>>,
}
74
impl DatadogTracesRequestBuilder {
    /// Creates a new request builder.
    ///
    /// Never returns `Err` in the current implementation; the `Result` return
    /// type is presumably kept for interface stability with callers that
    /// expect a fallible constructor — TODO confirm before simplifying.
    pub const fn new(
        api_key: Arc<str>,
        endpoint_configuration: DatadogTracesEndpointConfiguration,
        compression: Compression,
        max_size: usize,
        stats_aggregator: Arc<Mutex<Aggregator>>,
    ) -> Result<Self, RequestBuilderError> {
        Ok(Self {
            api_key,
            endpoint_configuration,
            compression,
            max_size,
            stats_aggregator,
        })
    }
}
92
/// Sink-specific metadata accompanying one encoded trace payload.
pub struct DDTracesMetadata {
    // API key sent in the `DD-API-KEY` header.
    pub api_key: Arc<str>,
    // Logical endpoint the payload is destined for.
    pub endpoint: DatadogTracesEndpoint,
    // Finalizers collected from the events encoded into this payload.
    pub finalizers: EventFinalizers,
    // Size in bytes of the payload before compression.
    pub uncompressed_size: usize,
    // Value for the `Content-Type` header.
    pub content_type: String,
}
100
impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequestBuilder {
    type Metadata = (DDTracesMetadata, RequestMetadata);
    type Payload = Bytes;
    type Request = TraceApiRequest;
    type Error = RequestBuilderError;

    /// Encodes a partition's worth of events into zero or more compressed
    /// payloads, each kept below `max_size` before compression.
    fn encode_events_incremental(
        &mut self,
        input: (PartitionKey, Vec<Event>),
    ) -> Vec<Result<(Self::Metadata, Self::Payload), Self::Error>> {
        let (key, events) = input;
        // Non-trace events are silently discarded here.
        let trace_events = events
            .into_iter()
            .filter_map(|e| e.try_into_trace())
            .collect::<Vec<TraceEvent>>();

        // Update the shared APM stats aggregator from the full batch before
        // splitting it into payloads.
        compute_apm_stats(&key, Arc::clone(&self.stats_aggregator), &trace_events);

        encode_traces(&key, trace_events, self.max_size)
            .into_iter()
            .map(|result| {
                result.and_then(|(payload, mut processed)| {
                    let uncompressed_size = payload.len();
                    let metadata = DDTracesMetadata {
                        // Prefer the partition's API key; fall back to the
                        // sink-wide default.
                        api_key: key
                            .api_key
                            .clone()
                            .unwrap_or_else(|| Arc::clone(&self.api_key)),
                        endpoint: DatadogTracesEndpoint::Traces,
                        finalizers: processed.take_finalizers(),
                        uncompressed_size,
                        content_type: "application/x-protobuf".to_string(),
                    };

                    // Derive event-count/size metadata from the events that
                    // actually made it into this payload.
                    let builder = RequestMetadataBuilder::from_events(&processed);

                    let mut compressor = Compressor::from(self.compression);
                    match compressor.write_all(&payload) {
                        Ok(()) => {
                            let bytes = compressor.into_inner().freeze();

                            let bytes_len = NonZeroUsize::new(bytes.len())
                                .expect("payload should never be zero length");
                            let request_metadata = builder.with_request_size(bytes_len);

                            Ok(((metadata, request_metadata), bytes))
                        }
                        // Compression failure drops every event in this
                        // payload; report the count so callers can account.
                        Err(e) => Err(RequestBuilderError::FailedToBuild {
                            message: "Payload compression failed.",
                            reason: e.to_string(),
                            dropped_events: processed.len() as u64,
                        }),
                    }
                })
            })
            .collect()
    }

    /// Wraps an encoded payload and its metadata into the HTTP request.
    fn build_request(&mut self, metadata: Self::Metadata, payload: Self::Payload) -> Self::Request {
        build_request(
            metadata,
            payload,
            self.compression,
            &self.endpoint_configuration,
        )
    }
}
171
172pub fn build_request(
181 metadata: (DDTracesMetadata, RequestMetadata),
182 payload: Bytes,
183 compression: Compression,
184 endpoint_configuration: &DatadogTracesEndpointConfiguration,
185) -> TraceApiRequest {
186 let (ddtraces_metadata, request_metadata) = metadata;
187 let mut headers = BTreeMap::<String, String>::new();
188 headers.insert("Content-Type".to_string(), ddtraces_metadata.content_type);
189 headers.insert(
190 "DD-API-KEY".to_string(),
191 ddtraces_metadata.api_key.to_string(),
192 );
193 if let Some(ce) = compression.content_encoding() {
194 headers.insert("Content-Encoding".to_string(), ce.to_string());
195 }
196 TraceApiRequest {
197 body: payload,
198 headers,
199 finalizers: ddtraces_metadata.finalizers,
200 uri: endpoint_configuration.get_uri_for_endpoint(ddtraces_metadata.endpoint),
201 uncompressed_size: ddtraces_metadata.uncompressed_size,
202 metadata: request_metadata,
203 }
204}
205
/// Greedily packs trace events into `TracePayload`s whose encoded size stays
/// below `max_size`.
///
/// Returns one entry per emitted payload: `Ok((encoded_bytes, events))` for
/// payloads that fit, or an `Err` for any single trace too large to fit even
/// in an otherwise-empty payload (that trace is dropped). A final — possibly
/// empty — payload is always emitted for whatever remains.
fn encode_traces(
    key: &PartitionKey,
    trace_events: Vec<TraceEvent>,
    max_size: usize,
) -> Vec<Result<(Vec<u8>, Vec<TraceEvent>), RequestBuilderError>> {
    let mut results = Vec::new();
    // Events included in the payload currently being built; handed off with
    // each emitted payload so finalizers/metadata can be derived from them.
    let mut processed = Vec::new();
    let mut payload = build_empty_payload(key);

    for trace in trace_events {
        let mut proto = encode_trace(&trace);

        loop {
            // Tentatively add the trace, then check the resulting size.
            payload.tracer_payloads.push(proto);
            if payload.encoded_len() >= max_size {
                // Too big: take the trace back out...
                proto = payload.tracer_payloads.pop().expect("just pushed");
                if payload.tracer_payloads.is_empty() {
                    // ...and if it doesn't fit even alone, drop it.
                    results.push(Err(RequestBuilderError::FailedToBuild {
                        message: "Dropped trace event",
                        reason: "Trace is larger than allowed payload size".into(),
                        dropped_events: 1,
                    }));

                    break;
                } else {
                    // Otherwise flush the current payload and retry this
                    // trace against a fresh, empty one.
                    results.push(Ok((
                        payload.encode_to_vec(),
                        std::mem::take(&mut processed),
                    )));
                    payload = build_empty_payload(key);
                }
            } else {
                // It fits; record the source event and move on.
                processed.push(trace);
                break;
            }
        }
    }
    // Flush whatever is left (may be an empty payload if the input was empty
    // or every trace was oversized).
    results.push(Ok((
        payload.encode_to_vec(),
        std::mem::take(&mut processed),
    )));
    results
}
252
253fn build_empty_payload(key: &PartitionKey) -> dd_proto::TracePayload {
254 dd_proto::TracePayload {
255 host_name: key.hostname.clone().unwrap_or_default(),
256 env: key.env.clone().unwrap_or_default(),
257 traces: vec![], transactions: vec![], tracer_payloads: vec![],
260 tags: BTreeMap::new(),
262 agent_version: key.agent_version.clone().unwrap_or_default(),
263 target_tps: key.target_tps.map(|tps| tps as f64).unwrap_or_default(),
264 error_tps: key.error_tps.map(|tps| tps as f64).unwrap_or_default(),
265 }
266}
267
268fn encode_trace(trace: &TraceEvent) -> dd_proto::TracerPayload {
269 let tags = trace
270 .get(event_path!("tags"))
271 .and_then(|m| m.as_object())
272 .map(|m| {
273 m.iter()
274 .map(|(k, v)| (k.to_string(), v.to_string_lossy().into_owned()))
275 .collect::<BTreeMap<String, String>>()
276 })
277 .unwrap_or_default();
278
279 let spans = match trace.get(event_path!("spans")) {
280 Some(Value::Array(v)) => v
281 .iter()
282 .filter_map(|s| s.as_object().map(convert_span))
283 .collect(),
284 _ => vec![],
285 };
286
287 let chunk = dd_proto::TraceChunk {
288 priority: trace
289 .get(event_path!("priority"))
290 .and_then(|v| v.as_integer().map(|v| v as i32))
291 .unwrap_or(1i32),
296 origin: trace
297 .get(event_path!("origin"))
298 .map(|v| v.to_string_lossy().into_owned())
299 .unwrap_or_default(),
300 dropped_trace: trace
301 .get(event_path!("dropped"))
302 .and_then(|v| v.as_boolean())
303 .unwrap_or(false),
304 spans,
305 tags: tags.clone(),
306 };
307
308 dd_proto::TracerPayload {
309 container_id: trace
310 .get(event_path!("container_id"))
311 .map(|v| v.to_string_lossy().into_owned())
312 .unwrap_or_default(),
313 language_name: trace
314 .get(event_path!("language_name"))
315 .map(|v| v.to_string_lossy().into_owned())
316 .unwrap_or_default(),
317 language_version: trace
318 .get(event_path!("language_version"))
319 .map(|v| v.to_string_lossy().into_owned())
320 .unwrap_or_default(),
321 tracer_version: trace
322 .get(event_path!("tracer_version"))
323 .map(|v| v.to_string_lossy().into_owned())
324 .unwrap_or_default(),
325 runtime_id: trace
326 .get(event_path!("runtime_id"))
327 .map(|v| v.to_string_lossy().into_owned())
328 .unwrap_or_default(),
329 chunks: vec![chunk],
330 tags,
331 env: trace
332 .get(event_path!("env"))
333 .map(|v| v.to_string_lossy().into_owned())
334 .unwrap_or_default(),
335 hostname: trace
336 .get(event_path!("hostname"))
337 .map(|v| v.to_string_lossy().into_owned())
338 .unwrap_or_default(),
339 app_version: trace
340 .get(event_path!("app_version"))
341 .map(|v| v.to_string_lossy().into_owned())
342 .unwrap_or_default(),
343 }
344}
345
346fn convert_span(span: &ObjectMap) -> dd_proto::Span {
347 let trace_id = match span.get("trace_id") {
348 Some(Value::Integer(val)) => *val,
349 _ => 0,
350 };
351 let span_id = match span.get("span_id") {
352 Some(Value::Integer(val)) => *val,
353 _ => 0,
354 };
355 let parent_id = match span.get("parent_id") {
356 Some(Value::Integer(val)) => *val,
357 _ => 0,
358 };
359 let duration = match span.get("duration") {
360 Some(Value::Integer(val)) => *val,
361 _ => 0,
362 };
363 let error = match span.get("error") {
364 Some(Value::Integer(val)) => *val,
365 _ => 0,
366 };
367 let start = match span.get("start") {
368 Some(Value::Timestamp(val)) => val.timestamp_nanos_opt().expect("Timestamp out of range"),
369 _ => 0,
370 };
371
372 let meta = span
373 .get("meta")
374 .and_then(|m| m.as_object())
375 .map(|m| {
376 m.iter()
377 .map(|(k, v)| (k.to_string(), v.to_string_lossy().into_owned()))
378 .collect::<BTreeMap<String, String>>()
379 })
380 .unwrap_or_default();
381
382 let meta_struct = span
383 .get("meta_struct")
384 .and_then(|m| m.as_object())
385 .map(|m| {
386 m.iter()
387 .map(|(k, v)| (k.to_string(), v.coerce_to_bytes().into_iter().collect()))
388 .collect::<BTreeMap<String, Vec<u8>>>()
389 })
390 .unwrap_or_default();
391
392 let metrics = span
393 .get("metrics")
394 .and_then(|m| m.as_object())
395 .map(|m| {
396 m.iter()
397 .filter_map(|(k, v)| {
398 if let Value::Float(f) = v {
399 Some((k.to_string(), f.into_inner()))
400 } else {
401 None
402 }
403 })
404 .collect::<BTreeMap<String, f64>>()
405 })
406 .unwrap_or_default();
407
408 dd_proto::Span {
409 service: span
410 .get("service")
411 .map(|v| v.to_string_lossy().into_owned())
412 .unwrap_or_default(),
413 name: span
414 .get("name")
415 .map(|v| v.to_string_lossy().into_owned())
416 .unwrap_or_default(),
417 resource: span
418 .get("resource")
419 .map(|v| v.to_string_lossy().into_owned())
420 .unwrap_or_default(),
421 r#type: span
422 .get("type")
423 .map(|v| v.to_string_lossy().into_owned())
424 .unwrap_or_default(),
425 trace_id: trace_id as u64,
426 span_id: span_id as u64,
427 parent_id: parent_id as u64,
428 error: error as i32,
429 start,
430 duration,
431 meta,
432 metrics,
433 meta_struct,
434 }
435}
436
#[cfg(test)]
mod test {
    use proptest::prelude::*;
    use vrl::event_path;

    use super::{PartitionKey, encode_traces};
    use crate::event::{LogEvent, TraceEvent};

    proptest! {
        // Property: as long as every trace individually fits, every emitted
        // payload must be Ok and stay within `max_size`.
        #[test]
        fn successfully_encode_payloads_smaller_than_max_size(
            // Tag lengths are capped below the size at which a single trace
            // would exceed the 1024-byte limit on its own.
            lengths in proptest::collection::vec(16usize..476, 1usize..256),
        ) {
            let max_size = 1024;

            let key = PartitionKey {
                api_key: Some("x".repeat(128).into()),
                env: Some("production".into()),
                hostname: Some("foo.bar.baz.local".into()),
                agent_version: Some("1.2.3.4.5".into()),
                target_tps: None,
                error_tps: None,
            };

            // One trace per requested length, each carrying a single tag of
            // that many bytes.
            let traces = lengths
                .into_iter()
                .map(|n| {
                    let mut log = LogEvent::default();
                    log.insert(event_path!("tags", "foo"), "x".repeat(n));
                    TraceEvent::from(log)
                })
                .collect();

            for result in encode_traces(&key, traces, max_size) {
                prop_assert!(result.is_ok());
                let (encoded, _processed) = result.unwrap();

                prop_assert!(
                    encoded.len() <= max_size,
                    "encoded len {} longer than max size {}",
                    encoded.len(),
                    max_size
                );
            }
        }
    }

    // A trace too large to fit in an empty payload is dropped (Err) while the
    // surrounding traces are still encoded successfully.
    #[test]
    fn handles_too_large_events() {
        let max_size = 1024;
        // The middle length is sized so that trace alone exceeds `max_size`.
        let lengths = [128, 476, 128];

        let key = PartitionKey {
            api_key: Some("x".repeat(128).into()),
            env: Some("production".into()),
            hostname: Some("foo.bar.baz.local".into()),
            agent_version: Some("1.2.3.4.5".into()),
            target_tps: None,
            error_tps: None,
        };

        let traces = lengths
            .into_iter()
            .map(|n| {
                let mut log = LogEvent::default();
                log.insert(event_path!("tags", "foo"), "x".repeat(n));
                TraceEvent::from(log)
            })
            .collect();

        let mut results = encode_traces(&key, traces, max_size);
        assert_eq!(3, results.len());

        // Expect exactly: first trace Ok, oversized trace Err, last trace Ok.
        match &mut results[..] {
            [Ok(one), Err(_two), Ok(three)] => {
                for (encoded, processed) in [one, three] {
                    assert_eq!(1, processed.len());
                    assert!(
                        encoded.len() <= max_size,
                        "encoded len {} longer than max size {}",
                        encoded.len(),
                        max_size
                    );
                }
            }
            _ => panic!(
                "unexpected output {:?}",
                results
                    .iter()
                    .map(|r| r.as_ref().map(|(_, p)| p.len()))
                    .collect::<Vec<_>>()
            ),
        }
    }
}
537}