vector_core/metrics/
recorder.rs

1use std::{
2    cell::OnceCell,
3    sync::{Arc, RwLock, atomic::Ordering},
4    time::Duration,
5};
6
7use chrono::Utc;
8use metrics::{Counter, Gauge, Histogram, Key, KeyName, Metadata, Recorder, SharedString, Unit};
9use metrics_util::{MetricKindMask, registry::Registry as MetricsRegistry};
10use quanta::Clock;
11
12use super::{
13    metric_matcher::MetricKeyMatcher,
14    recency::{GenerationalStorage, Recency},
15    storage::VectorStorage,
16};
17use crate::event::{Metric, MetricValue};
18
19thread_local!(static LOCAL_REGISTRY: OnceCell<Registry> = const { OnceCell::new() });
20
/// Metric storage used by [`VectorRecorder`], pairing the underlying metrics registry with the
/// optional expiry configuration installed through `set_expiry`.
// NOTE(review): the reason for allowing `dead_code` is not visible in this file — confirm it is
// still required.
#[allow(dead_code)]
pub(super) struct Registry {
    /// Holds the counter, gauge, and histogram handles, keyed by metric [`Key`].
    registry: MetricsRegistry<Key, GenerationalStorage<VectorStorage>>,
    /// Expiry tracker consulted by `visit_metrics`; `None` when no expiry is configured.
    recency: RwLock<Option<Recency<Key, MetricKeyMatcher>>>,
}
26
27impl Registry {
28    fn new() -> Self {
29        Self {
30            registry: MetricsRegistry::new(GenerationalStorage::new(VectorStorage)),
31            recency: RwLock::new(None),
32        }
33    }
34
35    pub(super) fn clear(&self) {
36        self.registry.clear();
37    }
38
39    pub(super) fn set_expiry(
40        &self,
41        global_timeout: Option<Duration>,
42        expire_metrics_per_metric_set: Vec<(MetricKeyMatcher, Duration)>,
43    ) {
44        let recency = if global_timeout.is_none() && expire_metrics_per_metric_set.is_empty() {
45            None
46        } else {
47            Some(Recency::new(
48                Clock::new(),
49                MetricKindMask::ALL,
50                global_timeout,
51                expire_metrics_per_metric_set,
52            ))
53        };
54        *(self.recency.write()).expect("Failed to acquire write lock on recency map") = recency;
55    }
56
57    pub(super) fn visit_metrics(&self) -> Vec<Metric> {
58        let timestamp = Utc::now();
59
60        let mut metrics = Vec::new();
61        let recency = self
62            .recency
63            .read()
64            .expect("Failed to acquire read lock on recency map");
65        let recency = recency.as_ref();
66
67        for (key, counter) in self.registry.get_counter_handles() {
68            if recency
69                .is_none_or(|recency| recency.should_store_counter(&key, &counter, &self.registry))
70            {
71                // NOTE this will truncate if the value is greater than 2**52.
72                #[allow(clippy::cast_precision_loss)]
73                let value = counter.get_inner().load(Ordering::Relaxed) as f64;
74                let value = MetricValue::Counter { value };
75                metrics.push(Metric::from_metric_kv(&key, value, timestamp));
76            }
77        }
78        for (key, gauge) in self.registry.get_gauge_handles() {
79            if recency
80                .is_none_or(|recency| recency.should_store_gauge(&key, &gauge, &self.registry))
81            {
82                let value = gauge.get_inner().load(Ordering::Relaxed);
83                let value = MetricValue::Gauge { value };
84                metrics.push(Metric::from_metric_kv(&key, value, timestamp));
85            }
86        }
87        for (key, histogram) in self.registry.get_histogram_handles() {
88            if recency.is_none_or(|recency| {
89                recency.should_store_histogram(&key, &histogram, &self.registry)
90            }) {
91                let value = histogram.get_inner().make_metric();
92                metrics.push(Metric::from_metric_kv(&key, value, timestamp));
93            }
94        }
95        metrics
96    }
97
98    fn get_counter(&self, key: &Key) -> Counter {
99        self.registry
100            .get_or_create_counter(key, |c| c.clone().into())
101    }
102
103    fn get_gauge(&self, key: &Key) -> Gauge {
104        self.registry.get_or_create_gauge(key, |c| c.clone().into())
105    }
106
107    fn get_histogram(&self, key: &Key) -> Histogram {
108        self.registry
109            .get_or_create_histogram(key, |c| c.clone().into())
110    }
111}
112
/// [`VectorRecorder`] is a [`metrics::Recorder`] implementation that's suitable
/// for the advanced usage that we have in Vector.
///
/// TODO: The latest version of the `metrics` crate has a test recorder interface that could be used
/// to replace this whole global/local switching mechanism, as it effectively does the exact same
/// thing internally. However, it is only available through a `with_test_recorder` function that
/// takes a closure and cleans up the test recorder once the closure finishes. This is a much
/// cleaner interface, but interacts poorly with async code as used by the component tests. The best
/// path forward to make async tests work, then, is to replace the standard `#[tokio::test]` proc
/// macro wrapper with an alternate wrapper that does the normal tokio setup from within the
/// `with_test_recorder` closure, and use it across all the tests that require a test
/// recorder. Given the large number of such tests, we are retaining this global test recorder hack
/// here, but some day we should refactor the tests to eliminate it.
#[derive(Clone)]
pub(super) enum VectorRecorder {
    /// Records into a single registry shared through the contained [`Arc`]; cloning the recorder
    /// clones the `Arc`, not the registry.
    Global(Arc<Registry>),
    /// Records into the registry stored in the current thread's `LOCAL_REGISTRY`.
    ThreadLocal,
}
131
132impl VectorRecorder {
133    pub(super) fn new_global() -> Self {
134        Self::Global(Arc::new(Registry::new()))
135    }
136
137    pub(super) fn new_test() -> Self {
138        Self::with_thread_local(Registry::clear);
139        Self::ThreadLocal
140    }
141
142    pub(super) fn with_registry<T>(&self, doit: impl FnOnce(&Registry) -> T) -> T {
143        match &self {
144            Self::Global(registry) => doit(registry),
145            // This is only called after the registry is created, so we can just use a dummy
146            // idle_timeout parameter.
147            Self::ThreadLocal => Self::with_thread_local(doit),
148        }
149    }
150
151    fn with_thread_local<T>(doit: impl FnOnce(&Registry) -> T) -> T {
152        LOCAL_REGISTRY.with(|oc| doit(oc.get_or_init(Registry::new)))
153    }
154}
155
156impl Recorder for VectorRecorder {
157    fn register_counter(&self, key: &Key, _: &Metadata<'_>) -> Counter {
158        self.with_registry(|r| r.get_counter(key))
159    }
160
161    fn register_gauge(&self, key: &Key, _: &Metadata<'_>) -> Gauge {
162        self.with_registry(|r| r.get_gauge(key))
163    }
164
165    fn register_histogram(&self, key: &Key, _: &Metadata<'_>) -> Histogram {
166        self.with_registry(|r| r.get_histogram(key))
167    }
168
169    fn describe_counter(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
170
171    fn describe_gauge(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
172
173    fn describe_histogram(&self, _: KeyName, _: Option<Unit>, _: SharedString) {}
174}