1use std::{num::NonZeroU32, sync::Arc};
2
3use bytes::Bytes;
4use chrono::{TimeZone, Utc};
5use http::StatusCode;
6use prost::Message;
7use serde::{Deserialize, Serialize};
8use vector_lib::{
9 EstimatedJsonEncodedSizeOf,
10 event::{DatadogMetricOriginMetadata, EventMetadata},
11 internal_event::{CountByteSize, InternalEventHandle as _, Registered},
12 metrics::AgentDDSketch,
13};
14use warp::{Filter, filters::BoxedFilter, path, path::FullPath, reply::Response};
15
16use crate::{
17 SourceSender,
18 common::{
19 datadog::{DatadogMetricType, DatadogSeriesMetric},
20 http::ErrorMessage,
21 },
22 config::log_schema,
23 event::{
24 Event, MetricKind, MetricTags,
25 metric::{Metric, MetricValue},
26 },
27 internal_events::EventsReceived,
28 schema,
29 sources::{
30 datadog_agent::{
31 ApiKeyQueryParams, DatadogAgentSource,
32 ddmetric_proto::{Metadata, MetricPayload, SketchPayload, metric_payload},
33 handle_request,
34 },
35 util::extract_tag_key_and_value,
36 },
37};
38
39#[derive(Deserialize, Serialize)]
40pub(crate) struct DatadogSeriesRequest {
41 pub(crate) series: Vec<DatadogSeriesMetric>,
42}
43
44pub(crate) fn build_warp_filter(
45 acknowledgements: bool,
46 multiple_outputs: bool,
47 out: SourceSender,
48 source: DatadogAgentSource,
49) -> BoxedFilter<(Response,)> {
50 let output = multiple_outputs.then_some(super::METRICS);
51 let sketches_service = sketches_service(acknowledgements, output, out.clone(), source.clone());
52 let series_v1_service =
53 series_v1_service(acknowledgements, output, out.clone(), source.clone());
54 let series_v2_service = series_v2_service(acknowledgements, output, out, source);
55 sketches_service
56 .or(series_v1_service)
57 .unify()
58 .or(series_v2_service)
59 .unify()
60 .boxed()
61}
62
63fn sketches_service(
64 acknowledgements: bool,
65 output: Option<&'static str>,
66 out: SourceSender,
67 source: DatadogAgentSource,
68) -> BoxedFilter<(Response,)> {
69 warp::post()
70 .and(path!("api" / "beta" / "sketches" / ..))
71 .and(warp::path::full())
72 .and(warp::header::optional::<String>("content-encoding"))
73 .and(warp::header::optional::<String>("dd-api-key"))
74 .and(warp::query::<ApiKeyQueryParams>())
75 .and(warp::body::bytes())
76 .and_then(
77 move |path: FullPath,
78 encoding_header: Option<String>,
79 api_token: Option<String>,
80 query_params: ApiKeyQueryParams,
81 body: Bytes| {
82 let events = source
83 .decode(&encoding_header, body, path.as_str())
84 .and_then(|body| {
85 decode_datadog_sketches(
86 body,
87 source.api_key_extractor.extract(
88 path.as_str(),
89 api_token,
90 query_params.dd_api_key,
91 ),
92 source.split_metric_namespace,
93 &source.events_received,
94 )
95 });
96 handle_request(events, acknowledgements, out.clone(), output)
97 },
98 )
99 .boxed()
100}
101
102fn series_v1_service(
103 acknowledgements: bool,
104 output: Option<&'static str>,
105 out: SourceSender,
106 source: DatadogAgentSource,
107) -> BoxedFilter<(Response,)> {
108 warp::post()
109 .and(path!("api" / "v1" / "series" / ..))
110 .and(warp::path::full())
111 .and(warp::header::optional::<String>("content-encoding"))
112 .and(warp::header::optional::<String>("dd-api-key"))
113 .and(warp::query::<ApiKeyQueryParams>())
114 .and(warp::body::bytes())
115 .and_then(
116 move |path: FullPath,
117 encoding_header: Option<String>,
118 api_token: Option<String>,
119 query_params: ApiKeyQueryParams,
120 body: Bytes| {
121 let events = source
122 .decode(&encoding_header, body, path.as_str())
123 .and_then(|body| {
124 decode_datadog_series_v1(
125 body,
126 source.api_key_extractor.extract(
127 path.as_str(),
128 api_token,
129 query_params.dd_api_key,
130 ),
131 &Arc::new(schema::Definition::default_legacy_namespace()),
134 source.split_metric_namespace,
135 &source.events_received,
136 )
137 });
138 handle_request(events, acknowledgements, out.clone(), output)
139 },
140 )
141 .boxed()
142}
143
144fn series_v2_service(
145 acknowledgements: bool,
146 output: Option<&'static str>,
147 out: SourceSender,
148 source: DatadogAgentSource,
149) -> BoxedFilter<(Response,)> {
150 warp::post()
151 .and(path!("api" / "v2" / "series" / ..))
152 .and(warp::path::full())
153 .and(warp::header::optional::<String>("content-encoding"))
154 .and(warp::header::optional::<String>("dd-api-key"))
155 .and(warp::query::<ApiKeyQueryParams>())
156 .and(warp::body::bytes())
157 .and_then(
158 move |path: FullPath,
159 encoding_header: Option<String>,
160 api_token: Option<String>,
161 query_params: ApiKeyQueryParams,
162 body: Bytes| {
163 let events = source
164 .decode(&encoding_header, body, path.as_str())
165 .and_then(|body| {
166 decode_datadog_series_v2(
167 body,
168 source.api_key_extractor.extract(
169 path.as_str(),
170 api_token,
171 query_params.dd_api_key,
172 ),
173 source.split_metric_namespace,
174 &source.events_received,
175 )
176 });
177 handle_request(events, acknowledgements, out.clone(), output)
178 },
179 )
180 .boxed()
181}
182
183fn decode_datadog_sketches(
184 body: Bytes,
185 api_key: Option<Arc<str>>,
186 split_metric_namespace: bool,
187 events_received: &Registered<EventsReceived>,
188) -> Result<Vec<Event>, ErrorMessage> {
189 if body.is_empty() {
190 debug!(message = "Empty payload ignored.");
192 return Ok(Vec::new());
193 }
194
195 let metrics = decode_ddsketch(body, &api_key, split_metric_namespace).map_err(|error| {
196 ErrorMessage::new(
197 StatusCode::UNPROCESSABLE_ENTITY,
198 format!("Error decoding Datadog sketch: {error:?}"),
199 )
200 })?;
201
202 events_received.emit(CountByteSize(
203 metrics.len(),
204 metrics.estimated_json_encoded_size_of(),
205 ));
206
207 Ok(metrics)
208}
209
210fn decode_datadog_series_v2(
211 body: Bytes,
212 api_key: Option<Arc<str>>,
213 split_metric_namespace: bool,
214 events_received: &Registered<EventsReceived>,
215) -> Result<Vec<Event>, ErrorMessage> {
216 if body.is_empty() {
217 debug!(message = "Empty payload ignored.");
219 return Ok(Vec::new());
220 }
221
222 let metrics = decode_ddseries_v2(body, &api_key, split_metric_namespace).map_err(|error| {
223 ErrorMessage::new(
224 StatusCode::UNPROCESSABLE_ENTITY,
225 format!("Error decoding Datadog sketch: {error:?}"),
226 )
227 })?;
228
229 events_received.emit(CountByteSize(
230 metrics.len(),
231 metrics.estimated_json_encoded_size_of(),
232 ));
233
234 Ok(metrics)
235}
236
237fn get_event_metadata(metadata: Option<&Metadata>) -> EventMetadata {
240 metadata
241 .and_then(|metadata| metadata.origin.as_ref())
242 .map_or_else(EventMetadata::default, |origin| {
243 trace!(
244 "Deserialized origin_product: `{}` origin_category: `{}` origin_service: `{}`.",
245 origin.origin_product, origin.origin_category, origin.origin_service,
246 );
247 EventMetadata::default().with_origin_metadata(DatadogMetricOriginMetadata::new(
248 Some(origin.origin_product),
249 Some(origin.origin_category),
250 Some(origin.origin_service),
251 ))
252 })
253}
254
255pub(crate) fn decode_ddseries_v2(
256 frame: Bytes,
257 api_key: &Option<Arc<str>>,
258 split_metric_namespace: bool,
259) -> crate::Result<Vec<Event>> {
260 let payload = MetricPayload::decode(frame)?;
261 let decoded_metrics: Vec<Event> = payload
262 .series
263 .into_iter()
264 .flat_map(|serie| {
265 let (namespace, name) = if split_metric_namespace {
266 namespace_name_from_dd_metric(&serie.metric)
267 } else {
268 (None, serie.metric.as_str())
269 };
270 let mut tags = into_metric_tags(serie.tags);
271
272 let event_metadata = get_event_metadata(serie.metadata.as_ref());
273
274 let non_rate_interval = if serie.interval.is_positive() {
296 NonZeroU32::new(serie.interval as u32 * 1000) } else {
298 None
299 };
300
301 serie.resources.into_iter().for_each(|r| {
302 if r.r#type.eq("host") {
305 log_schema()
306 .host_key()
307 .and_then(|key| tags.replace(key.to_string(), r.name));
308 } else {
309 tags.replace(format!("resource.{}", r.r#type), r.name);
311 }
312 });
313 (!serie.source_type_name.is_empty())
314 .then(|| tags.replace("source_type_name".into(), serie.source_type_name));
315 match metric_payload::MetricType::try_from(serie.r#type) {
318 Ok(metric_payload::MetricType::Count) => serie
319 .points
320 .iter()
321 .map(|dd_point| {
322 Metric::new_with_metadata(
323 name.to_string(),
324 MetricKind::Incremental,
325 MetricValue::Counter {
326 value: dd_point.value,
327 },
328 event_metadata.clone(),
329 )
330 .with_timestamp(Some(
331 Utc.timestamp_opt(dd_point.timestamp, 0)
332 .single()
333 .expect("invalid timestamp"),
334 ))
335 .with_tags(Some(tags.clone()))
336 .with_namespace(namespace)
337 })
338 .collect::<Vec<_>>(),
339 Ok(metric_payload::MetricType::Gauge) => serie
340 .points
341 .iter()
342 .map(|dd_point| {
343 Metric::new_with_metadata(
344 name.to_string(),
345 MetricKind::Absolute,
346 MetricValue::Gauge {
347 value: dd_point.value,
348 },
349 event_metadata.clone(),
350 )
351 .with_timestamp(Some(
352 Utc.timestamp_opt(dd_point.timestamp, 0)
353 .single()
354 .expect("invalid timestamp"),
355 ))
356 .with_tags(Some(tags.clone()))
357 .with_namespace(namespace)
358 .with_interval_ms(non_rate_interval)
359 })
360 .collect::<Vec<_>>(),
361 Ok(metric_payload::MetricType::Rate) => serie
362 .points
363 .iter()
364 .map(|dd_point| {
365 let i = Some(serie.interval)
366 .filter(|v| *v != 0)
367 .map(|v| v as u32)
368 .unwrap_or(1);
369 Metric::new_with_metadata(
370 name.to_string(),
371 MetricKind::Incremental,
372 MetricValue::Counter {
373 value: dd_point.value * (i as f64),
374 },
375 event_metadata.clone(),
376 )
377 .with_timestamp(Some(
378 Utc.timestamp_opt(dd_point.timestamp, 0)
379 .single()
380 .expect("invalid timestamp"),
381 ))
382 .with_interval_ms(NonZeroU32::new(i * 1000))
384 .with_tags(Some(tags.clone()))
385 .with_namespace(namespace)
386 })
387 .collect::<Vec<_>>(),
388 Ok(metric_payload::MetricType::Unspecified) | Err(_) => {
389 warn!("Unspecified metric type ({}).", serie.r#type);
390 Vec::new()
391 }
392 }
393 })
394 .map(|mut metric| {
395 if let Some(k) = &api_key {
396 metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
397 }
398 metric.into()
399 })
400 .collect();
401
402 Ok(decoded_metrics)
403}
404
405fn decode_datadog_series_v1(
406 body: Bytes,
407 api_key: Option<Arc<str>>,
408 schema_definition: &Arc<schema::Definition>,
409 split_metric_namespace: bool,
410 events_received: &Registered<EventsReceived>,
411) -> Result<Vec<Event>, ErrorMessage> {
412 if body.is_empty() {
413 debug!(message = "Empty payload ignored.");
415 return Ok(Vec::new());
416 }
417
418 let metrics: DatadogSeriesRequest = serde_json::from_slice(&body).map_err(|error| {
419 ErrorMessage::new(
420 StatusCode::BAD_REQUEST,
421 format!("Error parsing JSON: {error:?}"),
422 )
423 })?;
424
425 let decoded_metrics: Vec<Event> = metrics
426 .series
427 .into_iter()
428 .flat_map(|m| {
429 into_vector_metric(
430 m,
431 api_key.clone(),
432 schema_definition,
433 split_metric_namespace,
434 )
435 })
436 .collect();
437
438 events_received.emit(CountByteSize(
439 decoded_metrics.len(),
440 decoded_metrics.estimated_json_encoded_size_of(),
441 ));
442
443 Ok(decoded_metrics)
444}
445
446fn into_metric_tags(tags: Vec<String>) -> MetricTags {
447 tags.iter().map(extract_tag_key_and_value).collect()
448}
449
450fn into_vector_metric(
451 dd_metric: DatadogSeriesMetric,
452 api_key: Option<Arc<str>>,
453 schema_definition: &Arc<schema::Definition>,
454 split_metric_namespace: bool,
455) -> Vec<Event> {
456 let mut tags = into_metric_tags(dd_metric.tags.unwrap_or_default());
457
458 if let Some(key) = log_schema().host_key() {
459 dd_metric
460 .host
461 .and_then(|host| tags.replace(key.to_string(), host));
462 }
463
464 dd_metric
465 .source_type_name
466 .and_then(|source| tags.replace("source_type_name".into(), source));
467 dd_metric
468 .device
469 .and_then(|dev| tags.replace("device".into(), dev));
470
471 let (namespace, name) = if split_metric_namespace {
472 namespace_name_from_dd_metric(&dd_metric.metric)
473 } else {
474 (None, dd_metric.metric.as_str())
475 };
476
477 match dd_metric.r#type {
478 DatadogMetricType::Count => dd_metric
479 .points
480 .iter()
481 .map(|dd_point| {
482 Metric::new(
483 name.to_string(),
484 MetricKind::Incremental,
485 MetricValue::Counter { value: dd_point.1 },
486 )
487 .with_timestamp(Some(
488 Utc.timestamp_opt(dd_point.0, 0)
489 .single()
490 .expect("invalid timestamp"),
491 ))
492 .with_tags(Some(tags.clone()))
493 .with_namespace(namespace)
494 })
495 .collect::<Vec<_>>(),
496 DatadogMetricType::Gauge => dd_metric
497 .points
498 .iter()
499 .map(|dd_point| {
500 Metric::new(
501 name.to_string(),
502 MetricKind::Absolute,
503 MetricValue::Gauge { value: dd_point.1 },
504 )
505 .with_timestamp(Some(
506 Utc.timestamp_opt(dd_point.0, 0)
507 .single()
508 .expect("invalid timestamp"),
509 ))
510 .with_tags(Some(tags.clone()))
511 .with_namespace(namespace)
512 })
513 .collect::<Vec<_>>(),
514 DatadogMetricType::Rate => dd_metric
517 .points
518 .iter()
519 .map(|dd_point| {
520 let i = dd_metric.interval.filter(|v| *v != 0).unwrap_or(1);
521 Metric::new(
522 name.to_string(),
523 MetricKind::Incremental,
524 MetricValue::Counter {
525 value: dd_point.1 * (i as f64),
526 },
527 )
528 .with_timestamp(Some(
529 Utc.timestamp_opt(dd_point.0, 0)
530 .single()
531 .expect("invalid timestamp"),
532 ))
533 .with_interval_ms(NonZeroU32::new(i * 1000))
535 .with_tags(Some(tags.clone()))
536 .with_namespace(namespace)
537 })
538 .collect::<Vec<_>>(),
539 }
540 .into_iter()
541 .map(|mut metric| {
542 if let Some(k) = &api_key {
543 metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
544 }
545
546 metric
547 .metadata_mut()
548 .set_schema_definition(schema_definition);
549
550 metric.into()
551 })
552 .collect()
553}
554
555fn namespace_name_from_dd_metric(dd_metric_name: &str) -> (Option<&str>, &str) {
558 match dd_metric_name.split_once('.') {
560 Some((namespace, name)) => (Some(namespace), name),
561 None => (None, dd_metric_name),
562 }
563}
564
565pub(crate) fn decode_ddsketch(
566 frame: Bytes,
567 api_key: &Option<Arc<str>>,
568 split_metric_namespace: bool,
569) -> crate::Result<Vec<Event>> {
570 let payload = SketchPayload::decode(frame)?;
571 Ok(payload
573 .sketches
574 .into_iter()
575 .flat_map(|sketch_series| {
576 let mut tags = into_metric_tags(sketch_series.tags);
578 log_schema()
579 .host_key()
580 .and_then(|key| tags.replace(key.to_string(), sketch_series.host.clone()));
581
582 let event_metadata = get_event_metadata(sketch_series.metadata.as_ref());
583
584 sketch_series.dogsketches.into_iter().map(move |sketch| {
585 let k: Vec<i16> = sketch.k.iter().map(|k| *k as i16).collect();
586 let n: Vec<u16> = sketch.n.iter().map(|n| *n as u16).collect();
587 let val = MetricValue::from(
588 AgentDDSketch::from_raw(
589 sketch.cnt as u32,
590 sketch.min,
591 sketch.max,
592 sketch.sum,
593 sketch.avg,
594 &k,
595 &n,
596 )
597 .unwrap_or_else(AgentDDSketch::with_agent_defaults),
598 );
599 let (namespace, name) = if split_metric_namespace {
600 namespace_name_from_dd_metric(&sketch_series.metric)
601 } else {
602 (None, sketch_series.metric.as_str())
603 };
604 let mut metric = Metric::new_with_metadata(
605 name.to_string(),
606 MetricKind::Incremental,
607 val,
608 event_metadata.clone(),
609 )
610 .with_tags(Some(tags.clone()))
611 .with_timestamp(Some(
612 Utc.timestamp_opt(sketch.ts, 0)
613 .single()
614 .expect("invalid timestamp"),
615 ))
616 .with_namespace(namespace);
617 if let Some(k) = &api_key {
618 metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
619 }
620
621 metric.into()
622 })
623 })
624 .collect())
625}