vector/sinks/prometheus/
collector.rs

1use std::{collections::BTreeMap, fmt::Write as _};
2
3use chrono::Utc;
4use indexmap::map::IndexMap;
5use vector_lib::{
6    event::metric::{MetricSketch, MetricTags, Quantile, samples_to_buckets},
7    prometheus::parser::{METRIC_NAME_LABEL, proto},
8};
9
10use crate::{
11    event::metric::{Metric, MetricKind, MetricValue, StatisticKind},
12    sinks::util::{encode_namespace, statistic::DistributionStatistic},
13};
14
15pub(super) trait MetricCollector {
16    type Output;
17
18    fn new() -> Self;
19
20    fn emit_metadata(&mut self, name: &str, fullname: &str, value: &MetricValue);
21
22    fn emit_value(
23        &mut self,
24        timestamp_millis: Option<i64>,
25        name: &str,
26        suffix: &str,
27        value: f64,
28        tags: Option<&MetricTags>,
29        extra: Option<(&str, String)>,
30    );
31
32    fn finish(self) -> Self::Output;
33
34    fn encode_metric(
35        &mut self,
36        default_namespace: Option<&str>,
37        buckets: &[f64],
38        quantiles: &[f64],
39        metric: &Metric,
40    ) {
41        let name = encode_namespace(metric.namespace().or(default_namespace), '_', metric.name());
42        let name = &name;
43        let timestamp = metric.timestamp().map(|t| t.timestamp_millis());
44
45        if metric.kind() == MetricKind::Absolute {
46            let tags = metric.tags();
47            self.emit_metadata(metric.name(), name, metric.value());
48
49            match metric.value() {
50                MetricValue::Counter { value } => {
51                    self.emit_value(timestamp, name, "", *value, tags, None);
52                }
53                MetricValue::Gauge { value } => {
54                    self.emit_value(timestamp, name, "", *value, tags, None);
55                }
56                MetricValue::Set { values } => {
57                    self.emit_value(timestamp, name, "", values.len() as f64, tags, None);
58                }
59                MetricValue::Distribution {
60                    samples,
61                    statistic: StatisticKind::Histogram,
62                } => {
63                    // convert distributions into aggregated histograms
64                    let (buckets, count, sum) = samples_to_buckets(samples, buckets);
65                    let mut bucket_count = 0.0;
66                    for bucket in buckets {
67                        bucket_count += bucket.count as f64;
68                        self.emit_value(
69                            timestamp,
70                            name,
71                            "_bucket",
72                            bucket_count,
73                            tags,
74                            Some(("le", bucket.upper_limit.to_string())),
75                        );
76                    }
77                    self.emit_value(
78                        timestamp,
79                        name,
80                        "_bucket",
81                        count as f64,
82                        tags,
83                        Some(("le", "+Inf".to_string())),
84                    );
85                    self.emit_value(timestamp, name, "_sum", sum, tags, None);
86                    self.emit_value(timestamp, name, "_count", count as f64, tags, None);
87                }
88                MetricValue::Distribution {
89                    samples,
90                    statistic: StatisticKind::Summary,
91                } => {
92                    if let Some(statistic) = DistributionStatistic::from_samples(samples, quantiles)
93                    {
94                        for (q, v) in statistic.quantiles.iter() {
95                            self.emit_value(
96                                timestamp,
97                                name,
98                                "",
99                                *v,
100                                tags,
101                                Some(("quantile", q.to_string())),
102                            );
103                        }
104                        self.emit_value(timestamp, name, "_sum", statistic.sum, tags, None);
105                        self.emit_value(
106                            timestamp,
107                            name,
108                            "_count",
109                            statistic.count as f64,
110                            tags,
111                            None,
112                        );
113                        self.emit_value(timestamp, name, "_min", statistic.min, tags, None);
114                        self.emit_value(timestamp, name, "_max", statistic.max, tags, None);
115                        self.emit_value(timestamp, name, "_avg", statistic.avg, tags, None);
116                    } else {
117                        self.emit_value(timestamp, name, "_sum", 0.0, tags, None);
118                        self.emit_value(timestamp, name, "_count", 0.0, tags, None);
119                    }
120                }
121                MetricValue::AggregatedHistogram {
122                    buckets,
123                    count,
124                    sum,
125                } => {
126                    let mut bucket_count = 0.0;
127                    for bucket in buckets {
128                        // Aggregated histograms are cumulative in Prometheus.  This means that the
129                        // count of values in a bucket should only go up at the upper limit goes up,
130                        // because if you count a value in a specific bucket, by definition, it is
131                        // less than the upper limit of the next bucket.
132                        //
133                        // While most sources should give us buckets that have an "infinity" bucket
134                        // -- everything else that didn't fit in the non-infinity-upper-limit buckets
135                        // -- we can't be sure, so we calculate that bucket ourselves.  This is why
136                        // we make sure to avoid encoding a bucket if its upper limit is already
137                        // infinity, so that we don't double report.
138                        //
139                        // This check will also avoid printing out a bucket whose upper limit is
140                        // negative infinity, because that would make no sense.
141                        if bucket.upper_limit.is_infinite() {
142                            continue;
143                        }
144
145                        bucket_count += bucket.count as f64;
146                        self.emit_value(
147                            timestamp,
148                            name,
149                            "_bucket",
150                            bucket_count,
151                            tags,
152                            Some(("le", bucket.upper_limit.to_string())),
153                        );
154                    }
155                    self.emit_value(
156                        timestamp,
157                        name,
158                        "_bucket",
159                        *count as f64,
160                        tags,
161                        Some(("le", "+Inf".to_string())),
162                    );
163                    self.emit_value(timestamp, name, "_sum", *sum, tags, None);
164                    self.emit_value(timestamp, name, "_count", *count as f64, tags, None);
165                }
166                MetricValue::AggregatedSummary {
167                    quantiles,
168                    count,
169                    sum,
170                } => {
171                    for quantile in quantiles {
172                        self.emit_value(
173                            timestamp,
174                            name,
175                            "",
176                            quantile.value,
177                            tags,
178                            Some(("quantile", quantile.quantile.to_string())),
179                        );
180                    }
181                    self.emit_value(timestamp, name, "_sum", *sum, tags, None);
182                    self.emit_value(timestamp, name, "_count", *count as f64, tags, None);
183                }
184                MetricValue::Sketch { sketch } => match sketch {
185                    MetricSketch::AgentDDSketch(ddsketch) => {
186                        for q in quantiles {
187                            let quantile = Quantile {
188                                quantile: *q,
189                                value: ddsketch.quantile(*q).unwrap_or(0.0),
190                            };
191                            self.emit_value(
192                                timestamp,
193                                name,
194                                "",
195                                quantile.value,
196                                tags,
197                                Some(("quantile", quantile.quantile.to_string())),
198                            );
199                        }
200                        self.emit_value(
201                            timestamp,
202                            name,
203                            "_sum",
204                            ddsketch.sum().unwrap_or(0.0),
205                            tags,
206                            None,
207                        );
208                        self.emit_value(
209                            timestamp,
210                            name,
211                            "_count",
212                            ddsketch.count() as f64,
213                            tags,
214                            None,
215                        );
216                    }
217                },
218            }
219        }
220    }
221}
222
/// Collector that renders metrics as text, one `# HELP` / `# TYPE` header per
/// metric followed by that metric's sample lines.
pub(super) struct StringCollector {
    // Keyed by the full (namespaced) metric name; each value holds that metric's
    // header plus every sample line emitted for it so far.
    // BTreeMap ensures we get sorted output, which whilst not required is preferable
    processed: BTreeMap<String, String>,
}
227
228impl MetricCollector for StringCollector {
229    type Output = String;
230
231    fn new() -> Self {
232        let processed = BTreeMap::new();
233        Self { processed }
234    }
235
236    fn emit_metadata(&mut self, name: &str, fullname: &str, value: &MetricValue) {
237        if !self.processed.contains_key(fullname) {
238            let header = Self::encode_header(name, fullname, value);
239            self.processed.insert(fullname.into(), header);
240        }
241    }
242
243    fn emit_value(
244        &mut self,
245        timestamp_millis: Option<i64>,
246        name: &str,
247        suffix: &str,
248        value: f64,
249        tags: Option<&MetricTags>,
250        extra: Option<(&str, String)>,
251    ) {
252        let result = self
253            .processed
254            .get_mut(name)
255            .expect("metric metadata not encoded");
256
257        result.push_str(name);
258        result.push_str(suffix);
259        Self::encode_tags(result, tags, extra);
260        _ = match timestamp_millis {
261            None => writeln!(result, " {value}"),
262            Some(timestamp) => writeln!(result, " {value} {timestamp}"),
263        };
264    }
265
266    fn finish(self) -> String {
267        self.processed.into_values().collect()
268    }
269}
270
271impl StringCollector {
272    fn encode_tags(result: &mut String, tags: Option<&MetricTags>, extra: Option<(&str, String)>) {
273        match (tags, extra) {
274            (None, None) => Ok(()),
275            (None, Some(tag)) => write!(result, "{{{}}}", Self::format_tag(tag.0, &tag.1)),
276            (Some(tags), ref tag) => {
277                let mut parts = tags
278                    .iter_single()
279                    .map(|(key, value)| Self::format_tag(key, value))
280                    .collect::<Vec<_>>();
281
282                if let Some((key, value)) = tag {
283                    parts.push(Self::format_tag(key, value))
284                }
285
286                parts.sort();
287                write!(result, "{{{}}}", parts.join(","))
288            }
289        }
290        .ok();
291    }
292
293    fn encode_header(name: &str, fullname: &str, value: &MetricValue) -> String {
294        let r#type = prometheus_metric_type(value).as_str();
295        format!("# HELP {fullname} {name}\n# TYPE {fullname} {type}\n")
296    }
297
298    fn format_tag(key: &str, mut value: &str) -> String {
299        // For most tags, this is just `{KEY}="{VALUE}"` so allocate optimistically
300        let mut result = String::with_capacity(key.len() + value.len() + 3);
301        result.push_str(key);
302        result.push_str("=\"");
303        while let Some(i) = value.find(['\\', '"']) {
304            result.push_str(&value[..i]);
305            result.push('\\');
306            // Ugly but works because we know the character at `i` is ASCII
307            result.push(value.as_bytes()[i] as char);
308            value = &value[i + 1..];
309        }
310        result.push_str(value);
311        result.push('"');
312        result
313    }
314}
315
/// Label set used as the key that identifies one time series in the buffer.
type Labels = Vec<proto::Label>;
317
/// Collector that accumulates samples and metadata and turns them into a
/// `proto::WriteRequest` when finished.
pub(super) struct TimeSeries {
    // Samples grouped by their sorted label set.
    buffer: IndexMap<Labels, Vec<proto::Sample>>,
    // Metadata entries recorded by `emit_metadata`.
    metadata: IndexMap<String, proto::MetricMetadata>,
    // Lazily captured current time (milliseconds), reused for every sample
    // that arrives without a timestamp of its own.
    timestamp: Option<i64>,
}
323
324impl TimeSeries {
325    fn make_labels(
326        tags: Option<&MetricTags>,
327        name: &str,
328        suffix: &str,
329        extra: Option<(&str, String)>,
330    ) -> Labels {
331        // Each Prometheus metric is grouped by its labels, which
332        // contains all the labels from the source metric, plus the name
333        // label for the actual metric name. For convenience below, an
334        // optional extra tag is added.
335        let mut labels = tags.cloned().unwrap_or_default();
336        labels.replace(METRIC_NAME_LABEL.into(), [name, suffix].join(""));
337        if let Some((name, value)) = extra {
338            labels.replace(name.into(), value);
339        }
340
341        // Extract the labels into a vec and sort to produce a
342        // consistent key for the buffer.
343        let mut labels = labels
344            .into_iter_single()
345            .map(|(name, value)| proto::Label { name, value })
346            .collect::<Labels>();
347        labels.sort();
348        labels
349    }
350
351    fn default_timestamp(&mut self) -> i64 {
352        *self
353            .timestamp
354            .get_or_insert_with(|| Utc::now().timestamp_millis())
355    }
356}
357
358impl MetricCollector for TimeSeries {
359    type Output = proto::WriteRequest;
360
361    fn new() -> Self {
362        Self {
363            buffer: Default::default(),
364            metadata: Default::default(),
365            timestamp: None,
366        }
367    }
368
369    fn emit_metadata(&mut self, name: &str, fullname: &str, value: &MetricValue) {
370        if !self.metadata.contains_key(name) {
371            let r#type = prometheus_metric_type(value);
372            let metadata = proto::MetricMetadata {
373                r#type: r#type as i32,
374                metric_family_name: fullname.into(),
375                help: name.into(),
376                unit: String::new(),
377            };
378            self.metadata.insert(name.into(), metadata);
379        }
380    }
381
382    fn emit_value(
383        &mut self,
384        timestamp_millis: Option<i64>,
385        name: &str,
386        suffix: &str,
387        value: f64,
388        tags: Option<&MetricTags>,
389        extra: Option<(&str, String)>,
390    ) {
391        let timestamp = timestamp_millis.unwrap_or_else(|| self.default_timestamp());
392        self.buffer
393            .entry(Self::make_labels(tags, name, suffix, extra))
394            .or_default()
395            .push(proto::Sample { value, timestamp });
396    }
397
398    fn finish(self) -> proto::WriteRequest {
399        let timeseries = self
400            .buffer
401            .into_iter()
402            .map(|(labels, samples)| proto::TimeSeries { labels, samples })
403            .collect::<Vec<_>>();
404        let metadata = self
405            .metadata
406            .into_iter()
407            .map(|(_, metadata)| metadata)
408            .collect();
409        proto::WriteRequest {
410            timeseries,
411            metadata,
412        }
413    }
414}
415
416const fn prometheus_metric_type(metric_value: &MetricValue) -> proto::MetricType {
417    use proto::MetricType;
418    match metric_value {
419        MetricValue::Counter { .. } => MetricType::Counter,
420        MetricValue::Gauge { .. } | MetricValue::Set { .. } => MetricType::Gauge,
421        MetricValue::Distribution {
422            statistic: StatisticKind::Histogram,
423            ..
424        } => MetricType::Histogram,
425        MetricValue::Distribution {
426            statistic: StatisticKind::Summary,
427            ..
428        } => MetricType::Summary,
429        MetricValue::AggregatedHistogram { .. } => MetricType::Histogram,
430        MetricValue::AggregatedSummary { .. } => MetricType::Summary,
431        MetricValue::Sketch { .. } => MetricType::Summary,
432    }
433}
434
435#[cfg(test)]
436mod tests {
437    use std::collections::BTreeSet;
438
439    use chrono::{DateTime, TimeZone, Timelike};
440    use indoc::indoc;
441    use similar_asserts::assert_eq;
442    use vector_lib::metric_tags;
443
444    use super::{super::default_summary_quantiles, *};
445    use crate::{
446        event::metric::{Metric, MetricKind, MetricValue, StatisticKind},
447        test_util::stats::VariableHistogram,
448    };
449
450    fn encode_one<T: MetricCollector>(
451        default_namespace: Option<&str>,
452        buckets: &[f64],
453        quantiles: &[f64],
454        metric: &Metric,
455    ) -> T::Output {
456        let mut s = T::new();
457        s.encode_metric(default_namespace, buckets, quantiles, metric);
458        s.finish()
459    }
460
    /// Tag set shared by the tagged fixtures: a single `code="200"` tag.
    fn tags() -> MetricTags {
        metric_tags!("code" => "200")
    }
464
    // Builds the expected `proto::WriteRequest` for a single metric family.
    //
    // Invocation shape:
    //
    //     write_request!(NAME, HELP, TYPE [
    //         SUFFIX @ TIMESTAMP = VALUE [ LABEL => LABEL_VALUE, ... ],
    //         ...
    //     ])
    //
    // Each `SUFFIX @ TIMESTAMP = VALUE [...]` entry becomes one time series
    // holding exactly one sample. Its labels are `__name__` (NAME followed by
    // SUFFIX) first, then the listed labels in the order given. The request
    // carries a single metadata entry built from NAME, HELP and TYPE with an
    // empty unit.
    macro_rules! write_request {
        ( $name:literal, $help:literal, $type:ident
          [ $(
              $suffix:literal @ $timestamp:literal = $svalue:literal
                  [ $( $label:literal => $lvalue:literal ),* ]
          ),* ]
        ) => {
            proto::WriteRequest {
                timeseries: vec![
                    $(
                        proto::TimeSeries {
                            labels: vec![
                                // The name label always comes first.
                                proto::Label {
                                    name: "__name__".into(),
                                    value: format!("{}{}", $name, $suffix),
                                },
                                $(
                                    proto::Label {
                                        name: $label.into(),
                                        value: $lvalue.into(),
                                    },
                                )*
                            ],
                            samples: vec![ proto::Sample {
                                value: $svalue,
                                timestamp: $timestamp,
                            }],
                        },
                    )*
                ],
                metadata: vec![proto::MetricMetadata {
                    r#type: proto::metric_metadata::MetricType::$type as i32,
                    metric_family_name: $name.into(),
                    help: $help.into(),
                    unit: "".into(),
                }],
            }
        };
    }
504
505    #[test]
506    fn encodes_counter_text() {
507        assert_eq!(
508            encode_counter::<StringCollector>(),
509            indoc! { r#"
510                # HELP vector_hits hits
511                # TYPE vector_hits counter
512                vector_hits{code="200"} 10 1612325106789
513            "#}
514        );
515    }
516
517    #[test]
518    fn encodes_counter_request() {
519        assert_eq!(
520            encode_counter::<TimeSeries>(),
521            write_request!("vector_hits", "hits", Counter ["" @ 1612325106789 = 10.0 ["code" => "200"]])
522        );
523    }
524
525    fn encode_counter<T: MetricCollector>() -> T::Output {
526        let metric = Metric::new(
527            "hits".to_owned(),
528            MetricKind::Absolute,
529            MetricValue::Counter { value: 10.0 },
530        )
531        .with_tags(Some(tags()))
532        .with_timestamp(Some(timestamp()));
533        encode_one::<T>(Some("vector"), &[], &[], &metric)
534    }
535
536    #[test]
537    fn encodes_gauge_text() {
538        assert_eq!(
539            encode_gauge::<StringCollector>(),
540            indoc! { r#"
541                # HELP vector_temperature temperature
542                # TYPE vector_temperature gauge
543                vector_temperature{code="200"} -1.1 1612325106789
544            "#}
545        );
546    }
547
548    #[test]
549    fn encodes_gauge_request() {
550        assert_eq!(
551            encode_gauge::<TimeSeries>(),
552            write_request!("vector_temperature", "temperature", Gauge ["" @ 1612325106789 = -1.1 ["code" => "200"]])
553        );
554    }
555
556    fn encode_gauge<T: MetricCollector>() -> T::Output {
557        let metric = Metric::new(
558            "temperature".to_owned(),
559            MetricKind::Absolute,
560            MetricValue::Gauge { value: -1.1 },
561        )
562        .with_tags(Some(tags()))
563        .with_timestamp(Some(timestamp()));
564        encode_one::<T>(Some("vector"), &[], &[], &metric)
565    }
566
567    #[test]
568    fn encodes_set_text() {
569        assert_eq!(
570            encode_set::<StringCollector>(),
571            indoc! { r"
572                # HELP vector_users users
573                # TYPE vector_users gauge
574                vector_users 1 1612325106789
575            "}
576        );
577    }
578
579    #[test]
580    fn encodes_set_request() {
581        assert_eq!(
582            encode_set::<TimeSeries>(),
583            write_request!("vector_users", "users", Gauge [ "" @ 1612325106789 = 1.0 []])
584        );
585    }
586
587    fn encode_set<T: MetricCollector>() -> T::Output {
588        let metric = Metric::new(
589            "users".to_owned(),
590            MetricKind::Absolute,
591            MetricValue::Set {
592                values: vec!["foo".into()].into_iter().collect(),
593            },
594        )
595        .with_timestamp(Some(timestamp()));
596        encode_one::<T>(Some("vector"), &[], &[], &metric)
597    }
598
599    #[test]
600    fn encodes_expired_set_text() {
601        assert_eq!(
602            encode_expired_set::<StringCollector>(),
603            indoc! {r"
604                # HELP vector_users users
605                # TYPE vector_users gauge
606                vector_users 0 1612325106789
607            "}
608        );
609    }
610
611    #[test]
612    fn encodes_expired_set_request() {
613        assert_eq!(
614            encode_expired_set::<TimeSeries>(),
615            write_request!("vector_users", "users", Gauge ["" @ 1612325106789 = 0.0 []])
616        );
617    }
618
619    fn encode_expired_set<T: MetricCollector>() -> T::Output {
620        let metric = Metric::new(
621            "users".to_owned(),
622            MetricKind::Absolute,
623            MetricValue::Set {
624                values: BTreeSet::new(),
625            },
626        )
627        .with_timestamp(Some(timestamp()));
628        encode_one::<T>(Some("vector"), &[], &[], &metric)
629    }
630
631    #[test]
632    fn encodes_distribution_text() {
633        assert_eq!(
634            encode_distribution::<StringCollector>(),
635            indoc! {r#"
636                # HELP vector_requests requests
637                # TYPE vector_requests histogram
638                vector_requests_bucket{le="0"} 0 1612325106789
639                vector_requests_bucket{le="2.5"} 6 1612325106789
640                vector_requests_bucket{le="5"} 8 1612325106789
641                vector_requests_bucket{le="+Inf"} 8 1612325106789
642                vector_requests_sum 15 1612325106789
643                vector_requests_count 8 1612325106789
644            "#}
645        );
646    }
647
648    #[test]
649    fn encodes_distribution_request() {
650        assert_eq!(
651            encode_distribution::<TimeSeries>(),
652            write_request!(
653                "vector_requests", "requests", Histogram [
654                        "_bucket" @ 1612325106789 = 0.0 ["le" => "0"],
655                        "_bucket" @ 1612325106789 = 6.0 ["le" => "2.5"],
656                        "_bucket" @ 1612325106789 = 8.0 ["le" => "5"],
657                        "_bucket" @ 1612325106789 = 8.0 ["le" => "+Inf"],
658                        "_sum" @ 1612325106789 = 15.0 [],
659                        "_count" @ 1612325106789 = 8.0 []
660                ]
661            )
662        );
663    }
664
665    fn encode_distribution<T: MetricCollector>() -> T::Output {
666        let metric = Metric::new(
667            "requests".to_owned(),
668            MetricKind::Absolute,
669            MetricValue::Distribution {
670                samples: vector_lib::samples![1.0 => 3, 2.0 => 3, 3.0 => 2],
671                statistic: StatisticKind::Histogram,
672            },
673        )
674        .with_timestamp(Some(timestamp()));
675        encode_one::<T>(Some("vector"), &[0.0, 2.5, 5.0], &[], &metric)
676    }
677
678    #[test]
679    fn encodes_histogram_text() {
680        assert_eq!(
681            encode_histogram::<StringCollector>(false),
682            indoc! {r#"
683                # HELP vector_requests requests
684                # TYPE vector_requests histogram
685                vector_requests_bucket{le="1"} 1 1612325106789
686                vector_requests_bucket{le="2.1"} 3 1612325106789
687                vector_requests_bucket{le="3"} 6 1612325106789
688                vector_requests_bucket{le="+Inf"} 6 1612325106789
689                vector_requests_sum 11.5 1612325106789
690                vector_requests_count 6 1612325106789
691            "#}
692        );
693    }
694
695    #[test]
696    fn encodes_histogram_request() {
697        assert_eq!(
698            encode_histogram::<TimeSeries>(false),
699            write_request!(
700                "vector_requests", "requests", Histogram [
701                        "_bucket" @ 1612325106789 = 1.0 ["le" => "1"],
702                        "_bucket" @ 1612325106789 = 3.0 ["le" => "2.1"],
703                        "_bucket" @ 1612325106789 = 6.0 ["le" => "3"],
704                        "_bucket" @ 1612325106789 = 6.0 ["le" => "+Inf"],
705                        "_sum" @ 1612325106789 = 11.5 [],
706                        "_count" @ 1612325106789 = 6.0 []
707                    ]
708            )
709        );
710    }
711
712    #[test]
713    fn encodes_histogram_text_with_extra_infinity_bound() {
714        assert_eq!(
715            encode_histogram::<StringCollector>(true),
716            indoc! {r#"
717                # HELP vector_requests requests
718                # TYPE vector_requests histogram
719                vector_requests_bucket{le="1"} 1 1612325106789
720                vector_requests_bucket{le="2.1"} 3 1612325106789
721                vector_requests_bucket{le="3"} 6 1612325106789
722                vector_requests_bucket{le="+Inf"} 6 1612325106789
723                vector_requests_sum 11.5 1612325106789
724                vector_requests_count 6 1612325106789
725            "#}
726        );
727    }
728
729    #[test]
730    fn encodes_histogram_request_with_extra_infinity_bound() {
731        assert_eq!(
732            encode_histogram::<TimeSeries>(true),
733            write_request!(
734                "vector_requests", "requests", Histogram [
735                        "_bucket" @ 1612325106789 = 1.0 ["le" => "1"],
736                        "_bucket" @ 1612325106789 = 3.0 ["le" => "2.1"],
737                        "_bucket" @ 1612325106789 = 6.0 ["le" => "3"],
738                        "_bucket" @ 1612325106789 = 6.0 ["le" => "+Inf"],
739                        "_sum" @ 1612325106789 = 11.5 [],
740                        "_count" @ 1612325106789 = 6.0 []
741                    ]
742            )
743        );
744    }
745
746    fn encode_histogram<T: MetricCollector>(add_inf_bound: bool) -> T::Output {
747        let bounds = if add_inf_bound {
748            &[1.0, 2.1, 3.0, f64::INFINITY][..]
749        } else {
750            &[1.0, 2.1, 3.0][..]
751        };
752
753        let mut histogram = VariableHistogram::new(bounds);
754        histogram.record_many(&[0.4, 2.0, 1.75, 2.6, 2.25, 2.5][..]);
755
756        let metric = Metric::new(
757            "requests".to_owned(),
758            MetricKind::Absolute,
759            MetricValue::AggregatedHistogram {
760                buckets: histogram.buckets(),
761                count: histogram.count(),
762                sum: histogram.sum(),
763            },
764        )
765        .with_timestamp(Some(timestamp()));
766        encode_one::<T>(Some("vector"), &[], &[], &metric)
767    }
768
769    #[test]
770    fn encodes_summary_text() {
771        assert_eq!(
772            encode_summary::<StringCollector>(),
773            indoc! {r#"# HELP ns_requests requests
774                # TYPE ns_requests summary
775                ns_requests{code="200",quantile="0.01"} 1.5 1612325106789
776                ns_requests{code="200",quantile="0.5"} 2 1612325106789
777                ns_requests{code="200",quantile="0.99"} 3 1612325106789
778                ns_requests_sum{code="200"} 12 1612325106789
779                ns_requests_count{code="200"} 6 1612325106789
780            "#}
781        );
782    }
783
784    #[test]
785    fn encodes_summary_request() {
786        assert_eq!(
787            encode_summary::<TimeSeries>(),
788            write_request!(
789                "ns_requests", "requests", Summary [
790                    "" @ 1612325106789 = 1.5 ["code" => "200", "quantile" => "0.01"],
791                    "" @ 1612325106789 = 2.0 ["code" => "200", "quantile" => "0.5"],
792                    "" @ 1612325106789 = 3.0 ["code" => "200", "quantile" => "0.99"],
793                    "_sum" @ 1612325106789 = 12.0 ["code" => "200"],
794                    "_count" @ 1612325106789 = 6.0 ["code" => "200"]
795                ]
796            )
797        );
798    }
799
800    fn encode_summary<T: MetricCollector>() -> T::Output {
801        let metric = Metric::new(
802            "requests".to_owned(),
803            MetricKind::Absolute,
804            MetricValue::AggregatedSummary {
805                quantiles: vector_lib::quantiles![0.01 => 1.5, 0.5 => 2.0, 0.99 => 3.0],
806                count: 6,
807                sum: 12.0,
808            },
809        )
810        .with_tags(Some(tags()))
811        .with_timestamp(Some(timestamp()));
812        encode_one::<T>(Some("ns"), &[], &[], &metric)
813    }
814
815    #[test]
816    fn encodes_distribution_summary_text() {
817        assert_eq!(
818            encode_distribution_summary::<StringCollector>(),
819            indoc! {r#"
820                # HELP ns_requests requests
821                # TYPE ns_requests summary
822                ns_requests{code="200",quantile="0.5"} 2 1612325106789
823                ns_requests{code="200",quantile="0.75"} 2 1612325106789
824                ns_requests{code="200",quantile="0.9"} 3 1612325106789
825                ns_requests{code="200",quantile="0.95"} 3 1612325106789
826                ns_requests{code="200",quantile="0.99"} 3 1612325106789
827                ns_requests_sum{code="200"} 15 1612325106789
828                ns_requests_count{code="200"} 8 1612325106789
829                ns_requests_min{code="200"} 1 1612325106789
830                ns_requests_max{code="200"} 3 1612325106789
831                ns_requests_avg{code="200"} 1.875 1612325106789
832            "#}
833        );
834    }
835
836    #[test]
837    fn encodes_distribution_summary_request() {
838        assert_eq!(
839            encode_distribution_summary::<TimeSeries>(),
840            write_request!(
841                "ns_requests", "requests", Summary [
842                    "" @ 1612325106789 = 2.0 ["code" => "200", "quantile" => "0.5"],
843                    "" @ 1612325106789 = 2.0 ["code" => "200", "quantile" => "0.75"],
844                    "" @ 1612325106789 = 3.0 ["code" => "200", "quantile" => "0.9"],
845                    "" @ 1612325106789 = 3.0 ["code" => "200", "quantile" => "0.95"],
846                    "" @ 1612325106789 = 3.0 ["code" => "200", "quantile" => "0.99"],
847                    "_sum" @ 1612325106789 = 15.0 ["code" => "200"],
848                    "_count" @ 1612325106789 = 8.0 ["code" => "200"],
849                    "_min" @ 1612325106789 = 1.0 ["code" => "200"],
850                    "_max" @ 1612325106789 = 3.0 ["code" => "200"],
851                    "_avg" @ 1612325106789 = 1.875 ["code" => "200"]
852                ]
853            )
854        );
855    }
856
857    fn encode_distribution_summary<T: MetricCollector>() -> T::Output {
858        let metric = Metric::new(
859            "requests".to_owned(),
860            MetricKind::Absolute,
861            MetricValue::Distribution {
862                samples: vector_lib::samples![1.0 => 3, 2.0 => 3, 3.0 => 2],
863                statistic: StatisticKind::Summary,
864            },
865        )
866        .with_tags(Some(tags()))
867        .with_timestamp(Some(timestamp()));
868        encode_one::<T>(Some("ns"), &[], &default_summary_quantiles(), &metric)
869    }
870
871    #[test]
872    fn encodes_timestamp_text() {
873        assert_eq!(
874            encode_timestamp::<StringCollector>(),
875            indoc! {r"
876                # HELP temperature temperature
877                # TYPE temperature counter
878                temperature 2 1612325106789
879            "}
880        );
881    }
882
883    #[test]
884    fn encodes_timestamp_request() {
885        assert_eq!(
886            encode_timestamp::<TimeSeries>(),
887            write_request!("temperature", "temperature", Counter ["" @ 1612325106789 = 2.0 []])
888        );
889    }
890
891    fn encode_timestamp<T: MetricCollector>() -> T::Output {
892        let metric = Metric::new(
893            "temperature".to_owned(),
894            MetricKind::Absolute,
895            MetricValue::Counter { value: 2.0 },
896        )
897        .with_timestamp(Some(timestamp()));
898        encode_one::<T>(None, &[], &[], &metric)
899    }
900
901    #[test]
902    fn adds_timestamp_request() {
903        let now = Utc::now().timestamp_millis();
904        let metric = Metric::new(
905            "something".to_owned(),
906            MetricKind::Absolute,
907            MetricValue::Gauge { value: 1.0 },
908        );
909        let encoded = encode_one::<TimeSeries>(None, &[], &[], &metric);
910        assert!(encoded.timeseries[0].samples[0].timestamp >= now);
911    }
912
913    fn timestamp() -> DateTime<Utc> {
914        Utc.with_ymd_and_hms(2021, 2, 3, 4, 5, 6)
915            .single()
916            .and_then(|t| t.with_nanosecond(789 * 1_000_000))
917            .expect("invalid timestamp")
918    }
919
920    #[test]
921    fn escapes_tags_text() {
922        let tags = metric_tags!(
923            "code" => "200",
924            "quoted" => r#"host"1""#,
925            "path" => r"c:\Windows",
926        );
927        let metric = Metric::new(
928            "something".to_owned(),
929            MetricKind::Absolute,
930            MetricValue::Counter { value: 1.0 },
931        )
932        .with_tags(Some(tags));
933        let encoded = encode_one::<StringCollector>(None, &[], &[], &metric);
934        assert_eq!(
935            encoded,
936            indoc! {r#"
937                # HELP something something
938                # TYPE something counter
939                something{code="200",path="c:\\Windows",quoted="host\"1\""} 1
940            "#}
941        );
942    }
943
944    /// According to the [spec](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md?plain=1#L115)
945    ///
946    /// > Label names MUST be unique within a LabelSet.
947    ///
948    /// Prometheus itself will reject the metric with an error. Largely to remain backward
949    /// compatible with older versions of Vector, we only publish the last tag in the list.
950    #[test]
951    fn encodes_duplicate_tags() {
952        let tags = metric_tags!(
953            "code" => "200",
954            "code" => "success",
955        );
956        let metric = Metric::new(
957            "something".to_owned(),
958            MetricKind::Absolute,
959            MetricValue::Counter { value: 1.0 },
960        )
961        .with_tags(Some(tags));
962        let encoded = encode_one::<StringCollector>(None, &[], &[], &metric);
963        assert_eq!(
964            encoded,
965            indoc! {r#"
966                # HELP something something
967                # TYPE something counter
968                something{code="success"} 1
969            "#}
970        );
971    }
972}