1use std::{collections::HashMap, future::ready, pin::Pin, time::Duration};
2
3use futures::{Stream, StreamExt};
4use vector_lib::configurable::configurable_component;
5
6use crate::{
7 config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
8 event::Event,
9 schema,
10 sinks::util::buffer::metrics::{MetricSet, NormalizerConfig, NormalizerSettings},
11 transforms::{TaskTransform, Transform},
12};
13
14#[configurable_component(transform(
16 "incremental_to_absolute",
17 "Convert incremental metrics to absolute."
18))]
19#[derive(Clone, Debug, Default)]
20#[serde(deny_unknown_fields)]
21pub struct IncrementalToAbsoluteConfig {
22 #[configurable(derived)]
28 #[serde(default)]
29 pub cache: NormalizerConfig<IncrementalToAbsoluteDefaultNormalizerSettings>,
30}
31
/// Field-less marker type naming the default cache settings of this transform.
///
/// It carries no data of its own; the values are provided by its `NormalizerSettings`
/// implementation.
#[derive(Debug, Default, Clone, Copy)]
pub struct IncrementalToAbsoluteDefaultNormalizerSettings;
34
35impl NormalizerSettings for IncrementalToAbsoluteDefaultNormalizerSettings {
36 const MAX_EVENTS: Option<usize> = None;
37 const MAX_BYTES: Option<usize> = None;
38 const TIME_TO_LIVE: Option<u64> = Some(300);
39}
40
/// Returns a [`Duration`] of 120 seconds.
///
/// Going by its name, this is a default metric-expiration interval. Nothing else in this
/// file calls it.
pub const fn default_expire_metrics_secs() -> Duration {
    const DEFAULT_EXPIRE_SECS: u64 = 120;
    Duration::from_secs(DEFAULT_EXPIRE_SECS)
}
44
45impl_generate_config_from_default!(IncrementalToAbsoluteConfig);
46
47#[async_trait::async_trait]
48#[typetag::serde(name = "incremental_to_absolute")]
49impl TransformConfig for IncrementalToAbsoluteConfig {
50 async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
51 IncrementalToAbsolute::new(self).map(Transform::event_task)
52 }
53
54 fn input(&self) -> Input {
55 Input::metric()
56 }
57
58 fn outputs(
59 &self,
60 _: &TransformContext,
61 _: &[(OutputId, schema::Definition)],
62 ) -> Vec<TransformOutput> {
63 vec![TransformOutput::new(DataType::Metric, HashMap::new())]
64 }
65}
66#[derive(Debug)]
67pub struct IncrementalToAbsolute {
68 data: MetricSet,
69}
70
71impl IncrementalToAbsolute {
72 pub fn new(config: &IncrementalToAbsoluteConfig) -> crate::Result<Self> {
73 Ok(Self {
75 data: MetricSet::new(config.cache.validate()?.into_settings()),
76 })
77 }
78 pub fn transform_one(&mut self, event: Event) -> Option<Event> {
79 self.data
80 .make_absolute(event.as_metric().clone())
81 .map(Event::Metric)
82 }
83}
84
85impl TaskTransform<Event> for IncrementalToAbsolute {
86 fn transform(
87 self: Box<Self>,
88 task: Pin<Box<dyn Stream<Item = Event> + Send>>,
89 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
90 where
91 Self: 'static,
92 {
93 let mut inner = self;
94 Box::pin(task.filter_map(move |v| ready(inner.transform_one(v))))
95 }
96}
97
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures_util::SinkExt;
    use similar_asserts::assert_eq;
    use vector_lib::config::ComponentKey;

    use super::*;
    use crate::event::{
        Metric,
        metric::{MetricKind, MetricValue},
    };

    /// Builds a metric event carrying the fixed source id, upstream id, and source type that
    /// every event in this test module shares.
    fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {
        let source_id = Arc::new(ComponentKey::from("in"));
        let upstream_id = Arc::new(OutputId::from("transform"));

        let mut event = Event::Metric(Metric::new(name, kind, value))
            .with_source_id(source_id)
            .with_upstream_id(upstream_id);
        event.metadata_mut().set_source_type("unit_test_stream");
        event
    }

    /// Sends `metric` into the transform and asserts that the next event on `out_stream`
    /// equals `expected_metric`. Panics if the stream ends instead of yielding an event.
    async fn assert_metric_eq(
        tx: &mut futures::channel::mpsc::Sender<Event>,
        mut out_stream: impl Stream<Item = Event> + Unpin,
        metric: Event,
        expected_metric: Event,
    ) {
        tx.send(metric).await.unwrap();
        match out_stream.next().await {
            Some(received) => {
                assert_eq!(received, expected_metric);
            }
            None => panic!("Unexpectedly received None in output stream"),
        }
    }

    #[tokio::test]
    async fn test_incremental_to_absolute() {
        let config: IncrementalToAbsoluteConfig = toml::from_str(
            r#"
[cache]
max_events = 100
"#,
        )
        .unwrap();
        let transform = IncrementalToAbsolute::new(&config).unwrap();
        let task = Transform::event_task(transform).into_task();
        let (mut tx, rx) = futures::channel::mpsc::channel(10);
        let mut out_stream = task.transform_events(Box::pin(rx));

        // Three identical increments of 10 must come out as running totals.
        for expected_total in [10.0, 20.0, 30.0] {
            let increment = make_metric(
                "incremental_counter",
                MetricKind::Incremental,
                MetricValue::Counter { value: 10.0 },
            );
            let expected = make_metric(
                "incremental_counter",
                MetricKind::Absolute,
                MetricValue::Counter {
                    value: expected_total,
                },
            );
            assert_metric_eq(&mut tx, &mut out_stream, increment, expected).await;
        }

        // Metrics that are already absolute are expected to come out unchanged.
        let passthrough = [
            make_metric(
                "gauge",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 42.0 },
            ),
            make_metric(
                "absolute_counter",
                MetricKind::Absolute,
                MetricValue::Counter { value: 42.0 },
            ),
        ];
        for metric in passthrough {
            let expected = metric.clone();
            assert_metric_eq(&mut tx, &mut out_stream, metric, expected).await;
        }
    }
}