1mod ddsketch;
2mod label_filter;
3mod metric_matcher;
4mod recency;
5mod recorder;
6mod storage;
7
8use std::{sync::OnceLock, time::Duration};
9
10use chrono::Utc;
11use metric_matcher::MetricKeyMatcher;
12use metrics::Key;
13use metrics_tracing_context::TracingContextLayer;
14use metrics_util::layers::Layer;
15use snafu::Snafu;
16
17pub use self::ddsketch::{AgentDDSketch, BinMap, Config};
18use self::{label_filter::VectorLabelFilter, recorder::Registry, recorder::VectorRecorder};
19use crate::{
20 config::metrics_expiration::PerMetricSetExpiration,
21 event::{Metric, MetricValue},
22};
23
/// Module-local shorthand for results whose error type is this module's [`Error`].
type Result<T> = std::result::Result<T, Error>;
25
/// Errors reported by the metrics sub-system defined in this module.
#[derive(Clone, Debug, PartialEq, Snafu)]
pub enum Error {
    /// Installing the global recorder or storing the global controller failed
    /// because one was already in place.
    #[snafu(display("Recorder already initialized."))]
    AlreadyInitialized,
    /// The global controller was requested before it had been stored.
    #[snafu(display("Metrics system was not initialized."))]
    NotInitialized,
    /// A global expiry timeout was supplied that is not strictly positive.
    #[snafu(display("Timeout value of {} must be positive.", timeout))]
    TimeoutMustBePositive { timeout: f64 },
    /// A regex pattern could not be used.
    // NOTE(review): not constructed in this file; presumably produced while
    // converting per-metric-set expiration matchers — confirm against that code.
    #[snafu(display("Invalid regex pattern: {}.", pattern))]
    InvalidRegexPattern { pattern: String },
}
37
// Process-wide slot for the controller: written once by `init` and read by
// `Controller::get`.
static CONTROLLER: OnceLock<Controller> = OnceLock::new();

// Name and key of the gauge that `Controller::capture_metrics` appends to
// every snapshot to report how many metrics the snapshot contains.
const CARDINALITY_KEY_NAME: &str = "internal_metrics_cardinality";
static CARDINALITY_KEY: Key = Key::from_static_name(CARDINALITY_KEY_NAME);

// Name and key of the counter that `Controller::capture_metrics` appends with
// the same value as the cardinality gauge.
const CARDINALITY_COUNTER_KEY_NAME: &str = "internal_metrics_cardinality_total";
static CARDINALITY_COUNTER_KEY: Key = Key::from_static_name(CARDINALITY_COUNTER_KEY_NAME);
49
/// Handle used to manage and snapshot the metrics held by a [`VectorRecorder`].
///
/// A single instance is stored in the global `CONTROLLER` slot by `init` and
/// retrieved through [`Controller::get`].
pub struct Controller {
    // Recorder whose registry backs every operation on this controller.
    recorder: VectorRecorder,
}
54
/// Reports whether the internal metrics core should be set up.
///
/// The core is considered disabled only when the `DISABLE_INTERNAL_METRICS_CORE`
/// environment variable is readable and equals exactly `"true"`; any other value,
/// or an unset/unreadable variable, leaves it enabled.
fn metrics_enabled() -> bool {
    match std::env::var("DISABLE_INTERNAL_METRICS_CORE") {
        Ok(flag) => flag != "true",
        Err(_) => true,
    }
}
58
/// Reports whether the tracing-context layer should wrap the recorder.
///
/// The layer is skipped only when the
/// `DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION` environment variable is
/// readable and equals exactly `"true"`.
fn tracing_context_layer_enabled() -> bool {
    let flag = std::env::var("DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION");
    flag.as_deref() != Ok("true")
}
62
63fn init(recorder: VectorRecorder) -> Result<()> {
64 if !metrics_enabled() {
67 metrics::set_global_recorder(metrics::NoopRecorder)
68 .map_err(|_| Error::AlreadyInitialized)?;
69 info!(message = "Internal metrics core is disabled.");
70 return Ok(());
71 }
72
73 if tracing_context_layer_enabled() {
81 metrics::set_global_recorder(
83 TracingContextLayer::new(VectorLabelFilter).layer(recorder.clone()),
84 )
85 .map_err(|_| Error::AlreadyInitialized)?;
86 } else {
87 metrics::set_global_recorder(recorder.clone()).map_err(|_| Error::AlreadyInitialized)?;
88 }
89
90 let controller = Controller { recorder };
100 CONTROLLER
101 .set(controller)
102 .map_err(|_| Error::AlreadyInitialized)?;
103
104 Ok(())
105}
106
107pub fn init_global() -> Result<()> {
113 init(VectorRecorder::new_global())
114}
115
116pub fn init_test() {
119 if init(VectorRecorder::new_test()).is_err() {
120 while CONTROLLER.get().is_none() {}
127 }
128}
129
130impl Controller {
131 pub fn reset(&self) {
133 self.recorder.with_registry(Registry::clear);
134 }
135
136 pub fn get() -> Result<&'static Self> {
143 CONTROLLER.get().ok_or(Error::NotInitialized)
144 }
145
146 pub fn set_expiry(
153 &self,
154 global_timeout: Option<f64>,
155 expire_metrics_per_metric_set: Vec<PerMetricSetExpiration>,
156 ) -> Result<()> {
157 if let Some(timeout) = global_timeout {
158 if timeout <= 0.0 {
159 return Err(Error::TimeoutMustBePositive { timeout });
160 }
161 }
162 let per_metric_expiration = expire_metrics_per_metric_set
163 .into_iter()
164 .map(TryInto::try_into)
165 .collect::<Result<Vec<(MetricKeyMatcher, Duration)>>>()?;
166
167 self.recorder.with_registry(|registry| {
168 registry.set_expiry(
169 global_timeout.map(Duration::from_secs_f64),
170 per_metric_expiration,
171 );
172 });
173 Ok(())
174 }
175
176 pub fn capture_metrics(&self) -> Vec<Metric> {
179 let timestamp = Utc::now();
180
181 let mut metrics = self.recorder.with_registry(Registry::visit_metrics);
182
183 #[allow(clippy::cast_precision_loss)]
184 let value = (metrics.len() + 2) as f64;
185 metrics.push(Metric::from_metric_kv(
186 &CARDINALITY_KEY,
187 MetricValue::Gauge { value },
188 timestamp,
189 ));
190 metrics.push(Metric::from_metric_kv(
191 &CARDINALITY_COUNTER_KEY,
192 MetricValue::Counter { value },
193 timestamp,
194 ));
195
196 metrics
197 }
198}
199
/// Emits a `counter!` increment for the difference between `$value` and the
/// largest value previously passed at this call site.
///
/// Each expansion owns its own atomic high-water mark, starting at zero. A
/// value that is not strictly greater than the current mark is ignored;
/// otherwise the mark is raised to the value and the difference is added to
/// the counter named `$label`. `$value` is evaluated exactly once.
///
/// The expansion invokes `counter!`, so that macro must be in scope wherever
/// this one is used.
#[macro_export]
macro_rules! update_counter {
    ($label:literal, $value:expr) => {{
        use ::std::sync::atomic::{AtomicU64, Ordering};

        static PREVIOUS_VALUE: AtomicU64 = AtomicU64::new(0);

        let observed = $value;

        // Atomically raise the stored maximum and learn what it was before;
        // only the caller that actually raised it emits a delta.
        let high_water_mark = PREVIOUS_VALUE.fetch_max(observed, Ordering::SeqCst);
        if observed > high_water_mark {
            // Deltas computed by different threads may be emitted in a
            // different order than they were calculated.
            counter!($label).increment(observed - high_water_mark);
        }
    }};
}
243
// Tests for the controller's snapshotting and expiration behavior.
//
// Every expected length below includes the two cardinality metrics that
// `Controller::capture_metrics` appends to each snapshot.
#[cfg(test)]
mod tests {
    use super::*;

    use crate::{
        config::metrics_expiration::{
            MetricLabelMatcher, MetricLabelMatcherConfig, MetricNameMatcherConfig,
        },
        event::MetricKind,
    };

    // Base expiry timeout, in seconds, that the expiration tests scale from.
    const IDLE_TIMEOUT: f64 = 0.5;

    // Initializes the test metrics sub-system and returns the global controller.
    fn init_metrics() -> &'static Controller {
        init_test();
        Controller::get().expect("Could not get global metrics controller")
    }

    // The snapshot length and both cardinality metrics must track the number
    // of distinct counters registered after a reset.
    #[test]
    fn cardinality_matches() {
        for cardinality in [0, 1, 10, 100, 1000, 10000] {
            init_test();
            let controller = Controller::get().unwrap();
            controller.reset();

            // One counter per distinct `idx` label value.
            for idx in 0..cardinality {
                metrics::counter!("test", "idx" => idx.to_string()).increment(1);
            }

            let metrics = controller.capture_metrics();
            assert_eq!(metrics.len(), cardinality + 2);

            // Both cardinality metrics report the full snapshot length.
            #[allow(clippy::cast_precision_loss)]
            let value = metrics.len() as f64;
            for metric in metrics {
                match metric.name() {
                    CARDINALITY_KEY_NAME => {
                        assert_eq!(metric.value(), &MetricValue::Gauge { value });
                        assert_eq!(metric.kind(), MetricKind::Absolute);
                    }
                    CARDINALITY_COUNTER_KEY_NAME => {
                        assert_eq!(metric.value(), &MetricValue::Counter { value });
                        assert_eq!(metric.kind(), MetricKind::Absolute);
                    }
                    _ => {}
                }
            }
        }
    }

    // Creating a handle is expected to make the metric show up in snapshots
    // even before it is updated; updating it must not add another entry.
    #[test]
    fn handles_registered_metrics() {
        let controller = init_metrics();

        let counter = metrics::counter!("test7");
        assert_eq!(controller.capture_metrics().len(), 3);
        counter.increment(1);
        assert_eq!(controller.capture_metrics().len(), 3);
        let gauge = metrics::gauge!("test8");
        assert_eq!(controller.capture_metrics().len(), 4);
        gauge.set(1.0);
        assert_eq!(controller.capture_metrics().len(), 4);
    }

    // With a global timeout, only one of the two counters is expected to
    // remain after sleeping for twice the timeout and updating `test2`.
    #[test]
    fn expires_metrics() {
        let controller = init_metrics();
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();

        metrics::counter!("test2").increment(1);
        metrics::counter!("test3").increment(2);
        assert_eq!(controller.capture_metrics().len(), 4);

        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        metrics::counter!("test2").increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }

    // Same as `expires_metrics`, but the two counters share a name and differ
    // only in the value of their `tag` label.
    #[test]
    fn expires_metrics_tags() {
        let controller = init_metrics();
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();

        metrics::counter!("test4", "tag" => "value1").increment(1);
        metrics::counter!("test4", "tag" => "value2").increment(2);
        assert_eq!(controller.capture_metrics().len(), 4);

        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        metrics::counter!("test4", "tag" => "value1").increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }

    // A counter whose handle is kept alive (`test5`) must still be present,
    // with its accumulated value, after the idle period.
    #[test]
    fn skips_expiring_registered() {
        let controller = init_metrics();
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();

        let a = metrics::counter!("test5");
        metrics::counter!("test6").increment(5);
        assert_eq!(controller.capture_metrics().len(), 4);
        a.increment(1);
        assert_eq!(controller.capture_metrics().len(), 4);

        // Only one of the two counters is expected to survive the idle period.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        assert_eq!(controller.capture_metrics().len(), 3);

        // The held counter keeps accumulating: 1 before the sleep plus 1 after.
        a.increment(1);
        let metrics = controller.capture_metrics();
        assert_eq!(metrics.len(), 3);
        let metric = metrics
            .into_iter()
            .find(|metric| metric.name() == "test5")
            .expect("Test metric is not present");
        match metric.value() {
            MetricValue::Counter { value } => assert_eq!(*value, 2.0),
            value => panic!("Invalid metric value {value:?}"),
        }
    }

    // No global timeout; only metrics named exactly `test3` get an expiry.
    #[test]
    fn expires_metrics_per_set() {
        let controller = init_metrics();
        controller
            .set_expiry(
                None,
                vec![PerMetricSetExpiration {
                    name: Some(MetricNameMatcherConfig::Exact {
                        value: "test3".to_string(),
                    }),
                    labels: None,
                    expire_secs: IDLE_TIMEOUT,
                }],
            )
            .unwrap();

        metrics::counter!("test2").increment(1);
        metrics::counter!("test3").increment(2);
        assert_eq!(controller.capture_metrics().len(), 4);

        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        metrics::counter!("test2").increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }

    // Three different timeouts are configured: 1x for metrics named `test3`,
    // 2x for metrics labelled `tag=value1`, and 3x globally. The snapshot is
    // expected to shrink by one metric after each successive sleep while
    // `test2` keeps being updated.
    #[test]
    fn expires_metrics_multiple_different_sets() {
        let controller = init_metrics();
        controller
            .set_expiry(
                Some(IDLE_TIMEOUT * 3.0),
                vec![
                    PerMetricSetExpiration {
                        name: Some(MetricNameMatcherConfig::Exact {
                            value: "test3".to_string(),
                        }),
                        labels: None,
                        expire_secs: IDLE_TIMEOUT,
                    },
                    PerMetricSetExpiration {
                        name: None,
                        labels: Some(MetricLabelMatcherConfig::All {
                            matchers: vec![MetricLabelMatcher::Exact {
                                key: "tag".to_string(),
                                value: "value1".to_string(),
                            }],
                        }),
                        expire_secs: IDLE_TIMEOUT * 2.0,
                    },
                ],
            )
            .unwrap();

        metrics::counter!("test1").increment(1);
        metrics::counter!("test2").increment(1);
        metrics::counter!("test3").increment(2);
        metrics::counter!("test4", "tag" => "value1").increment(3);
        assert_eq!(controller.capture_metrics().len(), 6);

        // Elapsed: 1.5x the base timeout.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 1.5));
        metrics::counter!("test2").increment(3);
        assert_eq!(controller.capture_metrics().len(), 5);

        // Elapsed: 2.5x the base timeout.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT));
        metrics::counter!("test2").increment(3);
        assert_eq!(controller.capture_metrics().len(), 4);

        // Elapsed: 3.5x the base timeout.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT));
        metrics::counter!("test2").increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }
}