vector/sources/datadog_agent/
metrics.rs

1use std::{num::NonZeroU32, sync::Arc};
2
3use bytes::Bytes;
4use chrono::{TimeZone, Utc};
5use http::StatusCode;
6use prost::Message;
7use serde::{Deserialize, Serialize};
8use vector_lib::{
9    EstimatedJsonEncodedSizeOf,
10    event::{DatadogMetricOriginMetadata, EventMetadata},
11    internal_event::{CountByteSize, InternalEventHandle as _, Registered},
12    metrics::AgentDDSketch,
13};
14use warp::{Filter, filters::BoxedFilter, path, path::FullPath, reply::Response};
15
16use super::ddmetric_proto::{Metadata, MetricPayload, SketchPayload, metric_payload};
17use super::{ApiKeyQueryParams, DatadogAgentSource, RequestHandler};
18use crate::{
19    common::{
20        datadog::{DatadogMetricType, DatadogSeriesMetric},
21        http::ErrorMessage,
22    },
23    config::log_schema,
24    event::{
25        Event, MetricKind, MetricTags,
26        metric::{Metric, MetricValue},
27    },
28    internal_events::EventsReceived,
29    schema,
30    sources::util::extract_tag_key_and_value,
31};
32
33#[derive(Deserialize, Serialize)]
34pub(crate) struct DatadogSeriesRequest {
35    pub(crate) series: Vec<DatadogSeriesMetric>,
36}
37
38pub(super) fn build_warp_filter(
39    handler: RequestHandler,
40    source: DatadogAgentSource,
41) -> BoxedFilter<(Response,)> {
42    let sketches_service = sketches_service(handler.clone(), source.clone());
43    let series_v1_service = series_v1_service(handler.clone(), source.clone());
44    let series_v2_service = series_v2_service(handler, source);
45    sketches_service
46        .or(series_v1_service)
47        .unify()
48        .or(series_v2_service)
49        .unify()
50        .boxed()
51}
52
53fn sketches_service(
54    handler: RequestHandler,
55    source: DatadogAgentSource,
56) -> BoxedFilter<(Response,)> {
57    warp::post()
58        .and(path!("api" / "beta" / "sketches" / ..))
59        .and(warp::path::full())
60        .and(warp::header::optional::<String>("content-encoding"))
61        .and(warp::header::optional::<String>("dd-api-key"))
62        .and(warp::query::<ApiKeyQueryParams>())
63        .and(warp::body::bytes())
64        .and_then({
65            move |path: FullPath,
66                  encoding_header: Option<String>,
67                  api_token: Option<String>,
68                  query_params: ApiKeyQueryParams,
69                  body: Bytes| {
70                let events = source
71                    .decode(&encoding_header, body, path.as_str())
72                    .and_then(|body| {
73                        decode_datadog_sketches(
74                            body,
75                            source.api_key_extractor.extract(
76                                path.as_str(),
77                                api_token,
78                                query_params.dd_api_key,
79                            ),
80                            source.split_metric_namespace,
81                            &source.events_received,
82                        )
83                    });
84                handler.clone().handle_request(events, super::METRICS)
85            }
86        })
87        .boxed()
88}
89
90fn series_v1_service(
91    handler: RequestHandler,
92    source: DatadogAgentSource,
93) -> BoxedFilter<(Response,)> {
94    warp::post()
95        .and(path!("api" / "v1" / "series" / ..))
96        .and(warp::path::full())
97        .and(warp::header::optional::<String>("content-encoding"))
98        .and(warp::header::optional::<String>("dd-api-key"))
99        .and(warp::query::<ApiKeyQueryParams>())
100        .and(warp::body::bytes())
101        .and_then({
102            move |path: FullPath,
103                  encoding_header: Option<String>,
104                  api_token: Option<String>,
105                  query_params: ApiKeyQueryParams,
106                  body: Bytes| {
107                let events = source
108                    .decode(&encoding_header, body, path.as_str())
109                    .and_then(|body| {
110                        decode_datadog_series_v1(
111                            body,
112                            source.api_key_extractor.extract(
113                                path.as_str(),
114                                api_token,
115                                query_params.dd_api_key,
116                            ),
117                            // Currently metrics do not have schemas defined, so for now we just pass a
118                            // default one.
119                            &Arc::new(schema::Definition::default_legacy_namespace()),
120                            source.split_metric_namespace,
121                            &source.events_received,
122                        )
123                    });
124                handler.clone().handle_request(events, super::METRICS)
125            }
126        })
127        .boxed()
128}
129
130fn series_v2_service(
131    handler: RequestHandler,
132    source: DatadogAgentSource,
133) -> BoxedFilter<(Response,)> {
134    warp::post()
135        .and(path!("api" / "v2" / "series" / ..))
136        .and(warp::path::full())
137        .and(warp::header::optional::<String>("content-encoding"))
138        .and(warp::header::optional::<String>("dd-api-key"))
139        .and(warp::query::<ApiKeyQueryParams>())
140        .and(warp::body::bytes())
141        .and_then({
142            move |path: FullPath,
143                  encoding_header: Option<String>,
144                  api_token: Option<String>,
145                  query_params: ApiKeyQueryParams,
146                  body: Bytes| {
147                let events = source
148                    .decode(&encoding_header, body, path.as_str())
149                    .and_then(|body| {
150                        decode_datadog_series_v2(
151                            body,
152                            source.api_key_extractor.extract(
153                                path.as_str(),
154                                api_token,
155                                query_params.dd_api_key,
156                            ),
157                            source.split_metric_namespace,
158                            &source.events_received,
159                        )
160                    });
161                handler.clone().handle_request(events, super::METRICS)
162            }
163        })
164        .boxed()
165}
166
167fn decode_datadog_sketches(
168    body: Bytes,
169    api_key: Option<Arc<str>>,
170    split_metric_namespace: bool,
171    events_received: &Registered<EventsReceived>,
172) -> Result<Vec<Event>, ErrorMessage> {
173    if body.is_empty() {
174        // The datadog agent may send an empty payload as a keep alive
175        debug!(message = "Empty payload ignored.");
176        return Ok(Vec::new());
177    }
178
179    let metrics = decode_ddsketch(body, &api_key, split_metric_namespace).map_err(|error| {
180        ErrorMessage::new(
181            StatusCode::UNPROCESSABLE_ENTITY,
182            format!("Error decoding Datadog sketch: {error:?}"),
183        )
184    })?;
185
186    events_received.emit(CountByteSize(
187        metrics.len(),
188        metrics.estimated_json_encoded_size_of(),
189    ));
190
191    Ok(metrics)
192}
193
194fn decode_datadog_series_v2(
195    body: Bytes,
196    api_key: Option<Arc<str>>,
197    split_metric_namespace: bool,
198    events_received: &Registered<EventsReceived>,
199) -> Result<Vec<Event>, ErrorMessage> {
200    if body.is_empty() {
201        // The datadog agent may send an empty payload as a keep alive
202        debug!(message = "Empty payload ignored.");
203        return Ok(Vec::new());
204    }
205
206    let metrics = decode_ddseries_v2(body, &api_key, split_metric_namespace).map_err(|error| {
207        ErrorMessage::new(
208            StatusCode::UNPROCESSABLE_ENTITY,
209            format!("Error decoding Datadog sketch: {error:?}"),
210        )
211    })?;
212
213    events_received.emit(CountByteSize(
214        metrics.len(),
215        metrics.estimated_json_encoded_size_of(),
216    ));
217
218    Ok(metrics)
219}
220
221/// Builds Vector's `EventMetadata` from the received metadata. Currently this is only
222/// utilized for passing through origin metadata set by the Agent.
223fn get_event_metadata(metadata: Option<&Metadata>) -> EventMetadata {
224    metadata
225        .and_then(|metadata| metadata.origin.as_ref())
226        .map_or_else(EventMetadata::default, |origin| {
227            trace!(
228                "Deserialized origin_product: `{}` origin_category: `{}` origin_service: `{}`.",
229                origin.origin_product, origin.origin_category, origin.origin_service,
230            );
231            EventMetadata::default().with_origin_metadata(DatadogMetricOriginMetadata::new(
232                Some(origin.origin_product),
233                Some(origin.origin_category),
234                Some(origin.origin_service),
235            ))
236        })
237}
238
239pub(crate) fn decode_ddseries_v2(
240    frame: Bytes,
241    api_key: &Option<Arc<str>>,
242    split_metric_namespace: bool,
243) -> crate::Result<Vec<Event>> {
244    let payload = MetricPayload::decode(frame)?;
245    let decoded_metrics: Vec<Event> = payload
246        .series
247        .into_iter()
248        .flat_map(|serie| {
249            let (namespace, name) = if split_metric_namespace {
250                namespace_name_from_dd_metric(&serie.metric)
251            } else {
252                (None, serie.metric.as_str())
253            };
254            let mut tags = into_metric_tags(serie.tags);
255
256            let event_metadata = get_event_metadata(serie.metadata.as_ref());
257
258            // It is possible to receive non-rate metrics from the Agent with an interval set.
259            // That interval can be applied with the `as_rate` function in the Datadog UI.
260            // The scenario this happens is when DogStatsD emits non-rate series metrics to the Agent,
261            // in which it sets an interval to 10. See
262            //    - https://github.com/DataDog/datadog-agent/blob/9f0a85c926596ec9aebe2d8e1f2a8b1af6e45635/pkg/aggregator/time_sampler.go#L49C1-L49C1
263            //    - https://github.com/DataDog/datadog-agent/blob/209b70529caff9ec1c30b6b2eed27bce725ed153/pkg/aggregator/aggregator.go#L39
264            //
265            // Note that DogStatsD is the only scenario this occurs; regular Agent checks/services do not set the
266            // interval for non-rate series metrics.
267            //
268            // Note that because Vector does not yet have a specific Metric type to handle Rate,
269            // we are distinguishing Rate from Count by setting an interval to Rate but not Count.
270            // Luckily, the only time a Count metric type is emitted by DogStatsD, is in the Sketch endpoint.
271            // (Regular Count metrics are emitted by DogStatsD as Rate metrics).
272            //
273            // In theory we should be safe to set this non-rate-interval to Count metrics below, but to be safe,
274            // we will only set it for Rate and Gauge. Since Rates already need an interval, the only "odd" case
275            // is Gauges.
276            //
277            // Ultimately if we had a unique internal representation of a Rate metric type, we wouldn't need to
278            // have special handling for the interval, we would just apply it to all metrics that it came in with.
279            let non_rate_interval = if serie.interval.is_positive() {
280                NonZeroU32::new(serie.interval as u32 * 1000) // incoming is seconds, convert to milliseconds
281            } else {
282                None
283            };
284
285            serie.resources.into_iter().for_each(|r| {
286                // As per https://github.com/DataDog/datadog-agent/blob/a62ac9fb13e1e5060b89e731b8355b2b20a07c5b/pkg/serializer/internal/metrics/iterable_series.go#L180-L189
287                // the hostname can be found in MetricSeries::resources and that is the only value stored there.
288                if r.r#type.eq("host") {
289                    log_schema()
290                        .host_key()
291                        .and_then(|key| tags.replace(key.to_string(), r.name));
292                } else if r.r#type.eq("device") {
293                    // The `device` resource type is used by Agent checks (disk, SNMP/NDM, etc.)
294                    // and must be preserved as a plain `device` tag to match the v1 series behavior.
295                    tags.replace("device".into(), r.name);
296                } else {
297                    // But to avoid losing information if this situation changes, any other resource type/name will be saved in the tags map
298                    tags.replace(format!("resource.{}", r.r#type), r.name);
299                }
300            });
301            (!serie.source_type_name.is_empty())
302                .then(|| tags.replace("source_type_name".into(), serie.source_type_name));
303            // As per https://github.com/DataDog/datadog-agent/blob/a62ac9fb13e1e5060b89e731b8355b2b20a07c5b/pkg/serializer/internal/metrics/iterable_series.go#L224
304            // serie.unit is omitted
305            match metric_payload::MetricType::try_from(serie.r#type) {
306                Ok(metric_payload::MetricType::Count) => serie
307                    .points
308                    .iter()
309                    .map(|dd_point| {
310                        Metric::new_with_metadata(
311                            name.to_string(),
312                            MetricKind::Incremental,
313                            MetricValue::Counter {
314                                value: dd_point.value,
315                            },
316                            event_metadata.clone(),
317                        )
318                        .with_timestamp(Some(
319                            Utc.timestamp_opt(dd_point.timestamp, 0)
320                                .single()
321                                .expect("invalid timestamp"),
322                        ))
323                        .with_tags(Some(tags.clone()))
324                        .with_namespace(namespace)
325                    })
326                    .collect::<Vec<_>>(),
327                Ok(metric_payload::MetricType::Gauge) => serie
328                    .points
329                    .iter()
330                    .map(|dd_point| {
331                        Metric::new_with_metadata(
332                            name.to_string(),
333                            MetricKind::Absolute,
334                            MetricValue::Gauge {
335                                value: dd_point.value,
336                            },
337                            event_metadata.clone(),
338                        )
339                        .with_timestamp(Some(
340                            Utc.timestamp_opt(dd_point.timestamp, 0)
341                                .single()
342                                .expect("invalid timestamp"),
343                        ))
344                        .with_tags(Some(tags.clone()))
345                        .with_namespace(namespace)
346                        .with_interval_ms(non_rate_interval)
347                    })
348                    .collect::<Vec<_>>(),
349                Ok(metric_payload::MetricType::Rate) => serie
350                    .points
351                    .iter()
352                    .map(|dd_point| {
353                        let i = Some(serie.interval)
354                            .filter(|v| *v != 0)
355                            .map(|v| v as u32)
356                            .unwrap_or(1);
357                        Metric::new_with_metadata(
358                            name.to_string(),
359                            MetricKind::Incremental,
360                            MetricValue::Counter {
361                                value: dd_point.value * (i as f64),
362                            },
363                            event_metadata.clone(),
364                        )
365                        .with_timestamp(Some(
366                            Utc.timestamp_opt(dd_point.timestamp, 0)
367                                .single()
368                                .expect("invalid timestamp"),
369                        ))
370                        // serie.interval is in seconds, convert to ms
371                        .with_interval_ms(NonZeroU32::new(i * 1000))
372                        .with_tags(Some(tags.clone()))
373                        .with_namespace(namespace)
374                    })
375                    .collect::<Vec<_>>(),
376                Ok(metric_payload::MetricType::Unspecified) | Err(_) => {
377                    warn!("Unspecified metric type ({}).", serie.r#type);
378                    Vec::new()
379                }
380            }
381        })
382        .map(|mut metric| {
383            if let Some(k) = &api_key {
384                metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
385            }
386            metric.into()
387        })
388        .collect();
389
390    Ok(decoded_metrics)
391}
392
393fn decode_datadog_series_v1(
394    body: Bytes,
395    api_key: Option<Arc<str>>,
396    schema_definition: &Arc<schema::Definition>,
397    split_metric_namespace: bool,
398    events_received: &Registered<EventsReceived>,
399) -> Result<Vec<Event>, ErrorMessage> {
400    if body.is_empty() {
401        // The datadog agent may send an empty payload as a keep alive
402        debug!(message = "Empty payload ignored.");
403        return Ok(Vec::new());
404    }
405
406    let metrics: DatadogSeriesRequest = serde_json::from_slice(&body).map_err(|error| {
407        ErrorMessage::new(
408            StatusCode::BAD_REQUEST,
409            format!("Error parsing JSON: {error:?}"),
410        )
411    })?;
412
413    let decoded_metrics: Vec<Event> = metrics
414        .series
415        .into_iter()
416        .flat_map(|m| {
417            into_vector_metric(
418                m,
419                api_key.clone(),
420                schema_definition,
421                split_metric_namespace,
422            )
423        })
424        .collect();
425
426    events_received.emit(CountByteSize(
427        decoded_metrics.len(),
428        decoded_metrics.estimated_json_encoded_size_of(),
429    ));
430
431    Ok(decoded_metrics)
432}
433
434fn into_metric_tags(tags: Vec<String>) -> MetricTags {
435    tags.iter().map(extract_tag_key_and_value).collect()
436}
437
438fn into_vector_metric(
439    dd_metric: DatadogSeriesMetric,
440    api_key: Option<Arc<str>>,
441    schema_definition: &Arc<schema::Definition>,
442    split_metric_namespace: bool,
443) -> Vec<Event> {
444    let mut tags = into_metric_tags(dd_metric.tags.unwrap_or_default());
445
446    if let Some(key) = log_schema().host_key() {
447        dd_metric
448            .host
449            .and_then(|host| tags.replace(key.to_string(), host));
450    }
451
452    dd_metric
453        .source_type_name
454        .and_then(|source| tags.replace("source_type_name".into(), source));
455    dd_metric
456        .device
457        .and_then(|dev| tags.replace("device".into(), dev));
458
459    let (namespace, name) = if split_metric_namespace {
460        namespace_name_from_dd_metric(&dd_metric.metric)
461    } else {
462        (None, dd_metric.metric.as_str())
463    };
464
465    match dd_metric.r#type {
466        DatadogMetricType::Count => dd_metric
467            .points
468            .iter()
469            .map(|dd_point| {
470                Metric::new(
471                    name.to_string(),
472                    MetricKind::Incremental,
473                    MetricValue::Counter { value: dd_point.1 },
474                )
475                .with_timestamp(Some(
476                    Utc.timestamp_opt(dd_point.0, 0)
477                        .single()
478                        .expect("invalid timestamp"),
479                ))
480                .with_tags(Some(tags.clone()))
481                .with_namespace(namespace)
482            })
483            .collect::<Vec<_>>(),
484        DatadogMetricType::Gauge => dd_metric
485            .points
486            .iter()
487            .map(|dd_point| {
488                Metric::new(
489                    name.to_string(),
490                    MetricKind::Absolute,
491                    MetricValue::Gauge { value: dd_point.1 },
492                )
493                .with_timestamp(Some(
494                    Utc.timestamp_opt(dd_point.0, 0)
495                        .single()
496                        .expect("invalid timestamp"),
497                ))
498                .with_tags(Some(tags.clone()))
499                .with_namespace(namespace)
500            })
501            .collect::<Vec<_>>(),
502        // Agent sends rate only for dogstatsd counter https://github.com/DataDog/datadog-agent/blob/f4a13c6dca5e2da4bb722f861a8ac4c2f715531d/pkg/metrics/counter.go#L8-L10
503        // for consistency purpose (w.r.t. (dog)statsd source) they are turned back into counters
504        DatadogMetricType::Rate => dd_metric
505            .points
506            .iter()
507            .map(|dd_point| {
508                let i = dd_metric.interval.filter(|v| *v != 0).unwrap_or(1);
509                Metric::new(
510                    name.to_string(),
511                    MetricKind::Incremental,
512                    MetricValue::Counter {
513                        value: dd_point.1 * (i as f64),
514                    },
515                )
516                .with_timestamp(Some(
517                    Utc.timestamp_opt(dd_point.0, 0)
518                        .single()
519                        .expect("invalid timestamp"),
520                ))
521                // dd_metric.interval is in seconds, convert to ms
522                .with_interval_ms(NonZeroU32::new(i * 1000))
523                .with_tags(Some(tags.clone()))
524                .with_namespace(namespace)
525            })
526            .collect::<Vec<_>>(),
527    }
528    .into_iter()
529    .map(|mut metric| {
530        if let Some(k) = &api_key {
531            metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
532        }
533
534        metric
535            .metadata_mut()
536            .set_schema_definition(schema_definition);
537
538        metric.into()
539    })
540    .collect()
541}
542
/// Splits a Datadog metric name at its first `.` into `(namespace, name)`.
///
/// Without a `.`, there is no namespace and the whole input is the name.
/// For example, `"system.fs.util"` becomes `(Some("system"), "fs.util")`.
fn namespace_name_from_dd_metric(dd_metric_name: &str) -> (Option<&str>, &str) {
    dd_metric_name
        .split_once('.')
        .map_or((None, dd_metric_name), |(namespace, name)| {
            (Some(namespace), name)
        })
}
552
553pub(crate) fn decode_ddsketch(
554    frame: Bytes,
555    api_key: &Option<Arc<str>>,
556    split_metric_namespace: bool,
557) -> crate::Result<Vec<Event>> {
558    let payload = SketchPayload::decode(frame)?;
559    // payload.metadata is always empty for payload coming from dd agents
560    Ok(payload
561        .sketches
562        .into_iter()
563        .flat_map(|sketch_series| {
564            // sketch_series.distributions is also always empty from payload coming from dd agents
565            let mut tags = into_metric_tags(sketch_series.tags);
566            log_schema()
567                .host_key()
568                .and_then(|key| tags.replace(key.to_string(), sketch_series.host.clone()));
569
570            let event_metadata = get_event_metadata(sketch_series.metadata.as_ref());
571
572            sketch_series.dogsketches.into_iter().map(move |sketch| {
573                let k: Vec<i16> = sketch.k.iter().map(|k| *k as i16).collect();
574                let n: Vec<u16> = sketch.n.iter().map(|n| *n as u16).collect();
575                let val = MetricValue::from(
576                    AgentDDSketch::from_raw(
577                        sketch.cnt as u32,
578                        sketch.min,
579                        sketch.max,
580                        sketch.sum,
581                        sketch.avg,
582                        &k,
583                        &n,
584                    )
585                    .unwrap_or_else(AgentDDSketch::with_agent_defaults),
586                );
587                let (namespace, name) = if split_metric_namespace {
588                    namespace_name_from_dd_metric(&sketch_series.metric)
589                } else {
590                    (None, sketch_series.metric.as_str())
591                };
592                let mut metric = Metric::new_with_metadata(
593                    name.to_string(),
594                    MetricKind::Incremental,
595                    val,
596                    event_metadata.clone(),
597                )
598                .with_tags(Some(tags.clone()))
599                .with_timestamp(Some(
600                    Utc.timestamp_opt(sketch.ts, 0)
601                        .single()
602                        .expect("invalid timestamp"),
603                ))
604                .with_namespace(namespace);
605                if let Some(k) = &api_key {
606                    metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
607                }
608
609                metric.into()
610            })
611        })
612        .collect())
613}