// vector_core/event/metric/mod.rs

1#[cfg(feature = "vrl")]
2use std::convert::TryFrom;
3
4#[cfg(feature = "vrl")]
5use vrl::compiler::value::VrlValueConvert;
6
7use std::{
8    convert::AsRef,
9    fmt::{self, Display, Formatter},
10    num::NonZeroU32,
11};
12
13use chrono::{DateTime, Utc};
14use vector_common::{
15    byte_size_of::ByteSizeOf,
16    internal_event::{OptionalTag, TaggedEventsSent},
17    json_size::JsonSize,
18    request_metadata::GetEventCountTags,
19    EventDataEq,
20};
21use vector_config::configurable_component;
22
23use super::{
24    estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf, BatchNotifier, EventFinalizer,
25    EventFinalizers, EventMetadata, Finalizable,
26};
27use crate::config::telemetry;
28
29#[cfg(any(test, feature = "test"))]
30mod arbitrary;
31
32mod data;
33pub use self::data::*;
34
35mod series;
36pub use self::series::*;
37
38mod tags;
39pub use self::tags::*;
40
41mod value;
42pub use self::value::*;
43
/// Builds a `MetricTags` collection from `key => value` pairs.
///
/// * `metric_tags!()` yields `MetricTags::default()`.
/// * `metric_tags!("a" => "1", "b" => "2")` converts every key with `.into()` and every value
///   with `TagValue::from`, then collects the pairs into a `MetricTags`.
///
/// A single trailing comma after the last pair is accepted.
#[macro_export]
macro_rules! metric_tags {
    () => { $crate::event::MetricTags::default() };

    ($($key:expr => $value:expr),+ $(,)?) => {
        // The fully qualified call iterates the array by value regardless of how the
        // `.into_iter()` method would resolve at the expansion site.
        ::std::iter::IntoIterator::into_iter([
            $( ($key.into(), $crate::event::metric::TagValue::from($value)), )+
        ]).collect::<$crate::event::MetricTags>()
    };
}
56
57/// A metric.
58#[configurable_component]
59#[derive(Clone, Debug, PartialEq)]
60pub struct Metric {
61    #[serde(flatten)]
62    pub(super) series: MetricSeries,
63
64    #[serde(flatten)]
65    pub(super) data: MetricData,
66
67    /// Internal event metadata.
68    #[serde(skip, default = "EventMetadata::default")]
69    metadata: EventMetadata,
70}
71
72impl Metric {
73    /// Creates a new `Metric` with the given `name`, `kind`, and `value`.
74    pub fn new<T: Into<String>>(name: T, kind: MetricKind, value: MetricValue) -> Self {
75        Self::new_with_metadata(name, kind, value, EventMetadata::default())
76    }
77
78    /// Creates a new `Metric` with the given `name`, `kind`, `value`, and `metadata`.
79    pub fn new_with_metadata<T: Into<String>>(
80        name: T,
81        kind: MetricKind,
82        value: MetricValue,
83        metadata: EventMetadata,
84    ) -> Self {
85        Self {
86            series: MetricSeries {
87                name: MetricName {
88                    name: name.into(),
89                    namespace: None,
90                },
91                tags: None,
92            },
93            data: MetricData {
94                time: MetricTime {
95                    timestamp: None,
96                    interval_ms: None,
97                },
98                kind,
99                value,
100            },
101            metadata,
102        }
103    }
104
105    /// Consumes this metric, returning it with an updated series based on the given `name`.
106    #[inline]
107    #[must_use]
108    pub fn with_name(mut self, name: impl Into<String>) -> Self {
109        self.series.name.name = name.into();
110        self
111    }
112
113    /// Consumes this metric, returning it with an updated series based on the given `namespace`.
114    #[inline]
115    #[must_use]
116    pub fn with_namespace<T: Into<String>>(mut self, namespace: Option<T>) -> Self {
117        self.series.name.namespace = namespace.map(Into::into);
118        self
119    }
120
121    /// Consumes this metric, returning it with an updated timestamp.
122    #[inline]
123    #[must_use]
124    pub fn with_timestamp(mut self, timestamp: Option<DateTime<Utc>>) -> Self {
125        self.data.time.timestamp = timestamp;
126        self
127    }
128
129    /// Consumes this metric, returning it with an updated interval.
130    #[inline]
131    #[must_use]
132    pub fn with_interval_ms(mut self, interval_ms: Option<NonZeroU32>) -> Self {
133        self.data.time.interval_ms = interval_ms;
134        self
135    }
136
137    pub fn add_finalizer(&mut self, finalizer: EventFinalizer) {
138        self.metadata.add_finalizer(finalizer);
139    }
140
141    /// Consumes this metric, returning it with an updated set of event finalizers attached to `batch`.
142    #[must_use]
143    pub fn with_batch_notifier(mut self, batch: &BatchNotifier) -> Self {
144        self.metadata = self.metadata.with_batch_notifier(batch);
145        self
146    }
147
148    /// Consumes this metric, returning it with an optionally updated set of event finalizers attached to `batch`.
149    #[must_use]
150    pub fn with_batch_notifier_option(mut self, batch: &Option<BatchNotifier>) -> Self {
151        self.metadata = self.metadata.with_batch_notifier_option(batch);
152        self
153    }
154
155    /// Consumes this metric, returning it with an updated series based on the given `tags`.
156    #[inline]
157    #[must_use]
158    pub fn with_tags(mut self, tags: Option<MetricTags>) -> Self {
159        self.series.tags = tags;
160        self
161    }
162
163    /// Consumes this metric, returning it with an updated value.
164    #[inline]
165    #[must_use]
166    pub fn with_value(mut self, value: MetricValue) -> Self {
167        self.data.value = value;
168        self
169    }
170
171    /// Gets a reference to the series of this metric.
172    ///
173    /// The "series" is the name of the metric itself, including any tags. In other words, it is the unique identifier
174    /// for a metric, although metrics of different values (counter vs gauge) may be able to co-exist in outside metrics
175    /// implementations with identical series.
176    pub fn series(&self) -> &MetricSeries {
177        &self.series
178    }
179
180    /// Gets a reference to the data of this metric.
181    pub fn data(&self) -> &MetricData {
182        &self.data
183    }
184
185    /// Gets a mutable reference to the data of this metric.
186    pub fn data_mut(&mut self) -> &mut MetricData {
187        &mut self.data
188    }
189
190    /// Gets a reference to the metadata of this metric.
191    pub fn metadata(&self) -> &EventMetadata {
192        &self.metadata
193    }
194
195    /// Gets a mutable reference to the metadata of this metric.
196    pub fn metadata_mut(&mut self) -> &mut EventMetadata {
197        &mut self.metadata
198    }
199
200    /// Gets a reference to the name of this metric.
201    ///
202    /// The name of the metric does not include the namespace or tags.
203    #[inline]
204    pub fn name(&self) -> &str {
205        &self.series.name.name
206    }
207
208    /// Gets a reference to the namespace of this metric, if it exists.
209    #[inline]
210    pub fn namespace(&self) -> Option<&str> {
211        self.series.name.namespace.as_deref()
212    }
213
214    /// Takes the namespace out of this metric, if it exists, leaving it empty.
215    #[inline]
216    pub fn take_namespace(&mut self) -> Option<String> {
217        self.series.name.namespace.take()
218    }
219
220    /// Gets a reference to the tags of this metric, if they exist.
221    #[inline]
222    pub fn tags(&self) -> Option<&MetricTags> {
223        self.series.tags.as_ref()
224    }
225
226    /// Gets a mutable reference to the tags of this metric, if they exist.
227    #[inline]
228    pub fn tags_mut(&mut self) -> Option<&mut MetricTags> {
229        self.series.tags.as_mut()
230    }
231
232    /// Gets a reference to the timestamp of this metric, if it exists.
233    #[inline]
234    pub fn timestamp(&self) -> Option<DateTime<Utc>> {
235        self.data.time.timestamp
236    }
237
238    /// Gets a reference to the interval (in milliseconds) covered by this metric, if it exists.
239    #[inline]
240    pub fn interval_ms(&self) -> Option<NonZeroU32> {
241        self.data.time.interval_ms
242    }
243
244    /// Gets a reference to the value of this metric.
245    #[inline]
246    pub fn value(&self) -> &MetricValue {
247        &self.data.value
248    }
249
250    /// Gets a mutable reference to the value of this metric.
251    #[inline]
252    pub fn value_mut(&mut self) -> &mut MetricValue {
253        &mut self.data.value
254    }
255
256    /// Gets the kind of this metric.
257    #[inline]
258    pub fn kind(&self) -> MetricKind {
259        self.data.kind
260    }
261
262    /// Gets the time information of this metric.
263    #[inline]
264    pub fn time(&self) -> MetricTime {
265        self.data.time
266    }
267
268    /// Decomposes a `Metric` into its individual parts.
269    #[inline]
270    pub fn into_parts(self) -> (MetricSeries, MetricData, EventMetadata) {
271        (self.series, self.data, self.metadata)
272    }
273
274    /// Creates a `Metric` directly from the raw components of another metric.
275    #[inline]
276    pub fn from_parts(series: MetricSeries, data: MetricData, metadata: EventMetadata) -> Self {
277        Self {
278            series,
279            data,
280            metadata,
281        }
282    }
283
284    /// Consumes this metric, returning it as an absolute metric.
285    ///
286    /// If the metric was already absolute, nothing is changed.
287    #[must_use]
288    pub fn into_absolute(self) -> Self {
289        Self {
290            series: self.series,
291            data: self.data.into_absolute(),
292            metadata: self.metadata,
293        }
294    }
295
296    /// Consumes this metric, returning it as an incremental metric.
297    ///
298    /// If the metric was already incremental, nothing is changed.
299    #[must_use]
300    pub fn into_incremental(self) -> Self {
301        Self {
302            series: self.series,
303            data: self.data.into_incremental(),
304            metadata: self.metadata,
305        }
306    }
307
308    /// Creates a new metric from components specific to a metric emitted by `metrics`.
309    #[allow(clippy::cast_precision_loss)]
310    pub(crate) fn from_metric_kv(
311        key: &metrics::Key,
312        value: MetricValue,
313        timestamp: DateTime<Utc>,
314    ) -> Self {
315        let labels = key
316            .labels()
317            .map(|label| (String::from(label.key()), String::from(label.value())))
318            .collect::<MetricTags>();
319
320        Self::new(key.name().to_string(), MetricKind::Absolute, value)
321            .with_namespace(Some("vector"))
322            .with_timestamp(Some(timestamp))
323            .with_tags((!labels.is_empty()).then_some(labels))
324    }
325
326    /// Removes a tag from this metric, returning the value of the tag if the tag was previously in the metric.
327    pub fn remove_tag(&mut self, key: &str) -> Option<String> {
328        self.series.remove_tag(key)
329    }
330
331    /// Removes all the tags.
332    pub fn remove_tags(&mut self) {
333        self.series.remove_tags();
334    }
335
336    /// Returns `true` if `name` tag is present, and matches the provided `value`
337    pub fn tag_matches(&self, name: &str, value: &str) -> bool {
338        self.tags()
339            .filter(|t| t.get(name).filter(|v| *v == value).is_some())
340            .is_some()
341    }
342
343    /// Returns the string value of a tag, if it exists
344    pub fn tag_value(&self, name: &str) -> Option<String> {
345        self.tags().and_then(|t| t.get(name)).map(ToOwned::to_owned)
346    }
347
348    /// Inserts a tag into this metric.
349    ///
350    /// If the metric did not have this tag, `None` will be returned. Otherwise, `Some(String)` will be returned,
351    /// containing the previous value of the tag.
352    ///
353    /// *Note:* This will create the tags map if it is not present.
354    pub fn replace_tag(&mut self, name: String, value: String) -> Option<String> {
355        self.series.replace_tag(name, value)
356    }
357
358    pub fn set_multi_value_tag(
359        &mut self,
360        name: String,
361        values: impl IntoIterator<Item = TagValue>,
362    ) {
363        self.series.set_multi_value_tag(name, values);
364    }
365
366    /// Zeroes out the data in this metric.
367    pub fn zero(&mut self) {
368        self.data.zero();
369    }
370
371    /// Adds the data from the `other` metric to this one.
372    ///
373    /// The other metric must be incremental and contain the same value type as this one.
374    #[must_use]
375    pub fn add(&mut self, other: impl AsRef<MetricData>) -> bool {
376        self.data.add(other.as_ref())
377    }
378
379    /// Updates this metric by adding the data from `other`.
380    #[must_use]
381    pub fn update(&mut self, other: impl AsRef<MetricData>) -> bool {
382        self.data.update(other.as_ref())
383    }
384
385    /// Subtracts the data from the `other` metric from this one.
386    ///
387    /// The other metric must contain the same value type as this one.
388    #[must_use]
389    pub fn subtract(&mut self, other: impl AsRef<MetricData>) -> bool {
390        self.data.subtract(other.as_ref())
391    }
392
393    /// Reduces all the tag values to their single value, discarding any for which that value would
394    /// be null. If the result is empty, the tag set is dropped.
395    pub fn reduce_tags_to_single(&mut self) {
396        if let Some(tags) = &mut self.series.tags {
397            tags.reduce_to_single();
398            if tags.is_empty() {
399                self.series.tags = None;
400            }
401        }
402    }
403}
404
405impl AsRef<MetricData> for Metric {
406    fn as_ref(&self) -> &MetricData {
407        &self.data
408    }
409}
410
411impl AsRef<MetricValue> for Metric {
412    fn as_ref(&self) -> &MetricValue {
413        &self.data.value
414    }
415}
416
417impl Display for Metric {
418    /// Display a metric using something like Prometheus' text format:
419    ///
420    /// ```text
421    /// TIMESTAMP NAMESPACE_NAME{TAGS} KIND DATA
422    /// ```
423    ///
424    /// TIMESTAMP is in ISO 8601 format with UTC time zone.
425    ///
426    /// KIND is either `=` for absolute metrics, or `+` for incremental
427    /// metrics.
428    ///
429    /// DATA is dependent on the type of metric, and is a simplified
430    /// representation of the data contents. In particular,
431    /// distributions, histograms, and summaries are represented as a
432    /// list of `X@Y` words, where `X` is the rate, count, or quantile,
433    /// and `Y` is the value or bucket.
434    ///
435    /// example:
436    /// ```text
437    /// 2020-08-12T20:23:37.248661343Z vector_received_bytes_total{component_kind="sink",component_type="blackhole"} = 6391
438    /// ```
439    fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
440        if let Some(timestamp) = &self.data.time.timestamp {
441            write!(fmt, "{timestamp:?} ")?;
442        }
443        let kind = match self.data.kind {
444            MetricKind::Absolute => '=',
445            MetricKind::Incremental => '+',
446        };
447        self.series.fmt(fmt)?;
448        write!(fmt, " {kind} ")?;
449        self.data.value.fmt(fmt)
450    }
451}
452
453impl EventDataEq for Metric {
454    fn event_data_eq(&self, other: &Self) -> bool {
455        self.series == other.series
456            && self.data == other.data
457            && self.metadata.event_data_eq(&other.metadata)
458    }
459}
460
461impl ByteSizeOf for Metric {
462    fn allocated_bytes(&self) -> usize {
463        self.series.allocated_bytes()
464            + self.data.allocated_bytes()
465            + self.metadata.allocated_bytes()
466    }
467}
468
469impl EstimatedJsonEncodedSizeOf for Metric {
470    fn estimated_json_encoded_size_of(&self) -> JsonSize {
471        // TODO: For now we're using the in-memory representation of the metric, but we'll convert
472        // this to actually calculate the JSON encoded size in the near future.
473        self.size_of().into()
474    }
475}
476
477impl Finalizable for Metric {
478    fn take_finalizers(&mut self) -> EventFinalizers {
479        self.metadata.take_finalizers()
480    }
481}
482
483impl GetEventCountTags for Metric {
484    fn get_tags(&self) -> TaggedEventsSent {
485        let source = if telemetry().tags().emit_source {
486            self.metadata().source_id().cloned().into()
487        } else {
488            OptionalTag::Ignored
489        };
490
491        // Currently there is no way to specify a tag that means the service,
492        // so we will be hardcoding it to "service".
493        let service = if telemetry().tags().emit_service {
494            self.tags()
495                .and_then(|tags| tags.get("service").map(ToString::to_string))
496                .into()
497        } else {
498            OptionalTag::Ignored
499        };
500
501        TaggedEventsSent { source, service }
502    }
503}
504
505/// Metric kind.
506///
507/// Metrics can be either absolute or incremental. Absolute metrics represent a sort of "last write wins" scenario,
508/// where the latest absolute value seen is meant to be the actual metric value.  In contrast, and perhaps intuitively,
509/// incremental metrics are meant to be additive, such that we don't know what total value of the metric is, but we know
510/// that we'll be adding or subtracting the given value from it.
511///
512/// Generally speaking, most metrics storage systems deal with incremental updates. A notable exception is Prometheus,
513/// which deals with, and expects, absolute values from clients.
514#[configurable_component]
515#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, PartialOrd)]
516#[serde(rename_all = "snake_case")]
517pub enum MetricKind {
518    /// Incremental metric.
519    Incremental,
520
521    /// Absolute metric.
522    Absolute,
523}
524
/// Parses a metric kind from a VRL value.
///
/// The value must be convertible to bytes (via `try_bytes`) and those bytes must be the UTF-8
/// text `incremental` or `absolute`; any other input yields a descriptive error string.
#[cfg(feature = "vrl")]
impl TryFrom<vrl::value::Value> for MetricKind {
    type Error = String;

    fn try_from(value: vrl::value::Value) -> Result<Self, Self::Error> {
        let bytes = value.try_bytes().map_err(|e| e.to_string())?;
        match std::str::from_utf8(&bytes).map_err(|e| e.to_string())? {
            "incremental" => Ok(Self::Incremental),
            "absolute" => Ok(Self::Absolute),
            value => Err(format!(
                "invalid metric kind {value}, metric kind must be `absolute` or `incremental`"
            )),
        }
    }
}
540
/// Converts a metric kind into the VRL value built from `"incremental"` or `"absolute"`,
/// the same spellings accepted by the `TryFrom<vrl::value::Value>` conversion.
#[cfg(feature = "vrl")]
impl From<MetricKind> for vrl::value::Value {
    fn from(kind: MetricKind) -> Self {
        match kind {
            MetricKind::Incremental => "incremental".into(),
            MetricKind::Absolute => "absolute".into(),
        }
    }
}
550
/// Builds a `Vec<Sample>` from `value => rate` pairs, e.g. `samples![1.0 => 10, 2.0 => 20]`.
///
/// An empty invocation yields an empty vector, and a trailing comma is accepted.
#[macro_export]
macro_rules! samples {
    ( $( $value:expr => $rate:expr ),* $(,)? ) => {
        vec![ $( $crate::event::metric::Sample { value: $value, rate: $rate }, )* ]
    }
}
557
/// Builds a `Vec<Bucket>` from `upper_limit => count` pairs, e.g. `buckets![1.0 => 3, 2.0 => 4]`.
///
/// An empty invocation yields an empty vector, and a trailing comma is accepted.
#[macro_export]
macro_rules! buckets {
    ( $( $limit:expr => $count:expr ),* $(,)? ) => {
        vec![ $( $crate::event::metric::Bucket { upper_limit: $limit, count: $count }, )* ]
    }
}
564
/// Builds a `Vec<Quantile>` from `quantile => value` pairs, e.g. `quantiles![0.5 => 63.0]`.
///
/// An empty invocation yields an empty vector, and a trailing comma is accepted.
#[macro_export]
macro_rules! quantiles {
    ( $( $q:expr => $value:expr ),* $(,)? ) => {
        vec![ $( $crate::event::metric::Quantile { quantile: $q, value: $value }, )* ]
    }
}
571
572#[inline]
573pub(crate) fn zip_samples(
574    values: impl IntoIterator<Item = f64>,
575    rates: impl IntoIterator<Item = u32>,
576) -> Vec<Sample> {
577    values
578        .into_iter()
579        .zip(rates)
580        .map(|(value, rate)| Sample { value, rate })
581        .collect()
582}
583
584#[inline]
585pub(crate) fn zip_buckets(
586    limits: impl IntoIterator<Item = f64>,
587    counts: impl IntoIterator<Item = u64>,
588) -> Vec<Bucket> {
589    limits
590        .into_iter()
591        .zip(counts)
592        .map(|(upper_limit, count)| Bucket { upper_limit, count })
593        .collect()
594}
595
596#[inline]
597pub(crate) fn zip_quantiles(
598    quantiles: impl IntoIterator<Item = f64>,
599    values: impl IntoIterator<Item = f64>,
600) -> Vec<Quantile> {
601    quantiles
602        .into_iter()
603        .zip(values)
604        .map(|(quantile, value)| Quantile { quantile, value })
605        .collect()
606}
607
/// Writes every item of `items` to `fmt` using `writer`, placing `sep` between consecutive items.
///
/// Nothing is written for an empty sequence, and no separator precedes the first item or
/// follows the last one. The first error returned by `writer` or by the formatter stops the
/// output and is propagated.
fn write_list<I, T, W>(
    fmt: &mut Formatter<'_>,
    sep: &str,
    items: I,
    writer: W,
) -> Result<(), fmt::Error>
where
    I: IntoIterator<Item = T>,
    W: Fn(&mut Formatter<'_>, T) -> Result<(), fmt::Error>,
{
    let mut items = items.into_iter();

    // Emit the first item on its own so that every later item is simply "separator, item".
    if let Some(first) = items.next() {
        writer(fmt, first)?;
        for item in items {
            fmt.write_str(sep)?;
            writer(fmt, item)?;
        }
    }

    Ok(())
}
626
/// Writes `word` to `fmt`, quoting it only when necessary.
///
/// A word made up solely of ASCII alphanumerics and underscores (including the empty word) is
/// written verbatim. Any other word is written through its `Debug` representation, i.e.
/// wrapped in double quotes with special characters escaped.
fn write_word(fmt: &mut Formatter<'_>, word: &str) -> Result<(), fmt::Error> {
    let is_bare = word
        .chars()
        .all(|c| c.is_ascii_alphanumeric() || c == '_');

    if is_bare {
        write!(fmt, "{word}")
    } else {
        write!(fmt, "{word:?}")
    }
}
634
635pub fn samples_to_buckets(samples: &[Sample], buckets: &[f64]) -> (Vec<Bucket>, u64, f64) {
636    let mut counts = vec![0; buckets.len()];
637    let mut sum = 0.0;
638    let mut count = 0;
639    for sample in samples {
640        let rate = u64::from(sample.rate);
641
642        if let Some((i, _)) = buckets
643            .iter()
644            .enumerate()
645            .find(|&(_, b)| *b >= sample.value)
646        {
647            counts[i] += rate;
648        }
649
650        sum += sample.value * f64::from(sample.rate);
651        count += rate;
652    }
653
654    let buckets = buckets
655        .iter()
656        .zip(counts.iter())
657        .map(|(b, c)| Bucket {
658            upper_limit: *b,
659            count: *c,
660        })
661        .collect();
662
663    (buckets, count, sum)
664}
665
666#[cfg(test)]
667mod test {
668    use std::collections::BTreeSet;
669
670    use chrono::{offset::TimeZone, DateTime, Timelike, Utc};
671    use similar_asserts::assert_eq;
672
673    use super::*;
674
675    fn ts() -> DateTime<Utc> {
676        Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
677            .single()
678            .and_then(|t| t.with_nanosecond(11))
679            .expect("invalid timestamp")
680    }
681
682    fn tags() -> MetricTags {
683        metric_tags!(
684            "normal_tag" => "value",
685            "true_tag" => "true",
686            "empty_tag" => "",
687        )
688    }
689
690    #[test]
691    fn merge_counters() {
692        let mut counter = Metric::new(
693            "counter",
694            MetricKind::Incremental,
695            MetricValue::Counter { value: 1.0 },
696        );
697
698        let delta = Metric::new(
699            "counter",
700            MetricKind::Incremental,
701            MetricValue::Counter { value: 2.0 },
702        )
703        .with_namespace(Some("vector"))
704        .with_tags(Some(tags()))
705        .with_timestamp(Some(ts()));
706
707        let expected = counter
708            .clone()
709            .with_value(MetricValue::Counter { value: 3.0 })
710            .with_timestamp(Some(ts()));
711
712        assert!(counter.data.add(&delta.data));
713        assert_eq!(counter, expected);
714    }
715
716    #[test]
717    fn merge_gauges() {
718        let mut gauge = Metric::new(
719            "gauge",
720            MetricKind::Incremental,
721            MetricValue::Gauge { value: 1.0 },
722        );
723
724        let delta = Metric::new(
725            "gauge",
726            MetricKind::Incremental,
727            MetricValue::Gauge { value: -2.0 },
728        )
729        .with_namespace(Some("vector"))
730        .with_tags(Some(tags()))
731        .with_timestamp(Some(ts()));
732
733        let expected = gauge
734            .clone()
735            .with_value(MetricValue::Gauge { value: -1.0 })
736            .with_timestamp(Some(ts()));
737
738        assert!(gauge.data.add(&delta.data));
739        assert_eq!(gauge, expected);
740    }
741
742    #[test]
743    fn merge_sets() {
744        let mut set = Metric::new(
745            "set",
746            MetricKind::Incremental,
747            MetricValue::Set {
748                values: vec!["old".into()].into_iter().collect(),
749            },
750        );
751
752        let delta = Metric::new(
753            "set",
754            MetricKind::Incremental,
755            MetricValue::Set {
756                values: vec!["new".into()].into_iter().collect(),
757            },
758        )
759        .with_namespace(Some("vector"))
760        .with_tags(Some(tags()))
761        .with_timestamp(Some(ts()));
762
763        let expected = set
764            .clone()
765            .with_value(MetricValue::Set {
766                values: vec!["old".into(), "new".into()].into_iter().collect(),
767            })
768            .with_timestamp(Some(ts()));
769
770        assert!(set.data.add(&delta.data));
771        assert_eq!(set, expected);
772    }
773
774    #[test]
775    fn merge_histograms() {
776        let mut dist = Metric::new(
777            "hist",
778            MetricKind::Incremental,
779            MetricValue::Distribution {
780                samples: samples![1.0 => 10],
781                statistic: StatisticKind::Histogram,
782            },
783        );
784
785        let delta = Metric::new(
786            "hist",
787            MetricKind::Incremental,
788            MetricValue::Distribution {
789                samples: samples![1.0 => 20],
790                statistic: StatisticKind::Histogram,
791            },
792        )
793        .with_namespace(Some("vector"))
794        .with_tags(Some(tags()))
795        .with_timestamp(Some(ts()));
796
797        let expected = dist
798            .clone()
799            .with_value(MetricValue::Distribution {
800                samples: samples![1.0 => 10, 1.0 => 20],
801                statistic: StatisticKind::Histogram,
802            })
803            .with_timestamp(Some(ts()));
804
805        assert!(dist.data.add(&delta.data));
806        assert_eq!(dist, expected);
807    }
808
809    #[test]
810    fn subtract_counters() {
811        // Make sure a newer/higher value counter can subtract an older/lesser value counter:
812        let old_counter = Metric::new(
813            "counter",
814            MetricKind::Absolute,
815            MetricValue::Counter { value: 4.0 },
816        );
817
818        let mut new_counter = Metric::new(
819            "counter",
820            MetricKind::Absolute,
821            MetricValue::Counter { value: 6.0 },
822        );
823
824        assert!(new_counter.subtract(&old_counter));
825        assert_eq!(new_counter.value(), &MetricValue::Counter { value: 2.0 });
826
827        // But not the other way around:
828        let old_counter = Metric::new(
829            "counter",
830            MetricKind::Absolute,
831            MetricValue::Counter { value: 6.0 },
832        );
833
834        let mut new_reset_counter = Metric::new(
835            "counter",
836            MetricKind::Absolute,
837            MetricValue::Counter { value: 1.0 },
838        );
839
840        assert!(!new_reset_counter.subtract(&old_counter));
841    }
842
843    #[test]
844    fn subtract_aggregated_histograms() {
845        // Make sure a newer/higher count aggregated histogram can subtract an older/lower count
846        // aggregated histogram:
847        let old_histogram = Metric::new(
848            "histogram",
849            MetricKind::Absolute,
850            MetricValue::AggregatedHistogram {
851                count: 1,
852                sum: 1.0,
853                buckets: buckets!(2.0 => 1),
854            },
855        );
856
857        let mut new_histogram = Metric::new(
858            "histogram",
859            MetricKind::Absolute,
860            MetricValue::AggregatedHistogram {
861                count: 3,
862                sum: 3.0,
863                buckets: buckets!(2.0 => 3),
864            },
865        );
866
867        assert!(new_histogram.subtract(&old_histogram));
868        assert_eq!(
869            new_histogram.value(),
870            &MetricValue::AggregatedHistogram {
871                count: 2,
872                sum: 2.0,
873                buckets: buckets!(2.0 => 2),
874            }
875        );
876
877        // But not the other way around:
878        let old_histogram = Metric::new(
879            "histogram",
880            MetricKind::Absolute,
881            MetricValue::AggregatedHistogram {
882                count: 3,
883                sum: 3.0,
884                buckets: buckets!(2.0 => 3),
885            },
886        );
887
888        let mut new_reset_histogram = Metric::new(
889            "histogram",
890            MetricKind::Absolute,
891            MetricValue::AggregatedHistogram {
892                count: 1,
893                sum: 1.0,
894                buckets: buckets!(2.0 => 1),
895            },
896        );
897
898        assert!(!new_reset_histogram.subtract(&old_histogram));
899    }
900
901    #[test]
902    // `too_many_lines` is mostly just useful for production code but we're not
903    // able to flag the lint on only for non-test.
904    #[allow(clippy::too_many_lines)]
905    fn display() {
906        assert_eq!(
907            format!(
908                "{}",
909                Metric::new(
910                    "one",
911                    MetricKind::Absolute,
912                    MetricValue::Counter { value: 1.23 },
913                )
914                .with_tags(Some(tags()))
915            ),
916            r#"one{empty_tag="",normal_tag="value",true_tag="true"} = 1.23"#
917        );
918
919        assert_eq!(
920            format!(
921                "{}",
922                Metric::new(
923                    "two word",
924                    MetricKind::Incremental,
925                    MetricValue::Gauge { value: 2.0 }
926                )
927                .with_timestamp(Some(ts()))
928            ),
929            r#"2018-11-14T08:09:10.000000011Z "two word"{} + 2"#
930        );
931
932        assert_eq!(
933            format!(
934                "{}",
935                Metric::new(
936                    "namespace",
937                    MetricKind::Absolute,
938                    MetricValue::Counter { value: 1.23 },
939                )
940                .with_namespace(Some("vector"))
941            ),
942            r"vector_namespace{} = 1.23"
943        );
944
945        assert_eq!(
946            format!(
947                "{}",
948                Metric::new(
949                    "namespace",
950                    MetricKind::Absolute,
951                    MetricValue::Counter { value: 1.23 },
952                )
953                .with_namespace(Some("vector host"))
954            ),
955            r#""vector host"_namespace{} = 1.23"#
956        );
957
958        let mut values = BTreeSet::<String>::new();
959        values.insert("v1".into());
960        values.insert("v2_two".into());
961        values.insert("thrəë".into());
962        values.insert("four=4".into());
963        assert_eq!(
964            format!(
965                "{}",
966                Metric::new("three", MetricKind::Absolute, MetricValue::Set { values })
967            ),
968            r#"three{} = "four=4" "thrəë" v1 v2_two"#
969        );
970
971        assert_eq!(
972            format!(
973                "{}",
974                Metric::new(
975                    "four",
976                    MetricKind::Absolute,
977                    MetricValue::Distribution {
978                        samples: samples![1.0 => 3, 2.0 => 4],
979                        statistic: StatisticKind::Histogram,
980                    }
981                )
982            ),
983            r"four{} = histogram 3@1 4@2"
984        );
985
986        assert_eq!(
987            format!(
988                "{}",
989                Metric::new(
990                    "five",
991                    MetricKind::Absolute,
992                    MetricValue::AggregatedHistogram {
993                        buckets: buckets![51.0 => 53, 52.0 => 54],
994                        count: 107,
995                        sum: 103.0,
996                    }
997                )
998            ),
999            r"five{} = count=107 sum=103 53@51 54@52"
1000        );
1001
1002        assert_eq!(
1003            format!(
1004                "{}",
1005                Metric::new(
1006                    "six",
1007                    MetricKind::Absolute,
1008                    MetricValue::AggregatedSummary {
1009                        quantiles: quantiles![1.0 => 63.0, 2.0 => 64.0],
1010                        count: 2,
1011                        sum: 127.0,
1012                    }
1013                )
1014            ),
1015            r"six{} = count=2 sum=127 1@63 2@64"
1016        );
1017    }
1018
1019    #[test]
1020    fn quantile_to_percentile_string() {
1021        let quantiles = [
1022            (-1.0, "0"),
1023            (0.0, "0"),
1024            (0.25, "25"),
1025            (0.50, "50"),
1026            (0.999, "999"),
1027            (0.9999, "9999"),
1028            (0.99999, "9999"),
1029            (1.0, "100"),
1030            (3.0, "100"),
1031        ];
1032
1033        for (quantile, expected) in quantiles {
1034            let quantile = Quantile {
1035                quantile,
1036                value: 1.0,
1037            };
1038            let result = quantile.to_percentile_string();
1039            assert_eq!(result, expected);
1040        }
1041    }
1042
1043    #[test]
1044    fn quantile_to_string() {
1045        let quantiles = [
1046            (-1.0, "0"),
1047            (0.0, "0"),
1048            (0.25, "0.25"),
1049            (0.50, "0.5"),
1050            (0.999, "0.999"),
1051            (0.9999, "0.9999"),
1052            (0.99999, "0.9999"),
1053            (1.0, "1"),
1054            (3.0, "1"),
1055        ];
1056
1057        for (quantile, expected) in quantiles {
1058            let quantile = Quantile {
1059                quantile,
1060                value: 1.0,
1061            };
1062            let result = quantile.to_quantile_string();
1063            assert_eq!(result, expected);
1064        }
1065    }
1066
1067    #[test]
1068    fn value_conversions() {
1069        let counter_value = MetricValue::Counter { value: 3.13 };
1070        assert_eq!(counter_value.distribution_to_agg_histogram(&[1.0]), None);
1071
1072        let counter_value = MetricValue::Counter { value: 3.13 };
1073        assert_eq!(counter_value.distribution_to_sketch(), None);
1074
1075        let distrib_value = MetricValue::Distribution {
1076            samples: samples!(1.0 => 10, 2.0 => 5, 5.0 => 2),
1077            statistic: StatisticKind::Summary,
1078        };
1079        let converted = distrib_value.distribution_to_agg_histogram(&[1.0, 5.0, 10.0]);
1080        assert_eq!(
1081            converted,
1082            Some(MetricValue::AggregatedHistogram {
1083                buckets: vec![
1084                    Bucket {
1085                        upper_limit: 1.0,
1086                        count: 10,
1087                    },
1088                    Bucket {
1089                        upper_limit: 5.0,
1090                        count: 7,
1091                    },
1092                    Bucket {
1093                        upper_limit: 10.0,
1094                        count: 0,
1095                    },
1096                ],
1097                sum: 30.0,
1098                count: 17,
1099            })
1100        );
1101
1102        let distrib_value = MetricValue::Distribution {
1103            samples: samples!(1.0 => 1),
1104            statistic: StatisticKind::Summary,
1105        };
1106        let converted = distrib_value.distribution_to_sketch();
1107        assert!(matches!(converted, Some(MetricValue::Sketch { .. })));
1108    }
1109
1110    #[test]
1111    fn merge_non_contiguous_interval() {
1112        let mut gauge = Metric::new(
1113            "gauge",
1114            MetricKind::Incremental,
1115            MetricValue::Gauge { value: 12.0 },
1116        )
1117        .with_timestamp(Some(ts()))
1118        .with_interval_ms(std::num::NonZeroU32::new(10));
1119
1120        let delta = Metric::new(
1121            "gauge",
1122            MetricKind::Incremental,
1123            MetricValue::Gauge { value: -5.0 },
1124        )
1125        .with_timestamp(Some(ts() + chrono::Duration::milliseconds(20)))
1126        .with_interval_ms(std::num::NonZeroU32::new(15));
1127
1128        let expected = gauge
1129            .clone()
1130            .with_value(MetricValue::Gauge { value: 7.0 })
1131            .with_timestamp(Some(ts()))
1132            .with_interval_ms(std::num::NonZeroU32::new(35));
1133
1134        assert!(gauge.data.add(&delta.data));
1135        assert_eq!(gauge, expected);
1136    }
1137
1138    #[test]
1139    fn merge_contiguous_interval() {
1140        let mut gauge = Metric::new(
1141            "gauge",
1142            MetricKind::Incremental,
1143            MetricValue::Gauge { value: 12.0 },
1144        )
1145        .with_timestamp(Some(ts()))
1146        .with_interval_ms(std::num::NonZeroU32::new(10));
1147
1148        let delta = Metric::new(
1149            "gauge",
1150            MetricKind::Incremental,
1151            MetricValue::Gauge { value: -5.0 },
1152        )
1153        .with_timestamp(Some(ts() + chrono::Duration::milliseconds(5)))
1154        .with_interval_ms(std::num::NonZeroU32::new(15));
1155
1156        let expected = gauge
1157            .clone()
1158            .with_value(MetricValue::Gauge { value: 7.0 })
1159            .with_timestamp(Some(ts()))
1160            .with_interval_ms(std::num::NonZeroU32::new(20));
1161
1162        assert!(gauge.data.add(&delta.data));
1163        assert_eq!(gauge, expected);
1164    }
1165}