1use std::{num::NonZeroU32, sync::Arc};
2
3use bytes::Bytes;
4use chrono::{TimeZone, Utc};
5use http::StatusCode;
6use prost::Message;
7use serde::{Deserialize, Serialize};
8use warp::{filters::BoxedFilter, path, path::FullPath, reply::Response, Filter};
9
10use vector_lib::internal_event::{CountByteSize, InternalEventHandle as _, Registered};
11use vector_lib::{
12 event::{DatadogMetricOriginMetadata, EventMetadata},
13 metrics::AgentDDSketch,
14 EstimatedJsonEncodedSizeOf,
15};
16
17use crate::common::http::ErrorMessage;
18use crate::{
19 common::datadog::{DatadogMetricType, DatadogSeriesMetric},
20 config::log_schema,
21 event::{
22 metric::{Metric, MetricValue},
23 Event, MetricKind, MetricTags,
24 },
25 internal_events::EventsReceived,
26 schema,
27 sources::{
28 datadog_agent::{
29 ddmetric_proto::{metric_payload, Metadata, MetricPayload, SketchPayload},
30 handle_request, ApiKeyQueryParams, DatadogAgentSource,
31 },
32 util::extract_tag_key_and_value,
33 },
34 SourceSender,
35};
36
37#[derive(Deserialize, Serialize)]
38pub(crate) struct DatadogSeriesRequest {
39 pub(crate) series: Vec<DatadogSeriesMetric>,
40}
41
42pub(crate) fn build_warp_filter(
43 acknowledgements: bool,
44 multiple_outputs: bool,
45 out: SourceSender,
46 source: DatadogAgentSource,
47) -> BoxedFilter<(Response,)> {
48 let output = multiple_outputs.then_some(super::METRICS);
49 let sketches_service = sketches_service(acknowledgements, output, out.clone(), source.clone());
50 let series_v1_service =
51 series_v1_service(acknowledgements, output, out.clone(), source.clone());
52 let series_v2_service = series_v2_service(acknowledgements, output, out, source);
53 sketches_service
54 .or(series_v1_service)
55 .unify()
56 .or(series_v2_service)
57 .unify()
58 .boxed()
59}
60
61fn sketches_service(
62 acknowledgements: bool,
63 output: Option<&'static str>,
64 out: SourceSender,
65 source: DatadogAgentSource,
66) -> BoxedFilter<(Response,)> {
67 warp::post()
68 .and(path!("api" / "beta" / "sketches" / ..))
69 .and(warp::path::full())
70 .and(warp::header::optional::<String>("content-encoding"))
71 .and(warp::header::optional::<String>("dd-api-key"))
72 .and(warp::query::<ApiKeyQueryParams>())
73 .and(warp::body::bytes())
74 .and_then(
75 move |path: FullPath,
76 encoding_header: Option<String>,
77 api_token: Option<String>,
78 query_params: ApiKeyQueryParams,
79 body: Bytes| {
80 let events = source
81 .decode(&encoding_header, body, path.as_str())
82 .and_then(|body| {
83 decode_datadog_sketches(
84 body,
85 source.api_key_extractor.extract(
86 path.as_str(),
87 api_token,
88 query_params.dd_api_key,
89 ),
90 &source.events_received,
91 )
92 });
93 handle_request(events, acknowledgements, out.clone(), output)
94 },
95 )
96 .boxed()
97}
98
99fn series_v1_service(
100 acknowledgements: bool,
101 output: Option<&'static str>,
102 out: SourceSender,
103 source: DatadogAgentSource,
104) -> BoxedFilter<(Response,)> {
105 warp::post()
106 .and(path!("api" / "v1" / "series" / ..))
107 .and(warp::path::full())
108 .and(warp::header::optional::<String>("content-encoding"))
109 .and(warp::header::optional::<String>("dd-api-key"))
110 .and(warp::query::<ApiKeyQueryParams>())
111 .and(warp::body::bytes())
112 .and_then(
113 move |path: FullPath,
114 encoding_header: Option<String>,
115 api_token: Option<String>,
116 query_params: ApiKeyQueryParams,
117 body: Bytes| {
118 let events = source
119 .decode(&encoding_header, body, path.as_str())
120 .and_then(|body| {
121 decode_datadog_series_v1(
122 body,
123 source.api_key_extractor.extract(
124 path.as_str(),
125 api_token,
126 query_params.dd_api_key,
127 ),
128 &Arc::new(schema::Definition::default_legacy_namespace()),
131 &source.events_received,
132 )
133 });
134 handle_request(events, acknowledgements, out.clone(), output)
135 },
136 )
137 .boxed()
138}
139
140fn series_v2_service(
141 acknowledgements: bool,
142 output: Option<&'static str>,
143 out: SourceSender,
144 source: DatadogAgentSource,
145) -> BoxedFilter<(Response,)> {
146 warp::post()
147 .and(path!("api" / "v2" / "series" / ..))
148 .and(warp::path::full())
149 .and(warp::header::optional::<String>("content-encoding"))
150 .and(warp::header::optional::<String>("dd-api-key"))
151 .and(warp::query::<ApiKeyQueryParams>())
152 .and(warp::body::bytes())
153 .and_then(
154 move |path: FullPath,
155 encoding_header: Option<String>,
156 api_token: Option<String>,
157 query_params: ApiKeyQueryParams,
158 body: Bytes| {
159 let events = source
160 .decode(&encoding_header, body, path.as_str())
161 .and_then(|body| {
162 decode_datadog_series_v2(
163 body,
164 source.api_key_extractor.extract(
165 path.as_str(),
166 api_token,
167 query_params.dd_api_key,
168 ),
169 &source.events_received,
170 )
171 });
172 handle_request(events, acknowledgements, out.clone(), output)
173 },
174 )
175 .boxed()
176}
177
178fn decode_datadog_sketches(
179 body: Bytes,
180 api_key: Option<Arc<str>>,
181 events_received: &Registered<EventsReceived>,
182) -> Result<Vec<Event>, ErrorMessage> {
183 if body.is_empty() {
184 debug!(
186 message = "Empty payload ignored.",
187 internal_log_rate_limit = true
188 );
189 return Ok(Vec::new());
190 }
191
192 let metrics = decode_ddsketch(body, &api_key).map_err(|error| {
193 ErrorMessage::new(
194 StatusCode::UNPROCESSABLE_ENTITY,
195 format!("Error decoding Datadog sketch: {error:?}"),
196 )
197 })?;
198
199 events_received.emit(CountByteSize(
200 metrics.len(),
201 metrics.estimated_json_encoded_size_of(),
202 ));
203
204 Ok(metrics)
205}
206
207fn decode_datadog_series_v2(
208 body: Bytes,
209 api_key: Option<Arc<str>>,
210 events_received: &Registered<EventsReceived>,
211) -> Result<Vec<Event>, ErrorMessage> {
212 if body.is_empty() {
213 debug!(
215 message = "Empty payload ignored.",
216 internal_log_rate_limit = true
217 );
218 return Ok(Vec::new());
219 }
220
221 let metrics = decode_ddseries_v2(body, &api_key).map_err(|error| {
222 ErrorMessage::new(
223 StatusCode::UNPROCESSABLE_ENTITY,
224 format!("Error decoding Datadog sketch: {error:?}"),
225 )
226 })?;
227
228 events_received.emit(CountByteSize(
229 metrics.len(),
230 metrics.estimated_json_encoded_size_of(),
231 ));
232
233 Ok(metrics)
234}
235
236fn get_event_metadata(metadata: Option<&Metadata>) -> EventMetadata {
239 metadata
240 .and_then(|metadata| metadata.origin.as_ref())
241 .map_or_else(EventMetadata::default, |origin| {
242 trace!(
243 "Deserialized origin_product: `{}` origin_category: `{}` origin_service: `{}`.",
244 origin.origin_product,
245 origin.origin_category,
246 origin.origin_service,
247 );
248 EventMetadata::default().with_origin_metadata(DatadogMetricOriginMetadata::new(
249 Some(origin.origin_product),
250 Some(origin.origin_category),
251 Some(origin.origin_service),
252 ))
253 })
254}
255
256pub(crate) fn decode_ddseries_v2(
257 frame: Bytes,
258 api_key: &Option<Arc<str>>,
259) -> crate::Result<Vec<Event>> {
260 let payload = MetricPayload::decode(frame)?;
261 let decoded_metrics: Vec<Event> = payload
262 .series
263 .into_iter()
264 .flat_map(|serie| {
265 let (namespace, name) = namespace_name_from_dd_metric(&serie.metric);
266 let mut tags = into_metric_tags(serie.tags);
267
268 let event_metadata = get_event_metadata(serie.metadata.as_ref());
269
270 let non_rate_interval = if serie.interval.is_positive() {
292 NonZeroU32::new(serie.interval as u32 * 1000) } else {
294 None
295 };
296
297 serie.resources.into_iter().for_each(|r| {
298 if r.r#type.eq("host") {
301 log_schema()
302 .host_key()
303 .and_then(|key| tags.replace(key.to_string(), r.name));
304 } else {
305 tags.replace(format!("resource.{}", r.r#type), r.name);
307 }
308 });
309 (!serie.source_type_name.is_empty())
310 .then(|| tags.replace("source_type_name".into(), serie.source_type_name));
311 match metric_payload::MetricType::try_from(serie.r#type) {
314 Ok(metric_payload::MetricType::Count) => serie
315 .points
316 .iter()
317 .map(|dd_point| {
318 Metric::new_with_metadata(
319 name.to_string(),
320 MetricKind::Incremental,
321 MetricValue::Counter {
322 value: dd_point.value,
323 },
324 event_metadata.clone(),
325 )
326 .with_timestamp(Some(
327 Utc.timestamp_opt(dd_point.timestamp, 0)
328 .single()
329 .expect("invalid timestamp"),
330 ))
331 .with_tags(Some(tags.clone()))
332 .with_namespace(namespace)
333 })
334 .collect::<Vec<_>>(),
335 Ok(metric_payload::MetricType::Gauge) => serie
336 .points
337 .iter()
338 .map(|dd_point| {
339 Metric::new_with_metadata(
340 name.to_string(),
341 MetricKind::Absolute,
342 MetricValue::Gauge {
343 value: dd_point.value,
344 },
345 event_metadata.clone(),
346 )
347 .with_timestamp(Some(
348 Utc.timestamp_opt(dd_point.timestamp, 0)
349 .single()
350 .expect("invalid timestamp"),
351 ))
352 .with_tags(Some(tags.clone()))
353 .with_namespace(namespace)
354 .with_interval_ms(non_rate_interval)
355 })
356 .collect::<Vec<_>>(),
357 Ok(metric_payload::MetricType::Rate) => serie
358 .points
359 .iter()
360 .map(|dd_point| {
361 let i = Some(serie.interval)
362 .filter(|v| *v != 0)
363 .map(|v| v as u32)
364 .unwrap_or(1);
365 Metric::new_with_metadata(
366 name.to_string(),
367 MetricKind::Incremental,
368 MetricValue::Counter {
369 value: dd_point.value * (i as f64),
370 },
371 event_metadata.clone(),
372 )
373 .with_timestamp(Some(
374 Utc.timestamp_opt(dd_point.timestamp, 0)
375 .single()
376 .expect("invalid timestamp"),
377 ))
378 .with_interval_ms(NonZeroU32::new(i * 1000))
380 .with_tags(Some(tags.clone()))
381 .with_namespace(namespace)
382 })
383 .collect::<Vec<_>>(),
384 Ok(metric_payload::MetricType::Unspecified) | Err(_) => {
385 warn!("Unspecified metric type ({}).", serie.r#type);
386 Vec::new()
387 }
388 }
389 })
390 .map(|mut metric| {
391 if let Some(k) = &api_key {
392 metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
393 }
394 metric.into()
395 })
396 .collect();
397
398 Ok(decoded_metrics)
399}
400
401fn decode_datadog_series_v1(
402 body: Bytes,
403 api_key: Option<Arc<str>>,
404 schema_definition: &Arc<schema::Definition>,
405 events_received: &Registered<EventsReceived>,
406) -> Result<Vec<Event>, ErrorMessage> {
407 if body.is_empty() {
408 debug!(
410 message = "Empty payload ignored.",
411 internal_log_rate_limit = true
412 );
413 return Ok(Vec::new());
414 }
415
416 let metrics: DatadogSeriesRequest = serde_json::from_slice(&body).map_err(|error| {
417 ErrorMessage::new(
418 StatusCode::BAD_REQUEST,
419 format!("Error parsing JSON: {error:?}"),
420 )
421 })?;
422
423 let decoded_metrics: Vec<Event> = metrics
424 .series
425 .into_iter()
426 .flat_map(|m| into_vector_metric(m, api_key.clone(), schema_definition))
427 .collect();
428
429 events_received.emit(CountByteSize(
430 decoded_metrics.len(),
431 decoded_metrics.estimated_json_encoded_size_of(),
432 ));
433
434 Ok(decoded_metrics)
435}
436
437fn into_metric_tags(tags: Vec<String>) -> MetricTags {
438 tags.iter().map(extract_tag_key_and_value).collect()
439}
440
441fn into_vector_metric(
442 dd_metric: DatadogSeriesMetric,
443 api_key: Option<Arc<str>>,
444 schema_definition: &Arc<schema::Definition>,
445) -> Vec<Event> {
446 let mut tags = into_metric_tags(dd_metric.tags.unwrap_or_default());
447
448 if let Some(key) = log_schema().host_key() {
449 dd_metric
450 .host
451 .and_then(|host| tags.replace(key.to_string(), host));
452 }
453
454 dd_metric
455 .source_type_name
456 .and_then(|source| tags.replace("source_type_name".into(), source));
457 dd_metric
458 .device
459 .and_then(|dev| tags.replace("device".into(), dev));
460
461 let (namespace, name) = namespace_name_from_dd_metric(&dd_metric.metric);
462
463 match dd_metric.r#type {
464 DatadogMetricType::Count => dd_metric
465 .points
466 .iter()
467 .map(|dd_point| {
468 Metric::new(
469 name.to_string(),
470 MetricKind::Incremental,
471 MetricValue::Counter { value: dd_point.1 },
472 )
473 .with_timestamp(Some(
474 Utc.timestamp_opt(dd_point.0, 0)
475 .single()
476 .expect("invalid timestamp"),
477 ))
478 .with_tags(Some(tags.clone()))
479 .with_namespace(namespace)
480 })
481 .collect::<Vec<_>>(),
482 DatadogMetricType::Gauge => dd_metric
483 .points
484 .iter()
485 .map(|dd_point| {
486 Metric::new(
487 name.to_string(),
488 MetricKind::Absolute,
489 MetricValue::Gauge { value: dd_point.1 },
490 )
491 .with_timestamp(Some(
492 Utc.timestamp_opt(dd_point.0, 0)
493 .single()
494 .expect("invalid timestamp"),
495 ))
496 .with_tags(Some(tags.clone()))
497 .with_namespace(namespace)
498 })
499 .collect::<Vec<_>>(),
500 DatadogMetricType::Rate => dd_metric
503 .points
504 .iter()
505 .map(|dd_point| {
506 let i = dd_metric.interval.filter(|v| *v != 0).unwrap_or(1);
507 Metric::new(
508 name.to_string(),
509 MetricKind::Incremental,
510 MetricValue::Counter {
511 value: dd_point.1 * (i as f64),
512 },
513 )
514 .with_timestamp(Some(
515 Utc.timestamp_opt(dd_point.0, 0)
516 .single()
517 .expect("invalid timestamp"),
518 ))
519 .with_interval_ms(NonZeroU32::new(i * 1000))
521 .with_tags(Some(tags.clone()))
522 .with_namespace(namespace)
523 })
524 .collect::<Vec<_>>(),
525 }
526 .into_iter()
527 .map(|mut metric| {
528 if let Some(k) = &api_key {
529 metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
530 }
531
532 metric
533 .metadata_mut()
534 .set_schema_definition(schema_definition);
535
536 metric.into()
537 })
538 .collect()
539}
540
/// Splits a Datadog metric name at its first `.` into `(namespace, name)`.
///
/// For example `system.fs.util` yields `(Some("system"), "fs.util")`. A name
/// without any `.` has no namespace and is returned unchanged.
fn namespace_name_from_dd_metric(dd_metric_name: &str) -> (Option<&str>, &str) {
    dd_metric_name
        .split_once('.')
        .map_or((None, dd_metric_name), |(namespace, name)| {
            (Some(namespace), name)
        })
}
550
551pub(crate) fn decode_ddsketch(
552 frame: Bytes,
553 api_key: &Option<Arc<str>>,
554) -> crate::Result<Vec<Event>> {
555 let payload = SketchPayload::decode(frame)?;
556 Ok(payload
558 .sketches
559 .into_iter()
560 .flat_map(|sketch_series| {
561 let mut tags = into_metric_tags(sketch_series.tags);
563 log_schema()
564 .host_key()
565 .and_then(|key| tags.replace(key.to_string(), sketch_series.host.clone()));
566
567 let event_metadata = get_event_metadata(sketch_series.metadata.as_ref());
568
569 sketch_series.dogsketches.into_iter().map(move |sketch| {
570 let k: Vec<i16> = sketch.k.iter().map(|k| *k as i16).collect();
571 let n: Vec<u16> = sketch.n.iter().map(|n| *n as u16).collect();
572 let val = MetricValue::from(
573 AgentDDSketch::from_raw(
574 sketch.cnt as u32,
575 sketch.min,
576 sketch.max,
577 sketch.sum,
578 sketch.avg,
579 &k,
580 &n,
581 )
582 .unwrap_or_else(AgentDDSketch::with_agent_defaults),
583 );
584 let (namespace, name) = namespace_name_from_dd_metric(&sketch_series.metric);
585 let mut metric = Metric::new_with_metadata(
586 name.to_string(),
587 MetricKind::Incremental,
588 val,
589 event_metadata.clone(),
590 )
591 .with_tags(Some(tags.clone()))
592 .with_timestamp(Some(
593 Utc.timestamp_opt(sketch.ts, 0)
594 .single()
595 .expect("invalid timestamp"),
596 ))
597 .with_namespace(namespace);
598 if let Some(k) = &api_key {
599 metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
600 }
601
602 metric.into()
603 })
604 })
605 .collect())
606}