vector/sources/prometheus/
parser.rs

1use std::cmp::Ordering;
2
3use chrono::{DateTime, TimeZone, Utc};
4#[cfg(feature = "sources-prometheus-remote-write")]
5use vector_lib::prometheus::parser::proto;
6use vector_lib::prometheus::parser::{GroupKind, MetricGroup, ParserError};
7
8use crate::event::{
9    metric::{Bucket, Metric, MetricKind, MetricTags, MetricValue, Quantile},
10    Event,
11};
12
13fn utc_timestamp(timestamp: Option<i64>, default: DateTime<Utc>) -> DateTime<Utc> {
14    timestamp
15        .and_then(|timestamp| {
16            Utc.timestamp_opt(timestamp / 1000, (timestamp % 1000) as u32 * 1000000)
17                .latest()
18        })
19        .unwrap_or(default)
20}
21
/// Parses a Prometheus text exposition payload into Vector events.
///
/// No tag overrides are applied, and counters and histograms are emitted as absolute
/// metrics. Parser failures are returned unchanged.
#[cfg(any(test, feature = "sources-prometheus-scrape"))]
pub(super) fn parse_text(packet: &str) -> Result<Vec<Event>, ParserError> {
    let groups = vector_lib::prometheus::parser::parse_text(packet)?;
    Ok(reparse_groups(groups, Vec::new(), false))
}
27
/// Parses a Prometheus text exposition payload into Vector events.
///
/// `tag_overrides` is layered on top of every sample's own labels. When
/// `aggregate_metrics` is `true`, counters and histograms are emitted as incremental
/// metrics rather than absolute ones. Parser failures are returned unchanged.
#[cfg(any(test, feature = "sources-prometheus-pushgateway"))]
pub(super) fn parse_text_with_overrides(
    packet: &str,
    tag_overrides: impl IntoIterator<Item = (String, String)> + Clone,
    aggregate_metrics: bool,
) -> Result<Vec<Event>, ParserError> {
    let groups = vector_lib::prometheus::parser::parse_text(packet)?;
    Ok(reparse_groups(groups, tag_overrides, aggregate_metrics))
}
37
/// Converts a Prometheus remote-write `WriteRequest` into Vector events.
///
/// No tag overrides are applied, and counters and histograms are emitted as absolute
/// metrics. Parser failures are returned unchanged.
#[cfg(feature = "sources-prometheus-remote-write")]
pub(super) fn parse_request(request: proto::WriteRequest) -> Result<Vec<Event>, ParserError> {
    let groups = vector_lib::prometheus::parser::parse_request(request)?;
    Ok(reparse_groups(groups, Vec::new(), false))
}
43
44fn reparse_groups(
45    groups: Vec<MetricGroup>,
46    tag_overrides: impl IntoIterator<Item = (String, String)> + Clone,
47    aggregate_metrics: bool,
48) -> Vec<Event> {
49    let mut result = Vec::new();
50    let start = Utc::now();
51
52    let metric_kind = if aggregate_metrics {
53        MetricKind::Incremental
54    } else {
55        MetricKind::Absolute
56    };
57
58    for group in groups {
59        match group.metrics {
60            GroupKind::Counter(metrics) => {
61                for (key, metric) in metrics {
62                    let tags = combine_tags(key.labels, tag_overrides.clone());
63
64                    let counter = Metric::new(
65                        group.name.clone(),
66                        metric_kind,
67                        MetricValue::Counter {
68                            value: metric.value,
69                        },
70                    )
71                    .with_timestamp(Some(utc_timestamp(key.timestamp, start)))
72                    .with_tags(tags.as_option());
73
74                    result.push(counter.into());
75                }
76            }
77            GroupKind::Gauge(metrics) | GroupKind::Untyped(metrics) => {
78                for (key, metric) in metrics {
79                    let tags = combine_tags(key.labels, tag_overrides.clone());
80
81                    let gauge = Metric::new(
82                        group.name.clone(),
83                        // Gauges are always absolute: aggregating them makes no sense
84                        MetricKind::Absolute,
85                        MetricValue::Gauge {
86                            value: metric.value,
87                        },
88                    )
89                    .with_timestamp(Some(utc_timestamp(key.timestamp, start)))
90                    .with_tags(tags.as_option());
91
92                    result.push(gauge.into());
93                }
94            }
95            GroupKind::Histogram(metrics) => {
96                for (key, metric) in metrics {
97                    let tags = combine_tags(key.labels, tag_overrides.clone());
98
99                    let mut buckets = metric.buckets;
100                    buckets.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
101                    for i in (1..buckets.len()).rev() {
102                        buckets[i].count = buckets[i].count.saturating_sub(buckets[i - 1].count);
103                    }
104                    let drop_last = buckets
105                        .last()
106                        .is_some_and(|bucket| bucket.bucket == f64::INFINITY);
107                    if drop_last {
108                        buckets.pop();
109                    }
110
111                    result.push(
112                        Metric::new(
113                            group.name.clone(),
114                            metric_kind,
115                            MetricValue::AggregatedHistogram {
116                                buckets: buckets
117                                    .into_iter()
118                                    .map(|b| Bucket {
119                                        upper_limit: b.bucket,
120                                        count: b.count,
121                                    })
122                                    .collect(),
123                                count: metric.count,
124                                sum: metric.sum,
125                            },
126                        )
127                        .with_timestamp(Some(utc_timestamp(key.timestamp, start)))
128                        .with_tags(tags.as_option())
129                        .into(),
130                    );
131                }
132            }
133            GroupKind::Summary(metrics) => {
134                for (key, metric) in metrics {
135                    let tags = combine_tags(key.labels, tag_overrides.clone());
136
137                    result.push(
138                        Metric::new(
139                            group.name.clone(),
140                            // Summaries are always absolute: aggregating them makes no sense
141                            MetricKind::Absolute,
142                            MetricValue::AggregatedSummary {
143                                quantiles: metric
144                                    .quantiles
145                                    .into_iter()
146                                    .map(|q| Quantile {
147                                        quantile: q.quantile,
148                                        value: q.value,
149                                    })
150                                    .collect(),
151                                count: metric.count,
152                                sum: metric.sum,
153                            },
154                        )
155                        .with_timestamp(Some(utc_timestamp(key.timestamp, start)))
156                        .with_tags(tags.as_option())
157                        .into(),
158                    );
159                }
160            }
161        }
162    }
163
164    result
165}
166
167fn combine_tags(
168    base_tags: impl Into<MetricTags>,
169    tag_overrides: impl IntoIterator<Item = (String, String)>,
170) -> MetricTags {
171    let mut tags = base_tags.into();
172    for (k, v) in tag_overrides.into_iter() {
173        tags.replace(k, v);
174    }
175
176    tags
177}
178
179#[cfg(test)]
180mod test {
181    use std::sync::LazyLock;
182
183    use chrono::{TimeZone, Timelike, Utc};
184    use similar_asserts::assert_eq;
185    use vector_lib::assert_event_data_eq;
186    use vector_lib::metric_tags;
187
188    use super::*;
189    use crate::event::metric::{Metric, MetricKind, MetricValue};
190
191    static TIMESTAMP: LazyLock<DateTime<Utc>> = LazyLock::new(|| {
192        Utc.with_ymd_and_hms(2021, 2, 4, 4, 5, 6)
193            .single()
194            .and_then(|t| t.with_nanosecond(789 * 1_000_000))
195            .expect("invalid timestamp")
196    });
197
198    fn events_to_metrics(
199        events: Result<Vec<Event>, ParserError>,
200    ) -> Result<Vec<Metric>, ParserError> {
201        events.map(|events| events.into_iter().map(Event::into_metric).collect())
202    }
203
204    #[test]
205    fn adds_timestamp_if_missing() {
206        let now = Utc::now();
207        let exp = r"
208            # HELP counter Some counter
209            # TYPE count counter
210            http_requests_total 1027
211            ";
212        let result = events_to_metrics(parse_text(exp)).unwrap();
213        assert_eq!(result.len(), 1);
214        assert!(result[0].timestamp().unwrap() >= now);
215    }
216
217    #[test]
218    fn test_counter() {
219        let exp = r"
220            # HELP uptime A counter
221            # TYPE uptime counter
222            uptime 123.0 1612411506789
223            ";
224
225        assert_event_data_eq!(
226            events_to_metrics(parse_text(exp)),
227            Ok(vec![Metric::new(
228                "uptime",
229                MetricKind::Absolute,
230                MetricValue::Counter { value: 123.0 },
231            )
232            .with_timestamp(Some(*TIMESTAMP))]),
233        );
234    }
235
236    #[test]
237    fn test_counter_empty() {
238        let exp = r"
239            # HELP hidden A counter
240            # TYPE hidden counter
241            ";
242
243        assert_event_data_eq!(events_to_metrics(parse_text(exp)), Ok(vec![]));
244    }
245
246    #[test]
247    fn test_counter_nan() {
248        let exp = r#"
249            # TYPE name counter
250            name{labelname="val1",basename="basevalue"} NaN
251            "#;
252
253        match events_to_metrics(parse_text(exp)).unwrap()[0].value() {
254            MetricValue::Counter { value } => {
255                assert!(value.is_nan());
256            }
257            _ => unreachable!(),
258        }
259    }
260
261    #[test]
262    fn test_counter_weird() {
263        let exp = r#"
264            # A normal comment.
265            #
266            # TYPE name counter
267            name {labelname="val2",basename="base\"v\\al\nue"} 0.23 1612411506789
268            # HELP name two-line\n doc  str\\ing
269            # HELP  name2  	doc str"ing 2
270            #    TYPE    name2 counter
271            name2{labelname="val2"	,basename   =   "basevalue2"		} +Inf 1612411506789
272            name2{ labelname = "val1" , }-Inf 1612411506789
273            "#;
274
275        assert_event_data_eq!(
276            events_to_metrics(parse_text(exp)),
277            Ok(vec![
278                Metric::new(
279                    "name",
280                    MetricKind::Absolute,
281                    MetricValue::Counter { value: 0.23 },
282                )
283                .with_tags(Some(metric_tags!(
284                    "labelname" => "val2",
285                    "basename" => "base\"v\\al\nue",
286                )))
287                .with_timestamp(Some(*TIMESTAMP)),
288                Metric::new(
289                    "name2",
290                    MetricKind::Absolute,
291                    MetricValue::Counter {
292                        value: f64::INFINITY
293                    },
294                )
295                .with_tags(Some(metric_tags!(
296                    "labelname" => "val2",
297                    "basename" => "basevalue2",
298                )))
299                .with_timestamp(Some(*TIMESTAMP)),
300                Metric::new(
301                    "name2",
302                    MetricKind::Absolute,
303                    MetricValue::Counter {
304                        value: f64::NEG_INFINITY
305                    },
306                )
307                .with_tags(Some(metric_tags!("labelname" => "val1")))
308                .with_timestamp(Some(*TIMESTAMP)),
309            ]),
310        );
311    }
312
313    #[test]
314    fn test_counter_tags_and_timestamp() {
315        let exp = r#"
316            # HELP http_requests_total The total number of HTTP requests.
317            # TYPE http_requests_total counter
318            http_requests_total{method="post",code="200"} 1027 1395066363000
319            http_requests_total{method="post",code="400"}    3 1395066363000
320            "#;
321
322        assert_event_data_eq!(
323            events_to_metrics(parse_text(exp)),
324            Ok(vec![
325                Metric::new(
326                    "http_requests_total",
327                    MetricKind::Absolute,
328                    MetricValue::Counter { value: 1027.0 },
329                )
330                .with_timestamp(Utc.timestamp_opt(1395066363, 0).latest())
331                .with_tags(Some(metric_tags!(
332                    "method" => "post",
333                    "code" => "200",
334                ))),
335                Metric::new(
336                    "http_requests_total",
337                    MetricKind::Absolute,
338                    MetricValue::Counter { value: 3.0 },
339                )
340                .with_timestamp(Utc.timestamp_opt(1395066363, 0).latest())
341                .with_tags(Some(metric_tags!(
342                    "method" => "post",
343                    "code" => "400"
344                )))
345            ]),
346        );
347    }
348
349    #[test]
350    fn test_gauge() {
351        let exp = r"
352            # HELP latency A gauge
353            # TYPE latency gauge
354            latency 123.0 1612411506789
355            ";
356
357        assert_event_data_eq!(
358            events_to_metrics(parse_text(exp)),
359            Ok(vec![Metric::new(
360                "latency",
361                MetricKind::Absolute,
362                MetricValue::Gauge { value: 123.0 },
363            )
364            .with_timestamp(Some(*TIMESTAMP))]),
365        );
366    }
367
368    #[test]
369    fn test_gauge_minimalistic() {
370        let exp = r"
371            metric_without_timestamp_and_labels 12.47 1612411506789
372            ";
373
374        assert_event_data_eq!(
375            events_to_metrics(parse_text(exp)),
376            Ok(vec![Metric::new(
377                "metric_without_timestamp_and_labels",
378                MetricKind::Absolute,
379                MetricValue::Gauge { value: 12.47 },
380            )
381            .with_timestamp(Some(*TIMESTAMP))]),
382        );
383    }
384
385    #[test]
386    fn test_gauge_empty_labels() {
387        let exp = r"
388            no_labels{} 3 1612411506789
389            ";
390
391        assert_event_data_eq!(
392            events_to_metrics(parse_text(exp)),
393            Ok(vec![Metric::new(
394                "no_labels",
395                MetricKind::Absolute,
396                MetricValue::Gauge { value: 3.0 },
397            )
398            .with_timestamp(Some(*TIMESTAMP))]),
399        );
400    }
401
402    #[test]
403    fn test_gauge_minimalistic_escaped() {
404        let exp = r#"
405            msdos_file_access_time_seconds{path="C:\\DIR\\FILE.TXT",error="Cannot find file:\n\"FILE.TXT\""} 1.458255915e9 1612411506789
406            "#;
407
408        assert_event_data_eq!(
409            events_to_metrics(parse_text(exp)),
410            Ok(vec![Metric::new(
411                "msdos_file_access_time_seconds",
412                MetricKind::Absolute,
413                MetricValue::Gauge {
414                    value: 1458255915.0
415                },
416            )
417            .with_tags(Some(metric_tags!(
418                "path" => "C:\\DIR\\FILE.TXT",
419                "error" => "Cannot find file:\n\"FILE.TXT\"",
420            )))
421            .with_timestamp(Some(*TIMESTAMP))]),
422        );
423    }
424
425    #[test]
426    fn test_tag_value_contain_bracket() {
427        let exp = r#"
428            # HELP name counter
429            # TYPE name counter
430            name{tag="}"} 0 1612411506789
431            "#;
432        assert_event_data_eq!(
433            events_to_metrics(parse_text(exp)),
434            Ok(vec![Metric::new(
435                "name",
436                MetricKind::Absolute,
437                MetricValue::Counter { value: 0.0 },
438            )
439            .with_tags(Some(metric_tags! { "tag" => "}" }))
440            .with_timestamp(Some(*TIMESTAMP))]),
441        );
442    }
443
444    #[test]
445    fn test_parse_tag_value_contain_comma() {
446        let exp = r#"
447            # HELP name counter
448            # TYPE name counter
449            name{tag="a,b"} 0 1612411506789
450            "#;
451        assert_event_data_eq!(
452            events_to_metrics(parse_text(exp)),
453            Ok(vec![Metric::new(
454                "name",
455                MetricKind::Absolute,
456                MetricValue::Counter { value: 0.0 },
457            )
458            .with_tags(Some(metric_tags! { "tag" => "a,b" }))
459            .with_timestamp(Some(*TIMESTAMP))]),
460        );
461    }
462
463    #[test]
464    fn test_parse_tag_escaping() {
465        let exp = r#"
466            # HELP name counter
467            # TYPE name counter
468            name{tag="\\n"} 0 1612411506789
469            "#;
470        assert_event_data_eq!(
471            events_to_metrics(parse_text(exp)),
472            Ok(vec![Metric::new(
473                "name",
474                MetricKind::Absolute,
475                MetricValue::Counter { value: 0.0 },
476            )
477            .with_tags(Some(metric_tags! { "tag" => "\\n" }))
478            .with_timestamp(Some(*TIMESTAMP))]),
479        );
480    }
481
482    #[test]
483    fn test_parse_tag_dont_trim_value() {
484        let exp = r#"
485            # HELP name counter
486            # TYPE name counter
487            name{tag=" * "} 0 1612411506789
488            "#;
489        assert_event_data_eq!(
490            events_to_metrics(parse_text(exp)),
491            Ok(vec![Metric::new(
492                "name",
493                MetricKind::Absolute,
494                MetricValue::Counter { value: 0.0 },
495            )
496            .with_tags(Some(metric_tags! { "tag" => " * " }))
497            .with_timestamp(Some(*TIMESTAMP))]),
498        );
499    }
500
501    #[test]
502    fn test_parse_tag_value_containing_equals() {
503        let exp = r#"
504            telemetry_scrape_size_bytes_count{registry="default",content_type="text/plain; version=0.0.4"} 1890 1612411506789
505            "#;
506
507        assert_event_data_eq!(
508            events_to_metrics(parse_text(exp)),
509            Ok(vec![Metric::new(
510                "telemetry_scrape_size_bytes_count",
511                MetricKind::Absolute,
512                MetricValue::Gauge { value: 1890.0 },
513            )
514            .with_tags(Some(metric_tags!( "registry" => "default",
515                    "content_type" => "text/plain; version=0.0.4" )))
516            .with_timestamp(Some(*TIMESTAMP))]),
517        );
518    }
519
520    #[test]
521    fn test_parse_tag_error_no_value() {
522        let exp = r#"
523            telemetry_scrape_size_bytes_count{registry="default",content_type} 1890 1612411506789
524            "#;
525
526        assert!(events_to_metrics(parse_text(exp)).is_err());
527    }
528
529    #[test]
530    fn test_parse_tag_error_equals_empty_value() {
531        let exp = r#"
532            telemetry_scrape_size_bytes_count{registry="default",content_type=} 1890 1612411506789
533            "#;
534
535        assert!(events_to_metrics(parse_text(exp)).is_err());
536    }
537
538    #[test]
539    fn test_gauge_weird_timestamp() {
540        let exp = r#"
541            something_weird{problem="division by zero"} +Inf -3982045000
542            "#;
543
544        assert_event_data_eq!(
545            events_to_metrics(parse_text(exp)),
546            Ok(vec![Metric::new(
547                "something_weird",
548                MetricKind::Absolute,
549                MetricValue::Gauge {
550                    value: f64::INFINITY
551                },
552            )
553            .with_timestamp(Utc.timestamp_opt(-3982045, 0).latest())
554            .with_tags(Some(
555                metric_tags!("problem" => "division by zero")
556            ))]),
557        );
558    }
559
560    #[test]
561    fn test_gauge_tabs() {
562        let exp = r#"
563            # TYPE	latency	gauge
564            latency{env="production"}	1.0		1395066363000
565            latency{env="testing"}		2.0		1395066363000
566            "#;
567
568        assert_event_data_eq!(
569            events_to_metrics(parse_text(exp)),
570            Ok(vec![
571                Metric::new(
572                    "latency",
573                    MetricKind::Absolute,
574                    MetricValue::Gauge { value: 1.0 },
575                )
576                .with_timestamp(Utc.timestamp_opt(1395066363, 0).latest())
577                .with_tags(Some(metric_tags!("env" => "production"))),
578                Metric::new(
579                    "latency",
580                    MetricKind::Absolute,
581                    MetricValue::Gauge { value: 2.0 },
582                )
583                .with_timestamp(Utc.timestamp_opt(1395066363, 0).latest())
584                .with_tags(Some(metric_tags!("env" => "testing")))
585            ]),
586        );
587    }
588
589    #[test]
590    fn test_mixed() {
591        let exp = r"
592            # TYPE uptime counter
593            uptime 123.0 1612411506789
594            # TYPE temperature gauge
595            temperature -1.5 1612411506789
596            # TYPE launch_count counter
597            launch_count 10.0 1612411506789
598            ";
599
600        assert_event_data_eq!(
601            events_to_metrics(parse_text(exp)),
602            Ok(vec![
603                Metric::new(
604                    "uptime",
605                    MetricKind::Absolute,
606                    MetricValue::Counter { value: 123.0 },
607                )
608                .with_timestamp(Some(*TIMESTAMP)),
609                Metric::new(
610                    "temperature",
611                    MetricKind::Absolute,
612                    MetricValue::Gauge { value: -1.5 },
613                )
614                .with_timestamp(Some(*TIMESTAMP)),
615                Metric::new(
616                    "launch_count",
617                    MetricKind::Absolute,
618                    MetricValue::Counter { value: 10.0 },
619                )
620                .with_timestamp(Some(*TIMESTAMP))
621            ]),
622        );
623    }
624
625    #[test]
626    fn test_no_value() {
627        let exp = r#"
628            # TYPE latency counter
629            latency{env="production"}
630            "#;
631
632        assert!(events_to_metrics(parse_text(exp)).is_err());
633    }
634
635    #[test]
636    fn test_no_name() {
637        let exp = r"
638            # TYPE uptime counter
639            123.0 1612411506789
640            ";
641
642        assert!(events_to_metrics(parse_text(exp)).is_err());
643    }
644
645    #[test]
646    fn test_mixed_and_loosely_typed() {
647        let exp = r"
648            # TYPE uptime counter
649            uptime 123.0 1612411506789
650            last_downtime 4.0 1612411506789
651            # TYPE temperature gauge
652            temperature -1.5 1612411506789
653            temperature_7_days_average 0.1 1612411506789
654            ";
655
656        assert_event_data_eq!(
657            events_to_metrics(parse_text(exp)),
658            Ok(vec![
659                Metric::new(
660                    "uptime",
661                    MetricKind::Absolute,
662                    MetricValue::Counter { value: 123.0 },
663                )
664                .with_timestamp(Some(*TIMESTAMP)),
665                Metric::new(
666                    "last_downtime",
667                    MetricKind::Absolute,
668                    MetricValue::Gauge { value: 4.0 },
669                )
670                .with_timestamp(Some(*TIMESTAMP)),
671                Metric::new(
672                    "temperature",
673                    MetricKind::Absolute,
674                    MetricValue::Gauge { value: -1.5 },
675                )
676                .with_timestamp(Some(*TIMESTAMP)),
677                Metric::new(
678                    "temperature_7_days_average",
679                    MetricKind::Absolute,
680                    MetricValue::Gauge { value: 0.1 },
681                )
682                .with_timestamp(Some(*TIMESTAMP))
683            ]),
684        );
685    }
686
687    #[test]
688    fn test_histogram() {
689        let exp = r#"
690            # HELP http_request_duration_seconds A histogram of the request duration.
691            # TYPE http_request_duration_seconds histogram
692            http_request_duration_seconds_bucket{le="0.05"} 24054 1612411506789
693            http_request_duration_seconds_bucket{le="0.1"} 33444 1612411506789
694            http_request_duration_seconds_bucket{le="0.2"} 100392 1612411506789
695            http_request_duration_seconds_bucket{le="0.5"} 129389 1612411506789
696            http_request_duration_seconds_bucket{le="1"} 133988 1612411506789
697            http_request_duration_seconds_bucket{le="+Inf"} 144320 1612411506789
698            http_request_duration_seconds_sum 53423 1612411506789
699            http_request_duration_seconds_count 144320 1612411506789
700            "#;
701
702        assert_event_data_eq!(
703            events_to_metrics(parse_text(exp)),
704            Ok(vec![Metric::new(
705                "http_request_duration_seconds",
706                MetricKind::Absolute,
707                MetricValue::AggregatedHistogram {
708                    buckets: vector_lib::buckets![
709                        0.05 => 24054, 0.1 => 9390, 0.2 => 66948, 0.5 => 28997, 1.0 => 4599
710                    ],
711                    count: 144320,
712                    sum: 53423.0,
713                },
714            )
715            .with_timestamp(Some(*TIMESTAMP))]),
716        );
717    }
718
719    #[test]
720    fn test_histogram_out_of_order() {
721        let exp = r#"
722            # HELP duration A histogram of the request duration.
723            # TYPE duration histogram
724            duration_bucket{le="+Inf"} 144320 1612411506789
725            duration_bucket{le="1"} 133988 1612411506789
726            duration_sum 53423 1612411506789
727            duration_count 144320 1612411506789
728            "#;
729
730        assert_event_data_eq!(
731            events_to_metrics(parse_text(exp)),
732            Ok(vec![Metric::new(
733                "duration",
734                MetricKind::Absolute,
735                MetricValue::AggregatedHistogram {
736                    buckets: vector_lib::buckets![1.0 => 133988],
737                    count: 144320,
738                    sum: 53423.0,
739                },
740            )
741            .with_timestamp(Some(*TIMESTAMP))]),
742        );
743    }
744
745    #[test]
746    fn test_histogram_backward_values() {
747        let exp = r#"
748            # HELP duration A histogram of the request duration.
749            # TYPE duration histogram
750            duration_bucket{le="1"} 2000 1612411506789
751            duration_bucket{le="10"} 1000 1612411506789
752            duration_bucket{le="+Inf"} 2000 1612411506789
753            duration_sum 2000 1612411506789
754            duration_count 2000 1612411506789
755            "#;
756
757        assert_event_data_eq!(
758            events_to_metrics(parse_text(exp)),
759            Ok(vec![Metric::new(
760                "duration",
761                MetricKind::Absolute,
762                MetricValue::AggregatedHistogram {
763                    buckets: vector_lib::buckets![1.0 => 2000, 10.0 => 0],
764                    count: 2000,
765                    sum: 2000.0,
766                },
767            )
768            .with_timestamp(Some(*TIMESTAMP))]),
769        );
770    }
771
772    #[test]
773    fn test_histogram_with_labels() {
774        let exp = r#"
775            # HELP gitlab_runner_job_duration_seconds Histogram of job durations
776            # TYPE gitlab_runner_job_duration_seconds histogram
777            gitlab_runner_job_duration_seconds_bucket{runner="z",le="30"} 327 1612411506789
778            gitlab_runner_job_duration_seconds_bucket{runner="z",le="60"} 474 1612411506789
779            gitlab_runner_job_duration_seconds_bucket{runner="z",le="300"} 535 1612411506789
780            gitlab_runner_job_duration_seconds_bucket{runner="z",le="600"} 536 1612411506789
781            gitlab_runner_job_duration_seconds_bucket{runner="z",le="1800"} 536 1612411506789
782            gitlab_runner_job_duration_seconds_bucket{runner="z",le="3600"} 536 1612411506789
783            gitlab_runner_job_duration_seconds_bucket{runner="z",le="7200"} 536 1612411506789
784            gitlab_runner_job_duration_seconds_bucket{runner="z",le="10800"} 536 1612411506789
785            gitlab_runner_job_duration_seconds_bucket{runner="z",le="18000"} 536 1612411506789
786            gitlab_runner_job_duration_seconds_bucket{runner="z",le="36000"} 536 1612411506789
787            gitlab_runner_job_duration_seconds_bucket{runner="z",le="+Inf"} 536 1612411506789
788            gitlab_runner_job_duration_seconds_sum{runner="z"} 19690.129384881966 1612411506789
789            gitlab_runner_job_duration_seconds_count{runner="z"} 536 1612411506789
790            gitlab_runner_job_duration_seconds_bucket{runner="x",le="30"} 1 1612411506789
791            gitlab_runner_job_duration_seconds_bucket{runner="x",le="60"} 1 1612411506789
792            gitlab_runner_job_duration_seconds_bucket{runner="x",le="300"} 1 1612411506789
793            gitlab_runner_job_duration_seconds_bucket{runner="x",le="600"} 1 1612411506789
794            gitlab_runner_job_duration_seconds_bucket{runner="x",le="1800"} 1 1612411506789
795            gitlab_runner_job_duration_seconds_bucket{runner="x",le="3600"} 1 1612411506789
796            gitlab_runner_job_duration_seconds_bucket{runner="x",le="7200"} 1 1612411506789
797            gitlab_runner_job_duration_seconds_bucket{runner="x",le="10800"} 1 1612411506789
798            gitlab_runner_job_duration_seconds_bucket{runner="x",le="18000"} 1 1612411506789
799            gitlab_runner_job_duration_seconds_bucket{runner="x",le="36000"} 1 1612411506789
800            gitlab_runner_job_duration_seconds_bucket{runner="x",le="+Inf"} 1 1612411506789
801            gitlab_runner_job_duration_seconds_sum{runner="x"} 28.975436316 1612411506789
802            gitlab_runner_job_duration_seconds_count{runner="x"} 1 1612411506789
803            gitlab_runner_job_duration_seconds_bucket{runner="y",le="30"} 285 1612411506789
804            gitlab_runner_job_duration_seconds_bucket{runner="y",le="60"} 1165 1612411506789
805            gitlab_runner_job_duration_seconds_bucket{runner="y",le="300"} 3071 1612411506789
806            gitlab_runner_job_duration_seconds_bucket{runner="y",le="600"} 3151 1612411506789
807            gitlab_runner_job_duration_seconds_bucket{runner="y",le="1800"} 3252 1612411506789
808            gitlab_runner_job_duration_seconds_bucket{runner="y",le="3600"} 3255 1612411506789
809            gitlab_runner_job_duration_seconds_bucket{runner="y",le="7200"} 3255 1612411506789
810            gitlab_runner_job_duration_seconds_bucket{runner="y",le="10800"} 3255 1612411506789
811            gitlab_runner_job_duration_seconds_bucket{runner="y",le="18000"} 3255 1612411506789
812            gitlab_runner_job_duration_seconds_bucket{runner="y",le="36000"} 3255 1612411506789
813            gitlab_runner_job_duration_seconds_bucket{runner="y",le="+Inf"} 3255 1612411506789
814            gitlab_runner_job_duration_seconds_sum{runner="y"} 381111.7498891335 1612411506789
815            gitlab_runner_job_duration_seconds_count{runner="y"} 3255 1612411506789
816        "#;
817
818        assert_event_data_eq!(
819            events_to_metrics(parse_text(exp)),
820            Ok(vec![
821                Metric::new(
822                    "gitlab_runner_job_duration_seconds", MetricKind::Absolute, MetricValue::AggregatedHistogram {
823                        buckets: vector_lib::buckets![
824                            30.0 => 327,
825                            60.0 => 147,
826                            300.0 => 61,
827                            600.0 => 1,
828                            1800.0 => 0,
829                            3600.0 => 0,
830                            7200.0 => 0,
831                            10800.0 => 0,
832                            18000.0 => 0,
833                            36000.0 => 0
834                        ],
835                        count: 536,
836                        sum: 19690.129384881966,
837                    },
838                )
839                    .with_tags(Some(metric_tags!("runner" => "z")))
840                    .with_timestamp(Some(*TIMESTAMP)),
841                Metric::new(
842                    "gitlab_runner_job_duration_seconds", MetricKind::Absolute, MetricValue::AggregatedHistogram {
843                        buckets: vector_lib::buckets![
844                            30.0 => 1,
845                            60.0 => 0,
846                            300.0 => 0,
847                            600.0 => 0,
848                            1800.0 => 0,
849                            3600.0 => 0,
850                            7200.0 => 0,
851                            10800.0 => 0,
852                            18000.0 => 0,
853                            36000.0 => 0
854                        ],
855                        count: 1,
856                        sum: 28.975436316,
857                    },
858                )
859                    .with_tags(Some(metric_tags!("runner" => "x")))
860                    .with_timestamp(Some(*TIMESTAMP)),
861                Metric::new(
862                    "gitlab_runner_job_duration_seconds", MetricKind::Absolute, MetricValue::AggregatedHistogram {
863                        buckets: vector_lib::buckets![
864                            30.0 => 285, 60.0 => 880, 300.0 => 1906, 600.0 => 80, 1800.0 => 101, 3600.0 => 3,
865                            7200.0 => 0, 10800.0 => 0, 18000.0 => 0, 36000.0 => 0
866                        ],
867                        count: 3255,
868                        sum: 381111.7498891335,
869                    },
870                )
871                    .with_tags(Some(metric_tags!("runner" => "y")))
872                    .with_timestamp(Some(*TIMESTAMP))
873            ]),
874        );
875    }
876
877    #[test]
878    fn test_summary() {
879        let exp = r#"
880            # HELP rpc_duration_seconds A summary of the RPC duration in seconds.
881            # TYPE rpc_duration_seconds summary
882            rpc_duration_seconds{service="a",quantile="0.01"} 3102 1612411506789
883            rpc_duration_seconds{service="a",quantile="0.05"} 3272 1612411506789
884            rpc_duration_seconds{service="a",quantile="0.5"} 4773 1612411506789
885            rpc_duration_seconds{service="a",quantile="0.9"} 9001 1612411506789
886            rpc_duration_seconds{service="a",quantile="0.99"} 76656 1612411506789
887            rpc_duration_seconds_sum{service="a"} 1.7560473e+07 1612411506789
888            rpc_duration_seconds_count{service="a"} 2693 1612411506789
889            # HELP go_gc_duration_seconds A summary of the GC invocation durations.
890            # TYPE go_gc_duration_seconds summary
891            go_gc_duration_seconds{quantile="0"} 0.009460965 1612411506789
892            go_gc_duration_seconds{quantile="0.25"} 0.009793382 1612411506789
893            go_gc_duration_seconds{quantile="0.5"} 0.009870205 1612411506789
894            go_gc_duration_seconds{quantile="0.75"} 0.01001838 1612411506789
895            go_gc_duration_seconds{quantile="1"} 0.018827136 1612411506789
896            go_gc_duration_seconds_sum 4668.551713715 1612411506789
897            go_gc_duration_seconds_count 602767 1612411506789
898            "#;
899
900        assert_event_data_eq!(
901            events_to_metrics(parse_text(exp)),
902            Ok(vec![
903                Metric::new(
904                    "rpc_duration_seconds",
905                    MetricKind::Absolute,
906                    MetricValue::AggregatedSummary {
907                        quantiles: vector_lib::quantiles![
908                            0.01 => 3102.0,
909                            0.05 => 3272.0,
910                            0.5 => 4773.0,
911                            0.9 => 9001.0,
912                            0.99 => 76656.0
913                        ],
914                        count: 2693,
915                        sum: 1.7560473e+07,
916                    },
917                )
918                .with_tags(Some(metric_tags!("service" => "a")))
919                .with_timestamp(Some(*TIMESTAMP)),
920                Metric::new(
921                    "go_gc_duration_seconds",
922                    MetricKind::Absolute,
923                    MetricValue::AggregatedSummary {
924                        quantiles: vector_lib::quantiles![
925                            0.0 => 0.009460965,
926                            0.25 => 0.009793382,
927                            0.5 => 0.009870205,
928                            0.75 => 0.01001838,
929                            1.0 => 0.018827136
930                        ],
931                        count: 602767,
932                        sum: 4668.551713715,
933                    },
934                )
935                .with_timestamp(Some(*TIMESTAMP)),
936            ]),
937        );
938    }
939
940    // https://github.com/vectordotdev/vector/issues/3276
941    #[test]
942    fn test_nginx() {
943        let exp = r#"
944            # HELP nginx_server_bytes request/response bytes
945            # TYPE nginx_server_bytes counter
946            nginx_server_bytes{direction="in",host="*"} 263719
947            nginx_server_bytes{direction="in",host="_"} 255061
948            nginx_server_bytes{direction="in",host="nginx-vts-status"} 8658
949            nginx_server_bytes{direction="out",host="*"} 944199
950            nginx_server_bytes{direction="out",host="_"} 360775
951            nginx_server_bytes{direction="out",host="nginx-vts-status"} 583424
952            # HELP nginx_server_cache cache counter
953            # TYPE nginx_server_cache counter
954            nginx_server_cache{host="*",status="bypass"} 0
955            nginx_server_cache{host="*",status="expired"} 0
956            nginx_server_cache{host="*",status="hit"} 0
957            nginx_server_cache{host="*",status="miss"} 0
958            nginx_server_cache{host="*",status="revalidated"} 0
959            nginx_server_cache{host="*",status="scarce"} 0
960            "#;
961
962        let now = Utc::now();
963        let result = events_to_metrics(parse_text(exp)).expect("Parsing failed");
964        // Reset all the timestamps for comparison
965        let result: Vec<_> = result
966            .into_iter()
967            .map(|metric| {
968                assert!(metric.timestamp().expect("Missing timestamp") >= now);
969                metric.with_timestamp(Some(*TIMESTAMP))
970            })
971            .collect();
972
973        assert_event_data_eq!(
974            result,
975            vec![
976                Metric::new(
977                    "nginx_server_bytes",
978                    MetricKind::Absolute,
979                    MetricValue::Counter { value: 263719.0 },
980                )
981                .with_tags(Some(metric_tags! { "direction" => "in", "host" => "*" }))
982                .with_timestamp(Some(*TIMESTAMP)),
983                Metric::new(
984                    "nginx_server_bytes",
985                    MetricKind::Absolute,
986                    MetricValue::Counter { value: 255061.0 },
987                )
988                .with_tags(Some(metric_tags! { "direction" => "in", "host" => "_" }))
989                .with_timestamp(Some(*TIMESTAMP)),
990                Metric::new(
991                    "nginx_server_bytes",
992                    MetricKind::Absolute,
993                    MetricValue::Counter { value: 8658.0 },
994                )
995                .with_tags(Some(
996                    metric_tags! { "direction" => "in", "host" => "nginx-vts-status" }
997                ))
998                .with_timestamp(Some(*TIMESTAMP)),
999                Metric::new(
1000                    "nginx_server_bytes",
1001                    MetricKind::Absolute,
1002                    MetricValue::Counter { value: 944199.0 },
1003                )
1004                .with_tags(Some(metric_tags! { "direction" => "out", "host" => "*" }))
1005                .with_timestamp(Some(*TIMESTAMP)),
1006                Metric::new(
1007                    "nginx_server_bytes",
1008                    MetricKind::Absolute,
1009                    MetricValue::Counter { value: 360775.0 },
1010                )
1011                .with_tags(Some(metric_tags! { "direction" => "out", "host" => "_" }))
1012                .with_timestamp(Some(*TIMESTAMP)),
1013                Metric::new(
1014                    "nginx_server_bytes",
1015                    MetricKind::Absolute,
1016                    MetricValue::Counter { value: 583424.0 },
1017                )
1018                .with_tags(Some(
1019                    metric_tags! { "direction" => "out", "host" => "nginx-vts-status" }
1020                ))
1021                .with_timestamp(Some(*TIMESTAMP)),
1022                Metric::new(
1023                    "nginx_server_cache",
1024                    MetricKind::Absolute,
1025                    MetricValue::Counter { value: 0.0 },
1026                )
1027                .with_tags(Some(metric_tags! { "host" => "*", "status" => "bypass" }))
1028                .with_timestamp(Some(*TIMESTAMP)),
1029                Metric::new(
1030                    "nginx_server_cache",
1031                    MetricKind::Absolute,
1032                    MetricValue::Counter { value: 0.0 },
1033                )
1034                .with_tags(Some(metric_tags! { "host" => "*", "status" => "expired" }))
1035                .with_timestamp(Some(*TIMESTAMP)),
1036                Metric::new(
1037                    "nginx_server_cache",
1038                    MetricKind::Absolute,
1039                    MetricValue::Counter { value: 0.0 },
1040                )
1041                .with_tags(Some(metric_tags! { "host" => "*", "status" => "hit" }))
1042                .with_timestamp(Some(*TIMESTAMP)),
1043                Metric::new(
1044                    "nginx_server_cache",
1045                    MetricKind::Absolute,
1046                    MetricValue::Counter { value: 0.0 },
1047                )
1048                .with_tags(Some(metric_tags! { "host" => "*", "status" => "miss" }))
1049                .with_timestamp(Some(*TIMESTAMP)),
1050                Metric::new(
1051                    "nginx_server_cache",
1052                    MetricKind::Absolute,
1053                    MetricValue::Counter { value: 0.0 },
1054                )
1055                .with_tags(Some(
1056                    metric_tags! { "host" => "*", "status" => "revalidated" }
1057                ))
1058                .with_timestamp(Some(*TIMESTAMP)),
1059                Metric::new(
1060                    "nginx_server_cache",
1061                    MetricKind::Absolute,
1062                    MetricValue::Counter { value: 0.0 },
1063                )
1064                .with_tags(Some(metric_tags! { "host" => "*", "status" => "scarce" }))
1065                .with_timestamp(Some(*TIMESTAMP))
1066            ]
1067        );
1068    }
1069
1070    #[test]
1071    fn test_overrides_nothing_overwritten() {
1072        let exp = r#"
1073            # TYPE jobs_total counter
1074            # HELP jobs_total Total number of jobs
1075            jobs_total{type="a"} 1.0 1612411506789
1076            "#;
1077
1078        assert_event_data_eq!(
1079            events_to_metrics(parse_text_with_overrides(exp, vec![], false)),
1080            Ok(vec![Metric::new(
1081                "jobs_total",
1082                MetricKind::Absolute,
1083                MetricValue::Counter { value: 1.0 },
1084            )
1085            .with_tags(Some(metric_tags! { "type" => "a" }))
1086            .with_timestamp(Some(*TIMESTAMP))]),
1087        );
1088    }
1089
1090    #[test]
1091    fn test_overrides_label_overwritten() {
1092        let exp = r#"
1093            # TYPE jobs_total counter
1094            # HELP jobs_total Total number of jobs
1095            jobs_total{type="a"} 1.0 1612411506789
1096            "#;
1097
1098        assert_event_data_eq!(
1099            events_to_metrics(parse_text_with_overrides(
1100                exp,
1101                vec![("type".to_owned(), "b".to_owned())],
1102                false
1103            )),
1104            Ok(vec![Metric::new(
1105                "jobs_total",
1106                MetricKind::Absolute,
1107                MetricValue::Counter { value: 1.0 },
1108            )
1109            .with_tags(Some(metric_tags! { "type" => "b" }))
1110            .with_timestamp(Some(*TIMESTAMP))]),
1111        );
1112    }
1113
1114    // This matches the behaviour of the real Prometheus Pushgateway, which I
1115    // tested manually.
1116    #[test]
1117    fn test_overrides_last_value_preferred() {
1118        let exp = r#"
1119            # TYPE jobs_total counter
1120            # HELP jobs_total Total number of jobs
1121            jobs_total{type="a"} 1.0 1612411506789
1122            "#;
1123
1124        assert_event_data_eq!(
1125            events_to_metrics(parse_text_with_overrides(
1126                exp,
1127                vec![
1128                    ("type".to_owned(), "b".to_owned()),
1129                    ("type".to_owned(), "c".to_owned())
1130                ],
1131                false
1132            )),
1133            Ok(vec![Metric::new(
1134                "jobs_total",
1135                MetricKind::Absolute,
1136                MetricValue::Counter { value: 1.0 },
1137            )
1138            .with_tags(Some(metric_tags! { "type" => "c" }))
1139            .with_timestamp(Some(*TIMESTAMP))]),
1140        );
1141    }
1142
1143    #[test]
1144    fn test_aggregation_enabled_only_aggregates_counter_and_histogram() {
1145        let exp = r#"
1146            # TYPE jobs_total counter
1147            # HELP jobs_total Total number of jobs
1148            jobs_total{type="a"} 1.0 1612411506789
1149            # TYPE jobs_current gauge
1150            # HELP jobs_current Current number of jobs
1151            jobs_current{type="a"} 5.0 1612411506789
1152            # TYPE jobs_distribution histogram
1153            # HELP jobs_distribution Distribution of jobs
1154            jobs_distribution_bucket{type="a",le="1"} 0.0 1612411506789
1155            jobs_distribution_bucket{type="a",le="2.5"} 0.0 1612411506789
1156            jobs_distribution_bucket{type="a",le="5"} 0.0 1612411506789
1157            jobs_distribution_bucket{type="a",le="10"} 1.0 1612411506789
1158            jobs_distribution_bucket{type="a",le="+Inf"} 1.0 1612411506789
1159            jobs_distribution_sum{type="a"} 8.0 1612411506789
1160            jobs_distribution_count{type="a"} 1.0 1612411506789
1161            # TYPE jobs_summary summary
1162            # HELP jobs_summary Summary of jobs
1163            jobs_summary_sum{type="a"} 8.0 1612411506789
1164            jobs_summary_count{type="a"} 1.0 1612411506789
1165            "#;
1166
1167        assert_event_data_eq!(
1168            events_to_metrics(parse_text_with_overrides(exp, vec![], true)),
1169            Ok(vec![
1170                Metric::new(
1171                    "jobs_total",
1172                    MetricKind::Incremental,
1173                    MetricValue::Counter { value: 1.0 },
1174                )
1175                .with_tags(Some(metric_tags! { "type" => "a" }))
1176                .with_timestamp(Some(*TIMESTAMP)),
1177                Metric::new(
1178                    "jobs_current",
1179                    MetricKind::Absolute,
1180                    MetricValue::Gauge { value: 5.0 },
1181                )
1182                .with_tags(Some(metric_tags! { "type" => "a" }))
1183                .with_timestamp(Some(*TIMESTAMP)),
1184                Metric::new(
1185                    "jobs_distribution",
1186                    MetricKind::Incremental,
1187                    MetricValue::AggregatedHistogram {
1188                        buckets: vector_lib::buckets![
1189                            1.0 => 0, 2.5 => 0, 5.0 => 0, 10.0 => 1
1190                        ],
1191                        count: 1,
1192                        sum: 8.0,
1193                    },
1194                )
1195                .with_tags(Some(metric_tags! { "type" => "a" }))
1196                .with_timestamp(Some(*TIMESTAMP)),
1197                Metric::new(
1198                    "jobs_summary",
1199                    MetricKind::Absolute,
1200                    MetricValue::AggregatedSummary {
1201                        quantiles: vector_lib::quantiles![],
1202                        count: 1,
1203                        sum: 8.0,
1204                    },
1205                )
1206                .with_tags(Some(metric_tags! { "type" => "a" }))
1207                .with_timestamp(Some(*TIMESTAMP)),
1208            ]),
1209        );
1210    }
1211
1212    #[test]
1213    fn test_aggregation_disabled_all_absolute() {
1214        let exp = r#"
1215            # TYPE jobs_total counter
1216            # HELP jobs_total Total number of jobs
1217            jobs_total{type="a"} 1.0 1612411506789
1218            # TYPE jobs_current gauge
1219            # HELP jobs_current Current number of jobs
1220            jobs_current{type="a"} 5.0 1612411506789
1221            # TYPE jobs_distribution histogram
1222            # HELP jobs_distribution Distribution of jobs
1223            jobs_distribution_bucket{type="a",le="1"} 0.0 1612411506789
1224            jobs_distribution_bucket{type="a",le="2.5"} 0.0 1612411506789
1225            jobs_distribution_bucket{type="a",le="5"} 0.0 1612411506789
1226            jobs_distribution_bucket{type="a",le="10"} 1.0 1612411506789
1227            jobs_distribution_bucket{type="a",le="+Inf"} 1.0 1612411506789
1228            jobs_distribution_sum{type="a"} 8.0 1612411506789
1229            jobs_distribution_count{type="a"} 1.0 1612411506789
1230            # TYPE jobs_summary summary
1231            # HELP jobs_summary Summary of jobs
1232            jobs_summary_sum{type="a"} 8.0 1612411506789
1233            jobs_summary_count{type="a"} 1.0 1612411506789
1234            "#;
1235
1236        assert_event_data_eq!(
1237            events_to_metrics(parse_text_with_overrides(exp, vec![], false)),
1238            Ok(vec![
1239                Metric::new(
1240                    "jobs_total",
1241                    MetricKind::Absolute,
1242                    MetricValue::Counter { value: 1.0 },
1243                )
1244                .with_tags(Some(metric_tags! { "type" => "a" }))
1245                .with_timestamp(Some(*TIMESTAMP)),
1246                Metric::new(
1247                    "jobs_current",
1248                    MetricKind::Absolute,
1249                    MetricValue::Gauge { value: 5.0 },
1250                )
1251                .with_tags(Some(metric_tags! { "type" => "a" }))
1252                .with_timestamp(Some(*TIMESTAMP)),
1253                Metric::new(
1254                    "jobs_distribution",
1255                    MetricKind::Absolute,
1256                    MetricValue::AggregatedHistogram {
1257                        buckets: vector_lib::buckets![
1258                            1.0 => 0, 2.5 => 0, 5.0 => 0, 10.0 => 1
1259                        ],
1260                        count: 1,
1261                        sum: 8.0,
1262                    },
1263                )
1264                .with_tags(Some(metric_tags! { "type" => "a" }))
1265                .with_timestamp(Some(*TIMESTAMP)),
1266                Metric::new(
1267                    "jobs_summary",
1268                    MetricKind::Absolute,
1269                    MetricValue::AggregatedSummary {
1270                        quantiles: vector_lib::quantiles![],
1271                        count: 1,
1272                        sum: 8.0,
1273                    },
1274                )
1275                .with_tags(Some(metric_tags! { "type" => "a" }))
1276                .with_timestamp(Some(*TIMESTAMP)),
1277            ]),
1278        );
1279    }
1280}