vector/sources/prometheus/
parser.rs

1#[cfg(feature = "sources-prometheus-remote-write")]
2use super::remote_write::MetadataConflictStrategy;
3use chrono::{DateTime, TimeZone, Utc};
4use vector_lib::prometheus::parser::{GroupKind, MetricGroup, ParserError};
5#[cfg(feature = "sources-prometheus-remote-write")]
6use vector_lib::prometheus::parser::{
7    MetadataConflictStrategy as ParserMetadataConflictStrategy, proto,
8};
9
10use crate::event::{
11    Event,
12    metric::{Bucket, Metric, MetricKind, MetricTags, MetricValue, Quantile},
13};
14
15fn utc_timestamp(timestamp: Option<i64>, default: DateTime<Utc>) -> DateTime<Utc> {
16    timestamp
17        .and_then(|timestamp| {
18            Utc.timestamp_opt(timestamp / 1000, (timestamp % 1000) as u32 * 1000000)
19                .latest()
20        })
21        .unwrap_or(default)
22}
23
/// Parses a Prometheus text exposition payload into metric events.
///
/// No tag overrides are applied, metrics are not aggregated, and NaN values
/// are kept. Parser failures are returned unchanged.
#[cfg(any(test, feature = "sources-prometheus-scrape"))]
pub(super) fn parse_text(packet: &str) -> Result<Vec<Event>, ParserError> {
    let groups = vector_lib::prometheus::parser::parse_text(packet)?;
    Ok(reparse_groups(groups, Vec::new(), false, false))
}
29
/// Parses a Prometheus text exposition payload into metric events, applying
/// `tag_overrides` to every sample.
///
/// `aggregate_metrics` is forwarded to `reparse_groups`, which uses it to pick
/// the metric kind for counters and histograms. NaN values are kept. Parser
/// failures are returned unchanged.
#[cfg(any(test, feature = "sources-prometheus-pushgateway"))]
pub(super) fn parse_text_with_overrides(
    packet: &str,
    tag_overrides: impl IntoIterator<Item = (String, String)> + Clone,
    aggregate_metrics: bool,
) -> Result<Vec<Event>, ParserError> {
    let groups = vector_lib::prometheus::parser::parse_text(packet)?;
    Ok(reparse_groups(
        groups,
        tag_overrides,
        aggregate_metrics,
        false,
    ))
}
39
/// Test-only variant of `parse_text` that drops samples containing NaN values.
#[cfg(test)]
fn parse_text_with_nan_filtering(packet: &str) -> Result<Vec<Event>, ParserError> {
    let groups = vector_lib::prometheus::parser::parse_text(packet)?;
    Ok(reparse_groups(groups, Vec::new(), false, true))
}
45
/// Parses a remote-write request into metric events.
///
/// `metadata_conflict_strategy` is converted to the parser's own strategy type
/// and handed to the parser. `skip_nan_values` controls whether samples
/// containing NaN values are dropped. No tag overrides are applied and metrics
/// are not aggregated. Parser failures are returned unchanged.
#[cfg(feature = "sources-prometheus-remote-write")]
pub(super) fn parse_request(
    request: proto::WriteRequest,
    metadata_conflict_strategy: MetadataConflictStrategy,
    skip_nan_values: bool,
) -> Result<Vec<Event>, ParserError> {
    let groups = vector_lib::prometheus::parser::parse_request(
        request,
        metadata_conflict_strategy.into(),
    )?;
    Ok(reparse_groups(groups, Vec::new(), false, skip_nan_values))
}
55
56fn reparse_groups(
57    groups: Vec<MetricGroup>,
58    tag_overrides: impl IntoIterator<Item = (String, String)> + Clone,
59    aggregate_metrics: bool,
60    skip_nan_values: bool,
61) -> Vec<Event> {
62    let mut result = Vec::new();
63    let start = Utc::now();
64
65    let metric_kind = if aggregate_metrics {
66        MetricKind::Incremental
67    } else {
68        MetricKind::Absolute
69    };
70
71    for group in groups {
72        match group.metrics {
73            GroupKind::Counter(metrics) => {
74                for (key, metric) in metrics {
75                    if skip_nan_values && metric.value.is_nan() {
76                        continue;
77                    }
78
79                    let tags = combine_tags(key.labels, tag_overrides.clone());
80
81                    let counter = Metric::new(
82                        group.name.clone(),
83                        metric_kind,
84                        MetricValue::Counter {
85                            value: metric.value,
86                        },
87                    )
88                    .with_timestamp(Some(utc_timestamp(key.timestamp, start)))
89                    .with_tags(tags.as_option());
90
91                    result.push(counter.into());
92                }
93            }
94            GroupKind::Gauge(metrics) | GroupKind::Untyped(metrics) => {
95                for (key, metric) in metrics {
96                    if skip_nan_values && metric.value.is_nan() {
97                        continue;
98                    }
99
100                    let tags = combine_tags(key.labels, tag_overrides.clone());
101
102                    let gauge = Metric::new(
103                        group.name.clone(),
104                        // Gauges are always absolute: aggregating them makes no sense
105                        MetricKind::Absolute,
106                        MetricValue::Gauge {
107                            value: metric.value,
108                        },
109                    )
110                    .with_timestamp(Some(utc_timestamp(key.timestamp, start)))
111                    .with_tags(tags.as_option());
112
113                    result.push(gauge.into());
114                }
115            }
116            GroupKind::Histogram(metrics) => {
117                for (key, metric) in metrics {
118                    if skip_nan_values
119                        && (metric.sum.is_nan() || metric.buckets.iter().any(|b| b.bucket.is_nan()))
120                    {
121                        continue;
122                    }
123
124                    let tags = combine_tags(key.labels, tag_overrides.clone());
125
126                    let mut buckets = metric.buckets;
127                    buckets.sort_unstable_by(|a, b| {
128                        a.bucket.total_cmp(&b.bucket).then(a.count.cmp(&b.count))
129                    });
130                    for i in (1..buckets.len()).rev() {
131                        buckets[i].count = buckets[i].count.saturating_sub(buckets[i - 1].count);
132                    }
133                    let drop_last = buckets
134                        .last()
135                        .is_some_and(|bucket| bucket.bucket == f64::INFINITY);
136                    if drop_last {
137                        buckets.pop();
138                    }
139
140                    result.push(
141                        Metric::new(
142                            group.name.clone(),
143                            metric_kind,
144                            MetricValue::AggregatedHistogram {
145                                buckets: buckets
146                                    .into_iter()
147                                    .map(|b| Bucket {
148                                        upper_limit: b.bucket,
149                                        count: b.count,
150                                    })
151                                    .collect(),
152                                count: metric.count,
153                                sum: metric.sum,
154                            },
155                        )
156                        .with_timestamp(Some(utc_timestamp(key.timestamp, start)))
157                        .with_tags(tags.as_option())
158                        .into(),
159                    );
160                }
161            }
162            GroupKind::Summary(metrics) => {
163                for (key, metric) in metrics {
164                    if skip_nan_values
165                        && (metric.sum.is_nan()
166                            || metric
167                                .quantiles
168                                .iter()
169                                .any(|q| q.quantile.is_nan() || q.value.is_nan()))
170                    {
171                        continue;
172                    }
173
174                    let tags = combine_tags(key.labels, tag_overrides.clone());
175
176                    result.push(
177                        Metric::new(
178                            group.name.clone(),
179                            // Summaries are always absolute: aggregating them makes no sense
180                            MetricKind::Absolute,
181                            MetricValue::AggregatedSummary {
182                                quantiles: metric
183                                    .quantiles
184                                    .into_iter()
185                                    .map(|q| Quantile {
186                                        quantile: q.quantile,
187                                        value: q.value,
188                                    })
189                                    .collect(),
190                                count: metric.count,
191                                sum: metric.sum,
192                            },
193                        )
194                        .with_timestamp(Some(utc_timestamp(key.timestamp, start)))
195                        .with_tags(tags.as_option())
196                        .into(),
197                    );
198                }
199            }
200        }
201    }
202
203    result
204}
205
/// Maps the source-level metadata conflict strategy onto the parser's
/// equivalent enum, variant for variant.
#[cfg(feature = "sources-prometheus-remote-write")]
impl From<MetadataConflictStrategy> for ParserMetadataConflictStrategy {
    fn from(strategy: MetadataConflictStrategy) -> Self {
        match strategy {
            MetadataConflictStrategy::Ignore => ParserMetadataConflictStrategy::Ignore,
            MetadataConflictStrategy::Reject => ParserMetadataConflictStrategy::Reject,
        }
    }
}
215
216fn combine_tags(
217    base_tags: impl Into<MetricTags>,
218    tag_overrides: impl IntoIterator<Item = (String, String)>,
219) -> MetricTags {
220    let mut tags = base_tags.into();
221    for (k, v) in tag_overrides.into_iter() {
222        tags.replace(k, v);
223    }
224
225    tags
226}
227
228#[cfg(test)]
229mod test {
230    use core::f64;
231    use std::sync::LazyLock;
232
233    use chrono::{TimeZone, Timelike, Utc};
234    use itertools::Itertools;
235    use similar_asserts::assert_eq;
236    use vector_lib::{assert_event_data_eq, metric_tags};
237
238    use super::*;
239    use crate::event::metric::{Metric, MetricKind, MetricValue};
240
241    static TIMESTAMP: LazyLock<DateTime<Utc>> = LazyLock::new(|| {
242        Utc.with_ymd_and_hms(2021, 2, 4, 4, 5, 6)
243            .single()
244            .and_then(|t| t.with_nanosecond(789 * 1_000_000))
245            .expect("invalid timestamp")
246    });
247
248    fn events_to_metrics(
249        events: Result<Vec<Event>, ParserError>,
250    ) -> Result<Vec<Metric>, ParserError> {
251        events.map(|events| events.into_iter().map(Event::into_metric).collect())
252    }
253
254    #[test]
255    fn adds_timestamp_if_missing() {
256        let now = Utc::now();
257        let exp = r"
258            # HELP counter Some counter
259            # TYPE count counter
260            http_requests_total 1027
261            ";
262        let result = events_to_metrics(parse_text(exp)).unwrap();
263        assert_eq!(result.len(), 1);
264        assert!(result[0].timestamp().unwrap() >= now);
265    }
266
267    #[test]
268    fn test_counter() {
269        let exp = r"
270            # HELP uptime A counter
271            # TYPE uptime counter
272            uptime 123.0 1612411506789
273            ";
274
275        assert_event_data_eq!(
276            events_to_metrics(parse_text(exp)),
277            Ok(vec![
278                Metric::new(
279                    "uptime",
280                    MetricKind::Absolute,
281                    MetricValue::Counter { value: 123.0 },
282                )
283                .with_timestamp(Some(*TIMESTAMP))
284            ]),
285        );
286    }
287
288    #[test]
289    fn test_counter_empty() {
290        let exp = r"
291            # HELP hidden A counter
292            # TYPE hidden counter
293            ";
294
295        assert_event_data_eq!(events_to_metrics(parse_text(exp)), Ok(vec![]));
296    }
297
298    #[test]
299    fn test_counter_nan() {
300        let exp = r#"
301            # TYPE name counter
302            name{labelname="val1",basename="basevalue"} NaN
303            "#;
304
305        match events_to_metrics(parse_text(exp)).unwrap()[0].value() {
306            MetricValue::Counter { value } => {
307                assert!(value.is_nan());
308            }
309            _ => unreachable!(),
310        }
311    }
312
313    #[test]
314    fn test_counter_weird() {
315        let exp = r#"
316            # A normal comment.
317            #
318            # TYPE name counter
319            name {labelname="val2",basename="base\"v\\al\nue"} 0.23 1612411506789
320            # HELP name two-line\n doc  str\\ing
321            # HELP  name2  	doc str"ing 2
322            #    TYPE    name2 counter
323            name2{labelname="val2"	,basename   =   "basevalue2"		} +Inf 1612411506789
324            name2{ labelname = "val1" , }-Inf 1612411506789
325            "#;
326
327        assert_event_data_eq!(
328            events_to_metrics(parse_text(exp)),
329            Ok(vec![
330                Metric::new(
331                    "name",
332                    MetricKind::Absolute,
333                    MetricValue::Counter { value: 0.23 },
334                )
335                .with_tags(Some(metric_tags!(
336                    "labelname" => "val2",
337                    "basename" => "base\"v\\al\nue",
338                )))
339                .with_timestamp(Some(*TIMESTAMP)),
340                Metric::new(
341                    "name2",
342                    MetricKind::Absolute,
343                    MetricValue::Counter {
344                        value: f64::INFINITY
345                    },
346                )
347                .with_tags(Some(metric_tags!(
348                    "labelname" => "val2",
349                    "basename" => "basevalue2",
350                )))
351                .with_timestamp(Some(*TIMESTAMP)),
352                Metric::new(
353                    "name2",
354                    MetricKind::Absolute,
355                    MetricValue::Counter {
356                        value: f64::NEG_INFINITY
357                    },
358                )
359                .with_tags(Some(metric_tags!("labelname" => "val1")))
360                .with_timestamp(Some(*TIMESTAMP)),
361            ]),
362        );
363    }
364
365    #[test]
366    fn test_counter_tags_and_timestamp() {
367        let exp = r#"
368            # HELP http_requests_total The total number of HTTP requests.
369            # TYPE http_requests_total counter
370            http_requests_total{method="post",code="200"} 1027 1395066363000
371            http_requests_total{method="post",code="400"}    3 1395066363000
372            "#;
373
374        assert_event_data_eq!(
375            events_to_metrics(parse_text(exp)),
376            Ok(vec![
377                Metric::new(
378                    "http_requests_total",
379                    MetricKind::Absolute,
380                    MetricValue::Counter { value: 1027.0 },
381                )
382                .with_timestamp(Utc.timestamp_opt(1395066363, 0).latest())
383                .with_tags(Some(metric_tags!(
384                    "method" => "post",
385                    "code" => "200",
386                ))),
387                Metric::new(
388                    "http_requests_total",
389                    MetricKind::Absolute,
390                    MetricValue::Counter { value: 3.0 },
391                )
392                .with_timestamp(Utc.timestamp_opt(1395066363, 0).latest())
393                .with_tags(Some(metric_tags!(
394                    "method" => "post",
395                    "code" => "400"
396                )))
397            ]),
398        );
399    }
400
401    #[test]
402    fn test_gauge() {
403        let exp = r"
404            # HELP latency A gauge
405            # TYPE latency gauge
406            latency 123.0 1612411506789
407            ";
408
409        assert_event_data_eq!(
410            events_to_metrics(parse_text(exp)),
411            Ok(vec![
412                Metric::new(
413                    "latency",
414                    MetricKind::Absolute,
415                    MetricValue::Gauge { value: 123.0 },
416                )
417                .with_timestamp(Some(*TIMESTAMP))
418            ]),
419        );
420    }
421
422    #[test]
423    fn test_gauge_minimalistic() {
424        let exp = r"
425            metric_without_timestamp_and_labels 12.47 1612411506789
426            ";
427
428        assert_event_data_eq!(
429            events_to_metrics(parse_text(exp)),
430            Ok(vec![
431                Metric::new(
432                    "metric_without_timestamp_and_labels",
433                    MetricKind::Absolute,
434                    MetricValue::Gauge { value: 12.47 },
435                )
436                .with_timestamp(Some(*TIMESTAMP))
437            ]),
438        );
439    }
440
441    #[test]
442    fn test_gauge_empty_labels() {
443        let exp = r"
444            no_labels{} 3 1612411506789
445            ";
446
447        assert_event_data_eq!(
448            events_to_metrics(parse_text(exp)),
449            Ok(vec![
450                Metric::new(
451                    "no_labels",
452                    MetricKind::Absolute,
453                    MetricValue::Gauge { value: 3.0 },
454                )
455                .with_timestamp(Some(*TIMESTAMP))
456            ]),
457        );
458    }
459
460    #[test]
461    fn test_gauge_minimalistic_escaped() {
462        let exp = r#"
463            msdos_file_access_time_seconds{path="C:\\DIR\\FILE.TXT",error="Cannot find file:\n\"FILE.TXT\""} 1.458255915e9 1612411506789
464            "#;
465
466        assert_event_data_eq!(
467            events_to_metrics(parse_text(exp)),
468            Ok(vec![
469                Metric::new(
470                    "msdos_file_access_time_seconds",
471                    MetricKind::Absolute,
472                    MetricValue::Gauge {
473                        value: 1458255915.0
474                    },
475                )
476                .with_tags(Some(metric_tags!(
477                    "path" => "C:\\DIR\\FILE.TXT",
478                    "error" => "Cannot find file:\n\"FILE.TXT\"",
479                )))
480                .with_timestamp(Some(*TIMESTAMP))
481            ]),
482        );
483    }
484
485    #[test]
486    fn test_tag_value_contain_bracket() {
487        let exp = r#"
488            # HELP name counter
489            # TYPE name counter
490            name{tag="}"} 0 1612411506789
491            "#;
492        assert_event_data_eq!(
493            events_to_metrics(parse_text(exp)),
494            Ok(vec![
495                Metric::new(
496                    "name",
497                    MetricKind::Absolute,
498                    MetricValue::Counter { value: 0.0 },
499                )
500                .with_tags(Some(metric_tags! { "tag" => "}" }))
501                .with_timestamp(Some(*TIMESTAMP))
502            ]),
503        );
504    }
505
506    #[test]
507    fn test_parse_tag_value_contain_comma() {
508        let exp = r#"
509            # HELP name counter
510            # TYPE name counter
511            name{tag="a,b"} 0 1612411506789
512            "#;
513        assert_event_data_eq!(
514            events_to_metrics(parse_text(exp)),
515            Ok(vec![
516                Metric::new(
517                    "name",
518                    MetricKind::Absolute,
519                    MetricValue::Counter { value: 0.0 },
520                )
521                .with_tags(Some(metric_tags! { "tag" => "a,b" }))
522                .with_timestamp(Some(*TIMESTAMP))
523            ]),
524        );
525    }
526
527    #[test]
528    fn test_parse_tag_escaping() {
529        let exp = r#"
530            # HELP name counter
531            # TYPE name counter
532            name{tag="\\n"} 0 1612411506789
533            "#;
534        assert_event_data_eq!(
535            events_to_metrics(parse_text(exp)),
536            Ok(vec![
537                Metric::new(
538                    "name",
539                    MetricKind::Absolute,
540                    MetricValue::Counter { value: 0.0 },
541                )
542                .with_tags(Some(metric_tags! { "tag" => "\\n" }))
543                .with_timestamp(Some(*TIMESTAMP))
544            ]),
545        );
546    }
547
548    #[test]
549    fn test_parse_tag_dont_trim_value() {
550        let exp = r#"
551            # HELP name counter
552            # TYPE name counter
553            name{tag=" * "} 0 1612411506789
554            "#;
555        assert_event_data_eq!(
556            events_to_metrics(parse_text(exp)),
557            Ok(vec![
558                Metric::new(
559                    "name",
560                    MetricKind::Absolute,
561                    MetricValue::Counter { value: 0.0 },
562                )
563                .with_tags(Some(metric_tags! { "tag" => " * " }))
564                .with_timestamp(Some(*TIMESTAMP))
565            ]),
566        );
567    }
568
569    #[test]
570    fn test_parse_tag_value_containing_equals() {
571        let exp = r#"
572            telemetry_scrape_size_bytes_count{registry="default",content_type="text/plain; version=0.0.4"} 1890 1612411506789
573            "#;
574
575        assert_event_data_eq!(
576            events_to_metrics(parse_text(exp)),
577            Ok(vec![
578                Metric::new(
579                    "telemetry_scrape_size_bytes_count",
580                    MetricKind::Absolute,
581                    MetricValue::Gauge { value: 1890.0 },
582                )
583                .with_tags(Some(metric_tags!( "registry" => "default",
584                    "content_type" => "text/plain; version=0.0.4" )))
585                .with_timestamp(Some(*TIMESTAMP))
586            ]),
587        );
588    }
589
590    #[test]
591    fn test_parse_tag_error_no_value() {
592        let exp = r#"
593            telemetry_scrape_size_bytes_count{registry="default",content_type} 1890 1612411506789
594            "#;
595
596        assert!(events_to_metrics(parse_text(exp)).is_err());
597    }
598
599    #[test]
600    fn test_parse_tag_error_equals_empty_value() {
601        let exp = r#"
602            telemetry_scrape_size_bytes_count{registry="default",content_type=} 1890 1612411506789
603            "#;
604
605        assert!(events_to_metrics(parse_text(exp)).is_err());
606    }
607
608    #[test]
609    fn test_gauge_weird_timestamp() {
610        let exp = r#"
611            something_weird{problem="division by zero"} +Inf -3982045000
612            "#;
613
614        assert_event_data_eq!(
615            events_to_metrics(parse_text(exp)),
616            Ok(vec![
617                Metric::new(
618                    "something_weird",
619                    MetricKind::Absolute,
620                    MetricValue::Gauge {
621                        value: f64::INFINITY
622                    },
623                )
624                .with_timestamp(Utc.timestamp_opt(-3982045, 0).latest())
625                .with_tags(Some(metric_tags!("problem" => "division by zero")))
626            ]),
627        );
628    }
629
630    #[test]
631    fn test_gauge_tabs() {
632        let exp = r#"
633            # TYPE	latency	gauge
634            latency{env="production"}	1.0		1395066363000
635            latency{env="testing"}		2.0		1395066363000
636            "#;
637
638        assert_event_data_eq!(
639            events_to_metrics(parse_text(exp)),
640            Ok(vec![
641                Metric::new(
642                    "latency",
643                    MetricKind::Absolute,
644                    MetricValue::Gauge { value: 1.0 },
645                )
646                .with_timestamp(Utc.timestamp_opt(1395066363, 0).latest())
647                .with_tags(Some(metric_tags!("env" => "production"))),
648                Metric::new(
649                    "latency",
650                    MetricKind::Absolute,
651                    MetricValue::Gauge { value: 2.0 },
652                )
653                .with_timestamp(Utc.timestamp_opt(1395066363, 0).latest())
654                .with_tags(Some(metric_tags!("env" => "testing")))
655            ]),
656        );
657    }
658
659    #[test]
660    fn test_mixed() {
661        let exp = r"
662            # TYPE uptime counter
663            uptime 123.0 1612411506789
664            # TYPE temperature gauge
665            temperature -1.5 1612411506789
666            # TYPE launch_count counter
667            launch_count 10.0 1612411506789
668            ";
669
670        assert_event_data_eq!(
671            events_to_metrics(parse_text(exp)),
672            Ok(vec![
673                Metric::new(
674                    "uptime",
675                    MetricKind::Absolute,
676                    MetricValue::Counter { value: 123.0 },
677                )
678                .with_timestamp(Some(*TIMESTAMP)),
679                Metric::new(
680                    "temperature",
681                    MetricKind::Absolute,
682                    MetricValue::Gauge { value: -1.5 },
683                )
684                .with_timestamp(Some(*TIMESTAMP)),
685                Metric::new(
686                    "launch_count",
687                    MetricKind::Absolute,
688                    MetricValue::Counter { value: 10.0 },
689                )
690                .with_timestamp(Some(*TIMESTAMP))
691            ]),
692        );
693    }
694
695    #[test]
696    fn test_no_value() {
697        let exp = r#"
698            # TYPE latency counter
699            latency{env="production"}
700            "#;
701
702        assert!(events_to_metrics(parse_text(exp)).is_err());
703    }
704
705    #[test]
706    fn test_no_name() {
707        let exp = r"
708            # TYPE uptime counter
709            123.0 1612411506789
710            ";
711
712        assert!(events_to_metrics(parse_text(exp)).is_err());
713    }
714
715    #[test]
716    fn test_mixed_and_loosely_typed() {
717        let exp = r"
718            # TYPE uptime counter
719            uptime 123.0 1612411506789
720            last_downtime 4.0 1612411506789
721            # TYPE temperature gauge
722            temperature -1.5 1612411506789
723            temperature_7_days_average 0.1 1612411506789
724            ";
725
726        assert_event_data_eq!(
727            events_to_metrics(parse_text(exp)),
728            Ok(vec![
729                Metric::new(
730                    "uptime",
731                    MetricKind::Absolute,
732                    MetricValue::Counter { value: 123.0 },
733                )
734                .with_timestamp(Some(*TIMESTAMP)),
735                Metric::new(
736                    "last_downtime",
737                    MetricKind::Absolute,
738                    MetricValue::Gauge { value: 4.0 },
739                )
740                .with_timestamp(Some(*TIMESTAMP)),
741                Metric::new(
742                    "temperature",
743                    MetricKind::Absolute,
744                    MetricValue::Gauge { value: -1.5 },
745                )
746                .with_timestamp(Some(*TIMESTAMP)),
747                Metric::new(
748                    "temperature_7_days_average",
749                    MetricKind::Absolute,
750                    MetricValue::Gauge { value: 0.1 },
751                )
752                .with_timestamp(Some(*TIMESTAMP))
753            ]),
754        );
755    }
756
757    #[test]
758    fn test_histogram() {
759        let exp = r#"
760            # HELP http_request_duration_seconds A histogram of the request duration.
761            # TYPE http_request_duration_seconds histogram
762            http_request_duration_seconds_bucket{le="0.05"} 24054 1612411506789
763            http_request_duration_seconds_bucket{le="0.1"} 33444 1612411506789
764            http_request_duration_seconds_bucket{le="0.2"} 100392 1612411506789
765            http_request_duration_seconds_bucket{le="0.5"} 129389 1612411506789
766            http_request_duration_seconds_bucket{le="1"} 133988 1612411506789
767            http_request_duration_seconds_bucket{le="+Inf"} 144320 1612411506789
768            http_request_duration_seconds_sum 53423 1612411506789
769            http_request_duration_seconds_count 144320 1612411506789
770            "#;
771
772        assert_event_data_eq!(
773            events_to_metrics(parse_text(exp)),
774            Ok(vec![
775                Metric::new(
776                    "http_request_duration_seconds",
777                    MetricKind::Absolute,
778                    MetricValue::AggregatedHistogram {
779                        buckets: vector_lib::buckets![
780                            0.05 => 24054, 0.1 => 9390, 0.2 => 66948, 0.5 => 28997, 1.0 => 4599
781                        ],
782                        count: 144320,
783                        sum: 53423.0,
784                    },
785                )
786                .with_timestamp(Some(*TIMESTAMP))
787            ]),
788        );
789    }
790
791    #[test]
792    fn test_histogram_doesnt_panic() {
793        let mut exp = r#"
794            # HELP http_request_duration_seconds A histogram of the request duration.
795            # TYPE http_request_duration_seconds histogram
796            "#
797        .to_string();
798
799        let to_float = |v: i32| -> f64 { v as f64 };
800        exp += &(0..=15)
801            .map(to_float)
802            .chain(std::iter::once(f64::NAN))
803            .chain((16..=20).map(to_float))
804            .rev()
805            .map(|f| format!("http_request_duration_seconds_bucket{{le=\"{f}\"}} 0 1612411506789"))
806            .join("\n");
807
808        assert_event_data_eq!(
809            events_to_metrics(parse_text(&exp)),
810            Ok(vec![
811                Metric::new(
812                    "http_request_duration_seconds",
813                    MetricKind::Absolute,
814                    MetricValue::AggregatedHistogram {
815                        // These bucket values don't mean/test anything, they just test that the
816                        // sort works without panicking
817                        buckets: (0..=20)
818                            .map(to_float)
819                            .chain(std::iter::once(f64::NAN))
820                            .map(|upper_limit| Bucket {
821                                upper_limit,
822                                count: 0
823                            })
824                            .collect(),
825                        count: 0,
826                        sum: 0.0,
827                    },
828                )
829                .with_timestamp(Some(*TIMESTAMP))
830            ]),
831        );
832    }
833
834    #[test]
835    fn test_histogram_out_of_order() {
836        let exp = r#"
837            # HELP duration A histogram of the request duration.
838            # TYPE duration histogram
839            duration_bucket{le="+Inf"} 144320 1612411506789
840            duration_bucket{le="1"} 133988 1612411506789
841            duration_sum 53423 1612411506789
842            duration_count 144320 1612411506789
843            "#;
844
845        assert_event_data_eq!(
846            events_to_metrics(parse_text(exp)),
847            Ok(vec![
848                Metric::new(
849                    "duration",
850                    MetricKind::Absolute,
851                    MetricValue::AggregatedHistogram {
852                        buckets: vector_lib::buckets![1.0 => 133988],
853                        count: 144320,
854                        sum: 53423.0,
855                    },
856                )
857                .with_timestamp(Some(*TIMESTAMP))
858            ]),
859        );
860    }
861
862    #[test]
863    fn test_histogram_backward_values() {
864        let exp = r#"
865            # HELP duration A histogram of the request duration.
866            # TYPE duration histogram
867            duration_bucket{le="1"} 2000 1612411506789
868            duration_bucket{le="10"} 1000 1612411506789
869            duration_bucket{le="+Inf"} 2000 1612411506789
870            duration_sum 2000 1612411506789
871            duration_count 2000 1612411506789
872            "#;
873
874        assert_event_data_eq!(
875            events_to_metrics(parse_text(exp)),
876            Ok(vec![
877                Metric::new(
878                    "duration",
879                    MetricKind::Absolute,
880                    MetricValue::AggregatedHistogram {
881                        buckets: vector_lib::buckets![1.0 => 2000, 10.0 => 0],
882                        count: 2000,
883                        sum: 2000.0,
884                    },
885                )
886                .with_timestamp(Some(*TIMESTAMP))
887            ]),
888        );
889    }
890
891    #[test]
892    fn test_histogram_with_labels() {
893        let exp = r#"
894            # HELP gitlab_runner_job_duration_seconds Histogram of job durations
895            # TYPE gitlab_runner_job_duration_seconds histogram
896            gitlab_runner_job_duration_seconds_bucket{runner="z",le="30"} 327 1612411506789
897            gitlab_runner_job_duration_seconds_bucket{runner="z",le="60"} 474 1612411506789
898            gitlab_runner_job_duration_seconds_bucket{runner="z",le="300"} 535 1612411506789
899            gitlab_runner_job_duration_seconds_bucket{runner="z",le="600"} 536 1612411506789
900            gitlab_runner_job_duration_seconds_bucket{runner="z",le="1800"} 536 1612411506789
901            gitlab_runner_job_duration_seconds_bucket{runner="z",le="3600"} 536 1612411506789
902            gitlab_runner_job_duration_seconds_bucket{runner="z",le="7200"} 536 1612411506789
903            gitlab_runner_job_duration_seconds_bucket{runner="z",le="10800"} 536 1612411506789
904            gitlab_runner_job_duration_seconds_bucket{runner="z",le="18000"} 536 1612411506789
905            gitlab_runner_job_duration_seconds_bucket{runner="z",le="36000"} 536 1612411506789
906            gitlab_runner_job_duration_seconds_bucket{runner="z",le="+Inf"} 536 1612411506789
907            gitlab_runner_job_duration_seconds_sum{runner="z"} 19690.129384881966 1612411506789
908            gitlab_runner_job_duration_seconds_count{runner="z"} 536 1612411506789
909            gitlab_runner_job_duration_seconds_bucket{runner="x",le="30"} 1 1612411506789
910            gitlab_runner_job_duration_seconds_bucket{runner="x",le="60"} 1 1612411506789
911            gitlab_runner_job_duration_seconds_bucket{runner="x",le="300"} 1 1612411506789
912            gitlab_runner_job_duration_seconds_bucket{runner="x",le="600"} 1 1612411506789
913            gitlab_runner_job_duration_seconds_bucket{runner="x",le="1800"} 1 1612411506789
914            gitlab_runner_job_duration_seconds_bucket{runner="x",le="3600"} 1 1612411506789
915            gitlab_runner_job_duration_seconds_bucket{runner="x",le="7200"} 1 1612411506789
916            gitlab_runner_job_duration_seconds_bucket{runner="x",le="10800"} 1 1612411506789
917            gitlab_runner_job_duration_seconds_bucket{runner="x",le="18000"} 1 1612411506789
918            gitlab_runner_job_duration_seconds_bucket{runner="x",le="36000"} 1 1612411506789
919            gitlab_runner_job_duration_seconds_bucket{runner="x",le="+Inf"} 1 1612411506789
920            gitlab_runner_job_duration_seconds_sum{runner="x"} 28.975436316 1612411506789
921            gitlab_runner_job_duration_seconds_count{runner="x"} 1 1612411506789
922            gitlab_runner_job_duration_seconds_bucket{runner="y",le="30"} 285 1612411506789
923            gitlab_runner_job_duration_seconds_bucket{runner="y",le="60"} 1165 1612411506789
924            gitlab_runner_job_duration_seconds_bucket{runner="y",le="300"} 3071 1612411506789
925            gitlab_runner_job_duration_seconds_bucket{runner="y",le="600"} 3151 1612411506789
926            gitlab_runner_job_duration_seconds_bucket{runner="y",le="1800"} 3252 1612411506789
927            gitlab_runner_job_duration_seconds_bucket{runner="y",le="3600"} 3255 1612411506789
928            gitlab_runner_job_duration_seconds_bucket{runner="y",le="7200"} 3255 1612411506789
929            gitlab_runner_job_duration_seconds_bucket{runner="y",le="10800"} 3255 1612411506789
930            gitlab_runner_job_duration_seconds_bucket{runner="y",le="18000"} 3255 1612411506789
931            gitlab_runner_job_duration_seconds_bucket{runner="y",le="36000"} 3255 1612411506789
932            gitlab_runner_job_duration_seconds_bucket{runner="y",le="+Inf"} 3255 1612411506789
933            gitlab_runner_job_duration_seconds_sum{runner="y"} 381111.7498891335 1612411506789
934            gitlab_runner_job_duration_seconds_count{runner="y"} 3255 1612411506789
935        "#;
936
937        assert_event_data_eq!(
938            events_to_metrics(parse_text(exp)),
939            Ok(vec![
940                Metric::new(
941                    "gitlab_runner_job_duration_seconds", MetricKind::Absolute, MetricValue::AggregatedHistogram {
942                        buckets: vector_lib::buckets![
943                            30.0 => 327,
944                            60.0 => 147,
945                            300.0 => 61,
946                            600.0 => 1,
947                            1800.0 => 0,
948                            3600.0 => 0,
949                            7200.0 => 0,
950                            10800.0 => 0,
951                            18000.0 => 0,
952                            36000.0 => 0
953                        ],
954                        count: 536,
955                        sum: 19690.129384881966,
956                    },
957                )
958                    .with_tags(Some(metric_tags!("runner" => "z")))
959                    .with_timestamp(Some(*TIMESTAMP)),
960                Metric::new(
961                    "gitlab_runner_job_duration_seconds", MetricKind::Absolute, MetricValue::AggregatedHistogram {
962                        buckets: vector_lib::buckets![
963                            30.0 => 1,
964                            60.0 => 0,
965                            300.0 => 0,
966                            600.0 => 0,
967                            1800.0 => 0,
968                            3600.0 => 0,
969                            7200.0 => 0,
970                            10800.0 => 0,
971                            18000.0 => 0,
972                            36000.0 => 0
973                        ],
974                        count: 1,
975                        sum: 28.975436316,
976                    },
977                )
978                    .with_tags(Some(metric_tags!("runner" => "x")))
979                    .with_timestamp(Some(*TIMESTAMP)),
980                Metric::new(
981                    "gitlab_runner_job_duration_seconds", MetricKind::Absolute, MetricValue::AggregatedHistogram {
982                        buckets: vector_lib::buckets![
983                            30.0 => 285, 60.0 => 880, 300.0 => 1906, 600.0 => 80, 1800.0 => 101, 3600.0 => 3,
984                            7200.0 => 0, 10800.0 => 0, 18000.0 => 0, 36000.0 => 0
985                        ],
986                        count: 3255,
987                        sum: 381111.7498891335,
988                    },
989                )
990                    .with_tags(Some(metric_tags!("runner" => "y")))
991                    .with_timestamp(Some(*TIMESTAMP))
992            ]),
993        );
994    }
995
996    #[test]
997    fn test_summary() {
998        let exp = r#"
999            # HELP rpc_duration_seconds A summary of the RPC duration in seconds.
1000            # TYPE rpc_duration_seconds summary
1001            rpc_duration_seconds{service="a",quantile="0.01"} 3102 1612411506789
1002            rpc_duration_seconds{service="a",quantile="0.05"} 3272 1612411506789
1003            rpc_duration_seconds{service="a",quantile="0.5"} 4773 1612411506789
1004            rpc_duration_seconds{service="a",quantile="0.9"} 9001 1612411506789
1005            rpc_duration_seconds{service="a",quantile="0.99"} 76656 1612411506789
1006            rpc_duration_seconds_sum{service="a"} 1.7560473e+07 1612411506789
1007            rpc_duration_seconds_count{service="a"} 2693 1612411506789
1008            # HELP go_gc_duration_seconds A summary of the GC invocation durations.
1009            # TYPE go_gc_duration_seconds summary
1010            go_gc_duration_seconds{quantile="0"} 0.009460965 1612411506789
1011            go_gc_duration_seconds{quantile="0.25"} 0.009793382 1612411506789
1012            go_gc_duration_seconds{quantile="0.5"} 0.009870205 1612411506789
1013            go_gc_duration_seconds{quantile="0.75"} 0.01001838 1612411506789
1014            go_gc_duration_seconds{quantile="1"} 0.018827136 1612411506789
1015            go_gc_duration_seconds_sum 4668.551713715 1612411506789
1016            go_gc_duration_seconds_count 602767 1612411506789
1017            "#;
1018
1019        assert_event_data_eq!(
1020            events_to_metrics(parse_text(exp)),
1021            Ok(vec![
1022                Metric::new(
1023                    "rpc_duration_seconds",
1024                    MetricKind::Absolute,
1025                    MetricValue::AggregatedSummary {
1026                        quantiles: vector_lib::quantiles![
1027                            0.01 => 3102.0,
1028                            0.05 => 3272.0,
1029                            0.5 => 4773.0,
1030                            0.9 => 9001.0,
1031                            0.99 => 76656.0
1032                        ],
1033                        count: 2693,
1034                        sum: 1.7560473e+07,
1035                    },
1036                )
1037                .with_tags(Some(metric_tags!("service" => "a")))
1038                .with_timestamp(Some(*TIMESTAMP)),
1039                Metric::new(
1040                    "go_gc_duration_seconds",
1041                    MetricKind::Absolute,
1042                    MetricValue::AggregatedSummary {
1043                        quantiles: vector_lib::quantiles![
1044                            0.0 => 0.009460965,
1045                            0.25 => 0.009793382,
1046                            0.5 => 0.009870205,
1047                            0.75 => 0.01001838,
1048                            1.0 => 0.018827136
1049                        ],
1050                        count: 602767,
1051                        sum: 4668.551713715,
1052                    },
1053                )
1054                .with_timestamp(Some(*TIMESTAMP)),
1055            ]),
1056        );
1057    }
1058
1059    // https://github.com/vectordotdev/vector/issues/3276
1060    #[test]
1061    fn test_nginx() {
1062        let exp = r#"
1063            # HELP nginx_server_bytes request/response bytes
1064            # TYPE nginx_server_bytes counter
1065            nginx_server_bytes{direction="in",host="*"} 263719
1066            nginx_server_bytes{direction="in",host="_"} 255061
1067            nginx_server_bytes{direction="in",host="nginx-vts-status"} 8658
1068            nginx_server_bytes{direction="out",host="*"} 944199
1069            nginx_server_bytes{direction="out",host="_"} 360775
1070            nginx_server_bytes{direction="out",host="nginx-vts-status"} 583424
1071            # HELP nginx_server_cache cache counter
1072            # TYPE nginx_server_cache counter
1073            nginx_server_cache{host="*",status="bypass"} 0
1074            nginx_server_cache{host="*",status="expired"} 0
1075            nginx_server_cache{host="*",status="hit"} 0
1076            nginx_server_cache{host="*",status="miss"} 0
1077            nginx_server_cache{host="*",status="revalidated"} 0
1078            nginx_server_cache{host="*",status="scarce"} 0
1079            "#;
1080
1081        let now = Utc::now();
1082        let result = events_to_metrics(parse_text(exp)).expect("Parsing failed");
1083        // Reset all the timestamps for comparison
1084        let result: Vec<_> = result
1085            .into_iter()
1086            .map(|metric| {
1087                assert!(metric.timestamp().expect("Missing timestamp") >= now);
1088                metric.with_timestamp(Some(*TIMESTAMP))
1089            })
1090            .collect();
1091
1092        assert_event_data_eq!(
1093            result,
1094            vec![
1095                Metric::new(
1096                    "nginx_server_bytes",
1097                    MetricKind::Absolute,
1098                    MetricValue::Counter { value: 263719.0 },
1099                )
1100                .with_tags(Some(metric_tags! { "direction" => "in", "host" => "*" }))
1101                .with_timestamp(Some(*TIMESTAMP)),
1102                Metric::new(
1103                    "nginx_server_bytes",
1104                    MetricKind::Absolute,
1105                    MetricValue::Counter { value: 255061.0 },
1106                )
1107                .with_tags(Some(metric_tags! { "direction" => "in", "host" => "_" }))
1108                .with_timestamp(Some(*TIMESTAMP)),
1109                Metric::new(
1110                    "nginx_server_bytes",
1111                    MetricKind::Absolute,
1112                    MetricValue::Counter { value: 8658.0 },
1113                )
1114                .with_tags(Some(
1115                    metric_tags! { "direction" => "in", "host" => "nginx-vts-status" }
1116                ))
1117                .with_timestamp(Some(*TIMESTAMP)),
1118                Metric::new(
1119                    "nginx_server_bytes",
1120                    MetricKind::Absolute,
1121                    MetricValue::Counter { value: 944199.0 },
1122                )
1123                .with_tags(Some(metric_tags! { "direction" => "out", "host" => "*" }))
1124                .with_timestamp(Some(*TIMESTAMP)),
1125                Metric::new(
1126                    "nginx_server_bytes",
1127                    MetricKind::Absolute,
1128                    MetricValue::Counter { value: 360775.0 },
1129                )
1130                .with_tags(Some(metric_tags! { "direction" => "out", "host" => "_" }))
1131                .with_timestamp(Some(*TIMESTAMP)),
1132                Metric::new(
1133                    "nginx_server_bytes",
1134                    MetricKind::Absolute,
1135                    MetricValue::Counter { value: 583424.0 },
1136                )
1137                .with_tags(Some(
1138                    metric_tags! { "direction" => "out", "host" => "nginx-vts-status" }
1139                ))
1140                .with_timestamp(Some(*TIMESTAMP)),
1141                Metric::new(
1142                    "nginx_server_cache",
1143                    MetricKind::Absolute,
1144                    MetricValue::Counter { value: 0.0 },
1145                )
1146                .with_tags(Some(metric_tags! { "host" => "*", "status" => "bypass" }))
1147                .with_timestamp(Some(*TIMESTAMP)),
1148                Metric::new(
1149                    "nginx_server_cache",
1150                    MetricKind::Absolute,
1151                    MetricValue::Counter { value: 0.0 },
1152                )
1153                .with_tags(Some(metric_tags! { "host" => "*", "status" => "expired" }))
1154                .with_timestamp(Some(*TIMESTAMP)),
1155                Metric::new(
1156                    "nginx_server_cache",
1157                    MetricKind::Absolute,
1158                    MetricValue::Counter { value: 0.0 },
1159                )
1160                .with_tags(Some(metric_tags! { "host" => "*", "status" => "hit" }))
1161                .with_timestamp(Some(*TIMESTAMP)),
1162                Metric::new(
1163                    "nginx_server_cache",
1164                    MetricKind::Absolute,
1165                    MetricValue::Counter { value: 0.0 },
1166                )
1167                .with_tags(Some(metric_tags! { "host" => "*", "status" => "miss" }))
1168                .with_timestamp(Some(*TIMESTAMP)),
1169                Metric::new(
1170                    "nginx_server_cache",
1171                    MetricKind::Absolute,
1172                    MetricValue::Counter { value: 0.0 },
1173                )
1174                .with_tags(Some(
1175                    metric_tags! { "host" => "*", "status" => "revalidated" }
1176                ))
1177                .with_timestamp(Some(*TIMESTAMP)),
1178                Metric::new(
1179                    "nginx_server_cache",
1180                    MetricKind::Absolute,
1181                    MetricValue::Counter { value: 0.0 },
1182                )
1183                .with_tags(Some(metric_tags! { "host" => "*", "status" => "scarce" }))
1184                .with_timestamp(Some(*TIMESTAMP))
1185            ]
1186        );
1187    }
1188
1189    #[test]
1190    fn test_overrides_nothing_overwritten() {
1191        let exp = r#"
1192            # TYPE jobs_total counter
1193            # HELP jobs_total Total number of jobs
1194            jobs_total{type="a"} 1.0 1612411506789
1195            "#;
1196
1197        assert_event_data_eq!(
1198            events_to_metrics(parse_text_with_overrides(exp, vec![], false)),
1199            Ok(vec![
1200                Metric::new(
1201                    "jobs_total",
1202                    MetricKind::Absolute,
1203                    MetricValue::Counter { value: 1.0 },
1204                )
1205                .with_tags(Some(metric_tags! { "type" => "a" }))
1206                .with_timestamp(Some(*TIMESTAMP))
1207            ]),
1208        );
1209    }
1210
1211    #[test]
1212    fn test_overrides_label_overwritten() {
1213        let exp = r#"
1214            # TYPE jobs_total counter
1215            # HELP jobs_total Total number of jobs
1216            jobs_total{type="a"} 1.0 1612411506789
1217            "#;
1218
1219        assert_event_data_eq!(
1220            events_to_metrics(parse_text_with_overrides(
1221                exp,
1222                vec![("type".to_owned(), "b".to_owned())],
1223                false
1224            )),
1225            Ok(vec![
1226                Metric::new(
1227                    "jobs_total",
1228                    MetricKind::Absolute,
1229                    MetricValue::Counter { value: 1.0 },
1230                )
1231                .with_tags(Some(metric_tags! { "type" => "b" }))
1232                .with_timestamp(Some(*TIMESTAMP))
1233            ]),
1234        );
1235    }
1236
1237    // This matches the behaviour of the real Prometheus Pushgateway, which I
1238    // tested manually.
1239    #[test]
1240    fn test_overrides_last_value_preferred() {
1241        let exp = r#"
1242            # TYPE jobs_total counter
1243            # HELP jobs_total Total number of jobs
1244            jobs_total{type="a"} 1.0 1612411506789
1245            "#;
1246
1247        assert_event_data_eq!(
1248            events_to_metrics(parse_text_with_overrides(
1249                exp,
1250                vec![
1251                    ("type".to_owned(), "b".to_owned()),
1252                    ("type".to_owned(), "c".to_owned())
1253                ],
1254                false
1255            )),
1256            Ok(vec![
1257                Metric::new(
1258                    "jobs_total",
1259                    MetricKind::Absolute,
1260                    MetricValue::Counter { value: 1.0 },
1261                )
1262                .with_tags(Some(metric_tags! { "type" => "c" }))
1263                .with_timestamp(Some(*TIMESTAMP))
1264            ]),
1265        );
1266    }
1267
1268    #[test]
1269    fn test_aggregation_enabled_only_aggregates_counter_and_histogram() {
1270        let exp = r#"
1271            # TYPE jobs_total counter
1272            # HELP jobs_total Total number of jobs
1273            jobs_total{type="a"} 1.0 1612411506789
1274            # TYPE jobs_current gauge
1275            # HELP jobs_current Current number of jobs
1276            jobs_current{type="a"} 5.0 1612411506789
1277            # TYPE jobs_distribution histogram
1278            # HELP jobs_distribution Distribution of jobs
1279            jobs_distribution_bucket{type="a",le="1"} 0.0 1612411506789
1280            jobs_distribution_bucket{type="a",le="2.5"} 0.0 1612411506789
1281            jobs_distribution_bucket{type="a",le="5"} 0.0 1612411506789
1282            jobs_distribution_bucket{type="a",le="10"} 1.0 1612411506789
1283            jobs_distribution_bucket{type="a",le="+Inf"} 1.0 1612411506789
1284            jobs_distribution_sum{type="a"} 8.0 1612411506789
1285            jobs_distribution_count{type="a"} 1.0 1612411506789
1286            # TYPE jobs_summary summary
1287            # HELP jobs_summary Summary of jobs
1288            jobs_summary_sum{type="a"} 8.0 1612411506789
1289            jobs_summary_count{type="a"} 1.0 1612411506789
1290            "#;
1291
1292        assert_event_data_eq!(
1293            events_to_metrics(parse_text_with_overrides(exp, vec![], true)),
1294            Ok(vec![
1295                Metric::new(
1296                    "jobs_total",
1297                    MetricKind::Incremental,
1298                    MetricValue::Counter { value: 1.0 },
1299                )
1300                .with_tags(Some(metric_tags! { "type" => "a" }))
1301                .with_timestamp(Some(*TIMESTAMP)),
1302                Metric::new(
1303                    "jobs_current",
1304                    MetricKind::Absolute,
1305                    MetricValue::Gauge { value: 5.0 },
1306                )
1307                .with_tags(Some(metric_tags! { "type" => "a" }))
1308                .with_timestamp(Some(*TIMESTAMP)),
1309                Metric::new(
1310                    "jobs_distribution",
1311                    MetricKind::Incremental,
1312                    MetricValue::AggregatedHistogram {
1313                        buckets: vector_lib::buckets![
1314                            1.0 => 0, 2.5 => 0, 5.0 => 0, 10.0 => 1
1315                        ],
1316                        count: 1,
1317                        sum: 8.0,
1318                    },
1319                )
1320                .with_tags(Some(metric_tags! { "type" => "a" }))
1321                .with_timestamp(Some(*TIMESTAMP)),
1322                Metric::new(
1323                    "jobs_summary",
1324                    MetricKind::Absolute,
1325                    MetricValue::AggregatedSummary {
1326                        quantiles: vector_lib::quantiles![],
1327                        count: 1,
1328                        sum: 8.0,
1329                    },
1330                )
1331                .with_tags(Some(metric_tags! { "type" => "a" }))
1332                .with_timestamp(Some(*TIMESTAMP)),
1333            ]),
1334        );
1335    }
1336
1337    #[test]
1338    fn test_aggregation_disabled_all_absolute() {
1339        let exp = r#"
1340            # TYPE jobs_total counter
1341            # HELP jobs_total Total number of jobs
1342            jobs_total{type="a"} 1.0 1612411506789
1343            # TYPE jobs_current gauge
1344            # HELP jobs_current Current number of jobs
1345            jobs_current{type="a"} 5.0 1612411506789
1346            # TYPE jobs_distribution histogram
1347            # HELP jobs_distribution Distribution of jobs
1348            jobs_distribution_bucket{type="a",le="1"} 0.0 1612411506789
1349            jobs_distribution_bucket{type="a",le="2.5"} 0.0 1612411506789
1350            jobs_distribution_bucket{type="a",le="5"} 0.0 1612411506789
1351            jobs_distribution_bucket{type="a",le="10"} 1.0 1612411506789
1352            jobs_distribution_bucket{type="a",le="+Inf"} 1.0 1612411506789
1353            jobs_distribution_sum{type="a"} 8.0 1612411506789
1354            jobs_distribution_count{type="a"} 1.0 1612411506789
1355            # TYPE jobs_summary summary
1356            # HELP jobs_summary Summary of jobs
1357            jobs_summary_sum{type="a"} 8.0 1612411506789
1358            jobs_summary_count{type="a"} 1.0 1612411506789
1359            "#;
1360
1361        assert_event_data_eq!(
1362            events_to_metrics(parse_text_with_overrides(exp, vec![], false)),
1363            Ok(vec![
1364                Metric::new(
1365                    "jobs_total",
1366                    MetricKind::Absolute,
1367                    MetricValue::Counter { value: 1.0 },
1368                )
1369                .with_tags(Some(metric_tags! { "type" => "a" }))
1370                .with_timestamp(Some(*TIMESTAMP)),
1371                Metric::new(
1372                    "jobs_current",
1373                    MetricKind::Absolute,
1374                    MetricValue::Gauge { value: 5.0 },
1375                )
1376                .with_tags(Some(metric_tags! { "type" => "a" }))
1377                .with_timestamp(Some(*TIMESTAMP)),
1378                Metric::new(
1379                    "jobs_distribution",
1380                    MetricKind::Absolute,
1381                    MetricValue::AggregatedHistogram {
1382                        buckets: vector_lib::buckets![
1383                            1.0 => 0, 2.5 => 0, 5.0 => 0, 10.0 => 1
1384                        ],
1385                        count: 1,
1386                        sum: 8.0,
1387                    },
1388                )
1389                .with_tags(Some(metric_tags! { "type" => "a" }))
1390                .with_timestamp(Some(*TIMESTAMP)),
1391                Metric::new(
1392                    "jobs_summary",
1393                    MetricKind::Absolute,
1394                    MetricValue::AggregatedSummary {
1395                        quantiles: vector_lib::quantiles![],
1396                        count: 1,
1397                        sum: 8.0,
1398                    },
1399                )
1400                .with_tags(Some(metric_tags! { "type" => "a" }))
1401                .with_timestamp(Some(*TIMESTAMP)),
1402            ]),
1403        );
1404    }
1405
1406    #[test]
1407    fn test_skip_nan_counter_enabled() {
1408        let exp = r#"
1409            # TYPE name counter
1410            name{labelname="val1"} NaN 1612411506789
1411            name{labelname="val2"} 123.0 1612411506789
1412            "#;
1413
1414        let result = events_to_metrics(parse_text_with_nan_filtering(exp)).unwrap();
1415        assert_eq!(result.len(), 1);
1416        assert_eq!(result[0].name(), "name");
1417        match result[0].value() {
1418            MetricValue::Counter { value } => {
1419                assert_eq!(*value, 123.0);
1420            }
1421            _ => unreachable!(),
1422        }
1423        assert_eq!(result[0].tags().unwrap().get("labelname").unwrap(), "val2");
1424    }
1425
1426    #[test]
1427    fn test_skip_nan_counter_disabled() {
1428        let exp = r#"
1429            # TYPE name counter
1430            name{labelname="val1"} NaN 1612411506789
1431            name{labelname="val2"} 123.0 1612411506789
1432            "#;
1433
1434        let result = events_to_metrics(parse_text(exp)).unwrap();
1435        assert_eq!(result.len(), 2);
1436
1437        // Find the NaN metric
1438        let nan_metric = result
1439            .iter()
1440            .find(|m| m.tags().as_ref().and_then(|tags| tags.get("labelname")) == Some("val1"))
1441            .unwrap();
1442
1443        match nan_metric.value() {
1444            MetricValue::Counter { value } => {
1445                assert!(value.is_nan());
1446            }
1447            _ => unreachable!(),
1448        }
1449    }
1450
1451    #[test]
1452    fn test_skip_nan_gauge_enabled() {
1453        let exp = r#"
1454            # TYPE name gauge
1455            name{labelname="val1"} NaN 1612411506789
1456            name{labelname="val2"} 123.0 1612411506789
1457            "#;
1458
1459        let result = events_to_metrics(parse_text_with_nan_filtering(exp)).unwrap();
1460        assert_eq!(result.len(), 1);
1461        assert_eq!(result[0].name(), "name");
1462        match result[0].value() {
1463            MetricValue::Gauge { value } => {
1464                assert_eq!(*value, 123.0);
1465            }
1466            _ => unreachable!(),
1467        }
1468    }
1469
1470    #[test]
1471    fn test_skip_nan_histogram_bucket_enabled() {
1472        let exp = r#"
1473            # TYPE duration histogram
1474            duration_bucket{le="1"} 133988 1612411506789
1475            duration_bucket{le="NaN"} 144320 1612411506789
1476            duration_sum 53423 1612411506789
1477            duration_count 144320 1612411506789
1478            "#;
1479
1480        let result = events_to_metrics(parse_text_with_nan_filtering(exp)).unwrap();
1481        assert_eq!(result.len(), 0); // Should skip entire histogram due to NaN bucket le value
1482    }
1483
1484    #[test]
1485    fn test_skip_nan_histogram_sum_enabled() {
1486        let exp = r#"
1487            # TYPE duration histogram
1488            duration_bucket{le="1"} 133988 1612411506789
1489            duration_bucket{le="+Inf"} 144320 1612411506789
1490            duration_sum NaN 1612411506789
1491            duration_count 144320 1612411506789
1492            "#;
1493
1494        let result = events_to_metrics(parse_text_with_nan_filtering(exp)).unwrap();
1495        assert_eq!(result.len(), 0); // Should skip entire histogram due to NaN sum
1496    }
1497
1498    #[test]
1499    fn test_skip_nan_summary_enabled() {
1500        let exp = r#"
1501            # TYPE duration summary
1502            duration{quantile="0.5"} NaN 1612411506789
1503            duration{quantile="0.99"} 76656 1612411506789
1504            duration_sum 1.7560473e+07 1612411506789
1505            duration_count 2693 1612411506789
1506            "#;
1507
1508        let result = events_to_metrics(parse_text_with_nan_filtering(exp)).unwrap();
1509        assert_eq!(result.len(), 0); // Should skip entire summary due to NaN quantile value
1510    }
1511
1512    #[test]
1513    fn test_skip_nan_all_valid_values() {
1514        let exp = r#"
1515            # TYPE counter_metric counter
1516            counter_metric 123.0 1612411506789
1517            # TYPE gauge_metric gauge
1518            gauge_metric 456.0 1612411506789
1519            "#;
1520
1521        let result = events_to_metrics(parse_text_with_nan_filtering(exp)).unwrap();
1522        assert_eq!(result.len(), 2); // Both should be preserved
1523    }
1524}