1use std::cmp::Ordering;
2
3use chrono::{DateTime, TimeZone, Utc};
4#[cfg(feature = "sources-prometheus-remote-write")]
5use vector_lib::prometheus::parser::proto;
6use vector_lib::prometheus::parser::{GroupKind, MetricGroup, ParserError};
7
8use crate::event::{
9 metric::{Bucket, Metric, MetricKind, MetricTags, MetricValue, Quantile},
10 Event,
11};
12
13fn utc_timestamp(timestamp: Option<i64>, default: DateTime<Utc>) -> DateTime<Utc> {
14 timestamp
15 .and_then(|timestamp| {
16 Utc.timestamp_opt(timestamp / 1000, (timestamp % 1000) as u32 * 1000000)
17 .latest()
18 })
19 .unwrap_or(default)
20}
21
22#[cfg(any(test, feature = "sources-prometheus-scrape"))]
23pub(super) fn parse_text(packet: &str) -> Result<Vec<Event>, ParserError> {
24 vector_lib::prometheus::parser::parse_text(packet)
25 .map(|group| reparse_groups(group, vec![], false))
26}
27
28#[cfg(any(test, feature = "sources-prometheus-pushgateway"))]
29pub(super) fn parse_text_with_overrides(
30 packet: &str,
31 tag_overrides: impl IntoIterator<Item = (String, String)> + Clone,
32 aggregate_metrics: bool,
33) -> Result<Vec<Event>, ParserError> {
34 vector_lib::prometheus::parser::parse_text(packet)
35 .map(|group| reparse_groups(group, tag_overrides, aggregate_metrics))
36}
37
38#[cfg(feature = "sources-prometheus-remote-write")]
39pub(super) fn parse_request(request: proto::WriteRequest) -> Result<Vec<Event>, ParserError> {
40 vector_lib::prometheus::parser::parse_request(request)
41 .map(|group| reparse_groups(group, vec![], false))
42}
43
44fn reparse_groups(
45 groups: Vec<MetricGroup>,
46 tag_overrides: impl IntoIterator<Item = (String, String)> + Clone,
47 aggregate_metrics: bool,
48) -> Vec<Event> {
49 let mut result = Vec::new();
50 let start = Utc::now();
51
52 let metric_kind = if aggregate_metrics {
53 MetricKind::Incremental
54 } else {
55 MetricKind::Absolute
56 };
57
58 for group in groups {
59 match group.metrics {
60 GroupKind::Counter(metrics) => {
61 for (key, metric) in metrics {
62 let tags = combine_tags(key.labels, tag_overrides.clone());
63
64 let counter = Metric::new(
65 group.name.clone(),
66 metric_kind,
67 MetricValue::Counter {
68 value: metric.value,
69 },
70 )
71 .with_timestamp(Some(utc_timestamp(key.timestamp, start)))
72 .with_tags(tags.as_option());
73
74 result.push(counter.into());
75 }
76 }
77 GroupKind::Gauge(metrics) | GroupKind::Untyped(metrics) => {
78 for (key, metric) in metrics {
79 let tags = combine_tags(key.labels, tag_overrides.clone());
80
81 let gauge = Metric::new(
82 group.name.clone(),
83 MetricKind::Absolute,
85 MetricValue::Gauge {
86 value: metric.value,
87 },
88 )
89 .with_timestamp(Some(utc_timestamp(key.timestamp, start)))
90 .with_tags(tags.as_option());
91
92 result.push(gauge.into());
93 }
94 }
95 GroupKind::Histogram(metrics) => {
96 for (key, metric) in metrics {
97 let tags = combine_tags(key.labels, tag_overrides.clone());
98
99 let mut buckets = metric.buckets;
100 buckets.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap_or(Ordering::Equal));
101 for i in (1..buckets.len()).rev() {
102 buckets[i].count = buckets[i].count.saturating_sub(buckets[i - 1].count);
103 }
104 let drop_last = buckets
105 .last()
106 .is_some_and(|bucket| bucket.bucket == f64::INFINITY);
107 if drop_last {
108 buckets.pop();
109 }
110
111 result.push(
112 Metric::new(
113 group.name.clone(),
114 metric_kind,
115 MetricValue::AggregatedHistogram {
116 buckets: buckets
117 .into_iter()
118 .map(|b| Bucket {
119 upper_limit: b.bucket,
120 count: b.count,
121 })
122 .collect(),
123 count: metric.count,
124 sum: metric.sum,
125 },
126 )
127 .with_timestamp(Some(utc_timestamp(key.timestamp, start)))
128 .with_tags(tags.as_option())
129 .into(),
130 );
131 }
132 }
133 GroupKind::Summary(metrics) => {
134 for (key, metric) in metrics {
135 let tags = combine_tags(key.labels, tag_overrides.clone());
136
137 result.push(
138 Metric::new(
139 group.name.clone(),
140 MetricKind::Absolute,
142 MetricValue::AggregatedSummary {
143 quantiles: metric
144 .quantiles
145 .into_iter()
146 .map(|q| Quantile {
147 quantile: q.quantile,
148 value: q.value,
149 })
150 .collect(),
151 count: metric.count,
152 sum: metric.sum,
153 },
154 )
155 .with_timestamp(Some(utc_timestamp(key.timestamp, start)))
156 .with_tags(tags.as_option())
157 .into(),
158 );
159 }
160 }
161 }
162 }
163
164 result
165}
166
167fn combine_tags(
168 base_tags: impl Into<MetricTags>,
169 tag_overrides: impl IntoIterator<Item = (String, String)>,
170) -> MetricTags {
171 let mut tags = base_tags.into();
172 for (k, v) in tag_overrides.into_iter() {
173 tags.replace(k, v);
174 }
175
176 tags
177}
178
179#[cfg(test)]
180mod test {
181 use std::sync::LazyLock;
182
183 use chrono::{TimeZone, Timelike, Utc};
184 use similar_asserts::assert_eq;
185 use vector_lib::assert_event_data_eq;
186 use vector_lib::metric_tags;
187
188 use super::*;
189 use crate::event::metric::{Metric, MetricKind, MetricValue};
190
191 static TIMESTAMP: LazyLock<DateTime<Utc>> = LazyLock::new(|| {
192 Utc.with_ymd_and_hms(2021, 2, 4, 4, 5, 6)
193 .single()
194 .and_then(|t| t.with_nanosecond(789 * 1_000_000))
195 .expect("invalid timestamp")
196 });
197
198 fn events_to_metrics(
199 events: Result<Vec<Event>, ParserError>,
200 ) -> Result<Vec<Metric>, ParserError> {
201 events.map(|events| events.into_iter().map(Event::into_metric).collect())
202 }
203
204 #[test]
205 fn adds_timestamp_if_missing() {
206 let now = Utc::now();
207 let exp = r"
208 # HELP counter Some counter
209 # TYPE count counter
210 http_requests_total 1027
211 ";
212 let result = events_to_metrics(parse_text(exp)).unwrap();
213 assert_eq!(result.len(), 1);
214 assert!(result[0].timestamp().unwrap() >= now);
215 }
216
217 #[test]
218 fn test_counter() {
219 let exp = r"
220 # HELP uptime A counter
221 # TYPE uptime counter
222 uptime 123.0 1612411506789
223 ";
224
225 assert_event_data_eq!(
226 events_to_metrics(parse_text(exp)),
227 Ok(vec![Metric::new(
228 "uptime",
229 MetricKind::Absolute,
230 MetricValue::Counter { value: 123.0 },
231 )
232 .with_timestamp(Some(*TIMESTAMP))]),
233 );
234 }
235
236 #[test]
237 fn test_counter_empty() {
238 let exp = r"
239 # HELP hidden A counter
240 # TYPE hidden counter
241 ";
242
243 assert_event_data_eq!(events_to_metrics(parse_text(exp)), Ok(vec![]));
244 }
245
246 #[test]
247 fn test_counter_nan() {
248 let exp = r#"
249 # TYPE name counter
250 name{labelname="val1",basename="basevalue"} NaN
251 "#;
252
253 match events_to_metrics(parse_text(exp)).unwrap()[0].value() {
254 MetricValue::Counter { value } => {
255 assert!(value.is_nan());
256 }
257 _ => unreachable!(),
258 }
259 }
260
261 #[test]
262 fn test_counter_weird() {
263 let exp = r#"
264 # A normal comment.
265 #
266 # TYPE name counter
267 name {labelname="val2",basename="base\"v\\al\nue"} 0.23 1612411506789
268 # HELP name two-line\n doc str\\ing
269 # HELP name2 doc str"ing 2
270 # TYPE name2 counter
271 name2{labelname="val2" ,basename = "basevalue2" } +Inf 1612411506789
272 name2{ labelname = "val1" , }-Inf 1612411506789
273 "#;
274
275 assert_event_data_eq!(
276 events_to_metrics(parse_text(exp)),
277 Ok(vec![
278 Metric::new(
279 "name",
280 MetricKind::Absolute,
281 MetricValue::Counter { value: 0.23 },
282 )
283 .with_tags(Some(metric_tags!(
284 "labelname" => "val2",
285 "basename" => "base\"v\\al\nue",
286 )))
287 .with_timestamp(Some(*TIMESTAMP)),
288 Metric::new(
289 "name2",
290 MetricKind::Absolute,
291 MetricValue::Counter {
292 value: f64::INFINITY
293 },
294 )
295 .with_tags(Some(metric_tags!(
296 "labelname" => "val2",
297 "basename" => "basevalue2",
298 )))
299 .with_timestamp(Some(*TIMESTAMP)),
300 Metric::new(
301 "name2",
302 MetricKind::Absolute,
303 MetricValue::Counter {
304 value: f64::NEG_INFINITY
305 },
306 )
307 .with_tags(Some(metric_tags!("labelname" => "val1")))
308 .with_timestamp(Some(*TIMESTAMP)),
309 ]),
310 );
311 }
312
313 #[test]
314 fn test_counter_tags_and_timestamp() {
315 let exp = r#"
316 # HELP http_requests_total The total number of HTTP requests.
317 # TYPE http_requests_total counter
318 http_requests_total{method="post",code="200"} 1027 1395066363000
319 http_requests_total{method="post",code="400"} 3 1395066363000
320 "#;
321
322 assert_event_data_eq!(
323 events_to_metrics(parse_text(exp)),
324 Ok(vec![
325 Metric::new(
326 "http_requests_total",
327 MetricKind::Absolute,
328 MetricValue::Counter { value: 1027.0 },
329 )
330 .with_timestamp(Utc.timestamp_opt(1395066363, 0).latest())
331 .with_tags(Some(metric_tags!(
332 "method" => "post",
333 "code" => "200",
334 ))),
335 Metric::new(
336 "http_requests_total",
337 MetricKind::Absolute,
338 MetricValue::Counter { value: 3.0 },
339 )
340 .with_timestamp(Utc.timestamp_opt(1395066363, 0).latest())
341 .with_tags(Some(metric_tags!(
342 "method" => "post",
343 "code" => "400"
344 )))
345 ]),
346 );
347 }
348
349 #[test]
350 fn test_gauge() {
351 let exp = r"
352 # HELP latency A gauge
353 # TYPE latency gauge
354 latency 123.0 1612411506789
355 ";
356
357 assert_event_data_eq!(
358 events_to_metrics(parse_text(exp)),
359 Ok(vec![Metric::new(
360 "latency",
361 MetricKind::Absolute,
362 MetricValue::Gauge { value: 123.0 },
363 )
364 .with_timestamp(Some(*TIMESTAMP))]),
365 );
366 }
367
368 #[test]
369 fn test_gauge_minimalistic() {
370 let exp = r"
371 metric_without_timestamp_and_labels 12.47 1612411506789
372 ";
373
374 assert_event_data_eq!(
375 events_to_metrics(parse_text(exp)),
376 Ok(vec![Metric::new(
377 "metric_without_timestamp_and_labels",
378 MetricKind::Absolute,
379 MetricValue::Gauge { value: 12.47 },
380 )
381 .with_timestamp(Some(*TIMESTAMP))]),
382 );
383 }
384
385 #[test]
386 fn test_gauge_empty_labels() {
387 let exp = r"
388 no_labels{} 3 1612411506789
389 ";
390
391 assert_event_data_eq!(
392 events_to_metrics(parse_text(exp)),
393 Ok(vec![Metric::new(
394 "no_labels",
395 MetricKind::Absolute,
396 MetricValue::Gauge { value: 3.0 },
397 )
398 .with_timestamp(Some(*TIMESTAMP))]),
399 );
400 }
401
402 #[test]
403 fn test_gauge_minimalistic_escaped() {
404 let exp = r#"
405 msdos_file_access_time_seconds{path="C:\\DIR\\FILE.TXT",error="Cannot find file:\n\"FILE.TXT\""} 1.458255915e9 1612411506789
406 "#;
407
408 assert_event_data_eq!(
409 events_to_metrics(parse_text(exp)),
410 Ok(vec![Metric::new(
411 "msdos_file_access_time_seconds",
412 MetricKind::Absolute,
413 MetricValue::Gauge {
414 value: 1458255915.0
415 },
416 )
417 .with_tags(Some(metric_tags!(
418 "path" => "C:\\DIR\\FILE.TXT",
419 "error" => "Cannot find file:\n\"FILE.TXT\"",
420 )))
421 .with_timestamp(Some(*TIMESTAMP))]),
422 );
423 }
424
425 #[test]
426 fn test_tag_value_contain_bracket() {
427 let exp = r#"
428 # HELP name counter
429 # TYPE name counter
430 name{tag="}"} 0 1612411506789
431 "#;
432 assert_event_data_eq!(
433 events_to_metrics(parse_text(exp)),
434 Ok(vec![Metric::new(
435 "name",
436 MetricKind::Absolute,
437 MetricValue::Counter { value: 0.0 },
438 )
439 .with_tags(Some(metric_tags! { "tag" => "}" }))
440 .with_timestamp(Some(*TIMESTAMP))]),
441 );
442 }
443
444 #[test]
445 fn test_parse_tag_value_contain_comma() {
446 let exp = r#"
447 # HELP name counter
448 # TYPE name counter
449 name{tag="a,b"} 0 1612411506789
450 "#;
451 assert_event_data_eq!(
452 events_to_metrics(parse_text(exp)),
453 Ok(vec![Metric::new(
454 "name",
455 MetricKind::Absolute,
456 MetricValue::Counter { value: 0.0 },
457 )
458 .with_tags(Some(metric_tags! { "tag" => "a,b" }))
459 .with_timestamp(Some(*TIMESTAMP))]),
460 );
461 }
462
463 #[test]
464 fn test_parse_tag_escaping() {
465 let exp = r#"
466 # HELP name counter
467 # TYPE name counter
468 name{tag="\\n"} 0 1612411506789
469 "#;
470 assert_event_data_eq!(
471 events_to_metrics(parse_text(exp)),
472 Ok(vec![Metric::new(
473 "name",
474 MetricKind::Absolute,
475 MetricValue::Counter { value: 0.0 },
476 )
477 .with_tags(Some(metric_tags! { "tag" => "\\n" }))
478 .with_timestamp(Some(*TIMESTAMP))]),
479 );
480 }
481
482 #[test]
483 fn test_parse_tag_dont_trim_value() {
484 let exp = r#"
485 # HELP name counter
486 # TYPE name counter
487 name{tag=" * "} 0 1612411506789
488 "#;
489 assert_event_data_eq!(
490 events_to_metrics(parse_text(exp)),
491 Ok(vec![Metric::new(
492 "name",
493 MetricKind::Absolute,
494 MetricValue::Counter { value: 0.0 },
495 )
496 .with_tags(Some(metric_tags! { "tag" => " * " }))
497 .with_timestamp(Some(*TIMESTAMP))]),
498 );
499 }
500
501 #[test]
502 fn test_parse_tag_value_containing_equals() {
503 let exp = r#"
504 telemetry_scrape_size_bytes_count{registry="default",content_type="text/plain; version=0.0.4"} 1890 1612411506789
505 "#;
506
507 assert_event_data_eq!(
508 events_to_metrics(parse_text(exp)),
509 Ok(vec![Metric::new(
510 "telemetry_scrape_size_bytes_count",
511 MetricKind::Absolute,
512 MetricValue::Gauge { value: 1890.0 },
513 )
514 .with_tags(Some(metric_tags!( "registry" => "default",
515 "content_type" => "text/plain; version=0.0.4" )))
516 .with_timestamp(Some(*TIMESTAMP))]),
517 );
518 }
519
520 #[test]
521 fn test_parse_tag_error_no_value() {
522 let exp = r#"
523 telemetry_scrape_size_bytes_count{registry="default",content_type} 1890 1612411506789
524 "#;
525
526 assert!(events_to_metrics(parse_text(exp)).is_err());
527 }
528
529 #[test]
530 fn test_parse_tag_error_equals_empty_value() {
531 let exp = r#"
532 telemetry_scrape_size_bytes_count{registry="default",content_type=} 1890 1612411506789
533 "#;
534
535 assert!(events_to_metrics(parse_text(exp)).is_err());
536 }
537
538 #[test]
539 fn test_gauge_weird_timestamp() {
540 let exp = r#"
541 something_weird{problem="division by zero"} +Inf -3982045000
542 "#;
543
544 assert_event_data_eq!(
545 events_to_metrics(parse_text(exp)),
546 Ok(vec![Metric::new(
547 "something_weird",
548 MetricKind::Absolute,
549 MetricValue::Gauge {
550 value: f64::INFINITY
551 },
552 )
553 .with_timestamp(Utc.timestamp_opt(-3982045, 0).latest())
554 .with_tags(Some(
555 metric_tags!("problem" => "division by zero")
556 ))]),
557 );
558 }
559
560 #[test]
561 fn test_gauge_tabs() {
562 let exp = r#"
563 # TYPE latency gauge
564 latency{env="production"} 1.0 1395066363000
565 latency{env="testing"} 2.0 1395066363000
566 "#;
567
568 assert_event_data_eq!(
569 events_to_metrics(parse_text(exp)),
570 Ok(vec![
571 Metric::new(
572 "latency",
573 MetricKind::Absolute,
574 MetricValue::Gauge { value: 1.0 },
575 )
576 .with_timestamp(Utc.timestamp_opt(1395066363, 0).latest())
577 .with_tags(Some(metric_tags!("env" => "production"))),
578 Metric::new(
579 "latency",
580 MetricKind::Absolute,
581 MetricValue::Gauge { value: 2.0 },
582 )
583 .with_timestamp(Utc.timestamp_opt(1395066363, 0).latest())
584 .with_tags(Some(metric_tags!("env" => "testing")))
585 ]),
586 );
587 }
588
589 #[test]
590 fn test_mixed() {
591 let exp = r"
592 # TYPE uptime counter
593 uptime 123.0 1612411506789
594 # TYPE temperature gauge
595 temperature -1.5 1612411506789
596 # TYPE launch_count counter
597 launch_count 10.0 1612411506789
598 ";
599
600 assert_event_data_eq!(
601 events_to_metrics(parse_text(exp)),
602 Ok(vec![
603 Metric::new(
604 "uptime",
605 MetricKind::Absolute,
606 MetricValue::Counter { value: 123.0 },
607 )
608 .with_timestamp(Some(*TIMESTAMP)),
609 Metric::new(
610 "temperature",
611 MetricKind::Absolute,
612 MetricValue::Gauge { value: -1.5 },
613 )
614 .with_timestamp(Some(*TIMESTAMP)),
615 Metric::new(
616 "launch_count",
617 MetricKind::Absolute,
618 MetricValue::Counter { value: 10.0 },
619 )
620 .with_timestamp(Some(*TIMESTAMP))
621 ]),
622 );
623 }
624
625 #[test]
626 fn test_no_value() {
627 let exp = r#"
628 # TYPE latency counter
629 latency{env="production"}
630 "#;
631
632 assert!(events_to_metrics(parse_text(exp)).is_err());
633 }
634
635 #[test]
636 fn test_no_name() {
637 let exp = r"
638 # TYPE uptime counter
639 123.0 1612411506789
640 ";
641
642 assert!(events_to_metrics(parse_text(exp)).is_err());
643 }
644
645 #[test]
646 fn test_mixed_and_loosely_typed() {
647 let exp = r"
648 # TYPE uptime counter
649 uptime 123.0 1612411506789
650 last_downtime 4.0 1612411506789
651 # TYPE temperature gauge
652 temperature -1.5 1612411506789
653 temperature_7_days_average 0.1 1612411506789
654 ";
655
656 assert_event_data_eq!(
657 events_to_metrics(parse_text(exp)),
658 Ok(vec![
659 Metric::new(
660 "uptime",
661 MetricKind::Absolute,
662 MetricValue::Counter { value: 123.0 },
663 )
664 .with_timestamp(Some(*TIMESTAMP)),
665 Metric::new(
666 "last_downtime",
667 MetricKind::Absolute,
668 MetricValue::Gauge { value: 4.0 },
669 )
670 .with_timestamp(Some(*TIMESTAMP)),
671 Metric::new(
672 "temperature",
673 MetricKind::Absolute,
674 MetricValue::Gauge { value: -1.5 },
675 )
676 .with_timestamp(Some(*TIMESTAMP)),
677 Metric::new(
678 "temperature_7_days_average",
679 MetricKind::Absolute,
680 MetricValue::Gauge { value: 0.1 },
681 )
682 .with_timestamp(Some(*TIMESTAMP))
683 ]),
684 );
685 }
686
687 #[test]
688 fn test_histogram() {
689 let exp = r#"
690 # HELP http_request_duration_seconds A histogram of the request duration.
691 # TYPE http_request_duration_seconds histogram
692 http_request_duration_seconds_bucket{le="0.05"} 24054 1612411506789
693 http_request_duration_seconds_bucket{le="0.1"} 33444 1612411506789
694 http_request_duration_seconds_bucket{le="0.2"} 100392 1612411506789
695 http_request_duration_seconds_bucket{le="0.5"} 129389 1612411506789
696 http_request_duration_seconds_bucket{le="1"} 133988 1612411506789
697 http_request_duration_seconds_bucket{le="+Inf"} 144320 1612411506789
698 http_request_duration_seconds_sum 53423 1612411506789
699 http_request_duration_seconds_count 144320 1612411506789
700 "#;
701
702 assert_event_data_eq!(
703 events_to_metrics(parse_text(exp)),
704 Ok(vec![Metric::new(
705 "http_request_duration_seconds",
706 MetricKind::Absolute,
707 MetricValue::AggregatedHistogram {
708 buckets: vector_lib::buckets![
709 0.05 => 24054, 0.1 => 9390, 0.2 => 66948, 0.5 => 28997, 1.0 => 4599
710 ],
711 count: 144320,
712 sum: 53423.0,
713 },
714 )
715 .with_timestamp(Some(*TIMESTAMP))]),
716 );
717 }
718
719 #[test]
720 fn test_histogram_out_of_order() {
721 let exp = r#"
722 # HELP duration A histogram of the request duration.
723 # TYPE duration histogram
724 duration_bucket{le="+Inf"} 144320 1612411506789
725 duration_bucket{le="1"} 133988 1612411506789
726 duration_sum 53423 1612411506789
727 duration_count 144320 1612411506789
728 "#;
729
730 assert_event_data_eq!(
731 events_to_metrics(parse_text(exp)),
732 Ok(vec![Metric::new(
733 "duration",
734 MetricKind::Absolute,
735 MetricValue::AggregatedHistogram {
736 buckets: vector_lib::buckets![1.0 => 133988],
737 count: 144320,
738 sum: 53423.0,
739 },
740 )
741 .with_timestamp(Some(*TIMESTAMP))]),
742 );
743 }
744
745 #[test]
746 fn test_histogram_backward_values() {
747 let exp = r#"
748 # HELP duration A histogram of the request duration.
749 # TYPE duration histogram
750 duration_bucket{le="1"} 2000 1612411506789
751 duration_bucket{le="10"} 1000 1612411506789
752 duration_bucket{le="+Inf"} 2000 1612411506789
753 duration_sum 2000 1612411506789
754 duration_count 2000 1612411506789
755 "#;
756
757 assert_event_data_eq!(
758 events_to_metrics(parse_text(exp)),
759 Ok(vec![Metric::new(
760 "duration",
761 MetricKind::Absolute,
762 MetricValue::AggregatedHistogram {
763 buckets: vector_lib::buckets![1.0 => 2000, 10.0 => 0],
764 count: 2000,
765 sum: 2000.0,
766 },
767 )
768 .with_timestamp(Some(*TIMESTAMP))]),
769 );
770 }
771
772 #[test]
773 fn test_histogram_with_labels() {
774 let exp = r#"
775 # HELP gitlab_runner_job_duration_seconds Histogram of job durations
776 # TYPE gitlab_runner_job_duration_seconds histogram
777 gitlab_runner_job_duration_seconds_bucket{runner="z",le="30"} 327 1612411506789
778 gitlab_runner_job_duration_seconds_bucket{runner="z",le="60"} 474 1612411506789
779 gitlab_runner_job_duration_seconds_bucket{runner="z",le="300"} 535 1612411506789
780 gitlab_runner_job_duration_seconds_bucket{runner="z",le="600"} 536 1612411506789
781 gitlab_runner_job_duration_seconds_bucket{runner="z",le="1800"} 536 1612411506789
782 gitlab_runner_job_duration_seconds_bucket{runner="z",le="3600"} 536 1612411506789
783 gitlab_runner_job_duration_seconds_bucket{runner="z",le="7200"} 536 1612411506789
784 gitlab_runner_job_duration_seconds_bucket{runner="z",le="10800"} 536 1612411506789
785 gitlab_runner_job_duration_seconds_bucket{runner="z",le="18000"} 536 1612411506789
786 gitlab_runner_job_duration_seconds_bucket{runner="z",le="36000"} 536 1612411506789
787 gitlab_runner_job_duration_seconds_bucket{runner="z",le="+Inf"} 536 1612411506789
788 gitlab_runner_job_duration_seconds_sum{runner="z"} 19690.129384881966 1612411506789
789 gitlab_runner_job_duration_seconds_count{runner="z"} 536 1612411506789
790 gitlab_runner_job_duration_seconds_bucket{runner="x",le="30"} 1 1612411506789
791 gitlab_runner_job_duration_seconds_bucket{runner="x",le="60"} 1 1612411506789
792 gitlab_runner_job_duration_seconds_bucket{runner="x",le="300"} 1 1612411506789
793 gitlab_runner_job_duration_seconds_bucket{runner="x",le="600"} 1 1612411506789
794 gitlab_runner_job_duration_seconds_bucket{runner="x",le="1800"} 1 1612411506789
795 gitlab_runner_job_duration_seconds_bucket{runner="x",le="3600"} 1 1612411506789
796 gitlab_runner_job_duration_seconds_bucket{runner="x",le="7200"} 1 1612411506789
797 gitlab_runner_job_duration_seconds_bucket{runner="x",le="10800"} 1 1612411506789
798 gitlab_runner_job_duration_seconds_bucket{runner="x",le="18000"} 1 1612411506789
799 gitlab_runner_job_duration_seconds_bucket{runner="x",le="36000"} 1 1612411506789
800 gitlab_runner_job_duration_seconds_bucket{runner="x",le="+Inf"} 1 1612411506789
801 gitlab_runner_job_duration_seconds_sum{runner="x"} 28.975436316 1612411506789
802 gitlab_runner_job_duration_seconds_count{runner="x"} 1 1612411506789
803 gitlab_runner_job_duration_seconds_bucket{runner="y",le="30"} 285 1612411506789
804 gitlab_runner_job_duration_seconds_bucket{runner="y",le="60"} 1165 1612411506789
805 gitlab_runner_job_duration_seconds_bucket{runner="y",le="300"} 3071 1612411506789
806 gitlab_runner_job_duration_seconds_bucket{runner="y",le="600"} 3151 1612411506789
807 gitlab_runner_job_duration_seconds_bucket{runner="y",le="1800"} 3252 1612411506789
808 gitlab_runner_job_duration_seconds_bucket{runner="y",le="3600"} 3255 1612411506789
809 gitlab_runner_job_duration_seconds_bucket{runner="y",le="7200"} 3255 1612411506789
810 gitlab_runner_job_duration_seconds_bucket{runner="y",le="10800"} 3255 1612411506789
811 gitlab_runner_job_duration_seconds_bucket{runner="y",le="18000"} 3255 1612411506789
812 gitlab_runner_job_duration_seconds_bucket{runner="y",le="36000"} 3255 1612411506789
813 gitlab_runner_job_duration_seconds_bucket{runner="y",le="+Inf"} 3255 1612411506789
814 gitlab_runner_job_duration_seconds_sum{runner="y"} 381111.7498891335 1612411506789
815 gitlab_runner_job_duration_seconds_count{runner="y"} 3255 1612411506789
816 "#;
817
818 assert_event_data_eq!(
819 events_to_metrics(parse_text(exp)),
820 Ok(vec![
821 Metric::new(
822 "gitlab_runner_job_duration_seconds", MetricKind::Absolute, MetricValue::AggregatedHistogram {
823 buckets: vector_lib::buckets![
824 30.0 => 327,
825 60.0 => 147,
826 300.0 => 61,
827 600.0 => 1,
828 1800.0 => 0,
829 3600.0 => 0,
830 7200.0 => 0,
831 10800.0 => 0,
832 18000.0 => 0,
833 36000.0 => 0
834 ],
835 count: 536,
836 sum: 19690.129384881966,
837 },
838 )
839 .with_tags(Some(metric_tags!("runner" => "z")))
840 .with_timestamp(Some(*TIMESTAMP)),
841 Metric::new(
842 "gitlab_runner_job_duration_seconds", MetricKind::Absolute, MetricValue::AggregatedHistogram {
843 buckets: vector_lib::buckets![
844 30.0 => 1,
845 60.0 => 0,
846 300.0 => 0,
847 600.0 => 0,
848 1800.0 => 0,
849 3600.0 => 0,
850 7200.0 => 0,
851 10800.0 => 0,
852 18000.0 => 0,
853 36000.0 => 0
854 ],
855 count: 1,
856 sum: 28.975436316,
857 },
858 )
859 .with_tags(Some(metric_tags!("runner" => "x")))
860 .with_timestamp(Some(*TIMESTAMP)),
861 Metric::new(
862 "gitlab_runner_job_duration_seconds", MetricKind::Absolute, MetricValue::AggregatedHistogram {
863 buckets: vector_lib::buckets![
864 30.0 => 285, 60.0 => 880, 300.0 => 1906, 600.0 => 80, 1800.0 => 101, 3600.0 => 3,
865 7200.0 => 0, 10800.0 => 0, 18000.0 => 0, 36000.0 => 0
866 ],
867 count: 3255,
868 sum: 381111.7498891335,
869 },
870 )
871 .with_tags(Some(metric_tags!("runner" => "y")))
872 .with_timestamp(Some(*TIMESTAMP))
873 ]),
874 );
875 }
876
877 #[test]
878 fn test_summary() {
879 let exp = r#"
880 # HELP rpc_duration_seconds A summary of the RPC duration in seconds.
881 # TYPE rpc_duration_seconds summary
882 rpc_duration_seconds{service="a",quantile="0.01"} 3102 1612411506789
883 rpc_duration_seconds{service="a",quantile="0.05"} 3272 1612411506789
884 rpc_duration_seconds{service="a",quantile="0.5"} 4773 1612411506789
885 rpc_duration_seconds{service="a",quantile="0.9"} 9001 1612411506789
886 rpc_duration_seconds{service="a",quantile="0.99"} 76656 1612411506789
887 rpc_duration_seconds_sum{service="a"} 1.7560473e+07 1612411506789
888 rpc_duration_seconds_count{service="a"} 2693 1612411506789
889 # HELP go_gc_duration_seconds A summary of the GC invocation durations.
890 # TYPE go_gc_duration_seconds summary
891 go_gc_duration_seconds{quantile="0"} 0.009460965 1612411506789
892 go_gc_duration_seconds{quantile="0.25"} 0.009793382 1612411506789
893 go_gc_duration_seconds{quantile="0.5"} 0.009870205 1612411506789
894 go_gc_duration_seconds{quantile="0.75"} 0.01001838 1612411506789
895 go_gc_duration_seconds{quantile="1"} 0.018827136 1612411506789
896 go_gc_duration_seconds_sum 4668.551713715 1612411506789
897 go_gc_duration_seconds_count 602767 1612411506789
898 "#;
899
900 assert_event_data_eq!(
901 events_to_metrics(parse_text(exp)),
902 Ok(vec![
903 Metric::new(
904 "rpc_duration_seconds",
905 MetricKind::Absolute,
906 MetricValue::AggregatedSummary {
907 quantiles: vector_lib::quantiles![
908 0.01 => 3102.0,
909 0.05 => 3272.0,
910 0.5 => 4773.0,
911 0.9 => 9001.0,
912 0.99 => 76656.0
913 ],
914 count: 2693,
915 sum: 1.7560473e+07,
916 },
917 )
918 .with_tags(Some(metric_tags!("service" => "a")))
919 .with_timestamp(Some(*TIMESTAMP)),
920 Metric::new(
921 "go_gc_duration_seconds",
922 MetricKind::Absolute,
923 MetricValue::AggregatedSummary {
924 quantiles: vector_lib::quantiles![
925 0.0 => 0.009460965,
926 0.25 => 0.009793382,
927 0.5 => 0.009870205,
928 0.75 => 0.01001838,
929 1.0 => 0.018827136
930 ],
931 count: 602767,
932 sum: 4668.551713715,
933 },
934 )
935 .with_timestamp(Some(*TIMESTAMP)),
936 ]),
937 );
938 }
939
940 #[test]
942 fn test_nginx() {
943 let exp = r#"
944 # HELP nginx_server_bytes request/response bytes
945 # TYPE nginx_server_bytes counter
946 nginx_server_bytes{direction="in",host="*"} 263719
947 nginx_server_bytes{direction="in",host="_"} 255061
948 nginx_server_bytes{direction="in",host="nginx-vts-status"} 8658
949 nginx_server_bytes{direction="out",host="*"} 944199
950 nginx_server_bytes{direction="out",host="_"} 360775
951 nginx_server_bytes{direction="out",host="nginx-vts-status"} 583424
952 # HELP nginx_server_cache cache counter
953 # TYPE nginx_server_cache counter
954 nginx_server_cache{host="*",status="bypass"} 0
955 nginx_server_cache{host="*",status="expired"} 0
956 nginx_server_cache{host="*",status="hit"} 0
957 nginx_server_cache{host="*",status="miss"} 0
958 nginx_server_cache{host="*",status="revalidated"} 0
959 nginx_server_cache{host="*",status="scarce"} 0
960 "#;
961
962 let now = Utc::now();
963 let result = events_to_metrics(parse_text(exp)).expect("Parsing failed");
964 let result: Vec<_> = result
966 .into_iter()
967 .map(|metric| {
968 assert!(metric.timestamp().expect("Missing timestamp") >= now);
969 metric.with_timestamp(Some(*TIMESTAMP))
970 })
971 .collect();
972
973 assert_event_data_eq!(
974 result,
975 vec![
976 Metric::new(
977 "nginx_server_bytes",
978 MetricKind::Absolute,
979 MetricValue::Counter { value: 263719.0 },
980 )
981 .with_tags(Some(metric_tags! { "direction" => "in", "host" => "*" }))
982 .with_timestamp(Some(*TIMESTAMP)),
983 Metric::new(
984 "nginx_server_bytes",
985 MetricKind::Absolute,
986 MetricValue::Counter { value: 255061.0 },
987 )
988 .with_tags(Some(metric_tags! { "direction" => "in", "host" => "_" }))
989 .with_timestamp(Some(*TIMESTAMP)),
990 Metric::new(
991 "nginx_server_bytes",
992 MetricKind::Absolute,
993 MetricValue::Counter { value: 8658.0 },
994 )
995 .with_tags(Some(
996 metric_tags! { "direction" => "in", "host" => "nginx-vts-status" }
997 ))
998 .with_timestamp(Some(*TIMESTAMP)),
999 Metric::new(
1000 "nginx_server_bytes",
1001 MetricKind::Absolute,
1002 MetricValue::Counter { value: 944199.0 },
1003 )
1004 .with_tags(Some(metric_tags! { "direction" => "out", "host" => "*" }))
1005 .with_timestamp(Some(*TIMESTAMP)),
1006 Metric::new(
1007 "nginx_server_bytes",
1008 MetricKind::Absolute,
1009 MetricValue::Counter { value: 360775.0 },
1010 )
1011 .with_tags(Some(metric_tags! { "direction" => "out", "host" => "_" }))
1012 .with_timestamp(Some(*TIMESTAMP)),
1013 Metric::new(
1014 "nginx_server_bytes",
1015 MetricKind::Absolute,
1016 MetricValue::Counter { value: 583424.0 },
1017 )
1018 .with_tags(Some(
1019 metric_tags! { "direction" => "out", "host" => "nginx-vts-status" }
1020 ))
1021 .with_timestamp(Some(*TIMESTAMP)),
1022 Metric::new(
1023 "nginx_server_cache",
1024 MetricKind::Absolute,
1025 MetricValue::Counter { value: 0.0 },
1026 )
1027 .with_tags(Some(metric_tags! { "host" => "*", "status" => "bypass" }))
1028 .with_timestamp(Some(*TIMESTAMP)),
1029 Metric::new(
1030 "nginx_server_cache",
1031 MetricKind::Absolute,
1032 MetricValue::Counter { value: 0.0 },
1033 )
1034 .with_tags(Some(metric_tags! { "host" => "*", "status" => "expired" }))
1035 .with_timestamp(Some(*TIMESTAMP)),
1036 Metric::new(
1037 "nginx_server_cache",
1038 MetricKind::Absolute,
1039 MetricValue::Counter { value: 0.0 },
1040 )
1041 .with_tags(Some(metric_tags! { "host" => "*", "status" => "hit" }))
1042 .with_timestamp(Some(*TIMESTAMP)),
1043 Metric::new(
1044 "nginx_server_cache",
1045 MetricKind::Absolute,
1046 MetricValue::Counter { value: 0.0 },
1047 )
1048 .with_tags(Some(metric_tags! { "host" => "*", "status" => "miss" }))
1049 .with_timestamp(Some(*TIMESTAMP)),
1050 Metric::new(
1051 "nginx_server_cache",
1052 MetricKind::Absolute,
1053 MetricValue::Counter { value: 0.0 },
1054 )
1055 .with_tags(Some(
1056 metric_tags! { "host" => "*", "status" => "revalidated" }
1057 ))
1058 .with_timestamp(Some(*TIMESTAMP)),
1059 Metric::new(
1060 "nginx_server_cache",
1061 MetricKind::Absolute,
1062 MetricValue::Counter { value: 0.0 },
1063 )
1064 .with_tags(Some(metric_tags! { "host" => "*", "status" => "scarce" }))
1065 .with_timestamp(Some(*TIMESTAMP))
1066 ]
1067 );
1068 }
1069
1070 #[test]
1071 fn test_overrides_nothing_overwritten() {
1072 let exp = r#"
1073 # TYPE jobs_total counter
1074 # HELP jobs_total Total number of jobs
1075 jobs_total{type="a"} 1.0 1612411506789
1076 "#;
1077
1078 assert_event_data_eq!(
1079 events_to_metrics(parse_text_with_overrides(exp, vec![], false)),
1080 Ok(vec![Metric::new(
1081 "jobs_total",
1082 MetricKind::Absolute,
1083 MetricValue::Counter { value: 1.0 },
1084 )
1085 .with_tags(Some(metric_tags! { "type" => "a" }))
1086 .with_timestamp(Some(*TIMESTAMP))]),
1087 );
1088 }
1089
1090 #[test]
1091 fn test_overrides_label_overwritten() {
1092 let exp = r#"
1093 # TYPE jobs_total counter
1094 # HELP jobs_total Total number of jobs
1095 jobs_total{type="a"} 1.0 1612411506789
1096 "#;
1097
1098 assert_event_data_eq!(
1099 events_to_metrics(parse_text_with_overrides(
1100 exp,
1101 vec![("type".to_owned(), "b".to_owned())],
1102 false
1103 )),
1104 Ok(vec![Metric::new(
1105 "jobs_total",
1106 MetricKind::Absolute,
1107 MetricValue::Counter { value: 1.0 },
1108 )
1109 .with_tags(Some(metric_tags! { "type" => "b" }))
1110 .with_timestamp(Some(*TIMESTAMP))]),
1111 );
1112 }
1113
1114 #[test]
1117 fn test_overrides_last_value_preferred() {
1118 let exp = r#"
1119 # TYPE jobs_total counter
1120 # HELP jobs_total Total number of jobs
1121 jobs_total{type="a"} 1.0 1612411506789
1122 "#;
1123
1124 assert_event_data_eq!(
1125 events_to_metrics(parse_text_with_overrides(
1126 exp,
1127 vec![
1128 ("type".to_owned(), "b".to_owned()),
1129 ("type".to_owned(), "c".to_owned())
1130 ],
1131 false
1132 )),
1133 Ok(vec![Metric::new(
1134 "jobs_total",
1135 MetricKind::Absolute,
1136 MetricValue::Counter { value: 1.0 },
1137 )
1138 .with_tags(Some(metric_tags! { "type" => "c" }))
1139 .with_timestamp(Some(*TIMESTAMP))]),
1140 );
1141 }
1142
1143 #[test]
1144 fn test_aggregation_enabled_only_aggregates_counter_and_histogram() {
1145 let exp = r#"
1146 # TYPE jobs_total counter
1147 # HELP jobs_total Total number of jobs
1148 jobs_total{type="a"} 1.0 1612411506789
1149 # TYPE jobs_current gauge
1150 # HELP jobs_current Current number of jobs
1151 jobs_current{type="a"} 5.0 1612411506789
1152 # TYPE jobs_distribution histogram
1153 # HELP jobs_distribution Distribution of jobs
1154 jobs_distribution_bucket{type="a",le="1"} 0.0 1612411506789
1155 jobs_distribution_bucket{type="a",le="2.5"} 0.0 1612411506789
1156 jobs_distribution_bucket{type="a",le="5"} 0.0 1612411506789
1157 jobs_distribution_bucket{type="a",le="10"} 1.0 1612411506789
1158 jobs_distribution_bucket{type="a",le="+Inf"} 1.0 1612411506789
1159 jobs_distribution_sum{type="a"} 8.0 1612411506789
1160 jobs_distribution_count{type="a"} 1.0 1612411506789
1161 # TYPE jobs_summary summary
1162 # HELP jobs_summary Summary of jobs
1163 jobs_summary_sum{type="a"} 8.0 1612411506789
1164 jobs_summary_count{type="a"} 1.0 1612411506789
1165 "#;
1166
1167 assert_event_data_eq!(
1168 events_to_metrics(parse_text_with_overrides(exp, vec![], true)),
1169 Ok(vec![
1170 Metric::new(
1171 "jobs_total",
1172 MetricKind::Incremental,
1173 MetricValue::Counter { value: 1.0 },
1174 )
1175 .with_tags(Some(metric_tags! { "type" => "a" }))
1176 .with_timestamp(Some(*TIMESTAMP)),
1177 Metric::new(
1178 "jobs_current",
1179 MetricKind::Absolute,
1180 MetricValue::Gauge { value: 5.0 },
1181 )
1182 .with_tags(Some(metric_tags! { "type" => "a" }))
1183 .with_timestamp(Some(*TIMESTAMP)),
1184 Metric::new(
1185 "jobs_distribution",
1186 MetricKind::Incremental,
1187 MetricValue::AggregatedHistogram {
1188 buckets: vector_lib::buckets![
1189 1.0 => 0, 2.5 => 0, 5.0 => 0, 10.0 => 1
1190 ],
1191 count: 1,
1192 sum: 8.0,
1193 },
1194 )
1195 .with_tags(Some(metric_tags! { "type" => "a" }))
1196 .with_timestamp(Some(*TIMESTAMP)),
1197 Metric::new(
1198 "jobs_summary",
1199 MetricKind::Absolute,
1200 MetricValue::AggregatedSummary {
1201 quantiles: vector_lib::quantiles![],
1202 count: 1,
1203 sum: 8.0,
1204 },
1205 )
1206 .with_tags(Some(metric_tags! { "type" => "a" }))
1207 .with_timestamp(Some(*TIMESTAMP)),
1208 ]),
1209 );
1210 }
1211
1212 #[test]
1213 fn test_aggregation_disabled_all_absolute() {
1214 let exp = r#"
1215 # TYPE jobs_total counter
1216 # HELP jobs_total Total number of jobs
1217 jobs_total{type="a"} 1.0 1612411506789
1218 # TYPE jobs_current gauge
1219 # HELP jobs_current Current number of jobs
1220 jobs_current{type="a"} 5.0 1612411506789
1221 # TYPE jobs_distribution histogram
1222 # HELP jobs_distribution Distribution of jobs
1223 jobs_distribution_bucket{type="a",le="1"} 0.0 1612411506789
1224 jobs_distribution_bucket{type="a",le="2.5"} 0.0 1612411506789
1225 jobs_distribution_bucket{type="a",le="5"} 0.0 1612411506789
1226 jobs_distribution_bucket{type="a",le="10"} 1.0 1612411506789
1227 jobs_distribution_bucket{type="a",le="+Inf"} 1.0 1612411506789
1228 jobs_distribution_sum{type="a"} 8.0 1612411506789
1229 jobs_distribution_count{type="a"} 1.0 1612411506789
1230 # TYPE jobs_summary summary
1231 # HELP jobs_summary Summary of jobs
1232 jobs_summary_sum{type="a"} 8.0 1612411506789
1233 jobs_summary_count{type="a"} 1.0 1612411506789
1234 "#;
1235
1236 assert_event_data_eq!(
1237 events_to_metrics(parse_text_with_overrides(exp, vec![], false)),
1238 Ok(vec![
1239 Metric::new(
1240 "jobs_total",
1241 MetricKind::Absolute,
1242 MetricValue::Counter { value: 1.0 },
1243 )
1244 .with_tags(Some(metric_tags! { "type" => "a" }))
1245 .with_timestamp(Some(*TIMESTAMP)),
1246 Metric::new(
1247 "jobs_current",
1248 MetricKind::Absolute,
1249 MetricValue::Gauge { value: 5.0 },
1250 )
1251 .with_tags(Some(metric_tags! { "type" => "a" }))
1252 .with_timestamp(Some(*TIMESTAMP)),
1253 Metric::new(
1254 "jobs_distribution",
1255 MetricKind::Absolute,
1256 MetricValue::AggregatedHistogram {
1257 buckets: vector_lib::buckets![
1258 1.0 => 0, 2.5 => 0, 5.0 => 0, 10.0 => 1
1259 ],
1260 count: 1,
1261 sum: 8.0,
1262 },
1263 )
1264 .with_tags(Some(metric_tags! { "type" => "a" }))
1265 .with_timestamp(Some(*TIMESTAMP)),
1266 Metric::new(
1267 "jobs_summary",
1268 MetricKind::Absolute,
1269 MetricValue::AggregatedSummary {
1270 quantiles: vector_lib::quantiles![],
1271 count: 1,
1272 sum: 8.0,
1273 },
1274 )
1275 .with_tags(Some(metric_tags! { "type" => "a" }))
1276 .with_timestamp(Some(*TIMESTAMP)),
1277 ]),
1278 );
1279 }
1280}