vector/sinks/util/buffer/metrics/normalize.rs

use std::{
    marker::PhantomData,
    time::{Duration, Instant},
};

use lru::LruCache;
use serde_with::serde_as;
use snafu::Snafu;
use vector_config_macros::configurable_component;
use vector_lib::{
    ByteSizeOf,
    event::{
        EventMetadata, Metric, MetricKind,
        metric::{MetricData, MetricSeries},
    },
};

#[derive(Debug, Snafu, PartialEq, Eq)]
pub enum NormalizerError {
    #[snafu(display("`max_bytes` must be greater than zero"))]
    InvalidMaxBytes,
    #[snafu(display("`max_events` must be greater than zero"))]
    InvalidMaxEvents,
    #[snafu(display("`time_to_live` must be greater than zero"))]
    InvalidTimeToLive,
}

/// Defines the cache behavior used when creating a `MetricNormalizer`.
#[serde_as]
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Copy, Debug, Default)]
pub struct NormalizerConfig<D: NormalizerSettings + Clone> {
    /// The maximum size in bytes of the events in the metrics normalizer cache, excluding cache overhead.
    #[serde(default = "default_max_bytes::<D>")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_bytes: Option<usize>,

    /// The maximum number of events in the metrics normalizer cache.
    #[serde(default = "default_max_events::<D>")]
    #[configurable(metadata(docs::type_unit = "events"))]
    pub max_events: Option<usize>,

    /// The maximum time a metric can go without being updated before it is evicted from the metrics normalizer cache.
    #[serde(default = "default_time_to_live::<D>")]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Time To Live"))]
    pub time_to_live: Option<u64>,

    #[serde(skip)]
    pub _d: PhantomData<D>,
}

const fn default_max_bytes<D: NormalizerSettings>() -> Option<usize> {
    D::MAX_BYTES
}

const fn default_max_events<D: NormalizerSettings>() -> Option<usize> {
    D::MAX_EVENTS
}

const fn default_time_to_live<D: NormalizerSettings>() -> Option<u64> {
    D::TIME_TO_LIVE
}

impl<D: NormalizerSettings + Clone> NormalizerConfig<D> {
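    /// Validates the configured limits, falling back to the defaults from `D` for any unset
    /// field, and rejecting any limit that resolves to zero.
    ///
    /// A minimal usage sketch, using the `DefaultNormalizerSettings` defined in this module:
    ///
    /// ```ignore
    /// let config = NormalizerConfig::<DefaultNormalizerSettings> {
    ///     max_bytes: Some(0),
    ///     ..Default::default()
    /// };
    /// // A zero limit is rejected rather than silently disabling the cache.
    /// assert!(matches!(config.validate(), Err(NormalizerError::InvalidMaxBytes)));
    /// ```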
    pub fn validate(&self) -> Result<NormalizerConfig<D>, NormalizerError> {
        let config = NormalizerConfig::<D> {
            max_bytes: self.max_bytes.or(D::MAX_BYTES),
            max_events: self.max_events.or(D::MAX_EVENTS),
            time_to_live: self.time_to_live.or(D::TIME_TO_LIVE),
            _d: Default::default(),
        };
        match (config.max_bytes, config.max_events, config.time_to_live) {
            (Some(0), _, _) => Err(NormalizerError::InvalidMaxBytes),
            (_, Some(0), _) => Err(NormalizerError::InvalidMaxEvents),
            (_, _, Some(0)) => Err(NormalizerError::InvalidTimeToLive),
            _ => Ok(config),
        }
    }

    pub const fn into_settings(self) -> MetricSetSettings {
        MetricSetSettings {
            max_bytes: self.max_bytes,
            max_events: self.max_events,
            time_to_live: self.time_to_live,
        }
    }
}

pub trait NormalizerSettings {
    const MAX_EVENTS: Option<usize>;
    const MAX_BYTES: Option<usize>;
    const TIME_TO_LIVE: Option<u64>;
}

#[derive(Clone, Copy, Debug, Default)]
pub struct DefaultNormalizerSettings;

impl NormalizerSettings for DefaultNormalizerSettings {
    const MAX_EVENTS: Option<usize> = None;
    const MAX_BYTES: Option<usize> = None;
    const TIME_TO_LIVE: Option<u64> = None;
}

/// Normalizes metrics according to a set of rules.
///
/// Depending on the system metrics are being sent to, they may have to be modified in order to fit the data
/// model or constraints placed on that system.  Typically, this boils down to whether the system accepts
/// absolute metrics or incremental metrics: the latest value of a metric, or the delta between the last time the
/// metric was observed and now, respectively. Other rules may need to be applied, such as dropping metrics of a
/// specific type that the system does not support.
///
/// The trait provides a simple interface to apply this logic uniformly, given a reference to a simple state container
/// that allows tracking the necessary information about a given metric over time. Additionally, since the return value
/// is optional, it composes nicely with iterators (e.g. via `filter_map`) in order to filter metrics within existing
/// iterator/stream-based approaches.
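///
/// # Example
///
/// A minimal sketch of a hypothetical `AbsoluteNormalizer` that forces every metric to be
/// absolute (paths and imports depend on where this module is mounted, so this is illustrative only):
///
/// ```ignore
/// struct AbsoluteNormalizer;
///
/// impl MetricNormalize for AbsoluteNormalizer {
///     fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
///         // `make_absolute` passes absolute metrics through unchanged and accumulates
///         // incremental metrics into a running absolute value held in `state`.
///         state.make_absolute(metric)
///     }
/// }
/// ```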
pub trait MetricNormalize {
    /// Normalizes the metric against the given state.
    ///
    /// If the metric was normalized successfully, `Some(metric)` will be returned. Otherwise, `None` is returned.
    ///
    /// In some cases, a metric may be successfully added/tracked within the given state, but due to the normalization
    /// logic, it cannot yet be emitted. An example of this is normalizing all metrics to be incremental.
    ///
    /// In this example, if an incoming metric is already incremental, it can be passed through unchanged.  If the
    /// incoming metric is absolute, however, we need to see it at least twice in order to calculate the incremental
    /// delta necessary to emit an incremental version. This means that the first time an absolute metric is seen,
    /// `normalize` would return `None`, and the subsequent calls would return `Some(metric)`.
    ///
    /// However, a metric may simply not be supported by a normalization implementation at all, so how often `None` is
    /// returned is implementation-defined.
    fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric>;
}

/// A self-contained metric normalizer.
///
/// The normalization state is stored internally, and the normalizer can only be created from a normalizer
/// implementation that either implements `Default` or is constructed ahead of time. It is primarily useful for
/// building a usable normalizer via the implicit conversion methods, or when no special parameters are required to
/// configure the underlying normalizer.
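///
/// # Example
///
/// A usage sketch, assuming a hypothetical `AbsoluteNormalizer` that implements
/// [`MetricNormalize`] and a `metric` in hand:
///
/// ```ignore
/// // `From<N>` builds the normalizer with a default (unbounded) `MetricSet`.
/// let mut normalizer = MetricNormalizer::from(AbsoluteNormalizer);
/// if let Some(normalized) = normalizer.normalize(metric) {
///     // Forward the normalized metric downstream.
/// }
/// ```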
pub struct MetricNormalizer<N> {
    state: MetricSet,
    normalizer: N,
}

impl<N> MetricNormalizer<N> {
    /// Creates a new normalizer with the given configuration.
    pub fn with_config<D: NormalizerSettings + Clone>(
        normalizer: N,
        config: NormalizerConfig<D>,
    ) -> Self {
        let settings = config
            .validate()
            .unwrap_or_else(|e| panic!("Invalid cache settings: {e:?}"))
            .into_settings();
        Self {
            state: MetricSet::new(settings),
            normalizer,
        }
    }

    /// Gets a mutable reference to the current metric state for this normalizer.
    pub const fn get_state_mut(&mut self) -> &mut MetricSet {
        &mut self.state
    }
}

impl<N: MetricNormalize> MetricNormalizer<N> {
    /// Normalizes the metric against the internal normalization state.
    ///
    /// For more information about normalization, see the documentation for [`MetricNormalize::normalize`].
    pub fn normalize(&mut self, metric: Metric) -> Option<Metric> {
        self.normalizer.normalize(&mut self.state, metric)
    }
}

impl<N: Default> Default for MetricNormalizer<N> {
    fn default() -> Self {
        Self {
            state: MetricSet::default(),
            normalizer: N::default(),
        }
    }
}

impl<N> From<N> for MetricNormalizer<N> {
    fn from(normalizer: N) -> Self {
        Self {
            state: MetricSet::default(),
            normalizer,
        }
    }
}

/// Represents a stored metric entry with its data, metadata, and timestamp.
#[derive(Clone, Debug)]
pub struct MetricEntry {
    /// The metric data containing the value and kind
    pub data: MetricData,
    /// Event metadata associated with this metric
    pub metadata: EventMetadata,
    /// Optional timestamp for TTL tracking
    pub timestamp: Option<Instant>,
}

impl ByteSizeOf for MetricEntry {
    fn allocated_bytes(&self) -> usize {
        self.data.allocated_bytes() + self.metadata.allocated_bytes()
    }
}

impl MetricEntry {
    /// Creates a new MetricEntry with the given data, metadata, and timestamp.
    pub const fn new(
        data: MetricData,
        metadata: EventMetadata,
        timestamp: Option<Instant>,
    ) -> Self {
        Self {
            data,
            metadata,
            timestamp,
        }
    }

    /// Creates a new MetricEntry from a Metric.
    pub fn from_metric(metric: Metric, timestamp: Option<Instant>) -> (MetricSeries, Self) {
        let (series, data, metadata) = metric.into_parts();
        let entry = Self::new(data, metadata, timestamp);
        (series, entry)
    }

    /// Converts this entry back to a Metric with the given series.
    pub fn into_metric(self, series: MetricSeries) -> Metric {
        Metric::from_parts(series, self.data, self.metadata)
    }

    /// Updates this entry's timestamp.
    pub const fn update_timestamp(&mut self, timestamp: Option<Instant>) {
        self.timestamp = timestamp;
    }

    /// Checks if this entry has expired based on the given TTL and reference time.
    ///
    /// Using a provided reference time ensures consistency across multiple expiration checks.
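    ///
    /// A minimal sketch (the `metric` used to build the entry is assumed to exist):
    ///
    /// ```ignore
    /// let now = Instant::now();
    /// // Build an entry stamped 30 seconds in the past.
    /// let (_series, entry) = MetricEntry::from_metric(metric, Some(now - Duration::from_secs(30)));
    /// assert!(entry.is_expired(Duration::from_secs(10), now));
    /// assert!(!entry.is_expired(Duration::from_secs(60), now));
    /// ```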
    pub fn is_expired(&self, ttl: Duration, reference_time: Instant) -> bool {
        match self.timestamp {
            Some(ts) => reference_time.duration_since(ts) >= ttl,
            None => false,
        }
    }
}

/// Configuration for capacity-based eviction (memory and/or entry count limits).
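///
/// A construction sketch (either limit may be `None` to disable it):
///
/// ```ignore
/// // Evict once tracked entry data exceeds ~1 MiB or the cache holds more than 5,000 series.
/// let policy = CapacityPolicy::new(Some(1024 * 1024), Some(5_000));
/// assert_eq!(policy.current_memory(), 0);
/// ```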
#[derive(Clone, Debug)]
pub struct CapacityPolicy {
    /// Maximum memory usage in bytes
    pub max_bytes: Option<usize>,
    /// Maximum number of entries
    pub max_events: Option<usize>,
    /// Current memory usage tracking
    current_memory: usize,
}

impl CapacityPolicy {
    /// Creates a new capacity policy with both memory and entry limits.
    pub const fn new(max_bytes: Option<usize>, max_events: Option<usize>) -> Self {
        Self {
            max_bytes,
            max_events,
            current_memory: 0,
        }
    }

    /// Gets the current memory usage.
    pub const fn current_memory(&self) -> usize {
        self.current_memory
    }

    /// Updates memory tracking when an entry is removed.
    const fn remove_memory(&mut self, bytes: usize) {
        self.current_memory = self.current_memory.saturating_sub(bytes);
    }

    /// Frees the memory for an item if max_bytes is set.
    /// Only calculates and tracks memory when max_bytes is specified.
    pub fn free_item(&mut self, series: &MetricSeries, entry: &MetricEntry) {
        if self.max_bytes.is_some() {
            let freed_memory = self.item_size(series, entry);
            self.remove_memory(freed_memory);
        }
    }

    /// Updates memory tracking.
    const fn replace_memory(&mut self, old_bytes: usize, new_bytes: usize) {
        self.current_memory = self
            .current_memory
            .saturating_sub(old_bytes)
            .saturating_add(new_bytes);
    }

    /// Checks if the current state exceeds memory limits.
    const fn exceeds_memory_limit(&self) -> bool {
        if let Some(max_bytes) = self.max_bytes {
            self.current_memory > max_bytes
        } else {
            false
        }
    }

    /// Checks if the given entry count exceeds entry limits.
    const fn exceeds_entry_limit(&self, entry_count: usize) -> bool {
        if let Some(max_events) = self.max_events {
            entry_count > max_events
        } else {
            false
        }
    }

    /// Returns true if any limits are currently exceeded.
    const fn needs_eviction(&self, entry_count: usize) -> bool {
        self.exceeds_memory_limit() || self.exceeds_entry_limit(entry_count)
    }

    /// Gets the total memory size of entry/series, excluding LRU cache overhead.
    pub fn item_size(&self, series: &MetricSeries, entry: &MetricEntry) -> usize {
        entry.allocated_bytes() + series.allocated_bytes()
    }
}

/// Configuration for automatic cleanup of expired entries.
#[derive(Clone, Debug)]
pub struct TtlPolicy {
    /// Time-to-live for entries
    pub ttl: Duration,
    /// How often to run cleanup
    pub cleanup_interval: Duration,
    /// Last time cleanup was performed
    pub(crate) last_cleanup: Instant,
}

impl TtlPolicy {
    /// Creates a new TTL policy with the given duration.
    /// The cleanup interval defaults to TTL/10, with a 10-second minimum.
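    ///
    /// A sketch of the resulting intervals:
    ///
    /// ```ignore
    /// // A 5-minute TTL yields a 30-second cleanup interval (300s / 10).
    /// let policy = TtlPolicy::new(Duration::from_secs(300));
    /// assert_eq!(policy.cleanup_interval, Duration::from_secs(30));
    ///
    /// // A 1-minute TTL is clamped to the 10-second minimum (60s / 10 = 6s).
    /// let policy = TtlPolicy::new(Duration::from_secs(60));
    /// assert_eq!(policy.cleanup_interval, Duration::from_secs(10));
    /// ```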
    pub fn new(ttl: Duration) -> Self {
        Self {
            ttl,
            cleanup_interval: ttl.div_f32(10.0).max(Duration::from_secs(10)),
            last_cleanup: Instant::now(),
        }
    }

    /// Checks if it's time to run cleanup.
    ///
    /// Returns `Some(current_time)` if cleanup should be performed, `None` otherwise.
    pub fn should_cleanup(&self) -> Option<Instant> {
        let now = Instant::now();
        if now.duration_since(self.last_cleanup) >= self.cleanup_interval {
            Some(now)
        } else {
            None
        }
    }

    /// Marks cleanup as having been performed with the provided timestamp.
    pub const fn mark_cleanup_done(&mut self, now: Instant) {
        self.last_cleanup = now;
    }
}

#[derive(Debug, Clone, Copy, Default)]
pub struct MetricSetSettings {
    pub max_bytes: Option<usize>,
    pub max_events: Option<usize>,
    pub time_to_live: Option<u64>,
}

/// Dual-limit cache using standard LRU with optional capacity and TTL policies.
///
/// This implementation uses the standard LRU crate with optional enforcement of both
/// memory and entry count limits via CapacityPolicy, plus optional TTL via TtlPolicy.
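///
/// # Example
///
/// A construction sketch with both limits and a one-hour TTL enabled:
///
/// ```ignore
/// let set = MetricSet::new(MetricSetSettings {
///     max_bytes: Some(1024 * 1024), // entry data only, excluding cache overhead
///     max_events: Some(10_000),
///     time_to_live: Some(3600), // seconds
/// });
/// assert!(set.is_empty());
/// ```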
#[derive(Clone, Debug)]
pub struct MetricSet {
    /// LRU cache for storing metric entries
    inner: LruCache<MetricSeries, MetricEntry>,
    /// Optional capacity policy for memory and/or entry count limits
    capacity_policy: Option<CapacityPolicy>,
    /// Optional TTL policy for time-based expiration
    ttl_policy: Option<TtlPolicy>,
}

impl MetricSet {
    /// Creates a new MetricSet with the given settings.
    pub fn new(settings: MetricSetSettings) -> Self {
        // Create capacity policy if any capacity limit is set
        let capacity_policy = match (settings.max_bytes, settings.max_events) {
            (None, None) => None,
            (max_bytes, max_events) => Some(CapacityPolicy::new(max_bytes, max_events)),
        };

        // Create TTL policy if time-to-live is set
        let ttl_policy = settings
            .time_to_live
            .map(|ttl| TtlPolicy::new(Duration::from_secs(ttl)));

        Self::with_policies(capacity_policy, ttl_policy)
    }

    /// Creates a new MetricSet with the given policies.
    pub fn with_policies(
        capacity_policy: Option<CapacityPolicy>,
        ttl_policy: Option<TtlPolicy>,
    ) -> Self {
        // Always use an unbounded cache since we manually track limits
        // This ensures our capacity policy can properly track memory for all evicted entries
        Self {
            inner: LruCache::unbounded(),
            capacity_policy,
            ttl_policy,
        }
    }

    /// Gets the current capacity policy.
    pub const fn capacity_policy(&self) -> Option<&CapacityPolicy> {
        self.capacity_policy.as_ref()
    }

    /// Gets the current TTL policy.
    pub const fn ttl_policy(&self) -> Option<&TtlPolicy> {
        self.ttl_policy.as_ref()
    }

    /// Gets a mutable reference to the TTL policy configuration.
    pub const fn ttl_policy_mut(&mut self) -> Option<&mut TtlPolicy> {
        self.ttl_policy.as_mut()
    }

    /// Gets the current number of entries in the cache.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns true if the cache contains no entries.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Gets the current memory usage in bytes.
    pub fn weighted_size(&self) -> u64 {
        self.capacity_policy
            .as_ref()
            .map_or(0, |cp| cp.current_memory() as u64)
    }

    /// Creates a timestamp if TTL is enabled.
    fn create_timestamp(&self) -> Option<Instant> {
        self.ttl_policy.as_ref().map(|_| Instant::now())
    }

    /// Enforce memory and entry limits by evicting LRU entries.
    fn enforce_capacity_policy(&mut self) {
        let Some(ref mut capacity_policy) = self.capacity_policy else {
            return; // No capacity limits configured
        };

        // Keep evicting until we're within limits
        while capacity_policy.needs_eviction(self.inner.len()) {
            if let Some((series, entry)) = self.inner.pop_lru() {
                capacity_policy.free_item(&series, &entry);
            } else {
                break; // No more entries to evict
            }
        }
    }

    /// Perform TTL cleanup if configured and needed.
    fn maybe_cleanup(&mut self) {
        // Check if cleanup is needed and get the current timestamp in one operation
        let now = match self.ttl_policy().and_then(|config| config.should_cleanup()) {
            Some(timestamp) => timestamp,
            None => return, // No cleanup needed
        };

        // Perform the cleanup using the same timestamp
        self.cleanup_expired(now);

        // Mark cleanup as done with the same timestamp
        if let Some(config) = self.ttl_policy_mut() {
            config.mark_cleanup_done(now);
        }
    }

    /// Remove expired entries based on TTL using the provided timestamp.
    fn cleanup_expired(&mut self, now: Instant) {
        // Get the TTL from the policy
        let Some(ttl) = self.ttl_policy().map(|policy| policy.ttl) else {
            return; // No TTL policy, nothing to do
        };

        let mut expired_keys = Vec::new();

        // Collect expired keys using the provided timestamp
        for (series, entry) in self.inner.iter() {
            if entry.is_expired(ttl, now) {
                expired_keys.push(series.clone());
            }
        }

        // Remove expired entries and update memory tracking (if max_bytes is set)
        for series in expired_keys {
            if let Some(entry) = self.inner.pop(&series)
                && let Some(ref mut capacity_policy) = self.capacity_policy
            {
                capacity_policy.free_item(&series, &entry);
            }
        }
    }

    /// Internal insert that updates memory tracking and enforces limits.
    fn insert_with_tracking(&mut self, series: MetricSeries, entry: MetricEntry) {
        let Some(ref mut capacity_policy) = self.capacity_policy else {
            self.inner.put(series, entry);
            return; // No capacity limits configured, return immediately
        };

        // Handle differently based on whether we need to track memory
        if capacity_policy.max_bytes.is_some() {
            // When tracking memory, we need to calculate sizes before and after
            let entry_size = capacity_policy.item_size(&series, &entry);

            if let Some(existing_entry) = self.inner.put(series.clone(), entry) {
                // If we had an existing entry, calculate its size and adjust memory tracking
                let existing_size = capacity_policy.item_size(&series, &existing_entry);
                capacity_policy.replace_memory(existing_size, entry_size);
            } else {
                // No existing entry, just add the new entry's size
                capacity_policy.replace_memory(0, entry_size);
            }
        } else {
            // When not tracking memory (only entry count limits), just put directly
            self.inner.put(series, entry);
        }

        // Enforce limits after insertion
        self.enforce_capacity_policy();
    }

    /// Consumes this MetricSet and returns a vector of Metric.
    pub fn into_metrics(mut self) -> Vec<Metric> {
        // Clean up expired entries first (using current time)
        self.cleanup_expired(Instant::now());
        let mut metrics = Vec::new();
        while let Some((series, entry)) = self.inner.pop_lru() {
            metrics.push(entry.into_metric(series));
        }
        metrics
    }

    /// Either pass the metric through as-is if absolute, or convert it
    /// to absolute if incremental.
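    ///
    /// A behavior sketch, assuming a `MetricSet` named `state` and two incremental counter
    /// updates (`increment_by_2`, then `increment_by_3`) for the same series:
    ///
    /// ```ignore
    /// // Incremental values are accumulated into a running absolute value: 2, then 5.
    /// let first = state.make_absolute(increment_by_2).expect("always emits");
    /// let second = state.make_absolute(increment_by_3).expect("always emits");
    /// assert_eq!(second.kind(), MetricKind::Absolute);
    /// ```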
    pub fn make_absolute(&mut self, metric: Metric) -> Option<Metric> {
        self.maybe_cleanup();
        match metric.kind() {
            MetricKind::Absolute => Some(metric),
            MetricKind::Incremental => Some(self.incremental_to_absolute(metric)),
        }
    }

    /// Either convert the metric to incremental if absolute, or pass it
    /// through as-is if already incremental.
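    ///
    /// A behavior sketch, assuming a `MetricSet` named `state` and two absolute observations of
    /// the same counter (`counter_at_10`, then `counter_at_13`):
    ///
    /// ```ignore
    /// // First observation: no stored reference yet, so nothing is emitted.
    /// assert!(state.make_incremental(counter_at_10).is_none());
    /// // Second observation: the delta (3) is emitted as an incremental metric.
    /// let delta = state.make_incremental(counter_at_13).expect("delta emitted");
    /// assert_eq!(delta.kind(), MetricKind::Incremental);
    /// ```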
    pub fn make_incremental(&mut self, metric: Metric) -> Option<Metric> {
        self.maybe_cleanup();
        match metric.kind() {
            MetricKind::Absolute => self.absolute_to_incremental(metric),
            MetricKind::Incremental => Some(metric),
        }
    }

    /// Convert the incremental metric into an absolute one, using the
    /// state buffer to keep track of the value throughout the entire
    /// application uptime.
    fn incremental_to_absolute(&mut self, mut metric: Metric) -> Metric {
        let timestamp = self.create_timestamp();
        // We always call insert() to track memory usage
        match self.inner.get_mut(metric.series()) {
            Some(existing) => {
                let mut new_value = existing.data.value().clone();
                if new_value.add(metric.value()) {
                    // Update the stored value
                    metric = metric.with_value(new_value);
                }
                // Insert the updated stored value, or store a new reference value (if the metric changed type)
                self.insert(metric.clone(), timestamp);
            }
            None => {
                self.insert(metric.clone(), timestamp);
            }
        }
        metric.into_absolute()
    }

    /// Convert the absolute metric into an incremental one by calculating
    /// the increment from the last saved absolute state.
    fn absolute_to_incremental(&mut self, mut metric: Metric) -> Option<Metric> {
        // NOTE: You may wonder, as I did: why do we not always return a metric? Could
        // this lead to issues where a metric isn't seen again and we, in effect, never emit it?
        //
        // You're not wrong, and that does happen based on the logic below.  However, the main
        // problem this logic solves is avoiding massive counter updates when Vector restarts.
        //
        // If we emitted a metric for a newly-seen absolute metric in this method, we would
        // naturally have to emit an incremental version where the value was the absolute value,
        // with subsequent updates being only delta updates.  If we restarted Vector, however, we
        // would be back to not having yet seen the metric before, so the first emission of the
        // metric after converting it here would be... its absolute value.  Even if the value only
        // changed by 1 between Vector stopping and restarting, we could be incrementing the counter
        // by some outrageous amount.
        //
        // Thus, we only emit a metric when we've calculated an actual delta for it, which means
        // that, yes, we're risking never seeing a metric if it's not re-emitted, and we're
        // introducing a small amount of lag before a metric is emitted by having to wait to see it
        // again, but this is a behavior we have to accept for sinks that can only handle
        // incremental updates.
        let timestamp = self.create_timestamp();
        // We always call insert() to track memory usage
        match self.inner.get_mut(metric.series()) {
            Some(reference) => {
                let new_value = metric.value().clone();
                // Create a copy of the reference so we can insert and
                // replace the existing entry, tracking memory usage
                let mut new_reference = reference.clone();
                // From the stored reference value, emit an increment
                if metric.subtract(&reference.data) {
                    new_reference.data.value = new_value;
                    new_reference.timestamp = timestamp;
                    self.insert_with_tracking(metric.series().clone(), new_reference);
                    Some(metric.into_incremental())
                } else {
                    // Metric changed type, store this and emit nothing
                    self.insert(metric, timestamp);
                    None
                }
            }
            None => {
                // No reference so store this and emit nothing
                self.insert(metric, timestamp);
                None
            }
        }
    }

    fn insert(&mut self, metric: Metric, timestamp: Option<Instant>) {
        let (series, entry) = MetricEntry::from_metric(metric, timestamp);
        self.insert_with_tracking(series, entry);
    }

    /// Inserts the given metric, merging incremental metrics into any existing entry for the
    /// same series.
    ///
    /// Absolute metrics replace the stored entry. If an incremental update cannot be applied
    /// because the metric changed type, the old value is dropped and the new value is stored as-is.
    pub fn insert_update(&mut self, metric: Metric) {
        self.maybe_cleanup();
        let timestamp = self.create_timestamp();
        let update = match metric.kind() {
            MetricKind::Absolute => Some(metric),
            MetricKind::Incremental => {
                // Incremental metrics update existing entries, if present
                match self.inner.get_mut(metric.series()) {
                    Some(existing) => {
                        // Create a copy of the reference so we can insert and
                        // replace the existing entry, tracking memory usage
                        let mut new_existing = existing.clone();
                        let (series, data, metadata) = metric.into_parts();
                        if new_existing.data.update(&data) {
                            new_existing.metadata.merge(metadata);
                            new_existing.update_timestamp(timestamp);
                            self.insert_with_tracking(series, new_existing);
                            None
                        } else {
                            warn!(message = "Metric changed type, dropping old value.", %series);
                            Some(Metric::from_parts(series, data, metadata))
                        }
                    }
                    None => Some(metric),
                }
            }
        };
        if let Some(metric) = update {
            self.insert(metric, timestamp);
        }
    }

    /// Removes a series from the cache.
    ///
    /// If the series existed and was removed, returns `true`.  Otherwise, `false`.
    pub fn remove(&mut self, series: &MetricSeries) -> bool {
        if let Some(entry) = self.inner.pop(series) {
            if let Some(ref mut capacity_policy) = self.capacity_policy {
                capacity_policy.free_item(series, &entry);
            }
            return true;
        }
        false
    }
}

impl Default for MetricSet {
    fn default() -> Self {
        Self::new(MetricSetSettings::default())
    }
}