vector_core/metrics/
mod.rs

1mod ddsketch;
2mod label_filter;
3mod metric_matcher;
4mod recency;
5mod recorder;
6mod storage;
7
8use std::{sync::OnceLock, time::Duration};
9
10use chrono::Utc;
11use metric_matcher::MetricKeyMatcher;
12use metrics::Key;
13use metrics_tracing_context::TracingContextLayer;
14use metrics_util::layers::Layer;
15use snafu::Snafu;
16
17pub use self::ddsketch::{AgentDDSketch, BinMap, Config};
18use self::{
19    label_filter::VectorLabelFilter,
20    recorder::{Registry, VectorRecorder},
21};
22use crate::{
23    config::metrics_expiration::PerMetricSetExpiration,
24    event::{Metric, MetricValue},
25};
26
/// Result type used throughout this module; the error is always this module's [`Error`].
type Result<T> = std::result::Result<T, Error>;
28
/// Errors reported while initializing or configuring the metrics sub-system.
#[derive(Clone, Debug, PartialEq, Snafu)]
pub enum Error {
    /// A global recorder (or the controller) was already installed.
    #[snafu(display("Recorder already initialized."))]
    AlreadyInitialized,
    /// The global controller was requested before it was stored.
    #[snafu(display("Metrics system was not initialized."))]
    NotInitialized,
    /// A timeout was supplied that is not a positive number of seconds.
    #[snafu(display("Timeout value of {} must be positive.", timeout))]
    TimeoutMustBePositive { timeout: f64 },
    /// A regex pattern could not be used; `pattern` carries the offending text.
    #[snafu(display("Invalid regex pattern: {}.", pattern))]
    InvalidRegexPattern { pattern: String },
}
40
/// Process-wide [`Controller`], stored by `init` after the global recorder has been installed.
static CONTROLLER: OnceLock<Controller> = OnceLock::new();

// Cardinality metric parameters. These expose the cardinality of the internal
// metrics registry, which helps end users understand the characteristics of
// their environment and how Vector acts in it.
const CARDINALITY_KEY_NAME: &str = "internal_metrics_cardinality";
static CARDINALITY_KEY: Key = Key::from_static_name(CARDINALITY_KEY_NAME);

// Older, deprecated counter key name.
const CARDINALITY_COUNTER_KEY_NAME: &str = "internal_metrics_cardinality_total";
static CARDINALITY_COUNTER_KEY: Key = Key::from_static_name(CARDINALITY_COUNTER_KEY_NAME);
52
/// Controller allows capturing metric snapshots.
pub struct Controller {
    /// Recorder whose registry is accessed by the controller's methods.
    recorder: VectorRecorder,
}
57
/// Reports whether the internal metrics core should be active.
///
/// Returns `false` only when the `DISABLE_INTERNAL_METRICS_CORE` environment variable is set to
/// exactly `true`; an unset, unreadable, or differently valued variable keeps metrics enabled.
fn metrics_enabled() -> bool {
    let disabled =
        std::env::var("DISABLE_INTERNAL_METRICS_CORE").is_ok_and(|flag| flag == "true");
    !disabled
}
61
/// Reports whether the tracing-context layer should wrap the recorder.
///
/// Returns `false` only when the `DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION` environment
/// variable is set to exactly `true`; any other state keeps the layer enabled.
fn tracing_context_layer_enabled() -> bool {
    let disabled = std::env::var("DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION")
        .is_ok_and(|flag| flag == "true");
    !disabled
}
65
66fn init(recorder: VectorRecorder) -> Result<()> {
67    // An escape hatch to allow disabling internal metrics core. May be used for
68    // performance reasons. This is a hidden and undocumented functionality.
69    if !metrics_enabled() {
70        metrics::set_global_recorder(metrics::NoopRecorder)
71            .map_err(|_| Error::AlreadyInitialized)?;
72        info!(message = "Internal metrics core is disabled.");
73        return Ok(());
74    }
75
76    ////
77    //// Initialize the recorder.
78    ////
79
80    // The recorder is the interface between metrics-rs and our registry. In our
81    // case it doesn't _do_ much other than shepherd into the registry and
82    // update the cardinality counter, see above, as needed.
83    if tracing_context_layer_enabled() {
84        // Apply a layer to capture tracing span fields as labels.
85        metrics::set_global_recorder(
86            TracingContextLayer::new(VectorLabelFilter).layer(recorder.clone()),
87        )
88        .map_err(|_| Error::AlreadyInitialized)?;
89    } else {
90        metrics::set_global_recorder(recorder.clone()).map_err(|_| Error::AlreadyInitialized)?;
91    }
92
93    ////
94    //// Prepare the controller
95    ////
96
97    // The `Controller` is a safe spot in memory for us to stash a clone of the registry -- where
98    // metrics are actually kept -- so that our sub-systems interested in these metrics can grab
99    // copies. See `capture_metrics` and its callers for an example. Note that this is done last to
100    // allow `init_test` below to use the initialization state of `CONTROLLER` to wait for the above
101    // steps to complete in another thread.
102    let controller = Controller { recorder };
103    CONTROLLER
104        .set(controller)
105        .map_err(|_| Error::AlreadyInitialized)?;
106
107    Ok(())
108}
109
110/// Initialize the default metrics sub-system
111///
112/// # Errors
113///
114/// This function will error if it is called multiple times.
115pub fn init_global() -> Result<()> {
116    init(VectorRecorder::new_global())
117}
118
119/// Initialize the thread-local metrics sub-system. This function will loop until a recorder is
120/// actually set.
121pub fn init_test() {
122    if init(VectorRecorder::new_test()).is_err() {
123        // The only error case returned by `init` is `AlreadyInitialized`. A race condition is
124        // possible here: if metrics are being initialized by two (or more) test threads
125        // simultaneously, the ones that fail to set return immediately, possibly allowing
126        // subsequent code to execute before the static recorder value is actually set within the
127        // `metrics` crate. To prevent subsequent code from running with an unset recorder, loop
128        // here until a recorder is available.
129        while CONTROLLER.get().is_none() {}
130    }
131}
132
133impl Controller {
134    /// Clear all metrics from the registry.
135    pub fn reset(&self) {
136        self.recorder.with_registry(Registry::clear);
137    }
138
139    /// Get a handle to the globally registered controller, if it's initialized.
140    ///
141    /// # Errors
142    ///
143    /// This function will fail if the metrics subsystem has not been correctly
144    /// initialized.
145    pub fn get() -> Result<&'static Self> {
146        CONTROLLER.get().ok_or(Error::NotInitialized)
147    }
148
149    /// Set or clear the expiry time after which idle metrics are dropped from the set of captured
150    /// metrics. Invalid timeouts (zero or negative values) are silently remapped to no expiry.
151    ///
152    /// # Errors
153    ///
154    /// The contained timeout value must be positive.
155    pub fn set_expiry(
156        &self,
157        global_timeout: Option<f64>,
158        expire_metrics_per_metric_set: Vec<PerMetricSetExpiration>,
159    ) -> Result<()> {
160        if let Some(timeout) = global_timeout
161            && timeout <= 0.0
162        {
163            return Err(Error::TimeoutMustBePositive { timeout });
164        }
165        let per_metric_expiration = expire_metrics_per_metric_set
166            .into_iter()
167            .map(TryInto::try_into)
168            .collect::<Result<Vec<(MetricKeyMatcher, Duration)>>>()?;
169
170        self.recorder.with_registry(|registry| {
171            registry.set_expiry(
172                global_timeout.map(Duration::from_secs_f64),
173                per_metric_expiration,
174            );
175        });
176        Ok(())
177    }
178
179    /// Take a snapshot of all gathered metrics and expose them as metric
180    /// [`Event`](crate::event::Event)s.
181    pub fn capture_metrics(&self) -> Vec<Metric> {
182        let timestamp = Utc::now();
183
184        let mut metrics = self.recorder.with_registry(Registry::visit_metrics);
185
186        #[allow(clippy::cast_precision_loss)]
187        let value = (metrics.len() + 2) as f64;
188        metrics.push(Metric::from_metric_kv(
189            &CARDINALITY_KEY,
190            MetricValue::Gauge { value },
191            timestamp,
192        ));
193        metrics.push(Metric::from_metric_kv(
194            &CARDINALITY_COUNTER_KEY,
195            MetricValue::Counter { value },
196            timestamp,
197        ));
198
199        metrics
200    }
201}
202
#[macro_export]
/// Emits a metric as a `counter` from a stream of absolute values, converting each accepted
/// value into the increment over the previously accepted one.
///
/// A value is accepted only if it is strictly greater than the last accepted value; anything
/// else is ignored and nothing is emitted for it.
macro_rules! update_counter {
    ($label:literal, $value:expr) => {{
        use ::std::sync::atomic::{AtomicU64, Ordering};

        // Last absolute value that was accepted at this call site.
        static PREVIOUS_VALUE: AtomicU64 = AtomicU64::new(0);

        let new_value = $value;

        // Atomically replace the stored value, retrying while other threads race with us. The
        // closure declines (returns `None`) as soon as the stored value is not smaller than ours:
        // either a greater value was emitted before this thread got to update the counter, or the
        // values were not provided in strictly monotonically increasing order.
        let swapped = PREVIOUS_VALUE.fetch_update(Ordering::SeqCst, Ordering::Relaxed, |stored| {
            (new_value > stored).then_some(new_value)
        });

        if let Ok(replaced) = swapped {
            // Albeit very unlikely, note that this sequence of deltas might be emitted in a
            // different order than they were calculated.
            counter!($label).increment(new_value - replaced);
        }
    }};
}
246
// Tests for the controller. Every expected `capture_metrics().len()` below is the number of
// live test metrics plus the two cardinality metrics that `capture_metrics` always appends.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        config::metrics_expiration::{
            MetricLabelMatcher, MetricLabelMatcherConfig, MetricNameMatcherConfig,
        },
        event::MetricKind,
    };

    // Base idle timeout, in seconds, used by the expiry tests.
    const IDLE_TIMEOUT: f64 = 0.5;

    // Initializes the test metrics sub-system and returns the global controller.
    fn init_metrics() -> &'static Controller {
        init_test();
        Controller::get().expect("Could not get global metrics controller")
    }

    // The reported cardinality must equal the number of captured metrics, including the two
    // cardinality metrics themselves.
    #[test]
    fn cardinality_matches() {
        for cardinality in [0, 1, 10, 100, 1000, 10000] {
            init_test();
            let controller = Controller::get().unwrap();
            // Start each round from an empty registry.
            controller.reset();

            // Distinct label values give one distinct metric per index.
            for idx in 0..cardinality {
                metrics::counter!("test", "idx" => idx.to_string()).increment(1);
            }

            let metrics = controller.capture_metrics();
            assert_eq!(metrics.len(), cardinality + 2);

            #[allow(clippy::cast_precision_loss)]
            let value = metrics.len() as f64;
            for metric in metrics {
                match metric.name() {
                    CARDINALITY_KEY_NAME => {
                        assert_eq!(metric.value(), &MetricValue::Gauge { value });
                        assert_eq!(metric.kind(), MetricKind::Absolute);
                    }
                    CARDINALITY_COUNTER_KEY_NAME => {
                        assert_eq!(metric.value(), &MetricValue::Counter { value });
                        assert_eq!(metric.kind(), MetricKind::Absolute);
                    }
                    // The plain test counters are not inspected here.
                    _ => {}
                }
            }
        }
    }

    // A metric is counted as soon as its handle is created; updating it afterwards does not
    // change the count.
    #[test]
    fn handles_registered_metrics() {
        let controller = init_metrics();

        let counter = metrics::counter!("test7");
        assert_eq!(controller.capture_metrics().len(), 3);
        counter.increment(1);
        assert_eq!(controller.capture_metrics().len(), 3);
        let gauge = metrics::gauge!("test8");
        assert_eq!(controller.capture_metrics().len(), 4);
        gauge.set(1.0);
        assert_eq!(controller.capture_metrics().len(), 4);
    }

    // With a global timeout, a metric that stays idle past the timeout is dropped while one that
    // is updated again is kept.
    #[test]
    fn expires_metrics() {
        let controller = init_metrics();
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();

        metrics::counter!("test2").increment(1);
        metrics::counter!("test3").increment(2);
        assert_eq!(controller.capture_metrics().len(), 4);

        // Sleep for twice the timeout, then touch only `test2`; `test3` is expected to be gone.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        metrics::counter!("test2").increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }

    // Same as `expires_metrics`, but the two metrics share a name and differ only by tag value.
    #[test]
    fn expires_metrics_tags() {
        let controller = init_metrics();
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();

        metrics::counter!("test4", "tag" => "value1").increment(1);
        metrics::counter!("test4", "tag" => "value2").increment(2);
        assert_eq!(controller.capture_metrics().len(), 4);

        // Only the `value1` series is refreshed after the sleep.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        metrics::counter!("test4", "tag" => "value1").increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }

    // A counter whose handle is still held is expected to survive expiry and keep its value.
    #[test]
    fn skips_expiring_registered() {
        let controller = init_metrics();
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();

        let a = metrics::counter!("test5");
        metrics::counter!("test6").increment(5);
        assert_eq!(controller.capture_metrics().len(), 4);
        a.increment(1);
        assert_eq!(controller.capture_metrics().len(), 4);

        // After the idle period only one test metric remains (presumably `test5`, whose handle
        // `a` is still alive; the lookup below confirms it is present).
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        assert_eq!(controller.capture_metrics().len(), 3);

        a.increment(1);
        let metrics = controller.capture_metrics();
        assert_eq!(metrics.len(), 3);
        let metric = metrics
            .into_iter()
            .find(|metric| metric.name() == "test5")
            .expect("Test metric is not present");
        // Both increments (1 + 1) must be reflected: the value was not reset by the expiry pass.
        match metric.value() {
            MetricValue::Counter { value } => assert_eq!(*value, 2.0),
            value => panic!("Invalid metric value {value:?}"),
        }
    }

    // Without a global timeout, only the metric matched by a per-set rule expires.
    #[test]
    fn expires_metrics_per_set() {
        let controller = init_metrics();
        controller
            .set_expiry(
                None,
                vec![PerMetricSetExpiration {
                    name: Some(MetricNameMatcherConfig::Exact {
                        value: "test3".to_string(),
                    }),
                    labels: None,
                    expire_secs: IDLE_TIMEOUT,
                }],
            )
            .unwrap();

        metrics::counter!("test2").increment(1);
        metrics::counter!("test3").increment(2);
        assert_eq!(controller.capture_metrics().len(), 4);

        // `test3` matches the rule and is idle for longer than its timeout, so it is dropped.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        metrics::counter!("test2").increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }

    // Combines a global timeout (3 x IDLE_TIMEOUT) with two per-set rules: `test3` by name
    // (1 x IDLE_TIMEOUT) and `tag = value1` by label (2 x IDLE_TIMEOUT). `test2` is refreshed
    // after every sleep, so it is the only test metric expected to survive to the end.
    #[test]
    fn expires_metrics_multiple_different_sets() {
        let controller = init_metrics();
        controller
            .set_expiry(
                Some(IDLE_TIMEOUT * 3.0),
                vec![
                    PerMetricSetExpiration {
                        name: Some(MetricNameMatcherConfig::Exact {
                            value: "test3".to_string(),
                        }),
                        labels: None,
                        expire_secs: IDLE_TIMEOUT,
                    },
                    PerMetricSetExpiration {
                        name: None,
                        labels: Some(MetricLabelMatcherConfig::All {
                            matchers: vec![MetricLabelMatcher::Exact {
                                key: "tag".to_string(),
                                value: "value1".to_string(),
                            }],
                        }),
                        expire_secs: IDLE_TIMEOUT * 2.0,
                    },
                ],
            )
            .unwrap();

        metrics::counter!("test1").increment(1);
        metrics::counter!("test2").increment(1);
        metrics::counter!("test3").increment(2);
        metrics::counter!("test4", "tag" => "value1").increment(3);
        assert_eq!(controller.capture_metrics().len(), 6);

        // Elapsed: 1.5 x IDLE_TIMEOUT. Only the name rule for `test3` has been exceeded.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 1.5));
        metrics::counter!("test2").increment(3);
        assert_eq!(controller.capture_metrics().len(), 5);

        // Elapsed: 2.5 x IDLE_TIMEOUT. The label rule for `tag = value1` has now been exceeded.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT));
        metrics::counter!("test2").increment(3);
        assert_eq!(controller.capture_metrics().len(), 4);

        // Elapsed: 3.5 x IDLE_TIMEOUT. The global timeout has now been exceeded for `test1`.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT));
        metrics::counter!("test2").increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }
}