1use std::{collections::HashMap, future::ready, pin::Pin, time::Duration};
2
3use futures::{Stream, StreamExt};
4use vector_lib::{config::LogNamespace, configurable::configurable_component};
5
6use crate::{
7 config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
8 event::Event,
9 schema,
10 sinks::util::buffer::metrics::{MetricSet, NormalizerConfig, NormalizerSettings},
11 transforms::{TaskTransform, Transform},
12};
13
14#[configurable_component(transform(
16 "incremental_to_absolute",
17 "Convert incremental metrics to absolute."
18))]
19#[derive(Clone, Debug, Default)]
20#[serde(deny_unknown_fields)]
21pub struct IncrementalToAbsoluteConfig {
22 #[configurable(derived)]
28 #[serde(default)]
29 pub cache: NormalizerConfig<IncrementalToAbsoluteDefaultNormalizerSettings>,
30}
31
32#[derive(Clone, Copy, Debug, Default)]
33pub struct IncrementalToAbsoluteDefaultNormalizerSettings;
34
35impl NormalizerSettings for IncrementalToAbsoluteDefaultNormalizerSettings {
36 const MAX_EVENTS: Option<usize> = None;
37 const MAX_BYTES: Option<usize> = None;
38 const TIME_TO_LIVE: Option<u64> = Some(300);
39}
40
41pub const fn default_expire_metrics_secs() -> Duration {
42 Duration::from_secs(120)
43}
44
45impl_generate_config_from_default!(IncrementalToAbsoluteConfig);
46
47#[async_trait::async_trait]
48#[typetag::serde(name = "incremental_to_absolute")]
49impl TransformConfig for IncrementalToAbsoluteConfig {
50 async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
51 IncrementalToAbsolute::new(self).map(Transform::event_task)
52 }
53
54 fn input(&self) -> Input {
55 Input::metric()
56 }
57
58 fn outputs(
59 &self,
60 _: vector_lib::enrichment::TableRegistry,
61 _: &[(OutputId, schema::Definition)],
62 _: LogNamespace,
63 ) -> Vec<TransformOutput> {
64 vec![TransformOutput::new(DataType::Metric, HashMap::new())]
65 }
66}
67#[derive(Debug)]
68pub struct IncrementalToAbsolute {
69 data: MetricSet,
70}
71
72impl IncrementalToAbsolute {
73 pub fn new(config: &IncrementalToAbsoluteConfig) -> crate::Result<Self> {
74 Ok(Self {
76 data: MetricSet::new(config.cache.validate()?.into_settings()),
77 })
78 }
79 pub fn transform_one(&mut self, event: Event) -> Option<Event> {
80 self.data
81 .make_absolute(event.as_metric().clone())
82 .map(Event::Metric)
83 }
84}
85
86impl TaskTransform<Event> for IncrementalToAbsolute {
87 fn transform(
88 self: Box<Self>,
89 task: Pin<Box<dyn Stream<Item = Event> + Send>>,
90 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
91 where
92 Self: 'static,
93 {
94 let mut inner = self;
95 Box::pin(task.filter_map(move |v| ready(inner.transform_one(v))))
96 }
97}
98
99#[cfg(test)]
100mod tests {
101 use std::sync::Arc;
102
103 use futures_util::SinkExt;
104 use similar_asserts::assert_eq;
105 use vector_lib::config::ComponentKey;
106
107 use super::*;
108 use crate::event::{
109 Metric,
110 metric::{MetricKind, MetricValue},
111 };
112
113 fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {
114 let mut event = Event::Metric(Metric::new(name, kind, value))
115 .with_source_id(Arc::new(ComponentKey::from("in")))
116 .with_upstream_id(Arc::new(OutputId::from("transform")));
117
118 event.metadata_mut().set_source_type("unit_test_stream");
119
120 event
121 }
122
123 async fn assert_metric_eq(
124 tx: &mut futures::channel::mpsc::Sender<Event>,
125 mut out_stream: impl Stream<Item = Event> + Unpin,
126 metric: Event,
127 expected_metric: Event,
128 ) {
129 tx.send(metric).await.unwrap();
130 if let Some(out_event) = out_stream.next().await {
131 let result = out_event;
132 assert_eq!(result, expected_metric);
133 } else {
134 panic!("Unexpectedly received None in output stream");
135 }
136 }
137
138 #[tokio::test]
139 async fn test_incremental_to_absolute() {
140 let config = toml::from_str::<IncrementalToAbsoluteConfig>(
141 r#"
142[cache]
143max_events = 100
144"#,
145 )
146 .unwrap();
147 let incremental_to_absolute = IncrementalToAbsolute::new(&config)
148 .map(Transform::event_task)
149 .unwrap();
150 let incremental_to_absolute = incremental_to_absolute.into_task();
151 let (mut tx, rx) = futures::channel::mpsc::channel(10);
152 let mut out_stream = incremental_to_absolute.transform_events(Box::pin(rx));
153
154 let inc_counter_1 = make_metric(
155 "incremental_counter",
156 MetricKind::Incremental,
157 MetricValue::Counter { value: 10.0 },
158 );
159 let expected_inc_counter_1 = make_metric(
160 "incremental_counter",
161 MetricKind::Absolute,
162 MetricValue::Counter { value: 10.0 },
163 );
164 assert_metric_eq(
165 &mut tx,
166 &mut out_stream,
167 inc_counter_1,
168 expected_inc_counter_1,
169 )
170 .await;
171
172 let inc_counter_2 = make_metric(
173 "incremental_counter",
174 MetricKind::Incremental,
175 MetricValue::Counter { value: 10.0 },
176 );
177 let expected_inc_counter_2 = make_metric(
178 "incremental_counter",
179 MetricKind::Absolute,
180 MetricValue::Counter { value: 20.0 },
181 );
182 assert_metric_eq(
183 &mut tx,
184 &mut out_stream,
185 inc_counter_2,
186 expected_inc_counter_2,
187 )
188 .await;
189
190 let inc_counter_3 = make_metric(
191 "incremental_counter",
192 MetricKind::Incremental,
193 MetricValue::Counter { value: 10.0 },
194 );
195 let expected_inc_counter_3 = make_metric(
196 "incremental_counter",
197 MetricKind::Absolute,
198 MetricValue::Counter { value: 30.0 },
199 );
200 assert_metric_eq(
201 &mut tx,
202 &mut out_stream,
203 inc_counter_3,
204 expected_inc_counter_3,
205 )
206 .await;
207
208 let gauge = make_metric(
210 "gauge",
211 MetricKind::Absolute,
212 MetricValue::Gauge { value: 42.0 },
213 );
214 let expected_gauge = gauge.clone();
215 assert_metric_eq(&mut tx, &mut out_stream, gauge, expected_gauge).await;
216
217 let absolute_counter = make_metric(
218 "absolute_counter",
219 MetricKind::Absolute,
220 MetricValue::Counter { value: 42.0 },
221 );
222 let absolute_counter_expected = absolute_counter.clone();
223 assert_metric_eq(
224 &mut tx,
225 &mut out_stream,
226 absolute_counter,
227 absolute_counter_expected,
228 )
229 .await;
230 }
231}