vector/sources/datadog_agent/metrics.rs

1use std::{num::NonZeroU32, sync::Arc};
2
3use bytes::Bytes;
4use chrono::{TimeZone, Utc};
5use http::StatusCode;
6use prost::Message;
7use serde::{Deserialize, Serialize};
8use vector_lib::{
9    EstimatedJsonEncodedSizeOf,
10    event::{DatadogMetricOriginMetadata, EventMetadata},
11    internal_event::{CountByteSize, InternalEventHandle as _, Registered},
12    metrics::AgentDDSketch,
13};
14use warp::{Filter, filters::BoxedFilter, path, path::FullPath, reply::Response};
15
16use crate::{
17    SourceSender,
18    common::{
19        datadog::{DatadogMetricType, DatadogSeriesMetric},
20        http::ErrorMessage,
21    },
22    config::log_schema,
23    event::{
24        Event, MetricKind, MetricTags,
25        metric::{Metric, MetricValue},
26    },
27    internal_events::EventsReceived,
28    schema,
29    sources::{
30        datadog_agent::{
31            ApiKeyQueryParams, DatadogAgentSource,
32            ddmetric_proto::{Metadata, MetricPayload, SketchPayload, metric_payload},
33            handle_request,
34        },
35        util::extract_tag_key_and_value,
36    },
37};
38
39#[derive(Deserialize, Serialize)]
40pub(crate) struct DatadogSeriesRequest {
41    pub(crate) series: Vec<DatadogSeriesMetric>,
42}
43
44pub(crate) fn build_warp_filter(
45    acknowledgements: bool,
46    multiple_outputs: bool,
47    out: SourceSender,
48    source: DatadogAgentSource,
49) -> BoxedFilter<(Response,)> {
50    let output = multiple_outputs.then_some(super::METRICS);
51    let sketches_service = sketches_service(acknowledgements, output, out.clone(), source.clone());
52    let series_v1_service =
53        series_v1_service(acknowledgements, output, out.clone(), source.clone());
54    let series_v2_service = series_v2_service(acknowledgements, output, out, source);
55    sketches_service
56        .or(series_v1_service)
57        .unify()
58        .or(series_v2_service)
59        .unify()
60        .boxed()
61}
62
63fn sketches_service(
64    acknowledgements: bool,
65    output: Option<&'static str>,
66    out: SourceSender,
67    source: DatadogAgentSource,
68) -> BoxedFilter<(Response,)> {
69    warp::post()
70        .and(path!("api" / "beta" / "sketches" / ..))
71        .and(warp::path::full())
72        .and(warp::header::optional::<String>("content-encoding"))
73        .and(warp::header::optional::<String>("dd-api-key"))
74        .and(warp::query::<ApiKeyQueryParams>())
75        .and(warp::body::bytes())
76        .and_then(
77            move |path: FullPath,
78                  encoding_header: Option<String>,
79                  api_token: Option<String>,
80                  query_params: ApiKeyQueryParams,
81                  body: Bytes| {
82                let events = source
83                    .decode(&encoding_header, body, path.as_str())
84                    .and_then(|body| {
85                        decode_datadog_sketches(
86                            body,
87                            source.api_key_extractor.extract(
88                                path.as_str(),
89                                api_token,
90                                query_params.dd_api_key,
91                            ),
92                            source.split_metric_namespace,
93                            &source.events_received,
94                        )
95                    });
96                handle_request(events, acknowledgements, out.clone(), output)
97            },
98        )
99        .boxed()
100}
101
102fn series_v1_service(
103    acknowledgements: bool,
104    output: Option<&'static str>,
105    out: SourceSender,
106    source: DatadogAgentSource,
107) -> BoxedFilter<(Response,)> {
108    warp::post()
109        .and(path!("api" / "v1" / "series" / ..))
110        .and(warp::path::full())
111        .and(warp::header::optional::<String>("content-encoding"))
112        .and(warp::header::optional::<String>("dd-api-key"))
113        .and(warp::query::<ApiKeyQueryParams>())
114        .and(warp::body::bytes())
115        .and_then(
116            move |path: FullPath,
117                  encoding_header: Option<String>,
118                  api_token: Option<String>,
119                  query_params: ApiKeyQueryParams,
120                  body: Bytes| {
121                let events = source
122                    .decode(&encoding_header, body, path.as_str())
123                    .and_then(|body| {
124                        decode_datadog_series_v1(
125                            body,
126                            source.api_key_extractor.extract(
127                                path.as_str(),
128                                api_token,
129                                query_params.dd_api_key,
130                            ),
131                            // Currently metrics do not have schemas defined, so for now we just pass a
132                            // default one.
133                            &Arc::new(schema::Definition::default_legacy_namespace()),
134                            source.split_metric_namespace,
135                            &source.events_received,
136                        )
137                    });
138                handle_request(events, acknowledgements, out.clone(), output)
139            },
140        )
141        .boxed()
142}
143
144fn series_v2_service(
145    acknowledgements: bool,
146    output: Option<&'static str>,
147    out: SourceSender,
148    source: DatadogAgentSource,
149) -> BoxedFilter<(Response,)> {
150    warp::post()
151        .and(path!("api" / "v2" / "series" / ..))
152        .and(warp::path::full())
153        .and(warp::header::optional::<String>("content-encoding"))
154        .and(warp::header::optional::<String>("dd-api-key"))
155        .and(warp::query::<ApiKeyQueryParams>())
156        .and(warp::body::bytes())
157        .and_then(
158            move |path: FullPath,
159                  encoding_header: Option<String>,
160                  api_token: Option<String>,
161                  query_params: ApiKeyQueryParams,
162                  body: Bytes| {
163                let events = source
164                    .decode(&encoding_header, body, path.as_str())
165                    .and_then(|body| {
166                        decode_datadog_series_v2(
167                            body,
168                            source.api_key_extractor.extract(
169                                path.as_str(),
170                                api_token,
171                                query_params.dd_api_key,
172                            ),
173                            source.split_metric_namespace,
174                            &source.events_received,
175                        )
176                    });
177                handle_request(events, acknowledgements, out.clone(), output)
178            },
179        )
180        .boxed()
181}
182
183fn decode_datadog_sketches(
184    body: Bytes,
185    api_key: Option<Arc<str>>,
186    split_metric_namespace: bool,
187    events_received: &Registered<EventsReceived>,
188) -> Result<Vec<Event>, ErrorMessage> {
189    if body.is_empty() {
190        // The datadog agent may send an empty payload as a keep alive
191        debug!(message = "Empty payload ignored.");
192        return Ok(Vec::new());
193    }
194
195    let metrics = decode_ddsketch(body, &api_key, split_metric_namespace).map_err(|error| {
196        ErrorMessage::new(
197            StatusCode::UNPROCESSABLE_ENTITY,
198            format!("Error decoding Datadog sketch: {error:?}"),
199        )
200    })?;
201
202    events_received.emit(CountByteSize(
203        metrics.len(),
204        metrics.estimated_json_encoded_size_of(),
205    ));
206
207    Ok(metrics)
208}
209
210fn decode_datadog_series_v2(
211    body: Bytes,
212    api_key: Option<Arc<str>>,
213    split_metric_namespace: bool,
214    events_received: &Registered<EventsReceived>,
215) -> Result<Vec<Event>, ErrorMessage> {
216    if body.is_empty() {
217        // The datadog agent may send an empty payload as a keep alive
218        debug!(message = "Empty payload ignored.");
219        return Ok(Vec::new());
220    }
221
222    let metrics = decode_ddseries_v2(body, &api_key, split_metric_namespace).map_err(|error| {
223        ErrorMessage::new(
224            StatusCode::UNPROCESSABLE_ENTITY,
225            format!("Error decoding Datadog sketch: {error:?}"),
226        )
227    })?;
228
229    events_received.emit(CountByteSize(
230        metrics.len(),
231        metrics.estimated_json_encoded_size_of(),
232    ));
233
234    Ok(metrics)
235}
236
237/// Builds Vector's `EventMetadata` from the received metadata. Currently this is only
238/// utilized for passing through origin metadata set by the Agent.
239fn get_event_metadata(metadata: Option<&Metadata>) -> EventMetadata {
240    metadata
241        .and_then(|metadata| metadata.origin.as_ref())
242        .map_or_else(EventMetadata::default, |origin| {
243            trace!(
244                "Deserialized origin_product: `{}` origin_category: `{}` origin_service: `{}`.",
245                origin.origin_product, origin.origin_category, origin.origin_service,
246            );
247            EventMetadata::default().with_origin_metadata(DatadogMetricOriginMetadata::new(
248                Some(origin.origin_product),
249                Some(origin.origin_category),
250                Some(origin.origin_service),
251            ))
252        })
253}
254
255pub(crate) fn decode_ddseries_v2(
256    frame: Bytes,
257    api_key: &Option<Arc<str>>,
258    split_metric_namespace: bool,
259) -> crate::Result<Vec<Event>> {
260    let payload = MetricPayload::decode(frame)?;
261    let decoded_metrics: Vec<Event> = payload
262        .series
263        .into_iter()
264        .flat_map(|serie| {
265            let (namespace, name) = if split_metric_namespace {
266                namespace_name_from_dd_metric(&serie.metric)
267            } else {
268                (None, serie.metric.as_str())
269            };
270            let mut tags = into_metric_tags(serie.tags);
271
272            let event_metadata = get_event_metadata(serie.metadata.as_ref());
273
274            // It is possible to receive non-rate metrics from the Agent with an interval set.
275            // That interval can be applied with the `as_rate` function in the Datadog UI.
276            // The scenario this happens is when DogStatsD emits non-rate series metrics to the Agent,
277            // in which it sets an interval to 10. See
278            //    - https://github.com/DataDog/datadog-agent/blob/9f0a85c926596ec9aebe2d8e1f2a8b1af6e45635/pkg/aggregator/time_sampler.go#L49C1-L49C1
279            //    - https://github.com/DataDog/datadog-agent/blob/209b70529caff9ec1c30b6b2eed27bce725ed153/pkg/aggregator/aggregator.go#L39
280            //
281            // Note that DogStatsD is the only scenario this occurs; regular Agent checks/services do not set the
282            // interval for non-rate series metrics.
283            //
284            // Note that because Vector does not yet have a specific Metric type to handle Rate,
285            // we are distinguishing Rate from Count by setting an interval to Rate but not Count.
286            // Luckily, the only time a Count metric type is emitted by DogStatsD, is in the Sketch endpoint.
287            // (Regular Count metrics are emitted by DogStatsD as Rate metrics).
288            //
289            // In theory we should be safe to set this non-rate-interval to Count metrics below, but to be safe,
290            // we will only set it for Rate and Gauge. Since Rates already need an interval, the only "odd" case
291            // is Gauges.
292            //
293            // Ultimately if we had a unique internal representation of a Rate metric type, we wouldn't need to
294            // have special handling for the interval, we would just apply it to all metrics that it came in with.
295            let non_rate_interval = if serie.interval.is_positive() {
296                NonZeroU32::new(serie.interval as u32 * 1000) // incoming is seconds, convert to milliseconds
297            } else {
298                None
299            };
300
301            serie.resources.into_iter().for_each(|r| {
302                // As per https://github.com/DataDog/datadog-agent/blob/a62ac9fb13e1e5060b89e731b8355b2b20a07c5b/pkg/serializer/internal/metrics/iterable_series.go#L180-L189
303                // the hostname can be found in MetricSeries::resources and that is the only value stored there.
304                if r.r#type.eq("host") {
305                    log_schema()
306                        .host_key()
307                        .and_then(|key| tags.replace(key.to_string(), r.name));
308                } else {
309                    // But to avoid losing information if this situation changes, any other resource type/name will be saved in the tags map
310                    tags.replace(format!("resource.{}", r.r#type), r.name);
311                }
312            });
313            (!serie.source_type_name.is_empty())
314                .then(|| tags.replace("source_type_name".into(), serie.source_type_name));
315            // As per https://github.com/DataDog/datadog-agent/blob/a62ac9fb13e1e5060b89e731b8355b2b20a07c5b/pkg/serializer/internal/metrics/iterable_series.go#L224
316            // serie.unit is omitted
317            match metric_payload::MetricType::try_from(serie.r#type) {
318                Ok(metric_payload::MetricType::Count) => serie
319                    .points
320                    .iter()
321                    .map(|dd_point| {
322                        Metric::new_with_metadata(
323                            name.to_string(),
324                            MetricKind::Incremental,
325                            MetricValue::Counter {
326                                value: dd_point.value,
327                            },
328                            event_metadata.clone(),
329                        )
330                        .with_timestamp(Some(
331                            Utc.timestamp_opt(dd_point.timestamp, 0)
332                                .single()
333                                .expect("invalid timestamp"),
334                        ))
335                        .with_tags(Some(tags.clone()))
336                        .with_namespace(namespace)
337                    })
338                    .collect::<Vec<_>>(),
339                Ok(metric_payload::MetricType::Gauge) => serie
340                    .points
341                    .iter()
342                    .map(|dd_point| {
343                        Metric::new_with_metadata(
344                            name.to_string(),
345                            MetricKind::Absolute,
346                            MetricValue::Gauge {
347                                value: dd_point.value,
348                            },
349                            event_metadata.clone(),
350                        )
351                        .with_timestamp(Some(
352                            Utc.timestamp_opt(dd_point.timestamp, 0)
353                                .single()
354                                .expect("invalid timestamp"),
355                        ))
356                        .with_tags(Some(tags.clone()))
357                        .with_namespace(namespace)
358                        .with_interval_ms(non_rate_interval)
359                    })
360                    .collect::<Vec<_>>(),
361                Ok(metric_payload::MetricType::Rate) => serie
362                    .points
363                    .iter()
364                    .map(|dd_point| {
365                        let i = Some(serie.interval)
366                            .filter(|v| *v != 0)
367                            .map(|v| v as u32)
368                            .unwrap_or(1);
369                        Metric::new_with_metadata(
370                            name.to_string(),
371                            MetricKind::Incremental,
372                            MetricValue::Counter {
373                                value: dd_point.value * (i as f64),
374                            },
375                            event_metadata.clone(),
376                        )
377                        .with_timestamp(Some(
378                            Utc.timestamp_opt(dd_point.timestamp, 0)
379                                .single()
380                                .expect("invalid timestamp"),
381                        ))
382                        // serie.interval is in seconds, convert to ms
383                        .with_interval_ms(NonZeroU32::new(i * 1000))
384                        .with_tags(Some(tags.clone()))
385                        .with_namespace(namespace)
386                    })
387                    .collect::<Vec<_>>(),
388                Ok(metric_payload::MetricType::Unspecified) | Err(_) => {
389                    warn!("Unspecified metric type ({}).", serie.r#type);
390                    Vec::new()
391                }
392            }
393        })
394        .map(|mut metric| {
395            if let Some(k) = &api_key {
396                metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
397            }
398            metric.into()
399        })
400        .collect();
401
402    Ok(decoded_metrics)
403}
404
405fn decode_datadog_series_v1(
406    body: Bytes,
407    api_key: Option<Arc<str>>,
408    schema_definition: &Arc<schema::Definition>,
409    split_metric_namespace: bool,
410    events_received: &Registered<EventsReceived>,
411) -> Result<Vec<Event>, ErrorMessage> {
412    if body.is_empty() {
413        // The datadog agent may send an empty payload as a keep alive
414        debug!(message = "Empty payload ignored.");
415        return Ok(Vec::new());
416    }
417
418    let metrics: DatadogSeriesRequest = serde_json::from_slice(&body).map_err(|error| {
419        ErrorMessage::new(
420            StatusCode::BAD_REQUEST,
421            format!("Error parsing JSON: {error:?}"),
422        )
423    })?;
424
425    let decoded_metrics: Vec<Event> = metrics
426        .series
427        .into_iter()
428        .flat_map(|m| {
429            into_vector_metric(
430                m,
431                api_key.clone(),
432                schema_definition,
433                split_metric_namespace,
434            )
435        })
436        .collect();
437
438    events_received.emit(CountByteSize(
439        decoded_metrics.len(),
440        decoded_metrics.estimated_json_encoded_size_of(),
441    ));
442
443    Ok(decoded_metrics)
444}
445
446fn into_metric_tags(tags: Vec<String>) -> MetricTags {
447    tags.iter().map(extract_tag_key_and_value).collect()
448}
449
450fn into_vector_metric(
451    dd_metric: DatadogSeriesMetric,
452    api_key: Option<Arc<str>>,
453    schema_definition: &Arc<schema::Definition>,
454    split_metric_namespace: bool,
455) -> Vec<Event> {
456    let mut tags = into_metric_tags(dd_metric.tags.unwrap_or_default());
457
458    if let Some(key) = log_schema().host_key() {
459        dd_metric
460            .host
461            .and_then(|host| tags.replace(key.to_string(), host));
462    }
463
464    dd_metric
465        .source_type_name
466        .and_then(|source| tags.replace("source_type_name".into(), source));
467    dd_metric
468        .device
469        .and_then(|dev| tags.replace("device".into(), dev));
470
471    let (namespace, name) = if split_metric_namespace {
472        namespace_name_from_dd_metric(&dd_metric.metric)
473    } else {
474        (None, dd_metric.metric.as_str())
475    };
476
477    match dd_metric.r#type {
478        DatadogMetricType::Count => dd_metric
479            .points
480            .iter()
481            .map(|dd_point| {
482                Metric::new(
483                    name.to_string(),
484                    MetricKind::Incremental,
485                    MetricValue::Counter { value: dd_point.1 },
486                )
487                .with_timestamp(Some(
488                    Utc.timestamp_opt(dd_point.0, 0)
489                        .single()
490                        .expect("invalid timestamp"),
491                ))
492                .with_tags(Some(tags.clone()))
493                .with_namespace(namespace)
494            })
495            .collect::<Vec<_>>(),
496        DatadogMetricType::Gauge => dd_metric
497            .points
498            .iter()
499            .map(|dd_point| {
500                Metric::new(
501                    name.to_string(),
502                    MetricKind::Absolute,
503                    MetricValue::Gauge { value: dd_point.1 },
504                )
505                .with_timestamp(Some(
506                    Utc.timestamp_opt(dd_point.0, 0)
507                        .single()
508                        .expect("invalid timestamp"),
509                ))
510                .with_tags(Some(tags.clone()))
511                .with_namespace(namespace)
512            })
513            .collect::<Vec<_>>(),
514        // Agent sends rate only for dogstatsd counter https://github.com/DataDog/datadog-agent/blob/f4a13c6dca5e2da4bb722f861a8ac4c2f715531d/pkg/metrics/counter.go#L8-L10
515        // for consistency purpose (w.r.t. (dog)statsd source) they are turned back into counters
516        DatadogMetricType::Rate => dd_metric
517            .points
518            .iter()
519            .map(|dd_point| {
520                let i = dd_metric.interval.filter(|v| *v != 0).unwrap_or(1);
521                Metric::new(
522                    name.to_string(),
523                    MetricKind::Incremental,
524                    MetricValue::Counter {
525                        value: dd_point.1 * (i as f64),
526                    },
527                )
528                .with_timestamp(Some(
529                    Utc.timestamp_opt(dd_point.0, 0)
530                        .single()
531                        .expect("invalid timestamp"),
532                ))
533                // dd_metric.interval is in seconds, convert to ms
534                .with_interval_ms(NonZeroU32::new(i * 1000))
535                .with_tags(Some(tags.clone()))
536                .with_namespace(namespace)
537            })
538            .collect::<Vec<_>>(),
539    }
540    .into_iter()
541    .map(|mut metric| {
542        if let Some(k) = &api_key {
543            metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
544        }
545
546        metric
547            .metadata_mut()
548            .set_schema_definition(schema_definition);
549
550        metric.into()
551    })
552    .collect()
553}
554
/// Splits a Datadog metric name at its first `.` into `(namespace, name)`.
///
/// For example, `"system.fs.util"` becomes `(Some("system"), "fs.util")`. A name without any
/// `.` has no namespace and is returned unchanged as `(None, name)`.
fn namespace_name_from_dd_metric(dd_metric_name: &str) -> (Option<&str>, &str) {
    dd_metric_name
        .split_once('.')
        .map_or((None, dd_metric_name), |(prefix, rest)| (Some(prefix), rest))
}
564
565pub(crate) fn decode_ddsketch(
566    frame: Bytes,
567    api_key: &Option<Arc<str>>,
568    split_metric_namespace: bool,
569) -> crate::Result<Vec<Event>> {
570    let payload = SketchPayload::decode(frame)?;
571    // payload.metadata is always empty for payload coming from dd agents
572    Ok(payload
573        .sketches
574        .into_iter()
575        .flat_map(|sketch_series| {
576            // sketch_series.distributions is also always empty from payload coming from dd agents
577            let mut tags = into_metric_tags(sketch_series.tags);
578            log_schema()
579                .host_key()
580                .and_then(|key| tags.replace(key.to_string(), sketch_series.host.clone()));
581
582            let event_metadata = get_event_metadata(sketch_series.metadata.as_ref());
583
584            sketch_series.dogsketches.into_iter().map(move |sketch| {
585                let k: Vec<i16> = sketch.k.iter().map(|k| *k as i16).collect();
586                let n: Vec<u16> = sketch.n.iter().map(|n| *n as u16).collect();
587                let val = MetricValue::from(
588                    AgentDDSketch::from_raw(
589                        sketch.cnt as u32,
590                        sketch.min,
591                        sketch.max,
592                        sketch.sum,
593                        sketch.avg,
594                        &k,
595                        &n,
596                    )
597                    .unwrap_or_else(AgentDDSketch::with_agent_defaults),
598                );
599                let (namespace, name) = if split_metric_namespace {
600                    namespace_name_from_dd_metric(&sketch_series.metric)
601                } else {
602                    (None, sketch_series.metric.as_str())
603                };
604                let mut metric = Metric::new_with_metadata(
605                    name.to_string(),
606                    MetricKind::Incremental,
607                    val,
608                    event_metadata.clone(),
609                )
610                .with_tags(Some(tags.clone()))
611                .with_timestamp(Some(
612                    Utc.timestamp_opt(sketch.ts, 0)
613                        .single()
614                        .expect("invalid timestamp"),
615                ))
616                .with_namespace(namespace);
617                if let Some(k) = &api_key {
618                    metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
619                }
620
621                metric.into()
622            })
623        })
624        .collect())
625}