1use std::{num::NonZeroU32, sync::Arc};
2
3use bytes::Bytes;
4use chrono::{TimeZone, Utc};
5use http::StatusCode;
6use prost::Message;
7use serde::{Deserialize, Serialize};
8use vector_lib::{
9 EstimatedJsonEncodedSizeOf,
10 event::{DatadogMetricOriginMetadata, EventMetadata},
11 internal_event::{CountByteSize, InternalEventHandle as _, Registered},
12 metrics::AgentDDSketch,
13};
14use warp::{Filter, filters::BoxedFilter, path, path::FullPath, reply::Response};
15
16use super::ddmetric_proto::{Metadata, MetricPayload, SketchPayload, metric_payload};
17use super::{ApiKeyQueryParams, DatadogAgentSource, RequestHandler};
18use crate::{
19 common::{
20 datadog::{DatadogMetricType, DatadogSeriesMetric},
21 http::ErrorMessage,
22 },
23 config::log_schema,
24 event::{
25 Event, MetricKind, MetricTags,
26 metric::{Metric, MetricValue},
27 },
28 internal_events::EventsReceived,
29 schema,
30 sources::util::extract_tag_key_and_value,
31};
32
33#[derive(Deserialize, Serialize)]
34pub(crate) struct DatadogSeriesRequest {
35 pub(crate) series: Vec<DatadogSeriesMetric>,
36}
37
38pub(super) fn build_warp_filter(
39 handler: RequestHandler,
40 source: DatadogAgentSource,
41) -> BoxedFilter<(Response,)> {
42 let sketches_service = sketches_service(handler.clone(), source.clone());
43 let series_v1_service = series_v1_service(handler.clone(), source.clone());
44 let series_v2_service = series_v2_service(handler, source);
45 sketches_service
46 .or(series_v1_service)
47 .unify()
48 .or(series_v2_service)
49 .unify()
50 .boxed()
51}
52
53fn sketches_service(
54 handler: RequestHandler,
55 source: DatadogAgentSource,
56) -> BoxedFilter<(Response,)> {
57 warp::post()
58 .and(path!("api" / "beta" / "sketches" / ..))
59 .and(warp::path::full())
60 .and(warp::header::optional::<String>("content-encoding"))
61 .and(warp::header::optional::<String>("dd-api-key"))
62 .and(warp::query::<ApiKeyQueryParams>())
63 .and(warp::body::bytes())
64 .and_then({
65 move |path: FullPath,
66 encoding_header: Option<String>,
67 api_token: Option<String>,
68 query_params: ApiKeyQueryParams,
69 body: Bytes| {
70 let events = source
71 .decode(&encoding_header, body, path.as_str())
72 .and_then(|body| {
73 decode_datadog_sketches(
74 body,
75 source.api_key_extractor.extract(
76 path.as_str(),
77 api_token,
78 query_params.dd_api_key,
79 ),
80 source.split_metric_namespace,
81 &source.events_received,
82 )
83 });
84 handler.clone().handle_request(events, super::METRICS)
85 }
86 })
87 .boxed()
88}
89
90fn series_v1_service(
91 handler: RequestHandler,
92 source: DatadogAgentSource,
93) -> BoxedFilter<(Response,)> {
94 warp::post()
95 .and(path!("api" / "v1" / "series" / ..))
96 .and(warp::path::full())
97 .and(warp::header::optional::<String>("content-encoding"))
98 .and(warp::header::optional::<String>("dd-api-key"))
99 .and(warp::query::<ApiKeyQueryParams>())
100 .and(warp::body::bytes())
101 .and_then({
102 move |path: FullPath,
103 encoding_header: Option<String>,
104 api_token: Option<String>,
105 query_params: ApiKeyQueryParams,
106 body: Bytes| {
107 let events = source
108 .decode(&encoding_header, body, path.as_str())
109 .and_then(|body| {
110 decode_datadog_series_v1(
111 body,
112 source.api_key_extractor.extract(
113 path.as_str(),
114 api_token,
115 query_params.dd_api_key,
116 ),
117 &Arc::new(schema::Definition::default_legacy_namespace()),
120 source.split_metric_namespace,
121 &source.events_received,
122 )
123 });
124 handler.clone().handle_request(events, super::METRICS)
125 }
126 })
127 .boxed()
128}
129
130fn series_v2_service(
131 handler: RequestHandler,
132 source: DatadogAgentSource,
133) -> BoxedFilter<(Response,)> {
134 warp::post()
135 .and(path!("api" / "v2" / "series" / ..))
136 .and(warp::path::full())
137 .and(warp::header::optional::<String>("content-encoding"))
138 .and(warp::header::optional::<String>("dd-api-key"))
139 .and(warp::query::<ApiKeyQueryParams>())
140 .and(warp::body::bytes())
141 .and_then({
142 move |path: FullPath,
143 encoding_header: Option<String>,
144 api_token: Option<String>,
145 query_params: ApiKeyQueryParams,
146 body: Bytes| {
147 let events = source
148 .decode(&encoding_header, body, path.as_str())
149 .and_then(|body| {
150 decode_datadog_series_v2(
151 body,
152 source.api_key_extractor.extract(
153 path.as_str(),
154 api_token,
155 query_params.dd_api_key,
156 ),
157 source.split_metric_namespace,
158 &source.events_received,
159 )
160 });
161 handler.clone().handle_request(events, super::METRICS)
162 }
163 })
164 .boxed()
165}
166
167fn decode_datadog_sketches(
168 body: Bytes,
169 api_key: Option<Arc<str>>,
170 split_metric_namespace: bool,
171 events_received: &Registered<EventsReceived>,
172) -> Result<Vec<Event>, ErrorMessage> {
173 if body.is_empty() {
174 debug!(message = "Empty payload ignored.");
176 return Ok(Vec::new());
177 }
178
179 let metrics = decode_ddsketch(body, &api_key, split_metric_namespace).map_err(|error| {
180 ErrorMessage::new(
181 StatusCode::UNPROCESSABLE_ENTITY,
182 format!("Error decoding Datadog sketch: {error:?}"),
183 )
184 })?;
185
186 events_received.emit(CountByteSize(
187 metrics.len(),
188 metrics.estimated_json_encoded_size_of(),
189 ));
190
191 Ok(metrics)
192}
193
194fn decode_datadog_series_v2(
195 body: Bytes,
196 api_key: Option<Arc<str>>,
197 split_metric_namespace: bool,
198 events_received: &Registered<EventsReceived>,
199) -> Result<Vec<Event>, ErrorMessage> {
200 if body.is_empty() {
201 debug!(message = "Empty payload ignored.");
203 return Ok(Vec::new());
204 }
205
206 let metrics = decode_ddseries_v2(body, &api_key, split_metric_namespace).map_err(|error| {
207 ErrorMessage::new(
208 StatusCode::UNPROCESSABLE_ENTITY,
209 format!("Error decoding Datadog sketch: {error:?}"),
210 )
211 })?;
212
213 events_received.emit(CountByteSize(
214 metrics.len(),
215 metrics.estimated_json_encoded_size_of(),
216 ));
217
218 Ok(metrics)
219}
220
221fn get_event_metadata(metadata: Option<&Metadata>) -> EventMetadata {
224 metadata
225 .and_then(|metadata| metadata.origin.as_ref())
226 .map_or_else(EventMetadata::default, |origin| {
227 trace!(
228 "Deserialized origin_product: `{}` origin_category: `{}` origin_service: `{}`.",
229 origin.origin_product, origin.origin_category, origin.origin_service,
230 );
231 EventMetadata::default().with_origin_metadata(DatadogMetricOriginMetadata::new(
232 Some(origin.origin_product),
233 Some(origin.origin_category),
234 Some(origin.origin_service),
235 ))
236 })
237}
238
239pub(crate) fn decode_ddseries_v2(
240 frame: Bytes,
241 api_key: &Option<Arc<str>>,
242 split_metric_namespace: bool,
243) -> crate::Result<Vec<Event>> {
244 let payload = MetricPayload::decode(frame)?;
245 let decoded_metrics: Vec<Event> = payload
246 .series
247 .into_iter()
248 .flat_map(|serie| {
249 let (namespace, name) = if split_metric_namespace {
250 namespace_name_from_dd_metric(&serie.metric)
251 } else {
252 (None, serie.metric.as_str())
253 };
254 let mut tags = into_metric_tags(serie.tags);
255
256 let event_metadata = get_event_metadata(serie.metadata.as_ref());
257
258 let non_rate_interval = if serie.interval.is_positive() {
280 NonZeroU32::new(serie.interval as u32 * 1000) } else {
282 None
283 };
284
285 serie.resources.into_iter().for_each(|r| {
286 if r.r#type.eq("host") {
289 log_schema()
290 .host_key()
291 .and_then(|key| tags.replace(key.to_string(), r.name));
292 } else if r.r#type.eq("device") {
293 tags.replace("device".into(), r.name);
296 } else {
297 tags.replace(format!("resource.{}", r.r#type), r.name);
299 }
300 });
301 (!serie.source_type_name.is_empty())
302 .then(|| tags.replace("source_type_name".into(), serie.source_type_name));
303 match metric_payload::MetricType::try_from(serie.r#type) {
306 Ok(metric_payload::MetricType::Count) => serie
307 .points
308 .iter()
309 .map(|dd_point| {
310 Metric::new_with_metadata(
311 name.to_string(),
312 MetricKind::Incremental,
313 MetricValue::Counter {
314 value: dd_point.value,
315 },
316 event_metadata.clone(),
317 )
318 .with_timestamp(Some(
319 Utc.timestamp_opt(dd_point.timestamp, 0)
320 .single()
321 .expect("invalid timestamp"),
322 ))
323 .with_tags(Some(tags.clone()))
324 .with_namespace(namespace)
325 })
326 .collect::<Vec<_>>(),
327 Ok(metric_payload::MetricType::Gauge) => serie
328 .points
329 .iter()
330 .map(|dd_point| {
331 Metric::new_with_metadata(
332 name.to_string(),
333 MetricKind::Absolute,
334 MetricValue::Gauge {
335 value: dd_point.value,
336 },
337 event_metadata.clone(),
338 )
339 .with_timestamp(Some(
340 Utc.timestamp_opt(dd_point.timestamp, 0)
341 .single()
342 .expect("invalid timestamp"),
343 ))
344 .with_tags(Some(tags.clone()))
345 .with_namespace(namespace)
346 .with_interval_ms(non_rate_interval)
347 })
348 .collect::<Vec<_>>(),
349 Ok(metric_payload::MetricType::Rate) => serie
350 .points
351 .iter()
352 .map(|dd_point| {
353 let i = Some(serie.interval)
354 .filter(|v| *v != 0)
355 .map(|v| v as u32)
356 .unwrap_or(1);
357 Metric::new_with_metadata(
358 name.to_string(),
359 MetricKind::Incremental,
360 MetricValue::Counter {
361 value: dd_point.value * (i as f64),
362 },
363 event_metadata.clone(),
364 )
365 .with_timestamp(Some(
366 Utc.timestamp_opt(dd_point.timestamp, 0)
367 .single()
368 .expect("invalid timestamp"),
369 ))
370 .with_interval_ms(NonZeroU32::new(i * 1000))
372 .with_tags(Some(tags.clone()))
373 .with_namespace(namespace)
374 })
375 .collect::<Vec<_>>(),
376 Ok(metric_payload::MetricType::Unspecified) | Err(_) => {
377 warn!("Unspecified metric type ({}).", serie.r#type);
378 Vec::new()
379 }
380 }
381 })
382 .map(|mut metric| {
383 if let Some(k) = &api_key {
384 metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
385 }
386 metric.into()
387 })
388 .collect();
389
390 Ok(decoded_metrics)
391}
392
393fn decode_datadog_series_v1(
394 body: Bytes,
395 api_key: Option<Arc<str>>,
396 schema_definition: &Arc<schema::Definition>,
397 split_metric_namespace: bool,
398 events_received: &Registered<EventsReceived>,
399) -> Result<Vec<Event>, ErrorMessage> {
400 if body.is_empty() {
401 debug!(message = "Empty payload ignored.");
403 return Ok(Vec::new());
404 }
405
406 let metrics: DatadogSeriesRequest = serde_json::from_slice(&body).map_err(|error| {
407 ErrorMessage::new(
408 StatusCode::BAD_REQUEST,
409 format!("Error parsing JSON: {error:?}"),
410 )
411 })?;
412
413 let decoded_metrics: Vec<Event> = metrics
414 .series
415 .into_iter()
416 .flat_map(|m| {
417 into_vector_metric(
418 m,
419 api_key.clone(),
420 schema_definition,
421 split_metric_namespace,
422 )
423 })
424 .collect();
425
426 events_received.emit(CountByteSize(
427 decoded_metrics.len(),
428 decoded_metrics.estimated_json_encoded_size_of(),
429 ));
430
431 Ok(decoded_metrics)
432}
433
434fn into_metric_tags(tags: Vec<String>) -> MetricTags {
435 tags.iter().map(extract_tag_key_and_value).collect()
436}
437
438fn into_vector_metric(
439 dd_metric: DatadogSeriesMetric,
440 api_key: Option<Arc<str>>,
441 schema_definition: &Arc<schema::Definition>,
442 split_metric_namespace: bool,
443) -> Vec<Event> {
444 let mut tags = into_metric_tags(dd_metric.tags.unwrap_or_default());
445
446 if let Some(key) = log_schema().host_key() {
447 dd_metric
448 .host
449 .and_then(|host| tags.replace(key.to_string(), host));
450 }
451
452 dd_metric
453 .source_type_name
454 .and_then(|source| tags.replace("source_type_name".into(), source));
455 dd_metric
456 .device
457 .and_then(|dev| tags.replace("device".into(), dev));
458
459 let (namespace, name) = if split_metric_namespace {
460 namespace_name_from_dd_metric(&dd_metric.metric)
461 } else {
462 (None, dd_metric.metric.as_str())
463 };
464
465 match dd_metric.r#type {
466 DatadogMetricType::Count => dd_metric
467 .points
468 .iter()
469 .map(|dd_point| {
470 Metric::new(
471 name.to_string(),
472 MetricKind::Incremental,
473 MetricValue::Counter { value: dd_point.1 },
474 )
475 .with_timestamp(Some(
476 Utc.timestamp_opt(dd_point.0, 0)
477 .single()
478 .expect("invalid timestamp"),
479 ))
480 .with_tags(Some(tags.clone()))
481 .with_namespace(namespace)
482 })
483 .collect::<Vec<_>>(),
484 DatadogMetricType::Gauge => dd_metric
485 .points
486 .iter()
487 .map(|dd_point| {
488 Metric::new(
489 name.to_string(),
490 MetricKind::Absolute,
491 MetricValue::Gauge { value: dd_point.1 },
492 )
493 .with_timestamp(Some(
494 Utc.timestamp_opt(dd_point.0, 0)
495 .single()
496 .expect("invalid timestamp"),
497 ))
498 .with_tags(Some(tags.clone()))
499 .with_namespace(namespace)
500 })
501 .collect::<Vec<_>>(),
502 DatadogMetricType::Rate => dd_metric
505 .points
506 .iter()
507 .map(|dd_point| {
508 let i = dd_metric.interval.filter(|v| *v != 0).unwrap_or(1);
509 Metric::new(
510 name.to_string(),
511 MetricKind::Incremental,
512 MetricValue::Counter {
513 value: dd_point.1 * (i as f64),
514 },
515 )
516 .with_timestamp(Some(
517 Utc.timestamp_opt(dd_point.0, 0)
518 .single()
519 .expect("invalid timestamp"),
520 ))
521 .with_interval_ms(NonZeroU32::new(i * 1000))
523 .with_tags(Some(tags.clone()))
524 .with_namespace(namespace)
525 })
526 .collect::<Vec<_>>(),
527 }
528 .into_iter()
529 .map(|mut metric| {
530 if let Some(k) = &api_key {
531 metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
532 }
533
534 metric
535 .metadata_mut()
536 .set_schema_definition(schema_definition);
537
538 metric.into()
539 })
540 .collect()
541}
542
/// Splits a Datadog metric name into `(namespace, name)` at the first `.`.
///
/// Everything before the first dot becomes the namespace and everything after
/// it the name. A name without any dot has no namespace and is returned
/// unchanged.
fn namespace_name_from_dd_metric(dd_metric_name: &str) -> (Option<&str>, &str) {
    let mut parts = dd_metric_name.splitn(2, '.');
    match (parts.next(), parts.next()) {
        (Some(namespace), Some(name)) => (Some(namespace), name),
        _ => (None, dd_metric_name),
    }
}
552
553pub(crate) fn decode_ddsketch(
554 frame: Bytes,
555 api_key: &Option<Arc<str>>,
556 split_metric_namespace: bool,
557) -> crate::Result<Vec<Event>> {
558 let payload = SketchPayload::decode(frame)?;
559 Ok(payload
561 .sketches
562 .into_iter()
563 .flat_map(|sketch_series| {
564 let mut tags = into_metric_tags(sketch_series.tags);
566 log_schema()
567 .host_key()
568 .and_then(|key| tags.replace(key.to_string(), sketch_series.host.clone()));
569
570 let event_metadata = get_event_metadata(sketch_series.metadata.as_ref());
571
572 sketch_series.dogsketches.into_iter().map(move |sketch| {
573 let k: Vec<i16> = sketch.k.iter().map(|k| *k as i16).collect();
574 let n: Vec<u16> = sketch.n.iter().map(|n| *n as u16).collect();
575 let val = MetricValue::from(
576 AgentDDSketch::from_raw(
577 sketch.cnt as u32,
578 sketch.min,
579 sketch.max,
580 sketch.sum,
581 sketch.avg,
582 &k,
583 &n,
584 )
585 .unwrap_or_else(AgentDDSketch::with_agent_defaults),
586 );
587 let (namespace, name) = if split_metric_namespace {
588 namespace_name_from_dd_metric(&sketch_series.metric)
589 } else {
590 (None, sketch_series.metric.as_str())
591 };
592 let mut metric = Metric::new_with_metadata(
593 name.to_string(),
594 MetricKind::Incremental,
595 val,
596 event_metadata.clone(),
597 )
598 .with_tags(Some(tags.clone()))
599 .with_timestamp(Some(
600 Utc.timestamp_opt(sketch.ts, 0)
601 .single()
602 .expect("invalid timestamp"),
603 ))
604 .with_namespace(namespace);
605 if let Some(k) = &api_key {
606 metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
607 }
608
609 metric.into()
610 })
611 })
612 .collect())
613}