vector/transforms/
incremental_to_absolute.rs

1use std::{collections::HashMap, future::ready, pin::Pin, time::Duration};
2
3use futures::{Stream, StreamExt};
4use vector_lib::{config::LogNamespace, configurable::configurable_component};
5
6use crate::{
7    config::{DataType, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
8    event::Event,
9    schema,
10    sinks::util::buffer::metrics::{MetricSet, NormalizerConfig, NormalizerSettings},
11    transforms::{TaskTransform, Transform},
12};
13
14/// Configuration for the `incremental_to_absolute` transform.
15#[configurable_component(transform(
16    "incremental_to_absolute",
17    "Convert incremental metrics to absolute."
18))]
19#[derive(Clone, Debug, Default)]
20#[serde(deny_unknown_fields)]
21pub struct IncrementalToAbsoluteConfig {
22    /// Configuration for the internal metrics cache used to normalize a stream of incremental
23    /// metrics into absolute metrics.
24    ///
25    /// By default, incremental metrics are evicted after 5 minutes of not being updated. The next
26    /// incremental value will be reset.
27    #[configurable(derived)]
28    #[serde(default)]
29    pub cache: NormalizerConfig<IncrementalToAbsoluteDefaultNormalizerSettings>,
30}
31
32#[derive(Clone, Copy, Debug, Default)]
33pub struct IncrementalToAbsoluteDefaultNormalizerSettings;
34
35impl NormalizerSettings for IncrementalToAbsoluteDefaultNormalizerSettings {
36    const MAX_EVENTS: Option<usize> = None;
37    const MAX_BYTES: Option<usize> = None;
38    const TIME_TO_LIVE: Option<u64> = Some(300);
39}
40
41pub const fn default_expire_metrics_secs() -> Duration {
42    Duration::from_secs(120)
43}
44
45impl_generate_config_from_default!(IncrementalToAbsoluteConfig);
46
47#[async_trait::async_trait]
48#[typetag::serde(name = "incremental_to_absolute")]
49impl TransformConfig for IncrementalToAbsoluteConfig {
50    async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
51        IncrementalToAbsolute::new(self).map(Transform::event_task)
52    }
53
54    fn input(&self) -> Input {
55        Input::metric()
56    }
57
58    fn outputs(
59        &self,
60        _: vector_lib::enrichment::TableRegistry,
61        _: &[(OutputId, schema::Definition)],
62        _: LogNamespace,
63    ) -> Vec<TransformOutput> {
64        vec![TransformOutput::new(DataType::Metric, HashMap::new())]
65    }
66}
67#[derive(Debug)]
68pub struct IncrementalToAbsolute {
69    data: MetricSet,
70}
71
72impl IncrementalToAbsolute {
73    pub fn new(config: &IncrementalToAbsoluteConfig) -> crate::Result<Self> {
74        // Create a new MetricSet with the proper cache settings
75        Ok(Self {
76            data: MetricSet::new(config.cache.validate()?.into_settings()),
77        })
78    }
79    pub fn transform_one(&mut self, event: Event) -> Option<Event> {
80        self.data
81            .make_absolute(event.as_metric().clone())
82            .map(Event::Metric)
83    }
84}
85
86impl TaskTransform<Event> for IncrementalToAbsolute {
87    fn transform(
88        self: Box<Self>,
89        task: Pin<Box<dyn Stream<Item = Event> + Send>>,
90    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
91    where
92        Self: 'static,
93    {
94        let mut inner = self;
95        Box::pin(task.filter_map(move |v| ready(inner.transform_one(v))))
96    }
97}
98
99#[cfg(test)]
100mod tests {
101    use std::sync::Arc;
102
103    use futures_util::SinkExt;
104    use similar_asserts::assert_eq;
105    use vector_lib::config::ComponentKey;
106
107    use super::*;
108    use crate::event::{
109        Metric,
110        metric::{MetricKind, MetricValue},
111    };
112
113    fn make_metric(name: &'static str, kind: MetricKind, value: MetricValue) -> Event {
114        let mut event = Event::Metric(Metric::new(name, kind, value))
115            .with_source_id(Arc::new(ComponentKey::from("in")))
116            .with_upstream_id(Arc::new(OutputId::from("transform")));
117
118        event.metadata_mut().set_source_type("unit_test_stream");
119
120        event
121    }
122
123    async fn assert_metric_eq(
124        tx: &mut futures::channel::mpsc::Sender<Event>,
125        mut out_stream: impl Stream<Item = Event> + Unpin,
126        metric: Event,
127        expected_metric: Event,
128    ) {
129        tx.send(metric).await.unwrap();
130        if let Some(out_event) = out_stream.next().await {
131            let result = out_event;
132            assert_eq!(result, expected_metric);
133        } else {
134            panic!("Unexpectedly received None in output stream");
135        }
136    }
137
138    #[tokio::test]
139    async fn test_incremental_to_absolute() {
140        let config = toml::from_str::<IncrementalToAbsoluteConfig>(
141            r#"
142[cache]
143max_events = 100
144"#,
145        )
146        .unwrap();
147        let incremental_to_absolute = IncrementalToAbsolute::new(&config)
148            .map(Transform::event_task)
149            .unwrap();
150        let incremental_to_absolute = incremental_to_absolute.into_task();
151        let (mut tx, rx) = futures::channel::mpsc::channel(10);
152        let mut out_stream = incremental_to_absolute.transform_events(Box::pin(rx));
153
154        let inc_counter_1 = make_metric(
155            "incremental_counter",
156            MetricKind::Incremental,
157            MetricValue::Counter { value: 10.0 },
158        );
159        let expected_inc_counter_1 = make_metric(
160            "incremental_counter",
161            MetricKind::Absolute,
162            MetricValue::Counter { value: 10.0 },
163        );
164        assert_metric_eq(
165            &mut tx,
166            &mut out_stream,
167            inc_counter_1,
168            expected_inc_counter_1,
169        )
170        .await;
171
172        let inc_counter_2 = make_metric(
173            "incremental_counter",
174            MetricKind::Incremental,
175            MetricValue::Counter { value: 10.0 },
176        );
177        let expected_inc_counter_2 = make_metric(
178            "incremental_counter",
179            MetricKind::Absolute,
180            MetricValue::Counter { value: 20.0 },
181        );
182        assert_metric_eq(
183            &mut tx,
184            &mut out_stream,
185            inc_counter_2,
186            expected_inc_counter_2,
187        )
188        .await;
189
190        let inc_counter_3 = make_metric(
191            "incremental_counter",
192            MetricKind::Incremental,
193            MetricValue::Counter { value: 10.0 },
194        );
195        let expected_inc_counter_3 = make_metric(
196            "incremental_counter",
197            MetricKind::Absolute,
198            MetricValue::Counter { value: 30.0 },
199        );
200        assert_metric_eq(
201            &mut tx,
202            &mut out_stream,
203            inc_counter_3,
204            expected_inc_counter_3,
205        )
206        .await;
207
208        // Absolute counters and gauges are emitted unchanged
209        let gauge = make_metric(
210            "gauge",
211            MetricKind::Absolute,
212            MetricValue::Gauge { value: 42.0 },
213        );
214        let expected_gauge = gauge.clone();
215        assert_metric_eq(&mut tx, &mut out_stream, gauge, expected_gauge).await;
216
217        let absolute_counter = make_metric(
218            "absolute_counter",
219            MetricKind::Absolute,
220            MetricValue::Counter { value: 42.0 },
221        );
222        let absolute_counter_expected = absolute_counter.clone();
223        assert_metric_eq(
224            &mut tx,
225            &mut out_stream,
226            absolute_counter,
227            absolute_counter_expected,
228        )
229        .await;
230    }
231}