1use std::{fmt, sync::Arc};
2
3use async_trait::async_trait;
4use chrono::Utc;
5use futures_util::{
6 StreamExt,
7 future::ready,
8 stream::{self, BoxStream},
9};
10use tower::Service;
11use vector_lib::{
12 event::{Event, Metric, MetricValue},
13 partition::Partitioner,
14 sink::StreamSink,
15 stream::{BatcherSettings, DriverResponse},
16};
17
18use super::{
19 config::{DatadogMetricsEndpoint, SeriesApiVersion},
20 normalizer::DatadogMetricsNormalizer,
21 request_builder::DatadogMetricsRequestBuilder,
22 service::DatadogMetricsRequest,
23};
24use crate::{
25 internal_events::DatadogMetricsEncodingError,
26 sinks::util::{
27 SinkBuilderExt,
28 buffer::metrics::{AggregatedSummarySplitter, MetricSplitter},
29 request_builder::default_request_builder_concurrency_limit,
30 },
31};
32
/// Partitioner that groups metrics by API key and destination Datadog endpoint.
struct DatadogMetricsTypePartitioner {
    /// Series API version placed inside the `Series` endpoint variant of the partition key.
    series_api_version: SeriesApiVersion,
}
41
42impl Partitioner for DatadogMetricsTypePartitioner {
43 type Item = Metric;
44 type Key = (Option<Arc<str>>, DatadogMetricsEndpoint);
45
46 fn partition(&self, item: &Self::Item) -> Self::Key {
47 let series = DatadogMetricsEndpoint::Series(self.series_api_version);
48 let endpoint = match item.data().value() {
49 MetricValue::Counter { .. } => series,
50 MetricValue::Gauge { .. } => series,
51 MetricValue::Set { .. } => series,
52 MetricValue::Distribution { .. } => DatadogMetricsEndpoint::Sketches,
53 MetricValue::AggregatedHistogram { .. } => DatadogMetricsEndpoint::Sketches,
54 MetricValue::AggregatedSummary { .. } => series,
56 MetricValue::Sketch { .. } => DatadogMetricsEndpoint::Sketches,
57 };
58 (item.metadata().datadog_api_key(), endpoint)
59 }
60}
61
/// Stream sink that batches metrics per endpoint, builds requests from the batches and
/// drives those requests through `service`.
pub(crate) struct DatadogMetricsSink<S> {
    /// Service the built requests are driven through.
    service: S,
    /// Builder that turns each batch of metrics into requests.
    request_builder: DatadogMetricsRequestBuilder,
    /// Batch settings for the series endpoint; its `timeout` is also the batcher timeout.
    series_batch_settings: BatcherSettings,
    /// Batch settings for the sketches endpoint.
    sketches_batch_settings: BatcherSettings,
    /// Protocol string handed to the driver.
    protocol: String,
    /// Series API version used when partitioning metrics by endpoint.
    series_api_version: SeriesApiVersion,
}
70
71impl<S> DatadogMetricsSink<S>
72where
73 S: Service<DatadogMetricsRequest> + Send,
74 S::Error: fmt::Debug + Send + 'static,
75 S::Future: Send + 'static,
76 S::Response: DriverResponse,
77{
78 pub const fn new(
80 service: S,
81 request_builder: DatadogMetricsRequestBuilder,
82 series_batch_settings: BatcherSettings,
83 sketches_batch_settings: BatcherSettings,
84 protocol: String,
85 series_api_version: SeriesApiVersion,
86 ) -> Self {
87 DatadogMetricsSink {
88 service,
89 request_builder,
90 series_batch_settings,
91 sketches_batch_settings,
92 protocol,
93 series_api_version,
94 }
95 }
96
97 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
98 let mut splitter: MetricSplitter<AggregatedSummarySplitter> = MetricSplitter::default();
99 let series_batch_settings = self.series_batch_settings;
100 let sketches_batch_settings = self.sketches_batch_settings;
101 let partitioner = DatadogMetricsTypePartitioner {
102 series_api_version: self.series_api_version,
103 };
104
105 input
106 .filter_map(|event| ready(event.try_into_metric()))
108 .flat_map(|metric| stream::iter(splitter.split(metric)))
112 .normalized_with_default::<DatadogMetricsNormalizer>()
116 .batched_partitioned(
120 partitioner,
121 series_batch_settings.timeout,
122 |(_api_key, endpoint)| match endpoint {
123 DatadogMetricsEndpoint::Series(_) => {
124 series_batch_settings.as_byte_size_config()
125 }
126 DatadogMetricsEndpoint::Sketches => {
127 sketches_batch_settings.as_byte_size_config()
128 }
129 },
130 )
131 .concurrent_map(
137 default_request_builder_concurrency_limit(),
138 |((api_key, endpoint), metrics)| {
139 Box::pin(async move {
140 let collapsed_metrics =
141 sort_and_collapse_counters_by_series_and_timestamp(metrics);
142 ((api_key, endpoint), collapsed_metrics)
143 })
144 },
145 )
146 .incremental_request_builder(self.request_builder)
151 .flat_map(stream::iter)
153 .filter_map(|request| async move {
155 match request {
156 Err(e) => {
157 let (reason, error_code, dropped_events) = e.into_parts();
158 emit!(DatadogMetricsEncodingError {
159 reason: reason.as_str(),
160 error_code,
161 dropped_events: dropped_events as usize,
162 });
163 None
164 }
165 Ok(req) => Some(req),
166 }
167 })
168 .into_driver(self.service)
171 .protocol(self.protocol)
172 .run()
173 .await
174 }
175}
176
// `StreamSink` implementation for the Datadog metrics sink; it only forwards to `run_inner`.
#[async_trait]
impl<S> StreamSink<Event> for DatadogMetricsSink<S>
where
    S: Service<DatadogMetricsRequest> + Send,
    S::Error: fmt::Debug + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse,
{
    /// Consumes the boxed sink, processes `input` via `run_inner` and returns its result.
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        // All of the pipeline logic lives in the inherent `run_inner` method.
        self.run_inner(input).await
    }
}
192
193fn sort_and_collapse_counters_by_series_and_timestamp(mut metrics: Vec<Metric>) -> Vec<Metric> {
200 let now_ts = Utc::now().timestamp();
201
202 metrics.sort_unstable_by(|a, b| {
206 (
207 a.value().as_name(),
208 a.series(),
209 a.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts),
210 )
211 .cmp(&(
212 a.value().as_name(),
213 b.series(),
214 b.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts),
215 ))
216 });
217
218 metrics.dedup_by(|left, right| {
223 if left.series() != right.series() {
224 return false;
225 }
226
227 let left_ts = left.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts);
228 let right_ts = right.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts);
229 if left_ts != right_ts {
230 return false;
231 }
232
233 if let (
235 MetricValue::Counter { value: left_value },
236 MetricValue::Counter { value: right_value },
237 ) = (left.value(), right.value_mut())
238 {
239 *right_value += left_value;
242 right
243 .metadata_mut()
244 .merge_finalizers(left.metadata_mut().take_finalizers());
245
246 true
247 } else {
248 false
249 }
250 });
251
252 metrics
253}
254
#[cfg(test)]
mod tests {
    use std::{collections::HashSet, time::Duration};

    use chrono::{DateTime, Utc};
    use proptest::prelude::*;
    use vector_lib::event::{Metric, MetricKind, MetricValue};
    use vector_lib::metric_tags;

    use super::sort_and_collapse_counters_by_series_and_timestamp;

    /// Strategy producing batches where only counters may repeat a series.
    ///
    /// Every metric is named `<value type>-<id>` and shares one timestamp. Counters are
    /// always kept; any other metric is kept only the first time its series is seen.
    fn arb_collapsible_metrics() -> impl Strategy<Value = Vec<Metric>> {
        let shared_ts = Utc::now();

        any::<Vec<(u16, MetricValue)>>().prop_map(move |raw| {
            let mut seen_series = HashSet::new();
            let mut metrics = Vec::new();

            for (id, value) in raw {
                let name = format!("{}-{}", value.as_name(), id);
                let metric = Metric::new(name, MetricKind::Incremental, value)
                    .with_timestamp(Some(shared_ts));

                // Short-circuit: counters never touch the `seen_series` set.
                let is_counter = matches!(metric.value(), MetricValue::Counter { .. });
                if is_counter || seen_series.insert(metric.series().clone()) {
                    metrics.push(metric);
                }
            }

            metrics
        })
    }

    /// Builds an incremental counter with no timestamp and no tags.
    fn create_counter(name: &str, value: f64) -> Metric {
        let counter = MetricValue::Counter { value };
        Metric::new(name, MetricKind::Incremental, counter)
    }

    /// Builds an incremental gauge with no timestamp and no tags.
    fn create_gauge(name: &str, value: f64) -> Metric {
        let gauge = MetricValue::Gauge { value };
        Metric::new(name, MetricKind::Incremental, gauge)
    }

    /// An empty batch stays empty.
    #[test]
    fn collapse_no_metrics() {
        let actual = sort_and_collapse_counters_by_series_and_timestamp(Vec::new());

        assert_eq!(Vec::<Metric>::new(), actual);
    }

    /// A lone counter comes back untouched.
    #[test]
    fn collapse_single_metric() {
        let expected = vec![create_counter("basic", 42.0)];
        let actual = sort_and_collapse_counters_by_series_and_timestamp(expected.clone());

        assert_eq!(expected, actual);
    }

    /// Identical gauges are never merged, whether there are two or seven of them.
    #[test]
    fn collapse_identical_metrics_gauge() {
        for (gauge_value, copies) in [(42.0, 2), (41.0, 7)] {
            let input: Vec<Metric> = std::iter::repeat_with(|| create_gauge("basic", gauge_value))
                .take(copies)
                .collect();
            let expected = input.clone();
            let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

            assert_eq!(expected, actual);
        }
    }

    /// Seven identical counters collapse into one carrying the summed value.
    #[test]
    fn collapse_identical_metrics_counter() {
        let counter_value = 42.0;
        let input: Vec<Metric> = std::iter::repeat_with(|| create_counter("basic", counter_value))
            .take(7)
            .collect();

        let summed = input.len() as f64 * counter_value;
        let expected = vec![create_counter("basic", summed)];
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    /// Counters interleaved with gauges are still collapsed; the gauges are left alone.
    #[test]
    fn collapse_identical_metrics_counter_unsorted() {
        let gauge_value = 1.0;
        let counter_value = 42.0;

        // Layout: gauge, 3 counters, gauge, 4 counters.
        let mut input = vec![create_gauge("gauge", gauge_value)];
        input.extend(std::iter::repeat_with(|| create_counter("basic", counter_value)).take(3));
        input.push(create_gauge("gauge", gauge_value));
        input.extend(std::iter::repeat_with(|| create_counter("basic", counter_value)).take(4));

        // Everything except the two gauges is a counter.
        let summed = (input.len() - 2) as f64 * counter_value;
        let expected = vec![
            create_counter("basic", summed),
            create_gauge("gauge", gauge_value),
            create_gauge("gauge", gauge_value),
        ];
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    /// Counters only merge with counters sharing their timestamp; untimestamped ones group
    /// together and sort last.
    #[test]
    fn collapse_identical_metrics_multiple_timestamps() {
        let recent = Utc::now() - Duration::from_secs(5);
        let older = recent - Duration::from_secs(5);
        let value = 42.0;

        let input = vec![
            create_counter("basic", value),
            create_counter("basic", value).with_timestamp(Some(recent)),
            create_counter("basic", value).with_timestamp(Some(older)),
            create_counter("basic", value),
            create_counter("basic", value).with_timestamp(Some(older)),
            create_counter("basic", value).with_timestamp(Some(recent)),
            create_counter("basic", value),
        ];

        let expected = vec![
            create_counter("basic", value * 2.).with_timestamp(Some(older)),
            create_counter("basic", value * 2.).with_timestamp(Some(recent)),
            create_counter("basic", value * 3.),
        ];
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    /// Tag sets are part of the series: counters merge only when their tags match, and the
    /// order in which tags were written does not matter.
    #[test]
    fn collapse_identical_metrics_with_tags() {
        let value = 42.0;

        let input = vec![
            create_counter("basic", value).with_tags(Some(metric_tags!("a" => "a"))),
            create_counter("basic", value).with_tags(Some(metric_tags!(
                "a" => "a",
                "b" => "b",
            ))),
            create_counter("basic", value),
            create_counter("basic", value).with_tags(Some(metric_tags!(
                "b" => "b",
                "a" => "a",
            ))),
            create_counter("basic", value),
            create_counter("basic", value),
            create_counter("basic", value).with_tags(Some(metric_tags!("a" => "a"))),
        ];

        let expected = vec![
            create_counter("basic", value * 3.),
            create_counter("basic", value * 2.).with_tags(Some(metric_tags!("a" => "a"))),
            create_counter("basic", value * 2.).with_tags(Some(metric_tags!(
                "a" => "a",
                "b" => "b",
            ))),
        ];
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    /// Sort key for the reference implementation used by the property test.
    ///
    /// The derived ordering compares fields in declaration order: type, name, timestamp.
    #[derive(Eq, Ord, PartialEq, PartialOrd)]
    struct MetricCollapseSort {
        metric_type: &'static str,
        metric_name: String,
        metric_ts: Option<DateTime<Utc>>,
    }

    impl MetricCollapseSort {
        /// Extracts the sort key from `metric`.
        fn from_metric(metric: &Metric) -> Self {
            let metric_type = metric.value().as_name();
            let metric_name = metric.name().to_string();
            let metric_ts = metric.timestamp();

            Self {
                metric_type,
                metric_name,
                metric_ts,
            }
        }
    }

    /// Reference `dedup_by` predicate: merges `left` into `right` when both are counters
    /// with the same series and timestamp, and reports whether it did so.
    fn collapse_dedup_fn(left: &mut Metric, right: &mut Metric) -> bool {
        if left.series() != right.series() || left.timestamp() != right.timestamp() {
            return false;
        }

        if let (
            MetricValue::Counter { value: left_value },
            MetricValue::Counter { value: right_value },
        ) = (left.value_mut(), right.value_mut())
        {
            *right_value += *left_value;
            true
        } else {
            false
        }
    }

    proptest! {
        /// The sink's collapse must agree with the straightforward reference: stable sort
        /// by `MetricCollapseSort`, then dedup with `collapse_dedup_fn`.
        #[test]
        fn test_counter_collapse(input in arb_collapsible_metrics()) {
            let mut reference = input.clone();
            reference.sort_by_cached_key(MetricCollapseSort::from_metric);
            reference.dedup_by(collapse_dedup_fn);

            let collapsed = sort_and_collapse_counters_by_series_and_timestamp(input);

            prop_assert_eq!(reference, collapsed);
        }
    }
}