// vector_core/metrics/mod.rs
mod ddsketch;
mod label_filter;
mod recency;
mod recorder;
mod storage;
use std::{sync::OnceLock, time::Duration};
use chrono::Utc;
use metrics::Key;
use metrics_tracing_context::TracingContextLayer;
use metrics_util::layers::Layer;
use snafu::Snafu;
pub use self::ddsketch::{AgentDDSketch, BinMap, Config};
use self::{label_filter::VectorLabelFilter, recorder::Registry, recorder::VectorRecorder};
use crate::event::{Metric, MetricValue};
/// Result alias used by this module, with the error type fixed to [`Error`].
type Result<T> = std::result::Result<T, Error>;
/// Errors reported by the metrics initialization and controller functions in this module.
#[derive(Clone, Copy, Debug, PartialEq, Snafu)]
pub enum Error {
/// Returned by `init` when installing the global recorder or storing the controller fails.
#[snafu(display("Recorder already initialized."))]
AlreadyInitialized,
/// Returned by `Controller::get` when no controller has been stored yet.
#[snafu(display("Metrics system was not initialized."))]
NotInitialized,
/// Returned by `Controller::set_expiry` when the supplied timeout is rejected as not positive.
/// Carries the offending value so it can be shown in the message.
#[snafu(display("Timeout value of {} must be positive.", timeout))]
TimeoutMustBePositive { timeout: f64 },
}
// Process-wide slot for the controller: written once by `init`, read by `Controller::get`.
static CONTROLLER: OnceLock<Controller> = OnceLock::new();
// Name and key of the gauge that `Controller::capture_metrics` appends to every snapshot.
const CARDINALITY_KEY_NAME: &str = "internal_metrics_cardinality";
static CARDINALITY_KEY: Key = Key::from_static_name(CARDINALITY_KEY_NAME);
// Name and key of the counter that `Controller::capture_metrics` appends alongside the gauge,
// carrying the same value.
const CARDINALITY_COUNTER_KEY_NAME: &str = "internal_metrics_cardinality_total";
static CARDINALITY_COUNTER_KEY: Key = Key::from_static_name(CARDINALITY_COUNTER_KEY_NAME);
/// Handle through which the recorded metrics can be reset, given an expiry, and captured.
pub struct Controller {
// Recorder whose registry backs every `Controller` method.
recorder: VectorRecorder,
}
/// Reports whether the internal metrics core should be set up.
///
/// The core is considered enabled unless the `DISABLE_INTERNAL_METRICS_CORE` environment
/// variable is readable and holds exactly `true`.
fn metrics_enabled() -> bool {
    match std::env::var("DISABLE_INTERNAL_METRICS_CORE") {
        Ok(flag) => flag != "true",
        // Unset (or unreadable) means nobody asked for the core to be disabled.
        Err(_) => true,
    }
}
/// Reports whether the recorder should be wrapped in the tracing-context layer.
///
/// The layer is considered enabled unless the `DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION`
/// environment variable is readable and holds exactly `true`.
fn tracing_context_layer_enabled() -> bool {
    match std::env::var("DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION") {
        Ok(flag) => flag != "true",
        // Unset (or unreadable) means the integration stays on.
        Err(_) => true,
    }
}
fn init(recorder: VectorRecorder) -> Result<()> {
if !metrics_enabled() {
metrics::set_global_recorder(metrics::NoopRecorder)
.map_err(|_| Error::AlreadyInitialized)?;
info!(message = "Internal metrics core is disabled.");
return Ok(());
}
if tracing_context_layer_enabled() {
metrics::set_global_recorder(
TracingContextLayer::new(VectorLabelFilter).layer(recorder.clone()),
)
.map_err(|_| Error::AlreadyInitialized)?;
} else {
metrics::set_global_recorder(recorder.clone()).map_err(|_| Error::AlreadyInitialized)?;
}
let controller = Controller { recorder };
CONTROLLER
.set(controller)
.map_err(|_| Error::AlreadyInitialized)?;
Ok(())
}
/// Initializes the metrics subsystem using the recorder from `VectorRecorder::new_global()`.
///
/// # Errors
///
/// Propagates the error from `init`, i.e. [`Error::AlreadyInitialized`] when a recorder or
/// controller has already been installed.
pub fn init_global() -> Result<()> {
    let recorder = VectorRecorder::new_global();
    init(recorder)
}
/// Initializes the metrics subsystem using the recorder from `VectorRecorder::new_test()`.
///
/// Unlike [`init_global`], this may be called repeatedly: a failed initialization is taken to
/// mean another caller got there first, and this function then waits until the controller is
/// actually available before returning.
pub fn init_test() {
    if init(VectorRecorder::new_test()).is_err() {
        // Every error produced by `init` is `AlreadyInitialized`. `init` installs the global
        // recorder before it stores the controller, so a concurrent caller can observe the
        // failure while the winning caller has not stored the controller yet. Wait for it,
        // yielding the CPU on each iteration rather than spinning at full speed.
        //
        // NOTE(review): when `DISABLE_INTERNAL_METRICS_CORE=true`, `init` never stores a
        // controller, so a failed call in that configuration would wait here forever.
        while CONTROLLER.get().is_none() {
            std::thread::yield_now();
        }
    }
}
impl Controller {
    /// Clears the recorder's registry.
    pub fn reset(&self) {
        self.recorder.with_registry(Registry::clear);
    }

    /// Returns the globally registered controller.
    ///
    /// # Errors
    ///
    /// Returns [`Error::NotInitialized`] if no controller has been stored yet.
    pub fn get() -> Result<&'static Self> {
        CONTROLLER.get().ok_or(Error::NotInitialized)
    }

    /// Sets (`Some`, in seconds) or clears (`None`) the expiry timeout on the registry.
    ///
    /// Timeouts too large to be represented as a [`Duration`] (including positive infinity)
    /// are saturated to [`Duration::MAX`] instead of panicking.
    ///
    /// # Errors
    ///
    /// Returns [`Error::TimeoutMustBePositive`] if the timeout is zero, negative, or NaN.
    pub fn set_expiry(&self, timeout: Option<f64>) -> Result<()> {
        let expiry = match timeout {
            None => None,
            // NaN compares false against everything, so it must be rejected explicitly;
            // otherwise it would reach the `Duration` conversion and panic there.
            Some(timeout) if timeout.is_nan() || timeout <= 0.0 => {
                return Err(Error::TimeoutMustBePositive { timeout });
            }
            // The value is positive and not NaN here, so the only way the conversion can fail
            // is overflow; treat that as the longest representable timeout.
            Some(timeout) => {
                Some(Duration::try_from_secs_f64(timeout).unwrap_or(Duration::MAX))
            }
        };
        self.recorder
            .with_registry(|registry| registry.set_expiry(expiry));
        Ok(())
    }

    /// Takes a snapshot of the metrics in the registry and appends the two cardinality
    /// metrics (a gauge and a counter), all stamped with the current UTC time.
    pub fn capture_metrics(&self) -> Vec<Metric> {
        let timestamp = Utc::now();

        let mut metrics = self.recorder.with_registry(Registry::visit_metrics);

        // The reported cardinality includes the two metrics appended below, hence `+ 2`.
        #[allow(clippy::cast_precision_loss)]
        let value = (metrics.len() + 2) as f64;
        metrics.push(Metric::from_metric_kv(
            &CARDINALITY_KEY,
            MetricValue::Gauge { value },
            timestamp,
        ));
        metrics.push(Metric::from_metric_kv(
            &CARDINALITY_COUNTER_KEY,
            MetricValue::Counter { value },
            timestamp,
        ));

        metrics
    }
}
/// Emits an absolute, monotonically increasing value as increments on a counter.
///
/// Each expansion site keeps its own high-water mark. When `$value` exceeds that mark, the
/// mark is raised and the difference is passed to `counter!($label).increment(..)`. Values
/// that are not greater than the current mark are ignored.
///
/// The expansion invokes an unqualified `counter!`, so that macro must be in scope at the
/// call site.
#[macro_export]
macro_rules! update_counter {
    ($label:literal, $value:expr) => {{
        use ::std::sync::atomic::{AtomicU64, Ordering};

        // Highest value reported so far from this expansion site.
        static PREVIOUS_VALUE: AtomicU64 = AtomicU64::new(0);

        let new_value = $value;
        // Cheap pre-check: skip the read-modify-write when the value cannot raise the mark.
        if new_value > PREVIOUS_VALUE.load(Ordering::Relaxed) {
            // Atomically raise the mark and learn what it was. If another thread raised it
            // past `new_value` in the meantime, there is nothing left to emit.
            let previous_value = PREVIOUS_VALUE.fetch_max(new_value, Ordering::SeqCst);
            if new_value > previous_value {
                counter!($label).increment(new_value - previous_value);
            }
        }
    }};
}
// Tests for the controller: snapshot cardinality, handle registration, and idle expiry.
// Throughout, expected snapshot lengths are "metrics created by the test" + 2, because
// `capture_metrics` always appends the two cardinality metrics. The fixed expected lengths
// presume each test starts from an empty registry -- TODO confirm that
// `VectorRecorder::new_test()` isolates tests from one another.
#[cfg(test)]
mod tests {
use super::*;
use crate::event::MetricKind;
// Expiry timeout in seconds; the expiry tests sleep for twice this long to let metrics go idle.
const IDLE_TIMEOUT: f64 = 0.5;
// Initializes the test metrics setup (idempotently) and returns the global controller.
fn init_metrics() -> &'static Controller {
init_test();
Controller::get().expect("Could not get global metrics controller")
}
// For several cardinalities, registers that many distinctly-labelled counters after a reset and
// checks that the snapshot holds them plus the two cardinality metrics, each of which must
// report the full snapshot length as an absolute value.
#[test]
fn cardinality_matches() {
for cardinality in [0, 1, 10, 100, 1000, 10000] {
init_test();
let controller = Controller::get().unwrap();
// Start each round from an empty registry so only this round's counters are counted.
controller.reset();
for idx in 0..cardinality {
metrics::counter!("test", "idx" => idx.to_string()).increment(1);
}
let metrics = controller.capture_metrics();
assert_eq!(metrics.len(), cardinality + 2);
#[allow(clippy::cast_precision_loss)]
let value = metrics.len() as f64;
for metric in metrics {
match metric.name() {
CARDINALITY_KEY_NAME => {
assert_eq!(metric.value(), &MetricValue::Gauge { value });
assert_eq!(metric.kind(), MetricKind::Absolute);
}
CARDINALITY_COUNTER_KEY_NAME => {
assert_eq!(metric.value(), &MetricValue::Counter { value });
assert_eq!(metric.kind(), MetricKind::Absolute);
}
_ => {}
}
}
}
}
// Checks that merely obtaining a counter or gauge handle makes the metric show up in the
// snapshot, and that updating it afterwards does not change the snapshot length.
#[test]
fn handles_registered_metrics() {
let controller = init_metrics();
let counter = metrics::counter!("test7");
assert_eq!(controller.capture_metrics().len(), 3);
counter.increment(1);
assert_eq!(controller.capture_metrics().len(), 3);
let gauge = metrics::gauge!("test8");
assert_eq!(controller.capture_metrics().len(), 4);
gauge.set(1.0);
assert_eq!(controller.capture_metrics().len(), 4);
}
// Two counters are updated, then only "test2" is updated again after the idle period; the
// snapshot is expected to shrink by one metric.
#[test]
fn expires_metrics() {
let controller = init_metrics();
controller.set_expiry(Some(IDLE_TIMEOUT)).unwrap();
metrics::counter!("test2").increment(1);
metrics::counter!("test3").increment(2);
assert_eq!(controller.capture_metrics().len(), 4);
// Wait well past the expiry timeout before touching only one of the counters.
std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
metrics::counter!("test2").increment(3);
assert_eq!(controller.capture_metrics().len(), 3);
}
// Same as `expires_metrics`, but the two counters share a name and differ only in their tag
// value, so expiry is expected to apply per tag set.
#[test]
fn expires_metrics_tags() {
let controller = init_metrics();
controller.set_expiry(Some(IDLE_TIMEOUT)).unwrap();
metrics::counter!("test4", "tag" => "value1").increment(1);
metrics::counter!("test4", "tag" => "value2").increment(2);
assert_eq!(controller.capture_metrics().len(), 4);
std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
metrics::counter!("test4", "tag" => "value1").increment(3);
assert_eq!(controller.capture_metrics().len(), 3);
}
// "test5" is updated through a handle (`a`) that stays alive for the whole test, while
// "test6" is updated once through a temporary. After the idle period one metric is gone, and
// the final assertions require "test5" to still be present with both of its increments (2.0).
#[test]
fn skips_expiring_registered() {
let controller = init_metrics();
controller.set_expiry(Some(IDLE_TIMEOUT)).unwrap();
let a = metrics::counter!("test5");
metrics::counter!("test6").increment(5);
assert_eq!(controller.capture_metrics().len(), 4);
a.increment(1);
assert_eq!(controller.capture_metrics().len(), 4);
std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
assert_eq!(controller.capture_metrics().len(), 3);
a.increment(1);
let metrics = controller.capture_metrics();
assert_eq!(metrics.len(), 3);
let metric = metrics
.into_iter()
.find(|metric| metric.name() == "test5")
.expect("Test metric is not present");
match metric.value() {
MetricValue::Counter { value } => assert_eq!(*value, 2.0),
value => panic!("Invalid metric value {value:?}"),
}
}
}