1use std::{collections::BTreeMap, fmt::Write as _};
2
3use chrono::Utc;
4use indexmap::map::IndexMap;
5use vector_lib::{
6 event::metric::{MetricSketch, MetricTags, Quantile, samples_to_buckets},
7 prometheus::parser::{METRIC_NAME_LABEL, proto},
8};
9
10use crate::{
11 event::metric::{Metric, MetricKind, MetricValue, StatisticKind},
12 sinks::util::{encode_namespace, statistic::DistributionStatistic},
13};
14
15pub(super) trait MetricCollector {
16 type Output;
17
18 fn new() -> Self;
19
20 fn emit_metadata(&mut self, name: &str, fullname: &str, value: &MetricValue);
21
22 fn emit_value(
23 &mut self,
24 timestamp_millis: Option<i64>,
25 name: &str,
26 suffix: &str,
27 value: f64,
28 tags: Option<&MetricTags>,
29 extra: Option<(&str, String)>,
30 );
31
32 fn finish(self) -> Self::Output;
33
34 fn encode_metric(
35 &mut self,
36 default_namespace: Option<&str>,
37 buckets: &[f64],
38 quantiles: &[f64],
39 metric: &Metric,
40 ) {
41 let name = encode_namespace(metric.namespace().or(default_namespace), '_', metric.name());
42 let name = &name;
43 let timestamp = metric.timestamp().map(|t| t.timestamp_millis());
44
45 if metric.kind() == MetricKind::Absolute {
46 let tags = metric.tags();
47 self.emit_metadata(metric.name(), name, metric.value());
48
49 match metric.value() {
50 MetricValue::Counter { value } => {
51 self.emit_value(timestamp, name, "", *value, tags, None);
52 }
53 MetricValue::Gauge { value } => {
54 self.emit_value(timestamp, name, "", *value, tags, None);
55 }
56 MetricValue::Set { values } => {
57 self.emit_value(timestamp, name, "", values.len() as f64, tags, None);
58 }
59 MetricValue::Distribution {
60 samples,
61 statistic: StatisticKind::Histogram,
62 } => {
63 let (buckets, count, sum) = samples_to_buckets(samples, buckets);
65 let mut bucket_count = 0.0;
66 for bucket in buckets {
67 bucket_count += bucket.count as f64;
68 self.emit_value(
69 timestamp,
70 name,
71 "_bucket",
72 bucket_count,
73 tags,
74 Some(("le", bucket.upper_limit.to_string())),
75 );
76 }
77 self.emit_value(
78 timestamp,
79 name,
80 "_bucket",
81 count as f64,
82 tags,
83 Some(("le", "+Inf".to_string())),
84 );
85 self.emit_value(timestamp, name, "_sum", sum, tags, None);
86 self.emit_value(timestamp, name, "_count", count as f64, tags, None);
87 }
88 MetricValue::Distribution {
89 samples,
90 statistic: StatisticKind::Summary,
91 } => {
92 if let Some(statistic) = DistributionStatistic::from_samples(samples, quantiles)
93 {
94 for (q, v) in statistic.quantiles.iter() {
95 self.emit_value(
96 timestamp,
97 name,
98 "",
99 *v,
100 tags,
101 Some(("quantile", q.to_string())),
102 );
103 }
104 self.emit_value(timestamp, name, "_sum", statistic.sum, tags, None);
105 self.emit_value(
106 timestamp,
107 name,
108 "_count",
109 statistic.count as f64,
110 tags,
111 None,
112 );
113 self.emit_value(timestamp, name, "_min", statistic.min, tags, None);
114 self.emit_value(timestamp, name, "_max", statistic.max, tags, None);
115 self.emit_value(timestamp, name, "_avg", statistic.avg, tags, None);
116 } else {
117 self.emit_value(timestamp, name, "_sum", 0.0, tags, None);
118 self.emit_value(timestamp, name, "_count", 0.0, tags, None);
119 }
120 }
121 MetricValue::AggregatedHistogram {
122 buckets,
123 count,
124 sum,
125 } => {
126 let mut bucket_count = 0.0;
127 for bucket in buckets {
128 if bucket.upper_limit.is_infinite() {
142 continue;
143 }
144
145 bucket_count += bucket.count as f64;
146 self.emit_value(
147 timestamp,
148 name,
149 "_bucket",
150 bucket_count,
151 tags,
152 Some(("le", bucket.upper_limit.to_string())),
153 );
154 }
155 self.emit_value(
156 timestamp,
157 name,
158 "_bucket",
159 *count as f64,
160 tags,
161 Some(("le", "+Inf".to_string())),
162 );
163 self.emit_value(timestamp, name, "_sum", *sum, tags, None);
164 self.emit_value(timestamp, name, "_count", *count as f64, tags, None);
165 }
166 MetricValue::AggregatedSummary {
167 quantiles,
168 count,
169 sum,
170 } => {
171 for quantile in quantiles {
172 self.emit_value(
173 timestamp,
174 name,
175 "",
176 quantile.value,
177 tags,
178 Some(("quantile", quantile.quantile.to_string())),
179 );
180 }
181 self.emit_value(timestamp, name, "_sum", *sum, tags, None);
182 self.emit_value(timestamp, name, "_count", *count as f64, tags, None);
183 }
184 MetricValue::Sketch { sketch } => match sketch {
185 MetricSketch::AgentDDSketch(ddsketch) => {
186 for q in quantiles {
187 let quantile = Quantile {
188 quantile: *q,
189 value: ddsketch.quantile(*q).unwrap_or(0.0),
190 };
191 self.emit_value(
192 timestamp,
193 name,
194 "",
195 quantile.value,
196 tags,
197 Some(("quantile", quantile.quantile.to_string())),
198 );
199 }
200 self.emit_value(
201 timestamp,
202 name,
203 "_sum",
204 ddsketch.sum().unwrap_or(0.0),
205 tags,
206 None,
207 );
208 self.emit_value(
209 timestamp,
210 name,
211 "_count",
212 ddsketch.count() as f64,
213 tags,
214 None,
215 );
216 }
217 },
218 }
219 }
220 }
221}
222
/// Collector that renders metrics as text, accumulating one string per
/// namespace-qualified metric name.
pub(super) struct StringCollector {
    // Encoded text keyed by full metric name. Each entry starts with the
    // `# HELP` / `# TYPE` header and then accumulates that metric's sample
    // lines. Being an ordered map, `finish` concatenates entries in key order.
    processed: BTreeMap<String, String>,
}
227
228impl MetricCollector for StringCollector {
229 type Output = String;
230
231 fn new() -> Self {
232 let processed = BTreeMap::new();
233 Self { processed }
234 }
235
236 fn emit_metadata(&mut self, name: &str, fullname: &str, value: &MetricValue) {
237 if !self.processed.contains_key(fullname) {
238 let header = Self::encode_header(name, fullname, value);
239 self.processed.insert(fullname.into(), header);
240 }
241 }
242
243 fn emit_value(
244 &mut self,
245 timestamp_millis: Option<i64>,
246 name: &str,
247 suffix: &str,
248 value: f64,
249 tags: Option<&MetricTags>,
250 extra: Option<(&str, String)>,
251 ) {
252 let result = self
253 .processed
254 .get_mut(name)
255 .expect("metric metadata not encoded");
256
257 result.push_str(name);
258 result.push_str(suffix);
259 Self::encode_tags(result, tags, extra);
260 _ = match timestamp_millis {
261 None => writeln!(result, " {value}"),
262 Some(timestamp) => writeln!(result, " {value} {timestamp}"),
263 };
264 }
265
266 fn finish(self) -> String {
267 self.processed.into_values().collect()
268 }
269}
270
271impl StringCollector {
272 fn encode_tags(result: &mut String, tags: Option<&MetricTags>, extra: Option<(&str, String)>) {
273 match (tags, extra) {
274 (None, None) => Ok(()),
275 (None, Some(tag)) => write!(result, "{{{}}}", Self::format_tag(tag.0, &tag.1)),
276 (Some(tags), ref tag) => {
277 let mut parts = tags
278 .iter_single()
279 .map(|(key, value)| Self::format_tag(key, value))
280 .collect::<Vec<_>>();
281
282 if let Some((key, value)) = tag {
283 parts.push(Self::format_tag(key, value))
284 }
285
286 parts.sort();
287 write!(result, "{{{}}}", parts.join(","))
288 }
289 }
290 .ok();
291 }
292
293 fn encode_header(name: &str, fullname: &str, value: &MetricValue) -> String {
294 let r#type = prometheus_metric_type(value).as_str();
295 format!("# HELP {fullname} {name}\n# TYPE {fullname} {type}\n")
296 }
297
298 fn format_tag(key: &str, mut value: &str) -> String {
299 let mut result = String::with_capacity(key.len() + value.len() + 3);
301 result.push_str(key);
302 result.push_str("=\"");
303 while let Some(i) = value.find(['\\', '"']) {
304 result.push_str(&value[..i]);
305 result.push('\\');
306 result.push(value.as_bytes()[i] as char);
308 value = &value[i + 1..];
309 }
310 result.push_str(value);
311 result.push('"');
312 result
313 }
314}
315
/// The label set of one time series; used as the key of `TimeSeries::buffer`.
type Labels = Vec<proto::Label>;
317
/// Collector that accumulates metrics into the parts of a
/// `proto::WriteRequest`.
pub(super) struct TimeSeries {
    // Samples grouped by label set, kept in insertion order.
    buffer: IndexMap<Labels, Vec<proto::Sample>>,
    // Metadata entries recorded by `emit_metadata`, kept in insertion order.
    metadata: IndexMap<String, proto::MetricMetadata>,
    // Fallback timestamp in milliseconds for samples without one; filled in
    // by `default_timestamp` the first time it is needed.
    timestamp: Option<i64>,
}
323
324impl TimeSeries {
325 fn make_labels(
326 tags: Option<&MetricTags>,
327 name: &str,
328 suffix: &str,
329 extra: Option<(&str, String)>,
330 ) -> Labels {
331 let mut labels = tags.cloned().unwrap_or_default();
336 labels.replace(METRIC_NAME_LABEL.into(), [name, suffix].join(""));
337 if let Some((name, value)) = extra {
338 labels.replace(name.into(), value);
339 }
340
341 let mut labels = labels
344 .into_iter_single()
345 .map(|(name, value)| proto::Label { name, value })
346 .collect::<Labels>();
347 labels.sort();
348 labels
349 }
350
351 fn default_timestamp(&mut self) -> i64 {
352 *self
353 .timestamp
354 .get_or_insert_with(|| Utc::now().timestamp_millis())
355 }
356}
357
358impl MetricCollector for TimeSeries {
359 type Output = proto::WriteRequest;
360
361 fn new() -> Self {
362 Self {
363 buffer: Default::default(),
364 metadata: Default::default(),
365 timestamp: None,
366 }
367 }
368
369 fn emit_metadata(&mut self, name: &str, fullname: &str, value: &MetricValue) {
370 if !self.metadata.contains_key(name) {
371 let r#type = prometheus_metric_type(value);
372 let metadata = proto::MetricMetadata {
373 r#type: r#type as i32,
374 metric_family_name: fullname.into(),
375 help: name.into(),
376 unit: String::new(),
377 };
378 self.metadata.insert(name.into(), metadata);
379 }
380 }
381
382 fn emit_value(
383 &mut self,
384 timestamp_millis: Option<i64>,
385 name: &str,
386 suffix: &str,
387 value: f64,
388 tags: Option<&MetricTags>,
389 extra: Option<(&str, String)>,
390 ) {
391 let timestamp = timestamp_millis.unwrap_or_else(|| self.default_timestamp());
392 self.buffer
393 .entry(Self::make_labels(tags, name, suffix, extra))
394 .or_default()
395 .push(proto::Sample { value, timestamp });
396 }
397
398 fn finish(self) -> proto::WriteRequest {
399 let timeseries = self
400 .buffer
401 .into_iter()
402 .map(|(labels, samples)| proto::TimeSeries { labels, samples })
403 .collect::<Vec<_>>();
404 let metadata = self
405 .metadata
406 .into_iter()
407 .map(|(_, metadata)| metadata)
408 .collect();
409 proto::WriteRequest {
410 timeseries,
411 metadata,
412 }
413 }
414}
415
416const fn prometheus_metric_type(metric_value: &MetricValue) -> proto::MetricType {
417 use proto::MetricType;
418 match metric_value {
419 MetricValue::Counter { .. } => MetricType::Counter,
420 MetricValue::Gauge { .. } | MetricValue::Set { .. } => MetricType::Gauge,
421 MetricValue::Distribution {
422 statistic: StatisticKind::Histogram,
423 ..
424 } => MetricType::Histogram,
425 MetricValue::Distribution {
426 statistic: StatisticKind::Summary,
427 ..
428 } => MetricType::Summary,
429 MetricValue::AggregatedHistogram { .. } => MetricType::Histogram,
430 MetricValue::AggregatedSummary { .. } => MetricType::Summary,
431 MetricValue::Sketch { .. } => MetricType::Summary,
432 }
433}
434
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use chrono::{DateTime, TimeZone, Timelike};
    use indoc::indoc;
    use similar_asserts::assert_eq;
    use vector_lib::metric_tags;

    use super::{super::default_summary_quantiles, *};
    use crate::{
        event::metric::{Metric, MetricKind, MetricValue, StatisticKind},
        test_util::stats::VariableHistogram,
    };

    /// Runs one metric through a fresh collector of type `T`.
    fn encode_one<T: MetricCollector>(
        default_namespace: Option<&str>,
        buckets: &[f64],
        quantiles: &[f64],
        metric: &Metric,
    ) -> T::Output {
        let mut collector = T::new();
        collector.encode_metric(default_namespace, buckets, quantiles, metric);
        collector.finish()
    }

    /// The tag set shared by the tagged fixtures.
    fn tags() -> MetricTags {
        metric_tags!("code" => "200")
    }

    /// The fixed instant stamped on the fixtures.
    fn timestamp() -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2021, 2, 3, 4, 5, 6)
            .single()
            .and_then(|t| t.with_nanosecond(789_000_000))
            .expect("invalid timestamp")
    }

    /// Builds an absolute metric with the given name and value.
    fn absolute(name: &str, value: MetricValue) -> Metric {
        Metric::new(name.to_owned(), MetricKind::Absolute, value)
    }

    /// Builds the `proto::WriteRequest` expected for a single metric family.
    macro_rules! write_request {
        ( $name:literal, $help:literal, $type:ident
          [ $(
              $suffix:literal @ $timestamp:literal = $svalue:literal
                  [ $( $label:literal => $lvalue:literal ),* ]
          ),* ]
        ) => {
            proto::WriteRequest {
                timeseries: vec![
                    $(
                        proto::TimeSeries {
                            labels: vec![
                                proto::Label {
                                    name: "__name__".into(),
                                    value: format!("{}{}", $name, $suffix),
                                },
                                $(
                                    proto::Label {
                                        name: $label.into(),
                                        value: $lvalue.into(),
                                    },
                                )*
                            ],
                            samples: vec![ proto::Sample {
                                value: $svalue,
                                timestamp: $timestamp,
                            }],
                        },
                    )*
                ],
                metadata: vec![proto::MetricMetadata {
                    r#type: proto::metric_metadata::MetricType::$type as i32,
                    metric_family_name: $name.into(),
                    help: $help.into(),
                    unit: "".into(),
                }],
            }
        };
    }

    #[test]
    fn encodes_counter_text() {
        assert_eq!(
            encode_counter::<StringCollector>(),
            indoc! { r#"
                # HELP vector_hits hits
                # TYPE vector_hits counter
                vector_hits{code="200"} 10 1612325106789
            "#}
        );
    }

    #[test]
    fn encodes_counter_request() {
        assert_eq!(
            encode_counter::<TimeSeries>(),
            write_request!("vector_hits", "hits", Counter ["" @ 1612325106789 = 10.0 ["code" => "200"]])
        );
    }

    fn encode_counter<T: MetricCollector>() -> T::Output {
        let counter = absolute("hits", MetricValue::Counter { value: 10.0 })
            .with_tags(Some(tags()))
            .with_timestamp(Some(timestamp()));
        encode_one::<T>(Some("vector"), &[], &[], &counter)
    }

    #[test]
    fn encodes_gauge_text() {
        assert_eq!(
            encode_gauge::<StringCollector>(),
            indoc! { r#"
                # HELP vector_temperature temperature
                # TYPE vector_temperature gauge
                vector_temperature{code="200"} -1.1 1612325106789
            "#}
        );
    }

    #[test]
    fn encodes_gauge_request() {
        assert_eq!(
            encode_gauge::<TimeSeries>(),
            write_request!("vector_temperature", "temperature", Gauge ["" @ 1612325106789 = -1.1 ["code" => "200"]])
        );
    }

    fn encode_gauge<T: MetricCollector>() -> T::Output {
        let gauge = absolute("temperature", MetricValue::Gauge { value: -1.1 })
            .with_tags(Some(tags()))
            .with_timestamp(Some(timestamp()));
        encode_one::<T>(Some("vector"), &[], &[], &gauge)
    }

    #[test]
    fn encodes_set_text() {
        assert_eq!(
            encode_set::<StringCollector>(),
            indoc! { r"
                # HELP vector_users users
                # TYPE vector_users gauge
                vector_users 1 1612325106789
            "}
        );
    }

    #[test]
    fn encodes_set_request() {
        assert_eq!(
            encode_set::<TimeSeries>(),
            write_request!("vector_users", "users", Gauge [ "" @ 1612325106789 = 1.0 []])
        );
    }

    fn encode_set<T: MetricCollector>() -> T::Output {
        let set = absolute(
            "users",
            MetricValue::Set {
                values: vec!["foo".into()].into_iter().collect(),
            },
        )
        .with_timestamp(Some(timestamp()));
        encode_one::<T>(Some("vector"), &[], &[], &set)
    }

    #[test]
    fn encodes_expired_set_text() {
        assert_eq!(
            encode_expired_set::<StringCollector>(),
            indoc! {r"
                # HELP vector_users users
                # TYPE vector_users gauge
                vector_users 0 1612325106789
            "}
        );
    }

    #[test]
    fn encodes_expired_set_request() {
        assert_eq!(
            encode_expired_set::<TimeSeries>(),
            write_request!("vector_users", "users", Gauge ["" @ 1612325106789 = 0.0 []])
        );
    }

    fn encode_expired_set<T: MetricCollector>() -> T::Output {
        let empty_set = absolute(
            "users",
            MetricValue::Set {
                values: BTreeSet::new(),
            },
        )
        .with_timestamp(Some(timestamp()));
        encode_one::<T>(Some("vector"), &[], &[], &empty_set)
    }

    #[test]
    fn encodes_distribution_text() {
        assert_eq!(
            encode_distribution::<StringCollector>(),
            indoc! {r#"
                # HELP vector_requests requests
                # TYPE vector_requests histogram
                vector_requests_bucket{le="0"} 0 1612325106789
                vector_requests_bucket{le="2.5"} 6 1612325106789
                vector_requests_bucket{le="5"} 8 1612325106789
                vector_requests_bucket{le="+Inf"} 8 1612325106789
                vector_requests_sum 15 1612325106789
                vector_requests_count 8 1612325106789
            "#}
        );
    }

    #[test]
    fn encodes_distribution_request() {
        assert_eq!(
            encode_distribution::<TimeSeries>(),
            write_request!(
                "vector_requests", "requests", Histogram [
                    "_bucket" @ 1612325106789 = 0.0 ["le" => "0"],
                    "_bucket" @ 1612325106789 = 6.0 ["le" => "2.5"],
                    "_bucket" @ 1612325106789 = 8.0 ["le" => "5"],
                    "_bucket" @ 1612325106789 = 8.0 ["le" => "+Inf"],
                    "_sum" @ 1612325106789 = 15.0 [],
                    "_count" @ 1612325106789 = 8.0 []
                ]
            )
        );
    }

    fn encode_distribution<T: MetricCollector>() -> T::Output {
        let distribution = absolute(
            "requests",
            MetricValue::Distribution {
                samples: vector_lib::samples![1.0 => 3, 2.0 => 3, 3.0 => 2],
                statistic: StatisticKind::Histogram,
            },
        )
        .with_timestamp(Some(timestamp()));
        encode_one::<T>(Some("vector"), &[0.0, 2.5, 5.0], &[], &distribution)
    }

    #[test]
    fn encodes_histogram_text() {
        assert_eq!(
            encode_histogram::<StringCollector>(false),
            indoc! {r#"
                # HELP vector_requests requests
                # TYPE vector_requests histogram
                vector_requests_bucket{le="1"} 1 1612325106789
                vector_requests_bucket{le="2.1"} 3 1612325106789
                vector_requests_bucket{le="3"} 6 1612325106789
                vector_requests_bucket{le="+Inf"} 6 1612325106789
                vector_requests_sum 11.5 1612325106789
                vector_requests_count 6 1612325106789
            "#}
        );
    }

    #[test]
    fn encodes_histogram_request() {
        assert_eq!(
            encode_histogram::<TimeSeries>(false),
            write_request!(
                "vector_requests", "requests", Histogram [
                    "_bucket" @ 1612325106789 = 1.0 ["le" => "1"],
                    "_bucket" @ 1612325106789 = 3.0 ["le" => "2.1"],
                    "_bucket" @ 1612325106789 = 6.0 ["le" => "3"],
                    "_bucket" @ 1612325106789 = 6.0 ["le" => "+Inf"],
                    "_sum" @ 1612325106789 = 11.5 [],
                    "_count" @ 1612325106789 = 6.0 []
                ]
            )
        );
    }

    #[test]
    fn encodes_histogram_text_with_extra_infinity_bound() {
        assert_eq!(
            encode_histogram::<StringCollector>(true),
            indoc! {r#"
                # HELP vector_requests requests
                # TYPE vector_requests histogram
                vector_requests_bucket{le="1"} 1 1612325106789
                vector_requests_bucket{le="2.1"} 3 1612325106789
                vector_requests_bucket{le="3"} 6 1612325106789
                vector_requests_bucket{le="+Inf"} 6 1612325106789
                vector_requests_sum 11.5 1612325106789
                vector_requests_count 6 1612325106789
            "#}
        );
    }

    #[test]
    fn encodes_histogram_request_with_extra_infinity_bound() {
        assert_eq!(
            encode_histogram::<TimeSeries>(true),
            write_request!(
                "vector_requests", "requests", Histogram [
                    "_bucket" @ 1612325106789 = 1.0 ["le" => "1"],
                    "_bucket" @ 1612325106789 = 3.0 ["le" => "2.1"],
                    "_bucket" @ 1612325106789 = 6.0 ["le" => "3"],
                    "_bucket" @ 1612325106789 = 6.0 ["le" => "+Inf"],
                    "_sum" @ 1612325106789 = 11.5 [],
                    "_count" @ 1612325106789 = 6.0 []
                ]
            )
        );
    }

    /// Encodes an aggregated histogram, optionally built with an explicit
    /// infinite upper bound in addition to the finite ones.
    fn encode_histogram<T: MetricCollector>(add_inf_bound: bool) -> T::Output {
        let mut bounds = vec![1.0, 2.1, 3.0];
        if add_inf_bound {
            bounds.push(f64::INFINITY);
        }

        let mut recorded = VariableHistogram::new(&bounds[..]);
        recorded.record_many(&[0.4, 2.0, 1.75, 2.6, 2.25, 2.5][..]);

        let histogram = absolute(
            "requests",
            MetricValue::AggregatedHistogram {
                buckets: recorded.buckets(),
                count: recorded.count(),
                sum: recorded.sum(),
            },
        )
        .with_timestamp(Some(timestamp()));
        encode_one::<T>(Some("vector"), &[], &[], &histogram)
    }

    #[test]
    fn encodes_summary_text() {
        assert_eq!(
            encode_summary::<StringCollector>(),
            indoc! {r#"# HELP ns_requests requests
                # TYPE ns_requests summary
                ns_requests{code="200",quantile="0.01"} 1.5 1612325106789
                ns_requests{code="200",quantile="0.5"} 2 1612325106789
                ns_requests{code="200",quantile="0.99"} 3 1612325106789
                ns_requests_sum{code="200"} 12 1612325106789
                ns_requests_count{code="200"} 6 1612325106789
            "#}
        );
    }

    #[test]
    fn encodes_summary_request() {
        assert_eq!(
            encode_summary::<TimeSeries>(),
            write_request!(
                "ns_requests", "requests", Summary [
                    "" @ 1612325106789 = 1.5 ["code" => "200", "quantile" => "0.01"],
                    "" @ 1612325106789 = 2.0 ["code" => "200", "quantile" => "0.5"],
                    "" @ 1612325106789 = 3.0 ["code" => "200", "quantile" => "0.99"],
                    "_sum" @ 1612325106789 = 12.0 ["code" => "200"],
                    "_count" @ 1612325106789 = 6.0 ["code" => "200"]
                ]
            )
        );
    }

    fn encode_summary<T: MetricCollector>() -> T::Output {
        let summary = absolute(
            "requests",
            MetricValue::AggregatedSummary {
                quantiles: vector_lib::quantiles![0.01 => 1.5, 0.5 => 2.0, 0.99 => 3.0],
                count: 6,
                sum: 12.0,
            },
        )
        .with_tags(Some(tags()))
        .with_timestamp(Some(timestamp()));
        encode_one::<T>(Some("ns"), &[], &[], &summary)
    }

    #[test]
    fn encodes_distribution_summary_text() {
        assert_eq!(
            encode_distribution_summary::<StringCollector>(),
            indoc! {r#"
                # HELP ns_requests requests
                # TYPE ns_requests summary
                ns_requests{code="200",quantile="0.5"} 2 1612325106789
                ns_requests{code="200",quantile="0.75"} 2 1612325106789
                ns_requests{code="200",quantile="0.9"} 3 1612325106789
                ns_requests{code="200",quantile="0.95"} 3 1612325106789
                ns_requests{code="200",quantile="0.99"} 3 1612325106789
                ns_requests_sum{code="200"} 15 1612325106789
                ns_requests_count{code="200"} 8 1612325106789
                ns_requests_min{code="200"} 1 1612325106789
                ns_requests_max{code="200"} 3 1612325106789
                ns_requests_avg{code="200"} 1.875 1612325106789
            "#}
        );
    }

    #[test]
    fn encodes_distribution_summary_request() {
        assert_eq!(
            encode_distribution_summary::<TimeSeries>(),
            write_request!(
                "ns_requests", "requests", Summary [
                    "" @ 1612325106789 = 2.0 ["code" => "200", "quantile" => "0.5"],
                    "" @ 1612325106789 = 2.0 ["code" => "200", "quantile" => "0.75"],
                    "" @ 1612325106789 = 3.0 ["code" => "200", "quantile" => "0.9"],
                    "" @ 1612325106789 = 3.0 ["code" => "200", "quantile" => "0.95"],
                    "" @ 1612325106789 = 3.0 ["code" => "200", "quantile" => "0.99"],
                    "_sum" @ 1612325106789 = 15.0 ["code" => "200"],
                    "_count" @ 1612325106789 = 8.0 ["code" => "200"],
                    "_min" @ 1612325106789 = 1.0 ["code" => "200"],
                    "_max" @ 1612325106789 = 3.0 ["code" => "200"],
                    "_avg" @ 1612325106789 = 1.875 ["code" => "200"]
                ]
            )
        );
    }

    fn encode_distribution_summary<T: MetricCollector>() -> T::Output {
        let distribution = absolute(
            "requests",
            MetricValue::Distribution {
                samples: vector_lib::samples![1.0 => 3, 2.0 => 3, 3.0 => 2],
                statistic: StatisticKind::Summary,
            },
        )
        .with_tags(Some(tags()))
        .with_timestamp(Some(timestamp()));
        encode_one::<T>(Some("ns"), &[], &default_summary_quantiles(), &distribution)
    }

    #[test]
    fn encodes_timestamp_text() {
        assert_eq!(
            encode_timestamp::<StringCollector>(),
            indoc! {r"
                # HELP temperature temperature
                # TYPE temperature counter
                temperature 2 1612325106789
            "}
        );
    }

    #[test]
    fn encodes_timestamp_request() {
        assert_eq!(
            encode_timestamp::<TimeSeries>(),
            write_request!("temperature", "temperature", Counter ["" @ 1612325106789 = 2.0 []])
        );
    }

    fn encode_timestamp<T: MetricCollector>() -> T::Output {
        let counter = absolute("temperature", MetricValue::Counter { value: 2.0 })
            .with_timestamp(Some(timestamp()));
        encode_one::<T>(None, &[], &[], &counter)
    }

    /// A metric without a timestamp gets one no earlier than the moment
    /// captured before encoding.
    #[test]
    fn adds_timestamp_request() {
        let before = Utc::now().timestamp_millis();
        let gauge = absolute("something", MetricValue::Gauge { value: 1.0 });
        let encoded = encode_one::<TimeSeries>(None, &[], &[], &gauge);
        assert!(encoded.timeseries[0].samples[0].timestamp >= before);
    }

    #[test]
    fn escapes_tags_text() {
        let tags = metric_tags!(
            "code" => "200",
            "quoted" => r#"host"1""#,
            "path" => r"c:\Windows",
        );
        let counter = absolute("something", MetricValue::Counter { value: 1.0 })
            .with_tags(Some(tags));
        let encoded = encode_one::<StringCollector>(None, &[], &[], &counter);
        assert_eq!(
            encoded,
            indoc! {r#"
                # HELP something something
                # TYPE something counter
                something{code="200",path="c:\\Windows",quoted="host\"1\""} 1
            "#}
        );
    }

    /// When a tag key carries several values, the text output is expected to
    /// contain a single label for it.
    #[test]
    fn encodes_duplicate_tags() {
        let tags = metric_tags!(
            "code" => "200",
            "code" => "success",
        );
        let counter = absolute("something", MetricValue::Counter { value: 1.0 })
            .with_tags(Some(tags));
        let encoded = encode_one::<StringCollector>(None, &[], &[], &counter);
        assert_eq!(
            encoded,
            indoc! {r#"
                # HELP something something
                # TYPE something counter
                something{code="success"} 1
            "#}
        );
    }
}