vector/sinks/util/buffer/metrics/
split.rs

1use std::collections::VecDeque;
2
3use vector_lib::event::{metric::MetricData, Metric, MetricValue};
4
/// Internal state backing a [`SplitIterator`].
// NOTE(review): the lint is silenced instead of boxing `Metric`; presumably this avoids an extra allocation per
// metric -- confirm before changing.
#[allow(clippy::large_enum_variant)]
enum SplitState {
    /// At most one metric left to yield; becomes `None` once that metric has been taken.
    Single(Option<Metric>),
    /// A queue of metrics, yielded from the front.
    Multiple(VecDeque<Metric>),
}
10
/// An iterator that returns the result of a metric split operation.
///
/// It yields either a single metric or a sequence of metrics, depending on which constructor built it.
pub struct SplitIterator {
    /// The metrics that have not been yielded yet.
    state: SplitState,
}
15
16impl SplitIterator {
17    /// Creates an iterator for a single metric.
18    pub const fn single(metric: Metric) -> Self {
19        Self {
20            state: SplitState::Single(Some(metric)),
21        }
22    }
23
24    /// Creates an iterator for multiple metrics.
25    pub fn multiple<I>(metrics: I) -> Self
26    where
27        I: Into<VecDeque<Metric>>,
28    {
29        Self {
30            state: SplitState::Multiple(metrics.into()),
31        }
32    }
33}
34
35impl Iterator for SplitIterator {
36    type Item = Metric;
37
38    fn next(&mut self) -> Option<Self::Item> {
39        match &mut self.state {
40            SplitState::Single(metric) => metric.take(),
41            SplitState::Multiple(metrics) => metrics.pop_front(),
42        }
43    }
44}
45
/// Splits a metric into potentially multiple metrics.
///
/// In some cases, a single metric may represent multiple fundamental metrics: an aggregated summary or histogram can
/// represent a count, sum, and subtotals for a given measurement. These metrics may be able to be handled
/// natively/directly in a sink, but in other cases, those fundamental metrics may need to be extracted and operated on
/// individually.
///
/// This trait defines a simple interface for defining custom rules about what metrics to split and when to split them.
pub trait MetricSplit {
    /// Attempts to split the metric.
    ///
    /// The input metric is taken by value. The receiver is `&mut self`, so an implementation is free to keep state
    /// between calls.
    ///
    /// The returned iterator will either return only the input metric if no splitting occurred, or all resulting
    /// metrics that were created as a result of the split.
    fn split(&mut self, input: Metric) -> SplitIterator;
}
60
/// A self-contained metric splitter.
///
/// The splitter state is stored internally, and it can only be created from a splitter implementation that is either
/// `Default` or is constructed ahead of time, so it is primarily useful for constructing a usable splitter via implicit
/// conversion methods or when no special parameters are required for configuring the underlying splitter.
pub struct MetricSplitter<S> {
    /// The wrapped splitter implementation that performs the actual splitting.
    splitter: S,
}
69
70impl<S: MetricSplit> MetricSplitter<S> {
71    /// Attempts to split the metric.
72    ///
73    /// For more information about splitting, see the documentation for [`MetricSplit::split`].
74    pub fn split(&mut self, input: Metric) -> SplitIterator {
75        self.splitter.split(input)
76    }
77}
78
79impl<S: Default> Default for MetricSplitter<S> {
80    fn default() -> Self {
81        Self {
82            splitter: S::default(),
83        }
84    }
85}
86
87impl<S> From<S> for MetricSplitter<S> {
88    fn from(splitter: S) -> Self {
89        Self { splitter }
90    }
91}
92
/// A splitter that separates an aggregated summary into its various parts.
///
/// Generally speaking, all metric types supported by Vector have a way to be added to and removed from other instances
/// of themselves, such as merging two counters by adding together their values, or merging two distributions simply by
/// adding all of their samples together.
///
/// However, one particular metric type is not amenable to these operations: aggregated summaries. Hailing from
/// Prometheus, aggregated summaries are meant to be client-side generated versions of summary data about a histogram:
/// count, sum, and various quantiles. As quantiles themselves cannot simply be added to or removed from each other
/// without entirely altering the statistical significance of their value, we often do not do anything with them except
/// forward them on directly as their individual pieces, or even drop them.
///
/// However, as many sinks must do this, this splitter exists to bundle the operation in a reusable piece of code that
/// all sinks needing to do so can share.
///
/// All other metric types are passed through as-is.
#[derive(Clone, Copy, Debug, Default)]
pub struct AggregatedSummarySplitter;
111
112impl MetricSplit for AggregatedSummarySplitter {
113    fn split(&mut self, input: Metric) -> SplitIterator {
114        let (series, data, metadata) = input.into_parts();
115        match data.value() {
116            // If it's not an aggregated summary, just send it on semi-unchanged. :)
117            MetricValue::Counter { .. }
118            | MetricValue::Gauge { .. }
119            | MetricValue::Set { .. }
120            | MetricValue::Distribution { .. }
121            | MetricValue::AggregatedHistogram { .. }
122            | MetricValue::Sketch { .. } => {
123                SplitIterator::single(Metric::from_parts(series, data, metadata))
124            }
125            MetricValue::AggregatedSummary { .. } => {
126                // Further extract the aggregated summary components so we can generate our multiple metrics.
127                let (time, kind, value) = data.into_parts();
128                let (quantiles, count, sum) = match value {
129                    MetricValue::AggregatedSummary {
130                        quantiles,
131                        count,
132                        sum,
133                    } => (quantiles, count, sum),
134                    _ => unreachable!("metric value must be aggregated summary to be here"),
135                };
136
137                // We generate one metric for the count, one metric for the sum, and one metric for each quantile. We
138                // clone the timestamp, kind, metadata, etc, to keep everything the same as it was on the way in.
139                let mut metrics = VecDeque::new();
140
141                let mut count_series = series.clone();
142                count_series.name_mut().name_mut().push_str("_count");
143                let count_data = MetricData::from_parts(
144                    time,
145                    kind,
146                    MetricValue::Counter {
147                        value: count as f64,
148                    },
149                );
150                let count_metadata = metadata.clone();
151
152                metrics.push_back(Metric::from_parts(count_series, count_data, count_metadata));
153
154                for quantile in quantiles {
155                    let mut quantile_series = series.clone();
156                    quantile_series
157                        .replace_tag(String::from("quantile"), quantile.to_quantile_string());
158                    let quantile_data = MetricData::from_parts(
159                        time,
160                        kind,
161                        MetricValue::Gauge {
162                            value: quantile.value,
163                        },
164                    );
165                    let quantile_metadata = metadata.clone();
166
167                    metrics.push_back(Metric::from_parts(
168                        quantile_series,
169                        quantile_data,
170                        quantile_metadata,
171                    ));
172                }
173
174                let mut sum_series = series;
175                sum_series.name_mut().name_mut().push_str("_sum");
176                let sum_data =
177                    MetricData::from_parts(time, kind, MetricValue::Counter { value: sum });
178                let sum_metadata = metadata;
179
180                metrics.push_back(Metric::from_parts(sum_series, sum_data, sum_metadata));
181
182                SplitIterator::multiple(metrics)
183            }
184        }
185    }
186}
187
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use vector_lib::event::{
        metric::{Bucket, MetricTags, Quantile, Sample},
        Metric, MetricKind, MetricValue, StatisticKind,
    };

    use super::{AggregatedSummarySplitter, MetricSplitter};

    /// Builds the tag set expected on the gauge that is emitted for quantile `q`.
    fn quantile_tags(q: f64) -> Option<MetricTags> {
        let label = Quantile {
            quantile: q,
            value: 0.0,
        }
        .to_quantile_string();

        Some(std::iter::once((String::from("quantile"), label)).collect())
    }

    /// Non-summary metrics must come back untouched, while an aggregated summary must be broken up into its count,
    /// per-quantile, and sum metrics, in that order.
    #[test]
    fn test_agg_summary_split() {
        let mut splitter = MetricSplitter::<AggregatedSummarySplitter>::default();

        // Metrics that the splitter is expected to hand back as-is.
        let passthrough = vec![
            Metric::new(
                "counter",
                MetricKind::Incremental,
                MetricValue::Counter { value: 42.0 },
            ),
            Metric::new(
                "gauge",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 3.15 },
            ),
            Metric::new(
                "set",
                MetricKind::Absolute,
                MetricValue::Set {
                    values: BTreeSet::from([String::from("foobar")]),
                },
            ),
            Metric::new(
                "distribution",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    statistic: StatisticKind::Histogram,
                    samples: vec![Sample {
                        value: 13.37,
                        rate: 10,
                    }],
                },
            ),
            Metric::new(
                "agg_histo",
                MetricKind::Absolute,
                MetricValue::AggregatedHistogram {
                    buckets: vec![
                        Bucket {
                            upper_limit: 10.0,
                            count: 5,
                        },
                        Bucket {
                            upper_limit: 25.0,
                            count: 2,
                        },
                    ],
                    count: 7,
                    sum: 100.0,
                },
            ),
        ];

        for metric in passthrough {
            let actual: Vec<Metric> = splitter.split(metric.clone()).collect();
            assert_eq!(vec![metric], actual);
        }

        // The one metric type that actually gets split.
        let agg_summary = Metric::new(
            "agg_summary",
            MetricKind::Absolute,
            MetricValue::AggregatedSummary {
                quantiles: vec![
                    Quantile {
                        quantile: 0.05,
                        value: 10.0,
                    },
                    Quantile {
                        quantile: 0.95,
                        value: 25.0,
                    },
                ],
                count: 7,
                sum: 100.0,
            },
        );

        let expected_splits = vec![
            Metric::new(
                "agg_summary_count",
                MetricKind::Absolute,
                MetricValue::Counter { value: 7.0 },
            ),
            Metric::new(
                "agg_summary",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 10.0 },
            )
            .with_tags(quantile_tags(0.05)),
            Metric::new(
                "agg_summary",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 25.0 },
            )
            .with_tags(quantile_tags(0.95)),
            Metric::new(
                "agg_summary_sum",
                MetricKind::Absolute,
                MetricValue::Counter { value: 100.0 },
            ),
        ];

        let actual: Vec<Metric> = splitter.split(agg_summary).collect();
        assert_eq!(expected_splits, actual);
    }
}