vector/sources/datadog_agent/metrics.rs

1use std::{num::NonZeroU32, sync::Arc};
2
3use bytes::Bytes;
4use chrono::{TimeZone, Utc};
5use http::StatusCode;
6use prost::Message;
7use serde::{Deserialize, Serialize};
8use vector_lib::{
9    EstimatedJsonEncodedSizeOf,
10    event::{DatadogMetricOriginMetadata, EventMetadata},
11    internal_event::{CountByteSize, InternalEventHandle as _, Registered},
12    metrics::AgentDDSketch,
13};
14use warp::{Filter, filters::BoxedFilter, path, path::FullPath, reply::Response};
15
16use super::ddmetric_proto::{Metadata, MetricPayload, SketchPayload, metric_payload};
17use super::{ApiKeyQueryParams, DatadogAgentSource, RequestHandler};
18use crate::{
19    common::{
20        datadog::{DatadogMetricType, DatadogSeriesMetric},
21        http::ErrorMessage,
22    },
23    config::log_schema,
24    event::{
25        Event, MetricKind, MetricTags,
26        metric::{Metric, MetricValue},
27    },
28    internal_events::EventsReceived,
29    schema,
30    sources::util::extract_tag_key_and_value,
31};
32
33#[derive(Deserialize, Serialize)]
34pub(crate) struct DatadogSeriesRequest {
35    pub(crate) series: Vec<DatadogSeriesMetric>,
36}
37
38pub(super) fn build_warp_filter(
39    handler: RequestHandler,
40    source: DatadogAgentSource,
41) -> BoxedFilter<(Response,)> {
42    let sketches_service = sketches_service(handler.clone(), source.clone());
43    let series_v1_service = series_v1_service(handler.clone(), source.clone());
44    let series_v2_service = series_v2_service(handler, source);
45    sketches_service
46        .or(series_v1_service)
47        .unify()
48        .or(series_v2_service)
49        .unify()
50        .boxed()
51}
52
53fn sketches_service(
54    handler: RequestHandler,
55    source: DatadogAgentSource,
56) -> BoxedFilter<(Response,)> {
57    warp::post()
58        .and(path!("api" / "beta" / "sketches" / ..))
59        .and(warp::path::full())
60        .and(warp::header::optional::<String>("content-encoding"))
61        .and(warp::header::optional::<String>("dd-api-key"))
62        .and(warp::query::<ApiKeyQueryParams>())
63        .and(warp::body::bytes())
64        .and_then({
65            move |path: FullPath,
66                  encoding_header: Option<String>,
67                  api_token: Option<String>,
68                  query_params: ApiKeyQueryParams,
69                  body: Bytes| {
70                let events = source
71                    .decode(&encoding_header, body, path.as_str())
72                    .and_then(|body| {
73                        decode_datadog_sketches(
74                            body,
75                            source.api_key_extractor.extract(
76                                path.as_str(),
77                                api_token,
78                                query_params.dd_api_key,
79                            ),
80                            source.split_metric_namespace,
81                            &source.events_received,
82                        )
83                    });
84                handler.clone().handle_request(events, super::METRICS)
85            }
86        })
87        .boxed()
88}
89
90fn series_v1_service(
91    handler: RequestHandler,
92    source: DatadogAgentSource,
93) -> BoxedFilter<(Response,)> {
94    warp::post()
95        .and(path!("api" / "v1" / "series" / ..))
96        .and(warp::path::full())
97        .and(warp::header::optional::<String>("content-encoding"))
98        .and(warp::header::optional::<String>("dd-api-key"))
99        .and(warp::query::<ApiKeyQueryParams>())
100        .and(warp::body::bytes())
101        .and_then({
102            move |path: FullPath,
103                  encoding_header: Option<String>,
104                  api_token: Option<String>,
105                  query_params: ApiKeyQueryParams,
106                  body: Bytes| {
107                let events = source
108                    .decode(&encoding_header, body, path.as_str())
109                    .and_then(|body| {
110                        decode_datadog_series_v1(
111                            body,
112                            source.api_key_extractor.extract(
113                                path.as_str(),
114                                api_token,
115                                query_params.dd_api_key,
116                            ),
117                            // Currently metrics do not have schemas defined, so for now we just pass a
118                            // default one.
119                            &Arc::new(schema::Definition::default_legacy_namespace()),
120                            source.split_metric_namespace,
121                            &source.events_received,
122                        )
123                    });
124                handler.clone().handle_request(events, super::METRICS)
125            }
126        })
127        .boxed()
128}
129
130fn series_v2_service(
131    handler: RequestHandler,
132    source: DatadogAgentSource,
133) -> BoxedFilter<(Response,)> {
134    warp::post()
135        .and(path!("api" / "v2" / "series" / ..))
136        .and(warp::path::full())
137        .and(warp::header::optional::<String>("content-encoding"))
138        .and(warp::header::optional::<String>("dd-api-key"))
139        .and(warp::query::<ApiKeyQueryParams>())
140        .and(warp::body::bytes())
141        .and_then({
142            move |path: FullPath,
143                  encoding_header: Option<String>,
144                  api_token: Option<String>,
145                  query_params: ApiKeyQueryParams,
146                  body: Bytes| {
147                let events = source
148                    .decode(&encoding_header, body, path.as_str())
149                    .and_then(|body| {
150                        decode_datadog_series_v2(
151                            body,
152                            source.api_key_extractor.extract(
153                                path.as_str(),
154                                api_token,
155                                query_params.dd_api_key,
156                            ),
157                            source.split_metric_namespace,
158                            &source.events_received,
159                        )
160                    });
161                handler.clone().handle_request(events, super::METRICS)
162            }
163        })
164        .boxed()
165}
166
167fn decode_datadog_sketches(
168    body: Bytes,
169    api_key: Option<Arc<str>>,
170    split_metric_namespace: bool,
171    events_received: &Registered<EventsReceived>,
172) -> Result<Vec<Event>, ErrorMessage> {
173    if body.is_empty() {
174        // The datadog agent may send an empty payload as a keep alive
175        debug!(message = "Empty payload ignored.");
176        return Ok(Vec::new());
177    }
178
179    let metrics = decode_ddsketch(body, &api_key, split_metric_namespace).map_err(|error| {
180        ErrorMessage::new(
181            StatusCode::UNPROCESSABLE_ENTITY,
182            format!("Error decoding Datadog sketch: {error:?}"),
183        )
184    })?;
185
186    events_received.emit(CountByteSize(
187        metrics.len(),
188        metrics.estimated_json_encoded_size_of(),
189    ));
190
191    Ok(metrics)
192}
193
194fn decode_datadog_series_v2(
195    body: Bytes,
196    api_key: Option<Arc<str>>,
197    split_metric_namespace: bool,
198    events_received: &Registered<EventsReceived>,
199) -> Result<Vec<Event>, ErrorMessage> {
200    if body.is_empty() {
201        // The datadog agent may send an empty payload as a keep alive
202        debug!(message = "Empty payload ignored.");
203        return Ok(Vec::new());
204    }
205
206    let metrics = decode_ddseries_v2(body, &api_key, split_metric_namespace).map_err(|error| {
207        ErrorMessage::new(
208            StatusCode::UNPROCESSABLE_ENTITY,
209            format!("Error decoding Datadog sketch: {error:?}"),
210        )
211    })?;
212
213    events_received.emit(CountByteSize(
214        metrics.len(),
215        metrics.estimated_json_encoded_size_of(),
216    ));
217
218    Ok(metrics)
219}
220
221/// Builds Vector's `EventMetadata` from the received metadata. Currently this is only
222/// utilized for passing through origin metadata set by the Agent.
223fn get_event_metadata(metadata: Option<&Metadata>) -> EventMetadata {
224    metadata
225        .and_then(|metadata| metadata.origin.as_ref())
226        .map_or_else(EventMetadata::default, |origin| {
227            trace!(
228                "Deserialized origin_product: `{}` origin_category: `{}` origin_service: `{}`.",
229                origin.origin_product, origin.origin_category, origin.origin_service,
230            );
231            EventMetadata::default().with_origin_metadata(DatadogMetricOriginMetadata::new(
232                Some(origin.origin_product),
233                Some(origin.origin_category),
234                Some(origin.origin_service),
235            ))
236        })
237}
238
239pub(crate) fn decode_ddseries_v2(
240    frame: Bytes,
241    api_key: &Option<Arc<str>>,
242    split_metric_namespace: bool,
243) -> crate::Result<Vec<Event>> {
244    let payload = MetricPayload::decode(frame)?;
245    let decoded_metrics: Vec<Event> = payload
246        .series
247        .into_iter()
248        .flat_map(|serie| {
249            let (namespace, name) = if split_metric_namespace {
250                namespace_name_from_dd_metric(&serie.metric)
251            } else {
252                (None, serie.metric.as_str())
253            };
254            let mut tags = into_metric_tags(serie.tags);
255
256            let event_metadata = get_event_metadata(serie.metadata.as_ref());
257
258            // It is possible to receive non-rate metrics from the Agent with an interval set.
259            // That interval can be applied with the `as_rate` function in the Datadog UI.
260            // The scenario this happens is when DogStatsD emits non-rate series metrics to the Agent,
261            // in which it sets an interval to 10. See
262            //    - https://github.com/DataDog/datadog-agent/blob/9f0a85c926596ec9aebe2d8e1f2a8b1af6e45635/pkg/aggregator/time_sampler.go#L49C1-L49C1
263            //    - https://github.com/DataDog/datadog-agent/blob/209b70529caff9ec1c30b6b2eed27bce725ed153/pkg/aggregator/aggregator.go#L39
264            //
265            // Note that DogStatsD is the only scenario this occurs; regular Agent checks/services do not set the
266            // interval for non-rate series metrics.
267            //
268            // Note that because Vector does not yet have a specific Metric type to handle Rate,
269            // we are distinguishing Rate from Count by setting an interval to Rate but not Count.
270            // Luckily, the only time a Count metric type is emitted by DogStatsD, is in the Sketch endpoint.
271            // (Regular Count metrics are emitted by DogStatsD as Rate metrics).
272            //
273            // In theory we should be safe to set this non-rate-interval to Count metrics below, but to be safe,
274            // we will only set it for Rate and Gauge. Since Rates already need an interval, the only "odd" case
275            // is Gauges.
276            //
277            // Ultimately if we had a unique internal representation of a Rate metric type, we wouldn't need to
278            // have special handling for the interval, we would just apply it to all metrics that it came in with.
279            let non_rate_interval = if serie.interval.is_positive() {
280                NonZeroU32::new(serie.interval as u32 * 1000) // incoming is seconds, convert to milliseconds
281            } else {
282                None
283            };
284
285            serie.resources.into_iter().for_each(|r| {
286                // As per https://github.com/DataDog/datadog-agent/blob/a62ac9fb13e1e5060b89e731b8355b2b20a07c5b/pkg/serializer/internal/metrics/iterable_series.go#L180-L189
287                // the hostname can be found in MetricSeries::resources and that is the only value stored there.
288                if r.r#type.eq("host") {
289                    log_schema()
290                        .host_key()
291                        .and_then(|key| tags.replace(key.to_string(), r.name));
292                } else {
293                    // But to avoid losing information if this situation changes, any other resource type/name will be saved in the tags map
294                    tags.replace(format!("resource.{}", r.r#type), r.name);
295                }
296            });
297            (!serie.source_type_name.is_empty())
298                .then(|| tags.replace("source_type_name".into(), serie.source_type_name));
299            // As per https://github.com/DataDog/datadog-agent/blob/a62ac9fb13e1e5060b89e731b8355b2b20a07c5b/pkg/serializer/internal/metrics/iterable_series.go#L224
300            // serie.unit is omitted
301            match metric_payload::MetricType::try_from(serie.r#type) {
302                Ok(metric_payload::MetricType::Count) => serie
303                    .points
304                    .iter()
305                    .map(|dd_point| {
306                        Metric::new_with_metadata(
307                            name.to_string(),
308                            MetricKind::Incremental,
309                            MetricValue::Counter {
310                                value: dd_point.value,
311                            },
312                            event_metadata.clone(),
313                        )
314                        .with_timestamp(Some(
315                            Utc.timestamp_opt(dd_point.timestamp, 0)
316                                .single()
317                                .expect("invalid timestamp"),
318                        ))
319                        .with_tags(Some(tags.clone()))
320                        .with_namespace(namespace)
321                    })
322                    .collect::<Vec<_>>(),
323                Ok(metric_payload::MetricType::Gauge) => serie
324                    .points
325                    .iter()
326                    .map(|dd_point| {
327                        Metric::new_with_metadata(
328                            name.to_string(),
329                            MetricKind::Absolute,
330                            MetricValue::Gauge {
331                                value: dd_point.value,
332                            },
333                            event_metadata.clone(),
334                        )
335                        .with_timestamp(Some(
336                            Utc.timestamp_opt(dd_point.timestamp, 0)
337                                .single()
338                                .expect("invalid timestamp"),
339                        ))
340                        .with_tags(Some(tags.clone()))
341                        .with_namespace(namespace)
342                        .with_interval_ms(non_rate_interval)
343                    })
344                    .collect::<Vec<_>>(),
345                Ok(metric_payload::MetricType::Rate) => serie
346                    .points
347                    .iter()
348                    .map(|dd_point| {
349                        let i = Some(serie.interval)
350                            .filter(|v| *v != 0)
351                            .map(|v| v as u32)
352                            .unwrap_or(1);
353                        Metric::new_with_metadata(
354                            name.to_string(),
355                            MetricKind::Incremental,
356                            MetricValue::Counter {
357                                value: dd_point.value * (i as f64),
358                            },
359                            event_metadata.clone(),
360                        )
361                        .with_timestamp(Some(
362                            Utc.timestamp_opt(dd_point.timestamp, 0)
363                                .single()
364                                .expect("invalid timestamp"),
365                        ))
366                        // serie.interval is in seconds, convert to ms
367                        .with_interval_ms(NonZeroU32::new(i * 1000))
368                        .with_tags(Some(tags.clone()))
369                        .with_namespace(namespace)
370                    })
371                    .collect::<Vec<_>>(),
372                Ok(metric_payload::MetricType::Unspecified) | Err(_) => {
373                    warn!("Unspecified metric type ({}).", serie.r#type);
374                    Vec::new()
375                }
376            }
377        })
378        .map(|mut metric| {
379            if let Some(k) = &api_key {
380                metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
381            }
382            metric.into()
383        })
384        .collect();
385
386    Ok(decoded_metrics)
387}
388
389fn decode_datadog_series_v1(
390    body: Bytes,
391    api_key: Option<Arc<str>>,
392    schema_definition: &Arc<schema::Definition>,
393    split_metric_namespace: bool,
394    events_received: &Registered<EventsReceived>,
395) -> Result<Vec<Event>, ErrorMessage> {
396    if body.is_empty() {
397        // The datadog agent may send an empty payload as a keep alive
398        debug!(message = "Empty payload ignored.");
399        return Ok(Vec::new());
400    }
401
402    let metrics: DatadogSeriesRequest = serde_json::from_slice(&body).map_err(|error| {
403        ErrorMessage::new(
404            StatusCode::BAD_REQUEST,
405            format!("Error parsing JSON: {error:?}"),
406        )
407    })?;
408
409    let decoded_metrics: Vec<Event> = metrics
410        .series
411        .into_iter()
412        .flat_map(|m| {
413            into_vector_metric(
414                m,
415                api_key.clone(),
416                schema_definition,
417                split_metric_namespace,
418            )
419        })
420        .collect();
421
422    events_received.emit(CountByteSize(
423        decoded_metrics.len(),
424        decoded_metrics.estimated_json_encoded_size_of(),
425    ));
426
427    Ok(decoded_metrics)
428}
429
430fn into_metric_tags(tags: Vec<String>) -> MetricTags {
431    tags.iter().map(extract_tag_key_and_value).collect()
432}
433
434fn into_vector_metric(
435    dd_metric: DatadogSeriesMetric,
436    api_key: Option<Arc<str>>,
437    schema_definition: &Arc<schema::Definition>,
438    split_metric_namespace: bool,
439) -> Vec<Event> {
440    let mut tags = into_metric_tags(dd_metric.tags.unwrap_or_default());
441
442    if let Some(key) = log_schema().host_key() {
443        dd_metric
444            .host
445            .and_then(|host| tags.replace(key.to_string(), host));
446    }
447
448    dd_metric
449        .source_type_name
450        .and_then(|source| tags.replace("source_type_name".into(), source));
451    dd_metric
452        .device
453        .and_then(|dev| tags.replace("device".into(), dev));
454
455    let (namespace, name) = if split_metric_namespace {
456        namespace_name_from_dd_metric(&dd_metric.metric)
457    } else {
458        (None, dd_metric.metric.as_str())
459    };
460
461    match dd_metric.r#type {
462        DatadogMetricType::Count => dd_metric
463            .points
464            .iter()
465            .map(|dd_point| {
466                Metric::new(
467                    name.to_string(),
468                    MetricKind::Incremental,
469                    MetricValue::Counter { value: dd_point.1 },
470                )
471                .with_timestamp(Some(
472                    Utc.timestamp_opt(dd_point.0, 0)
473                        .single()
474                        .expect("invalid timestamp"),
475                ))
476                .with_tags(Some(tags.clone()))
477                .with_namespace(namespace)
478            })
479            .collect::<Vec<_>>(),
480        DatadogMetricType::Gauge => dd_metric
481            .points
482            .iter()
483            .map(|dd_point| {
484                Metric::new(
485                    name.to_string(),
486                    MetricKind::Absolute,
487                    MetricValue::Gauge { value: dd_point.1 },
488                )
489                .with_timestamp(Some(
490                    Utc.timestamp_opt(dd_point.0, 0)
491                        .single()
492                        .expect("invalid timestamp"),
493                ))
494                .with_tags(Some(tags.clone()))
495                .with_namespace(namespace)
496            })
497            .collect::<Vec<_>>(),
498        // Agent sends rate only for dogstatsd counter https://github.com/DataDog/datadog-agent/blob/f4a13c6dca5e2da4bb722f861a8ac4c2f715531d/pkg/metrics/counter.go#L8-L10
499        // for consistency purpose (w.r.t. (dog)statsd source) they are turned back into counters
500        DatadogMetricType::Rate => dd_metric
501            .points
502            .iter()
503            .map(|dd_point| {
504                let i = dd_metric.interval.filter(|v| *v != 0).unwrap_or(1);
505                Metric::new(
506                    name.to_string(),
507                    MetricKind::Incremental,
508                    MetricValue::Counter {
509                        value: dd_point.1 * (i as f64),
510                    },
511                )
512                .with_timestamp(Some(
513                    Utc.timestamp_opt(dd_point.0, 0)
514                        .single()
515                        .expect("invalid timestamp"),
516                ))
517                // dd_metric.interval is in seconds, convert to ms
518                .with_interval_ms(NonZeroU32::new(i * 1000))
519                .with_tags(Some(tags.clone()))
520                .with_namespace(namespace)
521            })
522            .collect::<Vec<_>>(),
523    }
524    .into_iter()
525    .map(|mut metric| {
526        if let Some(k) = &api_key {
527            metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
528        }
529
530        metric
531            .metadata_mut()
532            .set_schema_definition(schema_definition);
533
534        metric.into()
535    })
536    .collect()
537}
538
/// Splits a Datadog metric name at its first `.` into `(namespace, name)`.
///
/// Everything before the first `.` is the namespace and everything after it is the name,
/// e.g. `"system.fs.util"` -> `(Some("system"), "fs.util")`. A name without any `.` has no
/// namespace and is returned unchanged, e.g. `"uptime"` -> `(None, "uptime")`.
fn namespace_name_from_dd_metric(dd_metric_name: &str) -> (Option<&str>, &str) {
    dd_metric_name
        .split_once('.')
        .map_or((None, dd_metric_name), |(namespace, name)| {
            (Some(namespace), name)
        })
}
548
549pub(crate) fn decode_ddsketch(
550    frame: Bytes,
551    api_key: &Option<Arc<str>>,
552    split_metric_namespace: bool,
553) -> crate::Result<Vec<Event>> {
554    let payload = SketchPayload::decode(frame)?;
555    // payload.metadata is always empty for payload coming from dd agents
556    Ok(payload
557        .sketches
558        .into_iter()
559        .flat_map(|sketch_series| {
560            // sketch_series.distributions is also always empty from payload coming from dd agents
561            let mut tags = into_metric_tags(sketch_series.tags);
562            log_schema()
563                .host_key()
564                .and_then(|key| tags.replace(key.to_string(), sketch_series.host.clone()));
565
566            let event_metadata = get_event_metadata(sketch_series.metadata.as_ref());
567
568            sketch_series.dogsketches.into_iter().map(move |sketch| {
569                let k: Vec<i16> = sketch.k.iter().map(|k| *k as i16).collect();
570                let n: Vec<u16> = sketch.n.iter().map(|n| *n as u16).collect();
571                let val = MetricValue::from(
572                    AgentDDSketch::from_raw(
573                        sketch.cnt as u32,
574                        sketch.min,
575                        sketch.max,
576                        sketch.sum,
577                        sketch.avg,
578                        &k,
579                        &n,
580                    )
581                    .unwrap_or_else(AgentDDSketch::with_agent_defaults),
582                );
583                let (namespace, name) = if split_metric_namespace {
584                    namespace_name_from_dd_metric(&sketch_series.metric)
585                } else {
586                    (None, sketch_series.metric.as_str())
587                };
588                let mut metric = Metric::new_with_metadata(
589                    name.to_string(),
590                    MetricKind::Incremental,
591                    val,
592                    event_metadata.clone(),
593                )
594                .with_tags(Some(tags.clone()))
595                .with_timestamp(Some(
596                    Utc.timestamp_opt(sketch.ts, 0)
597                        .single()
598                        .expect("invalid timestamp"),
599                ))
600                .with_namespace(namespace);
601                if let Some(k) = &api_key {
602                    metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
603                }
604
605                metric.into()
606            })
607        })
608        .collect())
609}