1use std::{fmt, sync::Arc};
2
3use async_trait::async_trait;
4use chrono::Utc;
5use futures_util::{
6 future::ready,
7 stream::{self, BoxStream},
8 StreamExt,
9};
10use tower::Service;
11use vector_lib::stream::{BatcherSettings, DriverResponse};
12use vector_lib::{
13 event::{Event, Metric, MetricValue},
14 partition::Partitioner,
15 sink::StreamSink,
16};
17
18use super::{
19 config::DatadogMetricsEndpoint, normalizer::DatadogMetricsNormalizer,
20 request_builder::DatadogMetricsRequestBuilder, service::DatadogMetricsRequest,
21};
22use crate::{
23 internal_events::DatadogMetricsEncodingError,
24 sinks::util::{
25 buffer::metrics::{AggregatedSummarySplitter, MetricSplitter},
26 request_builder::default_request_builder_concurrency_limit,
27 SinkBuilderExt,
28 },
29};
30
31struct DatadogMetricsTypePartitioner;
37
38impl Partitioner for DatadogMetricsTypePartitioner {
39 type Item = Metric;
40 type Key = (Option<Arc<str>>, DatadogMetricsEndpoint);
41
42 fn partition(&self, item: &Self::Item) -> Self::Key {
43 let endpoint = match item.data().value() {
44 MetricValue::Counter { .. } => DatadogMetricsEndpoint::series(),
45 MetricValue::Gauge { .. } => DatadogMetricsEndpoint::series(),
46 MetricValue::Set { .. } => DatadogMetricsEndpoint::series(),
47 MetricValue::Distribution { .. } => DatadogMetricsEndpoint::Sketches,
48 MetricValue::AggregatedHistogram { .. } => DatadogMetricsEndpoint::Sketches,
49 MetricValue::AggregatedSummary { .. } => DatadogMetricsEndpoint::series(),
51 MetricValue::Sketch { .. } => DatadogMetricsEndpoint::Sketches,
52 };
53 (item.metadata().datadog_api_key(), endpoint)
54 }
55}
56
57pub(crate) struct DatadogMetricsSink<S> {
58 service: S,
59 request_builder: DatadogMetricsRequestBuilder,
60 batch_settings: BatcherSettings,
61 protocol: String,
62}
63
64impl<S> DatadogMetricsSink<S>
65where
66 S: Service<DatadogMetricsRequest> + Send,
67 S::Error: fmt::Debug + Send + 'static,
68 S::Future: Send + 'static,
69 S::Response: DriverResponse,
70{
71 pub const fn new(
73 service: S,
74 request_builder: DatadogMetricsRequestBuilder,
75 batch_settings: BatcherSettings,
76 protocol: String,
77 ) -> Self {
78 DatadogMetricsSink {
79 service,
80 request_builder,
81 batch_settings,
82 protocol,
83 }
84 }
85
86 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
87 let mut splitter: MetricSplitter<AggregatedSummarySplitter> = MetricSplitter::default();
88 let batch_settings = self.batch_settings;
89
90 input
91 .filter_map(|event| ready(event.try_into_metric()))
93 .flat_map(|metric| stream::iter(splitter.split(metric)))
97 .normalized_with_default::<DatadogMetricsNormalizer>()
101 .batched_partitioned(DatadogMetricsTypePartitioner, || {
104 batch_settings.as_byte_size_config()
105 })
106 .concurrent_map(
112 default_request_builder_concurrency_limit(),
113 |((api_key, endpoint), metrics)| {
114 Box::pin(async move {
115 let collapsed_metrics =
116 sort_and_collapse_counters_by_series_and_timestamp(metrics);
117 ((api_key, endpoint), collapsed_metrics)
118 })
119 },
120 )
121 .incremental_request_builder(self.request_builder)
126 .flat_map(stream::iter)
128 .filter_map(|request| async move {
130 match request {
131 Err(e) => {
132 let (reason, error_code, dropped_events) = e.into_parts();
133 emit!(DatadogMetricsEncodingError {
134 reason: reason.as_str(),
135 error_code,
136 dropped_events: dropped_events as usize,
137 });
138 None
139 }
140 Ok(req) => Some(req),
141 }
142 })
143 .into_driver(self.service)
146 .protocol(self.protocol)
147 .run()
148 .await
149 }
150}
151
152#[async_trait]
153impl<S> StreamSink<Event> for DatadogMetricsSink<S>
154where
155 S: Service<DatadogMetricsRequest> + Send,
156 S::Error: fmt::Debug + Send + 'static,
157 S::Future: Send + 'static,
158 S::Response: DriverResponse,
159{
160 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
161 self.run_inner(input).await
165 }
166}
167
168fn sort_and_collapse_counters_by_series_and_timestamp(mut metrics: Vec<Metric>) -> Vec<Metric> {
175 let now_ts = Utc::now().timestamp();
176
177 metrics.sort_unstable_by(|a, b| {
181 (
182 a.value().as_name(),
183 a.series(),
184 a.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts),
185 )
186 .cmp(&(
187 a.value().as_name(),
188 b.series(),
189 b.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts),
190 ))
191 });
192
193 metrics.dedup_by(|left, right| {
198 if left.series() != right.series() {
199 return false;
200 }
201
202 let left_ts = left.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts);
203 let right_ts = right.timestamp().map(|dt| dt.timestamp()).unwrap_or(now_ts);
204 if left_ts != right_ts {
205 return false;
206 }
207
208 if let (
210 MetricValue::Counter { value: left_value },
211 MetricValue::Counter { value: right_value },
212 ) = (left.value(), right.value_mut())
213 {
214 *right_value += left_value;
217 right
218 .metadata_mut()
219 .merge_finalizers(left.metadata_mut().take_finalizers());
220
221 true
222 } else {
223 false
224 }
225 });
226
227 metrics
228}
229
#[cfg(test)]
mod tests {
    use std::{collections::HashSet, time::Duration};

    use chrono::{DateTime, Utc};
    use proptest::prelude::*;
    use vector_lib::{
        event::{Metric, MetricKind, MetricValue},
        metric_tags,
    };

    use super::sort_and_collapse_counters_by_series_and_timestamp;

    /// Strategy yielding vectors of incremental metrics that all share one timestamp.
    ///
    /// Counters may repeat a series any number of times; every other metric type is kept only the
    /// first time its series is seen.
    fn arb_collapsible_metrics() -> impl Strategy<Value = Vec<Metric>> {
        let ts = Utc::now();

        any::<Vec<(u16, MetricValue)>>().prop_map(move |values| {
            let mut seen_series = HashSet::new();
            let mut metrics = Vec::new();

            for (id, value) in values {
                let name = format!("{}-{}", value.as_name(), id);
                let metric =
                    Metric::new(name, MetricKind::Incremental, value).with_timestamp(Some(ts));

                // Counters are always kept; a non-counter is kept only if its series is new.
                // The short-circuit means counter series are never recorded in the set.
                let is_counter = matches!(metric.value(), MetricValue::Counter { .. });
                if is_counter || seen_series.insert(metric.series().clone()) {
                    metrics.push(metric);
                }
            }

            metrics
        })
    }

    /// Builds an incremental counter named `name` with the given value and no timestamp.
    fn create_counter(name: &str, value: f64) -> Metric {
        let counter = MetricValue::Counter { value };
        Metric::new(name, MetricKind::Incremental, counter)
    }

    /// Builds an incremental gauge named `name` with the given value and no timestamp.
    fn create_gauge(name: &str, value: f64) -> Metric {
        let gauge = MetricValue::Gauge { value };
        Metric::new(name, MetricKind::Incremental, gauge)
    }

    #[test]
    fn collapse_no_metrics() {
        let empty: Vec<Metric> = Vec::new();
        let collapsed = sort_and_collapse_counters_by_series_and_timestamp(empty.clone());

        assert_eq!(empty, collapsed);
    }

    #[test]
    fn collapse_single_metric() {
        let single = vec![create_counter("basic", 42.0)];
        let collapsed = sort_and_collapse_counters_by_series_and_timestamp(single.clone());

        assert_eq!(single, collapsed);
    }

    #[test]
    fn collapse_identical_metrics_gauge() {
        // Two identical gauges must survive untouched.
        let pair = vec![create_gauge("basic", 42.0), create_gauge("basic", 42.0)];
        let collapsed = sort_and_collapse_counters_by_series_and_timestamp(pair.clone());

        assert_eq!(pair, collapsed);

        // The same holds for a longer run of identical gauges.
        let gauge_value = 41.0;
        let run: Vec<Metric> = (0..7)
            .map(|_| create_gauge("basic", gauge_value))
            .collect();
        let collapsed = sort_and_collapse_counters_by_series_and_timestamp(run.clone());

        assert_eq!(run, collapsed);
    }

    #[test]
    fn collapse_identical_metrics_counter() {
        let counter_value = 42.0;
        let input: Vec<Metric> = (0..7)
            .map(|_| create_counter("basic", counter_value))
            .collect();

        // All seven counters fold into one whose value is their sum.
        let summed_value = input.len() as f64 * counter_value;
        let expected = vec![create_counter("basic", summed_value)];
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    #[test]
    fn collapse_identical_metrics_counter_unsorted() {
        let gauge_value = 1.0;
        let counter_value = 42.0;
        let gauge = || create_gauge("gauge", gauge_value);
        let counter = || create_counter("basic", counter_value);

        let input = vec![
            gauge(),
            counter(),
            counter(),
            counter(),
            gauge(),
            counter(),
            counter(),
            counter(),
            counter(),
        ];

        // Everything except the two gauges is a counter and collapses into a single metric.
        let summed_value = (input.len() - 2) as f64 * counter_value;
        let expected = vec![create_counter("basic", summed_value), gauge(), gauge()];
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    #[test]
    fn collapse_identical_metrics_multiple_timestamps() {
        let ts_1 = Utc::now() - Duration::from_secs(5);
        let ts_2 = ts_1 - Duration::from_secs(5);
        let counter_value = 42.0;
        let untimed = || create_counter("basic", counter_value);
        let stamped =
            |ts: DateTime<Utc>| create_counter("basic", counter_value).with_timestamp(Some(ts));

        let input = vec![
            untimed(),
            stamped(ts_1),
            stamped(ts_2),
            untimed(),
            stamped(ts_2),
            stamped(ts_1),
            untimed(),
        ];

        // Oldest timestamp first; untimestamped counters sort last and merge with each other.
        let expected = vec![
            create_counter("basic", counter_value * 2.).with_timestamp(Some(ts_2)),
            create_counter("basic", counter_value * 2.).with_timestamp(Some(ts_1)),
            create_counter("basic", counter_value * 3.),
        ];
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    #[test]
    fn collapse_identical_metrics_with_tags() {
        let counter_value = 42.0;
        let untagged = || create_counter("basic", counter_value);
        let tagged_a =
            || create_counter("basic", counter_value).with_tags(Some(metric_tags!("a" => "a")));
        let tagged_ab = || {
            create_counter("basic", counter_value).with_tags(Some(metric_tags!(
                "a" => "a",
                "b" => "b",
            )))
        };
        // Same tags as `tagged_ab`, declared in the opposite order.
        let tagged_ba = || {
            create_counter("basic", counter_value).with_tags(Some(metric_tags!(
                "b" => "b",
                "a" => "a",
            )))
        };

        let input = vec![
            tagged_a(),
            tagged_ab(),
            untagged(),
            tagged_ba(),
            untagged(),
            untagged(),
            tagged_a(),
        ];

        let expected = vec![
            create_counter("basic", counter_value * 3.),
            create_counter("basic", counter_value * 2.).with_tags(Some(metric_tags!("a" => "a"))),
            create_counter("basic", counter_value * 2.).with_tags(Some(metric_tags!(
                "a" => "a",
                "b" => "b",
            ))),
        ];
        let actual = sort_and_collapse_counters_by_series_and_timestamp(input);

        assert_eq!(expected, actual);
    }

    /// Reference sort key used by the property test: value type, then name, then timestamp.
    ///
    /// The derived ordering compares fields in declaration order, so the field order matters.
    #[derive(PartialEq, Eq, PartialOrd, Ord)]
    struct MetricCollapseSort {
        metric_type: &'static str,
        metric_name: String,
        metric_ts: Option<DateTime<Utc>>,
    }

    impl MetricCollapseSort {
        /// Extracts the reference sort key from `metric`.
        fn from_metric(metric: &Metric) -> Self {
            let metric_type = metric.value().as_name();
            let metric_name = metric.name().to_string();
            let metric_ts = metric.timestamp();

            Self {
                metric_type,
                metric_name,
                metric_ts,
            }
        }
    }

    /// Reference `dedup_by` predicate: merges `left` into `right` when both are counters with the
    /// same series and timestamp, and reports whether `left` should be removed.
    fn collapse_dedup_fn(left: &mut Metric, right: &mut Metric) -> bool {
        if left.series() != right.series() || left.timestamp() != right.timestamp() {
            return false;
        }

        if let (
            MetricValue::Counter { value: removed },
            MetricValue::Counter { value: kept },
        ) = (left.value_mut(), right.value_mut())
        {
            // `dedup_by` removes `left` when this returns `true`, so accumulate into `right`.
            *kept += *removed;
            true
        } else {
            // Non-counter pairs are never treated as duplicates.
            false
        }
    }

    proptest! {
        #[test]
        fn test_counter_collapse(input in arb_collapsible_metrics()) {
            // Reference result: stable sort by the reference key, then the reference dedup.
            let mut reference = input.clone();
            reference.sort_by_cached_key(MetricCollapseSort::from_metric);
            reference.dedup_by(collapse_dedup_fn);

            let collapsed = sort_and_collapse_counters_by_series_and_timestamp(input);

            prop_assert_eq!(reference, collapsed);
        }
    }
}