use std::{fmt, sync::Arc};
use async_trait::async_trait;
use chrono::Utc;
use futures_util::{
future::ready,
stream::{self, BoxStream},
StreamExt,
};
use tower::Service;
use vector_lib::stream::{BatcherSettings, DriverResponse};
use vector_lib::{
event::{Event, Metric, MetricValue},
partition::Partitioner,
sink::StreamSink,
};
use super::{
config::DatadogMetricsEndpoint, normalizer::DatadogMetricsNormalizer,
request_builder::DatadogMetricsRequestBuilder, service::DatadogMetricsRequest,
};
use crate::{
internal_events::DatadogMetricsEncodingError,
sinks::util::{
buffer::metrics::{AggregatedSummarySplitter, MetricSplitter},
request_builder::default_request_builder_concurrency_limit,
SinkBuilderExt,
},
};
/// Partitions metrics by the Datadog API key stored in their metadata and by the Datadog
/// endpoint their value type is submitted to.
///
/// Distributions, aggregated histograms and sketches are routed to the sketches endpoint;
/// counters, gauges, sets and aggregated summaries are routed to the series endpoint.
struct DatadogMetricsTypePartitioner;

impl Partitioner for DatadogMetricsTypePartitioner {
    type Item = Metric;
    type Key = (Option<Arc<str>>, DatadogMetricsEndpoint);

    fn partition(&self, item: &Self::Item) -> Self::Key {
        let endpoint = match item.data().value() {
            MetricValue::Distribution { .. }
            | MetricValue::AggregatedHistogram { .. }
            | MetricValue::Sketch { .. } => DatadogMetricsEndpoint::Sketches,
            MetricValue::Counter { .. }
            | MetricValue::Gauge { .. }
            | MetricValue::Set { .. }
            | MetricValue::AggregatedSummary { .. } => DatadogMetricsEndpoint::series(),
        };
        let api_key = item.metadata().datadog_api_key();
        (api_key, endpoint)
    }
}
/// Stream sink that turns incoming events into `DatadogMetricsRequest`s and hands them to
/// `service` through a driver.
pub(crate) struct DatadogMetricsSink<S> {
/// Service the driver sends each built `DatadogMetricsRequest` to.
service: S,
/// Builds requests from each partitioned, collapsed batch of metrics.
request_builder: DatadogMetricsRequestBuilder,
/// Batching limits; converted into a byte-size config for the partitioned batcher.
batch_settings: BatcherSettings,
/// Protocol label handed to the driver via `.protocol(...)`.
protocol: String,
}
impl<S> DatadogMetricsSink<S>
where
    S: Service<DatadogMetricsRequest> + Send,
    S::Error: fmt::Debug + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse,
{
    /// Assembles a sink from an already-configured service, request builder, batch settings
    /// and protocol label.
    pub const fn new(
        service: S,
        request_builder: DatadogMetricsRequestBuilder,
        batch_settings: BatcherSettings,
        protocol: String,
    ) -> Self {
        Self {
            service,
            request_builder,
            batch_settings,
            protocol,
        }
    }

    /// Drives `input` through the full sink pipeline until the stream ends.
    ///
    /// Pipeline stages, in order:
    /// 1. keep only metric events;
    /// 2. split aggregated summaries;
    /// 3. normalize with `DatadogMetricsNormalizer`;
    /// 4. batch per (API key, endpoint) partition;
    /// 5. sort and collapse counters in each batch;
    /// 6. build requests, emitting `DatadogMetricsEncodingError` for any that fail;
    /// 7. send the rest through the driver.
    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        let Self {
            service,
            request_builder,
            batch_settings,
            protocol,
        } = *self;
        let mut summary_splitter: MetricSplitter<AggregatedSummarySplitter> =
            MetricSplitter::default();

        input
            // Non-metric events are discarded here.
            .filter_map(|event| ready(event.try_into_metric()))
            .flat_map(|metric| stream::iter(summary_splitter.split(metric)))
            .normalized_with_default::<DatadogMetricsNormalizer>()
            .batched_partitioned(DatadogMetricsTypePartitioner, || {
                batch_settings.as_byte_size_config()
            })
            .concurrent_map(
                default_request_builder_concurrency_limit(),
                |(partition_key, metrics)| {
                    Box::pin(async move {
                        let collapsed =
                            sort_and_collapse_counters_by_series_and_timestamp(metrics);
                        (partition_key, collapsed)
                    })
                },
            )
            .incremental_request_builder(request_builder)
            .flat_map(stream::iter)
            // Encoding failures are reported and dropped; successful requests continue on.
            .filter_map(|request| async move {
                request
                    .map_err(|error| {
                        let (reason, error_code, dropped_events) = error.into_parts();
                        emit!(DatadogMetricsEncodingError {
                            reason: reason.as_str(),
                            error_code,
                            dropped_events: dropped_events as usize,
                        });
                    })
                    .ok()
            })
            .into_driver(service)
            .protocol(protocol)
            .run()
            .await
    }
}
#[async_trait]
impl<S> StreamSink<Event> for DatadogMetricsSink<S>
where
S: Service<DatadogMetricsRequest> + Send,
S::Error: fmt::Debug + Send + 'static,
S::Future: Send + 'static,
S::Response: DriverResponse,
{
async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
self.run_inner(input).await
}
}
/// Sorts a batch of metrics and collapses counters that share a series and timestamp.
///
/// Sorting and merging:
/// - The batch is ordered by metric type name, then series, then timestamp in whole seconds.
///   Metrics without a timestamp are treated as stamped with the current time.
/// - Adjacent counters with an identical series and second-resolution timestamp are merged into
///   the first of them. Their values are summed, and the removed metric's finalizers are moved
///   onto the kept one.
/// - Non-counter metrics are never merged, even when they share a series and timestamp.
///
/// Returns the sorted, collapsed metrics.
fn sort_and_collapse_counters_by_series_and_timestamp(mut metrics: Vec<Metric>) -> Vec<Metric> {
    /// Whole-second timestamp of `metric`, or `default_ts` when the metric has none.
    fn timestamp_secs(metric: &Metric, default_ts: i64) -> i64 {
        metric
            .timestamp()
            .map(|dt| dt.timestamp())
            .unwrap_or(default_ts)
    }

    let now_ts = Utc::now().timestamp();

    // `dedup_by` below only compares neighbouring elements, so every counter that may be merged
    // must end up adjacent to its peers. The metric type must be part of the key on *both*
    // sides of the comparison. Otherwise a non-counter with the same series and timestamp can
    // be ordered between two counters and prevent them from being collapsed.
    metrics.sort_unstable_by(|a, b| {
        (a.value().as_name(), a.series(), timestamp_secs(a, now_ts)).cmp(&(
            b.value().as_name(),
            b.series(),
            timestamp_secs(b, now_ts),
        ))
    });

    // `Vec::dedup_by` passes (current, previously kept) and removes `current` when the closure
    // returns true, so the value and finalizers are folded into `kept`.
    metrics.dedup_by(|current, kept| {
        if current.series() != kept.series()
            || timestamp_secs(current, now_ts) != timestamp_secs(kept, now_ts)
        {
            return false;
        }

        if let (
            MetricValue::Counter {
                value: current_value,
            },
            MetricValue::Counter { value: kept_value },
        ) = (current.value(), kept.value_mut())
        {
            *kept_value += *current_value;
            // Keep acknowledgement tracking for the event being dropped from the batch.
            kept.metadata_mut()
                .merge_finalizers(current.metadata_mut().take_finalizers());
            true
        } else {
            false
        }
    });

    metrics
}
#[cfg(test)]
// Unit and property tests for `sort_and_collapse_counters_by_series_and_timestamp`.
mod tests {
use std::{collections::HashSet, time::Duration};
use chrono::{DateTime, Utc};
use proptest::prelude::*;
use vector_lib::{
event::{Metric, MetricKind, MetricValue},
metric_tags,
};
use super::sort_and_collapse_counters_by_series_and_timestamp;
// Strategy producing arbitrary metric batches for the collapse property test.
// Every metric shares one timestamp `ts` and is named "{type}-{id}". All counters are kept;
// for any other type, a metric whose series was already seen is filtered out, so repeated
// series only occur among counters.
fn arb_collapsible_metrics() -> impl Strategy<Value = Vec<Metric>> {
let ts = Utc::now();
any::<Vec<(u16, MetricValue)>>().prop_map(move |values| {
let mut unique_metrics = HashSet::new();
values
.into_iter()
.map(|(id, value)| {
let name = format!("{}-{}", value.as_name(), id);
Metric::new(name, MetricKind::Incremental, value).with_timestamp(Some(ts))
})
// `||` short-circuits, so counters never touch `unique_metrics`.
.filter(|metric| {
matches!(metric.value(), MetricValue::Counter { .. })
|| unique_metrics.insert(metric.series().clone())
})
.collect()
})
}
// Incremental counter with no timestamp.
fn create_counter(name: &str, value: f64) -> Metric {
Metric::new(
name,
MetricKind::Incremental,
MetricValue::Counter { value },
)
}
// Incremental gauge with no timestamp.
fn create_gauge(name: &str, value: f64) -> Metric {
Metric::new(name, MetricKind::Incremental, MetricValue::Gauge { value })
}
#[test]
fn collapse_no_metrics() {
let input = Vec::new();
let expected = input.clone();
let actual = sort_and_collapse_counters_by_series_and_timestamp(input);
assert_eq!(expected, actual);
}
#[test]
fn collapse_single_metric() {
let input = vec![create_counter("basic", 42.0)];
let expected = input.clone();
let actual = sort_and_collapse_counters_by_series_and_timestamp(input);
assert_eq!(expected, actual);
}
// Identical gauges are never merged: the output equals the input.
#[test]
fn collapse_identical_metrics_gauge() {
let input = vec![create_gauge("basic", 42.0), create_gauge("basic", 42.0)];
let expected = input.clone();
let actual = sort_and_collapse_counters_by_series_and_timestamp(input);
assert_eq!(expected, actual);
let gauge_value = 41.0;
let input = vec![
create_gauge("basic", gauge_value),
create_gauge("basic", gauge_value),
create_gauge("basic", gauge_value),
create_gauge("basic", gauge_value),
create_gauge("basic", gauge_value),
create_gauge("basic", gauge_value),
create_gauge("basic", gauge_value),
];
let expected = input.clone();
let actual = sort_and_collapse_counters_by_series_and_timestamp(input);
assert_eq!(expected, actual);
}
// Identical counters collapse into one counter whose value is the sum of the inputs.
#[test]
fn collapse_identical_metrics_counter() {
let counter_value = 42.0;
let input = vec![
create_counter("basic", counter_value),
create_counter("basic", counter_value),
create_counter("basic", counter_value),
create_counter("basic", counter_value),
create_counter("basic", counter_value),
create_counter("basic", counter_value),
create_counter("basic", counter_value),
];
let expected_counter_value = input.len() as f64 * counter_value;
let expected = vec![create_counter("basic", expected_counter_value)];
let actual = sort_and_collapse_counters_by_series_and_timestamp(input);
assert_eq!(expected, actual);
}
// Counters interleaved with gauges still collapse; the two gauges are kept as-is and
// sorted after the counter.
#[test]
fn collapse_identical_metrics_counter_unsorted() {
let gauge_value = 1.0;
let counter_value = 42.0;
let input = vec![
create_gauge("gauge", gauge_value),
create_counter("basic", counter_value),
create_counter("basic", counter_value),
create_counter("basic", counter_value),
create_gauge("gauge", gauge_value),
create_counter("basic", counter_value),
create_counter("basic", counter_value),
create_counter("basic", counter_value),
create_counter("basic", counter_value),
];
// Every input except the two gauges is a counter that gets summed.
let expected_counter_value = (input.len() - 2) as f64 * counter_value;
let expected = vec![
create_counter("basic", expected_counter_value),
create_gauge("gauge", gauge_value),
create_gauge("gauge", gauge_value),
];
let actual = sort_and_collapse_counters_by_series_and_timestamp(input);
assert_eq!(expected, actual);
}
// Counters only collapse within the same timestamp. Metrics without a timestamp are treated
// as "now", so they sort after the older explicit timestamps `ts_2` < `ts_1`.
#[test]
fn collapse_identical_metrics_multiple_timestamps() {
let ts_1 = Utc::now() - Duration::from_secs(5);
let ts_2 = ts_1 - Duration::from_secs(5);
let counter_value = 42.0;
let input = vec![
create_counter("basic", counter_value),
create_counter("basic", counter_value).with_timestamp(Some(ts_1)),
create_counter("basic", counter_value).with_timestamp(Some(ts_2)),
create_counter("basic", counter_value),
create_counter("basic", counter_value).with_timestamp(Some(ts_2)),
create_counter("basic", counter_value).with_timestamp(Some(ts_1)),
create_counter("basic", counter_value),
];
let expected = vec![
create_counter("basic", counter_value * 2.).with_timestamp(Some(ts_2)),
create_counter("basic", counter_value * 2.).with_timestamp(Some(ts_1)),
create_counter("basic", counter_value * 3.),
];
let actual = sort_and_collapse_counters_by_series_and_timestamp(input);
assert_eq!(expected, actual);
}
// Counters only collapse within the same tag set. Tag sets built in a different insertion
// order ("a","b" vs "b","a") are expected to count as the same series.
#[test]
fn collapse_identical_metrics_with_tags() {
let counter_value = 42.0;
let input = vec![
create_counter("basic", counter_value).with_tags(Some(metric_tags!("a" => "a"))),
create_counter("basic", counter_value).with_tags(Some(metric_tags!(
"a" => "a",
"b" => "b",
))),
create_counter("basic", counter_value),
create_counter("basic", counter_value).with_tags(Some(metric_tags!(
"b" => "b",
"a" => "a",
))),
create_counter("basic", counter_value),
create_counter("basic", counter_value),
create_counter("basic", counter_value).with_tags(Some(metric_tags!("a" => "a"))),
];
let expected = vec![
create_counter("basic", counter_value * 3.),
create_counter("basic", counter_value * 2.).with_tags(Some(metric_tags!("a" => "a"))),
create_counter("basic", counter_value * 2.).with_tags(Some(metric_tags!(
"a" => "a",
"b" => "b",
))),
];
let actual = sort_and_collapse_counters_by_series_and_timestamp(input);
assert_eq!(expected, actual);
}
// Reference sort key for the property test: (type name, metric name, timestamp), compared
// field by field via the derived `Ord`.
#[derive(Eq, Ord, PartialEq, PartialOrd)]
struct MetricCollapseSort {
metric_type: &'static str,
metric_name: String,
metric_ts: Option<DateTime<Utc>>,
}
impl MetricCollapseSort {
fn from_metric(metric: &Metric) -> Self {
Self {
metric_type: metric.value().as_name(),
metric_name: metric.name().to_string(),
metric_ts: metric.timestamp(),
}
}
}
// Reference dedup predicate for the property test. It sums adjacent counters with equal
// series and timestamps into `right`, the element `Vec::dedup_by` keeps. Unlike the
// implementation under test, it does not move finalizers.
fn collapse_dedup_fn(left: &mut Metric, right: &mut Metric) -> bool {
let series_eq = left.series() == right.series();
let timestamp_eq = left.timestamp() == right.timestamp();
if !series_eq || !timestamp_eq {
return false;
}
match (left.value_mut(), right.value_mut()) {
(
MetricValue::Counter { value: left_value },
MetricValue::Counter { value: right_value },
) => {
*right_value += *left_value;
true
}
_ => false,
}
}
proptest! {
// The implementation must match the reference stable-sort + dedup pipeline above.
#[test]
fn test_counter_collapse(input in arb_collapsible_metrics()) {
let mut expected_output = input.clone();
expected_output.sort_by_cached_key(MetricCollapseSort::from_metric);
expected_output.dedup_by(collapse_dedup_fn);
let actual_output = sort_and_collapse_counters_by_series_and_timestamp(input);
prop_assert_eq!(expected_output, actual_output);
}
}
}