vector/sinks/util/buffer/metrics/
normalize.rs

1use indexmap::IndexMap;
2
3use std::time::{Duration, Instant};
4
5use vector_lib::event::{
6    metric::{MetricData, MetricSeries},
7    EventMetadata, Metric, MetricKind,
8};
9
/// Normalizes metrics according to a set of rules.
///
/// Depending on the system to which they are being sent, metrics may have to be modified in order to fit the data
/// model or constraints placed on that system.  Typically, this boils down to whether or not the system can accept
/// absolute metrics or incremental metrics: the latest value of a metric, or the delta between the last time the
/// metric was observed and now, respectively. Other rules may need to be applied, such as dropping metrics of a
/// specific type that the system does not support.
///
/// The trait provides a simple interface to apply this logic uniformly, given a reference to a simple state container
/// that allows tracking the necessary information of a given metric over time. In addition, because the return value
/// is optional, it composes nicely with iterators (i.e. using `filter_map`) in order to filter metrics within
/// existing iterator/stream-based approaches.
pub trait MetricNormalize {
    /// Normalizes the metric against the given state.
    ///
    /// If the metric was normalized successfully, `Some(metric)` will be returned. Otherwise, `None` is returned.
    ///
    /// In some cases, a metric may be successfully added/tracked within the given state, but due to the normalization
    /// logic, it cannot yet be emitted. An example of this is normalizing all metrics to be incremental.
    ///
    /// In this example, if an incoming metric is already incremental, it can be passed through unchanged.  If the
    /// incoming metric is absolute, however, we need to see it at least twice in order to calculate the incremental
    /// delta necessary to emit an incremental version. This means that the first time an absolute metric is seen,
    /// `normalize` would return `None`, and the subsequent calls would return `Some(metric)`.
    ///
    /// However, a metric may simply not be supported by a normalization implementation, and so `None` may or may not
    /// be a common return value. This behavior is, thus, implementation defined.
    fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric>;
}
39
40/// A self-contained metric normalizer.
41///
42/// The normalization state is stored internally, and it can only be created from a normalizer implementation that is
43/// either `Default` or is constructed ahead of time, so it is primarily useful for constructing a usable normalizer
44/// via implicit conversion methods or when no special parameters are required for configuring the underlying normalizer.
45pub struct MetricNormalizer<N> {
46    state: MetricSet,
47    normalizer: N,
48}
49
50impl<N> MetricNormalizer<N> {
51    /// Creates a new normalizer with TTL policy.
52    pub fn with_ttl(normalizer: N, ttl: TtlPolicy) -> Self {
53        Self {
54            state: MetricSet::with_ttl_policy(ttl),
55            normalizer,
56        }
57    }
58
59    /// Gets a mutable reference to the current metric state for this normalizer.
60    pub const fn get_state_mut(&mut self) -> &mut MetricSet {
61        &mut self.state
62    }
63}
64
65impl<N: MetricNormalize> MetricNormalizer<N> {
66    /// Normalizes the metric against the internal normalization state.
67    ///
68    /// For more information about normalization, see the documentation for [`MetricNormalize::normalize`].
69    pub fn normalize(&mut self, metric: Metric) -> Option<Metric> {
70        self.normalizer.normalize(&mut self.state, metric)
71    }
72}
73
74impl<N: Default> Default for MetricNormalizer<N> {
75    fn default() -> Self {
76        Self {
77            state: MetricSet::default(),
78            normalizer: N::default(),
79        }
80    }
81}
82
83impl<N> From<N> for MetricNormalizer<N> {
84    fn from(normalizer: N) -> Self {
85        Self {
86            state: MetricSet::default(),
87            normalizer,
88        }
89    }
90}
91
92/// Represents a stored metric entry with its data, metadata, and optional timestamp.
93#[derive(Clone, Debug)]
94pub struct MetricEntry {
95    /// The metric data containing the value and kind
96    pub data: MetricData,
97    /// Event metadata associated with this metric
98    pub metadata: EventMetadata,
99    /// Optional timestamp for TTL tracking
100    pub timestamp: Option<Instant>,
101}
102
103impl MetricEntry {
104    /// Creates a new MetricEntry with the given data, metadata, and timestamp.
105    pub const fn new(
106        data: MetricData,
107        metadata: EventMetadata,
108        timestamp: Option<Instant>,
109    ) -> Self {
110        Self {
111            data,
112            metadata,
113            timestamp,
114        }
115    }
116
117    /// Creates a new MetricEntry from a Metric and optional timestamp.
118    pub fn from_metric(metric: Metric, timestamp: Option<Instant>) -> (MetricSeries, Self) {
119        let (series, data, metadata) = metric.into_parts();
120        let entry = Self::new(data, metadata, timestamp);
121        (series, entry)
122    }
123
124    /// Converts this entry back to a Metric with the given series.
125    pub fn into_metric(self, series: MetricSeries) -> Metric {
126        Metric::from_parts(series, self.data, self.metadata)
127    }
128
129    /// Updates this entry's timestamp.
130    pub const fn update_timestamp(&mut self, timestamp: Option<Instant>) {
131        self.timestamp = timestamp;
132    }
133}
134
/// Configuration for automatic cleanup of expired entries.
///
/// Holds the time-to-live applied to entries, how often a cleanup pass should run, and the instant of the most
/// recent cleanup pass.
#[derive(Clone, Debug)]
pub struct TtlPolicy {
    /// Time-to-live for entries
    pub ttl: Duration,
    /// How often to run cleanup
    pub cleanup_interval: Duration,
    /// Last time cleanup was performed
    pub(crate) last_cleanup: Instant,
}

impl TtlPolicy {
    /// The cleanup interval is the TTL divided by this factor.
    const CLEANUP_DIVISOR: u32 = 10;
    /// Lower bound for the cleanup interval.
    const MIN_CLEANUP_INTERVAL: Duration = Duration::from_secs(10);

    /// Creates a new cleanup configuration with the given TTL.
    ///
    /// The cleanup interval defaults to `ttl / 10`, with a 10-second minimum. The division is done with exact
    /// integer `Duration` arithmetic rather than via `f32`, so large or fine-grained TTLs are not rounded.
    /// The "last cleanup" instant is initialized to the moment of construction.
    pub fn new(ttl: Duration) -> Self {
        Self {
            ttl,
            cleanup_interval: (ttl / Self::CLEANUP_DIVISOR).max(Self::MIN_CLEANUP_INTERVAL),
            last_cleanup: Instant::now(),
        }
    }

    /// Returns `true` once at least `cleanup_interval` has elapsed since the last recorded cleanup.
    pub fn should_cleanup(&self) -> bool {
        self.last_cleanup.elapsed() >= self.cleanup_interval
    }

    /// Records that a cleanup pass has just been performed.
    pub fn mark_cleanup_done(&mut self) {
        self.last_cleanup = Instant::now();
    }
}
167
/// Metric storage for use with normalization.
///
/// This is primarily a wrapper around [`IndexMap`] (to ensure insertion order
/// is maintained) with convenience methods to make it easier to perform
/// normalization-specific operations. It also includes an optional TTL policy
/// to automatically expire old entries.
#[derive(Clone, Debug, Default)]
pub struct MetricSet {
    /// Stored entries, keyed by metric series.
    inner: IndexMap<MetricSeries, MetricEntry>,
    /// TTL configuration; when `None`, entries are not timestamped and never expire.
    ttl_policy: Option<TtlPolicy>,
}
179
180impl MetricSet {
181    /// Creates an empty MetricSet with the specified capacity.
182    pub fn with_capacity(capacity: usize) -> Self {
183        Self {
184            inner: IndexMap::with_capacity(capacity),
185            ttl_policy: None,
186        }
187    }
188
189    /// Creates a MetricSet with custom cleanup configuration.
190    pub fn with_ttl_policy(ttl_policy: TtlPolicy) -> Self {
191        Self {
192            inner: IndexMap::default(),
193            ttl_policy: Some(ttl_policy),
194        }
195    }
196
    /// Gets a reference to the TTL policy configuration.
    ///
    /// Returns `None` when this set was created without a TTL policy.
    pub const fn ttl_policy(&self) -> Option<&TtlPolicy> {
        self.ttl_policy.as_ref()
    }
201
    /// Gets a mutable reference to the TTL policy configuration.
    ///
    /// Returns `None` when this set was created without a TTL policy.
    pub const fn ttl_policy_mut(&mut self) -> Option<&mut TtlPolicy> {
        self.ttl_policy.as_mut()
    }
206
207    /// Perform periodic cleanup if enough time has passed since the last cleanup
208    fn maybe_cleanup(&mut self) {
209        // Return early if no cleanup is needed
210        if !self
211            .ttl_policy()
212            .is_some_and(|config| config.should_cleanup())
213        {
214            return;
215        }
216        self.cleanup_expired();
217        if let Some(config) = self.ttl_policy_mut() {
218            config.mark_cleanup_done();
219        }
220    }
221
222    /// Removes expired entries based on TTL if configured.
223    fn cleanup_expired(&mut self) {
224        let now = Instant::now();
225        if let Some(config) = &self.ttl_policy {
226            self.inner.retain(|_, entry| match entry.timestamp {
227                Some(ts) => now.duration_since(ts) < config.ttl,
228                None => true,
229            });
230        }
231    }
232
    /// Returns the number of elements in the set.
    ///
    /// This reports the current size of the backing map; it does not trigger a TTL cleanup pass first.
    pub fn len(&self) -> usize {
        self.inner.len()
    }
237
238    fn create_timestamp(&self) -> Option<Instant> {
239        match self.ttl_policy() {
240            Some(_) => Some(Instant::now()),
241            _ => None,
242        }
243    }
244
    /// Returns true if the set contains no elements.
    ///
    /// This reports the current state of the backing map; it does not trigger a TTL cleanup pass first.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
249
250    /// Consumes this MetricSet and returns a vector of Metric.
251    pub fn into_metrics(mut self) -> Vec<Metric> {
252        // Always cleanup on final consumption
253        self.cleanup_expired();
254        self.inner
255            .into_iter()
256            .map(|(series, entry)| entry.into_metric(series))
257            .collect()
258    }
259
260    /// Either pass the metric through as-is if absolute, or convert it
261    /// to absolute if incremental.
262    pub fn make_absolute(&mut self, metric: Metric) -> Option<Metric> {
263        self.maybe_cleanup();
264        match metric.kind() {
265            MetricKind::Absolute => Some(metric),
266            MetricKind::Incremental => Some(self.incremental_to_absolute(metric)),
267        }
268    }
269
270    /// Either convert the metric to incremental if absolute, or
271    /// aggregate it with any previous value if already incremental.
272    pub fn make_incremental(&mut self, metric: Metric) -> Option<Metric> {
273        self.maybe_cleanup();
274        match metric.kind() {
275            MetricKind::Absolute => self.absolute_to_incremental(metric),
276            MetricKind::Incremental => Some(metric),
277        }
278    }
279
280    /// Convert the incremental metric into an absolute one, using the
281    /// state buffer to keep track of the value throughout the entire
282    /// application uptime.
283    fn incremental_to_absolute(&mut self, mut metric: Metric) -> Metric {
284        let timestamp = self.create_timestamp();
285        match self.inner.get_mut(metric.series()) {
286            Some(existing) => {
287                if existing.data.value.add(metric.value()) {
288                    metric = metric.with_value(existing.data.value.clone());
289                    existing.update_timestamp(timestamp);
290                } else {
291                    // Metric changed type, store this as the new reference value
292                    let (series, entry) = MetricEntry::from_metric(metric.clone(), timestamp);
293                    self.inner.insert(series, entry);
294                }
295            }
296            None => {
297                let (series, entry) = MetricEntry::from_metric(metric.clone(), timestamp);
298                self.inner.insert(series, entry);
299            }
300        }
301        metric.into_absolute()
302    }
303
304    /// Convert the absolute metric into an incremental by calculating
305    /// the increment from the last saved absolute state.
306    fn absolute_to_incremental(&mut self, mut metric: Metric) -> Option<Metric> {
307        // NOTE: Crucially, like I did, you may wonder: why do we not always return a metric? Could
308        // this lead to issues where a metric isn't seen again and we, in effect, never emit it?
309        //
310        // You're not wrong, and that does happen based on the logic below.  However, the main
311        // problem this logic solves is avoiding massive counter updates when Vector restarts.
312        //
313        // If we emitted a metric for a newly-seen absolute metric in this method, we would
314        // naturally have to emit an incremental version where the value was the absolute value,
315        // with subsequent updates being only delta updates.  If we restarted Vector, however, we
316        // would be back to not having yet seen the metric before, so the first emission of the
317        // metric after converting it here would be... its absolute value.  Even if the value only
318        // changed by 1 between Vector stopping and restarting, we could be incrementing the counter
319        // by some outrageous amount.
320        //
321        // Thus, we only emit a metric when we've calculated an actual delta for it, which means
322        // that, yes, we're risking never seeing a metric if it's not re-emitted, and we're
323        // introducing a small amount of lag before a metric is emitted by having to wait to see it
324        // again, but this is a behavior we have to observe for sinks that can only handle
325        // incremental updates.
326        let timestamp = self.create_timestamp();
327        match self.inner.get_mut(metric.series()) {
328            Some(reference) => {
329                let new_value = metric.value().clone();
330                // From the stored reference value, emit an increment
331                if metric.subtract(&reference.data) {
332                    reference.data.value = new_value;
333                    reference.update_timestamp(timestamp);
334                    Some(metric.into_incremental())
335                } else {
336                    // Metric changed type, store this and emit nothing
337                    self.insert(metric, timestamp);
338                    None
339                }
340            }
341            None => {
342                // No reference so store this and emit nothing
343                self.insert(metric, timestamp);
344                None
345            }
346        }
347    }
348
349    fn insert(&mut self, metric: Metric, timestamp: Option<Instant>) {
350        let (series, entry) = MetricEntry::from_metric(metric, timestamp);
351        self.inner.insert(series, entry);
352    }
353
354    pub fn insert_update(&mut self, metric: Metric) {
355        self.maybe_cleanup();
356        let timestamp = self.create_timestamp();
357        let update = match metric.kind() {
358            MetricKind::Absolute => Some(metric),
359            MetricKind::Incremental => {
360                // Incremental metrics update existing entries, if present
361                match self.inner.get_mut(metric.series()) {
362                    Some(existing) => {
363                        let (series, data, metadata) = metric.into_parts();
364                        if existing.data.update(&data) {
365                            existing.metadata.merge(metadata);
366                            existing.update_timestamp(timestamp);
367                            None
368                        } else {
369                            warn!(message = "Metric changed type, dropping old value.", %series);
370                            Some(Metric::from_parts(series, data, metadata))
371                        }
372                    }
373                    None => Some(metric),
374                }
375            }
376        };
377        if let Some(metric) = update {
378            self.insert(metric, timestamp);
379        }
380    }
381
382    /// Removes a series from the set.
383    ///
384    /// If the series existed and was removed, returns `true`.  Otherwise, `false`.
385    pub fn remove(&mut self, series: &MetricSeries) -> bool {
386        self.maybe_cleanup();
387        self.inner.shift_remove(series).is_some()
388    }
389}