1use std::{collections::BTreeMap, fmt::Write as _};
2
3use chrono::Utc;
4use indexmap::map::IndexMap;
5use vector_lib::event::metric::{samples_to_buckets, MetricSketch, MetricTags, Quantile};
6use vector_lib::prometheus::parser::{proto, METRIC_NAME_LABEL};
7
8use crate::{
9 event::metric::{Metric, MetricKind, MetricValue, StatisticKind},
10 sinks::util::{encode_namespace, statistic::DistributionStatistic},
11};
12
13pub(super) trait MetricCollector {
14 type Output;
15
16 fn new() -> Self;
17
18 fn emit_metadata(&mut self, name: &str, fullname: &str, value: &MetricValue);
19
20 fn emit_value(
21 &mut self,
22 timestamp_millis: Option<i64>,
23 name: &str,
24 suffix: &str,
25 value: f64,
26 tags: Option<&MetricTags>,
27 extra: Option<(&str, String)>,
28 );
29
30 fn finish(self) -> Self::Output;
31
32 fn encode_metric(
33 &mut self,
34 default_namespace: Option<&str>,
35 buckets: &[f64],
36 quantiles: &[f64],
37 metric: &Metric,
38 ) {
39 let name = encode_namespace(metric.namespace().or(default_namespace), '_', metric.name());
40 let name = &name;
41 let timestamp = metric.timestamp().map(|t| t.timestamp_millis());
42
43 if metric.kind() == MetricKind::Absolute {
44 let tags = metric.tags();
45 self.emit_metadata(metric.name(), name, metric.value());
46
47 match metric.value() {
48 MetricValue::Counter { value } => {
49 self.emit_value(timestamp, name, "", *value, tags, None);
50 }
51 MetricValue::Gauge { value } => {
52 self.emit_value(timestamp, name, "", *value, tags, None);
53 }
54 MetricValue::Set { values } => {
55 self.emit_value(timestamp, name, "", values.len() as f64, tags, None);
56 }
57 MetricValue::Distribution {
58 samples,
59 statistic: StatisticKind::Histogram,
60 } => {
61 let (buckets, count, sum) = samples_to_buckets(samples, buckets);
63 let mut bucket_count = 0.0;
64 for bucket in buckets {
65 bucket_count += bucket.count as f64;
66 self.emit_value(
67 timestamp,
68 name,
69 "_bucket",
70 bucket_count,
71 tags,
72 Some(("le", bucket.upper_limit.to_string())),
73 );
74 }
75 self.emit_value(
76 timestamp,
77 name,
78 "_bucket",
79 count as f64,
80 tags,
81 Some(("le", "+Inf".to_string())),
82 );
83 self.emit_value(timestamp, name, "_sum", sum, tags, None);
84 self.emit_value(timestamp, name, "_count", count as f64, tags, None);
85 }
86 MetricValue::Distribution {
87 samples,
88 statistic: StatisticKind::Summary,
89 } => {
90 if let Some(statistic) = DistributionStatistic::from_samples(samples, quantiles)
91 {
92 for (q, v) in statistic.quantiles.iter() {
93 self.emit_value(
94 timestamp,
95 name,
96 "",
97 *v,
98 tags,
99 Some(("quantile", q.to_string())),
100 );
101 }
102 self.emit_value(timestamp, name, "_sum", statistic.sum, tags, None);
103 self.emit_value(
104 timestamp,
105 name,
106 "_count",
107 statistic.count as f64,
108 tags,
109 None,
110 );
111 self.emit_value(timestamp, name, "_min", statistic.min, tags, None);
112 self.emit_value(timestamp, name, "_max", statistic.max, tags, None);
113 self.emit_value(timestamp, name, "_avg", statistic.avg, tags, None);
114 } else {
115 self.emit_value(timestamp, name, "_sum", 0.0, tags, None);
116 self.emit_value(timestamp, name, "_count", 0.0, tags, None);
117 }
118 }
119 MetricValue::AggregatedHistogram {
120 buckets,
121 count,
122 sum,
123 } => {
124 let mut bucket_count = 0.0;
125 for bucket in buckets {
126 if bucket.upper_limit.is_infinite() {
140 continue;
141 }
142
143 bucket_count += bucket.count as f64;
144 self.emit_value(
145 timestamp,
146 name,
147 "_bucket",
148 bucket_count,
149 tags,
150 Some(("le", bucket.upper_limit.to_string())),
151 );
152 }
153 self.emit_value(
154 timestamp,
155 name,
156 "_bucket",
157 *count as f64,
158 tags,
159 Some(("le", "+Inf".to_string())),
160 );
161 self.emit_value(timestamp, name, "_sum", *sum, tags, None);
162 self.emit_value(timestamp, name, "_count", *count as f64, tags, None);
163 }
164 MetricValue::AggregatedSummary {
165 quantiles,
166 count,
167 sum,
168 } => {
169 for quantile in quantiles {
170 self.emit_value(
171 timestamp,
172 name,
173 "",
174 quantile.value,
175 tags,
176 Some(("quantile", quantile.quantile.to_string())),
177 );
178 }
179 self.emit_value(timestamp, name, "_sum", *sum, tags, None);
180 self.emit_value(timestamp, name, "_count", *count as f64, tags, None);
181 }
182 MetricValue::Sketch { sketch } => match sketch {
183 MetricSketch::AgentDDSketch(ddsketch) => {
184 for q in quantiles {
185 let quantile = Quantile {
186 quantile: *q,
187 value: ddsketch.quantile(*q).unwrap_or(0.0),
188 };
189 self.emit_value(
190 timestamp,
191 name,
192 "",
193 quantile.value,
194 tags,
195 Some(("quantile", quantile.quantile.to_string())),
196 );
197 }
198 self.emit_value(
199 timestamp,
200 name,
201 "_sum",
202 ddsketch.sum().unwrap_or(0.0),
203 tags,
204 None,
205 );
206 self.emit_value(
207 timestamp,
208 name,
209 "_count",
210 ddsketch.count() as f64,
211 tags,
212 None,
213 );
214 }
215 },
216 }
217 }
218 }
219}
220
221pub(super) struct StringCollector {
222 processed: BTreeMap<String, String>,
224}
225
226impl MetricCollector for StringCollector {
227 type Output = String;
228
229 fn new() -> Self {
230 let processed = BTreeMap::new();
231 Self { processed }
232 }
233
234 fn emit_metadata(&mut self, name: &str, fullname: &str, value: &MetricValue) {
235 if !self.processed.contains_key(fullname) {
236 let header = Self::encode_header(name, fullname, value);
237 self.processed.insert(fullname.into(), header);
238 }
239 }
240
241 fn emit_value(
242 &mut self,
243 timestamp_millis: Option<i64>,
244 name: &str,
245 suffix: &str,
246 value: f64,
247 tags: Option<&MetricTags>,
248 extra: Option<(&str, String)>,
249 ) {
250 let result = self
251 .processed
252 .get_mut(name)
253 .expect("metric metadata not encoded");
254
255 result.push_str(name);
256 result.push_str(suffix);
257 Self::encode_tags(result, tags, extra);
258 _ = match timestamp_millis {
259 None => writeln!(result, " {value}"),
260 Some(timestamp) => writeln!(result, " {value} {timestamp}"),
261 };
262 }
263
264 fn finish(self) -> String {
265 self.processed.into_values().collect()
266 }
267}
268
269impl StringCollector {
270 fn encode_tags(result: &mut String, tags: Option<&MetricTags>, extra: Option<(&str, String)>) {
271 match (tags, extra) {
272 (None, None) => Ok(()),
273 (None, Some(tag)) => write!(result, "{{{}}}", Self::format_tag(tag.0, &tag.1)),
274 (Some(tags), ref tag) => {
275 let mut parts = tags
276 .iter_single()
277 .map(|(key, value)| Self::format_tag(key, value))
278 .collect::<Vec<_>>();
279
280 if let Some((key, value)) = tag {
281 parts.push(Self::format_tag(key, value))
282 }
283
284 parts.sort();
285 write!(result, "{{{}}}", parts.join(","))
286 }
287 }
288 .ok();
289 }
290
291 fn encode_header(name: &str, fullname: &str, value: &MetricValue) -> String {
292 let r#type = prometheus_metric_type(value).as_str();
293 format!("# HELP {fullname} {name}\n# TYPE {fullname} {type}\n")
294 }
295
296 fn format_tag(key: &str, mut value: &str) -> String {
297 let mut result = String::with_capacity(key.len() + value.len() + 3);
299 result.push_str(key);
300 result.push_str("=\"");
301 while let Some(i) = value.find(['\\', '"']) {
302 result.push_str(&value[..i]);
303 result.push('\\');
304 result.push(value.as_bytes()[i] as char);
306 value = &value[i + 1..];
307 }
308 result.push_str(value);
309 result.push('"');
310 result
311 }
312}
313
314type Labels = Vec<proto::Label>;
315
316pub(super) struct TimeSeries {
317 buffer: IndexMap<Labels, Vec<proto::Sample>>,
318 metadata: IndexMap<String, proto::MetricMetadata>,
319 timestamp: Option<i64>,
320}
321
322impl TimeSeries {
323 fn make_labels(
324 tags: Option<&MetricTags>,
325 name: &str,
326 suffix: &str,
327 extra: Option<(&str, String)>,
328 ) -> Labels {
329 let mut labels = tags.cloned().unwrap_or_default();
334 labels.replace(METRIC_NAME_LABEL.into(), [name, suffix].join(""));
335 if let Some((name, value)) = extra {
336 labels.replace(name.into(), value);
337 }
338
339 let mut labels = labels
342 .into_iter_single()
343 .map(|(name, value)| proto::Label { name, value })
344 .collect::<Labels>();
345 labels.sort();
346 labels
347 }
348
349 fn default_timestamp(&mut self) -> i64 {
350 *self
351 .timestamp
352 .get_or_insert_with(|| Utc::now().timestamp_millis())
353 }
354}
355
356impl MetricCollector for TimeSeries {
357 type Output = proto::WriteRequest;
358
359 fn new() -> Self {
360 Self {
361 buffer: Default::default(),
362 metadata: Default::default(),
363 timestamp: None,
364 }
365 }
366
367 fn emit_metadata(&mut self, name: &str, fullname: &str, value: &MetricValue) {
368 if !self.metadata.contains_key(name) {
369 let r#type = prometheus_metric_type(value);
370 let metadata = proto::MetricMetadata {
371 r#type: r#type as i32,
372 metric_family_name: fullname.into(),
373 help: name.into(),
374 unit: String::new(),
375 };
376 self.metadata.insert(name.into(), metadata);
377 }
378 }
379
380 fn emit_value(
381 &mut self,
382 timestamp_millis: Option<i64>,
383 name: &str,
384 suffix: &str,
385 value: f64,
386 tags: Option<&MetricTags>,
387 extra: Option<(&str, String)>,
388 ) {
389 let timestamp = timestamp_millis.unwrap_or_else(|| self.default_timestamp());
390 self.buffer
391 .entry(Self::make_labels(tags, name, suffix, extra))
392 .or_default()
393 .push(proto::Sample { value, timestamp });
394 }
395
396 fn finish(self) -> proto::WriteRequest {
397 let timeseries = self
398 .buffer
399 .into_iter()
400 .map(|(labels, samples)| proto::TimeSeries { labels, samples })
401 .collect::<Vec<_>>();
402 let metadata = self
403 .metadata
404 .into_iter()
405 .map(|(_, metadata)| metadata)
406 .collect();
407 proto::WriteRequest {
408 timeseries,
409 metadata,
410 }
411 }
412}
413
414const fn prometheus_metric_type(metric_value: &MetricValue) -> proto::MetricType {
415 use proto::MetricType;
416 match metric_value {
417 MetricValue::Counter { .. } => MetricType::Counter,
418 MetricValue::Gauge { .. } | MetricValue::Set { .. } => MetricType::Gauge,
419 MetricValue::Distribution {
420 statistic: StatisticKind::Histogram,
421 ..
422 } => MetricType::Histogram,
423 MetricValue::Distribution {
424 statistic: StatisticKind::Summary,
425 ..
426 } => MetricType::Summary,
427 MetricValue::AggregatedHistogram { .. } => MetricType::Histogram,
428 MetricValue::AggregatedSummary { .. } => MetricType::Summary,
429 MetricValue::Sketch { .. } => MetricType::Summary,
430 }
431}
432
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use chrono::{DateTime, TimeZone, Timelike};
    use indoc::indoc;
    use similar_asserts::assert_eq;
    use vector_lib::metric_tags;

    use super::{super::default_summary_quantiles, *};
    use crate::{
        event::metric::{Metric, MetricKind, MetricValue, StatisticKind},
        test_util::stats::VariableHistogram,
    };

    /// Encodes a single metric with a fresh collector of type `T` and returns its output.
    fn encode_one<T: MetricCollector>(
        default_namespace: Option<&str>,
        buckets: &[f64],
        quantiles: &[f64],
        metric: &Metric,
    ) -> T::Output {
        let mut collector = T::new();
        collector.encode_metric(default_namespace, buckets, quantiles, metric);
        collector.finish()
    }

    /// The tag set shared by the tagged test metrics.
    fn tags() -> MetricTags {
        metric_tags!("code" => "200")
    }

    /// Builds the expected `proto::WriteRequest` for one metric family.
    ///
    /// Each entry reads `suffix @ timestamp = value [labels]`. The `__name__` label is
    /// always placed first, followed by the listed labels in the given order.
    macro_rules! write_request {
        ( $name:literal, $help:literal, $type:ident
          [ $(
              $suffix:literal @ $timestamp:literal = $svalue:literal
                  [ $( $label:literal => $lvalue:literal ),* ]
          ),* ]
        ) => {
            proto::WriteRequest {
                timeseries: vec![
                    $(
                        proto::TimeSeries {
                            labels: vec![
                                proto::Label {
                                    name: "__name__".into(),
                                    value: format!("{}{}", $name, $suffix),
                                },
                                $(
                                    proto::Label {
                                        name: $label.into(),
                                        value: $lvalue.into(),
                                    },
                                )*
                            ],
                            samples: vec![ proto::Sample {
                                value: $svalue,
                                timestamp: $timestamp,
                            }],
                        },
                    )*
                ],
                metadata: vec![proto::MetricMetadata {
                    r#type: proto::metric_metadata::MetricType::$type as i32,
                    metric_family_name: $name.into(),
                    help: $help.into(),
                    unit: "".into(),
                }],
            }
        };
    }

    /// A tagged counter is rendered as a single text line under its header.
    #[test]
    fn encodes_counter_text() {
        let expected = indoc! { r#"
            # HELP vector_hits hits
            # TYPE vector_hits counter
            vector_hits{code="200"} 10 1612325106789
        "#};
        assert_eq!(encode_counter::<StringCollector>(), expected);
    }

    /// A tagged counter becomes one series in the write request.
    #[test]
    fn encodes_counter_request() {
        let expected = write_request!("vector_hits", "hits", Counter ["" @ 1612325106789 = 10.0 ["code" => "200"]]);
        assert_eq!(encode_counter::<TimeSeries>(), expected);
    }

    /// Encodes an absolute, tagged, timestamped counter under the `vector` namespace.
    fn encode_counter<T: MetricCollector>() -> T::Output {
        let counter = Metric::new(
            "hits".to_owned(),
            MetricKind::Absolute,
            MetricValue::Counter { value: 10.0 },
        )
        .with_tags(Some(tags()))
        .with_timestamp(Some(timestamp()));

        encode_one::<T>(Some("vector"), &[], &[], &counter)
    }

    /// A tagged gauge with a negative value is rendered as text.
    #[test]
    fn encodes_gauge_text() {
        let expected = indoc! { r#"
            # HELP vector_temperature temperature
            # TYPE vector_temperature gauge
            vector_temperature{code="200"} -1.1 1612325106789
        "#};
        assert_eq!(encode_gauge::<StringCollector>(), expected);
    }

    /// A tagged gauge with a negative value becomes one series in the write request.
    #[test]
    fn encodes_gauge_request() {
        let expected = write_request!("vector_temperature", "temperature", Gauge ["" @ 1612325106789 = -1.1 ["code" => "200"]]);
        assert_eq!(encode_gauge::<TimeSeries>(), expected);
    }

    /// Encodes an absolute, tagged, timestamped gauge under the `vector` namespace.
    fn encode_gauge<T: MetricCollector>() -> T::Output {
        let gauge = Metric::new(
            "temperature".to_owned(),
            MetricKind::Absolute,
            MetricValue::Gauge { value: -1.1 },
        )
        .with_tags(Some(tags()))
        .with_timestamp(Some(timestamp()));

        encode_one::<T>(Some("vector"), &[], &[], &gauge)
    }

    /// A set is rendered as a gauge holding the number of values.
    #[test]
    fn encodes_set_text() {
        let expected = indoc! { r"
            # HELP vector_users users
            # TYPE vector_users gauge
            vector_users 1 1612325106789
        "};
        assert_eq!(encode_set::<StringCollector>(), expected);
    }

    /// A set becomes a gauge series holding the number of values.
    #[test]
    fn encodes_set_request() {
        let expected = write_request!("vector_users", "users", Gauge [ "" @ 1612325106789 = 1.0 []]);
        assert_eq!(encode_set::<TimeSeries>(), expected);
    }

    /// Encodes an untagged set containing a single value.
    fn encode_set<T: MetricCollector>() -> T::Output {
        let set = Metric::new(
            "users".to_owned(),
            MetricKind::Absolute,
            MetricValue::Set {
                values: BTreeSet::from(["foo".into()]),
            },
        )
        .with_timestamp(Some(timestamp()));

        encode_one::<T>(Some("vector"), &[], &[], &set)
    }

    /// An empty set is rendered as a gauge with value zero.
    #[test]
    fn encodes_expired_set_text() {
        let expected = indoc! {r"
            # HELP vector_users users
            # TYPE vector_users gauge
            vector_users 0 1612325106789
        "};
        assert_eq!(encode_expired_set::<StringCollector>(), expected);
    }

    /// An empty set becomes a gauge series with value zero.
    #[test]
    fn encodes_expired_set_request() {
        let expected = write_request!("vector_users", "users", Gauge ["" @ 1612325106789 = 0.0 []]);
        assert_eq!(encode_expired_set::<TimeSeries>(), expected);
    }

    /// Encodes an untagged set with no values.
    fn encode_expired_set<T: MetricCollector>() -> T::Output {
        let empty_set = Metric::new(
            "users".to_owned(),
            MetricKind::Absolute,
            MetricValue::Set {
                values: BTreeSet::new(),
            },
        )
        .with_timestamp(Some(timestamp()));

        encode_one::<T>(Some("vector"), &[], &[], &empty_set)
    }

    /// A histogram distribution is rendered with cumulative buckets, sum and count.
    #[test]
    fn encodes_distribution_text() {
        let expected = indoc! {r#"
            # HELP vector_requests requests
            # TYPE vector_requests histogram
            vector_requests_bucket{le="0"} 0 1612325106789
            vector_requests_bucket{le="2.5"} 6 1612325106789
            vector_requests_bucket{le="5"} 8 1612325106789
            vector_requests_bucket{le="+Inf"} 8 1612325106789
            vector_requests_sum 15 1612325106789
            vector_requests_count 8 1612325106789
        "#};
        assert_eq!(encode_distribution::<StringCollector>(), expected);
    }

    /// A histogram distribution becomes bucket, sum and count series.
    #[test]
    fn encodes_distribution_request() {
        let expected = write_request!(
            "vector_requests", "requests", Histogram [
                "_bucket" @ 1612325106789 = 0.0 ["le" => "0"],
                "_bucket" @ 1612325106789 = 6.0 ["le" => "2.5"],
                "_bucket" @ 1612325106789 = 8.0 ["le" => "5"],
                "_bucket" @ 1612325106789 = 8.0 ["le" => "+Inf"],
                "_sum" @ 1612325106789 = 15.0 [],
                "_count" @ 1612325106789 = 8.0 []
            ]
        );
        assert_eq!(encode_distribution::<TimeSeries>(), expected);
    }

    /// Encodes a histogram distribution using the bucket bounds `0`, `2.5` and `5`.
    fn encode_distribution<T: MetricCollector>() -> T::Output {
        let distribution = Metric::new(
            "requests".to_owned(),
            MetricKind::Absolute,
            MetricValue::Distribution {
                samples: vector_lib::samples![1.0 => 3, 2.0 => 3, 3.0 => 2],
                statistic: StatisticKind::Histogram,
            },
        )
        .with_timestamp(Some(timestamp()));

        encode_one::<T>(Some("vector"), &[0.0, 2.5, 5.0], &[], &distribution)
    }

    /// An aggregated histogram is rendered with cumulative buckets and a `+Inf` bucket.
    #[test]
    fn encodes_histogram_text() {
        let expected = indoc! {r#"
            # HELP vector_requests requests
            # TYPE vector_requests histogram
            vector_requests_bucket{le="1"} 1 1612325106789
            vector_requests_bucket{le="2.1"} 3 1612325106789
            vector_requests_bucket{le="3"} 6 1612325106789
            vector_requests_bucket{le="+Inf"} 6 1612325106789
            vector_requests_sum 11.5 1612325106789
            vector_requests_count 6 1612325106789
        "#};
        assert_eq!(encode_histogram::<StringCollector>(false), expected);
    }

    /// An aggregated histogram becomes bucket, sum and count series.
    #[test]
    fn encodes_histogram_request() {
        let expected = write_request!(
            "vector_requests", "requests", Histogram [
                "_bucket" @ 1612325106789 = 1.0 ["le" => "1"],
                "_bucket" @ 1612325106789 = 3.0 ["le" => "2.1"],
                "_bucket" @ 1612325106789 = 6.0 ["le" => "3"],
                "_bucket" @ 1612325106789 = 6.0 ["le" => "+Inf"],
                "_sum" @ 1612325106789 = 11.5 [],
                "_count" @ 1612325106789 = 6.0 []
            ]
        );
        assert_eq!(encode_histogram::<TimeSeries>(false), expected);
    }

    /// An explicit infinite bound in the source histogram does not duplicate `+Inf`.
    #[test]
    fn encodes_histogram_text_with_extra_infinity_bound() {
        let expected = indoc! {r#"
            # HELP vector_requests requests
            # TYPE vector_requests histogram
            vector_requests_bucket{le="1"} 1 1612325106789
            vector_requests_bucket{le="2.1"} 3 1612325106789
            vector_requests_bucket{le="3"} 6 1612325106789
            vector_requests_bucket{le="+Inf"} 6 1612325106789
            vector_requests_sum 11.5 1612325106789
            vector_requests_count 6 1612325106789
        "#};
        assert_eq!(encode_histogram::<StringCollector>(true), expected);
    }

    /// An explicit infinite bound in the source histogram does not add an extra series.
    #[test]
    fn encodes_histogram_request_with_extra_infinity_bound() {
        let expected = write_request!(
            "vector_requests", "requests", Histogram [
                "_bucket" @ 1612325106789 = 1.0 ["le" => "1"],
                "_bucket" @ 1612325106789 = 3.0 ["le" => "2.1"],
                "_bucket" @ 1612325106789 = 6.0 ["le" => "3"],
                "_bucket" @ 1612325106789 = 6.0 ["le" => "+Inf"],
                "_sum" @ 1612325106789 = 11.5 [],
                "_count" @ 1612325106789 = 6.0 []
            ]
        );
        assert_eq!(encode_histogram::<TimeSeries>(true), expected);
    }

    /// Encodes an aggregated histogram, optionally with an explicit infinite upper bound.
    fn encode_histogram<T: MetricCollector>(add_inf_bound: bool) -> T::Output {
        let mut bounds: Vec<f64> = vec![1.0, 2.1, 3.0];
        if add_inf_bound {
            bounds.push(f64::INFINITY);
        }

        let mut histogram = VariableHistogram::new(&bounds[..]);
        histogram.record_many(&[0.4, 2.0, 1.75, 2.6, 2.25, 2.5][..]);

        let aggregated = Metric::new(
            "requests".to_owned(),
            MetricKind::Absolute,
            MetricValue::AggregatedHistogram {
                buckets: histogram.buckets(),
                count: histogram.count(),
                sum: histogram.sum(),
            },
        )
        .with_timestamp(Some(timestamp()));

        encode_one::<T>(Some("vector"), &[], &[], &aggregated)
    }

    /// An aggregated summary is rendered with quantile lines, sum and count.
    #[test]
    fn encodes_summary_text() {
        let expected = indoc! {r#"# HELP ns_requests requests
            # TYPE ns_requests summary
            ns_requests{code="200",quantile="0.01"} 1.5 1612325106789
            ns_requests{code="200",quantile="0.5"} 2 1612325106789
            ns_requests{code="200",quantile="0.99"} 3 1612325106789
            ns_requests_sum{code="200"} 12 1612325106789
            ns_requests_count{code="200"} 6 1612325106789
        "#};
        assert_eq!(encode_summary::<StringCollector>(), expected);
    }

    /// An aggregated summary becomes quantile, sum and count series.
    #[test]
    fn encodes_summary_request() {
        let expected = write_request!(
            "ns_requests", "requests", Summary [
                "" @ 1612325106789 = 1.5 ["code" => "200", "quantile" => "0.01"],
                "" @ 1612325106789 = 2.0 ["code" => "200", "quantile" => "0.5"],
                "" @ 1612325106789 = 3.0 ["code" => "200", "quantile" => "0.99"],
                "_sum" @ 1612325106789 = 12.0 ["code" => "200"],
                "_count" @ 1612325106789 = 6.0 ["code" => "200"]
            ]
        );
        assert_eq!(encode_summary::<TimeSeries>(), expected);
    }

    /// Encodes a tagged aggregated summary under the `ns` namespace.
    fn encode_summary<T: MetricCollector>() -> T::Output {
        let summary = Metric::new(
            "requests".to_owned(),
            MetricKind::Absolute,
            MetricValue::AggregatedSummary {
                quantiles: vector_lib::quantiles![0.01 => 1.5, 0.5 => 2.0, 0.99 => 3.0],
                count: 6,
                sum: 12.0,
            },
        )
        .with_tags(Some(tags()))
        .with_timestamp(Some(timestamp()));

        encode_one::<T>(Some("ns"), &[], &[], &summary)
    }

    /// A summary distribution is rendered with quantiles, sum, count, min, max and avg.
    #[test]
    fn encodes_distribution_summary_text() {
        let expected = indoc! {r#"
            # HELP ns_requests requests
            # TYPE ns_requests summary
            ns_requests{code="200",quantile="0.5"} 2 1612325106789
            ns_requests{code="200",quantile="0.75"} 2 1612325106789
            ns_requests{code="200",quantile="0.9"} 3 1612325106789
            ns_requests{code="200",quantile="0.95"} 3 1612325106789
            ns_requests{code="200",quantile="0.99"} 3 1612325106789
            ns_requests_sum{code="200"} 15 1612325106789
            ns_requests_count{code="200"} 8 1612325106789
            ns_requests_min{code="200"} 1 1612325106789
            ns_requests_max{code="200"} 3 1612325106789
            ns_requests_avg{code="200"} 1.875 1612325106789
        "#};
        assert_eq!(encode_distribution_summary::<StringCollector>(), expected);
    }

    /// A summary distribution becomes quantile, sum, count, min, max and avg series.
    #[test]
    fn encodes_distribution_summary_request() {
        let expected = write_request!(
            "ns_requests", "requests", Summary [
                "" @ 1612325106789 = 2.0 ["code" => "200", "quantile" => "0.5"],
                "" @ 1612325106789 = 2.0 ["code" => "200", "quantile" => "0.75"],
                "" @ 1612325106789 = 3.0 ["code" => "200", "quantile" => "0.9"],
                "" @ 1612325106789 = 3.0 ["code" => "200", "quantile" => "0.95"],
                "" @ 1612325106789 = 3.0 ["code" => "200", "quantile" => "0.99"],
                "_sum" @ 1612325106789 = 15.0 ["code" => "200"],
                "_count" @ 1612325106789 = 8.0 ["code" => "200"],
                "_min" @ 1612325106789 = 1.0 ["code" => "200"],
                "_max" @ 1612325106789 = 3.0 ["code" => "200"],
                "_avg" @ 1612325106789 = 1.875 ["code" => "200"]
            ]
        );
        assert_eq!(encode_distribution_summary::<TimeSeries>(), expected);
    }

    /// Encodes a tagged summary distribution using the default summary quantiles.
    fn encode_distribution_summary<T: MetricCollector>() -> T::Output {
        let distribution = Metric::new(
            "requests".to_owned(),
            MetricKind::Absolute,
            MetricValue::Distribution {
                samples: vector_lib::samples![1.0 => 3, 2.0 => 3, 3.0 => 2],
                statistic: StatisticKind::Summary,
            },
        )
        .with_tags(Some(tags()))
        .with_timestamp(Some(timestamp()));

        encode_one::<T>(Some("ns"), &[], &default_summary_quantiles(), &distribution)
    }

    /// The metric's own timestamp is written after the value in text output.
    #[test]
    fn encodes_timestamp_text() {
        let expected = indoc! {r"
            # HELP temperature temperature
            # TYPE temperature counter
            temperature 2 1612325106789
        "};
        assert_eq!(encode_timestamp::<StringCollector>(), expected);
    }

    /// The metric's own timestamp is used for the sample in the write request.
    #[test]
    fn encodes_timestamp_request() {
        let expected = write_request!("temperature", "temperature", Counter ["" @ 1612325106789 = 2.0 []]);
        assert_eq!(encode_timestamp::<TimeSeries>(), expected);
    }

    /// Encodes an untagged, timestamped counter without any namespace.
    fn encode_timestamp<T: MetricCollector>() -> T::Output {
        let counter = Metric::new(
            "temperature".to_owned(),
            MetricKind::Absolute,
            MetricValue::Counter { value: 2.0 },
        )
        .with_timestamp(Some(timestamp()));

        encode_one::<T>(None, &[], &[], &counter)
    }

    /// A metric without a timestamp gets one no earlier than the start of the test.
    #[test]
    fn adds_timestamp_request() {
        let started_at = Utc::now().timestamp_millis();
        let untimed = Metric::new(
            "something".to_owned(),
            MetricKind::Absolute,
            MetricValue::Gauge { value: 1.0 },
        );

        let request = encode_one::<TimeSeries>(None, &[], &[], &untimed);
        let sample_time = request.timeseries[0].samples[0].timestamp;
        assert!(sample_time >= started_at);
    }

    /// The fixed instant used by the tests: 2021-02-03 04:05:06.789 UTC.
    fn timestamp() -> DateTime<Utc> {
        let whole_second = Utc.with_ymd_and_hms(2021, 2, 3, 4, 5, 6).single();
        whole_second
            .and_then(|t| t.with_nanosecond(789 * 1_000_000))
            .expect("invalid timestamp")
    }

    /// Backslashes and double quotes in tag values are escaped in text output.
    #[test]
    fn escapes_tags_text() {
        let tags = metric_tags!(
            "code" => "200",
            "quoted" => r#"host"1""#,
            "path" => r"c:\Windows",
        );
        let counter = Metric::new(
            "something".to_owned(),
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
        )
        .with_tags(Some(tags));

        let expected = indoc! {r#"
            # HELP something something
            # TYPE something counter
            something{code="200",path="c:\\Windows",quoted="host\"1\""} 1
        "#};
        assert_eq!(
            encode_one::<StringCollector>(None, &[], &[], &counter),
            expected
        );
    }

    /// A tag key given two values appears once in the output, carrying `success`.
    #[test]
    fn encodes_duplicate_tags() {
        let tags = metric_tags!(
            "code" => "200",
            "code" => "success",
        );
        let counter = Metric::new(
            "something".to_owned(),
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
        )
        .with_tags(Some(tags));

        let expected = indoc! {r#"
            # HELP something something
            # TYPE something counter
            something{code="success"} 1
        "#};
        assert_eq!(
            encode_one::<StringCollector>(None, &[], &[], &counter),
            expected
        );
    }
}