vector_core/metrics/
recorder.rs1use std::sync::{atomic::Ordering, Arc, RwLock};
2use std::{cell::OnceCell, time::Duration};
3
4use chrono::Utc;
5use metrics::{Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SharedString, Unit};
6use metrics_util::{registry::Registry as MetricsRegistry, MetricKindMask};
7use quanta::Clock;
8
9use super::metric_matcher::MetricKeyMatcher;
10use super::recency::{GenerationalStorage, Recency};
11use super::storage::VectorStorage;
12use crate::event::{Metric, MetricValue};
13
14thread_local!(static LOCAL_REGISTRY: OnceCell<Registry> = const { OnceCell::new() });
15
16#[allow(dead_code)]
17pub(super) struct Registry {
18 registry: MetricsRegistry<Key, GenerationalStorage<VectorStorage>>,
19 recency: RwLock<Option<Recency<Key, MetricKeyMatcher>>>,
20}
21
22impl Registry {
23 fn new() -> Self {
24 Self {
25 registry: MetricsRegistry::new(GenerationalStorage::new(VectorStorage)),
26 recency: RwLock::new(None),
27 }
28 }
29
30 pub(super) fn clear(&self) {
31 self.registry.clear();
32 }
33
34 pub(super) fn set_expiry(
35 &self,
36 global_timeout: Option<Duration>,
37 expire_metrics_per_metric_set: Vec<(MetricKeyMatcher, Duration)>,
38 ) {
39 let recency = if global_timeout.is_none() && expire_metrics_per_metric_set.is_empty() {
40 None
41 } else {
42 Some(Recency::new(
43 Clock::new(),
44 MetricKindMask::ALL,
45 global_timeout,
46 expire_metrics_per_metric_set,
47 ))
48 };
49 *(self.recency.write()).expect("Failed to acquire write lock on recency map") = recency;
50 }
51
52 pub(super) fn visit_metrics(&self) -> Vec<Metric> {
53 let timestamp = Utc::now();
54
55 let mut metrics = Vec::new();
56 let recency = self
57 .recency
58 .read()
59 .expect("Failed to acquire read lock on recency map");
60 let recency = recency.as_ref();
61
62 for (key, counter) in self.registry.get_counter_handles() {
63 if recency
64 .is_none_or(|recency| recency.should_store_counter(&key, &counter, &self.registry))
65 {
66 #[allow(clippy::cast_precision_loss)]
68 let value = counter.get_inner().load(Ordering::Relaxed) as f64;
69 let value = MetricValue::Counter { value };
70 metrics.push(Metric::from_metric_kv(&key, value, timestamp));
71 }
72 }
73 for (key, gauge) in self.registry.get_gauge_handles() {
74 if recency
75 .is_none_or(|recency| recency.should_store_gauge(&key, &gauge, &self.registry))
76 {
77 let value = gauge.get_inner().load(Ordering::Relaxed);
78 let value = MetricValue::Gauge { value };
79 metrics.push(Metric::from_metric_kv(&key, value, timestamp));
80 }
81 }
82 for (key, histogram) in self.registry.get_histogram_handles() {
83 if recency.is_none_or(|recency| {
84 recency.should_store_histogram(&key, &histogram, &self.registry)
85 }) {
86 let value = histogram.get_inner().make_metric();
87 metrics.push(Metric::from_metric_kv(&key, value, timestamp));
88 }
89 }
90 metrics
91 }
92
93 fn get_counter(&self, key: &Key) -> Counter {
94 self.registry
95 .get_or_create_counter(key, |c| c.clone().into())
96 }
97
98 fn get_gauge(&self, key: &Key) -> Gauge {
99 self.registry.get_or_create_gauge(key, |c| c.clone().into())
100 }
101
102 fn get_histogram(&self, key: &Key) -> Histogram {
103 self.registry
104 .get_or_create_histogram(key, |c| c.clone().into())
105 }
106}
107
108#[derive(Clone)]
122pub(super) enum VectorRecorder {
123 Global(Arc<Registry>),
124 ThreadLocal,
125}
126
127impl VectorRecorder {
128 pub(super) fn new_global() -> Self {
129 Self::Global(Arc::new(Registry::new()))
130 }
131
132 pub(super) fn new_test() -> Self {
133 Self::with_thread_local(Registry::clear);
134 Self::ThreadLocal
135 }
136
137 pub(super) fn with_registry<T>(&self, doit: impl FnOnce(&Registry) -> T) -> T {
138 match &self {
139 Self::Global(registry) => doit(registry),
140 Self::ThreadLocal => Self::with_thread_local(doit),
143 }
144 }
145
146 fn with_thread_local<T>(doit: impl FnOnce(&Registry) -> T) -> T {
147 LOCAL_REGISTRY.with(|oc| doit(oc.get_or_init(Registry::new)))
148 }
149}
150
151impl Recorder for VectorRecorder {
152 fn register_counter(&self, key: &Key, _: &Metadata<'_>) -> Counter {
153 self.with_registry(|r| r.get_counter(key))
154 }
155
156 fn register_gauge(&self, key: &Key, _: &Metadata<'_>) -> Gauge {
157 self.with_registry(|r| r.get_gauge(key))
158 }
159
160 fn register_histogram(&self, key: &Key, _: &Metadata<'_>) -> Histogram {
161 self.with_registry(|r| r.get_histogram(key))
162 }
163
164 fn describe_counter(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
165
166 fn describe_gauge(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
167
168 fn describe_histogram(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
169}