1use std::{
2 marker::PhantomData,
3 time::{Duration, Instant},
4};
5
6use lru::LruCache;
7use serde_with::serde_as;
8use snafu::Snafu;
9use vector_config_macros::configurable_component;
10use vector_lib::{
11 ByteSizeOf,
12 event::{
13 EventMetadata, Metric, MetricKind,
14 metric::{MetricData, MetricSeries},
15 },
16};
17
18#[derive(Debug, Snafu, PartialEq, Eq)]
19pub enum NormalizerError {
20 #[snafu(display("`max_bytes` must be greater than zero"))]
21 InvalidMaxBytes,
22 #[snafu(display("`max_events` must be greater than zero"))]
23 InvalidMaxEvents,
24 #[snafu(display("`time_to_live` must be greater than zero"))]
25 InvalidTimeToLive,
26}
27
#[serde_as]
/// Configuration for the metric normalizer's cache.
// Developer note: unset fields fall back to the defaults of the settings type `D`, both when
// deserializing (via the `default_*::<D>` functions) and in `NormalizerConfig::validate`.
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Copy, Debug, Default)]
pub struct NormalizerConfig<D: NormalizerSettings + Clone> {
    /// The maximum size in bytes of the metric series held in the normalizer's cache.
    ///
    /// Sizes are measured from the allocated bytes of each cached series and its data, so cache
    /// overhead is not included. When the limit is exceeded, the least recently used series are
    /// evicted. Must be greater than zero if set.
    #[serde(default = "default_max_bytes::<D>")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_bytes: Option<usize>,

    /// The maximum number of metric series held in the normalizer's cache.
    ///
    /// When the limit is exceeded, the least recently used series are evicted. Must be greater
    /// than zero if set.
    #[serde(default = "default_max_events::<D>")]
    #[configurable(metadata(docs::type_unit = "events"))]
    pub max_events: Option<usize>,

    /// The maximum time a metric series may go without being updated before it is removed from
    /// the normalizer's cache.
    ///
    /// Expired series are removed by a periodic cleanup, so they can remain cached for a while
    /// after this time elapses. Must be greater than zero if set.
    #[serde(default = "default_time_to_live::<D>")]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Time To Live"))]
    pub time_to_live: Option<u64>,

    // Binds the config to its defaults provider `D`; never (de)serialized.
    #[serde(skip)]
    pub _d: PhantomData<D>,
}
53
54const fn default_max_bytes<D: NormalizerSettings>() -> Option<usize> {
55 D::MAX_BYTES
56}
57
58const fn default_max_events<D: NormalizerSettings>() -> Option<usize> {
59 D::MAX_EVENTS
60}
61
62const fn default_time_to_live<D: NormalizerSettings>() -> Option<u64> {
63 D::TIME_TO_LIVE
64}
65
66impl<D: NormalizerSettings + Clone> NormalizerConfig<D> {
67 pub fn validate(&self) -> Result<NormalizerConfig<D>, NormalizerError> {
68 let config = NormalizerConfig::<D> {
69 max_bytes: self.max_bytes.or(D::MAX_BYTES),
70 max_events: self.max_events.or(D::MAX_EVENTS),
71 time_to_live: self.time_to_live.or(D::TIME_TO_LIVE),
72 _d: Default::default(),
73 };
74 match (config.max_bytes, config.max_events, config.time_to_live) {
75 (Some(0), _, _) => Err(NormalizerError::InvalidMaxBytes),
76 (_, Some(0), _) => Err(NormalizerError::InvalidMaxEvents),
77 (_, _, Some(0)) => Err(NormalizerError::InvalidTimeToLive),
78 _ => Ok(config),
79 }
80 }
81
82 pub const fn into_settings(self) -> MetricSetSettings {
83 MetricSetSettings {
84 max_bytes: self.max_bytes,
85 max_events: self.max_events,
86 time_to_live: self.time_to_live,
87 }
88 }
89}
90
/// Compile-time defaults for the limits of a [`NormalizerConfig`].
///
/// A limit the user leaves unset takes the corresponding default here. A `None` default leaves
/// that limit unset.
pub trait NormalizerSettings {
    /// Default for `max_events`.
    const MAX_EVENTS: Option<usize>;
    /// Default for `max_bytes`.
    const MAX_BYTES: Option<usize>;
    /// Default for `time_to_live`, in seconds.
    const TIME_TO_LIVE: Option<u64>;
}
96
97#[derive(Clone, Copy, Debug, Default)]
98pub struct DefaultNormalizerSettings;
99
100impl NormalizerSettings for DefaultNormalizerSettings {
101 const MAX_EVENTS: Option<usize> = None;
102 const MAX_BYTES: Option<usize> = None;
103 const TIME_TO_LIVE: Option<u64> = None;
104}
105
/// A strategy for normalizing metrics against per-series state.
pub trait MetricNormalize {
    /// Normalizes `metric`, reading and updating `state` as needed.
    ///
    /// `state` is the cache of previously seen series (a [`MetricSet`]). Implementations may,
    /// for example, call [`MetricSet::make_absolute`] or [`MetricSet::make_incremental`] on it.
    ///
    /// Returns the normalized metric, or `None` when the implementation produces no output for
    /// this input.
    fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric>;
}
135
/// Applies a [`MetricNormalize`] strategy to metrics, owning the [`MetricSet`] it works against.
pub struct MetricNormalizer<N> {
    /// Cache of previously seen series; handed to `normalizer` on every call.
    state: MetricSet,
    /// The normalization strategy.
    normalizer: N,
}
145
146impl<N> MetricNormalizer<N> {
147 pub fn with_config<D: NormalizerSettings + Clone>(
149 normalizer: N,
150 config: NormalizerConfig<D>,
151 ) -> Self {
152 let settings = config
153 .validate()
154 .unwrap_or_else(|e| panic!("Invalid cache settings: {e:?}"))
155 .into_settings();
156 Self {
157 state: MetricSet::new(settings),
158 normalizer,
159 }
160 }
161
162 pub const fn get_state_mut(&mut self) -> &mut MetricSet {
164 &mut self.state
165 }
166}
167
168impl<N: MetricNormalize> MetricNormalizer<N> {
169 pub fn normalize(&mut self, metric: Metric) -> Option<Metric> {
173 self.normalizer.normalize(&mut self.state, metric)
174 }
175}
176
177impl<N: Default> Default for MetricNormalizer<N> {
178 fn default() -> Self {
179 Self {
180 state: MetricSet::default(),
181 normalizer: N::default(),
182 }
183 }
184}
185
186impl<N> From<N> for MetricNormalizer<N> {
187 fn from(normalizer: N) -> Self {
188 Self {
189 state: MetricSet::default(),
190 normalizer,
191 }
192 }
193}
194
195#[derive(Clone, Debug)]
197pub struct MetricEntry {
198 pub data: MetricData,
200 pub metadata: EventMetadata,
202 pub timestamp: Option<Instant>,
204}
205
206impl ByteSizeOf for MetricEntry {
207 fn allocated_bytes(&self) -> usize {
208 self.data.allocated_bytes() + self.metadata.allocated_bytes()
209 }
210}
211
212impl MetricEntry {
213 pub const fn new(
215 data: MetricData,
216 metadata: EventMetadata,
217 timestamp: Option<Instant>,
218 ) -> Self {
219 Self {
220 data,
221 metadata,
222 timestamp,
223 }
224 }
225
226 pub fn from_metric(metric: Metric, timestamp: Option<Instant>) -> (MetricSeries, Self) {
228 let (series, data, metadata) = metric.into_parts();
229 let entry = Self::new(data, metadata, timestamp);
230 (series, entry)
231 }
232
233 pub fn into_metric(self, series: MetricSeries) -> Metric {
235 Metric::from_parts(series, self.data, self.metadata)
236 }
237
238 pub const fn update_timestamp(&mut self, timestamp: Option<Instant>) {
240 self.timestamp = timestamp;
241 }
242
243 pub fn is_expired(&self, ttl: Duration, reference_time: Instant) -> bool {
247 match self.timestamp {
248 Some(ts) => reference_time.duration_since(ts) >= ttl,
249 None => false,
250 }
251 }
252}
253
254#[derive(Clone, Debug)]
256pub struct CapacityPolicy {
257 pub max_bytes: Option<usize>,
259 pub max_events: Option<usize>,
261 current_memory: usize,
263}
264
265impl CapacityPolicy {
266 pub const fn new(max_bytes: Option<usize>, max_events: Option<usize>) -> Self {
268 Self {
269 max_bytes,
270 max_events,
271 current_memory: 0,
272 }
273 }
274
275 pub const fn current_memory(&self) -> usize {
277 self.current_memory
278 }
279
280 const fn remove_memory(&mut self, bytes: usize) {
282 self.current_memory = self.current_memory.saturating_sub(bytes);
283 }
284
285 pub fn free_item(&mut self, series: &MetricSeries, entry: &MetricEntry) {
288 if self.max_bytes.is_some() {
289 let freed_memory = self.item_size(series, entry);
290 self.remove_memory(freed_memory);
291 }
292 }
293
294 const fn replace_memory(&mut self, old_bytes: usize, new_bytes: usize) {
296 self.current_memory = self
297 .current_memory
298 .saturating_sub(old_bytes)
299 .saturating_add(new_bytes);
300 }
301
302 const fn exceeds_memory_limit(&self) -> bool {
304 if let Some(max_bytes) = self.max_bytes {
305 self.current_memory > max_bytes
306 } else {
307 false
308 }
309 }
310
311 const fn exceeds_entry_limit(&self, entry_count: usize) -> bool {
313 if let Some(max_events) = self.max_events {
314 entry_count > max_events
315 } else {
316 false
317 }
318 }
319
320 const fn needs_eviction(&self, entry_count: usize) -> bool {
322 self.exceeds_memory_limit() || self.exceeds_entry_limit(entry_count)
323 }
324
325 pub fn item_size(&self, series: &MetricSeries, entry: &MetricEntry) -> usize {
327 entry.allocated_bytes() + series.allocated_bytes()
328 }
329}
330
/// Time-to-live settings for a [`MetricSet`], plus bookkeeping for its periodic cleanup.
#[derive(Clone, Debug)]
pub struct TtlPolicy {
    /// How long an entry may go without being written before it counts as expired.
    pub ttl: Duration,
    /// Minimum time between two cleanup passes.
    pub cleanup_interval: Duration,
    /// When the last cleanup pass ran. Initially this is the policy's creation time.
    pub(crate) last_cleanup: Instant,
}

impl TtlPolicy {
    /// Creates a policy for `ttl`.
    ///
    /// Cleanup passes are spaced a tenth of `ttl` apart, but never less than 10 seconds apart.
    /// An expired entry can therefore remain cached for a while after its TTL elapses.
    pub fn new(ttl: Duration) -> Self {
        Self {
            ttl,
            // Integer division keeps the interval exact. `div_f32` round-trips through `f32`
            // and loses precision for larger TTLs (e.g. 1001s would give 100.099998474s).
            cleanup_interval: (ttl / 10).max(Duration::from_secs(10)),
            last_cleanup: Instant::now(),
        }
    }

    /// Returns the current instant if at least `cleanup_interval` has passed since the last
    /// cleanup. Returns `None` if no cleanup is due yet.
    pub fn should_cleanup(&self) -> Option<Instant> {
        let now = Instant::now();
        let due = now.duration_since(self.last_cleanup) >= self.cleanup_interval;
        due.then_some(now)
    }

    /// Records `now` as the time of the most recent cleanup pass.
    pub const fn mark_cleanup_done(&mut self, now: Instant) {
        self.last_cleanup = now;
    }
}
370
/// Limits used to build a [`MetricSet`] via `MetricSet::new`. Every limit is optional.
#[derive(Debug, Clone, Copy, Default)]
pub struct MetricSetSettings {
    /// Maximum tracked size of the cached entries, in bytes.
    pub max_bytes: Option<usize>,
    /// Maximum number of cached entries.
    pub max_events: Option<usize>,
    /// Time-to-live of an entry that is not written again, in seconds.
    pub time_to_live: Option<u64>,
}
377
/// Cache of the latest state of each metric series.
///
/// The set is used to convert metrics between incremental and absolute form. Entries are kept
/// in least-recently-used order and bounded by optional capacity and time-to-live policies.
#[derive(Clone, Debug)]
pub struct MetricSet {
    /// Series -> cached entry, in LRU order. The cache is created unbounded; limits are
    /// enforced through `capacity_policy`.
    inner: LruCache<MetricSeries, MetricEntry>,
    /// Optional byte/entry-count limits; `None` means no capacity limits.
    capacity_policy: Option<CapacityPolicy>,
    /// Optional time-to-live; `None` means entries never expire.
    ttl_policy: Option<TtlPolicy>,
}
391
392impl MetricSet {
393 pub fn new(settings: MetricSetSettings) -> Self {
395 let capacity_policy = match (settings.max_bytes, settings.max_events) {
397 (None, None) => None,
398 (max_bytes, max_events) => Some(CapacityPolicy::new(max_bytes, max_events)),
399 };
400
401 let ttl_policy = settings
403 .time_to_live
404 .map(|ttl| TtlPolicy::new(Duration::from_secs(ttl)));
405
406 Self::with_policies(capacity_policy, ttl_policy)
407 }
408
409 pub fn with_policies(
411 capacity_policy: Option<CapacityPolicy>,
412 ttl_policy: Option<TtlPolicy>,
413 ) -> Self {
414 Self {
417 inner: LruCache::unbounded(),
418 capacity_policy,
419 ttl_policy,
420 }
421 }
422
423 pub const fn capacity_policy(&self) -> Option<&CapacityPolicy> {
425 self.capacity_policy.as_ref()
426 }
427
428 pub const fn ttl_policy(&self) -> Option<&TtlPolicy> {
430 self.ttl_policy.as_ref()
431 }
432
433 pub const fn ttl_policy_mut(&mut self) -> Option<&mut TtlPolicy> {
435 self.ttl_policy.as_mut()
436 }
437
438 pub fn len(&self) -> usize {
440 self.inner.len()
441 }
442
443 pub fn is_empty(&self) -> bool {
445 self.inner.is_empty()
446 }
447
448 pub fn weighted_size(&self) -> u64 {
450 self.capacity_policy
451 .as_ref()
452 .map_or(0, |cp| cp.current_memory() as u64)
453 }
454
455 fn create_timestamp(&self) -> Option<Instant> {
457 self.ttl_policy.as_ref().map(|_| Instant::now())
458 }
459
460 fn enforce_capacity_policy(&mut self) {
462 let Some(ref mut capacity_policy) = self.capacity_policy else {
463 return; };
465
466 while capacity_policy.needs_eviction(self.inner.len()) {
468 if let Some((series, entry)) = self.inner.pop_lru() {
469 capacity_policy.free_item(&series, &entry);
470 } else {
471 break; }
473 }
474 }
475
476 fn maybe_cleanup(&mut self) {
478 let now = match self.ttl_policy().and_then(|config| config.should_cleanup()) {
480 Some(timestamp) => timestamp,
481 None => return, };
483
484 self.cleanup_expired(now);
486
487 if let Some(config) = self.ttl_policy_mut() {
489 config.mark_cleanup_done(now);
490 }
491 }
492
493 fn cleanup_expired(&mut self, now: Instant) {
495 let Some(ttl) = self.ttl_policy().map(|policy| policy.ttl) else {
497 return; };
499
500 let mut expired_keys = Vec::new();
501
502 for (series, entry) in self.inner.iter() {
504 if entry.is_expired(ttl, now) {
505 expired_keys.push(series.clone());
506 }
507 }
508
509 for series in expired_keys {
511 if let Some(entry) = self.inner.pop(&series)
512 && let Some(ref mut capacity_policy) = self.capacity_policy
513 {
514 capacity_policy.free_item(&series, &entry);
515 }
516 }
517 }
518
519 fn insert_with_tracking(&mut self, series: MetricSeries, entry: MetricEntry) {
521 let Some(ref mut capacity_policy) = self.capacity_policy else {
522 self.inner.put(series, entry);
523 return; };
525
526 if capacity_policy.max_bytes.is_some() {
528 let entry_size = capacity_policy.item_size(&series, &entry);
530
531 if let Some(existing_entry) = self.inner.put(series.clone(), entry) {
532 let existing_size = capacity_policy.item_size(&series, &existing_entry);
534 capacity_policy.replace_memory(existing_size, entry_size);
535 } else {
536 capacity_policy.replace_memory(0, entry_size);
538 }
539 } else {
540 self.inner.put(series, entry);
542 }
543
544 self.enforce_capacity_policy();
546 }
547
548 pub fn into_metrics(mut self) -> Vec<Metric> {
550 self.cleanup_expired(Instant::now());
552 let mut metrics = Vec::new();
553 while let Some((series, entry)) = self.inner.pop_lru() {
554 metrics.push(entry.into_metric(series));
555 }
556 metrics
557 }
558
559 pub fn make_absolute(&mut self, metric: Metric) -> Option<Metric> {
562 self.maybe_cleanup();
563 match metric.kind() {
564 MetricKind::Absolute => Some(metric),
565 MetricKind::Incremental => Some(self.incremental_to_absolute(metric)),
566 }
567 }
568
569 pub fn make_incremental(&mut self, metric: Metric) -> Option<Metric> {
572 self.maybe_cleanup();
573 match metric.kind() {
574 MetricKind::Absolute => self.absolute_to_incremental(metric),
575 MetricKind::Incremental => Some(metric),
576 }
577 }
578
579 fn incremental_to_absolute(&mut self, mut metric: Metric) -> Metric {
583 let timestamp = self.create_timestamp();
584 match self.inner.get_mut(metric.series()) {
586 Some(existing) => {
587 let mut new_value = existing.data.value().clone();
588 if new_value.add(metric.value()) {
589 metric = metric.with_value(new_value);
591 }
592 self.insert(metric.clone(), timestamp);
594 }
595 None => {
596 self.insert(metric.clone(), timestamp);
597 }
598 }
599 metric.into_absolute()
600 }
601
602 fn absolute_to_incremental(&mut self, mut metric: Metric) -> Option<Metric> {
605 let timestamp = self.create_timestamp();
625 match self.inner.get_mut(metric.series()) {
627 Some(reference) => {
628 let new_value = metric.value().clone();
629 let mut new_reference = reference.clone();
632 if metric.subtract(&reference.data) {
634 new_reference.data.value = new_value;
635 new_reference.timestamp = timestamp;
636 self.insert_with_tracking(metric.series().clone(), new_reference);
637 Some(metric.into_incremental())
638 } else {
639 self.insert(metric, timestamp);
641 None
642 }
643 }
644 None => {
645 self.insert(metric, timestamp);
647 None
648 }
649 }
650 }
651
652 fn insert(&mut self, metric: Metric, timestamp: Option<Instant>) {
653 let (series, entry) = MetricEntry::from_metric(metric, timestamp);
654 self.insert_with_tracking(series, entry);
655 }
656
657 pub fn insert_update(&mut self, metric: Metric) {
658 self.maybe_cleanup();
659 let timestamp = self.create_timestamp();
660 let update = match metric.kind() {
661 MetricKind::Absolute => Some(metric),
662 MetricKind::Incremental => {
663 match self.inner.get_mut(metric.series()) {
665 Some(existing) => {
666 let mut new_existing = existing.clone();
669 let (series, data, metadata) = metric.into_parts();
670 if new_existing.data.update(&data) {
671 new_existing.metadata.merge(metadata);
672 new_existing.update_timestamp(timestamp);
673 self.insert_with_tracking(series, new_existing);
674 None
675 } else {
676 warn!(message = "Metric changed type, dropping old value.", %series);
677 Some(Metric::from_parts(series, data, metadata))
678 }
679 }
680 None => Some(metric),
681 }
682 }
683 };
684 if let Some(metric) = update {
685 self.insert(metric, timestamp);
686 }
687 }
688
689 pub fn remove(&mut self, series: &MetricSeries) -> bool {
693 if let Some(entry) = self.inner.pop(series) {
694 if let Some(ref mut capacity_policy) = self.capacity_policy {
695 capacity_policy.free_item(series, &entry);
696 }
697 return true;
698 }
699 false
700 }
701}
702
703impl Default for MetricSet {
704 fn default() -> Self {
705 Self::new(MetricSetSettings::default())
706 }
707}