1use std::{collections::BTreeMap, sync::Arc, time::Duration};
2use tokio::time::interval;
3use tokio_stream::{wrappers::IntervalStream, StreamExt};
4use vector_common::shutdown::ShutdownSignal;
5use vrl::{
6 diagnostic::Label,
7 prelude::{expression::Expr, *},
8 value,
9};
10
11use arc_swap::ArcSwap;
12use vector_core::{event::Metric, metrics::Controller};
13
14#[derive(Debug)]
15pub(crate) enum Error {
16 MetricsStorageNotLoaded,
17}
18
19impl fmt::Display for Error {
20 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21 match self {
22 Error::MetricsStorageNotLoaded => write!(f, "metrics storage not loaded"),
23 }
24 }
25}
26
27impl std::error::Error for Error {}
28
29impl DiagnosticMessage for Error {
30 fn code(&self) -> usize {
31 112
32 }
33
34 fn labels(&self) -> Vec<Label> {
35 match self {
36 Error::MetricsStorageNotLoaded => {
37 vec![Label::primary(
38 "VRL metrics error: metrics storage not loaded".to_string(),
39 Span::default(),
40 )]
41 }
42 }
43 }
44}
45
46#[derive(Debug, Default, Clone)]
47pub struct MetricsStorage {
48 #[doc(hidden)]
50 pub cache: Arc<ArcSwap<Vec<Metric>>>,
51}
52
53impl MetricsStorage {
54 pub(crate) fn get_metric(
55 &self,
56 metric: &str,
57 tags: BTreeMap<String, String>,
58 ) -> Option<Metric> {
59 self.cache
60 .load()
61 .iter()
62 .find(|m| m.name() == metric && tags.iter().all(|tag| tag_matches(m, tag)))
63 .cloned()
64 }
65
66 pub(crate) fn find_metrics(&self, metric: &str, tags: BTreeMap<String, String>) -> Vec<Metric> {
67 self.cache
68 .load()
69 .iter()
70 .filter(|m| m.name() == metric && tags.iter().all(|tag| tag_matches(m, tag)))
71 .cloned()
72 .collect()
73 }
74
75 pub fn refresh_metrics(&self) {
76 let new_metrics = Controller::get()
77 .expect("metrics not initialized")
78 .capture_metrics();
79 self.cache.store(new_metrics.into());
80 }
81
82 pub async fn run_periodic_refresh(
83 &self,
84 refresh_interval: Duration,
85 mut shutdown: ShutdownSignal,
86 ) {
87 let mut intervals = IntervalStream::new(interval(refresh_interval));
88 loop {
89 tokio::select! {
90 Some(_) = intervals.next() => {
91 self.refresh_metrics();
92 }
93 _ = &mut shutdown => {
94 break;
95 }
96 }
97 }
98 }
99}
100
101fn tag_matches(metric: &Metric, (tag_key, tag_value): (&String, &String)) -> bool {
103 if let Some(wildcard_index) = tag_value.find('*') {
104 let Some(metric_tag_value) = metric.tag_value(tag_key) else {
105 return false;
106 };
107
108 metric_tag_value.starts_with(&tag_value[0..wildcard_index])
109 && metric_tag_value.ends_with(&tag_value[(wildcard_index + 1)..])
110 } else {
111 metric.tag_matches(tag_key, tag_value)
112 }
113}
114
115pub(crate) fn metrics_vrl_typedef() -> BTreeMap<Field, Kind> {
116 BTreeMap::from([
117 (Field::from("name"), Kind::bytes()),
118 (Field::from("tags"), Kind::any_object()),
119 (Field::from("type"), Kind::bytes()),
120 (Field::from("kind"), Kind::bytes()),
121 (Field::from("value"), Kind::float() | Kind::null()),
122 ])
123}
124
125pub(crate) fn metric_into_vrl(value: &Metric) -> Value {
126 value!({
127 name: { value.name() },
128 tags: {
129 BTreeMap::from_iter(
130 value
131 .tags()
132 .map(|t| {
133 t.iter_sets()
134 .map(|(k, v)| {
135 (
136 k.into(),
137 Value::Array(
138 v.iter()
139 .filter_map(|v| {
140 v.map(ToString::to_string).map(Into::into).map(Value::Bytes)
141 })
142 .collect(),
143 ),
144 )
145 })
146 .collect::<Vec<_>>()
147 })
148 .unwrap_or_default(),
149 )
150 },
151 "type": { value.value().as_name() },
152 kind: {
153 match value.kind() {
154 vector_core::event::MetricKind::Incremental => "incremental",
155 vector_core::event::MetricKind::Absolute => "absolute",
156 }
157 },
158 value: {
159 match value.value() {
160 vector_core::event::MetricValue::Counter { value }
161 | vector_core::event::MetricValue::Gauge { value } => NotNan::new(*value).ok(),
162 _ => None,
163 }
164 }
165 })
166}
167
168pub(crate) fn validate_tags(
169 state: &TypeState,
170 tags: &BTreeMap<KeyString, Expr>,
171) -> Result<(), Box<dyn DiagnosticMessage>> {
172 for v in tags.values() {
173 if *v.type_def(state).kind() != Kind::bytes() {
174 return Err(Box::new(vrl::compiler::function::Error::InvalidArgument {
175 keyword: "tags.value",
176 value: v.resolve_constant(state).unwrap_or(Value::Null),
177 error: "Tag values must be strings",
178 }));
179 }
180 }
181 Ok(())
182}
183
184pub(crate) fn resolve_tags(
185 ctx: &mut Context,
186 tags: &BTreeMap<KeyString, Expr>,
187) -> Result<BTreeMap<String, String>, ExpressionError> {
188 tags.iter()
189 .map(|(k, v)| {
190 v.resolve(ctx).and_then(|v| {
191 Ok((
192 k.clone().into(),
193 v.as_str().ok_or("Tag must be a string")?.into_owned(),
194 ))
195 })
196 })
197 .collect::<Result<_, _>>()
198}
199
200#[cfg(test)]
202mod tests {
203 use vector_core::{
204 compile_vrl,
205 event::{Event, LogEvent, MetricKind, MetricTags, VrlTarget},
206 };
207 use vrl::{
208 compiler::{
209 runtime::{Runtime, Terminate},
210 CompilationResult, CompileConfig,
211 },
212 diagnostic::DiagnosticList,
213 };
214
215 use super::*;
216
217 fn compile(
218 storage: MetricsStorage,
219 vrl_source: &str,
220 ) -> Result<CompilationResult, DiagnosticList> {
221 #[allow(clippy::disallowed_methods)]
223 let functions = vrl::stdlib::all().into_iter();
224
225 let functions = functions.chain(crate::all()).collect::<Vec<_>>();
226
227 let state = TypeState::default();
228
229 let mut config = CompileConfig::default();
230 config.set_custom(storage.clone());
231 config.set_read_only();
232
233 compile_vrl(vrl_source, &functions, &state, config)
234 }
235
236 fn compile_and_run(storage: MetricsStorage, vrl_source: &str) -> Result<Value, Terminate> {
237 let CompilationResult {
238 program,
239 warnings: _,
240 config: _,
241 } = compile(storage, vrl_source).expect("compilation failed");
242
243 let mut target = VrlTarget::new(Event::Log(LogEvent::default()), program.info(), false);
244 Runtime::default().resolve(&mut target, &program, &TimeZone::default())
245 }
246
247 fn assert_metric_matches(
248 metric: &BTreeMap<KeyString, Value>,
249 name: &str,
250 value: f64,
251 tags: Option<Vec<(&str, &str)>>,
252 ) {
253 assert_eq!(metric.get("name").unwrap().as_str().unwrap(), name);
254 assert_eq!(
255 metric.get("value").unwrap().as_float().unwrap(),
256 NotNan::new(value).unwrap()
257 );
258
259 if let Some(tags) = tags {
260 let metric_tags = metric.get("tags").unwrap().as_object().unwrap();
261 for (key, value) in tags {
262 assert_eq!(
263 metric_tags
264 .get(key)
265 .unwrap()
266 .as_array_unwrap()
267 .first()
268 .unwrap()
269 .as_str()
270 .unwrap(),
271 value
272 );
273 }
274 }
275 }
276
277 #[test]
278 fn test_get_vector_metric() {
279 let storage = MetricsStorage::default();
280 storage.cache.store(
281 vec![Metric::new(
282 "test",
283 MetricKind::Absolute,
284 vector_core::event::MetricValue::Gauge { value: 1.0 },
285 )]
286 .into(),
287 );
288
289 let result = compile_and_run(
290 storage,
291 r#"
292 get_vector_metric("test")
293 "#,
294 )
295 .expect("vrl failed");
296 let result = result.as_object().unwrap();
297
298 assert_metric_matches(result, "test", 1.0, None);
299 }
300
301 #[test]
302 fn test_find_vector_metrics() {
303 let storage = MetricsStorage::default();
304 storage.cache.store(
305 vec![
306 Metric::new(
307 "test",
308 MetricKind::Absolute,
309 vector_core::event::MetricValue::Gauge { value: 1.0 },
310 )
311 .with_tags(Some(MetricTags::from_iter([(
312 "component_id".to_string(),
313 "a".to_string(),
314 )]))),
315 Metric::new(
316 "test",
317 MetricKind::Absolute,
318 vector_core::event::MetricValue::Gauge { value: 1.0 },
319 )
320 .with_tags(Some(MetricTags::from_iter([(
321 "component_id".to_string(),
322 "b".to_string(),
323 )]))),
324 ]
325 .into(),
326 );
327
328 let result = compile_and_run(
329 storage,
330 r#"
331 find_vector_metrics("test")
332 "#,
333 )
334 .expect("vrl failed");
335 let result = result.as_array_unwrap();
336
337 assert_metric_matches(
338 result[0].as_object().unwrap(),
339 "test",
340 1.0,
341 Some(vec![("component_id", "a")]),
342 );
343 assert_metric_matches(
344 result[1].as_object().unwrap(),
345 "test",
346 1.0,
347 Some(vec![("component_id", "b")]),
348 );
349 }
350
351 #[test]
352 fn test_get_vector_metric_by_tag() {
353 let storage = MetricsStorage::default();
354 storage.cache.store(
355 vec![
356 Metric::new(
357 "test",
358 MetricKind::Absolute,
359 vector_core::event::MetricValue::Gauge { value: 1.0 },
360 )
361 .with_tags(Some(MetricTags::from_iter([(
362 "component_id".to_string(),
363 "a".to_string(),
364 )]))),
365 Metric::new(
366 "test",
367 MetricKind::Absolute,
368 vector_core::event::MetricValue::Gauge { value: 1.0 },
369 )
370 .with_tags(Some(MetricTags::from_iter([(
371 "component_id".to_string(),
372 "b".to_string(),
373 )]))),
374 ]
375 .into(),
376 );
377
378 let result = compile_and_run(
379 storage,
380 r#"
381 get_vector_metric("test", tags: { "component_id": "b" })
382 "#,
383 )
384 .expect("vrl failed");
385 let result = result.as_object().unwrap();
386
387 assert_metric_matches(result, "test", 1.0, Some(vec![("component_id", "b")]));
388 }
389
390 #[test]
391 fn test_find_vector_metrics_wildcard() {
392 let storage = MetricsStorage::default();
393 storage.cache.store(
394 vec![
395 Metric::new(
396 "test",
397 MetricKind::Absolute,
398 vector_core::event::MetricValue::Gauge { value: 1.0 },
399 )
400 .with_tags(Some(MetricTags::from_iter([(
401 "component_id".to_string(),
402 "a".to_string(),
403 )]))),
404 Metric::new(
405 "test",
406 MetricKind::Absolute,
407 vector_core::event::MetricValue::Gauge { value: 1.0 },
408 )
409 .with_tags(Some(MetricTags::from_iter([(
410 "component_id".to_string(),
411 "b".to_string(),
412 )]))),
413 Metric::new(
414 "test",
415 MetricKind::Absolute,
416 vector_core::event::MetricValue::Gauge { value: 1.0 },
417 ),
418 ]
419 .into(),
420 );
421
422 let result = compile_and_run(
423 storage,
424 r#"
425 find_vector_metrics("test", tags: { "component_id": "*" })
426 "#,
427 )
428 .expect("vrl failed");
429 let result = result.as_array_unwrap();
430
431 assert_eq!(result.len(), 2);
433 assert_metric_matches(
434 result[0].as_object().unwrap(),
435 "test",
436 1.0,
437 Some(vec![("component_id", "a")]),
438 );
439 assert_metric_matches(
440 result[1].as_object().unwrap(),
441 "test",
442 1.0,
443 Some(vec![("component_id", "b")]),
444 );
445 }
446
447 #[test]
448 fn test_find_vector_metrics_wildcard_start() {
449 let storage = MetricsStorage::default();
450 storage.cache.store(
451 vec![
452 Metric::new(
453 "test",
454 MetricKind::Absolute,
455 vector_core::event::MetricValue::Gauge { value: 1.0 },
456 )
457 .with_tags(Some(MetricTags::from_iter([(
458 "component_id".to_string(),
459 "prefix.a".to_string(),
460 )]))),
461 Metric::new(
462 "test",
463 MetricKind::Absolute,
464 vector_core::event::MetricValue::Gauge { value: 1.0 },
465 )
466 .with_tags(Some(MetricTags::from_iter([(
467 "component_id".to_string(),
468 "something_else".to_string(),
469 )]))),
470 Metric::new(
471 "test",
472 MetricKind::Absolute,
473 vector_core::event::MetricValue::Gauge { value: 1.0 },
474 )
475 .with_tags(Some(MetricTags::from_iter([(
476 "component_id".to_string(),
477 "prefix.c".to_string(),
478 )]))),
479 ]
480 .into(),
481 );
482
483 let result = compile_and_run(
484 storage,
485 r#"
486 find_vector_metrics("test", tags: { "component_id": "prefix.*" })
487 "#,
488 )
489 .expect("vrl failed");
490 let result = result.as_array_unwrap();
491
492 assert_eq!(result.len(), 2);
493 assert_metric_matches(
494 result[0].as_object().unwrap(),
495 "test",
496 1.0,
497 Some(vec![("component_id", "prefix.a")]),
498 );
499 assert_metric_matches(
500 result[1].as_object().unwrap(),
501 "test",
502 1.0,
503 Some(vec![("component_id", "prefix.c")]),
504 );
505 }
506
507 #[test]
508 fn test_find_vector_metrics_wildcard_end() {
509 let storage = MetricsStorage::default();
510 storage.cache.store(
511 vec![
512 Metric::new(
513 "test",
514 MetricKind::Absolute,
515 vector_core::event::MetricValue::Gauge { value: 1.0 },
516 )
517 .with_tags(Some(MetricTags::from_iter([(
518 "component_id".to_string(),
519 "a.suffix".to_string(),
520 )]))),
521 Metric::new(
522 "test",
523 MetricKind::Absolute,
524 vector_core::event::MetricValue::Gauge { value: 1.0 },
525 )
526 .with_tags(Some(MetricTags::from_iter([(
527 "component_id".to_string(),
528 "something_else".to_string(),
529 )]))),
530 Metric::new(
531 "test",
532 MetricKind::Absolute,
533 vector_core::event::MetricValue::Gauge { value: 1.0 },
534 )
535 .with_tags(Some(MetricTags::from_iter([(
536 "component_id".to_string(),
537 "c.suffix".to_string(),
538 )]))),
539 ]
540 .into(),
541 );
542
543 let result = compile_and_run(
544 storage,
545 r#"
546 find_vector_metrics("test", tags: { "component_id": "*.suffix" })
547 "#,
548 )
549 .expect("vrl failed");
550 let result = result.as_array_unwrap();
551
552 assert_eq!(result.len(), 2);
553 assert_metric_matches(
554 result[0].as_object().unwrap(),
555 "test",
556 1.0,
557 Some(vec![("component_id", "a.suffix")]),
558 );
559 assert_metric_matches(
560 result[1].as_object().unwrap(),
561 "test",
562 1.0,
563 Some(vec![("component_id", "c.suffix")]),
564 );
565 }
566
567 #[test]
568 fn test_find_vector_metrics_wildcard_middle() {
569 let storage = MetricsStorage::default();
570 storage.cache.store(
571 vec![
572 Metric::new(
573 "test",
574 MetricKind::Absolute,
575 vector_core::event::MetricValue::Gauge { value: 1.0 },
576 )
577 .with_tags(Some(MetricTags::from_iter([(
578 "component_id".to_string(),
579 "start.a.end".to_string(),
580 )]))),
581 Metric::new(
582 "test",
583 MetricKind::Absolute,
584 vector_core::event::MetricValue::Gauge { value: 1.0 },
585 )
586 .with_tags(Some(MetricTags::from_iter([(
587 "component_id".to_string(),
588 "something_else".to_string(),
589 )]))),
590 Metric::new(
591 "test",
592 MetricKind::Absolute,
593 vector_core::event::MetricValue::Gauge { value: 1.0 },
594 )
595 .with_tags(Some(MetricTags::from_iter([(
596 "component_id".to_string(),
597 "start.c.end".to_string(),
598 )]))),
599 ]
600 .into(),
601 );
602
603 let result = compile_and_run(
604 storage,
605 r#"
606 find_vector_metrics("test", tags: { "component_id": "start.*.end" })
607 "#,
608 )
609 .expect("vrl failed");
610 let result = result.as_array_unwrap();
611
612 assert_eq!(result.len(), 2);
613 assert_metric_matches(
614 result[0].as_object().unwrap(),
615 "test",
616 1.0,
617 Some(vec![("component_id", "start.a.end")]),
618 );
619 assert_metric_matches(
620 result[1].as_object().unwrap(),
621 "test",
622 1.0,
623 Some(vec![("component_id", "start.c.end")]),
624 );
625 }
626
627 #[test]
628 fn test_aggregate_vector_metrics_sum() {
629 let storage = MetricsStorage::default();
630 storage.cache.store(
631 vec![
632 Metric::new(
633 "test",
634 MetricKind::Absolute,
635 vector_core::event::MetricValue::Gauge { value: 6.0 },
636 )
637 .with_tags(Some(MetricTags::from_iter([(
638 "component_id".to_string(),
639 "start.a.end".to_string(),
640 )]))),
641 Metric::new(
642 "test",
643 MetricKind::Absolute,
644 vector_core::event::MetricValue::Gauge { value: 1.0 },
645 )
646 .with_tags(Some(MetricTags::from_iter([(
647 "component_id".to_string(),
648 "something_else".to_string(),
649 )]))),
650 Metric::new(
651 "test",
652 MetricKind::Absolute,
653 vector_core::event::MetricValue::Gauge { value: 3.0 },
654 )
655 .with_tags(Some(MetricTags::from_iter([(
656 "component_id".to_string(),
657 "start.c.end".to_string(),
658 )]))),
659 ]
660 .into(),
661 );
662
663 let result = compile_and_run(
664 storage,
665 r#"
666 aggregate_vector_metrics("sum", "test", tags: { "component_id": "start.*.end" })
667 "#,
668 )
669 .expect("vrl failed");
670 let result = result.as_float().unwrap();
671
672 assert_eq!(result.into_inner(), 9.0);
673 }
674
675 #[test]
676 fn test_aggregate_vector_metrics_avg() {
677 let storage = MetricsStorage::default();
678 storage.cache.store(
679 vec![
680 Metric::new(
681 "test",
682 MetricKind::Absolute,
683 vector_core::event::MetricValue::Gauge { value: 6.0 },
684 )
685 .with_tags(Some(MetricTags::from_iter([(
686 "component_id".to_string(),
687 "start.a.end".to_string(),
688 )]))),
689 Metric::new(
690 "test",
691 MetricKind::Absolute,
692 vector_core::event::MetricValue::Gauge { value: 1.0 },
693 )
694 .with_tags(Some(MetricTags::from_iter([(
695 "component_id".to_string(),
696 "something_else".to_string(),
697 )]))),
698 Metric::new(
699 "test",
700 MetricKind::Absolute,
701 vector_core::event::MetricValue::Gauge { value: 3.0 },
702 )
703 .with_tags(Some(MetricTags::from_iter([(
704 "component_id".to_string(),
705 "start.c.end".to_string(),
706 )]))),
707 ]
708 .into(),
709 );
710
711 let result = compile_and_run(
712 storage,
713 r#"
714 aggregate_vector_metrics("avg", "test", tags: { "component_id": "start.*.end" })
715 "#,
716 )
717 .expect("vrl failed");
718 let result = result.as_float().unwrap();
719
720 assert_eq!(result.into_inner(), 4.5);
721 }
722
723 #[test]
724 fn test_aggregate_vector_metrics_max() {
725 let storage = MetricsStorage::default();
726 storage.cache.store(
727 vec![
728 Metric::new(
729 "test",
730 MetricKind::Absolute,
731 vector_core::event::MetricValue::Gauge { value: 6.0 },
732 )
733 .with_tags(Some(MetricTags::from_iter([(
734 "component_id".to_string(),
735 "start.a.end".to_string(),
736 )]))),
737 Metric::new(
738 "test",
739 MetricKind::Absolute,
740 vector_core::event::MetricValue::Gauge { value: 1.0 },
741 )
742 .with_tags(Some(MetricTags::from_iter([(
743 "component_id".to_string(),
744 "something_else".to_string(),
745 )]))),
746 Metric::new(
747 "test",
748 MetricKind::Absolute,
749 vector_core::event::MetricValue::Gauge { value: 3.0 },
750 )
751 .with_tags(Some(MetricTags::from_iter([(
752 "component_id".to_string(),
753 "start.c.end".to_string(),
754 )]))),
755 ]
756 .into(),
757 );
758
759 let result = compile_and_run(
760 storage,
761 r#"
762 aggregate_vector_metrics("max", "test", tags: { "component_id": "start.*.end" })
763 "#,
764 )
765 .expect("vrl failed");
766 let result = result.as_float().unwrap();
767
768 assert_eq!(result.into_inner(), 6.0);
769 }
770
771 #[test]
772 fn test_aggregate_vector_metrics_min() {
773 let storage = MetricsStorage::default();
774 storage.cache.store(
775 vec![
776 Metric::new(
777 "test",
778 MetricKind::Absolute,
779 vector_core::event::MetricValue::Gauge { value: 6.0 },
780 )
781 .with_tags(Some(MetricTags::from_iter([(
782 "component_id".to_string(),
783 "start.a.end".to_string(),
784 )]))),
785 Metric::new(
786 "test",
787 MetricKind::Absolute,
788 vector_core::event::MetricValue::Gauge { value: 1.0 },
789 )
790 .with_tags(Some(MetricTags::from_iter([(
791 "component_id".to_string(),
792 "something_else".to_string(),
793 )]))),
794 Metric::new(
795 "test",
796 MetricKind::Absolute,
797 vector_core::event::MetricValue::Gauge { value: 3.0 },
798 )
799 .with_tags(Some(MetricTags::from_iter([(
800 "component_id".to_string(),
801 "start.c.end".to_string(),
802 )]))),
803 ]
804 .into(),
805 );
806
807 let result = compile_and_run(
808 storage,
809 r#"
810 aggregate_vector_metrics("min", "test", tags: { "component_id": "start.*.end" })
811 "#,
812 )
813 .expect("vrl failed");
814 let result = result.as_float().unwrap();
815
816 assert_eq!(result.into_inner(), 3.0);
817 }
818}