vector_core/metrics/
recorder.rs1use std::{
2 cell::OnceCell,
3 sync::{Arc, RwLock, atomic::Ordering},
4 time::Duration,
5};
6
7use chrono::Utc;
8use metrics::{Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SharedString, Unit};
9use metrics_util::{MetricKindMask, registry::Registry as MetricsRegistry};
10use quanta::Clock;
11
12use super::{
13 metric_matcher::MetricKeyMatcher,
14 recency::{GenerationalStorage, Recency},
15 storage::VectorStorage,
16};
17use crate::event::{Metric, MetricValue};
18
19thread_local!(static LOCAL_REGISTRY: OnceCell<Registry> = const { OnceCell::new() });
20
21#[allow(dead_code)]
22pub(super) struct Registry {
23 registry: MetricsRegistry<Key, GenerationalStorage<VectorStorage>>,
24 recency: RwLock<Option<Recency<Key, MetricKeyMatcher>>>,
25}
26
27impl Registry {
28 fn new() -> Self {
29 Self {
30 registry: MetricsRegistry::new(GenerationalStorage::new(VectorStorage)),
31 recency: RwLock::new(None),
32 }
33 }
34
35 pub(super) fn clear(&self) {
36 self.registry.clear();
37 }
38
39 pub(super) fn set_expiry(
40 &self,
41 global_timeout: Option<Duration>,
42 expire_metrics_per_metric_set: Vec<(MetricKeyMatcher, Duration)>,
43 ) {
44 let recency = if global_timeout.is_none() && expire_metrics_per_metric_set.is_empty() {
45 None
46 } else {
47 Some(Recency::new(
48 Clock::new(),
49 MetricKindMask::ALL,
50 global_timeout,
51 expire_metrics_per_metric_set,
52 ))
53 };
54 *(self.recency.write()).expect("Failed to acquire write lock on recency map") = recency;
55 }
56
57 pub(super) fn visit_metrics(&self) -> Vec<Metric> {
58 let timestamp = Utc::now();
59
60 let mut metrics = Vec::new();
61 let recency = self
62 .recency
63 .read()
64 .expect("Failed to acquire read lock on recency map");
65 let recency = recency.as_ref();
66
67 for (key, counter) in self.registry.get_counter_handles() {
68 if recency
69 .is_none_or(|recency| recency.should_store_counter(&key, &counter, &self.registry))
70 {
71 #[allow(clippy::cast_precision_loss)]
73 let value = counter.get_inner().load(Ordering::Relaxed) as f64;
74 let value = MetricValue::Counter { value };
75 metrics.push(Metric::from_metric_kv(&key, value, timestamp));
76 }
77 }
78 for (key, gauge) in self.registry.get_gauge_handles() {
79 if recency
80 .is_none_or(|recency| recency.should_store_gauge(&key, &gauge, &self.registry))
81 {
82 let value = gauge.get_inner().load(Ordering::Relaxed);
83 let value = MetricValue::Gauge { value };
84 metrics.push(Metric::from_metric_kv(&key, value, timestamp));
85 }
86 }
87 for (key, histogram) in self.registry.get_histogram_handles() {
88 if recency.is_none_or(|recency| {
89 recency.should_store_histogram(&key, &histogram, &self.registry)
90 }) {
91 let value = histogram.get_inner().make_metric();
92 metrics.push(Metric::from_metric_kv(&key, value, timestamp));
93 }
94 }
95 metrics
96 }
97
98 fn get_counter(&self, key: &Key) -> Counter {
99 self.registry
100 .get_or_create_counter(key, |c| c.clone().into())
101 }
102
103 fn get_gauge(&self, key: &Key) -> Gauge {
104 self.registry.get_or_create_gauge(key, |c| c.clone().into())
105 }
106
107 fn get_histogram(&self, key: &Key) -> Histogram {
108 self.registry
109 .get_or_create_histogram(key, |c| c.clone().into())
110 }
111}
112
113#[derive(Clone)]
127pub(super) enum VectorRecorder {
128 Global(Arc<Registry>),
129 ThreadLocal,
130}
131
132impl VectorRecorder {
133 pub(super) fn new_global() -> Self {
134 Self::Global(Arc::new(Registry::new()))
135 }
136
137 pub(super) fn new_test() -> Self {
138 Self::with_thread_local(Registry::clear);
139 Self::ThreadLocal
140 }
141
142 pub(super) fn with_registry<T>(&self, doit: impl FnOnce(&Registry) -> T) -> T {
143 match &self {
144 Self::Global(registry) => doit(registry),
145 Self::ThreadLocal => Self::with_thread_local(doit),
148 }
149 }
150
151 fn with_thread_local<T>(doit: impl FnOnce(&Registry) -> T) -> T {
152 LOCAL_REGISTRY.with(|oc| doit(oc.get_or_init(Registry::new)))
153 }
154}
155
156impl Recorder for VectorRecorder {
157 fn register_counter(&self, key: &Key, _: &Metadata<'_>) -> Counter {
158 self.with_registry(|r| r.get_counter(key))
159 }
160
161 fn register_gauge(&self, key: &Key, _: &Metadata<'_>) -> Gauge {
162 self.with_registry(|r| r.get_gauge(key))
163 }
164
165 fn register_histogram(&self, key: &Key, _: &Metadata<'_>) -> Histogram {
166 self.with_registry(|r| r.get_histogram(key))
167 }
168
169 fn describe_counter(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
170
171 fn describe_gauge(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
172
173 fn describe_histogram(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
174}