vector/sinks/datadog/metrics/
sink.rs

1use std::{fmt, sync::Arc};
2
3use async_trait::async_trait;
4use chrono::Utc;
5use futures_util::{
6    StreamExt,
7    future::ready,
8    stream::{self, BoxStream},
9};
10use tower::Service;
11use vector_lib::{
12    event::{Event, Metric, MetricValue},
13    partition::Partitioner,
14    sink::StreamSink,
15    stream::{BatcherSettings, DriverResponse},
16};
17
18use super::{
19    config::{DatadogMetricsEndpoint, SeriesApiVersion},
20    normalizer::DatadogMetricsNormalizer,
21    request_builder::DatadogMetricsRequestBuilder,
22    service::DatadogMetricsRequest,
23};
24use crate::{
25    internal_events::DatadogMetricsEncodingError,
26    sinks::util::{
27        SinkBuilderExt,
28        buffer::metrics::{AggregatedSummarySplitter, MetricSplitter},
29        request_builder::default_request_builder_concurrency_limit,
30    },
31};
32
/// Partitions metrics based on which Datadog API endpoint that they are sent to.
///
/// Generally speaking, all "basic" metrics -- counter, gauge, set, aggregated summary -- are sent to
/// the Series API, while distributions, aggregated histograms, and sketches (hehe) are sent to the
/// Sketches API.
struct DatadogMetricsTypePartitioner {
    // Which version of the Series API to route series-compatible metrics to.
    series_api_version: SeriesApiVersion,
}
41
42impl Partitioner for DatadogMetricsTypePartitioner {
43    type Item = Metric;
44    type Key = (Option<Arc<str>>, DatadogMetricsEndpoint);
45
46    fn partition(&self, item: &Self::Item) -> Self::Key {
47        let series = DatadogMetricsEndpoint::Series(self.series_api_version);
48        let endpoint = match item.data().value() {
49            MetricValue::Counter { .. } => series,
50            MetricValue::Gauge { .. } => series,
51            MetricValue::Set { .. } => series,
52            MetricValue::Distribution { .. } => DatadogMetricsEndpoint::Sketches,
53            MetricValue::AggregatedHistogram { .. } => DatadogMetricsEndpoint::Sketches,
54            // NOTE: AggregatedSummary will be split into counters and gauges during normalization
55            MetricValue::AggregatedSummary { .. } => series,
56            MetricValue::Sketch { .. } => DatadogMetricsEndpoint::Sketches,
57        };
58        (item.metadata().datadog_api_key(), endpoint)
59    }
60}
61
/// Stream sink that batches, encodes, and sends metrics to the Datadog metrics API.
pub(crate) struct DatadogMetricsSink<S> {
    // Tower service that actually sends the built requests.
    service: S,
    // Builds Datadog metrics API requests from batches of metrics.
    request_builder: DatadogMetricsRequestBuilder,
    // Batch size/timeout settings used for metrics routed to the Series endpoint.
    series_batch_settings: BatcherSettings,
    // Batch size/timeout settings used for metrics routed to the Sketches endpoint.
    sketches_batch_settings: BatcherSettings,
    // Protocol label passed to the request driver (used for telemetry).
    protocol: String,
    // Which version of the Series API to target.
    series_api_version: SeriesApiVersion,
}
70
impl<S> DatadogMetricsSink<S>
where
    S: Service<DatadogMetricsRequest> + Send,
    S::Error: fmt::Debug + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse,
{
    /// Creates a new `DatadogMetricsSink`.
    pub const fn new(
        service: S,
        request_builder: DatadogMetricsRequestBuilder,
        series_batch_settings: BatcherSettings,
        sketches_batch_settings: BatcherSettings,
        protocol: String,
        series_api_version: SeriesApiVersion,
    ) -> Self {
        DatadogMetricsSink {
            service,
            request_builder,
            series_batch_settings,
            sketches_batch_settings,
            protocol,
            series_api_version,
        }
    }

    /// Drives the sink to completion: consumes the input event stream, normalizes and batches
    /// the metrics per endpoint, builds requests, and sends them through the underlying service.
    ///
    /// Returns `Ok(())` once the input stream is exhausted, or `Err(())` if the driver fails.
    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        let mut splitter: MetricSplitter<AggregatedSummarySplitter> = MetricSplitter::default();
        // Moved out of `self` so the closures below can capture them by reference/move without
        // borrowing the whole sink.
        let series_batch_settings = self.series_batch_settings;
        let sketches_batch_settings = self.sketches_batch_settings;
        let partitioner = DatadogMetricsTypePartitioner {
            series_api_version: self.series_api_version,
        };

        input
            // Convert `Event` to `Metric` so we don't have to deal with constant conversions.
            .filter_map(|event| ready(event.try_into_metric()))
            // Split aggregated summaries into individual metrics for count, sum, and the quantiles, which lets us
            // ensure that aggregated summaries effectively make it through normalization, as we can't actually
            // normalize them and so they would be dropped during normalization otherwise.
            .flat_map(|metric| stream::iter(splitter.split(metric)))
            // Converts "absolute" metrics to "incremental", and converts distributions and aggregated histograms into
            // sketches so that we can send them in a more DD-native format and thus avoid needing to directly specify
            // what quantiles to generate, etc.
            .normalized_with_default::<DatadogMetricsNormalizer>()
            // We batch metrics by their endpoint: series endpoint for counters, gauge, and sets vs sketch endpoint for
            // distributions, aggregated histograms, and sketches. Each endpoint uses its own byte size
            // limit: 5 MiB for Series v2 and 60 MiB for Sketches.
            // NOTE(review): only `series_batch_settings.timeout` is used for both endpoints here;
            // confirm the sketches timeout is intentionally unused.
            .batched_partitioned(
                partitioner,
                series_batch_settings.timeout,
                |(_api_key, endpoint)| match endpoint {
                    DatadogMetricsEndpoint::Series(_) => {
                        series_batch_settings.as_byte_size_config()
                    }
                    DatadogMetricsEndpoint::Sketches => {
                        sketches_batch_settings.as_byte_size_config()
                    }
                },
            )
            // Aggregate counters with identical timestamps, otherwise identical counters (same
            // series and same timestamp, when rounded to whole seconds) will be dropped in a
            // last-write-wins situation when they hit the DD metrics intake.
            //
            // This also sorts metrics by name, which significantly improves HTTP compression.
            .concurrent_map(
                default_request_builder_concurrency_limit(),
                |((api_key, endpoint), metrics)| {
                    Box::pin(async move {
                        let collapsed_metrics =
                            sort_and_collapse_counters_by_series_and_timestamp(metrics);
                        ((api_key, endpoint), collapsed_metrics)
                    })
                },
            )
            // We build our requests "incrementally", which means that for a single batch of metrics, we might generate
            // N requests to send them all, as Datadog has API-level limits on payload size, so we keep adding metrics
            // to a request until we reach the limit, and then create a new request, and so on and so forth, until all
            // metrics have been turned into a request.
            .incremental_request_builder(self.request_builder)
            // This unrolls the vector of request results that our request builder generates.
            .flat_map(stream::iter)
            // Generating requests _can_ fail, so we log and filter out errors here.
            .filter_map(|request| async move {
                match request {
                    Err(e) => {
                        let (reason, error_code, dropped_events) = e.into_parts();
                        emit!(DatadogMetricsEncodingError {
                            reason: reason.as_str(),
                            error_code,
                            dropped_events: dropped_events as usize,
                        });
                        None
                    }
                    Ok(req) => Some(req),
                }
            })
            // Finally, we generate the driver which will take our requests, send them off, and appropriately handle
            // finalization of the events, and logging/metrics, as the requests are responded to.
            .into_driver(self.service)
            .protocol(self.protocol)
            .run()
            .await
    }
}
176
#[async_trait]
impl<S> StreamSink<Event> for DatadogMetricsSink<S>
where
    S: Service<DatadogMetricsRequest> + Send,
    S::Error: fmt::Debug + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse,
{
    /// Entry point used by the topology: simply delegates to `run_inner`.
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        // Rust has issues with lifetimes and generics, which `async_trait` exacerbates, so we write
        // a normal async fn in `DatadogMetricsSink` itself, and then call out to it from this trait
        // implementation, which makes the compiler happy.
        self.run_inner(input).await
    }
}
192
193/// Collapses counters by series and timestamp, leaving all other metrics unmodified.
194/// The return value is sorted by metric series, which is desirable for compression. A sorted vector
195/// tends to compress better than a random ordering by 2-3x (JSON encoded, deflate algorithm).
196///
197/// Note that the time complexity of this function is O(n log n) and the space complexity is O(1).
198/// If needed, we can trade space for time by using a HashMap, which would be O(n) time and O(n) space.
199fn sort_and_collapse_counters_by_series_and_timestamp(mut metrics: Vec<Metric>) -> Vec<Metric> {
200    let now_ts = Utc::now().timestamp();
201
202    // Sort by series and timestamp which is required for the below dedupe to behave as desired.
203    // This also tends to compress better than a random ordering by 2-3x (JSON encoded, deflate algorithm).
204    // Note that `sort_unstable_by_key` would be simpler but results in lifetime errors without cloning.
205    metrics.sort_unstable_by(|a, b| {
206        (
207            a.value().as_name(),
208            a.series(),
209            a.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts),
210        )
211            .cmp(&(
212                a.value().as_name(),
213                b.series(),
214                b.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts),
215            ))
216    });
217
218    // Aggregate counters that share the same series and timestamp.
219    // While `coalesce` is semantically more fitting here than `dedupe_by`, we opt for the latter because
220    // they share the same functionality and `dedupe_by`'s implementation is more optimized, doing the
221    // operation in place.
222    metrics.dedup_by(|left, right| {
223        if left.series() != right.series() {
224            return false;
225        }
226
227        let left_ts = left.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts);
228        let right_ts = right.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts);
229        if left_ts != right_ts {
230            return false;
231        }
232
233        // Only aggregate counters. All other types can be skipped.
234        if let (
235            MetricValue::Counter { value: left_value },
236            MetricValue::Counter { value: right_value },
237        ) = (left.value(), right.value_mut())
238        {
239            // NOTE: The docs for `dedup_by` specify that if `left`/`right` are equal, then
240            // `left` is the element that gets removed.
241            *right_value += left_value;
242            right
243                .metadata_mut()
244                .merge_finalizers(left.metadata_mut().take_finalizers());
245
246            true
247        } else {
248            false
249        }
250    });
251
252    metrics
253}
254
#[cfg(test)]
mod tests {
    use std::{collections::HashSet, time::Duration};

    use chrono::{DateTime, Utc};
    use proptest::prelude::*;
    use vector_lib::event::{Metric, MetricKind, MetricValue};
    use vector_lib::metric_tags;

    use super::sort_and_collapse_counters_by_series_and_timestamp;

    /// Generates vectors of metrics suitable for the property test below: names are suffixed with
    /// an id so distinct ids never collide, and all metrics share a single timestamp.
    fn arb_collapsible_metrics() -> impl Strategy<Value = Vec<Metric>> {
        let ts = Utc::now();

        any::<Vec<(u16, MetricValue)>>().prop_map(move |values| {
            let mut unique_metrics = HashSet::new();
            values
                .into_iter()
                .map(|(id, value)| {
                    let name = format!("{}-{}", value.as_name(), id);
                    Metric::new(name, MetricKind::Incremental, value).with_timestamp(Some(ts))
                })
                // Filter out duplicates other than counters. We do this to prevent false positives. False positives would occur
                // because we don't collapse other metric types and we can't sort metrics by their values.
                .filter(|metric| {
                    matches!(metric.value(), MetricValue::Counter { .. })
                        || unique_metrics.insert(metric.series().clone())
                })
                .collect()
        })
    }

    // Helper: incremental counter with no timestamp.
    fn create_counter(name: &str, value: f64) -> Metric {
        Metric::new(
            name,
            MetricKind::Incremental,
            MetricValue::Counter { value },
        )
    }

    // Helper: incremental gauge with no timestamp.
    fn create_gauge(name: &str, value: f64) -> Metric {
        Metric::new(name, MetricKind::Incremental, MetricValue::Gauge { value })
    }

    #[test]
    fn collapse_no_metrics() {
        let input = Vec::new();
        let expected = input.clone();
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    #[test]
    fn collapse_single_metric() {
        let input = vec![create_counter("basic", 42.0)];
        let expected = input.clone();
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    // Gauges must pass through untouched, even when identical.
    #[test]
    fn collapse_identical_metrics_gauge() {
        let input = vec![create_gauge("basic", 42.0), create_gauge("basic", 42.0)];
        let expected = input.clone();
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);

        let gauge_value = 41.0;
        let input = vec![
            create_gauge("basic", gauge_value),
            create_gauge("basic", gauge_value),
            create_gauge("basic", gauge_value),
            create_gauge("basic", gauge_value),
            create_gauge("basic", gauge_value),
            create_gauge("basic", gauge_value),
            create_gauge("basic", gauge_value),
        ];
        let expected = input.clone();
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    // Identical counters collapse into a single counter whose value is the sum.
    #[test]
    fn collapse_identical_metrics_counter() {
        let counter_value = 42.0;
        let input = vec![
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
        ];

        let expected_counter_value = input.len() as f64 * counter_value;
        let expected = vec![create_counter("basic", expected_counter_value)];
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    // Counters interleaved with other metric types must still be fully collapsed, relying on the
    // sort to bring them adjacent first.
    #[test]
    fn collapse_identical_metrics_counter_unsorted() {
        let gauge_value = 1.0;
        let counter_value = 42.0;
        let input = vec![
            create_gauge("gauge", gauge_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
            create_gauge("gauge", gauge_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
        ];

        let expected_counter_value = (input.len() - 2) as f64 * counter_value;
        let expected = vec![
            create_counter("basic", expected_counter_value),
            create_gauge("gauge", gauge_value),
            create_gauge("gauge", gauge_value),
        ];
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    // Counters collapse per timestamp; untimestamped counters are treated as "now" and sort last.
    #[test]
    fn collapse_identical_metrics_multiple_timestamps() {
        let ts_1 = Utc::now() - Duration::from_secs(5);
        let ts_2 = ts_1 - Duration::from_secs(5);
        let counter_value = 42.0;
        let input = vec![
            create_counter("basic", counter_value),
            create_counter("basic", counter_value).with_timestamp(Some(ts_1)),
            create_counter("basic", counter_value).with_timestamp(Some(ts_2)),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value).with_timestamp(Some(ts_2)),
            create_counter("basic", counter_value).with_timestamp(Some(ts_1)),
            create_counter("basic", counter_value),
        ];

        let expected = vec![
            create_counter("basic", counter_value * 2.).with_timestamp(Some(ts_2)),
            create_counter("basic", counter_value * 2.).with_timestamp(Some(ts_1)),
            create_counter("basic", counter_value * 3.),
        ];
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    // Tags are part of the series, so counters only collapse when their tag sets match
    // (regardless of tag declaration order).
    #[test]
    fn collapse_identical_metrics_with_tags() {
        let counter_value = 42.0;
        let input = vec![
            create_counter("basic", counter_value).with_tags(Some(metric_tags!("a" => "a"))),
            create_counter("basic", counter_value).with_tags(Some(metric_tags!(
                "a" => "a",
                "b" => "b",
            ))),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value).with_tags(Some(metric_tags!(
                "b" => "b",
                "a" => "a",
            ))),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value).with_tags(Some(metric_tags!("a" => "a"))),
        ];

        let expected = vec![
            create_counter("basic", counter_value * 3.),
            create_counter("basic", counter_value * 2.).with_tags(Some(metric_tags!("a" => "a"))),
            create_counter("basic", counter_value * 2.).with_tags(Some(metric_tags!(
                "a" => "a",
                "b" => "b",
            ))),
        ];
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    // Sort key mirroring the production sort (type name, then series-identifying name, then
    // timestamp), used to build the reference output for the property test.
    #[derive(Eq, Ord, PartialEq, PartialOrd)]
    struct MetricCollapseSort {
        metric_type: &'static str,
        metric_name: String,
        metric_ts: Option<DateTime<Utc>>,
    }

    impl MetricCollapseSort {
        fn from_metric(metric: &Metric) -> Self {
            Self {
                metric_type: metric.value().as_name(),
                metric_name: metric.name().to_string(),
                metric_ts: metric.timestamp(),
            }
        }
    }

    // Reference collapse predicate: merges consecutive counters with equal series and timestamp,
    // mirroring the production `dedup_by` closure (minus finalizer merging).
    fn collapse_dedup_fn(left: &mut Metric, right: &mut Metric) -> bool {
        let series_eq = left.series() == right.series();
        let timestamp_eq = left.timestamp() == right.timestamp();
        if !series_eq || !timestamp_eq {
            return false;
        }

        match (left.value_mut(), right.value_mut()) {
            (
                MetricValue::Counter { value: left_value },
                MetricValue::Counter { value: right_value },
            ) => {
                // NOTE: The docs for `dedup_by` specify that if `left`/`right` are equal, then
                // `left` is the element that gets removed.
                *right_value += *left_value;
                true
            }
            // Only counters can be equivalent for the purpose of this test.
            _ => false,
        }
    }

    proptest! {
        #[test]
        fn test_counter_collapse(input in arb_collapsible_metrics()) {
            let mut expected_output = input.clone();
            expected_output.sort_by_cached_key(MetricCollapseSort::from_metric);
            expected_output.dedup_by(collapse_dedup_fn);

            let actual_output = sort_and_collapse_counters_by_series_and_timestamp(input);

            prop_assert_eq!(expected_output, actual_output);
        }
    }
}