1use std::{
2 collections::BTreeMap,
3 io::Write,
4 num::NonZeroUsize,
5 sync::{Arc, Mutex},
6};
7
8use bytes::Bytes;
9use prost::Message;
10use snafu::Snafu;
11use vector_lib::event::{EventFinalizers, Finalizable};
12use vector_lib::request_metadata::RequestMetadata;
13use vrl::event_path;
14
15use super::{
16 apm_stats::{compute_apm_stats, Aggregator},
17 config::{DatadogTracesEndpoint, DatadogTracesEndpointConfiguration},
18 dd_proto,
19 service::TraceApiRequest,
20 sink::PartitionKey,
21};
22use crate::{
23 event::{Event, ObjectMap, TraceEvent, Value},
24 sinks::util::{
25 metadata::RequestMetadataBuilder, Compression, Compressor, IncrementalRequestBuilder,
26 },
27};
28
/// Errors produced while turning a batch of trace events into an API request.
#[derive(Debug, Snafu)]
pub enum RequestBuilderError {
    // NOTE(review): the display text mentions "APM stats", but this variant is
    // also used for trace-payload failures (compression errors, oversized
    // traces) — confirm the wording is intended.
    #[snafu(display(
        "Building an APM stats request payload failed ({}, {})",
        message,
        reason
    ))]
    FailedToBuild {
        /// Static description of the step that failed.
        message: &'static str,
        /// Detailed failure reason (e.g. the underlying I/O error text).
        reason: String,
        /// Number of events lost as a result of this failure.
        dropped_events: u64,
    },

    #[allow(dead_code)]
    #[snafu(display("Unsupported endpoint ({})", reason))]
    UnsupportedEndpoint { reason: String, dropped_events: u64 },
}
46
47impl RequestBuilderError {
48 #[allow(clippy::missing_const_for_fn)] pub fn into_parts(self) -> (&'static str, String, u64) {
50 match self {
51 Self::FailedToBuild {
52 message,
53 reason,
54 dropped_events,
55 } => (message, reason, dropped_events),
56 Self::UnsupportedEndpoint {
57 reason,
58 dropped_events,
59 } => ("unsupported endpoint", reason, dropped_events),
60 }
61 }
62}
63
/// Incremental request builder for the Datadog traces sink: encodes
/// partitioned trace events into compressed protobuf payloads and feeds the
/// APM stats aggregator along the way.
pub struct DatadogTracesRequestBuilder {
    /// Default API key, used when the partition key does not carry one.
    api_key: Arc<str>,
    /// Resolves the request URI for each Datadog traces endpoint.
    endpoint_configuration: DatadogTracesEndpointConfiguration,
    /// Compression applied to the serialized payload body.
    compression: Compression,
    /// Maximum encoded (uncompressed) payload size per request, in bytes.
    max_size: usize,
    /// Shared aggregator updated via `compute_apm_stats` for each batch.
    stats_aggregator: Arc<Mutex<Aggregator>>,
}
72
73impl DatadogTracesRequestBuilder {
74 pub const fn new(
75 api_key: Arc<str>,
76 endpoint_configuration: DatadogTracesEndpointConfiguration,
77 compression: Compression,
78 max_size: usize,
79 stats_aggregator: Arc<Mutex<Aggregator>>,
80 ) -> Result<Self, RequestBuilderError> {
81 Ok(Self {
82 api_key,
83 endpoint_configuration,
84 compression,
85 max_size,
86 stats_aggregator,
87 })
88 }
89}
90
/// Trace-specific request metadata carried alongside the generic
/// `RequestMetadata`.
pub struct DDTracesMetadata {
    /// API key used to authenticate the request.
    pub api_key: Arc<str>,
    /// Endpoint this payload is destined for.
    pub endpoint: DatadogTracesEndpoint,
    /// Finalizers taken from the events included in the payload.
    pub finalizers: EventFinalizers,
    /// Size of the serialized payload before compression, in bytes.
    pub uncompressed_size: usize,
    /// MIME type of the payload body.
    pub content_type: String,
}
98
impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequestBuilder {
    type Metadata = (DDTracesMetadata, RequestMetadata);
    type Payload = Bytes;
    type Request = TraceApiRequest;
    type Error = RequestBuilderError;

    /// Encodes a partition's events into zero or more compressed payloads,
    /// splitting on `max_size` and updating the APM stats aggregator first.
    fn encode_events_incremental(
        &mut self,
        input: (PartitionKey, Vec<Event>),
    ) -> Vec<Result<(Self::Metadata, Self::Payload), Self::Error>> {
        let (key, events) = input;
        // Non-trace events (if any) are silently discarded here.
        let trace_events = events
            .into_iter()
            .filter_map(|e| e.try_into_trace())
            .collect::<Vec<TraceEvent>>();

        // Feed the shared aggregator before encoding; stats are flushed
        // elsewhere on their own schedule.
        compute_apm_stats(&key, Arc::clone(&self.stats_aggregator), &trace_events);

        encode_traces(&key, trace_events, self.max_size)
            .into_iter()
            .map(|result| {
                result.and_then(|(payload, mut processed)| {
                    let uncompressed_size = payload.len();
                    let metadata = DDTracesMetadata {
                        // Prefer the per-partition API key, falling back to
                        // the sink-level default.
                        api_key: key
                            .api_key
                            .clone()
                            .unwrap_or_else(|| Arc::clone(&self.api_key)),
                        endpoint: DatadogTracesEndpoint::Traces,
                        finalizers: processed.take_finalizers(),
                        uncompressed_size,
                        content_type: "application/x-protobuf".to_string(),
                    };

                    // Built after finalizers are taken; only event counts/sizes
                    // from `processed` are needed here.
                    let builder = RequestMetadataBuilder::from_events(&processed);

                    let mut compressor = Compressor::from(self.compression);
                    match compressor.write_all(&payload) {
                        Ok(()) => {
                            let bytes = compressor.into_inner().freeze();

                            // Invariant per the expect message: a serialized
                            // payload is never empty.
                            let bytes_len = NonZeroUsize::new(bytes.len())
                                .expect("payload should never be zero length");
                            let request_metadata = builder.with_request_size(bytes_len);

                            Ok(((metadata, request_metadata), bytes))
                        }
                        // Compression failure drops every event in this payload.
                        Err(e) => Err(RequestBuilderError::FailedToBuild {
                            message: "Payload compression failed.",
                            reason: e.to_string(),
                            dropped_events: processed.len() as u64,
                        }),
                    }
                })
            })
            .collect()
    }

    /// Delegates to the free function `build_request` with this builder's
    /// compression and endpoint configuration.
    fn build_request(&mut self, metadata: Self::Metadata, payload: Self::Payload) -> Self::Request {
        build_request(
            metadata,
            payload,
            self.compression,
            &self.endpoint_configuration,
        )
    }
}
169
170pub fn build_request(
179 metadata: (DDTracesMetadata, RequestMetadata),
180 payload: Bytes,
181 compression: Compression,
182 endpoint_configuration: &DatadogTracesEndpointConfiguration,
183) -> TraceApiRequest {
184 let (ddtraces_metadata, request_metadata) = metadata;
185 let mut headers = BTreeMap::<String, String>::new();
186 headers.insert("Content-Type".to_string(), ddtraces_metadata.content_type);
187 headers.insert(
188 "DD-API-KEY".to_string(),
189 ddtraces_metadata.api_key.to_string(),
190 );
191 if let Some(ce) = compression.content_encoding() {
192 headers.insert("Content-Encoding".to_string(), ce.to_string());
193 }
194 TraceApiRequest {
195 body: payload,
196 headers,
197 finalizers: ddtraces_metadata.finalizers,
198 uri: endpoint_configuration.get_uri_for_endpoint(ddtraces_metadata.endpoint),
199 uncompressed_size: ddtraces_metadata.uncompressed_size,
200 metadata: request_metadata,
201 }
202}
203
/// Serializes trace events into one or more protobuf payloads, each strictly
/// smaller than `max_size` bytes (uncompressed).
///
/// Each `Ok` entry pairs the encoded bytes with the trace events it contains
/// (needed downstream for finalizers and event-count metadata). A trace whose
/// encoding alone reaches `max_size` produces an `Err` entry and is dropped.
fn encode_traces(
    key: &PartitionKey,
    trace_events: Vec<TraceEvent>,
    max_size: usize,
) -> Vec<Result<(Vec<u8>, Vec<TraceEvent>), RequestBuilderError>> {
    let mut results = Vec::new();
    let mut processed = Vec::new();
    let mut payload = build_empty_payload(key);

    for trace in trace_events {
        let mut proto = encode_trace(&trace);

        loop {
            // Tentatively add the trace, then check the resulting encoded size.
            payload.tracer_payloads.push(proto);
            if payload.encoded_len() >= max_size {
                // Over budget: take the trace back out.
                proto = payload.tracer_payloads.pop().expect("just pushed");
                if payload.tracer_payloads.is_empty() {
                    // The trace doesn't fit even in an empty payload — drop it.
                    results.push(Err(RequestBuilderError::FailedToBuild {
                        message: "Dropped trace event",
                        reason: "Trace is larger than allowed payload size".into(),
                        dropped_events: 1,
                    }));

                    break;
                } else {
                    // Flush the current payload and retry this trace in a
                    // fresh one on the next loop iteration.
                    results.push(Ok((
                        payload.encode_to_vec(),
                        std::mem::take(&mut processed),
                    )));
                    payload = build_empty_payload(key);
                }
            } else {
                // Fits: record the event and move on to the next trace.
                processed.push(trace);
                break;
            }
        }
    }
    // Flush whatever remains; note this emits a payload even when no traces
    // were accepted (partition-level fields only).
    results.push(Ok((
        payload.encode_to_vec(),
        std::mem::take(&mut processed),
    )));
    results
}
250
251fn build_empty_payload(key: &PartitionKey) -> dd_proto::TracePayload {
252 dd_proto::TracePayload {
253 host_name: key.hostname.clone().unwrap_or_default(),
254 env: key.env.clone().unwrap_or_default(),
255 traces: vec![], transactions: vec![], tracer_payloads: vec![],
258 tags: BTreeMap::new(),
260 agent_version: key.agent_version.clone().unwrap_or_default(),
261 target_tps: key.target_tps.map(|tps| tps as f64).unwrap_or_default(),
262 error_tps: key.error_tps.map(|tps| tps as f64).unwrap_or_default(),
263 }
264}
265
/// Converts a single `TraceEvent` into the protobuf `TracerPayload` form.
///
/// Missing or mistyped fields fall back to defaults (empty string, priority 1,
/// `dropped_trace: false`, no spans). All spans are placed in a single chunk,
/// and the trace-level tags are attached to both the payload and the chunk.
fn encode_trace(trace: &TraceEvent) -> dd_proto::TracerPayload {
    // Trace-level tags, rendered lossily to UTF-8 strings.
    let tags = trace
        .get(event_path!("tags"))
        .and_then(|m| m.as_object())
        .map(|m| {
            m.iter()
                .map(|(k, v)| (k.to_string(), v.to_string_lossy().into_owned()))
                .collect::<BTreeMap<String, String>>()
        })
        .unwrap_or_default();

    // Only object-valued entries of the "spans" array are converted; anything
    // else is skipped.
    let spans = match trace.get(event_path!("spans")) {
        Some(Value::Array(v)) => v
            .iter()
            .filter_map(|s| s.as_object().map(convert_span))
            .collect(),
        _ => vec![],
    };

    let chunk = dd_proto::TraceChunk {
        // Sampling priority; defaults to 1 when absent or non-integer.
        priority: trace
            .get(event_path!("priority"))
            .and_then(|v| v.as_integer().map(|v| v as i32))
            .unwrap_or(1i32),
        origin: trace
            .get(event_path!("origin"))
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
        dropped_trace: trace
            .get(event_path!("dropped"))
            .and_then(|v| v.as_boolean())
            .unwrap_or(false),
        spans,
        tags: tags.clone(),
    };

    dd_proto::TracerPayload {
        container_id: trace
            .get(event_path!("container_id"))
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
        language_name: trace
            .get(event_path!("language_name"))
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
        language_version: trace
            .get(event_path!("language_version"))
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
        tracer_version: trace
            .get(event_path!("tracer_version"))
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
        runtime_id: trace
            .get(event_path!("runtime_id"))
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
        // The whole trace is shipped as a single chunk.
        chunks: vec![chunk],
        tags,
        env: trace
            .get(event_path!("env"))
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
        hostname: trace
            .get(event_path!("hostname"))
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
        app_version: trace
            .get(event_path!("app_version"))
            .map(|v| v.to_string_lossy().into_owned())
            .unwrap_or_default(),
    }
}
343
344fn convert_span(span: &ObjectMap) -> dd_proto::Span {
345 let trace_id = match span.get("trace_id") {
346 Some(Value::Integer(val)) => *val,
347 _ => 0,
348 };
349 let span_id = match span.get("span_id") {
350 Some(Value::Integer(val)) => *val,
351 _ => 0,
352 };
353 let parent_id = match span.get("parent_id") {
354 Some(Value::Integer(val)) => *val,
355 _ => 0,
356 };
357 let duration = match span.get("duration") {
358 Some(Value::Integer(val)) => *val,
359 _ => 0,
360 };
361 let error = match span.get("error") {
362 Some(Value::Integer(val)) => *val,
363 _ => 0,
364 };
365 let start = match span.get("start") {
366 Some(Value::Timestamp(val)) => val.timestamp_nanos_opt().expect("Timestamp out of range"),
367 _ => 0,
368 };
369
370 let meta = span
371 .get("meta")
372 .and_then(|m| m.as_object())
373 .map(|m| {
374 m.iter()
375 .map(|(k, v)| (k.to_string(), v.to_string_lossy().into_owned()))
376 .collect::<BTreeMap<String, String>>()
377 })
378 .unwrap_or_default();
379
380 let meta_struct = span
381 .get("meta_struct")
382 .and_then(|m| m.as_object())
383 .map(|m| {
384 m.iter()
385 .map(|(k, v)| (k.to_string(), v.coerce_to_bytes().into_iter().collect()))
386 .collect::<BTreeMap<String, Vec<u8>>>()
387 })
388 .unwrap_or_default();
389
390 let metrics = span
391 .get("metrics")
392 .and_then(|m| m.as_object())
393 .map(|m| {
394 m.iter()
395 .filter_map(|(k, v)| {
396 if let Value::Float(f) = v {
397 Some((k.to_string(), f.into_inner()))
398 } else {
399 None
400 }
401 })
402 .collect::<BTreeMap<String, f64>>()
403 })
404 .unwrap_or_default();
405
406 dd_proto::Span {
407 service: span
408 .get("service")
409 .map(|v| v.to_string_lossy().into_owned())
410 .unwrap_or_default(),
411 name: span
412 .get("name")
413 .map(|v| v.to_string_lossy().into_owned())
414 .unwrap_or_default(),
415 resource: span
416 .get("resource")
417 .map(|v| v.to_string_lossy().into_owned())
418 .unwrap_or_default(),
419 r#type: span
420 .get("type")
421 .map(|v| v.to_string_lossy().into_owned())
422 .unwrap_or_default(),
423 trace_id: trace_id as u64,
424 span_id: span_id as u64,
425 parent_id: parent_id as u64,
426 error: error as i32,
427 start,
428 duration,
429 meta,
430 metrics,
431 meta_struct,
432 }
433}
434
#[cfg(test)]
mod test {
    use proptest::prelude::*;
    use vrl::event_path;

    use super::{encode_traces, PartitionKey};
    use crate::event::{LogEvent, TraceEvent};

    proptest! {
        // Property: when every trace individually fits under the limit, each
        // produced payload encodes to at most `max_size` bytes.
        #[test]
        fn successfully_encode_payloads_smaller_than_max_size(
            // Tag lengths chosen so each single trace stays under the limit
            // (476 is where a lone trace starts to exceed it — see the test
            // below, which uses exactly 476 to force a failure).
            lengths in proptest::collection::vec(16usize..476, 1usize..256),
        ) {
            let max_size = 1024;

            let key = PartitionKey {
                api_key: Some("x".repeat(128).into()),
                env: Some("production".into()),
                hostname: Some("foo.bar.baz.local".into()),
                agent_version: Some("1.2.3.4.5".into()),
                target_tps: None,
                error_tps: None,
            };

            // One trace event per requested tag length.
            let traces = lengths
                .into_iter()
                .map(|n| {
                    let mut log = LogEvent::default();
                    log.insert(event_path!("tags", "foo"), "x".repeat(n));
                    TraceEvent::from(log)
                })
                .collect();

            for result in encode_traces(&key, traces, max_size) {
                prop_assert!(result.is_ok());
                let (encoded, _processed) = result.unwrap();

                prop_assert!(
                    encoded.len() <= max_size,
                    "encoded len {} longer than max size {}",
                    encoded.len(),
                    max_size
                );
            }
        }
    }

    // An oversized middle trace is reported as an error while the traces
    // around it still encode into their own payloads.
    #[test]
    fn handles_too_large_events() {
        let max_size = 1024;
        // The middle tag length (476) makes that trace too large for an empty
        // payload on its own; the outer two fit.
        let lengths = [128, 476, 128];

        let key = PartitionKey {
            api_key: Some("x".repeat(128).into()),
            env: Some("production".into()),
            hostname: Some("foo.bar.baz.local".into()),
            agent_version: Some("1.2.3.4.5".into()),
            target_tps: None,
            error_tps: None,
        };

        let traces = lengths
            .into_iter()
            .map(|n| {
                let mut log = LogEvent::default();
                log.insert(event_path!("tags", "foo"), "x".repeat(n));
                TraceEvent::from(log)
            })
            .collect();

        let mut results = encode_traces(&key, traces, max_size);
        // Expect one payload per surviving trace plus one error entry.
        assert_eq!(3, results.len());

        match &mut results[..] {
            [Ok(one), Err(_two), Ok(three)] => {
                for (encoded, processed) in [one, three] {
                    assert_eq!(1, processed.len());
                    assert!(
                        encoded.len() <= max_size,
                        "encoded len {} longer than max size {}",
                        encoded.len(),
                        max_size
                    );
                }
            }
            _ => panic!(
                "unexpected output {:?}",
                results
                    .iter()
                    .map(|r| r.as_ref().map(|(_, p)| p.len()))
                    .collect::<Vec<_>>()
            ),
        }
    }
}