vector_vrl_metrics/
common.rs

1use std::{collections::BTreeMap, sync::Arc, time::Duration};
2use tokio::time::interval;
3use tokio_stream::{wrappers::IntervalStream, StreamExt};
4use vector_common::shutdown::ShutdownSignal;
5use vrl::{
6    diagnostic::Label,
7    prelude::{expression::Expr, *},
8    value,
9};
10
11use arc_swap::ArcSwap;
12use vector_core::{event::Metric, metrics::Controller};
13
14#[derive(Debug)]
15pub(crate) enum Error {
16    MetricsStorageNotLoaded,
17}
18
19impl fmt::Display for Error {
20    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
21        match self {
22            Error::MetricsStorageNotLoaded => write!(f, "metrics storage not loaded"),
23        }
24    }
25}
26
27impl std::error::Error for Error {}
28
29impl DiagnosticMessage for Error {
30    fn code(&self) -> usize {
31        112
32    }
33
34    fn labels(&self) -> Vec<Label> {
35        match self {
36            Error::MetricsStorageNotLoaded => {
37                vec![Label::primary(
38                    "VRL metrics error: metrics storage not loaded".to_string(),
39                    Span::default(),
40                )]
41            }
42        }
43    }
44}
45
46#[derive(Debug, Default, Clone)]
47pub struct MetricsStorage {
48    // Made pub only for vrl-test module
49    #[doc(hidden)]
50    pub cache: Arc<ArcSwap<Vec<Metric>>>,
51}
52
53impl MetricsStorage {
54    pub(crate) fn get_metric(
55        &self,
56        metric: &str,
57        tags: BTreeMap<String, String>,
58    ) -> Option<Metric> {
59        self.cache
60            .load()
61            .iter()
62            .find(|m| m.name() == metric && tags.iter().all(|tag| tag_matches(m, tag)))
63            .cloned()
64    }
65
66    pub(crate) fn find_metrics(&self, metric: &str, tags: BTreeMap<String, String>) -> Vec<Metric> {
67        self.cache
68            .load()
69            .iter()
70            .filter(|m| m.name() == metric && tags.iter().all(|tag| tag_matches(m, tag)))
71            .cloned()
72            .collect()
73    }
74
75    pub fn refresh_metrics(&self) {
76        let new_metrics = Controller::get()
77            .expect("metrics not initialized")
78            .capture_metrics();
79        self.cache.store(new_metrics.into());
80    }
81
82    pub async fn run_periodic_refresh(
83        &self,
84        refresh_interval: Duration,
85        mut shutdown: ShutdownSignal,
86    ) {
87        let mut intervals = IntervalStream::new(interval(refresh_interval));
88        loop {
89            tokio::select! {
90                Some(_) = intervals.next() => {
91                    self.refresh_metrics();
92                }
93                _ = &mut shutdown => {
94                    break;
95                }
96            }
97        }
98    }
99}
100
101/// Checks if the tag matches - also considers wildcards
102fn tag_matches(metric: &Metric, (tag_key, tag_value): (&String, &String)) -> bool {
103    if let Some(wildcard_index) = tag_value.find('*') {
104        let Some(metric_tag_value) = metric.tag_value(tag_key) else {
105            return false;
106        };
107
108        metric_tag_value.starts_with(&tag_value[0..wildcard_index])
109            && metric_tag_value.ends_with(&tag_value[(wildcard_index + 1)..])
110    } else {
111        metric.tag_matches(tag_key, tag_value)
112    }
113}
114
115pub(crate) fn metrics_vrl_typedef() -> BTreeMap<Field, Kind> {
116    BTreeMap::from([
117        (Field::from("name"), Kind::bytes()),
118        (Field::from("tags"), Kind::any_object()),
119        (Field::from("type"), Kind::bytes()),
120        (Field::from("kind"), Kind::bytes()),
121        (Field::from("value"), Kind::float() | Kind::null()),
122    ])
123}
124
125pub(crate) fn metric_into_vrl(value: &Metric) -> Value {
126    value!({
127        name: { value.name() },
128        tags: {
129            BTreeMap::from_iter(
130                value
131                .tags()
132                .map(|t| {
133                    t.iter_sets()
134                        .map(|(k, v)| {
135                            (
136                                k.into(),
137                                Value::Array(
138                                    v.iter()
139                                    .filter_map(|v| {
140                                        v.map(ToString::to_string).map(Into::into).map(Value::Bytes)
141                                    })
142                                    .collect(),
143                                ),
144                            )
145                        })
146                    .collect::<Vec<_>>()
147                })
148                .unwrap_or_default(),
149            )
150        },
151        "type": { value.value().as_name() },
152        kind: {
153            match value.kind() {
154                vector_core::event::MetricKind::Incremental => "incremental",
155                vector_core::event::MetricKind::Absolute => "absolute",
156            }
157        },
158        value: {
159            match value.value() {
160                vector_core::event::MetricValue::Counter { value }
161                | vector_core::event::MetricValue::Gauge { value } => NotNan::new(*value).ok(),
162                _ => None,
163            }
164        }
165    })
166}
167
168pub(crate) fn validate_tags(
169    state: &TypeState,
170    tags: &BTreeMap<KeyString, Expr>,
171) -> Result<(), Box<dyn DiagnosticMessage>> {
172    for v in tags.values() {
173        if *v.type_def(state).kind() != Kind::bytes() {
174            return Err(Box::new(vrl::compiler::function::Error::InvalidArgument {
175                keyword: "tags.value",
176                value: v.resolve_constant(state).unwrap_or(Value::Null),
177                error: "Tag values must be strings",
178            }));
179        }
180    }
181    Ok(())
182}
183
184pub(crate) fn resolve_tags(
185    ctx: &mut Context,
186    tags: &BTreeMap<KeyString, Expr>,
187) -> Result<BTreeMap<String, String>, ExpressionError> {
188    tags.iter()
189        .map(|(k, v)| {
190            v.resolve(ctx).and_then(|v| {
191                Ok((
192                    k.clone().into(),
193                    v.as_str().ok_or("Tag must be a string")?.into_owned(),
194                ))
195            })
196        })
197        .collect::<Result<_, _>>()
198}
199
200// Tests are defined here to simplify them - enabling access to `MetricsStorage`
201#[cfg(test)]
202mod tests {
203    use vector_core::{
204        compile_vrl,
205        event::{Event, LogEvent, MetricKind, MetricTags, VrlTarget},
206    };
207    use vrl::{
208        compiler::{
209            runtime::{Runtime, Terminate},
210            CompilationResult, CompileConfig,
211        },
212        diagnostic::DiagnosticList,
213    };
214
215    use super::*;
216
217    fn compile(
218        storage: MetricsStorage,
219        vrl_source: &str,
220    ) -> Result<CompilationResult, DiagnosticList> {
221        // vector_vrl_functions depends on this crate, so we can't use that here
222        #[allow(clippy::disallowed_methods)]
223        let functions = vrl::stdlib::all().into_iter();
224
225        let functions = functions.chain(crate::all()).collect::<Vec<_>>();
226
227        let state = TypeState::default();
228
229        let mut config = CompileConfig::default();
230        config.set_custom(storage.clone());
231        config.set_read_only();
232
233        compile_vrl(vrl_source, &functions, &state, config)
234    }
235
236    fn compile_and_run(storage: MetricsStorage, vrl_source: &str) -> Result<Value, Terminate> {
237        let CompilationResult {
238            program,
239            warnings: _,
240            config: _,
241        } = compile(storage, vrl_source).expect("compilation failed");
242
243        let mut target = VrlTarget::new(Event::Log(LogEvent::default()), program.info(), false);
244        Runtime::default().resolve(&mut target, &program, &TimeZone::default())
245    }
246
247    fn assert_metric_matches(
248        metric: &BTreeMap<KeyString, Value>,
249        name: &str,
250        value: f64,
251        tags: Option<Vec<(&str, &str)>>,
252    ) {
253        assert_eq!(metric.get("name").unwrap().as_str().unwrap(), name);
254        assert_eq!(
255            metric.get("value").unwrap().as_float().unwrap(),
256            NotNan::new(value).unwrap()
257        );
258
259        if let Some(tags) = tags {
260            let metric_tags = metric.get("tags").unwrap().as_object().unwrap();
261            for (key, value) in tags {
262                assert_eq!(
263                    metric_tags
264                        .get(key)
265                        .unwrap()
266                        .as_array_unwrap()
267                        .first()
268                        .unwrap()
269                        .as_str()
270                        .unwrap(),
271                    value
272                );
273            }
274        }
275    }
276
277    #[test]
278    fn test_get_vector_metric() {
279        let storage = MetricsStorage::default();
280        storage.cache.store(
281            vec![Metric::new(
282                "test",
283                MetricKind::Absolute,
284                vector_core::event::MetricValue::Gauge { value: 1.0 },
285            )]
286            .into(),
287        );
288
289        let result = compile_and_run(
290            storage,
291            r#"
292            get_vector_metric("test")
293        "#,
294        )
295        .expect("vrl failed");
296        let result = result.as_object().unwrap();
297
298        assert_metric_matches(result, "test", 1.0, None);
299    }
300
301    #[test]
302    fn test_find_vector_metrics() {
303        let storage = MetricsStorage::default();
304        storage.cache.store(
305            vec![
306                Metric::new(
307                    "test",
308                    MetricKind::Absolute,
309                    vector_core::event::MetricValue::Gauge { value: 1.0 },
310                )
311                .with_tags(Some(MetricTags::from_iter([(
312                    "component_id".to_string(),
313                    "a".to_string(),
314                )]))),
315                Metric::new(
316                    "test",
317                    MetricKind::Absolute,
318                    vector_core::event::MetricValue::Gauge { value: 1.0 },
319                )
320                .with_tags(Some(MetricTags::from_iter([(
321                    "component_id".to_string(),
322                    "b".to_string(),
323                )]))),
324            ]
325            .into(),
326        );
327
328        let result = compile_and_run(
329            storage,
330            r#"
331            find_vector_metrics("test")
332        "#,
333        )
334        .expect("vrl failed");
335        let result = result.as_array_unwrap();
336
337        assert_metric_matches(
338            result[0].as_object().unwrap(),
339            "test",
340            1.0,
341            Some(vec![("component_id", "a")]),
342        );
343        assert_metric_matches(
344            result[1].as_object().unwrap(),
345            "test",
346            1.0,
347            Some(vec![("component_id", "b")]),
348        );
349    }
350
351    #[test]
352    fn test_get_vector_metric_by_tag() {
353        let storage = MetricsStorage::default();
354        storage.cache.store(
355            vec![
356                Metric::new(
357                    "test",
358                    MetricKind::Absolute,
359                    vector_core::event::MetricValue::Gauge { value: 1.0 },
360                )
361                .with_tags(Some(MetricTags::from_iter([(
362                    "component_id".to_string(),
363                    "a".to_string(),
364                )]))),
365                Metric::new(
366                    "test",
367                    MetricKind::Absolute,
368                    vector_core::event::MetricValue::Gauge { value: 1.0 },
369                )
370                .with_tags(Some(MetricTags::from_iter([(
371                    "component_id".to_string(),
372                    "b".to_string(),
373                )]))),
374            ]
375            .into(),
376        );
377
378        let result = compile_and_run(
379            storage,
380            r#"
381            get_vector_metric("test", tags: { "component_id": "b" })
382        "#,
383        )
384        .expect("vrl failed");
385        let result = result.as_object().unwrap();
386
387        assert_metric_matches(result, "test", 1.0, Some(vec![("component_id", "b")]));
388    }
389
390    #[test]
391    fn test_find_vector_metrics_wildcard() {
392        let storage = MetricsStorage::default();
393        storage.cache.store(
394            vec![
395                Metric::new(
396                    "test",
397                    MetricKind::Absolute,
398                    vector_core::event::MetricValue::Gauge { value: 1.0 },
399                )
400                .with_tags(Some(MetricTags::from_iter([(
401                    "component_id".to_string(),
402                    "a".to_string(),
403                )]))),
404                Metric::new(
405                    "test",
406                    MetricKind::Absolute,
407                    vector_core::event::MetricValue::Gauge { value: 1.0 },
408                )
409                .with_tags(Some(MetricTags::from_iter([(
410                    "component_id".to_string(),
411                    "b".to_string(),
412                )]))),
413                Metric::new(
414                    "test",
415                    MetricKind::Absolute,
416                    vector_core::event::MetricValue::Gauge { value: 1.0 },
417                ),
418            ]
419            .into(),
420        );
421
422        let result = compile_and_run(
423            storage,
424            r#"
425            find_vector_metrics("test", tags: { "component_id": "*" })
426        "#,
427        )
428        .expect("vrl failed");
429        let result = result.as_array_unwrap();
430
431        // 2 metrics, because they have component_id, 3rd one doesn't
432        assert_eq!(result.len(), 2);
433        assert_metric_matches(
434            result[0].as_object().unwrap(),
435            "test",
436            1.0,
437            Some(vec![("component_id", "a")]),
438        );
439        assert_metric_matches(
440            result[1].as_object().unwrap(),
441            "test",
442            1.0,
443            Some(vec![("component_id", "b")]),
444        );
445    }
446
447    #[test]
448    fn test_find_vector_metrics_wildcard_start() {
449        let storage = MetricsStorage::default();
450        storage.cache.store(
451            vec![
452                Metric::new(
453                    "test",
454                    MetricKind::Absolute,
455                    vector_core::event::MetricValue::Gauge { value: 1.0 },
456                )
457                .with_tags(Some(MetricTags::from_iter([(
458                    "component_id".to_string(),
459                    "prefix.a".to_string(),
460                )]))),
461                Metric::new(
462                    "test",
463                    MetricKind::Absolute,
464                    vector_core::event::MetricValue::Gauge { value: 1.0 },
465                )
466                .with_tags(Some(MetricTags::from_iter([(
467                    "component_id".to_string(),
468                    "something_else".to_string(),
469                )]))),
470                Metric::new(
471                    "test",
472                    MetricKind::Absolute,
473                    vector_core::event::MetricValue::Gauge { value: 1.0 },
474                )
475                .with_tags(Some(MetricTags::from_iter([(
476                    "component_id".to_string(),
477                    "prefix.c".to_string(),
478                )]))),
479            ]
480            .into(),
481        );
482
483        let result = compile_and_run(
484            storage,
485            r#"
486            find_vector_metrics("test", tags: { "component_id": "prefix.*" })
487        "#,
488        )
489        .expect("vrl failed");
490        let result = result.as_array_unwrap();
491
492        assert_eq!(result.len(), 2);
493        assert_metric_matches(
494            result[0].as_object().unwrap(),
495            "test",
496            1.0,
497            Some(vec![("component_id", "prefix.a")]),
498        );
499        assert_metric_matches(
500            result[1].as_object().unwrap(),
501            "test",
502            1.0,
503            Some(vec![("component_id", "prefix.c")]),
504        );
505    }
506
507    #[test]
508    fn test_find_vector_metrics_wildcard_end() {
509        let storage = MetricsStorage::default();
510        storage.cache.store(
511            vec![
512                Metric::new(
513                    "test",
514                    MetricKind::Absolute,
515                    vector_core::event::MetricValue::Gauge { value: 1.0 },
516                )
517                .with_tags(Some(MetricTags::from_iter([(
518                    "component_id".to_string(),
519                    "a.suffix".to_string(),
520                )]))),
521                Metric::new(
522                    "test",
523                    MetricKind::Absolute,
524                    vector_core::event::MetricValue::Gauge { value: 1.0 },
525                )
526                .with_tags(Some(MetricTags::from_iter([(
527                    "component_id".to_string(),
528                    "something_else".to_string(),
529                )]))),
530                Metric::new(
531                    "test",
532                    MetricKind::Absolute,
533                    vector_core::event::MetricValue::Gauge { value: 1.0 },
534                )
535                .with_tags(Some(MetricTags::from_iter([(
536                    "component_id".to_string(),
537                    "c.suffix".to_string(),
538                )]))),
539            ]
540            .into(),
541        );
542
543        let result = compile_and_run(
544            storage,
545            r#"
546            find_vector_metrics("test", tags: { "component_id": "*.suffix" })
547        "#,
548        )
549        .expect("vrl failed");
550        let result = result.as_array_unwrap();
551
552        assert_eq!(result.len(), 2);
553        assert_metric_matches(
554            result[0].as_object().unwrap(),
555            "test",
556            1.0,
557            Some(vec![("component_id", "a.suffix")]),
558        );
559        assert_metric_matches(
560            result[1].as_object().unwrap(),
561            "test",
562            1.0,
563            Some(vec![("component_id", "c.suffix")]),
564        );
565    }
566
567    #[test]
568    fn test_find_vector_metrics_wildcard_middle() {
569        let storage = MetricsStorage::default();
570        storage.cache.store(
571            vec![
572                Metric::new(
573                    "test",
574                    MetricKind::Absolute,
575                    vector_core::event::MetricValue::Gauge { value: 1.0 },
576                )
577                .with_tags(Some(MetricTags::from_iter([(
578                    "component_id".to_string(),
579                    "start.a.end".to_string(),
580                )]))),
581                Metric::new(
582                    "test",
583                    MetricKind::Absolute,
584                    vector_core::event::MetricValue::Gauge { value: 1.0 },
585                )
586                .with_tags(Some(MetricTags::from_iter([(
587                    "component_id".to_string(),
588                    "something_else".to_string(),
589                )]))),
590                Metric::new(
591                    "test",
592                    MetricKind::Absolute,
593                    vector_core::event::MetricValue::Gauge { value: 1.0 },
594                )
595                .with_tags(Some(MetricTags::from_iter([(
596                    "component_id".to_string(),
597                    "start.c.end".to_string(),
598                )]))),
599            ]
600            .into(),
601        );
602
603        let result = compile_and_run(
604            storage,
605            r#"
606            find_vector_metrics("test", tags: { "component_id": "start.*.end" })
607        "#,
608        )
609        .expect("vrl failed");
610        let result = result.as_array_unwrap();
611
612        assert_eq!(result.len(), 2);
613        assert_metric_matches(
614            result[0].as_object().unwrap(),
615            "test",
616            1.0,
617            Some(vec![("component_id", "start.a.end")]),
618        );
619        assert_metric_matches(
620            result[1].as_object().unwrap(),
621            "test",
622            1.0,
623            Some(vec![("component_id", "start.c.end")]),
624        );
625    }
626
627    #[test]
628    fn test_aggregate_vector_metrics_sum() {
629        let storage = MetricsStorage::default();
630        storage.cache.store(
631            vec![
632                Metric::new(
633                    "test",
634                    MetricKind::Absolute,
635                    vector_core::event::MetricValue::Gauge { value: 6.0 },
636                )
637                .with_tags(Some(MetricTags::from_iter([(
638                    "component_id".to_string(),
639                    "start.a.end".to_string(),
640                )]))),
641                Metric::new(
642                    "test",
643                    MetricKind::Absolute,
644                    vector_core::event::MetricValue::Gauge { value: 1.0 },
645                )
646                .with_tags(Some(MetricTags::from_iter([(
647                    "component_id".to_string(),
648                    "something_else".to_string(),
649                )]))),
650                Metric::new(
651                    "test",
652                    MetricKind::Absolute,
653                    vector_core::event::MetricValue::Gauge { value: 3.0 },
654                )
655                .with_tags(Some(MetricTags::from_iter([(
656                    "component_id".to_string(),
657                    "start.c.end".to_string(),
658                )]))),
659            ]
660            .into(),
661        );
662
663        let result = compile_and_run(
664            storage,
665            r#"
666            aggregate_vector_metrics("sum", "test", tags: { "component_id": "start.*.end" })
667        "#,
668        )
669        .expect("vrl failed");
670        let result = result.as_float().unwrap();
671
672        assert_eq!(result.into_inner(), 9.0);
673    }
674
675    #[test]
676    fn test_aggregate_vector_metrics_avg() {
677        let storage = MetricsStorage::default();
678        storage.cache.store(
679            vec![
680                Metric::new(
681                    "test",
682                    MetricKind::Absolute,
683                    vector_core::event::MetricValue::Gauge { value: 6.0 },
684                )
685                .with_tags(Some(MetricTags::from_iter([(
686                    "component_id".to_string(),
687                    "start.a.end".to_string(),
688                )]))),
689                Metric::new(
690                    "test",
691                    MetricKind::Absolute,
692                    vector_core::event::MetricValue::Gauge { value: 1.0 },
693                )
694                .with_tags(Some(MetricTags::from_iter([(
695                    "component_id".to_string(),
696                    "something_else".to_string(),
697                )]))),
698                Metric::new(
699                    "test",
700                    MetricKind::Absolute,
701                    vector_core::event::MetricValue::Gauge { value: 3.0 },
702                )
703                .with_tags(Some(MetricTags::from_iter([(
704                    "component_id".to_string(),
705                    "start.c.end".to_string(),
706                )]))),
707            ]
708            .into(),
709        );
710
711        let result = compile_and_run(
712            storage,
713            r#"
714            aggregate_vector_metrics("avg", "test", tags: { "component_id": "start.*.end" })
715        "#,
716        )
717        .expect("vrl failed");
718        let result = result.as_float().unwrap();
719
720        assert_eq!(result.into_inner(), 4.5);
721    }
722
723    #[test]
724    fn test_aggregate_vector_metrics_max() {
725        let storage = MetricsStorage::default();
726        storage.cache.store(
727            vec![
728                Metric::new(
729                    "test",
730                    MetricKind::Absolute,
731                    vector_core::event::MetricValue::Gauge { value: 6.0 },
732                )
733                .with_tags(Some(MetricTags::from_iter([(
734                    "component_id".to_string(),
735                    "start.a.end".to_string(),
736                )]))),
737                Metric::new(
738                    "test",
739                    MetricKind::Absolute,
740                    vector_core::event::MetricValue::Gauge { value: 1.0 },
741                )
742                .with_tags(Some(MetricTags::from_iter([(
743                    "component_id".to_string(),
744                    "something_else".to_string(),
745                )]))),
746                Metric::new(
747                    "test",
748                    MetricKind::Absolute,
749                    vector_core::event::MetricValue::Gauge { value: 3.0 },
750                )
751                .with_tags(Some(MetricTags::from_iter([(
752                    "component_id".to_string(),
753                    "start.c.end".to_string(),
754                )]))),
755            ]
756            .into(),
757        );
758
759        let result = compile_and_run(
760            storage,
761            r#"
762            aggregate_vector_metrics("max", "test", tags: { "component_id": "start.*.end" })
763        "#,
764        )
765        .expect("vrl failed");
766        let result = result.as_float().unwrap();
767
768        assert_eq!(result.into_inner(), 6.0);
769    }
770
771    #[test]
772    fn test_aggregate_vector_metrics_min() {
773        let storage = MetricsStorage::default();
774        storage.cache.store(
775            vec![
776                Metric::new(
777                    "test",
778                    MetricKind::Absolute,
779                    vector_core::event::MetricValue::Gauge { value: 6.0 },
780                )
781                .with_tags(Some(MetricTags::from_iter([(
782                    "component_id".to_string(),
783                    "start.a.end".to_string(),
784                )]))),
785                Metric::new(
786                    "test",
787                    MetricKind::Absolute,
788                    vector_core::event::MetricValue::Gauge { value: 1.0 },
789                )
790                .with_tags(Some(MetricTags::from_iter([(
791                    "component_id".to_string(),
792                    "something_else".to_string(),
793                )]))),
794                Metric::new(
795                    "test",
796                    MetricKind::Absolute,
797                    vector_core::event::MetricValue::Gauge { value: 3.0 },
798                )
799                .with_tags(Some(MetricTags::from_iter([(
800                    "component_id".to_string(),
801                    "start.c.end".to_string(),
802                )]))),
803            ]
804            .into(),
805        );
806
807        let result = compile_and_run(
808            storage,
809            r#"
810            aggregate_vector_metrics("min", "test", tags: { "component_id": "start.*.end" })
811        "#,
812        )
813        .expect("vrl failed");
814        let result = result.as_float().unwrap();
815
816        assert_eq!(result.into_inner(), 3.0);
817    }
818}