vector/transforms/
incremental_to_absolute.rs

1use std::{collections::HashMap, future::ready, pin::Pin, time::Duration};
2
3use futures::{Stream, StreamExt};
4use vector_lib::configurable::configurable_component;
5
6use crate::{
7    config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
8    event::Event,
9    schema,
10    sinks::util::buffer::metrics::{MetricSet, NormalizerConfig, NormalizerSettings},
11    transforms::{TaskTransform, Transform},
12};
13
14/// Configuration for the `incremental_to_absolute` transform.
15#[configurable_component(transform(
16    "incremental_to_absolute",
17    "Convert incremental metrics to absolute."
18))]
19#[derive(Clone, Debug, Default)]
20#[serde(deny_unknown_fields)]
21pub struct IncrementalToAbsoluteConfig {
22    /// Configuration for the internal metrics cache used to normalize a stream of incremental
23    /// metrics into absolute metrics.
24    ///
25    /// By default, incremental metrics are evicted after 5 minutes of not being updated. The next
26    /// incremental value will be reset.
27    #[configurable(derived)]
28    #[serde(default)]
29    pub cache: NormalizerConfig<IncrementalToAbsoluteDefaultNormalizerSettings>,
30}
31
32#[derive(Clone, Copy, Debug, Default)]
33pub struct IncrementalToAbsoluteDefaultNormalizerSettings;
34
35impl NormalizerSettings for IncrementalToAbsoluteDefaultNormalizerSettings {
36    const MAX_EVENTS: Option<usize> = None;
37    const MAX_BYTES: Option<usize> = None;
38    const TIME_TO_LIVE: Option<u64> = Some(300);
39}
40
/// Returns a fixed expiry duration of 120 seconds (two minutes).
///
/// Usable in `const` contexts. Nothing in the visible part of this file calls it; it is kept
/// because it is public and may be referenced from elsewhere.
pub const fn default_expire_metrics_secs() -> Duration {
    const EXPIRE_SECS: u64 = 120;
    Duration::from_secs(EXPIRE_SECS)
}
44
45impl_generate_config_from_default!(IncrementalToAbsoluteConfig);
46
47#[async_trait::async_trait]
48#[typetag::serde(name = "incremental_to_absolute")]
49impl TransformConfig for IncrementalToAbsoluteConfig {
50    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
51        IncrementalToAbsolute::new(self).map(Transform::event_task)
52    }
53
54    fn input(&self) -> Input {
55        Input::metric()
56    }
57
58    fn outputs(
59        &self,
60        _: &TransformContext,
61        _: &[(OutputId, schema::Definition)],
62    ) -> Vec<TransformOutput> {
63        vec![TransformOutput::new(DataType::Metric, HashMap::new())]
64    }
65}
66#[derive(Debug)]
67pub struct IncrementalToAbsolute {
68    data: MetricSet,
69}
70
71impl IncrementalToAbsolute {
72    pub fn new(config: &IncrementalToAbsoluteConfig) -> crate::Result<Self> {
73        // Create a new MetricSet with the proper cache settings
74        Ok(Self {
75            data: MetricSet::new(config.cache.validate()?.into_settings()),
76        })
77    }
78    pub fn transform_one(&mut self, event: Event) -> Option<Event> {
79        self.data
80            .make_absolute(event.as_metric().clone())
81            .map(Event::Metric)
82    }
83}
84
85impl TaskTransform<Event> for IncrementalToAbsolute {
86    fn transform(
87        self: Box<Self>,
88        task: Pin<Box<dyn Stream<Item = Event> + Send>>,
89    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
90    where
91        Self: 'static,
92    {
93        let mut inner = self;
94        Box::pin(task.filter_map(move |v| ready(inner.transform_one(v))))
95    }
96}
97
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures_util::SinkExt;
    use similar_asserts::assert_eq;
    use vector_lib::config::ComponentKey;

    use super::*;
    use crate::event::{
        Metric,
        metric::{MetricKind, MetricValue},
    };

    /// Builds a metric event with fixed source id, upstream id and source type, so that input
    /// and expected events differ only in the metric itself.
    fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {
        let source_id = Arc::new(ComponentKey::from("in"));
        let upstream_id = Arc::new(OutputId::from("transform"));

        let mut event = Event::Metric(Metric::new(name, kind, value))
            .with_source_id(source_id)
            .with_upstream_id(upstream_id);
        event.metadata_mut().set_source_type("unit_test_stream");
        event
    }

    /// Sends `metric` into the transform and asserts that the next output event equals
    /// `expected_metric`. Panics if the output stream ends instead.
    async fn assert_metric_eq(
        tx: &mut futures::channel::mpsc::Sender<Event>,
        mut out_stream: impl Stream<Item = Event> + Unpin,
        metric: Event,
        expected_metric: Event,
    ) {
        tx.send(metric).await.unwrap();

        let Some(actual) = out_stream.next().await else {
            panic!("Unexpectedly received None in output stream");
        };
        assert_eq!(actual, expected_metric);
    }

    #[tokio::test]
    async fn test_incremental_to_absolute() {
        let config: IncrementalToAbsoluteConfig = toml::from_str(
            r#"
[cache]
max_events = 100
"#,
        )
        .unwrap();
        let task = IncrementalToAbsolute::new(&config)
            .map(Transform::event_task)
            .unwrap()
            .into_task();
        let (mut tx, rx) = futures::channel::mpsc::channel(10);
        let mut out_stream = task.transform_events(Box::pin(rx));

        // Three increments of 10 on the same counter must come out as running totals.
        for running_total in [10.0, 20.0, 30.0] {
            let incremental = make_metric(
                "incremental_counter",
                MetricKind::Incremental,
                MetricValue::Counter { value: 10.0 },
            );
            let absolute = make_metric(
                "incremental_counter",
                MetricKind::Absolute,
                MetricValue::Counter {
                    value: running_total,
                },
            );
            assert_metric_eq(&mut tx, &mut out_stream, incremental, absolute).await;
        }

        // Absolute counters and gauges are emitted unchanged
        let passthrough_cases = [
            ("gauge", MetricValue::Gauge { value: 42.0 }),
            ("absolute_counter", MetricValue::Counter { value: 42.0 }),
        ];
        for (name, value) in passthrough_cases {
            let metric = make_metric(name, MetricKind::Absolute, value);
            let expected = metric.clone();
            assert_metric_eq(&mut tx, &mut out_stream, metric, expected).await;
        }
    }
}