vector/sinks/util/buffer/metrics/normalize.rs
1use indexmap::IndexMap;
2
3use std::time::{Duration, Instant};
4
5use vector_lib::event::{
6 metric::{MetricData, MetricSeries},
7 EventMetadata, Metric, MetricKind,
8};
9
10/// Normalizes metrics according to a set of rules.
11///
12/// Depending on the system in which they are being sent to, metrics may have to be modified in order to fit the data
13/// model or constraints placed on that system. Typically, this boils down to whether or not the system can accept
14/// absolute metrics or incremental metrics: the latest value of a metric, or the delta between the last time the
15/// metric was observed and now, respective. Other rules may need to be applied, such as dropping metrics of a specific
16/// type that the system does not support.
17///
18/// The trait provides a simple interface to apply this logic uniformly, given a reference to a simple state container
19/// that allows tracking the necessary information of a given metric over time. As well, given the optional return, it
20/// composes nicely with iterators (i.e. using `filter_map`) in order to filter metrics within existing
21/// iterator/stream-based approaches.
22pub trait MetricNormalize {
23 /// Normalizes the metric against the given state.
24 ///
25 /// If the metric was normalized successfully, `Some(metric)` will be returned. Otherwise, `None` is returned.
26 ///
27 /// In some cases, a metric may be successfully added/tracked within the given state, but due to the normalization
28 /// logic, it cannot yet be emitted. An example of this is normalizing all metrics to be incremental.
29 ///
30 /// In this example, if an incoming metric is already incremental, it can be passed through unchanged. If the
31 /// incoming metric is absolute, however, we need to see it at least twice in order to calculate the incremental
32 /// delta necessary to emit an incremental version. This means that the first time an absolute metric is seen,
33 /// `normalize` would return `None`, and the subsequent calls would return `Some(metric)`.
34 ///
35 /// However, a metric may simply not be supported by a normalization implementation, and so `None` may or may not be
36 /// a common return value. This behavior is, thus, implementation defined.
37 fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric>;
38}
39
40/// A self-contained metric normalizer.
41///
42/// The normalization state is stored internally, and it can only be created from a normalizer implementation that is
43/// either `Default` or is constructed ahead of time, so it is primarily useful for constructing a usable normalizer
44/// via implicit conversion methods or when no special parameters are required for configuring the underlying normalizer.
45pub struct MetricNormalizer<N> {
46 state: MetricSet,
47 normalizer: N,
48}
49
50impl<N> MetricNormalizer<N> {
51 /// Creates a new normalizer with TTL policy.
52 pub fn with_ttl(normalizer: N, ttl: TtlPolicy) -> Self {
53 Self {
54 state: MetricSet::with_ttl_policy(ttl),
55 normalizer,
56 }
57 }
58
59 /// Gets a mutable reference to the current metric state for this normalizer.
60 pub const fn get_state_mut(&mut self) -> &mut MetricSet {
61 &mut self.state
62 }
63}
64
65impl<N: MetricNormalize> MetricNormalizer<N> {
66 /// Normalizes the metric against the internal normalization state.
67 ///
68 /// For more information about normalization, see the documentation for [`MetricNormalize::normalize`].
69 pub fn normalize(&mut self, metric: Metric) -> Option<Metric> {
70 self.normalizer.normalize(&mut self.state, metric)
71 }
72}
73
74impl<N: Default> Default for MetricNormalizer<N> {
75 fn default() -> Self {
76 Self {
77 state: MetricSet::default(),
78 normalizer: N::default(),
79 }
80 }
81}
82
83impl<N> From<N> for MetricNormalizer<N> {
84 fn from(normalizer: N) -> Self {
85 Self {
86 state: MetricSet::default(),
87 normalizer,
88 }
89 }
90}
91
92/// Represents a stored metric entry with its data, metadata, and optional timestamp.
93#[derive(Clone, Debug)]
94pub struct MetricEntry {
95 /// The metric data containing the value and kind
96 pub data: MetricData,
97 /// Event metadata associated with this metric
98 pub metadata: EventMetadata,
99 /// Optional timestamp for TTL tracking
100 pub timestamp: Option<Instant>,
101}
102
103impl MetricEntry {
104 /// Creates a new MetricEntry with the given data, metadata, and timestamp.
105 pub const fn new(
106 data: MetricData,
107 metadata: EventMetadata,
108 timestamp: Option<Instant>,
109 ) -> Self {
110 Self {
111 data,
112 metadata,
113 timestamp,
114 }
115 }
116
117 /// Creates a new MetricEntry from a Metric and optional timestamp.
118 pub fn from_metric(metric: Metric, timestamp: Option<Instant>) -> (MetricSeries, Self) {
119 let (series, data, metadata) = metric.into_parts();
120 let entry = Self::new(data, metadata, timestamp);
121 (series, entry)
122 }
123
124 /// Converts this entry back to a Metric with the given series.
125 pub fn into_metric(self, series: MetricSeries) -> Metric {
126 Metric::from_parts(series, self.data, self.metadata)
127 }
128
129 /// Updates this entry's timestamp.
130 pub const fn update_timestamp(&mut self, timestamp: Option<Instant>) {
131 self.timestamp = timestamp;
132 }
133}
134
/// Configuration for automatic cleanup of expired entries.
#[derive(Clone, Debug)]
pub struct TtlPolicy {
    /// Time-to-live for entries.
    pub ttl: Duration,
    /// Minimum amount of time that must pass between two cleanup passes.
    pub cleanup_interval: Duration,
    /// Last time cleanup was performed (initialized to the creation time of the policy).
    pub(crate) last_cleanup: Instant,
}

impl TtlPolicy {
    /// Creates a new cleanup configuration with the given TTL.
    ///
    /// The cleanup interval defaults to exactly `ttl / 10`, with a 10-second minimum. The
    /// "last cleanup" time starts at the moment of construction.
    pub fn new(ttl: Duration) -> Self {
        Self {
            ttl,
            // Integer division on `Duration` is exact; routing the value through `f32` seconds
            // would round it and lose precision.
            cleanup_interval: (ttl / 10).max(Duration::from_secs(10)),
            last_cleanup: Instant::now(),
        }
    }

    /// Returns `true` once at least `cleanup_interval` has elapsed since the last cleanup.
    pub fn should_cleanup(&self) -> bool {
        self.last_cleanup.elapsed() >= self.cleanup_interval
    }

    /// Records that a cleanup has just been performed by resetting the last-cleanup time to now.
    pub fn mark_cleanup_done(&mut self) {
        self.last_cleanup = Instant::now();
    }
}
167
168/// Metric storage for use with normalization.
169///
170/// This is primarily a wrapper around [`IndexMap`] (to ensure insertion order
171/// is maintained) with convenience methods to make it easier to perform
172/// normalization-specific operations. It also includes an optional TTL policy
173/// to automatically expire old entries.
174#[derive(Clone, Debug, Default)]
175pub struct MetricSet {
176 inner: IndexMap<MetricSeries, MetricEntry>,
177 ttl_policy: Option<TtlPolicy>,
178}
179
180impl MetricSet {
181 /// Creates an empty MetricSet with the specified capacity.
182 pub fn with_capacity(capacity: usize) -> Self {
183 Self {
184 inner: IndexMap::with_capacity(capacity),
185 ttl_policy: None,
186 }
187 }
188
189 /// Creates a MetricSet with custom cleanup configuration.
190 pub fn with_ttl_policy(ttl_policy: TtlPolicy) -> Self {
191 Self {
192 inner: IndexMap::default(),
193 ttl_policy: Some(ttl_policy),
194 }
195 }
196
197 /// Gets a reference to the TTL policy configuration.
198 pub const fn ttl_policy(&self) -> Option<&TtlPolicy> {
199 self.ttl_policy.as_ref()
200 }
201
202 /// Gets a mutable reference to the TTL policy configuration.
203 pub const fn ttl_policy_mut(&mut self) -> Option<&mut TtlPolicy> {
204 self.ttl_policy.as_mut()
205 }
206
207 /// Perform periodic cleanup if enough time has passed since the last cleanup
208 fn maybe_cleanup(&mut self) {
209 // Return early if no cleanup is needed
210 if !self
211 .ttl_policy()
212 .is_some_and(|config| config.should_cleanup())
213 {
214 return;
215 }
216 self.cleanup_expired();
217 if let Some(config) = self.ttl_policy_mut() {
218 config.mark_cleanup_done();
219 }
220 }
221
222 /// Removes expired entries based on TTL if configured.
223 fn cleanup_expired(&mut self) {
224 let now = Instant::now();
225 if let Some(config) = &self.ttl_policy {
226 self.inner.retain(|_, entry| match entry.timestamp {
227 Some(ts) => now.duration_since(ts) < config.ttl,
228 None => true,
229 });
230 }
231 }
232
233 /// Returns the number of elements in the set.
234 pub fn len(&self) -> usize {
235 self.inner.len()
236 }
237
238 fn create_timestamp(&self) -> Option<Instant> {
239 match self.ttl_policy() {
240 Some(_) => Some(Instant::now()),
241 _ => None,
242 }
243 }
244
245 /// Returns true if the set contains no elements.
246 pub fn is_empty(&self) -> bool {
247 self.inner.is_empty()
248 }
249
250 /// Consumes this MetricSet and returns a vector of Metric.
251 pub fn into_metrics(mut self) -> Vec<Metric> {
252 // Always cleanup on final consumption
253 self.cleanup_expired();
254 self.inner
255 .into_iter()
256 .map(|(series, entry)| entry.into_metric(series))
257 .collect()
258 }
259
260 /// Either pass the metric through as-is if absolute, or convert it
261 /// to absolute if incremental.
262 pub fn make_absolute(&mut self, metric: Metric) -> Option<Metric> {
263 self.maybe_cleanup();
264 match metric.kind() {
265 MetricKind::Absolute => Some(metric),
266 MetricKind::Incremental => Some(self.incremental_to_absolute(metric)),
267 }
268 }
269
270 /// Either convert the metric to incremental if absolute, or
271 /// aggregate it with any previous value if already incremental.
272 pub fn make_incremental(&mut self, metric: Metric) -> Option<Metric> {
273 self.maybe_cleanup();
274 match metric.kind() {
275 MetricKind::Absolute => self.absolute_to_incremental(metric),
276 MetricKind::Incremental => Some(metric),
277 }
278 }
279
280 /// Convert the incremental metric into an absolute one, using the
281 /// state buffer to keep track of the value throughout the entire
282 /// application uptime.
283 fn incremental_to_absolute(&mut self, mut metric: Metric) -> Metric {
284 let timestamp = self.create_timestamp();
285 match self.inner.get_mut(metric.series()) {
286 Some(existing) => {
287 if existing.data.value.add(metric.value()) {
288 metric = metric.with_value(existing.data.value.clone());
289 existing.update_timestamp(timestamp);
290 } else {
291 // Metric changed type, store this as the new reference value
292 let (series, entry) = MetricEntry::from_metric(metric.clone(), timestamp);
293 self.inner.insert(series, entry);
294 }
295 }
296 None => {
297 let (series, entry) = MetricEntry::from_metric(metric.clone(), timestamp);
298 self.inner.insert(series, entry);
299 }
300 }
301 metric.into_absolute()
302 }
303
304 /// Convert the absolute metric into an incremental by calculating
305 /// the increment from the last saved absolute state.
306 fn absolute_to_incremental(&mut self, mut metric: Metric) -> Option<Metric> {
307 // NOTE: Crucially, like I did, you may wonder: why do we not always return a metric? Could
308 // this lead to issues where a metric isn't seen again and we, in effect, never emit it?
309 //
310 // You're not wrong, and that does happen based on the logic below. However, the main
311 // problem this logic solves is avoiding massive counter updates when Vector restarts.
312 //
313 // If we emitted a metric for a newly-seen absolute metric in this method, we would
314 // naturally have to emit an incremental version where the value was the absolute value,
315 // with subsequent updates being only delta updates. If we restarted Vector, however, we
316 // would be back to not having yet seen the metric before, so the first emission of the
317 // metric after converting it here would be... its absolute value. Even if the value only
318 // changed by 1 between Vector stopping and restarting, we could be incrementing the counter
319 // by some outrageous amount.
320 //
321 // Thus, we only emit a metric when we've calculated an actual delta for it, which means
322 // that, yes, we're risking never seeing a metric if it's not re-emitted, and we're
323 // introducing a small amount of lag before a metric is emitted by having to wait to see it
324 // again, but this is a behavior we have to observe for sinks that can only handle
325 // incremental updates.
326 let timestamp = self.create_timestamp();
327 match self.inner.get_mut(metric.series()) {
328 Some(reference) => {
329 let new_value = metric.value().clone();
330 // From the stored reference value, emit an increment
331 if metric.subtract(&reference.data) {
332 reference.data.value = new_value;
333 reference.update_timestamp(timestamp);
334 Some(metric.into_incremental())
335 } else {
336 // Metric changed type, store this and emit nothing
337 self.insert(metric, timestamp);
338 None
339 }
340 }
341 None => {
342 // No reference so store this and emit nothing
343 self.insert(metric, timestamp);
344 None
345 }
346 }
347 }
348
349 fn insert(&mut self, metric: Metric, timestamp: Option<Instant>) {
350 let (series, entry) = MetricEntry::from_metric(metric, timestamp);
351 self.inner.insert(series, entry);
352 }
353
354 pub fn insert_update(&mut self, metric: Metric) {
355 self.maybe_cleanup();
356 let timestamp = self.create_timestamp();
357 let update = match metric.kind() {
358 MetricKind::Absolute => Some(metric),
359 MetricKind::Incremental => {
360 // Incremental metrics update existing entries, if present
361 match self.inner.get_mut(metric.series()) {
362 Some(existing) => {
363 let (series, data, metadata) = metric.into_parts();
364 if existing.data.update(&data) {
365 existing.metadata.merge(metadata);
366 existing.update_timestamp(timestamp);
367 None
368 } else {
369 warn!(message = "Metric changed type, dropping old value.", %series);
370 Some(Metric::from_parts(series, data, metadata))
371 }
372 }
373 None => Some(metric),
374 }
375 }
376 };
377 if let Some(metric) = update {
378 self.insert(metric, timestamp);
379 }
380 }
381
382 /// Removes a series from the set.
383 ///
384 /// If the series existed and was removed, returns `true`. Otherwise, `false`.
385 pub fn remove(&mut self, series: &MetricSeries) -> bool {
386 self.maybe_cleanup();
387 self.inner.shift_remove(series).is_some()
388 }
389}