vector_core/metrics/
mod.rs

1mod ddsketch;
2mod label_filter;
3mod metric_matcher;
4mod recency;
5mod recorder;
6mod storage;
7
8use std::{sync::OnceLock, time::Duration};
9
10use chrono::Utc;
11use metric_matcher::MetricKeyMatcher;
12use metrics::Key;
13use metrics_tracing_context::TracingContextLayer;
14use metrics_util::layers::Layer;
15use snafu::Snafu;
16
17pub use self::ddsketch::{AgentDDSketch, BinMap, Config};
18use self::{label_filter::VectorLabelFilter, recorder::Registry, recorder::VectorRecorder};
19use crate::{
20    config::metrics_expiration::PerMetricSetExpiration,
21    event::{Metric, MetricValue},
22};
23
/// Result type used by this module, with [`Error`] as the failure type.
type Result<T> = std::result::Result<T, Error>;

/// Errors reported by the metrics subsystem.
#[derive(Clone, Debug, PartialEq, Snafu)]
pub enum Error {
    /// Installing the recorder or the global controller failed because one was already set.
    #[snafu(display("Recorder already initialized."))]
    AlreadyInitialized,
    /// The global controller was requested before it had been set.
    #[snafu(display("Metrics system was not initialized."))]
    NotInitialized,
    /// An expiry timeout was rejected; carries the offending value in seconds.
    #[snafu(display("Timeout value of {} must be positive.", timeout))]
    TimeoutMustBePositive { timeout: f64 },
    /// A regex pattern could not be used; carries the offending pattern.
    // NOTE(review): not constructed in this file; presumably raised while converting
    // per-metric-set matchers -- confirm against `metric_matcher`.
    #[snafu(display("Invalid regex pattern: {}.", pattern))]
    InvalidRegexPattern { pattern: String },
}
37
38static CONTROLLER: OnceLock<Controller> = OnceLock::new();
39
// Cardinality metric parameters. This metric exposes the cardinality of the internal metrics
// registry, which helps end users understand the characteristics of their environment and how
// Vector behaves in it.
43const CARDINALITY_KEY_NAME: &str = "internal_metrics_cardinality";
44static CARDINALITY_KEY: Key = Key::from_static_name(CARDINALITY_KEY_NAME);
45
46// Older deprecated counter key name
47const CARDINALITY_COUNTER_KEY_NAME: &str = "internal_metrics_cardinality_total";
48static CARDINALITY_COUNTER_KEY: Key = Key::from_static_name(CARDINALITY_COUNTER_KEY_NAME);
49
/// Controller allows capturing metric snapshots.
///
/// A single instance is stored in the `CONTROLLER` static by `init` and handed out through
/// [`Controller::get`].
pub struct Controller {
    // Recorder whose registry is read by `capture_metrics` and modified by `reset` and
    // `set_expiry`.
    recorder: VectorRecorder,
}
54
/// Reports whether the internal metrics core should be active.
///
/// It is switched off only when the `DISABLE_INTERNAL_METRICS_CORE` environment variable is set
/// to exactly `true`; an unset, unreadable, or differently-valued variable leaves it enabled.
fn metrics_enabled() -> bool {
    std::env::var("DISABLE_INTERNAL_METRICS_CORE").map_or(true, |flag| flag != "true")
}
58
/// Reports whether the tracing-context layer should wrap the recorder.
///
/// It is switched off only when the `DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION` environment
/// variable is set to exactly `true`; any other state leaves it enabled.
fn tracing_context_layer_enabled() -> bool {
    let flag = std::env::var("DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION").ok();
    flag.as_deref() != Some("true")
}
62
63fn init(recorder: VectorRecorder) -> Result<()> {
64    // An escape hatch to allow disabling internal metrics core. May be used for
65    // performance reasons. This is a hidden and undocumented functionality.
66    if !metrics_enabled() {
67        metrics::set_global_recorder(metrics::NoopRecorder)
68            .map_err(|_| Error::AlreadyInitialized)?;
69        info!(message = "Internal metrics core is disabled.");
70        return Ok(());
71    }
72
73    ////
74    //// Initialize the recorder.
75    ////
76
77    // The recorder is the interface between metrics-rs and our registry. In our
78    // case it doesn't _do_ much other than shepherd into the registry and
79    // update the cardinality counter, see above, as needed.
80    if tracing_context_layer_enabled() {
81        // Apply a layer to capture tracing span fields as labels.
82        metrics::set_global_recorder(
83            TracingContextLayer::new(VectorLabelFilter).layer(recorder.clone()),
84        )
85        .map_err(|_| Error::AlreadyInitialized)?;
86    } else {
87        metrics::set_global_recorder(recorder.clone()).map_err(|_| Error::AlreadyInitialized)?;
88    }
89
90    ////
91    //// Prepare the controller
92    ////
93
94    // The `Controller` is a safe spot in memory for us to stash a clone of the registry -- where
95    // metrics are actually kept -- so that our sub-systems interested in these metrics can grab
96    // copies. See `capture_metrics` and its callers for an example. Note that this is done last to
97    // allow `init_test` below to use the initialization state of `CONTROLLER` to wait for the above
98    // steps to complete in another thread.
99    let controller = Controller { recorder };
100    CONTROLLER
101        .set(controller)
102        .map_err(|_| Error::AlreadyInitialized)?;
103
104    Ok(())
105}
106
107/// Initialize the default metrics sub-system
108///
109/// # Errors
110///
111/// This function will error if it is called multiple times.
112pub fn init_global() -> Result<()> {
113    init(VectorRecorder::new_global())
114}
115
116/// Initialize the thread-local metrics sub-system. This function will loop until a recorder is
117/// actually set.
118pub fn init_test() {
119    if init(VectorRecorder::new_test()).is_err() {
120        // The only error case returned by `init` is `AlreadyInitialized`. A race condition is
121        // possible here: if metrics are being initialized by two (or more) test threads
122        // simultaneously, the ones that fail to set return immediately, possibly allowing
123        // subsequent code to execute before the static recorder value is actually set within the
124        // `metrics` crate. To prevent subsequent code from running with an unset recorder, loop
125        // here until a recorder is available.
126        while CONTROLLER.get().is_none() {}
127    }
128}
129
130impl Controller {
131    /// Clear all metrics from the registry.
132    pub fn reset(&self) {
133        self.recorder.with_registry(Registry::clear);
134    }
135
136    /// Get a handle to the globally registered controller, if it's initialized.
137    ///
138    /// # Errors
139    ///
140    /// This function will fail if the metrics subsystem has not been correctly
141    /// initialized.
142    pub fn get() -> Result<&'static Self> {
143        CONTROLLER.get().ok_or(Error::NotInitialized)
144    }
145
146    /// Set or clear the expiry time after which idle metrics are dropped from the set of captured
147    /// metrics. Invalid timeouts (zero or negative values) are silently remapped to no expiry.
148    ///
149    /// # Errors
150    ///
151    /// The contained timeout value must be positive.
152    pub fn set_expiry(
153        &self,
154        global_timeout: Option<f64>,
155        expire_metrics_per_metric_set: Vec<PerMetricSetExpiration>,
156    ) -> Result<()> {
157        if let Some(timeout) = global_timeout {
158            if timeout <= 0.0 {
159                return Err(Error::TimeoutMustBePositive { timeout });
160            }
161        }
162        let per_metric_expiration = expire_metrics_per_metric_set
163            .into_iter()
164            .map(TryInto::try_into)
165            .collect::<Result<Vec<(MetricKeyMatcher, Duration)>>>()?;
166
167        self.recorder.with_registry(|registry| {
168            registry.set_expiry(
169                global_timeout.map(Duration::from_secs_f64),
170                per_metric_expiration,
171            );
172        });
173        Ok(())
174    }
175
176    /// Take a snapshot of all gathered metrics and expose them as metric
177    /// [`Event`](crate::event::Event)s.
178    pub fn capture_metrics(&self) -> Vec<Metric> {
179        let timestamp = Utc::now();
180
181        let mut metrics = self.recorder.with_registry(Registry::visit_metrics);
182
183        #[allow(clippy::cast_precision_loss)]
184        let value = (metrics.len() + 2) as f64;
185        metrics.push(Metric::from_metric_kv(
186            &CARDINALITY_KEY,
187            MetricValue::Gauge { value },
188            timestamp,
189        ));
190        metrics.push(Metric::from_metric_kv(
191            &CARDINALITY_COUNTER_KEY,
192            MetricValue::Counter { value },
193            timestamp,
194        ));
195
196        metrics
197    }
198}
199
#[macro_export]
/// This macro is used to emit metrics as a `counter` while simultaneously
/// converting from absolute values to incremental values.
///
/// Each call site keeps its own record of the highest absolute value seen so far (starting at
/// zero). When `$value` exceeds that record, the difference is emitted via
/// `counter!($label).increment(..)`; a `counter!` macro must therefore be in scope where this
/// macro is invoked.
///
/// Values that do not arrive in strictly monotonically increasing order are
/// ignored and will not be emitted.
macro_rules! update_counter {
    ($label:literal, $value:expr) => {{
        use ::std::sync::atomic::{AtomicU64, Ordering};

        // Highest absolute value observed at this call site.
        static PREVIOUS_VALUE: AtomicU64 = AtomicU64::new(0);

        let new_value = $value;

        // Atomically raise the stored maximum and learn what it was before. If the stored value
        // was already greater or equal, either another thread emitted a newer value first or the
        // values were not strictly increasing; nothing is emitted in that case.
        let previous_value = PREVIOUS_VALUE.fetch_max(new_value, Ordering::SeqCst);

        if new_value > previous_value {
            // Albeit very unlikely, note that with concurrent callers the deltas might be
            // emitted in a different order than they were calculated.
            counter!($label).increment(new_value - previous_value);
        }
    }};
}
243
#[cfg(test)]
mod tests {
    use super::*;

    use crate::{
        config::metrics_expiration::{
            MetricLabelMatcher, MetricLabelMatcherConfig, MetricNameMatcherConfig,
        },
        event::MetricKind,
    };

    /// Base idle timeout, in seconds, used by the expiry tests.
    const IDLE_TIMEOUT: f64 = 0.5;

    /// Initializes the test metrics sub-system and returns the global controller.
    fn init_metrics() -> &'static Controller {
        init_test();
        Controller::get().expect("Could not get global metrics controller")
    }

    /// Number of metrics in a fresh snapshot, cardinality metrics included.
    fn snapshot_len(controller: &Controller) -> usize {
        controller.capture_metrics().len()
    }

    /// Blocks the current thread for `factor` times the base idle timeout.
    fn idle_for(factor: f64) {
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * factor));
    }

    /// Builds a per-set expiration that matches a metric by its exact name.
    fn expire_by_name(name: &str, expire_secs: f64) -> PerMetricSetExpiration {
        PerMetricSetExpiration {
            name: Some(MetricNameMatcherConfig::Exact {
                value: name.to_string(),
            }),
            labels: None,
            expire_secs,
        }
    }

    #[test]
    fn cardinality_matches() {
        for cardinality in [0, 1, 10, 100, 1000, 10000] {
            init_test();
            let controller = Controller::get().unwrap();
            controller.reset();

            (0..cardinality).for_each(|idx| {
                metrics::counter!("test", "idx" => idx.to_string()).increment(1);
            });

            let snapshot = controller.capture_metrics();
            assert_eq!(snapshot.len(), cardinality + 2);

            #[allow(clippy::cast_precision_loss)]
            let value = snapshot.len() as f64;
            for metric in snapshot {
                let name = metric.name();
                if name == CARDINALITY_KEY_NAME {
                    assert_eq!(metric.value(), &MetricValue::Gauge { value });
                    assert_eq!(metric.kind(), MetricKind::Absolute);
                } else if name == CARDINALITY_COUNTER_KEY_NAME {
                    assert_eq!(metric.value(), &MetricValue::Counter { value });
                    assert_eq!(metric.kind(), MetricKind::Absolute);
                }
            }
        }
    }

    #[test]
    fn handles_registered_metrics() {
        let controller = init_metrics();

        // Registering a handle is enough for the metric to show up; updating it adds nothing.
        let counter = metrics::counter!("test7");
        assert_eq!(snapshot_len(controller), 3);
        counter.increment(1);
        assert_eq!(snapshot_len(controller), 3);

        let gauge = metrics::gauge!("test8");
        assert_eq!(snapshot_len(controller), 4);
        gauge.set(1.0);
        assert_eq!(snapshot_len(controller), 4);
    }

    #[test]
    fn expires_metrics() {
        let controller = init_metrics();
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();

        metrics::counter!("test2").increment(1);
        metrics::counter!("test3").increment(2);
        assert_eq!(snapshot_len(controller), 4);

        idle_for(2.0);
        metrics::counter!("test2").increment(3);
        assert_eq!(snapshot_len(controller), 3);
    }

    #[test]
    fn expires_metrics_tags() {
        let controller = init_metrics();
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();

        metrics::counter!("test4", "tag" => "value1").increment(1);
        metrics::counter!("test4", "tag" => "value2").increment(2);
        assert_eq!(snapshot_len(controller), 4);

        idle_for(2.0);
        metrics::counter!("test4", "tag" => "value1").increment(3);
        assert_eq!(snapshot_len(controller), 3);
    }

    #[test]
    fn skips_expiring_registered() {
        let controller = init_metrics();
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();

        let held = metrics::counter!("test5");
        metrics::counter!("test6").increment(5);
        assert_eq!(snapshot_len(controller), 4);
        held.increment(1);
        assert_eq!(snapshot_len(controller), 4);

        idle_for(2.0);
        assert_eq!(snapshot_len(controller), 3);

        held.increment(1);
        let snapshot = controller.capture_metrics();
        assert_eq!(snapshot.len(), 3);

        let held_metric = snapshot
            .into_iter()
            .find(|metric| metric.name() == "test5")
            .expect("Test metric is not present");
        match held_metric.value() {
            MetricValue::Counter { value } => assert_eq!(*value, 2.0),
            value => panic!("Invalid metric value {value:?}"),
        }
    }

    #[test]
    fn expires_metrics_per_set() {
        let controller = init_metrics();
        let per_set = vec![expire_by_name("test3", IDLE_TIMEOUT)];
        controller.set_expiry(None, per_set).unwrap();

        metrics::counter!("test2").increment(1);
        metrics::counter!("test3").increment(2);
        assert_eq!(snapshot_len(controller), 4);

        idle_for(2.0);
        metrics::counter!("test2").increment(3);
        assert_eq!(snapshot_len(controller), 3);
    }

    #[test]
    fn expires_metrics_multiple_different_sets() {
        let controller = init_metrics();

        let tagged_value1 = PerMetricSetExpiration {
            name: None,
            labels: Some(MetricLabelMatcherConfig::All {
                matchers: vec![MetricLabelMatcher::Exact {
                    key: "tag".to_string(),
                    value: "value1".to_string(),
                }],
            }),
            expire_secs: IDLE_TIMEOUT * 2.0,
        };
        let per_set = vec![expire_by_name("test3", IDLE_TIMEOUT), tagged_value1];
        controller
            .set_expiry(Some(IDLE_TIMEOUT * 3.0), per_set)
            .unwrap();

        metrics::counter!("test1").increment(1);
        metrics::counter!("test2").increment(1);
        metrics::counter!("test3").increment(2);
        metrics::counter!("test4", "tag" => "value1").increment(3);
        assert_eq!(snapshot_len(controller), 6);

        idle_for(1.5);
        metrics::counter!("test2").increment(3);
        assert_eq!(snapshot_len(controller), 5);

        idle_for(1.0);
        metrics::counter!("test2").increment(3);
        assert_eq!(snapshot_len(controller), 4);

        idle_for(1.0);
        metrics::counter!("test2").increment(3);
        assert_eq!(snapshot_len(controller), 3);
    }
}