vector/sinks/prometheus/
collector.rs

1use std::{collections::BTreeMap, fmt::Write as _};
2
3use chrono::Utc;
4use indexmap::map::IndexMap;
5use vector_lib::event::metric::{samples_to_buckets, MetricSketch, MetricTags, Quantile};
6use vector_lib::prometheus::parser::{proto, METRIC_NAME_LABEL};
7
8use crate::{
9    event::metric::{Metric, MetricKind, MetricValue, StatisticKind},
10    sinks::util::{encode_namespace, statistic::DistributionStatistic},
11};
12
13pub(super) trait MetricCollector {
14    type Output;
15
16    fn new() -> Self;
17
18    fn emit_metadata(&mut self, name: &str, fullname: &str, value: &MetricValue);
19
20    fn emit_value(
21        &mut self,
22        timestamp_millis: Option<i64>,
23        name: &str,
24        suffix: &str,
25        value: f64,
26        tags: Option<&MetricTags>,
27        extra: Option<(&str, String)>,
28    );
29
30    fn finish(self) -> Self::Output;
31
32    fn encode_metric(
33        &mut self,
34        default_namespace: Option<&str>,
35        buckets: &[f64],
36        quantiles: &[f64],
37        metric: &Metric,
38    ) {
39        let name = encode_namespace(metric.namespace().or(default_namespace), '_', metric.name());
40        let name = &name;
41        let timestamp = metric.timestamp().map(|t| t.timestamp_millis());
42
43        if metric.kind() == MetricKind::Absolute {
44            let tags = metric.tags();
45            self.emit_metadata(metric.name(), name, metric.value());
46
47            match metric.value() {
48                MetricValue::Counter { value } => {
49                    self.emit_value(timestamp, name, "", *value, tags, None);
50                }
51                MetricValue::Gauge { value } => {
52                    self.emit_value(timestamp, name, "", *value, tags, None);
53                }
54                MetricValue::Set { values } => {
55                    self.emit_value(timestamp, name, "", values.len() as f64, tags, None);
56                }
57                MetricValue::Distribution {
58                    samples,
59                    statistic: StatisticKind::Histogram,
60                } => {
61                    // convert distributions into aggregated histograms
62                    let (buckets, count, sum) = samples_to_buckets(samples, buckets);
63                    let mut bucket_count = 0.0;
64                    for bucket in buckets {
65                        bucket_count += bucket.count as f64;
66                        self.emit_value(
67                            timestamp,
68                            name,
69                            "_bucket",
70                            bucket_count,
71                            tags,
72                            Some(("le", bucket.upper_limit.to_string())),
73                        );
74                    }
75                    self.emit_value(
76                        timestamp,
77                        name,
78                        "_bucket",
79                        count as f64,
80                        tags,
81                        Some(("le", "+Inf".to_string())),
82                    );
83                    self.emit_value(timestamp, name, "_sum", sum, tags, None);
84                    self.emit_value(timestamp, name, "_count", count as f64, tags, None);
85                }
86                MetricValue::Distribution {
87                    samples,
88                    statistic: StatisticKind::Summary,
89                } => {
90                    if let Some(statistic) = DistributionStatistic::from_samples(samples, quantiles)
91                    {
92                        for (q, v) in statistic.quantiles.iter() {
93                            self.emit_value(
94                                timestamp,
95                                name,
96                                "",
97                                *v,
98                                tags,
99                                Some(("quantile", q.to_string())),
100                            );
101                        }
102                        self.emit_value(timestamp, name, "_sum", statistic.sum, tags, None);
103                        self.emit_value(
104                            timestamp,
105                            name,
106                            "_count",
107                            statistic.count as f64,
108                            tags,
109                            None,
110                        );
111                        self.emit_value(timestamp, name, "_min", statistic.min, tags, None);
112                        self.emit_value(timestamp, name, "_max", statistic.max, tags, None);
113                        self.emit_value(timestamp, name, "_avg", statistic.avg, tags, None);
114                    } else {
115                        self.emit_value(timestamp, name, "_sum", 0.0, tags, None);
116                        self.emit_value(timestamp, name, "_count", 0.0, tags, None);
117                    }
118                }
119                MetricValue::AggregatedHistogram {
120                    buckets,
121                    count,
122                    sum,
123                } => {
124                    let mut bucket_count = 0.0;
125                    for bucket in buckets {
126                        // Aggregated histograms are cumulative in Prometheus.  This means that the
127                        // count of values in a bucket should only go up at the upper limit goes up,
128                        // because if you count a value in a specific bucket, by definition, it is
129                        // less than the upper limit of the next bucket.
130                        //
131                        // While most sources should give us buckets that have an "infinity" bucket
132                        // -- everything else that didn't fit in the non-infinity-upper-limit buckets
133                        // -- we can't be sure, so we calculate that bucket ourselves.  This is why
134                        // we make sure to avoid encoding a bucket if its upper limit is already
135                        // infinity, so that we don't double report.
136                        //
137                        // This check will also avoid printing out a bucket whose upper limit is
138                        // negative infinity, because that would make no sense.
139                        if bucket.upper_limit.is_infinite() {
140                            continue;
141                        }
142
143                        bucket_count += bucket.count as f64;
144                        self.emit_value(
145                            timestamp,
146                            name,
147                            "_bucket",
148                            bucket_count,
149                            tags,
150                            Some(("le", bucket.upper_limit.to_string())),
151                        );
152                    }
153                    self.emit_value(
154                        timestamp,
155                        name,
156                        "_bucket",
157                        *count as f64,
158                        tags,
159                        Some(("le", "+Inf".to_string())),
160                    );
161                    self.emit_value(timestamp, name, "_sum", *sum, tags, None);
162                    self.emit_value(timestamp, name, "_count", *count as f64, tags, None);
163                }
164                MetricValue::AggregatedSummary {
165                    quantiles,
166                    count,
167                    sum,
168                } => {
169                    for quantile in quantiles {
170                        self.emit_value(
171                            timestamp,
172                            name,
173                            "",
174                            quantile.value,
175                            tags,
176                            Some(("quantile", quantile.quantile.to_string())),
177                        );
178                    }
179                    self.emit_value(timestamp, name, "_sum", *sum, tags, None);
180                    self.emit_value(timestamp, name, "_count", *count as f64, tags, None);
181                }
182                MetricValue::Sketch { sketch } => match sketch {
183                    MetricSketch::AgentDDSketch(ddsketch) => {
184                        for q in quantiles {
185                            let quantile = Quantile {
186                                quantile: *q,
187                                value: ddsketch.quantile(*q).unwrap_or(0.0),
188                            };
189                            self.emit_value(
190                                timestamp,
191                                name,
192                                "",
193                                quantile.value,
194                                tags,
195                                Some(("quantile", quantile.quantile.to_string())),
196                            );
197                        }
198                        self.emit_value(
199                            timestamp,
200                            name,
201                            "_sum",
202                            ddsketch.sum().unwrap_or(0.0),
203                            tags,
204                            None,
205                        );
206                        self.emit_value(
207                            timestamp,
208                            name,
209                            "_count",
210                            ddsketch.count() as f64,
211                            tags,
212                            None,
213                        );
214                    }
215                },
216            }
217        }
218    }
219}
220
221pub(super) struct StringCollector {
222    // BTreeMap ensures we get sorted output, which whilst not required is preferable
223    processed: BTreeMap<String, String>,
224}
225
226impl MetricCollector for StringCollector {
227    type Output = String;
228
229    fn new() -> Self {
230        let processed = BTreeMap::new();
231        Self { processed }
232    }
233
234    fn emit_metadata(&mut self, name: &str, fullname: &str, value: &MetricValue) {
235        if !self.processed.contains_key(fullname) {
236            let header = Self::encode_header(name, fullname, value);
237            self.processed.insert(fullname.into(), header);
238        }
239    }
240
241    fn emit_value(
242        &mut self,
243        timestamp_millis: Option<i64>,
244        name: &str,
245        suffix: &str,
246        value: f64,
247        tags: Option<&MetricTags>,
248        extra: Option<(&str, String)>,
249    ) {
250        let result = self
251            .processed
252            .get_mut(name)
253            .expect("metric metadata not encoded");
254
255        result.push_str(name);
256        result.push_str(suffix);
257        Self::encode_tags(result, tags, extra);
258        _ = match timestamp_millis {
259            None => writeln!(result, " {value}"),
260            Some(timestamp) => writeln!(result, " {value} {timestamp}"),
261        };
262    }
263
264    fn finish(self) -> String {
265        self.processed.into_values().collect()
266    }
267}
268
269impl StringCollector {
270    fn encode_tags(result: &mut String, tags: Option<&MetricTags>, extra: Option<(&str, String)>) {
271        match (tags, extra) {
272            (None, None) => Ok(()),
273            (None, Some(tag)) => write!(result, "{{{}}}", Self::format_tag(tag.0, &tag.1)),
274            (Some(tags), ref tag) => {
275                let mut parts = tags
276                    .iter_single()
277                    .map(|(key, value)| Self::format_tag(key, value))
278                    .collect::<Vec<_>>();
279
280                if let Some((key, value)) = tag {
281                    parts.push(Self::format_tag(key, value))
282                }
283
284                parts.sort();
285                write!(result, "{{{}}}", parts.join(","))
286            }
287        }
288        .ok();
289    }
290
291    fn encode_header(name: &str, fullname: &str, value: &MetricValue) -> String {
292        let r#type = prometheus_metric_type(value).as_str();
293        format!("# HELP {fullname} {name}\n# TYPE {fullname} {type}\n")
294    }
295
296    fn format_tag(key: &str, mut value: &str) -> String {
297        // For most tags, this is just `{KEY}="{VALUE}"` so allocate optimistically
298        let mut result = String::with_capacity(key.len() + value.len() + 3);
299        result.push_str(key);
300        result.push_str("=\"");
301        while let Some(i) = value.find(['\\', '"']) {
302            result.push_str(&value[..i]);
303            result.push('\\');
304            // Ugly but works because we know the character at `i` is ASCII
305            result.push(value.as_bytes()[i] as char);
306            value = &value[i + 1..];
307        }
308        result.push_str(value);
309        result.push('"');
310        result
311    }
312}
313
314type Labels = Vec<proto::Label>;
315
316pub(super) struct TimeSeries {
317    buffer: IndexMap<Labels, Vec<proto::Sample>>,
318    metadata: IndexMap<String, proto::MetricMetadata>,
319    timestamp: Option<i64>,
320}
321
322impl TimeSeries {
323    fn make_labels(
324        tags: Option<&MetricTags>,
325        name: &str,
326        suffix: &str,
327        extra: Option<(&str, String)>,
328    ) -> Labels {
329        // Each Prometheus metric is grouped by its labels, which
330        // contains all the labels from the source metric, plus the name
331        // label for the actual metric name. For convenience below, an
332        // optional extra tag is added.
333        let mut labels = tags.cloned().unwrap_or_default();
334        labels.replace(METRIC_NAME_LABEL.into(), [name, suffix].join(""));
335        if let Some((name, value)) = extra {
336            labels.replace(name.into(), value);
337        }
338
339        // Extract the labels into a vec and sort to produce a
340        // consistent key for the buffer.
341        let mut labels = labels
342            .into_iter_single()
343            .map(|(name, value)| proto::Label { name, value })
344            .collect::<Labels>();
345        labels.sort();
346        labels
347    }
348
349    fn default_timestamp(&mut self) -> i64 {
350        *self
351            .timestamp
352            .get_or_insert_with(|| Utc::now().timestamp_millis())
353    }
354}
355
356impl MetricCollector for TimeSeries {
357    type Output = proto::WriteRequest;
358
359    fn new() -> Self {
360        Self {
361            buffer: Default::default(),
362            metadata: Default::default(),
363            timestamp: None,
364        }
365    }
366
367    fn emit_metadata(&mut self, name: &str, fullname: &str, value: &MetricValue) {
368        if !self.metadata.contains_key(name) {
369            let r#type = prometheus_metric_type(value);
370            let metadata = proto::MetricMetadata {
371                r#type: r#type as i32,
372                metric_family_name: fullname.into(),
373                help: name.into(),
374                unit: String::new(),
375            };
376            self.metadata.insert(name.into(), metadata);
377        }
378    }
379
380    fn emit_value(
381        &mut self,
382        timestamp_millis: Option<i64>,
383        name: &str,
384        suffix: &str,
385        value: f64,
386        tags: Option<&MetricTags>,
387        extra: Option<(&str, String)>,
388    ) {
389        let timestamp = timestamp_millis.unwrap_or_else(|| self.default_timestamp());
390        self.buffer
391            .entry(Self::make_labels(tags, name, suffix, extra))
392            .or_default()
393            .push(proto::Sample { value, timestamp });
394    }
395
396    fn finish(self) -> proto::WriteRequest {
397        let timeseries = self
398            .buffer
399            .into_iter()
400            .map(|(labels, samples)| proto::TimeSeries { labels, samples })
401            .collect::<Vec<_>>();
402        let metadata = self
403            .metadata
404            .into_iter()
405            .map(|(_, metadata)| metadata)
406            .collect();
407        proto::WriteRequest {
408            timeseries,
409            metadata,
410        }
411    }
412}
413
414const fn prometheus_metric_type(metric_value: &MetricValue) -> proto::MetricType {
415    use proto::MetricType;
416    match metric_value {
417        MetricValue::Counter { .. } => MetricType::Counter,
418        MetricValue::Gauge { .. } | MetricValue::Set { .. } => MetricType::Gauge,
419        MetricValue::Distribution {
420            statistic: StatisticKind::Histogram,
421            ..
422        } => MetricType::Histogram,
423        MetricValue::Distribution {
424            statistic: StatisticKind::Summary,
425            ..
426        } => MetricType::Summary,
427        MetricValue::AggregatedHistogram { .. } => MetricType::Histogram,
428        MetricValue::AggregatedSummary { .. } => MetricType::Summary,
429        MetricValue::Sketch { .. } => MetricType::Summary,
430    }
431}
432
433#[cfg(test)]
434mod tests {
435    use std::collections::BTreeSet;
436
437    use chrono::{DateTime, TimeZone, Timelike};
438    use indoc::indoc;
439    use similar_asserts::assert_eq;
440    use vector_lib::metric_tags;
441
442    use super::{super::default_summary_quantiles, *};
443    use crate::{
444        event::metric::{Metric, MetricKind, MetricValue, StatisticKind},
445        test_util::stats::VariableHistogram,
446    };
447
448    fn encode_one<T: MetricCollector>(
449        default_namespace: Option<&str>,
450        buckets: &[f64],
451        quantiles: &[f64],
452        metric: &Metric,
453    ) -> T::Output {
454        let mut s = T::new();
455        s.encode_metric(default_namespace, buckets, quantiles, metric);
456        s.finish()
457    }
458
459    fn tags() -> MetricTags {
460        metric_tags!("code" => "200")
461    }
462
463    macro_rules! write_request {
464        ( $name:literal, $help:literal, $type:ident
465          [ $(
466              $suffix:literal @ $timestamp:literal = $svalue:literal
467                  [ $( $label:literal => $lvalue:literal ),* ]
468          ),* ]
469        ) => {
470            proto::WriteRequest {
471                timeseries: vec![
472                    $(
473                        proto::TimeSeries {
474                            labels: vec![
475                                proto::Label {
476                                    name: "__name__".into(),
477                                    value: format!("{}{}", $name, $suffix),
478                                },
479                                $(
480                                    proto::Label {
481                                        name: $label.into(),
482                                        value: $lvalue.into(),
483                                    },
484                                )*
485                            ],
486                            samples: vec![ proto::Sample {
487                                value: $svalue,
488                                timestamp: $timestamp,
489                            }],
490                        },
491                    )*
492                ],
493                metadata: vec![proto::MetricMetadata {
494                    r#type: proto::metric_metadata::MetricType::$type as i32,
495                    metric_family_name: $name.into(),
496                    help: $help.into(),
497                    unit: "".into(),
498                }],
499            }
500        };
501    }
502
503    #[test]
504    fn encodes_counter_text() {
505        assert_eq!(
506            encode_counter::<StringCollector>(),
507            indoc! { r#"
508                # HELP vector_hits hits
509                # TYPE vector_hits counter
510                vector_hits{code="200"} 10 1612325106789
511            "#}
512        );
513    }
514
515    #[test]
516    fn encodes_counter_request() {
517        assert_eq!(
518            encode_counter::<TimeSeries>(),
519            write_request!("vector_hits", "hits", Counter ["" @ 1612325106789 = 10.0 ["code" => "200"]])
520        );
521    }
522
523    fn encode_counter<T: MetricCollector>() -> T::Output {
524        let metric = Metric::new(
525            "hits".to_owned(),
526            MetricKind::Absolute,
527            MetricValue::Counter { value: 10.0 },
528        )
529        .with_tags(Some(tags()))
530        .with_timestamp(Some(timestamp()));
531        encode_one::<T>(Some("vector"), &[], &[], &metric)
532    }
533
534    #[test]
535    fn encodes_gauge_text() {
536        assert_eq!(
537            encode_gauge::<StringCollector>(),
538            indoc! { r#"
539                # HELP vector_temperature temperature
540                # TYPE vector_temperature gauge
541                vector_temperature{code="200"} -1.1 1612325106789
542            "#}
543        );
544    }
545
546    #[test]
547    fn encodes_gauge_request() {
548        assert_eq!(
549            encode_gauge::<TimeSeries>(),
550            write_request!("vector_temperature", "temperature", Gauge ["" @ 1612325106789 = -1.1 ["code" => "200"]])
551        );
552    }
553
554    fn encode_gauge<T: MetricCollector>() -> T::Output {
555        let metric = Metric::new(
556            "temperature".to_owned(),
557            MetricKind::Absolute,
558            MetricValue::Gauge { value: -1.1 },
559        )
560        .with_tags(Some(tags()))
561        .with_timestamp(Some(timestamp()));
562        encode_one::<T>(Some("vector"), &[], &[], &metric)
563    }
564
565    #[test]
566    fn encodes_set_text() {
567        assert_eq!(
568            encode_set::<StringCollector>(),
569            indoc! { r"
570                # HELP vector_users users
571                # TYPE vector_users gauge
572                vector_users 1 1612325106789
573            "}
574        );
575    }
576
577    #[test]
578    fn encodes_set_request() {
579        assert_eq!(
580            encode_set::<TimeSeries>(),
581            write_request!("vector_users", "users", Gauge [ "" @ 1612325106789 = 1.0 []])
582        );
583    }
584
585    fn encode_set<T: MetricCollector>() -> T::Output {
586        let metric = Metric::new(
587            "users".to_owned(),
588            MetricKind::Absolute,
589            MetricValue::Set {
590                values: vec!["foo".into()].into_iter().collect(),
591            },
592        )
593        .with_timestamp(Some(timestamp()));
594        encode_one::<T>(Some("vector"), &[], &[], &metric)
595    }
596
597    #[test]
598    fn encodes_expired_set_text() {
599        assert_eq!(
600            encode_expired_set::<StringCollector>(),
601            indoc! {r"
602                # HELP vector_users users
603                # TYPE vector_users gauge
604                vector_users 0 1612325106789
605            "}
606        );
607    }
608
609    #[test]
610    fn encodes_expired_set_request() {
611        assert_eq!(
612            encode_expired_set::<TimeSeries>(),
613            write_request!("vector_users", "users", Gauge ["" @ 1612325106789 = 0.0 []])
614        );
615    }
616
617    fn encode_expired_set<T: MetricCollector>() -> T::Output {
618        let metric = Metric::new(
619            "users".to_owned(),
620            MetricKind::Absolute,
621            MetricValue::Set {
622                values: BTreeSet::new(),
623            },
624        )
625        .with_timestamp(Some(timestamp()));
626        encode_one::<T>(Some("vector"), &[], &[], &metric)
627    }
628
629    #[test]
630    fn encodes_distribution_text() {
631        assert_eq!(
632            encode_distribution::<StringCollector>(),
633            indoc! {r#"
634                # HELP vector_requests requests
635                # TYPE vector_requests histogram
636                vector_requests_bucket{le="0"} 0 1612325106789
637                vector_requests_bucket{le="2.5"} 6 1612325106789
638                vector_requests_bucket{le="5"} 8 1612325106789
639                vector_requests_bucket{le="+Inf"} 8 1612325106789
640                vector_requests_sum 15 1612325106789
641                vector_requests_count 8 1612325106789
642            "#}
643        );
644    }
645
646    #[test]
647    fn encodes_distribution_request() {
648        assert_eq!(
649            encode_distribution::<TimeSeries>(),
650            write_request!(
651                "vector_requests", "requests", Histogram [
652                        "_bucket" @ 1612325106789 = 0.0 ["le" => "0"],
653                        "_bucket" @ 1612325106789 = 6.0 ["le" => "2.5"],
654                        "_bucket" @ 1612325106789 = 8.0 ["le" => "5"],
655                        "_bucket" @ 1612325106789 = 8.0 ["le" => "+Inf"],
656                        "_sum" @ 1612325106789 = 15.0 [],
657                        "_count" @ 1612325106789 = 8.0 []
658                ]
659            )
660        );
661    }
662
663    fn encode_distribution<T: MetricCollector>() -> T::Output {
664        let metric = Metric::new(
665            "requests".to_owned(),
666            MetricKind::Absolute,
667            MetricValue::Distribution {
668                samples: vector_lib::samples![1.0 => 3, 2.0 => 3, 3.0 => 2],
669                statistic: StatisticKind::Histogram,
670            },
671        )
672        .with_timestamp(Some(timestamp()));
673        encode_one::<T>(Some("vector"), &[0.0, 2.5, 5.0], &[], &metric)
674    }
675
676    #[test]
677    fn encodes_histogram_text() {
678        assert_eq!(
679            encode_histogram::<StringCollector>(false),
680            indoc! {r#"
681                # HELP vector_requests requests
682                # TYPE vector_requests histogram
683                vector_requests_bucket{le="1"} 1 1612325106789
684                vector_requests_bucket{le="2.1"} 3 1612325106789
685                vector_requests_bucket{le="3"} 6 1612325106789
686                vector_requests_bucket{le="+Inf"} 6 1612325106789
687                vector_requests_sum 11.5 1612325106789
688                vector_requests_count 6 1612325106789
689            "#}
690        );
691    }
692
693    #[test]
694    fn encodes_histogram_request() {
695        assert_eq!(
696            encode_histogram::<TimeSeries>(false),
697            write_request!(
698                "vector_requests", "requests", Histogram [
699                        "_bucket" @ 1612325106789 = 1.0 ["le" => "1"],
700                        "_bucket" @ 1612325106789 = 3.0 ["le" => "2.1"],
701                        "_bucket" @ 1612325106789 = 6.0 ["le" => "3"],
702                        "_bucket" @ 1612325106789 = 6.0 ["le" => "+Inf"],
703                        "_sum" @ 1612325106789 = 11.5 [],
704                        "_count" @ 1612325106789 = 6.0 []
705                    ]
706            )
707        );
708    }
709
710    #[test]
711    fn encodes_histogram_text_with_extra_infinity_bound() {
712        assert_eq!(
713            encode_histogram::<StringCollector>(true),
714            indoc! {r#"
715                # HELP vector_requests requests
716                # TYPE vector_requests histogram
717                vector_requests_bucket{le="1"} 1 1612325106789
718                vector_requests_bucket{le="2.1"} 3 1612325106789
719                vector_requests_bucket{le="3"} 6 1612325106789
720                vector_requests_bucket{le="+Inf"} 6 1612325106789
721                vector_requests_sum 11.5 1612325106789
722                vector_requests_count 6 1612325106789
723            "#}
724        );
725    }
726
727    #[test]
728    fn encodes_histogram_request_with_extra_infinity_bound() {
729        assert_eq!(
730            encode_histogram::<TimeSeries>(true),
731            write_request!(
732                "vector_requests", "requests", Histogram [
733                        "_bucket" @ 1612325106789 = 1.0 ["le" => "1"],
734                        "_bucket" @ 1612325106789 = 3.0 ["le" => "2.1"],
735                        "_bucket" @ 1612325106789 = 6.0 ["le" => "3"],
736                        "_bucket" @ 1612325106789 = 6.0 ["le" => "+Inf"],
737                        "_sum" @ 1612325106789 = 11.5 [],
738                        "_count" @ 1612325106789 = 6.0 []
739                    ]
740            )
741        );
742    }
743
744    fn encode_histogram<T: MetricCollector>(add_inf_bound: bool) -> T::Output {
745        let bounds = if add_inf_bound {
746            &[1.0, 2.1, 3.0, f64::INFINITY][..]
747        } else {
748            &[1.0, 2.1, 3.0][..]
749        };
750
751        let mut histogram = VariableHistogram::new(bounds);
752        histogram.record_many(&[0.4, 2.0, 1.75, 2.6, 2.25, 2.5][..]);
753
754        let metric = Metric::new(
755            "requests".to_owned(),
756            MetricKind::Absolute,
757            MetricValue::AggregatedHistogram {
758                buckets: histogram.buckets(),
759                count: histogram.count(),
760                sum: histogram.sum(),
761            },
762        )
763        .with_timestamp(Some(timestamp()));
764        encode_one::<T>(Some("vector"), &[], &[], &metric)
765    }
766
767    #[test]
768    fn encodes_summary_text() {
769        assert_eq!(
770            encode_summary::<StringCollector>(),
771            indoc! {r#"# HELP ns_requests requests
772                # TYPE ns_requests summary
773                ns_requests{code="200",quantile="0.01"} 1.5 1612325106789
774                ns_requests{code="200",quantile="0.5"} 2 1612325106789
775                ns_requests{code="200",quantile="0.99"} 3 1612325106789
776                ns_requests_sum{code="200"} 12 1612325106789
777                ns_requests_count{code="200"} 6 1612325106789
778            "#}
779        );
780    }
781
782    #[test]
783    fn encodes_summary_request() {
784        assert_eq!(
785            encode_summary::<TimeSeries>(),
786            write_request!(
787                "ns_requests", "requests", Summary [
788                    "" @ 1612325106789 = 1.5 ["code" => "200", "quantile" => "0.01"],
789                    "" @ 1612325106789 = 2.0 ["code" => "200", "quantile" => "0.5"],
790                    "" @ 1612325106789 = 3.0 ["code" => "200", "quantile" => "0.99"],
791                    "_sum" @ 1612325106789 = 12.0 ["code" => "200"],
792                    "_count" @ 1612325106789 = 6.0 ["code" => "200"]
793                ]
794            )
795        );
796    }
797
798    fn encode_summary<T: MetricCollector>() -> T::Output {
799        let metric = Metric::new(
800            "requests".to_owned(),
801            MetricKind::Absolute,
802            MetricValue::AggregatedSummary {
803                quantiles: vector_lib::quantiles![0.01 => 1.5, 0.5 => 2.0, 0.99 => 3.0],
804                count: 6,
805                sum: 12.0,
806            },
807        )
808        .with_tags(Some(tags()))
809        .with_timestamp(Some(timestamp()));
810        encode_one::<T>(Some("ns"), &[], &[], &metric)
811    }
812
813    #[test]
814    fn encodes_distribution_summary_text() {
815        assert_eq!(
816            encode_distribution_summary::<StringCollector>(),
817            indoc! {r#"
818                # HELP ns_requests requests
819                # TYPE ns_requests summary
820                ns_requests{code="200",quantile="0.5"} 2 1612325106789
821                ns_requests{code="200",quantile="0.75"} 2 1612325106789
822                ns_requests{code="200",quantile="0.9"} 3 1612325106789
823                ns_requests{code="200",quantile="0.95"} 3 1612325106789
824                ns_requests{code="200",quantile="0.99"} 3 1612325106789
825                ns_requests_sum{code="200"} 15 1612325106789
826                ns_requests_count{code="200"} 8 1612325106789
827                ns_requests_min{code="200"} 1 1612325106789
828                ns_requests_max{code="200"} 3 1612325106789
829                ns_requests_avg{code="200"} 1.875 1612325106789
830            "#}
831        );
832    }
833
834    #[test]
835    fn encodes_distribution_summary_request() {
836        assert_eq!(
837            encode_distribution_summary::<TimeSeries>(),
838            write_request!(
839                "ns_requests", "requests", Summary [
840                    "" @ 1612325106789 = 2.0 ["code" => "200", "quantile" => "0.5"],
841                    "" @ 1612325106789 = 2.0 ["code" => "200", "quantile" => "0.75"],
842                    "" @ 1612325106789 = 3.0 ["code" => "200", "quantile" => "0.9"],
843                    "" @ 1612325106789 = 3.0 ["code" => "200", "quantile" => "0.95"],
844                    "" @ 1612325106789 = 3.0 ["code" => "200", "quantile" => "0.99"],
845                    "_sum" @ 1612325106789 = 15.0 ["code" => "200"],
846                    "_count" @ 1612325106789 = 8.0 ["code" => "200"],
847                    "_min" @ 1612325106789 = 1.0 ["code" => "200"],
848                    "_max" @ 1612325106789 = 3.0 ["code" => "200"],
849                    "_avg" @ 1612325106789 = 1.875 ["code" => "200"]
850                ]
851            )
852        );
853    }
854
855    fn encode_distribution_summary<T: MetricCollector>() -> T::Output {
856        let metric = Metric::new(
857            "requests".to_owned(),
858            MetricKind::Absolute,
859            MetricValue::Distribution {
860                samples: vector_lib::samples![1.0 => 3, 2.0 => 3, 3.0 => 2],
861                statistic: StatisticKind::Summary,
862            },
863        )
864        .with_tags(Some(tags()))
865        .with_timestamp(Some(timestamp()));
866        encode_one::<T>(Some("ns"), &[], &default_summary_quantiles(), &metric)
867    }
868
869    #[test]
870    fn encodes_timestamp_text() {
871        assert_eq!(
872            encode_timestamp::<StringCollector>(),
873            indoc! {r"
874                # HELP temperature temperature
875                # TYPE temperature counter
876                temperature 2 1612325106789
877            "}
878        );
879    }
880
881    #[test]
882    fn encodes_timestamp_request() {
883        assert_eq!(
884            encode_timestamp::<TimeSeries>(),
885            write_request!("temperature", "temperature", Counter ["" @ 1612325106789 = 2.0 []])
886        );
887    }
888
889    fn encode_timestamp<T: MetricCollector>() -> T::Output {
890        let metric = Metric::new(
891            "temperature".to_owned(),
892            MetricKind::Absolute,
893            MetricValue::Counter { value: 2.0 },
894        )
895        .with_timestamp(Some(timestamp()));
896        encode_one::<T>(None, &[], &[], &metric)
897    }
898
899    #[test]
900    fn adds_timestamp_request() {
901        let now = Utc::now().timestamp_millis();
902        let metric = Metric::new(
903            "something".to_owned(),
904            MetricKind::Absolute,
905            MetricValue::Gauge { value: 1.0 },
906        );
907        let encoded = encode_one::<TimeSeries>(None, &[], &[], &metric);
908        assert!(encoded.timeseries[0].samples[0].timestamp >= now);
909    }
910
911    fn timestamp() -> DateTime<Utc> {
912        Utc.with_ymd_and_hms(2021, 2, 3, 4, 5, 6)
913            .single()
914            .and_then(|t| t.with_nanosecond(789 * 1_000_000))
915            .expect("invalid timestamp")
916    }
917
918    #[test]
919    fn escapes_tags_text() {
920        let tags = metric_tags!(
921            "code" => "200",
922            "quoted" => r#"host"1""#,
923            "path" => r"c:\Windows",
924        );
925        let metric = Metric::new(
926            "something".to_owned(),
927            MetricKind::Absolute,
928            MetricValue::Counter { value: 1.0 },
929        )
930        .with_tags(Some(tags));
931        let encoded = encode_one::<StringCollector>(None, &[], &[], &metric);
932        assert_eq!(
933            encoded,
934            indoc! {r#"
935                # HELP something something
936                # TYPE something counter
937                something{code="200",path="c:\\Windows",quoted="host\"1\""} 1
938            "#}
939        );
940    }
941
942    /// According to the [spec](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md?plain=1#L115)
943    ///
944    /// > Label names MUST be unique within a LabelSet.
945    ///
946    /// Prometheus itself will reject the metric with an error. Largely to remain backward
947    /// compatible with older versions of Vector, we only publish the last tag in the list.
948    #[test]
949    fn encodes_duplicate_tags() {
950        let tags = metric_tags!(
951            "code" => "200",
952            "code" => "success",
953        );
954        let metric = Metric::new(
955            "something".to_owned(),
956            MetricKind::Absolute,
957            MetricValue::Counter { value: 1.0 },
958        )
959        .with_tags(Some(tags));
960        let encoded = encode_one::<StringCollector>(None, &[], &[], &metric);
961        assert_eq!(
962            encoded,
963            indoc! {r#"
964                # HELP something something
965                # TYPE something counter
966                something{code="success"} 1
967            "#}
968        );
969    }
970}