vector/sinks/datadog/metrics/sink.rs

use std::{fmt, sync::Arc};

use async_trait::async_trait;
use chrono::Utc;
use futures_util::{
    future::ready,
    stream::{self, BoxStream},
    StreamExt,
};
use tower::Service;
use vector_lib::stream::{BatcherSettings, DriverResponse};
use vector_lib::{
    event::{Event, Metric, MetricValue},
    partition::Partitioner,
    sink::StreamSink,
};

use super::{
    config::DatadogMetricsEndpoint, normalizer::DatadogMetricsNormalizer,
    request_builder::DatadogMetricsRequestBuilder, service::DatadogMetricsRequest,
};
use crate::{
    internal_events::DatadogMetricsEncodingError,
    sinks::util::{
        buffer::metrics::{AggregatedSummarySplitter, MetricSplitter},
        request_builder::default_request_builder_concurrency_limit,
        SinkBuilderExt,
    },
};

/// Partitions metrics based on which Datadog API endpoint they are sent to.
///
/// Generally speaking, all "basic" metrics -- counter, gauge, set, aggregated summary -- are sent
/// to the Series API, while distributions, aggregated histograms, and sketches (hehe) are sent to
/// the Sketches API.
struct DatadogMetricsTypePartitioner;
37
38impl Partitioner for DatadogMetricsTypePartitioner {
39    type Item = Metric;
40    type Key = (Option<Arc<str>>, DatadogMetricsEndpoint);
41
42    fn partition(&self, item: &Self::Item) -> Self::Key {
43        let endpoint = match item.data().value() {
44            MetricValue::Counter { .. } => DatadogMetricsEndpoint::series(),
45            MetricValue::Gauge { .. } => DatadogMetricsEndpoint::series(),
46            MetricValue::Set { .. } => DatadogMetricsEndpoint::series(),
47            MetricValue::Distribution { .. } => DatadogMetricsEndpoint::Sketches,
48            MetricValue::AggregatedHistogram { .. } => DatadogMetricsEndpoint::Sketches,
49            // NOTE: AggregatedSummary will be split into counters and gauges during normalization
50            MetricValue::AggregatedSummary { .. } => DatadogMetricsEndpoint::series(),
51            MetricValue::Sketch { .. } => DatadogMetricsEndpoint::Sketches,
52        };
53        (item.metadata().datadog_api_key(), endpoint)
54    }
55}
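
// Illustrative sketch (comment only, not compiled): for any `metric` flowing through
// the sink, the partitioner yields a `(Option<Arc<str>>, DatadogMetricsEndpoint)` key,
// so a batch never mixes API keys or endpoints:
//
//     let (api_key, endpoint) = DatadogMetricsTypePartitioner.partition(&metric);
//     // counter / gauge / set / aggregated summary   => DatadogMetricsEndpoint::series()
//     // distribution / aggregated histogram / sketch => DatadogMetricsEndpoint::Sketches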

pub(crate) struct DatadogMetricsSink<S> {
    service: S,
    request_builder: DatadogMetricsRequestBuilder,
    batch_settings: BatcherSettings,
    protocol: String,
}

impl<S> DatadogMetricsSink<S>
where
    S: Service<DatadogMetricsRequest> + Send,
    S::Error: fmt::Debug + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse,
{
    /// Creates a new `DatadogMetricsSink`.
    pub const fn new(
        service: S,
        request_builder: DatadogMetricsRequestBuilder,
        batch_settings: BatcherSettings,
        protocol: String,
    ) -> Self {
        DatadogMetricsSink {
            service,
            request_builder,
            batch_settings,
            protocol,
        }
    }

    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        let mut splitter: MetricSplitter<AggregatedSummarySplitter> = MetricSplitter::default();
        let batch_settings = self.batch_settings;

        input
            // Convert `Event` to `Metric` so we don't have to deal with constant conversions.
            .filter_map(|event| ready(event.try_into_metric()))
            // Split aggregated summaries into individual metrics for the count, sum, and
            // quantiles. The normalizer can't handle aggregated summaries directly and would
            // otherwise drop them, so splitting them first ensures they make it through.
            .flat_map(|metric| stream::iter(splitter.split(metric)))
            // Convert "absolute" metrics to "incremental", and convert distributions and
            // aggregated histograms into sketches, so that we can send them in a more DD-native
            // format and avoid having to directly specify which quantiles to generate, etc.
            .normalized_with_default::<DatadogMetricsNormalizer>()
            // Batch metrics by their endpoint: the series endpoint for counters, gauges, and
            // sets, and the sketches endpoint for distributions, aggregated histograms, and
            // sketches.
            .batched_partitioned(DatadogMetricsTypePartitioner, || {
                batch_settings.as_byte_size_config()
            })
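            // Each batch yielded here is `((Option<Arc<str>>, DatadogMetricsEndpoint), Vec<Metric>)`:
            // the partition key produced above, plus all of the metrics that share it.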
            // Aggregate counters with identical timestamps, otherwise identical counters (same
            // series and same timestamp, when rounded to whole seconds) will be dropped in a
            // last-write-wins situation when they hit the DD metrics intake.
            //
            // This also sorts metrics by name, which significantly improves HTTP compression.
            .concurrent_map(
                default_request_builder_concurrency_limit(),
                |((api_key, endpoint), metrics)| {
                    Box::pin(async move {
                        let collapsed_metrics =
                            sort_and_collapse_counters_by_series_and_timestamp(metrics);
                        ((api_key, endpoint), collapsed_metrics)
                    })
                },
            )
            // We build our requests "incrementally", which means that a single batch of metrics
            // might generate N requests: Datadog enforces API-level limits on payload size, so we
            // keep adding metrics to a request until we hit the limit, then start a new request,
            // and so on, until all metrics have been turned into requests.
            .incremental_request_builder(self.request_builder)
            // This unrolls the vector of request results that our request builder generates.
            .flat_map(stream::iter)
            // Generating requests _can_ fail, so we log and filter out errors here.
            .filter_map(|request| async move {
                match request {
                    Err(e) => {
                        let (reason, error_code, dropped_events) = e.into_parts();
                        emit!(DatadogMetricsEncodingError {
                            reason: reason.as_str(),
                            error_code,
                            dropped_events: dropped_events as usize,
                        });
                        None
                    }
                    Ok(req) => Some(req),
                }
            })
            // Finally, we generate the driver, which takes our requests, sends them off, and
            // appropriately handles finalization of the events, as well as logging and metrics,
            // as responses come back.
            .into_driver(self.service)
            .protocol(self.protocol)
            .run()
            .await
    }
}

#[async_trait]
impl<S> StreamSink<Event> for DatadogMetricsSink<S>
where
    S: Service<DatadogMetricsRequest> + Send,
    S::Error: fmt::Debug + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse,
{
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        // Rust has issues with lifetimes and generics, which `async_trait` exacerbates, so we write
        // a normal async fn in `DatadogMetricsSink` itself, and then call out to it from this trait
        // implementation, which makes the compiler happy.
        self.run_inner(input).await
    }
}
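
// A hedged sketch (comment only) of how this sink is typically driven once built; the
// construction of `service`, `request_builder`, and `batch_settings` is assumed to
// happen elsewhere (e.g. in this sink's `config` module) and is elided here:
//
//     let sink = Box::new(DatadogMetricsSink::new(service, request_builder, batch_settings, protocol));
//     sink.run(input).await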

/// Collapses counters by series and timestamp, leaving all other metrics unmodified.
/// The return value is sorted by metric series, which is desirable for compression. A sorted vector
/// tends to compress better than a random ordering by 2-3x (JSON encoded, deflate algorithm).
///
/// Note that the time complexity of this function is O(n log n) and the space complexity is O(1).
/// If needed, we can trade space for time by using a HashMap, which would be O(n) time and O(n) space.
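///
/// # Example
///
/// An illustrative sketch (marked `ignore`, so it is not run as a doctest) of the collapsing
/// behavior for two counters that share a series and an (implicit) timestamp:
///
/// ```ignore
/// use vector_lib::event::{Metric, MetricKind, MetricValue};
///
/// let counter = |value| {
///     Metric::new("requests", MetricKind::Incremental, MetricValue::Counter { value })
/// };
/// let output = sort_and_collapse_counters_by_series_and_timestamp(vec![counter(1.0), counter(2.0)]);
/// // The two counters collapse into a single counter with value 3.0.
/// assert_eq!(output.len(), 1);
/// ```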
fn sort_and_collapse_counters_by_series_and_timestamp(mut metrics: Vec<Metric>) -> Vec<Metric> {
    let now_ts = Utc::now().timestamp();

    // Sort by type, series, and timestamp, which is required for the dedup below to behave as
    // desired. This also tends to compress better than a random ordering by 2-3x (JSON encoded,
    // deflate algorithm). Note that `sort_unstable_by_key` would be simpler but results in
    // lifetime errors without cloning.
    metrics.sort_unstable_by(|a, b| {
        (
            a.value().as_name(),
            a.series(),
            a.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts),
        )
            .cmp(&(
                b.value().as_name(),
                b.series(),
                b.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts),
            ))
    });

    // Aggregate counters that share the same series and timestamp.
    // While `coalesce` is semantically more fitting here than `dedup_by`, we opt for the latter
    // because they share the same functionality and `dedup_by`'s implementation is more optimized,
    // doing the operation in place.
    metrics.dedup_by(|left, right| {
        if left.series() != right.series() {
            return false;
        }

        let left_ts = left.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts);
        let right_ts = right.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts);
        if left_ts != right_ts {
            return false;
        }

        // Only aggregate counters. All other types can be skipped.
        if let (
            MetricValue::Counter { value: left_value },
            MetricValue::Counter { value: right_value },
        ) = (left.value(), right.value_mut())
        {
            // NOTE: The docs for `dedup_by` specify that if `left`/`right` are equal, then
            // `left` is the element that gets removed.
            *right_value += left_value;
            right
                .metadata_mut()
                .merge_finalizers(left.metadata_mut().take_finalizers());

            true
        } else {
            false
        }
    });

    metrics
}

#[cfg(test)]
mod tests {
    use std::{collections::HashSet, time::Duration};

    use chrono::{DateTime, Utc};
    use proptest::prelude::*;
    use vector_lib::{
        event::{Metric, MetricKind, MetricValue},
        metric_tags,
    };

    use super::sort_and_collapse_counters_by_series_and_timestamp;

    fn arb_collapsible_metrics() -> impl Strategy<Value = Vec<Metric>> {
        let ts = Utc::now();

        any::<Vec<(u16, MetricValue)>>().prop_map(move |values| {
            let mut unique_metrics = HashSet::new();
            values
                .into_iter()
                .map(|(id, value)| {
                    let name = format!("{}-{}", value.as_name(), id);
                    Metric::new(name, MetricKind::Incremental, value).with_timestamp(Some(ts))
                })
                // Filter out duplicates of non-counter metrics to prevent false positives: we
                // don't collapse the other metric types, and we can't sort metrics by their
                // values.
                .filter(|metric| {
                    matches!(metric.value(), MetricValue::Counter { .. })
                        || unique_metrics.insert(metric.series().clone())
                })
                .collect()
        })
    }

    fn create_counter(name: &str, value: f64) -> Metric {
        Metric::new(
            name,
            MetricKind::Incremental,
            MetricValue::Counter { value },
        )
    }

    fn create_gauge(name: &str, value: f64) -> Metric {
        Metric::new(name, MetricKind::Incremental, MetricValue::Gauge { value })
    }

    #[test]
    fn collapse_no_metrics() {
        let input = Vec::new();
        let expected = input.clone();
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    #[test]
    fn collapse_single_metric() {
        let input = vec![create_counter("basic", 42.0)];
        let expected = input.clone();
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    #[test]
    fn collapse_identical_metrics_gauge() {
        let input = vec![create_gauge("basic", 42.0), create_gauge("basic", 42.0)];
        let expected = input.clone();
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);

        let gauge_value = 41.0;
        let input = vec![
            create_gauge("basic", gauge_value),
            create_gauge("basic", gauge_value),
            create_gauge("basic", gauge_value),
            create_gauge("basic", gauge_value),
            create_gauge("basic", gauge_value),
            create_gauge("basic", gauge_value),
            create_gauge("basic", gauge_value),
        ];
        let expected = input.clone();
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    #[test]
    fn collapse_identical_metrics_counter() {
        let counter_value = 42.0;
        let input = vec![
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
        ];

        let expected_counter_value = input.len() as f64 * counter_value;
        let expected = vec![create_counter("basic", expected_counter_value)];
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    #[test]
    fn collapse_identical_metrics_counter_unsorted() {
        let gauge_value = 1.0;
        let counter_value = 42.0;
        let input = vec![
            create_gauge("gauge", gauge_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
            create_gauge("gauge", gauge_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
        ];

        let expected_counter_value = (input.len() - 2) as f64 * counter_value;
        let expected = vec![
            create_counter("basic", expected_counter_value),
            create_gauge("gauge", gauge_value),
            create_gauge("gauge", gauge_value),
        ];
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    #[test]
    fn collapse_identical_metrics_multiple_timestamps() {
        let ts_1 = Utc::now() - Duration::from_secs(5);
        let ts_2 = ts_1 - Duration::from_secs(5);
        let counter_value = 42.0;
        let input = vec![
            create_counter("basic", counter_value),
            create_counter("basic", counter_value).with_timestamp(Some(ts_1)),
            create_counter("basic", counter_value).with_timestamp(Some(ts_2)),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value).with_timestamp(Some(ts_2)),
            create_counter("basic", counter_value).with_timestamp(Some(ts_1)),
            create_counter("basic", counter_value),
        ];

        let expected = vec![
            create_counter("basic", counter_value * 2.).with_timestamp(Some(ts_2)),
            create_counter("basic", counter_value * 2.).with_timestamp(Some(ts_1)),
            create_counter("basic", counter_value * 3.),
        ];
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    #[test]
    fn collapse_identical_metrics_with_tags() {
        let counter_value = 42.0;
        let input = vec![
            create_counter("basic", counter_value).with_tags(Some(metric_tags!("a" => "a"))),
            create_counter("basic", counter_value).with_tags(Some(metric_tags!(
                "a" => "a",
                "b" => "b",
            ))),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value).with_tags(Some(metric_tags!(
                "b" => "b",
                "a" => "a",
            ))),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value),
            create_counter("basic", counter_value).with_tags(Some(metric_tags!("a" => "a"))),
        ];

        let expected = vec![
            create_counter("basic", counter_value * 3.),
            create_counter("basic", counter_value * 2.).with_tags(Some(metric_tags!("a" => "a"))),
            create_counter("basic", counter_value * 2.).with_tags(Some(metric_tags!(
                "a" => "a",
                "b" => "b",
            ))),
        ];
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    #[derive(Eq, Ord, PartialEq, PartialOrd)]
    struct MetricCollapseSort {
        metric_type: &'static str,
        metric_name: String,
        metric_ts: Option<DateTime<Utc>>,
    }

    impl MetricCollapseSort {
        fn from_metric(metric: &Metric) -> Self {
            Self {
                metric_type: metric.value().as_name(),
                metric_name: metric.name().to_string(),
                metric_ts: metric.timestamp(),
            }
        }
    }

    fn collapse_dedup_fn(left: &mut Metric, right: &mut Metric) -> bool {
        let series_eq = left.series() == right.series();
        let timestamp_eq = left.timestamp() == right.timestamp();
        if !series_eq || !timestamp_eq {
            return false;
        }

        match (left.value_mut(), right.value_mut()) {
            (
                MetricValue::Counter { value: left_value },
                MetricValue::Counter { value: right_value },
            ) => {
                // NOTE: The docs for `dedup_by` specify that if `left`/`right` are equal, then
                // `left` is the element that gets removed.
                *right_value += *left_value;
                true
            }
            // Only counters can be equivalent for the purpose of this test.
            _ => false,
        }
    }

    proptest! {
        #[test]
        fn test_counter_collapse(input in arb_collapsible_metrics()) {
            let mut expected_output = input.clone();
            expected_output.sort_by_cached_key(MetricCollapseSort::from_metric);
            expected_output.dedup_by(collapse_dedup_fn);

            let actual_output = sort_and_collapse_counters_by_series_and_timestamp(input);

            prop_assert_eq!(expected_output, actual_output);
        }
    }
}