1use std::{num::NonZeroU32, sync::Arc};
2
3use bytes::Bytes;
4use chrono::{TimeZone, Utc};
5use http::StatusCode;
6use prost::Message;
7use serde::{Deserialize, Serialize};
8use vector_lib::{
9 EstimatedJsonEncodedSizeOf,
10 event::{DatadogMetricOriginMetadata, EventMetadata},
11 internal_event::{CountByteSize, InternalEventHandle as _, Registered},
12 metrics::AgentDDSketch,
13};
14use warp::{Filter, filters::BoxedFilter, path, path::FullPath, reply::Response};
15
16use super::ddmetric_proto::{Metadata, MetricPayload, SketchPayload, metric_payload};
17use super::{ApiKeyQueryParams, DatadogAgentSource, RequestHandler};
18use crate::{
19 common::{
20 datadog::{DatadogMetricType, DatadogSeriesMetric},
21 http::ErrorMessage,
22 },
23 config::log_schema,
24 event::{
25 Event, MetricKind, MetricTags,
26 metric::{Metric, MetricValue},
27 },
28 internal_events::EventsReceived,
29 schema,
30 sources::util::extract_tag_key_and_value,
31};
32
33#[derive(Deserialize, Serialize)]
34pub(crate) struct DatadogSeriesRequest {
35 pub(crate) series: Vec<DatadogSeriesMetric>,
36}
37
38pub(super) fn build_warp_filter(
39 handler: RequestHandler,
40 source: DatadogAgentSource,
41) -> BoxedFilter<(Response,)> {
42 let sketches_service = sketches_service(handler.clone(), source.clone());
43 let series_v1_service = series_v1_service(handler.clone(), source.clone());
44 let series_v2_service = series_v2_service(handler, source);
45 sketches_service
46 .or(series_v1_service)
47 .unify()
48 .or(series_v2_service)
49 .unify()
50 .boxed()
51}
52
53fn sketches_service(
54 handler: RequestHandler,
55 source: DatadogAgentSource,
56) -> BoxedFilter<(Response,)> {
57 warp::post()
58 .and(path!("api" / "beta" / "sketches" / ..))
59 .and(warp::path::full())
60 .and(warp::header::optional::<String>("content-encoding"))
61 .and(warp::header::optional::<String>("dd-api-key"))
62 .and(warp::query::<ApiKeyQueryParams>())
63 .and(warp::body::bytes())
64 .and_then({
65 move |path: FullPath,
66 encoding_header: Option<String>,
67 api_token: Option<String>,
68 query_params: ApiKeyQueryParams,
69 body: Bytes| {
70 let events = source
71 .decode(&encoding_header, body, path.as_str())
72 .and_then(|body| {
73 decode_datadog_sketches(
74 body,
75 source.api_key_extractor.extract(
76 path.as_str(),
77 api_token,
78 query_params.dd_api_key,
79 ),
80 source.split_metric_namespace,
81 &source.events_received,
82 )
83 });
84 handler.clone().handle_request(events, super::METRICS)
85 }
86 })
87 .boxed()
88}
89
90fn series_v1_service(
91 handler: RequestHandler,
92 source: DatadogAgentSource,
93) -> BoxedFilter<(Response,)> {
94 warp::post()
95 .and(path!("api" / "v1" / "series" / ..))
96 .and(warp::path::full())
97 .and(warp::header::optional::<String>("content-encoding"))
98 .and(warp::header::optional::<String>("dd-api-key"))
99 .and(warp::query::<ApiKeyQueryParams>())
100 .and(warp::body::bytes())
101 .and_then({
102 move |path: FullPath,
103 encoding_header: Option<String>,
104 api_token: Option<String>,
105 query_params: ApiKeyQueryParams,
106 body: Bytes| {
107 let events = source
108 .decode(&encoding_header, body, path.as_str())
109 .and_then(|body| {
110 decode_datadog_series_v1(
111 body,
112 source.api_key_extractor.extract(
113 path.as_str(),
114 api_token,
115 query_params.dd_api_key,
116 ),
117 &Arc::new(schema::Definition::default_legacy_namespace()),
120 source.split_metric_namespace,
121 &source.events_received,
122 )
123 });
124 handler.clone().handle_request(events, super::METRICS)
125 }
126 })
127 .boxed()
128}
129
130fn series_v2_service(
131 handler: RequestHandler,
132 source: DatadogAgentSource,
133) -> BoxedFilter<(Response,)> {
134 warp::post()
135 .and(path!("api" / "v2" / "series" / ..))
136 .and(warp::path::full())
137 .and(warp::header::optional::<String>("content-encoding"))
138 .and(warp::header::optional::<String>("dd-api-key"))
139 .and(warp::query::<ApiKeyQueryParams>())
140 .and(warp::body::bytes())
141 .and_then({
142 move |path: FullPath,
143 encoding_header: Option<String>,
144 api_token: Option<String>,
145 query_params: ApiKeyQueryParams,
146 body: Bytes| {
147 let events = source
148 .decode(&encoding_header, body, path.as_str())
149 .and_then(|body| {
150 decode_datadog_series_v2(
151 body,
152 source.api_key_extractor.extract(
153 path.as_str(),
154 api_token,
155 query_params.dd_api_key,
156 ),
157 source.split_metric_namespace,
158 &source.events_received,
159 )
160 });
161 handler.clone().handle_request(events, super::METRICS)
162 }
163 })
164 .boxed()
165}
166
167fn decode_datadog_sketches(
168 body: Bytes,
169 api_key: Option<Arc<str>>,
170 split_metric_namespace: bool,
171 events_received: &Registered<EventsReceived>,
172) -> Result<Vec<Event>, ErrorMessage> {
173 if body.is_empty() {
174 debug!(message = "Empty payload ignored.");
176 return Ok(Vec::new());
177 }
178
179 let metrics = decode_ddsketch(body, &api_key, split_metric_namespace).map_err(|error| {
180 ErrorMessage::new(
181 StatusCode::UNPROCESSABLE_ENTITY,
182 format!("Error decoding Datadog sketch: {error:?}"),
183 )
184 })?;
185
186 events_received.emit(CountByteSize(
187 metrics.len(),
188 metrics.estimated_json_encoded_size_of(),
189 ));
190
191 Ok(metrics)
192}
193
194fn decode_datadog_series_v2(
195 body: Bytes,
196 api_key: Option<Arc<str>>,
197 split_metric_namespace: bool,
198 events_received: &Registered<EventsReceived>,
199) -> Result<Vec<Event>, ErrorMessage> {
200 if body.is_empty() {
201 debug!(message = "Empty payload ignored.");
203 return Ok(Vec::new());
204 }
205
206 let metrics = decode_ddseries_v2(body, &api_key, split_metric_namespace).map_err(|error| {
207 ErrorMessage::new(
208 StatusCode::UNPROCESSABLE_ENTITY,
209 format!("Error decoding Datadog sketch: {error:?}"),
210 )
211 })?;
212
213 events_received.emit(CountByteSize(
214 metrics.len(),
215 metrics.estimated_json_encoded_size_of(),
216 ));
217
218 Ok(metrics)
219}
220
221fn get_event_metadata(metadata: Option<&Metadata>) -> EventMetadata {
224 metadata
225 .and_then(|metadata| metadata.origin.as_ref())
226 .map_or_else(EventMetadata::default, |origin| {
227 trace!(
228 "Deserialized origin_product: `{}` origin_category: `{}` origin_service: `{}`.",
229 origin.origin_product, origin.origin_category, origin.origin_service,
230 );
231 EventMetadata::default().with_origin_metadata(DatadogMetricOriginMetadata::new(
232 Some(origin.origin_product),
233 Some(origin.origin_category),
234 Some(origin.origin_service),
235 ))
236 })
237}
238
239pub(crate) fn decode_ddseries_v2(
240 frame: Bytes,
241 api_key: &Option<Arc<str>>,
242 split_metric_namespace: bool,
243) -> crate::Result<Vec<Event>> {
244 let payload = MetricPayload::decode(frame)?;
245 let decoded_metrics: Vec<Event> = payload
246 .series
247 .into_iter()
248 .flat_map(|serie| {
249 let (namespace, name) = if split_metric_namespace {
250 namespace_name_from_dd_metric(&serie.metric)
251 } else {
252 (None, serie.metric.as_str())
253 };
254 let mut tags = into_metric_tags(serie.tags);
255
256 let event_metadata = get_event_metadata(serie.metadata.as_ref());
257
258 let non_rate_interval = if serie.interval.is_positive() {
280 NonZeroU32::new(serie.interval as u32 * 1000) } else {
282 None
283 };
284
285 serie.resources.into_iter().for_each(|r| {
286 if r.r#type.eq("host") {
289 log_schema()
290 .host_key()
291 .and_then(|key| tags.replace(key.to_string(), r.name));
292 } else {
293 tags.replace(format!("resource.{}", r.r#type), r.name);
295 }
296 });
297 (!serie.source_type_name.is_empty())
298 .then(|| tags.replace("source_type_name".into(), serie.source_type_name));
299 match metric_payload::MetricType::try_from(serie.r#type) {
302 Ok(metric_payload::MetricType::Count) => serie
303 .points
304 .iter()
305 .map(|dd_point| {
306 Metric::new_with_metadata(
307 name.to_string(),
308 MetricKind::Incremental,
309 MetricValue::Counter {
310 value: dd_point.value,
311 },
312 event_metadata.clone(),
313 )
314 .with_timestamp(Some(
315 Utc.timestamp_opt(dd_point.timestamp, 0)
316 .single()
317 .expect("invalid timestamp"),
318 ))
319 .with_tags(Some(tags.clone()))
320 .with_namespace(namespace)
321 })
322 .collect::<Vec<_>>(),
323 Ok(metric_payload::MetricType::Gauge) => serie
324 .points
325 .iter()
326 .map(|dd_point| {
327 Metric::new_with_metadata(
328 name.to_string(),
329 MetricKind::Absolute,
330 MetricValue::Gauge {
331 value: dd_point.value,
332 },
333 event_metadata.clone(),
334 )
335 .with_timestamp(Some(
336 Utc.timestamp_opt(dd_point.timestamp, 0)
337 .single()
338 .expect("invalid timestamp"),
339 ))
340 .with_tags(Some(tags.clone()))
341 .with_namespace(namespace)
342 .with_interval_ms(non_rate_interval)
343 })
344 .collect::<Vec<_>>(),
345 Ok(metric_payload::MetricType::Rate) => serie
346 .points
347 .iter()
348 .map(|dd_point| {
349 let i = Some(serie.interval)
350 .filter(|v| *v != 0)
351 .map(|v| v as u32)
352 .unwrap_or(1);
353 Metric::new_with_metadata(
354 name.to_string(),
355 MetricKind::Incremental,
356 MetricValue::Counter {
357 value: dd_point.value * (i as f64),
358 },
359 event_metadata.clone(),
360 )
361 .with_timestamp(Some(
362 Utc.timestamp_opt(dd_point.timestamp, 0)
363 .single()
364 .expect("invalid timestamp"),
365 ))
366 .with_interval_ms(NonZeroU32::new(i * 1000))
368 .with_tags(Some(tags.clone()))
369 .with_namespace(namespace)
370 })
371 .collect::<Vec<_>>(),
372 Ok(metric_payload::MetricType::Unspecified) | Err(_) => {
373 warn!("Unspecified metric type ({}).", serie.r#type);
374 Vec::new()
375 }
376 }
377 })
378 .map(|mut metric| {
379 if let Some(k) = &api_key {
380 metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
381 }
382 metric.into()
383 })
384 .collect();
385
386 Ok(decoded_metrics)
387}
388
389fn decode_datadog_series_v1(
390 body: Bytes,
391 api_key: Option<Arc<str>>,
392 schema_definition: &Arc<schema::Definition>,
393 split_metric_namespace: bool,
394 events_received: &Registered<EventsReceived>,
395) -> Result<Vec<Event>, ErrorMessage> {
396 if body.is_empty() {
397 debug!(message = "Empty payload ignored.");
399 return Ok(Vec::new());
400 }
401
402 let metrics: DatadogSeriesRequest = serde_json::from_slice(&body).map_err(|error| {
403 ErrorMessage::new(
404 StatusCode::BAD_REQUEST,
405 format!("Error parsing JSON: {error:?}"),
406 )
407 })?;
408
409 let decoded_metrics: Vec<Event> = metrics
410 .series
411 .into_iter()
412 .flat_map(|m| {
413 into_vector_metric(
414 m,
415 api_key.clone(),
416 schema_definition,
417 split_metric_namespace,
418 )
419 })
420 .collect();
421
422 events_received.emit(CountByteSize(
423 decoded_metrics.len(),
424 decoded_metrics.estimated_json_encoded_size_of(),
425 ));
426
427 Ok(decoded_metrics)
428}
429
430fn into_metric_tags(tags: Vec<String>) -> MetricTags {
431 tags.iter().map(extract_tag_key_and_value).collect()
432}
433
434fn into_vector_metric(
435 dd_metric: DatadogSeriesMetric,
436 api_key: Option<Arc<str>>,
437 schema_definition: &Arc<schema::Definition>,
438 split_metric_namespace: bool,
439) -> Vec<Event> {
440 let mut tags = into_metric_tags(dd_metric.tags.unwrap_or_default());
441
442 if let Some(key) = log_schema().host_key() {
443 dd_metric
444 .host
445 .and_then(|host| tags.replace(key.to_string(), host));
446 }
447
448 dd_metric
449 .source_type_name
450 .and_then(|source| tags.replace("source_type_name".into(), source));
451 dd_metric
452 .device
453 .and_then(|dev| tags.replace("device".into(), dev));
454
455 let (namespace, name) = if split_metric_namespace {
456 namespace_name_from_dd_metric(&dd_metric.metric)
457 } else {
458 (None, dd_metric.metric.as_str())
459 };
460
461 match dd_metric.r#type {
462 DatadogMetricType::Count => dd_metric
463 .points
464 .iter()
465 .map(|dd_point| {
466 Metric::new(
467 name.to_string(),
468 MetricKind::Incremental,
469 MetricValue::Counter { value: dd_point.1 },
470 )
471 .with_timestamp(Some(
472 Utc.timestamp_opt(dd_point.0, 0)
473 .single()
474 .expect("invalid timestamp"),
475 ))
476 .with_tags(Some(tags.clone()))
477 .with_namespace(namespace)
478 })
479 .collect::<Vec<_>>(),
480 DatadogMetricType::Gauge => dd_metric
481 .points
482 .iter()
483 .map(|dd_point| {
484 Metric::new(
485 name.to_string(),
486 MetricKind::Absolute,
487 MetricValue::Gauge { value: dd_point.1 },
488 )
489 .with_timestamp(Some(
490 Utc.timestamp_opt(dd_point.0, 0)
491 .single()
492 .expect("invalid timestamp"),
493 ))
494 .with_tags(Some(tags.clone()))
495 .with_namespace(namespace)
496 })
497 .collect::<Vec<_>>(),
498 DatadogMetricType::Rate => dd_metric
501 .points
502 .iter()
503 .map(|dd_point| {
504 let i = dd_metric.interval.filter(|v| *v != 0).unwrap_or(1);
505 Metric::new(
506 name.to_string(),
507 MetricKind::Incremental,
508 MetricValue::Counter {
509 value: dd_point.1 * (i as f64),
510 },
511 )
512 .with_timestamp(Some(
513 Utc.timestamp_opt(dd_point.0, 0)
514 .single()
515 .expect("invalid timestamp"),
516 ))
517 .with_interval_ms(NonZeroU32::new(i * 1000))
519 .with_tags(Some(tags.clone()))
520 .with_namespace(namespace)
521 })
522 .collect::<Vec<_>>(),
523 }
524 .into_iter()
525 .map(|mut metric| {
526 if let Some(k) = &api_key {
527 metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
528 }
529
530 metric
531 .metadata_mut()
532 .set_schema_definition(schema_definition);
533
534 metric.into()
535 })
536 .collect()
537}
538
539fn namespace_name_from_dd_metric(dd_metric_name: &str) -> (Option<&str>, &str) {
542 match dd_metric_name.split_once('.') {
544 Some((namespace, name)) => (Some(namespace), name),
545 None => (None, dd_metric_name),
546 }
547}
548
549pub(crate) fn decode_ddsketch(
550 frame: Bytes,
551 api_key: &Option<Arc<str>>,
552 split_metric_namespace: bool,
553) -> crate::Result<Vec<Event>> {
554 let payload = SketchPayload::decode(frame)?;
555 Ok(payload
557 .sketches
558 .into_iter()
559 .flat_map(|sketch_series| {
560 let mut tags = into_metric_tags(sketch_series.tags);
562 log_schema()
563 .host_key()
564 .and_then(|key| tags.replace(key.to_string(), sketch_series.host.clone()));
565
566 let event_metadata = get_event_metadata(sketch_series.metadata.as_ref());
567
568 sketch_series.dogsketches.into_iter().map(move |sketch| {
569 let k: Vec<i16> = sketch.k.iter().map(|k| *k as i16).collect();
570 let n: Vec<u16> = sketch.n.iter().map(|n| *n as u16).collect();
571 let val = MetricValue::from(
572 AgentDDSketch::from_raw(
573 sketch.cnt as u32,
574 sketch.min,
575 sketch.max,
576 sketch.sum,
577 sketch.avg,
578 &k,
579 &n,
580 )
581 .unwrap_or_else(AgentDDSketch::with_agent_defaults),
582 );
583 let (namespace, name) = if split_metric_namespace {
584 namespace_name_from_dd_metric(&sketch_series.metric)
585 } else {
586 (None, sketch_series.metric.as_str())
587 };
588 let mut metric = Metric::new_with_metadata(
589 name.to_string(),
590 MetricKind::Incremental,
591 val,
592 event_metadata.clone(),
593 )
594 .with_tags(Some(tags.clone()))
595 .with_timestamp(Some(
596 Utc.timestamp_opt(sketch.ts, 0)
597 .single()
598 .expect("invalid timestamp"),
599 ))
600 .with_namespace(namespace);
601 if let Some(k) = &api_key {
602 metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
603 }
604
605 metric.into()
606 })
607 })
608 .collect())
609}