vector/sources/datadog_agent/
metrics.rs

1use std::{num::NonZeroU32, sync::Arc};
2
3use bytes::Bytes;
4use chrono::{TimeZone, Utc};
5use http::StatusCode;
6use prost::Message;
7use serde::{Deserialize, Serialize};
8use vector_lib::{
9    EstimatedJsonEncodedSizeOf,
10    event::{DatadogMetricOriginMetadata, EventMetadata},
11    internal_event::{CountByteSize, InternalEventHandle as _, Registered},
12    metrics::AgentDDSketch,
13};
14use warp::{Filter, filters::BoxedFilter, path, path::FullPath, reply::Response};
15
16use crate::{
17    SourceSender,
18    common::{
19        datadog::{DatadogMetricType, DatadogSeriesMetric},
20        http::ErrorMessage,
21    },
22    config::log_schema,
23    event::{
24        Event, MetricKind, MetricTags,
25        metric::{Metric, MetricValue},
26    },
27    internal_events::EventsReceived,
28    schema,
29    sources::{
30        datadog_agent::{
31            ApiKeyQueryParams, DatadogAgentSource,
32            ddmetric_proto::{Metadata, MetricPayload, SketchPayload, metric_payload},
33            handle_request,
34        },
35        util::extract_tag_key_and_value,
36    },
37};
38
39#[derive(Deserialize, Serialize)]
40pub(crate) struct DatadogSeriesRequest {
41    pub(crate) series: Vec<DatadogSeriesMetric>,
42}
43
44pub(crate) fn build_warp_filter(
45    acknowledgements: bool,
46    multiple_outputs: bool,
47    out: SourceSender,
48    source: DatadogAgentSource,
49) -> BoxedFilter<(Response,)> {
50    let output = multiple_outputs.then_some(super::METRICS);
51    let sketches_service = sketches_service(acknowledgements, output, out.clone(), source.clone());
52    let series_v1_service =
53        series_v1_service(acknowledgements, output, out.clone(), source.clone());
54    let series_v2_service = series_v2_service(acknowledgements, output, out, source);
55    sketches_service
56        .or(series_v1_service)
57        .unify()
58        .or(series_v2_service)
59        .unify()
60        .boxed()
61}
62
63fn sketches_service(
64    acknowledgements: bool,
65    output: Option<&'static str>,
66    out: SourceSender,
67    source: DatadogAgentSource,
68) -> BoxedFilter<(Response,)> {
69    warp::post()
70        .and(path!("api" / "beta" / "sketches" / ..))
71        .and(warp::path::full())
72        .and(warp::header::optional::<String>("content-encoding"))
73        .and(warp::header::optional::<String>("dd-api-key"))
74        .and(warp::query::<ApiKeyQueryParams>())
75        .and(warp::body::bytes())
76        .and_then(
77            move |path: FullPath,
78                  encoding_header: Option<String>,
79                  api_token: Option<String>,
80                  query_params: ApiKeyQueryParams,
81                  body: Bytes| {
82                let events = source
83                    .decode(&encoding_header, body, path.as_str())
84                    .and_then(|body| {
85                        decode_datadog_sketches(
86                            body,
87                            source.api_key_extractor.extract(
88                                path.as_str(),
89                                api_token,
90                                query_params.dd_api_key,
91                            ),
92                            &source.events_received,
93                        )
94                    });
95                handle_request(events, acknowledgements, out.clone(), output)
96            },
97        )
98        .boxed()
99}
100
101fn series_v1_service(
102    acknowledgements: bool,
103    output: Option<&'static str>,
104    out: SourceSender,
105    source: DatadogAgentSource,
106) -> BoxedFilter<(Response,)> {
107    warp::post()
108        .and(path!("api" / "v1" / "series" / ..))
109        .and(warp::path::full())
110        .and(warp::header::optional::<String>("content-encoding"))
111        .and(warp::header::optional::<String>("dd-api-key"))
112        .and(warp::query::<ApiKeyQueryParams>())
113        .and(warp::body::bytes())
114        .and_then(
115            move |path: FullPath,
116                  encoding_header: Option<String>,
117                  api_token: Option<String>,
118                  query_params: ApiKeyQueryParams,
119                  body: Bytes| {
120                let events = source
121                    .decode(&encoding_header, body, path.as_str())
122                    .and_then(|body| {
123                        decode_datadog_series_v1(
124                            body,
125                            source.api_key_extractor.extract(
126                                path.as_str(),
127                                api_token,
128                                query_params.dd_api_key,
129                            ),
130                            // Currently metrics do not have schemas defined, so for now we just pass a
131                            // default one.
132                            &Arc::new(schema::Definition::default_legacy_namespace()),
133                            &source.events_received,
134                        )
135                    });
136                handle_request(events, acknowledgements, out.clone(), output)
137            },
138        )
139        .boxed()
140}
141
142fn series_v2_service(
143    acknowledgements: bool,
144    output: Option<&'static str>,
145    out: SourceSender,
146    source: DatadogAgentSource,
147) -> BoxedFilter<(Response,)> {
148    warp::post()
149        .and(path!("api" / "v2" / "series" / ..))
150        .and(warp::path::full())
151        .and(warp::header::optional::<String>("content-encoding"))
152        .and(warp::header::optional::<String>("dd-api-key"))
153        .and(warp::query::<ApiKeyQueryParams>())
154        .and(warp::body::bytes())
155        .and_then(
156            move |path: FullPath,
157                  encoding_header: Option<String>,
158                  api_token: Option<String>,
159                  query_params: ApiKeyQueryParams,
160                  body: Bytes| {
161                let events = source
162                    .decode(&encoding_header, body, path.as_str())
163                    .and_then(|body| {
164                        decode_datadog_series_v2(
165                            body,
166                            source.api_key_extractor.extract(
167                                path.as_str(),
168                                api_token,
169                                query_params.dd_api_key,
170                            ),
171                            &source.events_received,
172                        )
173                    });
174                handle_request(events, acknowledgements, out.clone(), output)
175            },
176        )
177        .boxed()
178}
179
180fn decode_datadog_sketches(
181    body: Bytes,
182    api_key: Option<Arc<str>>,
183    events_received: &Registered<EventsReceived>,
184) -> Result<Vec<Event>, ErrorMessage> {
185    if body.is_empty() {
186        // The datadog agent may send an empty payload as a keep alive
187        debug!(
188            message = "Empty payload ignored.",
189            internal_log_rate_limit = true
190        );
191        return Ok(Vec::new());
192    }
193
194    let metrics = decode_ddsketch(body, &api_key).map_err(|error| {
195        ErrorMessage::new(
196            StatusCode::UNPROCESSABLE_ENTITY,
197            format!("Error decoding Datadog sketch: {error:?}"),
198        )
199    })?;
200
201    events_received.emit(CountByteSize(
202        metrics.len(),
203        metrics.estimated_json_encoded_size_of(),
204    ));
205
206    Ok(metrics)
207}
208
209fn decode_datadog_series_v2(
210    body: Bytes,
211    api_key: Option<Arc<str>>,
212    events_received: &Registered<EventsReceived>,
213) -> Result<Vec<Event>, ErrorMessage> {
214    if body.is_empty() {
215        // The datadog agent may send an empty payload as a keep alive
216        debug!(
217            message = "Empty payload ignored.",
218            internal_log_rate_limit = true
219        );
220        return Ok(Vec::new());
221    }
222
223    let metrics = decode_ddseries_v2(body, &api_key).map_err(|error| {
224        ErrorMessage::new(
225            StatusCode::UNPROCESSABLE_ENTITY,
226            format!("Error decoding Datadog sketch: {error:?}"),
227        )
228    })?;
229
230    events_received.emit(CountByteSize(
231        metrics.len(),
232        metrics.estimated_json_encoded_size_of(),
233    ));
234
235    Ok(metrics)
236}
237
238/// Builds Vector's `EventMetadata` from the received metadata. Currently this is only
239/// utilized for passing through origin metadata set by the Agent.
240fn get_event_metadata(metadata: Option<&Metadata>) -> EventMetadata {
241    metadata
242        .and_then(|metadata| metadata.origin.as_ref())
243        .map_or_else(EventMetadata::default, |origin| {
244            trace!(
245                "Deserialized origin_product: `{}` origin_category: `{}` origin_service: `{}`.",
246                origin.origin_product, origin.origin_category, origin.origin_service,
247            );
248            EventMetadata::default().with_origin_metadata(DatadogMetricOriginMetadata::new(
249                Some(origin.origin_product),
250                Some(origin.origin_category),
251                Some(origin.origin_service),
252            ))
253        })
254}
255
256pub(crate) fn decode_ddseries_v2(
257    frame: Bytes,
258    api_key: &Option<Arc<str>>,
259) -> crate::Result<Vec<Event>> {
260    let payload = MetricPayload::decode(frame)?;
261    let decoded_metrics: Vec<Event> = payload
262        .series
263        .into_iter()
264        .flat_map(|serie| {
265            let (namespace, name) = namespace_name_from_dd_metric(&serie.metric);
266            let mut tags = into_metric_tags(serie.tags);
267
268            let event_metadata = get_event_metadata(serie.metadata.as_ref());
269
270            // It is possible to receive non-rate metrics from the Agent with an interval set.
271            // That interval can be applied with the `as_rate` function in the Datadog UI.
272            // The scenario this happens is when DogStatsD emits non-rate series metrics to the Agent,
273            // in which it sets an interval to 10. See
274            //    - https://github.com/DataDog/datadog-agent/blob/9f0a85c926596ec9aebe2d8e1f2a8b1af6e45635/pkg/aggregator/time_sampler.go#L49C1-L49C1
275            //    - https://github.com/DataDog/datadog-agent/blob/209b70529caff9ec1c30b6b2eed27bce725ed153/pkg/aggregator/aggregator.go#L39
276            //
277            // Note that DogStatsD is the only scenario this occurs; regular Agent checks/services do not set the
278            // interval for non-rate series metrics.
279            //
280            // Note that because Vector does not yet have a specific Metric type to handle Rate,
281            // we are distinguishing Rate from Count by setting an interval to Rate but not Count.
282            // Luckily, the only time a Count metric type is emitted by DogStatsD, is in the Sketch endpoint.
283            // (Regular Count metrics are emitted by DogStatsD as Rate metrics).
284            //
285            // In theory we should be safe to set this non-rate-interval to Count metrics below, but to be safe,
286            // we will only set it for Rate and Gauge. Since Rates already need an interval, the only "odd" case
287            // is Gauges.
288            //
289            // Ultimately if we had a unique internal representation of a Rate metric type, we wouldn't need to
290            // have special handling for the interval, we would just apply it to all metrics that it came in with.
291            let non_rate_interval = if serie.interval.is_positive() {
292                NonZeroU32::new(serie.interval as u32 * 1000) // incoming is seconds, convert to milliseconds
293            } else {
294                None
295            };
296
297            serie.resources.into_iter().for_each(|r| {
298                // As per https://github.com/DataDog/datadog-agent/blob/a62ac9fb13e1e5060b89e731b8355b2b20a07c5b/pkg/serializer/internal/metrics/iterable_series.go#L180-L189
299                // the hostname can be found in MetricSeries::resources and that is the only value stored there.
300                if r.r#type.eq("host") {
301                    log_schema()
302                        .host_key()
303                        .and_then(|key| tags.replace(key.to_string(), r.name));
304                } else {
305                    // But to avoid losing information if this situation changes, any other resource type/name will be saved in the tags map
306                    tags.replace(format!("resource.{}", r.r#type), r.name);
307                }
308            });
309            (!serie.source_type_name.is_empty())
310                .then(|| tags.replace("source_type_name".into(), serie.source_type_name));
311            // As per https://github.com/DataDog/datadog-agent/blob/a62ac9fb13e1e5060b89e731b8355b2b20a07c5b/pkg/serializer/internal/metrics/iterable_series.go#L224
312            // serie.unit is omitted
313            match metric_payload::MetricType::try_from(serie.r#type) {
314                Ok(metric_payload::MetricType::Count) => serie
315                    .points
316                    .iter()
317                    .map(|dd_point| {
318                        Metric::new_with_metadata(
319                            name.to_string(),
320                            MetricKind::Incremental,
321                            MetricValue::Counter {
322                                value: dd_point.value,
323                            },
324                            event_metadata.clone(),
325                        )
326                        .with_timestamp(Some(
327                            Utc.timestamp_opt(dd_point.timestamp, 0)
328                                .single()
329                                .expect("invalid timestamp"),
330                        ))
331                        .with_tags(Some(tags.clone()))
332                        .with_namespace(namespace)
333                    })
334                    .collect::<Vec<_>>(),
335                Ok(metric_payload::MetricType::Gauge) => serie
336                    .points
337                    .iter()
338                    .map(|dd_point| {
339                        Metric::new_with_metadata(
340                            name.to_string(),
341                            MetricKind::Absolute,
342                            MetricValue::Gauge {
343                                value: dd_point.value,
344                            },
345                            event_metadata.clone(),
346                        )
347                        .with_timestamp(Some(
348                            Utc.timestamp_opt(dd_point.timestamp, 0)
349                                .single()
350                                .expect("invalid timestamp"),
351                        ))
352                        .with_tags(Some(tags.clone()))
353                        .with_namespace(namespace)
354                        .with_interval_ms(non_rate_interval)
355                    })
356                    .collect::<Vec<_>>(),
357                Ok(metric_payload::MetricType::Rate) => serie
358                    .points
359                    .iter()
360                    .map(|dd_point| {
361                        let i = Some(serie.interval)
362                            .filter(|v| *v != 0)
363                            .map(|v| v as u32)
364                            .unwrap_or(1);
365                        Metric::new_with_metadata(
366                            name.to_string(),
367                            MetricKind::Incremental,
368                            MetricValue::Counter {
369                                value: dd_point.value * (i as f64),
370                            },
371                            event_metadata.clone(),
372                        )
373                        .with_timestamp(Some(
374                            Utc.timestamp_opt(dd_point.timestamp, 0)
375                                .single()
376                                .expect("invalid timestamp"),
377                        ))
378                        // serie.interval is in seconds, convert to ms
379                        .with_interval_ms(NonZeroU32::new(i * 1000))
380                        .with_tags(Some(tags.clone()))
381                        .with_namespace(namespace)
382                    })
383                    .collect::<Vec<_>>(),
384                Ok(metric_payload::MetricType::Unspecified) | Err(_) => {
385                    warn!("Unspecified metric type ({}).", serie.r#type);
386                    Vec::new()
387                }
388            }
389        })
390        .map(|mut metric| {
391            if let Some(k) = &api_key {
392                metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
393            }
394            metric.into()
395        })
396        .collect();
397
398    Ok(decoded_metrics)
399}
400
401fn decode_datadog_series_v1(
402    body: Bytes,
403    api_key: Option<Arc<str>>,
404    schema_definition: &Arc<schema::Definition>,
405    events_received: &Registered<EventsReceived>,
406) -> Result<Vec<Event>, ErrorMessage> {
407    if body.is_empty() {
408        // The datadog agent may send an empty payload as a keep alive
409        debug!(
410            message = "Empty payload ignored.",
411            internal_log_rate_limit = true
412        );
413        return Ok(Vec::new());
414    }
415
416    let metrics: DatadogSeriesRequest = serde_json::from_slice(&body).map_err(|error| {
417        ErrorMessage::new(
418            StatusCode::BAD_REQUEST,
419            format!("Error parsing JSON: {error:?}"),
420        )
421    })?;
422
423    let decoded_metrics: Vec<Event> = metrics
424        .series
425        .into_iter()
426        .flat_map(|m| into_vector_metric(m, api_key.clone(), schema_definition))
427        .collect();
428
429    events_received.emit(CountByteSize(
430        decoded_metrics.len(),
431        decoded_metrics.estimated_json_encoded_size_of(),
432    ));
433
434    Ok(decoded_metrics)
435}
436
437fn into_metric_tags(tags: Vec<String>) -> MetricTags {
438    tags.iter().map(extract_tag_key_and_value).collect()
439}
440
441fn into_vector_metric(
442    dd_metric: DatadogSeriesMetric,
443    api_key: Option<Arc<str>>,
444    schema_definition: &Arc<schema::Definition>,
445) -> Vec<Event> {
446    let mut tags = into_metric_tags(dd_metric.tags.unwrap_or_default());
447
448    if let Some(key) = log_schema().host_key() {
449        dd_metric
450            .host
451            .and_then(|host| tags.replace(key.to_string(), host));
452    }
453
454    dd_metric
455        .source_type_name
456        .and_then(|source| tags.replace("source_type_name".into(), source));
457    dd_metric
458        .device
459        .and_then(|dev| tags.replace("device".into(), dev));
460
461    let (namespace, name) = namespace_name_from_dd_metric(&dd_metric.metric);
462
463    match dd_metric.r#type {
464        DatadogMetricType::Count => dd_metric
465            .points
466            .iter()
467            .map(|dd_point| {
468                Metric::new(
469                    name.to_string(),
470                    MetricKind::Incremental,
471                    MetricValue::Counter { value: dd_point.1 },
472                )
473                .with_timestamp(Some(
474                    Utc.timestamp_opt(dd_point.0, 0)
475                        .single()
476                        .expect("invalid timestamp"),
477                ))
478                .with_tags(Some(tags.clone()))
479                .with_namespace(namespace)
480            })
481            .collect::<Vec<_>>(),
482        DatadogMetricType::Gauge => dd_metric
483            .points
484            .iter()
485            .map(|dd_point| {
486                Metric::new(
487                    name.to_string(),
488                    MetricKind::Absolute,
489                    MetricValue::Gauge { value: dd_point.1 },
490                )
491                .with_timestamp(Some(
492                    Utc.timestamp_opt(dd_point.0, 0)
493                        .single()
494                        .expect("invalid timestamp"),
495                ))
496                .with_tags(Some(tags.clone()))
497                .with_namespace(namespace)
498            })
499            .collect::<Vec<_>>(),
500        // Agent sends rate only for dogstatsd counter https://github.com/DataDog/datadog-agent/blob/f4a13c6dca5e2da4bb722f861a8ac4c2f715531d/pkg/metrics/counter.go#L8-L10
501        // for consistency purpose (w.r.t. (dog)statsd source) they are turned back into counters
502        DatadogMetricType::Rate => dd_metric
503            .points
504            .iter()
505            .map(|dd_point| {
506                let i = dd_metric.interval.filter(|v| *v != 0).unwrap_or(1);
507                Metric::new(
508                    name.to_string(),
509                    MetricKind::Incremental,
510                    MetricValue::Counter {
511                        value: dd_point.1 * (i as f64),
512                    },
513                )
514                .with_timestamp(Some(
515                    Utc.timestamp_opt(dd_point.0, 0)
516                        .single()
517                        .expect("invalid timestamp"),
518                ))
519                // dd_metric.interval is in seconds, convert to ms
520                .with_interval_ms(NonZeroU32::new(i * 1000))
521                .with_tags(Some(tags.clone()))
522                .with_namespace(namespace)
523            })
524            .collect::<Vec<_>>(),
525    }
526    .into_iter()
527    .map(|mut metric| {
528        if let Some(k) = &api_key {
529            metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
530        }
531
532        metric
533            .metadata_mut()
534            .set_schema_definition(schema_definition);
535
536        metric.into()
537    })
538    .collect()
539}
540
/// Splits a Datadog metric name at its first `.` into `(namespace, name)`.
///
/// The namespace is `None` when the metric name contains no `.`, in which case the
/// whole input is returned as the name.
fn namespace_name_from_dd_metric(dd_metric_name: &str) -> (Option<&str>, &str) {
    // ex: "system.fs.util" -> ("system", "fs.util")
    dd_metric_name
        .split_once('.')
        .map_or((None, dd_metric_name), |(namespace, name)| {
            (Some(namespace), name)
        })
}
550
551pub(crate) fn decode_ddsketch(
552    frame: Bytes,
553    api_key: &Option<Arc<str>>,
554) -> crate::Result<Vec<Event>> {
555    let payload = SketchPayload::decode(frame)?;
556    // payload.metadata is always empty for payload coming from dd agents
557    Ok(payload
558        .sketches
559        .into_iter()
560        .flat_map(|sketch_series| {
561            // sketch_series.distributions is also always empty from payload coming from dd agents
562            let mut tags = into_metric_tags(sketch_series.tags);
563            log_schema()
564                .host_key()
565                .and_then(|key| tags.replace(key.to_string(), sketch_series.host.clone()));
566
567            let event_metadata = get_event_metadata(sketch_series.metadata.as_ref());
568
569            sketch_series.dogsketches.into_iter().map(move |sketch| {
570                let k: Vec<i16> = sketch.k.iter().map(|k| *k as i16).collect();
571                let n: Vec<u16> = sketch.n.iter().map(|n| *n as u16).collect();
572                let val = MetricValue::from(
573                    AgentDDSketch::from_raw(
574                        sketch.cnt as u32,
575                        sketch.min,
576                        sketch.max,
577                        sketch.sum,
578                        sketch.avg,
579                        &k,
580                        &n,
581                    )
582                    .unwrap_or_else(AgentDDSketch::with_agent_defaults),
583                );
584                let (namespace, name) = namespace_name_from_dd_metric(&sketch_series.metric);
585                let mut metric = Metric::new_with_metadata(
586                    name.to_string(),
587                    MetricKind::Incremental,
588                    val,
589                    event_metadata.clone(),
590                )
591                .with_tags(Some(tags.clone()))
592                .with_timestamp(Some(
593                    Utc.timestamp_opt(sketch.ts, 0)
594                        .single()
595                        .expect("invalid timestamp"),
596                ))
597                .with_namespace(namespace);
598                if let Some(k) = &api_key {
599                    metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
600                }
601
602                metric.into()
603            })
604        })
605        .collect())
606}