1use std::{num::NonZeroU32, sync::Arc};
2
3use bytes::Bytes;
4use chrono::{TimeZone, Utc};
5use http::StatusCode;
6use prost::Message;
7use serde::{Deserialize, Serialize};
8use vector_lib::{
9 EstimatedJsonEncodedSizeOf,
10 event::{DatadogMetricOriginMetadata, EventMetadata},
11 internal_event::{CountByteSize, InternalEventHandle as _, Registered},
12 metrics::AgentDDSketch,
13};
14use warp::{Filter, filters::BoxedFilter, path, path::FullPath, reply::Response};
15
16use crate::{
17 SourceSender,
18 common::{
19 datadog::{DatadogMetricType, DatadogSeriesMetric},
20 http::ErrorMessage,
21 },
22 config::log_schema,
23 event::{
24 Event, MetricKind, MetricTags,
25 metric::{Metric, MetricValue},
26 },
27 internal_events::EventsReceived,
28 schema,
29 sources::{
30 datadog_agent::{
31 ApiKeyQueryParams, DatadogAgentSource,
32 ddmetric_proto::{Metadata, MetricPayload, SketchPayload, metric_payload},
33 handle_request,
34 },
35 util::extract_tag_key_and_value,
36 },
37};
38
39#[derive(Deserialize, Serialize)]
40pub(crate) struct DatadogSeriesRequest {
41 pub(crate) series: Vec<DatadogSeriesMetric>,
42}
43
44pub(crate) fn build_warp_filter(
45 acknowledgements: bool,
46 multiple_outputs: bool,
47 out: SourceSender,
48 source: DatadogAgentSource,
49) -> BoxedFilter<(Response,)> {
50 let output = multiple_outputs.then_some(super::METRICS);
51 let sketches_service = sketches_service(acknowledgements, output, out.clone(), source.clone());
52 let series_v1_service =
53 series_v1_service(acknowledgements, output, out.clone(), source.clone());
54 let series_v2_service = series_v2_service(acknowledgements, output, out, source);
55 sketches_service
56 .or(series_v1_service)
57 .unify()
58 .or(series_v2_service)
59 .unify()
60 .boxed()
61}
62
63fn sketches_service(
64 acknowledgements: bool,
65 output: Option<&'static str>,
66 out: SourceSender,
67 source: DatadogAgentSource,
68) -> BoxedFilter<(Response,)> {
69 warp::post()
70 .and(path!("api" / "beta" / "sketches" / ..))
71 .and(warp::path::full())
72 .and(warp::header::optional::<String>("content-encoding"))
73 .and(warp::header::optional::<String>("dd-api-key"))
74 .and(warp::query::<ApiKeyQueryParams>())
75 .and(warp::body::bytes())
76 .and_then(
77 move |path: FullPath,
78 encoding_header: Option<String>,
79 api_token: Option<String>,
80 query_params: ApiKeyQueryParams,
81 body: Bytes| {
82 let events = source
83 .decode(&encoding_header, body, path.as_str())
84 .and_then(|body| {
85 decode_datadog_sketches(
86 body,
87 source.api_key_extractor.extract(
88 path.as_str(),
89 api_token,
90 query_params.dd_api_key,
91 ),
92 &source.events_received,
93 )
94 });
95 handle_request(events, acknowledgements, out.clone(), output)
96 },
97 )
98 .boxed()
99}
100
101fn series_v1_service(
102 acknowledgements: bool,
103 output: Option<&'static str>,
104 out: SourceSender,
105 source: DatadogAgentSource,
106) -> BoxedFilter<(Response,)> {
107 warp::post()
108 .and(path!("api" / "v1" / "series" / ..))
109 .and(warp::path::full())
110 .and(warp::header::optional::<String>("content-encoding"))
111 .and(warp::header::optional::<String>("dd-api-key"))
112 .and(warp::query::<ApiKeyQueryParams>())
113 .and(warp::body::bytes())
114 .and_then(
115 move |path: FullPath,
116 encoding_header: Option<String>,
117 api_token: Option<String>,
118 query_params: ApiKeyQueryParams,
119 body: Bytes| {
120 let events = source
121 .decode(&encoding_header, body, path.as_str())
122 .and_then(|body| {
123 decode_datadog_series_v1(
124 body,
125 source.api_key_extractor.extract(
126 path.as_str(),
127 api_token,
128 query_params.dd_api_key,
129 ),
130 &Arc::new(schema::Definition::default_legacy_namespace()),
133 &source.events_received,
134 )
135 });
136 handle_request(events, acknowledgements, out.clone(), output)
137 },
138 )
139 .boxed()
140}
141
142fn series_v2_service(
143 acknowledgements: bool,
144 output: Option<&'static str>,
145 out: SourceSender,
146 source: DatadogAgentSource,
147) -> BoxedFilter<(Response,)> {
148 warp::post()
149 .and(path!("api" / "v2" / "series" / ..))
150 .and(warp::path::full())
151 .and(warp::header::optional::<String>("content-encoding"))
152 .and(warp::header::optional::<String>("dd-api-key"))
153 .and(warp::query::<ApiKeyQueryParams>())
154 .and(warp::body::bytes())
155 .and_then(
156 move |path: FullPath,
157 encoding_header: Option<String>,
158 api_token: Option<String>,
159 query_params: ApiKeyQueryParams,
160 body: Bytes| {
161 let events = source
162 .decode(&encoding_header, body, path.as_str())
163 .and_then(|body| {
164 decode_datadog_series_v2(
165 body,
166 source.api_key_extractor.extract(
167 path.as_str(),
168 api_token,
169 query_params.dd_api_key,
170 ),
171 &source.events_received,
172 )
173 });
174 handle_request(events, acknowledgements, out.clone(), output)
175 },
176 )
177 .boxed()
178}
179
180fn decode_datadog_sketches(
181 body: Bytes,
182 api_key: Option<Arc<str>>,
183 events_received: &Registered<EventsReceived>,
184) -> Result<Vec<Event>, ErrorMessage> {
185 if body.is_empty() {
186 debug!(
188 message = "Empty payload ignored.",
189 internal_log_rate_limit = true
190 );
191 return Ok(Vec::new());
192 }
193
194 let metrics = decode_ddsketch(body, &api_key).map_err(|error| {
195 ErrorMessage::new(
196 StatusCode::UNPROCESSABLE_ENTITY,
197 format!("Error decoding Datadog sketch: {error:?}"),
198 )
199 })?;
200
201 events_received.emit(CountByteSize(
202 metrics.len(),
203 metrics.estimated_json_encoded_size_of(),
204 ));
205
206 Ok(metrics)
207}
208
209fn decode_datadog_series_v2(
210 body: Bytes,
211 api_key: Option<Arc<str>>,
212 events_received: &Registered<EventsReceived>,
213) -> Result<Vec<Event>, ErrorMessage> {
214 if body.is_empty() {
215 debug!(
217 message = "Empty payload ignored.",
218 internal_log_rate_limit = true
219 );
220 return Ok(Vec::new());
221 }
222
223 let metrics = decode_ddseries_v2(body, &api_key).map_err(|error| {
224 ErrorMessage::new(
225 StatusCode::UNPROCESSABLE_ENTITY,
226 format!("Error decoding Datadog sketch: {error:?}"),
227 )
228 })?;
229
230 events_received.emit(CountByteSize(
231 metrics.len(),
232 metrics.estimated_json_encoded_size_of(),
233 ));
234
235 Ok(metrics)
236}
237
238fn get_event_metadata(metadata: Option<&Metadata>) -> EventMetadata {
241 metadata
242 .and_then(|metadata| metadata.origin.as_ref())
243 .map_or_else(EventMetadata::default, |origin| {
244 trace!(
245 "Deserialized origin_product: `{}` origin_category: `{}` origin_service: `{}`.",
246 origin.origin_product, origin.origin_category, origin.origin_service,
247 );
248 EventMetadata::default().with_origin_metadata(DatadogMetricOriginMetadata::new(
249 Some(origin.origin_product),
250 Some(origin.origin_category),
251 Some(origin.origin_service),
252 ))
253 })
254}
255
256pub(crate) fn decode_ddseries_v2(
257 frame: Bytes,
258 api_key: &Option<Arc<str>>,
259) -> crate::Result<Vec<Event>> {
260 let payload = MetricPayload::decode(frame)?;
261 let decoded_metrics: Vec<Event> = payload
262 .series
263 .into_iter()
264 .flat_map(|serie| {
265 let (namespace, name) = namespace_name_from_dd_metric(&serie.metric);
266 let mut tags = into_metric_tags(serie.tags);
267
268 let event_metadata = get_event_metadata(serie.metadata.as_ref());
269
270 let non_rate_interval = if serie.interval.is_positive() {
292 NonZeroU32::new(serie.interval as u32 * 1000) } else {
294 None
295 };
296
297 serie.resources.into_iter().for_each(|r| {
298 if r.r#type.eq("host") {
301 log_schema()
302 .host_key()
303 .and_then(|key| tags.replace(key.to_string(), r.name));
304 } else {
305 tags.replace(format!("resource.{}", r.r#type), r.name);
307 }
308 });
309 (!serie.source_type_name.is_empty())
310 .then(|| tags.replace("source_type_name".into(), serie.source_type_name));
311 match metric_payload::MetricType::try_from(serie.r#type) {
314 Ok(metric_payload::MetricType::Count) => serie
315 .points
316 .iter()
317 .map(|dd_point| {
318 Metric::new_with_metadata(
319 name.to_string(),
320 MetricKind::Incremental,
321 MetricValue::Counter {
322 value: dd_point.value,
323 },
324 event_metadata.clone(),
325 )
326 .with_timestamp(Some(
327 Utc.timestamp_opt(dd_point.timestamp, 0)
328 .single()
329 .expect("invalid timestamp"),
330 ))
331 .with_tags(Some(tags.clone()))
332 .with_namespace(namespace)
333 })
334 .collect::<Vec<_>>(),
335 Ok(metric_payload::MetricType::Gauge) => serie
336 .points
337 .iter()
338 .map(|dd_point| {
339 Metric::new_with_metadata(
340 name.to_string(),
341 MetricKind::Absolute,
342 MetricValue::Gauge {
343 value: dd_point.value,
344 },
345 event_metadata.clone(),
346 )
347 .with_timestamp(Some(
348 Utc.timestamp_opt(dd_point.timestamp, 0)
349 .single()
350 .expect("invalid timestamp"),
351 ))
352 .with_tags(Some(tags.clone()))
353 .with_namespace(namespace)
354 .with_interval_ms(non_rate_interval)
355 })
356 .collect::<Vec<_>>(),
357 Ok(metric_payload::MetricType::Rate) => serie
358 .points
359 .iter()
360 .map(|dd_point| {
361 let i = Some(serie.interval)
362 .filter(|v| *v != 0)
363 .map(|v| v as u32)
364 .unwrap_or(1);
365 Metric::new_with_metadata(
366 name.to_string(),
367 MetricKind::Incremental,
368 MetricValue::Counter {
369 value: dd_point.value * (i as f64),
370 },
371 event_metadata.clone(),
372 )
373 .with_timestamp(Some(
374 Utc.timestamp_opt(dd_point.timestamp, 0)
375 .single()
376 .expect("invalid timestamp"),
377 ))
378 .with_interval_ms(NonZeroU32::new(i * 1000))
380 .with_tags(Some(tags.clone()))
381 .with_namespace(namespace)
382 })
383 .collect::<Vec<_>>(),
384 Ok(metric_payload::MetricType::Unspecified) | Err(_) => {
385 warn!("Unspecified metric type ({}).", serie.r#type);
386 Vec::new()
387 }
388 }
389 })
390 .map(|mut metric| {
391 if let Some(k) = &api_key {
392 metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
393 }
394 metric.into()
395 })
396 .collect();
397
398 Ok(decoded_metrics)
399}
400
401fn decode_datadog_series_v1(
402 body: Bytes,
403 api_key: Option<Arc<str>>,
404 schema_definition: &Arc<schema::Definition>,
405 events_received: &Registered<EventsReceived>,
406) -> Result<Vec<Event>, ErrorMessage> {
407 if body.is_empty() {
408 debug!(
410 message = "Empty payload ignored.",
411 internal_log_rate_limit = true
412 );
413 return Ok(Vec::new());
414 }
415
416 let metrics: DatadogSeriesRequest = serde_json::from_slice(&body).map_err(|error| {
417 ErrorMessage::new(
418 StatusCode::BAD_REQUEST,
419 format!("Error parsing JSON: {error:?}"),
420 )
421 })?;
422
423 let decoded_metrics: Vec<Event> = metrics
424 .series
425 .into_iter()
426 .flat_map(|m| into_vector_metric(m, api_key.clone(), schema_definition))
427 .collect();
428
429 events_received.emit(CountByteSize(
430 decoded_metrics.len(),
431 decoded_metrics.estimated_json_encoded_size_of(),
432 ));
433
434 Ok(decoded_metrics)
435}
436
437fn into_metric_tags(tags: Vec<String>) -> MetricTags {
438 tags.iter().map(extract_tag_key_and_value).collect()
439}
440
441fn into_vector_metric(
442 dd_metric: DatadogSeriesMetric,
443 api_key: Option<Arc<str>>,
444 schema_definition: &Arc<schema::Definition>,
445) -> Vec<Event> {
446 let mut tags = into_metric_tags(dd_metric.tags.unwrap_or_default());
447
448 if let Some(key) = log_schema().host_key() {
449 dd_metric
450 .host
451 .and_then(|host| tags.replace(key.to_string(), host));
452 }
453
454 dd_metric
455 .source_type_name
456 .and_then(|source| tags.replace("source_type_name".into(), source));
457 dd_metric
458 .device
459 .and_then(|dev| tags.replace("device".into(), dev));
460
461 let (namespace, name) = namespace_name_from_dd_metric(&dd_metric.metric);
462
463 match dd_metric.r#type {
464 DatadogMetricType::Count => dd_metric
465 .points
466 .iter()
467 .map(|dd_point| {
468 Metric::new(
469 name.to_string(),
470 MetricKind::Incremental,
471 MetricValue::Counter { value: dd_point.1 },
472 )
473 .with_timestamp(Some(
474 Utc.timestamp_opt(dd_point.0, 0)
475 .single()
476 .expect("invalid timestamp"),
477 ))
478 .with_tags(Some(tags.clone()))
479 .with_namespace(namespace)
480 })
481 .collect::<Vec<_>>(),
482 DatadogMetricType::Gauge => dd_metric
483 .points
484 .iter()
485 .map(|dd_point| {
486 Metric::new(
487 name.to_string(),
488 MetricKind::Absolute,
489 MetricValue::Gauge { value: dd_point.1 },
490 )
491 .with_timestamp(Some(
492 Utc.timestamp_opt(dd_point.0, 0)
493 .single()
494 .expect("invalid timestamp"),
495 ))
496 .with_tags(Some(tags.clone()))
497 .with_namespace(namespace)
498 })
499 .collect::<Vec<_>>(),
500 DatadogMetricType::Rate => dd_metric
503 .points
504 .iter()
505 .map(|dd_point| {
506 let i = dd_metric.interval.filter(|v| *v != 0).unwrap_or(1);
507 Metric::new(
508 name.to_string(),
509 MetricKind::Incremental,
510 MetricValue::Counter {
511 value: dd_point.1 * (i as f64),
512 },
513 )
514 .with_timestamp(Some(
515 Utc.timestamp_opt(dd_point.0, 0)
516 .single()
517 .expect("invalid timestamp"),
518 ))
519 .with_interval_ms(NonZeroU32::new(i * 1000))
521 .with_tags(Some(tags.clone()))
522 .with_namespace(namespace)
523 })
524 .collect::<Vec<_>>(),
525 }
526 .into_iter()
527 .map(|mut metric| {
528 if let Some(k) = &api_key {
529 metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
530 }
531
532 metric
533 .metadata_mut()
534 .set_schema_definition(schema_definition);
535
536 metric.into()
537 })
538 .collect()
539}
540
541fn namespace_name_from_dd_metric(dd_metric_name: &str) -> (Option<&str>, &str) {
544 match dd_metric_name.split_once('.') {
546 Some((namespace, name)) => (Some(namespace), name),
547 None => (None, dd_metric_name),
548 }
549}
550
551pub(crate) fn decode_ddsketch(
552 frame: Bytes,
553 api_key: &Option<Arc<str>>,
554) -> crate::Result<Vec<Event>> {
555 let payload = SketchPayload::decode(frame)?;
556 Ok(payload
558 .sketches
559 .into_iter()
560 .flat_map(|sketch_series| {
561 let mut tags = into_metric_tags(sketch_series.tags);
563 log_schema()
564 .host_key()
565 .and_then(|key| tags.replace(key.to_string(), sketch_series.host.clone()));
566
567 let event_metadata = get_event_metadata(sketch_series.metadata.as_ref());
568
569 sketch_series.dogsketches.into_iter().map(move |sketch| {
570 let k: Vec<i16> = sketch.k.iter().map(|k| *k as i16).collect();
571 let n: Vec<u16> = sketch.n.iter().map(|n| *n as u16).collect();
572 let val = MetricValue::from(
573 AgentDDSketch::from_raw(
574 sketch.cnt as u32,
575 sketch.min,
576 sketch.max,
577 sketch.sum,
578 sketch.avg,
579 &k,
580 &n,
581 )
582 .unwrap_or_else(AgentDDSketch::with_agent_defaults),
583 );
584 let (namespace, name) = namespace_name_from_dd_metric(&sketch_series.metric);
585 let mut metric = Metric::new_with_metadata(
586 name.to_string(),
587 MetricKind::Incremental,
588 val,
589 event_metadata.clone(),
590 )
591 .with_tags(Some(tags.clone()))
592 .with_timestamp(Some(
593 Utc.timestamp_opt(sketch.ts, 0)
594 .single()
595 .expect("invalid timestamp"),
596 ))
597 .with_namespace(namespace);
598 if let Some(k) = &api_key {
599 metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
600 }
601
602 metric.into()
603 })
604 })
605 .collect())
606}