vector_core/metrics/
recorder.rs

1use std::sync::{atomic::Ordering, Arc, RwLock};
2use std::{cell::OnceCell, time::Duration};
3
4use chrono::Utc;
5use metrics::{Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SharedString, Unit};
6use metrics_util::{registry::Registry as MetricsRegistry, MetricKindMask};
7use quanta::Clock;
8
9use super::metric_matcher::MetricKeyMatcher;
10use super::recency::{GenerationalStorage, Recency};
11use super::storage::VectorStorage;
12use crate::event::{Metric, MetricValue};
13
14thread_local!(static LOCAL_REGISTRY: OnceCell<Registry> = const { OnceCell::new() });
15
/// Storage behind a [`VectorRecorder`]: the metric handles themselves plus the optional
/// expiration policy that is consulted whenever the metrics are collected.
// NOTE(review): the reason for this `allow` is not visible in this file; confirm it is still
// needed.
#[allow(dead_code)]
pub(super) struct Registry {
    /// Counter, gauge and histogram handles, keyed by metric [`Key`].
    registry: MetricsRegistry<Key, GenerationalStorage<VectorStorage>>,
    /// Expiration policy installed by [`Registry::set_expiry`]. While this is `None`,
    /// [`Registry::visit_metrics`] reports every registered metric.
    recency: RwLock<Option<Recency<Key, MetricKeyMatcher>>>,
}
21
22impl Registry {
23    fn new() -> Self {
24        Self {
25            registry: MetricsRegistry::new(GenerationalStorage::new(VectorStorage)),
26            recency: RwLock::new(None),
27        }
28    }
29
30    pub(super) fn clear(&self) {
31        self.registry.clear();
32    }
33
34    pub(super) fn set_expiry(
35        &self,
36        global_timeout: Option<Duration>,
37        expire_metrics_per_metric_set: Vec<(MetricKeyMatcher, Duration)>,
38    ) {
39        let recency = if global_timeout.is_none() && expire_metrics_per_metric_set.is_empty() {
40            None
41        } else {
42            Some(Recency::new(
43                Clock::new(),
44                MetricKindMask::ALL,
45                global_timeout,
46                expire_metrics_per_metric_set,
47            ))
48        };
49        *(self.recency.write()).expect("Failed to acquire write lock on recency map") = recency;
50    }
51
52    pub(super) fn visit_metrics(&self) -> Vec<Metric> {
53        let timestamp = Utc::now();
54
55        let mut metrics = Vec::new();
56        let recency = self
57            .recency
58            .read()
59            .expect("Failed to acquire read lock on recency map");
60        let recency = recency.as_ref();
61
62        for (key, counter) in self.registry.get_counter_handles() {
63            if recency
64                .is_none_or(|recency| recency.should_store_counter(&key, &counter, &self.registry))
65            {
66                // NOTE this will truncate if the value is greater than 2**52.
67                #[allow(clippy::cast_precision_loss)]
68                let value = counter.get_inner().load(Ordering::Relaxed) as f64;
69                let value = MetricValue::Counter { value };
70                metrics.push(Metric::from_metric_kv(&key, value, timestamp));
71            }
72        }
73        for (key, gauge) in self.registry.get_gauge_handles() {
74            if recency
75                .is_none_or(|recency| recency.should_store_gauge(&key, &gauge, &self.registry))
76            {
77                let value = gauge.get_inner().load(Ordering::Relaxed);
78                let value = MetricValue::Gauge { value };
79                metrics.push(Metric::from_metric_kv(&key, value, timestamp));
80            }
81        }
82        for (key, histogram) in self.registry.get_histogram_handles() {
83            if recency.is_none_or(|recency| {
84                recency.should_store_histogram(&key, &histogram, &self.registry)
85            }) {
86                let value = histogram.get_inner().make_metric();
87                metrics.push(Metric::from_metric_kv(&key, value, timestamp));
88            }
89        }
90        metrics
91    }
92
93    fn get_counter(&self, key: &Key) -> Counter {
94        self.registry
95            .get_or_create_counter(key, |c| c.clone().into())
96    }
97
98    fn get_gauge(&self, key: &Key) -> Gauge {
99        self.registry.get_or_create_gauge(key, |c| c.clone().into())
100    }
101
102    fn get_histogram(&self, key: &Key) -> Histogram {
103        self.registry
104            .get_or_create_histogram(key, |c| c.clone().into())
105    }
106}
107
/// [`VectorRecorder`] is a [`metrics::Recorder`] implementation that's suitable
/// for the advanced usage that we have in Vector.
///
/// It either records into one registry shared by every clone of the recorder, or into a
/// registry that is private to the calling thread.
///
/// TODO: The latest version of the `metrics` crate has a test recorder interface that could be used
/// to replace this whole global/local switching mechanism, as it effectively does the exact same
/// thing internally. However, it is only available through a `with_test_recorder` function that
/// takes a closure and cleans up the test recorder once the closure finishes. This is a much
/// cleaner interface, but interacts poorly with async code as used by the component tests. The best
/// path forward to make async tests work, then, is to replace the standard `#[tokio::test]` proc
/// macro wrapper with an alternate wrapper that does the normal tokio setup from within the
/// `with_test_recorder` closure, and use it across all the tests that require a test
/// recorder. Given the large number of such tests, we are retaining this global test recorder hack
/// here, but some day we should refactor the tests to eliminate it.
#[derive(Clone)]
pub(super) enum VectorRecorder {
    /// Records into the wrapped registry; cloning the recorder shares the same registry through
    /// the `Arc`.
    Global(Arc<Registry>),
    /// Records into the calling thread's `LOCAL_REGISTRY`. This is the variant returned by
    /// `new_test`.
    ThreadLocal,
}
126
127impl VectorRecorder {
128    pub(super) fn new_global() -> Self {
129        Self::Global(Arc::new(Registry::new()))
130    }
131
132    pub(super) fn new_test() -> Self {
133        Self::with_thread_local(Registry::clear);
134        Self::ThreadLocal
135    }
136
137    pub(super) fn with_registry<T>(&self, doit: impl FnOnce(&Registry) -> T) -> T {
138        match &self {
139            Self::Global(registry) => doit(registry),
140            // This is only called after the registry is created, so we can just use a dummy
141            // idle_timeout parameter.
142            Self::ThreadLocal => Self::with_thread_local(doit),
143        }
144    }
145
146    fn with_thread_local<T>(doit: impl FnOnce(&Registry) -> T) -> T {
147        LOCAL_REGISTRY.with(|oc| doit(oc.get_or_init(Registry::new)))
148    }
149}
150
151impl Recorder for VectorRecorder {
152    fn register_counter(&self, key: &Key, _: &Metadata<'_>) -> Counter {
153        self.with_registry(|r| r.get_counter(key))
154    }
155
156    fn register_gauge(&self, key: &Key, _: &Metadata<'_>) -> Gauge {
157        self.with_registry(|r| r.get_gauge(key))
158    }
159
160    fn register_histogram(&self, key: &Key, _: &Metadata<'_>) -> Histogram {
161        self.with_registry(|r| r.get_histogram(key))
162    }
163
164    fn describe_counter(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
165
166    fn describe_gauge(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
167
168    fn describe_histogram(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
169}