vector/sinks/util/buffer/metrics/
split.rs1use std::collections::VecDeque;
2
3use vector_lib::event::{metric::MetricData, Metric, MetricValue};
4
5#[allow(clippy::large_enum_variant)]
6enum SplitState {
7 Single(Option<Metric>),
8 Multiple(VecDeque<Metric>),
9}
10
11pub struct SplitIterator {
13 state: SplitState,
14}
15
16impl SplitIterator {
17 pub const fn single(metric: Metric) -> Self {
19 Self {
20 state: SplitState::Single(Some(metric)),
21 }
22 }
23
24 pub fn multiple<I>(metrics: I) -> Self
26 where
27 I: Into<VecDeque<Metric>>,
28 {
29 Self {
30 state: SplitState::Multiple(metrics.into()),
31 }
32 }
33}
34
35impl Iterator for SplitIterator {
36 type Item = Metric;
37
38 fn next(&mut self) -> Option<Self::Item> {
39 match &mut self.state {
40 SplitState::Single(metric) => metric.take(),
41 SplitState::Multiple(metrics) => metrics.pop_front(),
42 }
43 }
44}
45
46pub trait MetricSplit {
54 fn split(&mut self, input: Metric) -> SplitIterator;
59}
60
/// Thin wrapper around a splitting strategy `S`.
///
/// Can be created through `Default` (when `S: Default`) or from an existing `S` through `From`.
pub struct MetricSplitter<S> {
    // The wrapped strategy that `split` forwards to.
    splitter: S,
}
69
70impl<S: MetricSplit> MetricSplitter<S> {
71 pub fn split(&mut self, input: Metric) -> SplitIterator {
75 self.splitter.split(input)
76 }
77}
78
79impl<S: Default> Default for MetricSplitter<S> {
80 fn default() -> Self {
81 Self {
82 splitter: S::default(),
83 }
84 }
85}
86
87impl<S> From<S> for MetricSplitter<S> {
88 fn from(splitter: S) -> Self {
89 Self { splitter }
90 }
91}
92
/// A [`MetricSplit`] implementation that breaks aggregated summaries apart.
///
/// An aggregated summary becomes a `_count` counter, one gauge per quantile, and a `_sum`
/// counter; every other metric value is passed through unchanged.
#[derive(Clone, Copy, Debug, Default)]
pub struct AggregatedSummarySplitter;
111
112impl MetricSplit for AggregatedSummarySplitter {
113 fn split(&mut self, input: Metric) -> SplitIterator {
114 let (series, data, metadata) = input.into_parts();
115 match data.value() {
116 MetricValue::Counter { .. }
118 | MetricValue::Gauge { .. }
119 | MetricValue::Set { .. }
120 | MetricValue::Distribution { .. }
121 | MetricValue::AggregatedHistogram { .. }
122 | MetricValue::Sketch { .. } => {
123 SplitIterator::single(Metric::from_parts(series, data, metadata))
124 }
125 MetricValue::AggregatedSummary { .. } => {
126 let (time, kind, value) = data.into_parts();
128 let (quantiles, count, sum) = match value {
129 MetricValue::AggregatedSummary {
130 quantiles,
131 count,
132 sum,
133 } => (quantiles, count, sum),
134 _ => unreachable!("metric value must be aggregated summary to be here"),
135 };
136
137 let mut metrics = VecDeque::new();
140
141 let mut count_series = series.clone();
142 count_series.name_mut().name_mut().push_str("_count");
143 let count_data = MetricData::from_parts(
144 time,
145 kind,
146 MetricValue::Counter {
147 value: count as f64,
148 },
149 );
150 let count_metadata = metadata.clone();
151
152 metrics.push_back(Metric::from_parts(count_series, count_data, count_metadata));
153
154 for quantile in quantiles {
155 let mut quantile_series = series.clone();
156 quantile_series
157 .replace_tag(String::from("quantile"), quantile.to_quantile_string());
158 let quantile_data = MetricData::from_parts(
159 time,
160 kind,
161 MetricValue::Gauge {
162 value: quantile.value,
163 },
164 );
165 let quantile_metadata = metadata.clone();
166
167 metrics.push_back(Metric::from_parts(
168 quantile_series,
169 quantile_data,
170 quantile_metadata,
171 ));
172 }
173
174 let mut sum_series = series;
175 sum_series.name_mut().name_mut().push_str("_sum");
176 let sum_data =
177 MetricData::from_parts(time, kind, MetricValue::Counter { value: sum });
178 let sum_metadata = metadata;
179
180 metrics.push_back(Metric::from_parts(sum_series, sum_data, sum_metadata));
181
182 SplitIterator::multiple(metrics)
183 }
184 }
185 }
186}
187
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use vector_lib::event::{
        metric::{Bucket, MetricTags, Quantile, Sample},
        Metric, MetricKind, MetricValue, StatisticKind,
    };

    use super::{AggregatedSummarySplitter, MetricSplitter};

    /// Non-summary metrics must come back unchanged, while an aggregated summary must come
    /// back as `_count`, one gauge per quantile, and `_sum`, in that order.
    #[test]
    fn test_agg_summary_split() {
        let mut splitter = MetricSplitter::<AggregatedSummarySplitter>::default();

        // Runs one metric through the splitter and compares the full output sequence.
        let mut check = |input: Metric, expected: Vec<Metric>| {
            let actual: Vec<Metric> = splitter.split(input).collect();
            assert_eq!(expected, actual);
        };

        // Builds the single-entry tag set expected on a per-quantile gauge.
        let quantile_tag = |q: f64| -> Option<MetricTags> {
            let label = Quantile {
                quantile: q,
                value: 0.0,
            }
            .to_quantile_string();

            Some(std::iter::once(("quantile".to_owned(), label)).collect())
        };

        // Pass-through inputs: every value type other than an aggregated summary.
        let counter = Metric::new(
            "counter",
            MetricKind::Incremental,
            MetricValue::Counter { value: 42.0 },
        );
        let gauge = Metric::new(
            "gauge",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 3.15 },
        );
        let set = Metric::new(
            "set",
            MetricKind::Absolute,
            MetricValue::Set {
                values: BTreeSet::from([String::from("foobar")]),
            },
        );
        let distribution = Metric::new(
            "distribution",
            MetricKind::Incremental,
            MetricValue::Distribution {
                statistic: StatisticKind::Histogram,
                samples: vec![Sample {
                    value: 13.37,
                    rate: 10,
                }],
            },
        );
        let agg_histo = Metric::new(
            "agg_histo",
            MetricKind::Absolute,
            MetricValue::AggregatedHistogram {
                buckets: vec![
                    Bucket {
                        upper_limit: 10.0,
                        count: 5,
                    },
                    Bucket {
                        upper_limit: 25.0,
                        count: 2,
                    },
                ],
                count: 7,
                sum: 100.0,
            },
        );

        // The one input that is expected to be split.
        let agg_summary = Metric::new(
            "agg_summary",
            MetricKind::Absolute,
            MetricValue::AggregatedSummary {
                quantiles: vec![
                    Quantile {
                        quantile: 0.05,
                        value: 10.0,
                    },
                    Quantile {
                        quantile: 0.95,
                        value: 25.0,
                    },
                ],
                count: 7,
                sum: 100.0,
            },
        );

        let agg_summary_splits = vec![
            Metric::new(
                "agg_summary_count",
                MetricKind::Absolute,
                MetricValue::Counter { value: 7.0 },
            ),
            Metric::new(
                "agg_summary",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 10.0 },
            )
            .with_tags(quantile_tag(0.05)),
            Metric::new(
                "agg_summary",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 25.0 },
            )
            .with_tags(quantile_tag(0.95)),
            Metric::new(
                "agg_summary_sum",
                MetricKind::Absolute,
                MetricValue::Counter { value: 100.0 },
            ),
        ];

        check(counter.clone(), vec![counter]);
        check(gauge.clone(), vec![gauge]);
        check(set.clone(), vec![set]);
        check(distribution.clone(), vec![distribution]);
        check(agg_histo.clone(), vec![agg_histo]);
        check(agg_summary, agg_summary_splits);
    }
}