mod ddsketch;
mod label_filter;
mod metric_matcher;
mod recency;
mod recorder;
mod storage;
use std::{sync::OnceLock, time::Duration};
use chrono::Utc;
use metric_matcher::MetricKeyMatcher;
use metrics::Key;
use metrics_tracing_context::TracingContextLayer;
use metrics_util::layers::Layer;
use snafu::Snafu;
pub use self::ddsketch::{AgentDDSketch, BinMap, Config};
use self::{label_filter::VectorLabelFilter, recorder::Registry, recorder::VectorRecorder};
use crate::{
config::metrics_expiration::PerMetricSetExpiration,
event::{Metric, MetricValue},
};
/// Result alias used throughout this module, with [`Error`] as the error type.
type Result<T> = std::result::Result<T, Error>;
/// Errors reported while initializing or configuring the internal metrics system.
#[derive(Clone, Debug, PartialEq, Snafu)]
pub enum Error {
/// Returned by `init` when the global recorder or the controller could not be installed.
#[snafu(display("Recorder already initialized."))]
AlreadyInitialized,
/// Returned by `Controller::get` when the global controller has not been set.
#[snafu(display("Metrics system was not initialized."))]
NotInitialized,
/// Returned by `Controller::set_expiry` when the global timeout is zero or negative.
#[snafu(display("Timeout value of {} must be positive.", timeout))]
TimeoutMustBePositive { timeout: f64 },
// NOTE(review): not constructed in this file; presumably produced when a per-metric-set
// expiration config is converted into a `MetricKeyMatcher` -- confirm against that conversion.
#[snafu(display("Invalid regex pattern: {}.", pattern))]
InvalidRegexPattern { pattern: String },
}
/// Process-wide controller; populated at most once by `init` and read via `Controller::get`.
static CONTROLLER: OnceLock<Controller> = OnceLock::new();
/// Name of the gauge that `Controller::capture_metrics` appends to every capture.
const CARDINALITY_KEY_NAME: &str = "internal_metrics_cardinality";
/// Metric key built from [`CARDINALITY_KEY_NAME`].
static CARDINALITY_KEY: Key = Key::from_static_name(CARDINALITY_KEY_NAME);
/// Name of the counter that `Controller::capture_metrics` appends to every capture.
const CARDINALITY_COUNTER_KEY_NAME: &str = "internal_metrics_cardinality_total";
/// Metric key built from [`CARDINALITY_COUNTER_KEY_NAME`].
static CARDINALITY_COUNTER_KEY: Key = Key::from_static_name(CARDINALITY_COUNTER_KEY_NAME);
/// Handle used to inspect and configure the metrics registry behind the installed recorder.
pub struct Controller {
// Clone of the recorder handed to `init`; all registry access goes through `with_registry`.
recorder: VectorRecorder,
}
/// Reports whether the internal metrics core should be active.
///
/// The core is considered disabled only when the `DISABLE_INTERNAL_METRICS_CORE`
/// environment variable is readable and equals exactly `"true"`; an unset or
/// unreadable variable, or any other value, leaves it enabled.
fn metrics_enabled() -> bool {
    std::env::var("DISABLE_INTERNAL_METRICS_CORE").map_or(true, |flag| flag != "true")
}
/// Reports whether the tracing-context layer should wrap the recorder.
///
/// The layer is skipped only when the `DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION`
/// environment variable is readable and equals exactly `"true"`.
fn tracing_context_layer_enabled() -> bool {
    match std::env::var("DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION") {
        Ok(flag) => flag != "true",
        // Unset (or non-unicode) variable: keep the integration on.
        Err(_) => true,
    }
}
fn init(recorder: VectorRecorder) -> Result<()> {
if !metrics_enabled() {
metrics::set_global_recorder(metrics::NoopRecorder)
.map_err(|_| Error::AlreadyInitialized)?;
info!(message = "Internal metrics core is disabled.");
return Ok(());
}
if tracing_context_layer_enabled() {
metrics::set_global_recorder(
TracingContextLayer::new(VectorLabelFilter).layer(recorder.clone()),
)
.map_err(|_| Error::AlreadyInitialized)?;
} else {
metrics::set_global_recorder(recorder.clone()).map_err(|_| Error::AlreadyInitialized)?;
}
let controller = Controller { recorder };
CONTROLLER
.set(controller)
.map_err(|_| Error::AlreadyInitialized)?;
Ok(())
}
/// Initializes the metrics system with the recorder built by `VectorRecorder::new_global`.
///
/// # Errors
///
/// Propagates the error from `init`, which reports `Error::AlreadyInitialized` when a
/// recorder or controller has already been installed.
pub fn init_global() -> Result<()> {
init(VectorRecorder::new_global())
}
pub fn init_test() {
if init(VectorRecorder::new_test()).is_err() {
while CONTROLLER.get().is_none() {}
}
}
impl Controller {
pub fn reset(&self) {
self.recorder.with_registry(Registry::clear);
}
pub fn get() -> Result<&'static Self> {
CONTROLLER.get().ok_or(Error::NotInitialized)
}
pub fn set_expiry(
&self,
global_timeout: Option<f64>,
expire_metrics_per_metric_set: Vec<PerMetricSetExpiration>,
) -> Result<()> {
if let Some(timeout) = global_timeout {
if timeout <= 0.0 {
return Err(Error::TimeoutMustBePositive { timeout });
}
}
let per_metric_expiration = expire_metrics_per_metric_set
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<(MetricKeyMatcher, Duration)>>>()?;
self.recorder.with_registry(|registry| {
registry.set_expiry(
global_timeout.map(Duration::from_secs_f64),
per_metric_expiration,
);
});
Ok(())
}
pub fn capture_metrics(&self) -> Vec<Metric> {
let timestamp = Utc::now();
let mut metrics = self.recorder.with_registry(Registry::visit_metrics);
#[allow(clippy::cast_precision_loss)]
let value = (metrics.len() + 2) as f64;
metrics.push(Metric::from_metric_kv(
&CARDINALITY_KEY,
MetricValue::Gauge { value },
timestamp,
));
metrics.push(Metric::from_metric_kv(
&CARDINALITY_COUNTER_KEY,
MetricValue::Counter { value },
timestamp,
));
metrics
}
}
/// Feeds a counter from an absolute, monotonically growing value.
///
/// Each expansion site keeps its own record of the largest value seen so far (starting at 0).
/// When `$value` exceeds that record, the record is advanced and the difference is added to
/// the counter named `$label`; values that are not larger are ignored. A `counter!` macro
/// must be in scope where this macro is invoked.
#[macro_export]
macro_rules! update_counter {
    ($label:literal, $value:expr) => {{
        use ::std::sync::atomic::{AtomicU64, Ordering};
        static PREVIOUS_VALUE: AtomicU64 = AtomicU64::new(0);
        let new_value = $value;
        let mut observed = PREVIOUS_VALUE.load(Ordering::Relaxed);
        // Keep trying only while this value would still advance the stored maximum; a
        // concurrent writer with a greater-or-equal value, or an out-of-order input, ends
        // the loop without emitting anything.
        while new_value > observed {
            match PREVIOUS_VALUE.compare_exchange_weak(
                observed,
                new_value,
                Ordering::SeqCst,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    // This thread advanced the maximum; emit the delta to the value it replaced.
                    counter!($label).increment(new_value - observed);
                    break;
                }
                // Lost the race (or spurious failure): retry against the current value.
                Err(current) => observed = current,
            }
        }
    }};
}
// Unit tests for the controller: cardinality reporting and metric expiration.
// Every expected length below includes the two cardinality metrics that
// `Controller::capture_metrics` always appends.
#[cfg(test)]
mod tests {
use super::*;
use crate::{
config::metrics_expiration::{
MetricLabelMatcher, MetricLabelMatcherConfig, MetricNameMatcherConfig,
},
event::MetricKind,
};
// Base expiration timeout, in seconds, used by the expiry tests.
const IDLE_TIMEOUT: f64 = 0.5;
// Initializes the test recorder (idempotent) and returns the global controller.
fn init_metrics() -> &'static Controller {
init_test();
Controller::get().expect("Could not get global metrics controller")
}
// The cardinality gauge and counter must both report the total number of captured metrics.
#[test]
fn cardinality_matches() {
for cardinality in [0, 1, 10, 100, 1000, 10000] {
init_test();
let controller = Controller::get().unwrap();
// Start each round from an empty registry.
controller.reset();
// Distinct `idx` label values create `cardinality` separate counters.
for idx in 0..cardinality {
metrics::counter!("test", "idx" => idx.to_string()).increment(1);
}
let metrics = controller.capture_metrics();
assert_eq!(metrics.len(), cardinality + 2);
#[allow(clippy::cast_precision_loss)]
let value = metrics.len() as f64;
for metric in metrics {
match metric.name() {
CARDINALITY_KEY_NAME => {
assert_eq!(metric.value(), &MetricValue::Gauge { value });
assert_eq!(metric.kind(), MetricKind::Absolute);
}
CARDINALITY_COUNTER_KEY_NAME => {
assert_eq!(metric.value(), &MetricValue::Counter { value });
assert_eq!(metric.kind(), MetricKind::Absolute);
}
_ => {}
}
}
}
}
// A metric is expected to show up as soon as its handle is created, before any update.
#[test]
fn handles_registered_metrics() {
let controller = init_metrics();
let counter = metrics::counter!("test7");
assert_eq!(controller.capture_metrics().len(), 3);
counter.increment(1);
assert_eq!(controller.capture_metrics().len(), 3);
let gauge = metrics::gauge!("test8");
assert_eq!(controller.capture_metrics().len(), 4);
gauge.set(1.0);
assert_eq!(controller.capture_metrics().len(), 4);
}
// With a global timeout, a metric that is not updated within the timeout is dropped.
#[test]
fn expires_metrics() {
let controller = init_metrics();
controller
.set_expiry(Some(IDLE_TIMEOUT), Vec::new())
.unwrap();
metrics::counter!("test2").increment(1);
metrics::counter!("test3").increment(2);
assert_eq!(controller.capture_metrics().len(), 4);
// Sleep past the timeout, then refresh only `test2`; `test3` is expected to be gone.
std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
metrics::counter!("test2").increment(3);
assert_eq!(controller.capture_metrics().len(), 3);
}
// Expiration is expected to apply per label set, not per metric name.
#[test]
fn expires_metrics_tags() {
let controller = init_metrics();
controller
.set_expiry(Some(IDLE_TIMEOUT), Vec::new())
.unwrap();
metrics::counter!("test4", "tag" => "value1").increment(1);
metrics::counter!("test4", "tag" => "value2").increment(2);
assert_eq!(controller.capture_metrics().len(), 4);
// Only the `value1` series is refreshed after the timeout has elapsed.
std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
metrics::counter!("test4", "tag" => "value1").increment(3);
assert_eq!(controller.capture_metrics().len(), 3);
}
// A counter whose handle is kept alive across the timeout must keep its accumulated value.
#[test]
fn skips_expiring_registered() {
let controller = init_metrics();
controller
.set_expiry(Some(IDLE_TIMEOUT), Vec::new())
.unwrap();
// `a` is a retained handle; `test6` is only touched through a temporary handle.
let a = metrics::counter!("test5");
metrics::counter!("test6").increment(5);
assert_eq!(controller.capture_metrics().len(), 4);
a.increment(1);
assert_eq!(controller.capture_metrics().len(), 4);
// After the timeout only one of the two test metrics is expected to remain.
std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
assert_eq!(controller.capture_metrics().len(), 3);
a.increment(1);
let metrics = controller.capture_metrics();
assert_eq!(metrics.len(), 3);
let metric = metrics
.into_iter()
.find(|metric| metric.name() == "test5")
.expect("Test metric is not present");
// Both increments (before and after the sleep) must be reflected in the value.
match metric.value() {
MetricValue::Counter { value } => assert_eq!(*value, 2.0),
value => panic!("Invalid metric value {value:?}"),
}
}
// A per-set expiration without a global timeout: only the matching metric (`test3`) expires.
#[test]
fn expires_metrics_per_set() {
let controller = init_metrics();
controller
.set_expiry(
None,
vec![PerMetricSetExpiration {
name: Some(MetricNameMatcherConfig::Exact {
value: "test3".to_string(),
}),
labels: None,
expire_secs: IDLE_TIMEOUT,
}],
)
.unwrap();
metrics::counter!("test2").increment(1);
metrics::counter!("test3").increment(2);
assert_eq!(controller.capture_metrics().len(), 4);
std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
metrics::counter!("test2").increment(3);
assert_eq!(controller.capture_metrics().len(), 3);
}
// Combines a global timeout (3x) with a name-based set (1x) and a label-based set (2x);
// metrics are expected to disappear one at a time as each timeout elapses.
#[test]
fn expires_metrics_multiple_different_sets() {
let controller = init_metrics();
controller
.set_expiry(
Some(IDLE_TIMEOUT * 3.0),
vec![
PerMetricSetExpiration {
name: Some(MetricNameMatcherConfig::Exact {
value: "test3".to_string(),
}),
labels: None,
expire_secs: IDLE_TIMEOUT,
},
PerMetricSetExpiration {
name: None,
labels: Some(MetricLabelMatcherConfig::All {
matchers: vec![MetricLabelMatcher::Exact {
key: "tag".to_string(),
value: "value1".to_string(),
}],
}),
expire_secs: IDLE_TIMEOUT * 2.0,
},
],
)
.unwrap();
metrics::counter!("test1").increment(1);
metrics::counter!("test2").increment(1);
metrics::counter!("test3").increment(2);
metrics::counter!("test4", "tag" => "value1").increment(3);
assert_eq!(controller.capture_metrics().len(), 6);
// ~1.5x elapsed: past the 1x name-based timeout, so `test3` is expected to be gone.
std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 1.5));
metrics::counter!("test2").increment(3);
assert_eq!(controller.capture_metrics().len(), 5);
// ~2.5x elapsed: past the 2x label-based timeout, so `test4` is expected to be gone.
std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT));
metrics::counter!("test2").increment(3);
assert_eq!(controller.capture_metrics().len(), 4);
// ~3.5x elapsed: past the 3x global timeout, so the never-refreshed `test1` is expected
// to be gone; `test2` survives because it was refreshed after every sleep.
std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT));
metrics::counter!("test2").increment(3);
assert_eq!(controller.capture_metrics().len(), 3);
}
}