1mod ddsketch;
2mod label_filter;
3mod metric_matcher;
4mod recency;
5mod recorder;
6mod storage;
7
8use std::{sync::OnceLock, time::Duration};
9
10use chrono::Utc;
11use metric_matcher::MetricKeyMatcher;
12use metrics::Key;
13use metrics_tracing_context::TracingContextLayer;
14use metrics_util::layers::Layer;
15use snafu::Snafu;
16
17pub use self::ddsketch::{AgentDDSketch, BinMap, Config};
18use self::{
19 label_filter::VectorLabelFilter,
20 recorder::{Registry, VectorRecorder},
21};
22use crate::{
23 config::metrics_expiration::PerMetricSetExpiration,
24 event::{Metric, MetricValue},
25};
26
/// Result alias used throughout this module, with [`Error`] as the error type.
type Result<T> = std::result::Result<T, Error>;

/// Errors reported by the internal metrics subsystem.
#[derive(Clone, Debug, PartialEq, Snafu)]
pub enum Error {
    /// A global recorder or the global controller had already been set.
    #[snafu(display("Recorder already initialized."))]
    AlreadyInitialized,
    /// The global controller was requested before it had been set.
    #[snafu(display("Metrics system was not initialized."))]
    NotInitialized,
    /// The supplied global expiration timeout was rejected because it is not a positive number.
    #[snafu(display("Timeout value of {} must be positive.", timeout))]
    TimeoutMustBePositive { timeout: f64 },
    /// A regex pattern could not be used.
    // NOTE(review): not constructed in this file; presumably raised while converting
    // per-metric-set expiration matchers -- confirm against `metric_matcher`.
    #[snafu(display("Invalid regex pattern: {}.", pattern))]
    InvalidRegexPattern { pattern: String },
}
40
// Process-wide controller instance. It is written once by `init` and read through
// `Controller::get`.
static CONTROLLER: OnceLock<Controller> = OnceLock::new();
42
// Name and key of the gauge that `Controller::capture_metrics` appends to report how many
// metrics were captured (the reported count includes the two cardinality metrics themselves).
const CARDINALITY_KEY_NAME: &str = "internal_metrics_cardinality";
static CARDINALITY_KEY: Key = Key::from_static_name(CARDINALITY_KEY_NAME);

// Name and key of the counter that `Controller::capture_metrics` appends with the same value
// as the cardinality gauge.
const CARDINALITY_COUNTER_KEY_NAME: &str = "internal_metrics_cardinality_total";
static CARDINALITY_COUNTER_KEY: Key = Key::from_static_name(CARDINALITY_COUNTER_KEY_NAME);
52
/// Handle to the metrics subsystem.
///
/// Wraps the recorder whose registry is reset, configured for expiration, and captured through
/// the methods on this type.
pub struct Controller {
    // Recorder whose registry backs every operation on the controller.
    recorder: VectorRecorder,
}
57
/// Reports whether the internal metrics core should be active.
///
/// The core is considered disabled only when the `DISABLE_INTERNAL_METRICS_CORE` environment
/// variable is set to exactly `"true"`. An unset, unreadable, or differently-valued variable
/// leaves it enabled.
fn metrics_enabled() -> bool {
    let disable_requested =
        std::env::var("DISABLE_INTERNAL_METRICS_CORE").is_ok_and(|flag| flag == "true");
    !disable_requested
}
61
/// Reports whether the tracing-context layer should wrap the recorder.
///
/// The layer is skipped only when the `DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION`
/// environment variable is set to exactly `"true"`. An unset, unreadable, or differently-valued
/// variable keeps it enabled.
fn tracing_context_layer_enabled() -> bool {
    let disable_requested = std::env::var("DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION")
        .is_ok_and(|flag| flag == "true");
    !disable_requested
}
65
66fn init(recorder: VectorRecorder) -> Result<()> {
67 if !metrics_enabled() {
70 metrics::set_global_recorder(metrics::NoopRecorder)
71 .map_err(|_| Error::AlreadyInitialized)?;
72 info!(message = "Internal metrics core is disabled.");
73 return Ok(());
74 }
75
76 if tracing_context_layer_enabled() {
84 metrics::set_global_recorder(
86 TracingContextLayer::new(VectorLabelFilter).layer(recorder.clone()),
87 )
88 .map_err(|_| Error::AlreadyInitialized)?;
89 } else {
90 metrics::set_global_recorder(recorder.clone()).map_err(|_| Error::AlreadyInitialized)?;
91 }
92
93 let controller = Controller { recorder };
103 CONTROLLER
104 .set(controller)
105 .map_err(|_| Error::AlreadyInitialized)?;
106
107 Ok(())
108}
109
110pub fn init_global() -> Result<()> {
116 init(VectorRecorder::new_global())
117}
118
119pub fn init_test() {
122 if init(VectorRecorder::new_test()).is_err() {
123 while CONTROLLER.get().is_none() {}
130 }
131}
132
133impl Controller {
134 pub fn reset(&self) {
136 self.recorder.with_registry(Registry::clear);
137 }
138
139 pub fn get() -> Result<&'static Self> {
146 CONTROLLER.get().ok_or(Error::NotInitialized)
147 }
148
149 pub fn set_expiry(
156 &self,
157 global_timeout: Option<f64>,
158 expire_metrics_per_metric_set: Vec<PerMetricSetExpiration>,
159 ) -> Result<()> {
160 if let Some(timeout) = global_timeout
161 && timeout <= 0.0
162 {
163 return Err(Error::TimeoutMustBePositive { timeout });
164 }
165 let per_metric_expiration = expire_metrics_per_metric_set
166 .into_iter()
167 .map(TryInto::try_into)
168 .collect::<Result<Vec<(MetricKeyMatcher, Duration)>>>()?;
169
170 self.recorder.with_registry(|registry| {
171 registry.set_expiry(
172 global_timeout.map(Duration::from_secs_f64),
173 per_metric_expiration,
174 );
175 });
176 Ok(())
177 }
178
179 pub fn capture_metrics(&self) -> Vec<Metric> {
182 let timestamp = Utc::now();
183
184 let mut metrics = self.recorder.with_registry(Registry::visit_metrics);
185
186 #[allow(clippy::cast_precision_loss)]
187 let value = (metrics.len() + 2) as f64;
188 metrics.push(Metric::from_metric_kv(
189 &CARDINALITY_KEY,
190 MetricValue::Gauge { value },
191 timestamp,
192 ));
193 metrics.push(Metric::from_metric_kv(
194 &CARDINALITY_COUNTER_KEY,
195 MetricValue::Counter { value },
196 timestamp,
197 ));
198
199 metrics
200 }
201}
202
/// Feeds a monotonically increasing `u64` reading into the counter named `$label`.
///
/// Each call site remembers the largest reading it has published so far. When `$value` exceeds
/// that, the difference is added to the counter through `counter!($label)` (resolved at the
/// call site). Readings that are not larger than the remembered one are ignored.
#[macro_export]
macro_rules! update_counter {
    ($label:literal, $value:expr) => {{
        use ::std::sync::atomic::{AtomicU64, Ordering};

        // One high-water mark per macro call site.
        static PREVIOUS_VALUE: AtomicU64 = AtomicU64::new(0);

        let new_value = $value;
        let mut previous_value = PREVIOUS_VALUE.load(Ordering::Relaxed);

        // Stop as soon as the stored value is at least as large as ours: either another caller
        // already published a newer reading, or this reading arrived out of order.
        while new_value > previous_value {
            match PREVIOUS_VALUE.compare_exchange_weak(
                previous_value,
                new_value,
                Ordering::SeqCst,
                Ordering::Relaxed,
            ) {
                // We advanced the high-water mark, so publish the distance we moved it.
                Ok(_) => {
                    counter!($label).increment(new_value - previous_value);
                    break;
                }
                // The exchange did not go through; retry against the value actually stored.
                Err(current) => previous_value = current,
            }
        }
    }};
}
246
// Tests for the controller. Every `capture_metrics().len()` expectation below includes the two
// cardinality metrics that `capture_metrics` always appends.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        config::metrics_expiration::{
            MetricLabelMatcher, MetricLabelMatcherConfig, MetricNameMatcherConfig,
        },
        event::MetricKind,
    };

    // Base expiration timeout, in seconds, used by the expiry tests.
    const IDLE_TIMEOUT: f64 = 0.5;

    // Initializes the test metrics subsystem and returns the global controller.
    fn init_metrics() -> &'static Controller {
        init_test();
        Controller::get().expect("Could not get global metrics controller")
    }

    // The cardinality gauge and counter must both report the total number of captured metrics.
    #[test]
    fn cardinality_matches() {
        for cardinality in [0, 1, 10, 100, 1000, 10000] {
            init_test();
            let controller = Controller::get().unwrap();
            // Start each round from an empty registry.
            controller.reset();

            // Distinct `idx` label values give `cardinality` distinct counters.
            for idx in 0..cardinality {
                metrics::counter!("test", "idx" => idx.to_string()).increment(1);
            }

            let metrics = controller.capture_metrics();
            assert_eq!(metrics.len(), cardinality + 2);

            #[allow(clippy::cast_precision_loss)]
            let value = metrics.len() as f64;
            for metric in metrics {
                match metric.name() {
                    CARDINALITY_KEY_NAME => {
                        assert_eq!(metric.value(), &MetricValue::Gauge { value });
                        assert_eq!(metric.kind(), MetricKind::Absolute);
                    }
                    CARDINALITY_COUNTER_KEY_NAME => {
                        assert_eq!(metric.value(), &MetricValue::Counter { value });
                        assert_eq!(metric.kind(), MetricKind::Absolute);
                    }
                    _ => {}
                }
            }
        }
    }

    // A metric is expected to be captured as soon as its handle is created, and updating it
    // afterwards must not change the number of captured metrics.
    #[test]
    fn handles_registered_metrics() {
        let controller = init_metrics();

        let counter = metrics::counter!("test7");
        assert_eq!(controller.capture_metrics().len(), 3);
        counter.increment(1);
        assert_eq!(controller.capture_metrics().len(), 3);
        let gauge = metrics::gauge!("test8");
        assert_eq!(controller.capture_metrics().len(), 4);
        gauge.set(1.0);
        assert_eq!(controller.capture_metrics().len(), 4);
    }

    // With a global timeout, a metric left idle past the timeout is expected to disappear while
    // one that was just updated remains.
    #[test]
    fn expires_metrics() {
        let controller = init_metrics();
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();

        metrics::counter!("test2").increment(1);
        metrics::counter!("test3").increment(2);
        assert_eq!(controller.capture_metrics().len(), 4);

        // Sleep past the timeout, then touch only `test2`; `test3` is expected to be gone.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        metrics::counter!("test2").increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }

    // Same as `expires_metrics`, but the two series share a name and differ only by tag value.
    #[test]
    fn expires_metrics_tags() {
        let controller = init_metrics();
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();

        metrics::counter!("test4", "tag" => "value1").increment(1);
        metrics::counter!("test4", "tag" => "value2").increment(2);
        assert_eq!(controller.capture_metrics().len(), 4);

        // Only the `value1` series is touched after the sleep.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        metrics::counter!("test4", "tag" => "value1").increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }

    // A counter whose handle (`a`) is kept alive is expected to still be captured after the
    // idle period, and to keep its accumulated value.
    #[test]
    fn skips_expiring_registered() {
        let controller = init_metrics();
        controller
            .set_expiry(Some(IDLE_TIMEOUT), Vec::new())
            .unwrap();

        let a = metrics::counter!("test5");
        metrics::counter!("test6").increment(5);
        assert_eq!(controller.capture_metrics().len(), 4);
        a.increment(1);
        assert_eq!(controller.capture_metrics().len(), 4);

        // After the idle period one of the two metrics is expected to have been dropped.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        assert_eq!(controller.capture_metrics().len(), 3);

        // `test5` must still be present and reflect both increments made through `a`.
        a.increment(1);
        let metrics = controller.capture_metrics();
        assert_eq!(metrics.len(), 3);
        let metric = metrics
            .into_iter()
            .find(|metric| metric.name() == "test5")
            .expect("Test metric is not present");
        match metric.value() {
            MetricValue::Counter { value } => assert_eq!(*value, 2.0),
            value => panic!("Invalid metric value {value:?}"),
        }
    }

    // With no global timeout and a per-set timeout matching only the name `test3`, only that
    // metric is expected to expire.
    #[test]
    fn expires_metrics_per_set() {
        let controller = init_metrics();
        controller
            .set_expiry(
                None,
                vec![PerMetricSetExpiration {
                    name: Some(MetricNameMatcherConfig::Exact {
                        value: "test3".to_string(),
                    }),
                    labels: None,
                    expire_secs: IDLE_TIMEOUT,
                }],
            )
            .unwrap();

        metrics::counter!("test2").increment(1);
        metrics::counter!("test3").increment(2);
        assert_eq!(controller.capture_metrics().len(), 4);

        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 2.0));
        metrics::counter!("test2").increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }

    // Combines a global timeout (3x IDLE_TIMEOUT) with two per-set timeouts: name `test3`
    // (1x IDLE_TIMEOUT) and label `tag=value1` (2x IDLE_TIMEOUT). `test2` is refreshed before
    // every check, so the other metrics are expected to drop out one at a time.
    #[test]
    fn expires_metrics_multiple_different_sets() {
        let controller = init_metrics();
        controller
            .set_expiry(
                Some(IDLE_TIMEOUT * 3.0),
                vec![
                    PerMetricSetExpiration {
                        name: Some(MetricNameMatcherConfig::Exact {
                            value: "test3".to_string(),
                        }),
                        labels: None,
                        expire_secs: IDLE_TIMEOUT,
                    },
                    PerMetricSetExpiration {
                        name: None,
                        labels: Some(MetricLabelMatcherConfig::All {
                            matchers: vec![MetricLabelMatcher::Exact {
                                key: "tag".to_string(),
                                value: "value1".to_string(),
                            }],
                        }),
                        expire_secs: IDLE_TIMEOUT * 2.0,
                    },
                ],
            )
            .unwrap();

        metrics::counter!("test1").increment(1);
        metrics::counter!("test2").increment(1);
        metrics::counter!("test3").increment(2);
        metrics::counter!("test4", "tag" => "value1").increment(3);
        assert_eq!(controller.capture_metrics().len(), 6);

        // Elapsed: 1.5x IDLE_TIMEOUT. Past the `test3` timeout only.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT * 1.5));
        metrics::counter!("test2").increment(3);
        assert_eq!(controller.capture_metrics().len(), 5);

        // Elapsed: 2.5x IDLE_TIMEOUT. Now also past the `tag=value1` timeout.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT));
        metrics::counter!("test2").increment(3);
        assert_eq!(controller.capture_metrics().len(), 4);

        // Elapsed: 3.5x IDLE_TIMEOUT. Now also past the global timeout for the idle `test1`.
        std::thread::sleep(Duration::from_secs_f64(IDLE_TIMEOUT));
        metrics::counter!("test2").increment(3);
        assert_eq!(controller.capture_metrics().len(), 3);
    }
}