vector/sources/datadog_agent/
metrics.rs

1use std::{num::NonZeroU32, sync::Arc};
2
3use bytes::Bytes;
4use chrono::{TimeZone, Utc};
5use http::StatusCode;
6use prost::Message;
7use serde::{Deserialize, Serialize};
8use warp::{filters::BoxedFilter, path, path::FullPath, reply::Response, Filter};
9
10use vector_lib::internal_event::{CountByteSize, InternalEventHandle as _, Registered};
11use vector_lib::{
12    event::{DatadogMetricOriginMetadata, EventMetadata},
13    metrics::AgentDDSketch,
14    EstimatedJsonEncodedSizeOf,
15};
16
17use crate::common::http::ErrorMessage;
18use crate::{
19    common::datadog::{DatadogMetricType, DatadogSeriesMetric},
20    config::log_schema,
21    event::{
22        metric::{Metric, MetricValue},
23        Event, MetricKind, MetricTags,
24    },
25    internal_events::EventsReceived,
26    schema,
27    sources::{
28        datadog_agent::{
29            ddmetric_proto::{metric_payload, Metadata, MetricPayload, SketchPayload},
30            handle_request, ApiKeyQueryParams, DatadogAgentSource,
31        },
32        util::extract_tag_key_and_value,
33    },
34    SourceSender,
35};
36
37#[derive(Deserialize, Serialize)]
38pub(crate) struct DatadogSeriesRequest {
39    pub(crate) series: Vec<DatadogSeriesMetric>,
40}
41
42pub(crate) fn build_warp_filter(
43    acknowledgements: bool,
44    multiple_outputs: bool,
45    out: SourceSender,
46    source: DatadogAgentSource,
47) -> BoxedFilter<(Response,)> {
48    let output = multiple_outputs.then_some(super::METRICS);
49    let sketches_service = sketches_service(acknowledgements, output, out.clone(), source.clone());
50    let series_v1_service =
51        series_v1_service(acknowledgements, output, out.clone(), source.clone());
52    let series_v2_service = series_v2_service(acknowledgements, output, out, source);
53    sketches_service
54        .or(series_v1_service)
55        .unify()
56        .or(series_v2_service)
57        .unify()
58        .boxed()
59}
60
61fn sketches_service(
62    acknowledgements: bool,
63    output: Option<&'static str>,
64    out: SourceSender,
65    source: DatadogAgentSource,
66) -> BoxedFilter<(Response,)> {
67    warp::post()
68        .and(path!("api" / "beta" / "sketches" / ..))
69        .and(warp::path::full())
70        .and(warp::header::optional::<String>("content-encoding"))
71        .and(warp::header::optional::<String>("dd-api-key"))
72        .and(warp::query::<ApiKeyQueryParams>())
73        .and(warp::body::bytes())
74        .and_then(
75            move |path: FullPath,
76                  encoding_header: Option<String>,
77                  api_token: Option<String>,
78                  query_params: ApiKeyQueryParams,
79                  body: Bytes| {
80                let events = source
81                    .decode(&encoding_header, body, path.as_str())
82                    .and_then(|body| {
83                        decode_datadog_sketches(
84                            body,
85                            source.api_key_extractor.extract(
86                                path.as_str(),
87                                api_token,
88                                query_params.dd_api_key,
89                            ),
90                            &source.events_received,
91                        )
92                    });
93                handle_request(events, acknowledgements, out.clone(), output)
94            },
95        )
96        .boxed()
97}
98
99fn series_v1_service(
100    acknowledgements: bool,
101    output: Option<&'static str>,
102    out: SourceSender,
103    source: DatadogAgentSource,
104) -> BoxedFilter<(Response,)> {
105    warp::post()
106        .and(path!("api" / "v1" / "series" / ..))
107        .and(warp::path::full())
108        .and(warp::header::optional::<String>("content-encoding"))
109        .and(warp::header::optional::<String>("dd-api-key"))
110        .and(warp::query::<ApiKeyQueryParams>())
111        .and(warp::body::bytes())
112        .and_then(
113            move |path: FullPath,
114                  encoding_header: Option<String>,
115                  api_token: Option<String>,
116                  query_params: ApiKeyQueryParams,
117                  body: Bytes| {
118                let events = source
119                    .decode(&encoding_header, body, path.as_str())
120                    .and_then(|body| {
121                        decode_datadog_series_v1(
122                            body,
123                            source.api_key_extractor.extract(
124                                path.as_str(),
125                                api_token,
126                                query_params.dd_api_key,
127                            ),
128                            // Currently metrics do not have schemas defined, so for now we just pass a
129                            // default one.
130                            &Arc::new(schema::Definition::default_legacy_namespace()),
131                            &source.events_received,
132                        )
133                    });
134                handle_request(events, acknowledgements, out.clone(), output)
135            },
136        )
137        .boxed()
138}
139
140fn series_v2_service(
141    acknowledgements: bool,
142    output: Option<&'static str>,
143    out: SourceSender,
144    source: DatadogAgentSource,
145) -> BoxedFilter<(Response,)> {
146    warp::post()
147        .and(path!("api" / "v2" / "series" / ..))
148        .and(warp::path::full())
149        .and(warp::header::optional::<String>("content-encoding"))
150        .and(warp::header::optional::<String>("dd-api-key"))
151        .and(warp::query::<ApiKeyQueryParams>())
152        .and(warp::body::bytes())
153        .and_then(
154            move |path: FullPath,
155                  encoding_header: Option<String>,
156                  api_token: Option<String>,
157                  query_params: ApiKeyQueryParams,
158                  body: Bytes| {
159                let events = source
160                    .decode(&encoding_header, body, path.as_str())
161                    .and_then(|body| {
162                        decode_datadog_series_v2(
163                            body,
164                            source.api_key_extractor.extract(
165                                path.as_str(),
166                                api_token,
167                                query_params.dd_api_key,
168                            ),
169                            &source.events_received,
170                        )
171                    });
172                handle_request(events, acknowledgements, out.clone(), output)
173            },
174        )
175        .boxed()
176}
177
178fn decode_datadog_sketches(
179    body: Bytes,
180    api_key: Option<Arc<str>>,
181    events_received: &Registered<EventsReceived>,
182) -> Result<Vec<Event>, ErrorMessage> {
183    if body.is_empty() {
184        // The datadog agent may send an empty payload as a keep alive
185        debug!(
186            message = "Empty payload ignored.",
187            internal_log_rate_limit = true
188        );
189        return Ok(Vec::new());
190    }
191
192    let metrics = decode_ddsketch(body, &api_key).map_err(|error| {
193        ErrorMessage::new(
194            StatusCode::UNPROCESSABLE_ENTITY,
195            format!("Error decoding Datadog sketch: {error:?}"),
196        )
197    })?;
198
199    events_received.emit(CountByteSize(
200        metrics.len(),
201        metrics.estimated_json_encoded_size_of(),
202    ));
203
204    Ok(metrics)
205}
206
207fn decode_datadog_series_v2(
208    body: Bytes,
209    api_key: Option<Arc<str>>,
210    events_received: &Registered<EventsReceived>,
211) -> Result<Vec<Event>, ErrorMessage> {
212    if body.is_empty() {
213        // The datadog agent may send an empty payload as a keep alive
214        debug!(
215            message = "Empty payload ignored.",
216            internal_log_rate_limit = true
217        );
218        return Ok(Vec::new());
219    }
220
221    let metrics = decode_ddseries_v2(body, &api_key).map_err(|error| {
222        ErrorMessage::new(
223            StatusCode::UNPROCESSABLE_ENTITY,
224            format!("Error decoding Datadog sketch: {error:?}"),
225        )
226    })?;
227
228    events_received.emit(CountByteSize(
229        metrics.len(),
230        metrics.estimated_json_encoded_size_of(),
231    ));
232
233    Ok(metrics)
234}
235
236/// Builds Vector's `EventMetadata` from the received metadata. Currently this is only
237/// utilized for passing through origin metadata set by the Agent.
238fn get_event_metadata(metadata: Option<&Metadata>) -> EventMetadata {
239    metadata
240        .and_then(|metadata| metadata.origin.as_ref())
241        .map_or_else(EventMetadata::default, |origin| {
242            trace!(
243                "Deserialized origin_product: `{}` origin_category: `{}` origin_service: `{}`.",
244                origin.origin_product,
245                origin.origin_category,
246                origin.origin_service,
247            );
248            EventMetadata::default().with_origin_metadata(DatadogMetricOriginMetadata::new(
249                Some(origin.origin_product),
250                Some(origin.origin_category),
251                Some(origin.origin_service),
252            ))
253        })
254}
255
256pub(crate) fn decode_ddseries_v2(
257    frame: Bytes,
258    api_key: &Option<Arc<str>>,
259) -> crate::Result<Vec<Event>> {
260    let payload = MetricPayload::decode(frame)?;
261    let decoded_metrics: Vec<Event> = payload
262        .series
263        .into_iter()
264        .flat_map(|serie| {
265            let (namespace, name) = namespace_name_from_dd_metric(&serie.metric);
266            let mut tags = into_metric_tags(serie.tags);
267
268            let event_metadata = get_event_metadata(serie.metadata.as_ref());
269
270            // It is possible to receive non-rate metrics from the Agent with an interval set.
271            // That interval can be applied with the `as_rate` function in the Datadog UI.
272            // The scenario this happens is when DogStatsD emits non-rate series metrics to the Agent,
273            // in which it sets an interval to 10. See
274            //    - https://github.com/DataDog/datadog-agent/blob/9f0a85c926596ec9aebe2d8e1f2a8b1af6e45635/pkg/aggregator/time_sampler.go#L49C1-L49C1
275            //    - https://github.com/DataDog/datadog-agent/blob/209b70529caff9ec1c30b6b2eed27bce725ed153/pkg/aggregator/aggregator.go#L39
276            //
277            // Note that DogStatsD is the only scenario this occurs; regular Agent checks/services do not set the
278            // interval for non-rate series metrics.
279            //
280            // Note that because Vector does not yet have a specific Metric type to handle Rate,
281            // we are distinguishing Rate from Count by setting an interval to Rate but not Count.
282            // Luckily, the only time a Count metric type is emitted by DogStatsD, is in the Sketch endpoint.
283            // (Regular Count metrics are emitted by DogStatsD as Rate metrics).
284            //
285            // In theory we should be safe to set this non-rate-interval to Count metrics below, but to be safe,
286            // we will only set it for Rate and Gauge. Since Rates already need an interval, the only "odd" case
287            // is Gauges.
288            //
289            // Ultimately if we had a unique internal representation of a Rate metric type, we wouldn't need to
290            // have special handling for the interval, we would just apply it to all metrics that it came in with.
291            let non_rate_interval = if serie.interval.is_positive() {
292                NonZeroU32::new(serie.interval as u32 * 1000) // incoming is seconds, convert to milliseconds
293            } else {
294                None
295            };
296
297            serie.resources.into_iter().for_each(|r| {
298                // As per https://github.com/DataDog/datadog-agent/blob/a62ac9fb13e1e5060b89e731b8355b2b20a07c5b/pkg/serializer/internal/metrics/iterable_series.go#L180-L189
299                // the hostname can be found in MetricSeries::resources and that is the only value stored there.
300                if r.r#type.eq("host") {
301                    log_schema()
302                        .host_key()
303                        .and_then(|key| tags.replace(key.to_string(), r.name));
304                } else {
305                    // But to avoid losing information if this situation changes, any other resource type/name will be saved in the tags map
306                    tags.replace(format!("resource.{}", r.r#type), r.name);
307                }
308            });
309            (!serie.source_type_name.is_empty())
310                .then(|| tags.replace("source_type_name".into(), serie.source_type_name));
311            // As per https://github.com/DataDog/datadog-agent/blob/a62ac9fb13e1e5060b89e731b8355b2b20a07c5b/pkg/serializer/internal/metrics/iterable_series.go#L224
312            // serie.unit is omitted
313            match metric_payload::MetricType::try_from(serie.r#type) {
314                Ok(metric_payload::MetricType::Count) => serie
315                    .points
316                    .iter()
317                    .map(|dd_point| {
318                        Metric::new_with_metadata(
319                            name.to_string(),
320                            MetricKind::Incremental,
321                            MetricValue::Counter {
322                                value: dd_point.value,
323                            },
324                            event_metadata.clone(),
325                        )
326                        .with_timestamp(Some(
327                            Utc.timestamp_opt(dd_point.timestamp, 0)
328                                .single()
329                                .expect("invalid timestamp"),
330                        ))
331                        .with_tags(Some(tags.clone()))
332                        .with_namespace(namespace)
333                    })
334                    .collect::<Vec<_>>(),
335                Ok(metric_payload::MetricType::Gauge) => serie
336                    .points
337                    .iter()
338                    .map(|dd_point| {
339                        Metric::new_with_metadata(
340                            name.to_string(),
341                            MetricKind::Absolute,
342                            MetricValue::Gauge {
343                                value: dd_point.value,
344                            },
345                            event_metadata.clone(),
346                        )
347                        .with_timestamp(Some(
348                            Utc.timestamp_opt(dd_point.timestamp, 0)
349                                .single()
350                                .expect("invalid timestamp"),
351                        ))
352                        .with_tags(Some(tags.clone()))
353                        .with_namespace(namespace)
354                        .with_interval_ms(non_rate_interval)
355                    })
356                    .collect::<Vec<_>>(),
357                Ok(metric_payload::MetricType::Rate) => serie
358                    .points
359                    .iter()
360                    .map(|dd_point| {
361                        let i = Some(serie.interval)
362                            .filter(|v| *v != 0)
363                            .map(|v| v as u32)
364                            .unwrap_or(1);
365                        Metric::new_with_metadata(
366                            name.to_string(),
367                            MetricKind::Incremental,
368                            MetricValue::Counter {
369                                value: dd_point.value * (i as f64),
370                            },
371                            event_metadata.clone(),
372                        )
373                        .with_timestamp(Some(
374                            Utc.timestamp_opt(dd_point.timestamp, 0)
375                                .single()
376                                .expect("invalid timestamp"),
377                        ))
378                        // serie.interval is in seconds, convert to ms
379                        .with_interval_ms(NonZeroU32::new(i * 1000))
380                        .with_tags(Some(tags.clone()))
381                        .with_namespace(namespace)
382                    })
383                    .collect::<Vec<_>>(),
384                Ok(metric_payload::MetricType::Unspecified) | Err(_) => {
385                    warn!("Unspecified metric type ({}).", serie.r#type);
386                    Vec::new()
387                }
388            }
389        })
390        .map(|mut metric| {
391            if let Some(k) = &api_key {
392                metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
393            }
394            metric.into()
395        })
396        .collect();
397
398    Ok(decoded_metrics)
399}
400
401fn decode_datadog_series_v1(
402    body: Bytes,
403    api_key: Option<Arc<str>>,
404    schema_definition: &Arc<schema::Definition>,
405    events_received: &Registered<EventsReceived>,
406) -> Result<Vec<Event>, ErrorMessage> {
407    if body.is_empty() {
408        // The datadog agent may send an empty payload as a keep alive
409        debug!(
410            message = "Empty payload ignored.",
411            internal_log_rate_limit = true
412        );
413        return Ok(Vec::new());
414    }
415
416    let metrics: DatadogSeriesRequest = serde_json::from_slice(&body).map_err(|error| {
417        ErrorMessage::new(
418            StatusCode::BAD_REQUEST,
419            format!("Error parsing JSON: {error:?}"),
420        )
421    })?;
422
423    let decoded_metrics: Vec<Event> = metrics
424        .series
425        .into_iter()
426        .flat_map(|m| into_vector_metric(m, api_key.clone(), schema_definition))
427        .collect();
428
429    events_received.emit(CountByteSize(
430        decoded_metrics.len(),
431        decoded_metrics.estimated_json_encoded_size_of(),
432    ));
433
434    Ok(decoded_metrics)
435}
436
437fn into_metric_tags(tags: Vec<String>) -> MetricTags {
438    tags.iter().map(extract_tag_key_and_value).collect()
439}
440
441fn into_vector_metric(
442    dd_metric: DatadogSeriesMetric,
443    api_key: Option<Arc<str>>,
444    schema_definition: &Arc<schema::Definition>,
445) -> Vec<Event> {
446    let mut tags = into_metric_tags(dd_metric.tags.unwrap_or_default());
447
448    if let Some(key) = log_schema().host_key() {
449        dd_metric
450            .host
451            .and_then(|host| tags.replace(key.to_string(), host));
452    }
453
454    dd_metric
455        .source_type_name
456        .and_then(|source| tags.replace("source_type_name".into(), source));
457    dd_metric
458        .device
459        .and_then(|dev| tags.replace("device".into(), dev));
460
461    let (namespace, name) = namespace_name_from_dd_metric(&dd_metric.metric);
462
463    match dd_metric.r#type {
464        DatadogMetricType::Count => dd_metric
465            .points
466            .iter()
467            .map(|dd_point| {
468                Metric::new(
469                    name.to_string(),
470                    MetricKind::Incremental,
471                    MetricValue::Counter { value: dd_point.1 },
472                )
473                .with_timestamp(Some(
474                    Utc.timestamp_opt(dd_point.0, 0)
475                        .single()
476                        .expect("invalid timestamp"),
477                ))
478                .with_tags(Some(tags.clone()))
479                .with_namespace(namespace)
480            })
481            .collect::<Vec<_>>(),
482        DatadogMetricType::Gauge => dd_metric
483            .points
484            .iter()
485            .map(|dd_point| {
486                Metric::new(
487                    name.to_string(),
488                    MetricKind::Absolute,
489                    MetricValue::Gauge { value: dd_point.1 },
490                )
491                .with_timestamp(Some(
492                    Utc.timestamp_opt(dd_point.0, 0)
493                        .single()
494                        .expect("invalid timestamp"),
495                ))
496                .with_tags(Some(tags.clone()))
497                .with_namespace(namespace)
498            })
499            .collect::<Vec<_>>(),
500        // Agent sends rate only for dogstatsd counter https://github.com/DataDog/datadog-agent/blob/f4a13c6dca5e2da4bb722f861a8ac4c2f715531d/pkg/metrics/counter.go#L8-L10
501        // for consistency purpose (w.r.t. (dog)statsd source) they are turned back into counters
502        DatadogMetricType::Rate => dd_metric
503            .points
504            .iter()
505            .map(|dd_point| {
506                let i = dd_metric.interval.filter(|v| *v != 0).unwrap_or(1);
507                Metric::new(
508                    name.to_string(),
509                    MetricKind::Incremental,
510                    MetricValue::Counter {
511                        value: dd_point.1 * (i as f64),
512                    },
513                )
514                .with_timestamp(Some(
515                    Utc.timestamp_opt(dd_point.0, 0)
516                        .single()
517                        .expect("invalid timestamp"),
518                ))
519                // dd_metric.interval is in seconds, convert to ms
520                .with_interval_ms(NonZeroU32::new(i * 1000))
521                .with_tags(Some(tags.clone()))
522                .with_namespace(namespace)
523            })
524            .collect::<Vec<_>>(),
525    }
526    .into_iter()
527    .map(|mut metric| {
528        if let Some(k) = &api_key {
529            metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
530        }
531
532        metric
533            .metadata_mut()
534            .set_schema_definition(schema_definition);
535
536        metric.into()
537    })
538    .collect()
539}
540
/// Splits a Datadog metric name at its first `.` into `(namespace, name)`.
///
/// For example `"system.fs.util"` yields `(Some("system"), "fs.util")`. A name without any `.`
/// has no namespace and is returned whole: `"uptime"` yields `(None, "uptime")`.
fn namespace_name_from_dd_metric(dd_metric_name: &str) -> (Option<&str>, &str) {
    dd_metric_name
        .split_once('.')
        .map_or((None, dd_metric_name), |(prefix, rest)| (Some(prefix), rest))
}
550
551pub(crate) fn decode_ddsketch(
552    frame: Bytes,
553    api_key: &Option<Arc<str>>,
554) -> crate::Result<Vec<Event>> {
555    let payload = SketchPayload::decode(frame)?;
556    // payload.metadata is always empty for payload coming from dd agents
557    Ok(payload
558        .sketches
559        .into_iter()
560        .flat_map(|sketch_series| {
561            // sketch_series.distributions is also always empty from payload coming from dd agents
562            let mut tags = into_metric_tags(sketch_series.tags);
563            log_schema()
564                .host_key()
565                .and_then(|key| tags.replace(key.to_string(), sketch_series.host.clone()));
566
567            let event_metadata = get_event_metadata(sketch_series.metadata.as_ref());
568
569            sketch_series.dogsketches.into_iter().map(move |sketch| {
570                let k: Vec<i16> = sketch.k.iter().map(|k| *k as i16).collect();
571                let n: Vec<u16> = sketch.n.iter().map(|n| *n as u16).collect();
572                let val = MetricValue::from(
573                    AgentDDSketch::from_raw(
574                        sketch.cnt as u32,
575                        sketch.min,
576                        sketch.max,
577                        sketch.sum,
578                        sketch.avg,
579                        &k,
580                        &n,
581                    )
582                    .unwrap_or_else(AgentDDSketch::with_agent_defaults),
583                );
584                let (namespace, name) = namespace_name_from_dd_metric(&sketch_series.metric);
585                let mut metric = Metric::new_with_metadata(
586                    name.to_string(),
587                    MetricKind::Incremental,
588                    val,
589                    event_metadata.clone(),
590                )
591                .with_tags(Some(tags.clone()))
592                .with_timestamp(Some(
593                    Utc.timestamp_opt(sketch.ts, 0)
594                        .single()
595                        .expect("invalid timestamp"),
596                ))
597                .with_namespace(namespace);
598                if let Some(k) = &api_key {
599                    metric.metadata_mut().set_datadog_api_key(Arc::clone(k));
600                }
601
602                metric.into()
603            })
604        })
605        .collect())
606}