vector_core/event/metric/
mod.rs

1#[cfg(feature = "vrl")]
2use std::convert::TryFrom;
3use std::{
4    convert::AsRef,
5    fmt::{self, Display, Formatter},
6    num::NonZeroU32,
7};
8
9use chrono::{DateTime, Utc};
10use vector_common::{
11    EventDataEq,
12    byte_size_of::ByteSizeOf,
13    internal_event::{OptionalTag, TaggedEventsSent},
14    json_size::JsonSize,
15    request_metadata::GetEventCountTags,
16};
17use vector_config::configurable_component;
18#[cfg(feature = "vrl")]
19use vrl::compiler::value::VrlValueConvert;
20
21use super::{
22    BatchNotifier, EventFinalizer, EventFinalizers, EventMetadata, Finalizable,
23    estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf,
24};
25use crate::config::telemetry;
26
27#[cfg(any(test, feature = "test"))]
28mod arbitrary;
29
30mod data;
31pub use self::data::*;
32
33mod series;
34pub use self::series::*;
35
36mod tags;
37pub use self::tags::*;
38
39mod value;
40pub use self::value::*;
41
42#[macro_export]
43macro_rules! metric_tags {
44    () => { $crate::event::MetricTags::default() };
45
46    ($($key:expr => $value:expr,)+) => { $crate::metric_tags!($($key => $value),+) };
47
48    ($($key:expr => $value:expr),*) => {
49        [
50            $( ($key.into(), $crate::event::metric::TagValue::from($value)), )*
51        ].into_iter().collect::<$crate::event::MetricTags>()
52    };
53}
54
55/// A metric.
56#[configurable_component]
57#[derive(Clone, Debug, PartialEq)]
58pub struct Metric {
59    #[serde(flatten)]
60    pub(super) series: MetricSeries,
61
62    #[serde(flatten)]
63    pub(super) data: MetricData,
64
65    /// Internal event metadata.
66    #[serde(skip, default = "EventMetadata::default")]
67    metadata: EventMetadata,
68}
69
70impl Metric {
71    /// Creates a new `Metric` with the given `name`, `kind`, and `value`.
72    pub fn new<T: Into<String>>(name: T, kind: MetricKind, value: MetricValue) -> Self {
73        Self::new_with_metadata(name, kind, value, EventMetadata::default())
74    }
75
76    /// Creates a new `Metric` with the given `name`, `kind`, `value`, and `metadata`.
77    pub fn new_with_metadata<T: Into<String>>(
78        name: T,
79        kind: MetricKind,
80        value: MetricValue,
81        metadata: EventMetadata,
82    ) -> Self {
83        Self {
84            series: MetricSeries {
85                name: MetricName {
86                    name: name.into(),
87                    namespace: None,
88                },
89                tags: None,
90            },
91            data: MetricData {
92                time: MetricTime {
93                    timestamp: None,
94                    interval_ms: None,
95                },
96                kind,
97                value,
98            },
99            metadata,
100        }
101    }
102
103    /// Consumes this metric, returning it with an updated series based on the given `name`.
104    #[inline]
105    #[must_use]
106    pub fn with_name(mut self, name: impl Into<String>) -> Self {
107        self.series.name.name = name.into();
108        self
109    }
110
111    /// Consumes this metric, returning it with an updated series based on the given `namespace`.
112    #[inline]
113    #[must_use]
114    pub fn with_namespace<T: Into<String>>(mut self, namespace: Option<T>) -> Self {
115        self.series.name.namespace = namespace.map(Into::into);
116        self
117    }
118
119    /// Consumes this metric, returning it with an updated timestamp.
120    #[inline]
121    #[must_use]
122    pub fn with_timestamp(mut self, timestamp: Option<DateTime<Utc>>) -> Self {
123        self.data.time.timestamp = timestamp;
124        self
125    }
126
127    /// Consumes this metric, returning it with an updated interval.
128    #[inline]
129    #[must_use]
130    pub fn with_interval_ms(mut self, interval_ms: Option<NonZeroU32>) -> Self {
131        self.data.time.interval_ms = interval_ms;
132        self
133    }
134
135    pub fn add_finalizer(&mut self, finalizer: EventFinalizer) {
136        self.metadata.add_finalizer(finalizer);
137    }
138
139    /// Consumes this metric, returning it with an updated set of event finalizers attached to `batch`.
140    #[must_use]
141    pub fn with_batch_notifier(mut self, batch: &BatchNotifier) -> Self {
142        self.metadata = self.metadata.with_batch_notifier(batch);
143        self
144    }
145
146    /// Consumes this metric, returning it with an optionally updated set of event finalizers attached to `batch`.
147    #[must_use]
148    pub fn with_batch_notifier_option(mut self, batch: &Option<BatchNotifier>) -> Self {
149        self.metadata = self.metadata.with_batch_notifier_option(batch);
150        self
151    }
152
153    /// Consumes this metric, returning it with an updated series based on the given `tags`.
154    #[inline]
155    #[must_use]
156    pub fn with_tags(mut self, tags: Option<MetricTags>) -> Self {
157        self.series.tags = tags;
158        self
159    }
160
161    /// Consumes this metric, returning it with an updated value.
162    #[inline]
163    #[must_use]
164    pub fn with_value(mut self, value: MetricValue) -> Self {
165        self.data.value = value;
166        self
167    }
168
169    /// Gets a reference to the series of this metric.
170    ///
171    /// The "series" is the name of the metric itself, including any tags. In other words, it is the unique identifier
172    /// for a metric, although metrics of different values (counter vs gauge) may be able to co-exist in outside metrics
173    /// implementations with identical series.
174    pub fn series(&self) -> &MetricSeries {
175        &self.series
176    }
177
178    /// Gets a reference to the data of this metric.
179    pub fn data(&self) -> &MetricData {
180        &self.data
181    }
182
183    /// Gets a mutable reference to the data of this metric.
184    pub fn data_mut(&mut self) -> &mut MetricData {
185        &mut self.data
186    }
187
188    /// Gets a reference to the metadata of this metric.
189    pub fn metadata(&self) -> &EventMetadata {
190        &self.metadata
191    }
192
193    /// Gets a mutable reference to the metadata of this metric.
194    pub fn metadata_mut(&mut self) -> &mut EventMetadata {
195        &mut self.metadata
196    }
197
198    /// Gets a reference to the name of this metric.
199    ///
200    /// The name of the metric does not include the namespace or tags.
201    #[inline]
202    pub fn name(&self) -> &str {
203        &self.series.name.name
204    }
205
206    /// Gets a reference to the namespace of this metric, if it exists.
207    #[inline]
208    pub fn namespace(&self) -> Option<&str> {
209        self.series.name.namespace.as_deref()
210    }
211
212    /// Takes the namespace out of this metric, if it exists, leaving it empty.
213    #[inline]
214    pub fn take_namespace(&mut self) -> Option<String> {
215        self.series.name.namespace.take()
216    }
217
218    /// Gets a reference to the tags of this metric, if they exist.
219    #[inline]
220    pub fn tags(&self) -> Option<&MetricTags> {
221        self.series.tags.as_ref()
222    }
223
224    /// Gets a mutable reference to the tags of this metric, if they exist.
225    #[inline]
226    pub fn tags_mut(&mut self) -> Option<&mut MetricTags> {
227        self.series.tags.as_mut()
228    }
229
230    /// Gets a reference to the timestamp of this metric, if it exists.
231    #[inline]
232    pub fn timestamp(&self) -> Option<DateTime<Utc>> {
233        self.data.time.timestamp
234    }
235
236    /// Gets a reference to the interval (in milliseconds) covered by this metric, if it exists.
237    #[inline]
238    pub fn interval_ms(&self) -> Option<NonZeroU32> {
239        self.data.time.interval_ms
240    }
241
242    /// Gets a reference to the value of this metric.
243    #[inline]
244    pub fn value(&self) -> &MetricValue {
245        &self.data.value
246    }
247
248    /// Gets a mutable reference to the value of this metric.
249    #[inline]
250    pub fn value_mut(&mut self) -> &mut MetricValue {
251        &mut self.data.value
252    }
253
254    /// Gets the kind of this metric.
255    #[inline]
256    pub fn kind(&self) -> MetricKind {
257        self.data.kind
258    }
259
260    /// Gets the time information of this metric.
261    #[inline]
262    pub fn time(&self) -> MetricTime {
263        self.data.time
264    }
265
266    /// Decomposes a `Metric` into its individual parts.
267    #[inline]
268    pub fn into_parts(self) -> (MetricSeries, MetricData, EventMetadata) {
269        (self.series, self.data, self.metadata)
270    }
271
272    /// Creates a `Metric` directly from the raw components of another metric.
273    #[inline]
274    pub fn from_parts(series: MetricSeries, data: MetricData, metadata: EventMetadata) -> Self {
275        Self {
276            series,
277            data,
278            metadata,
279        }
280    }
281
282    /// Consumes this metric, returning it as an absolute metric.
283    ///
284    /// If the metric was already absolute, nothing is changed.
285    #[must_use]
286    pub fn into_absolute(self) -> Self {
287        Self {
288            series: self.series,
289            data: self.data.into_absolute(),
290            metadata: self.metadata,
291        }
292    }
293
294    /// Consumes this metric, returning it as an incremental metric.
295    ///
296    /// If the metric was already incremental, nothing is changed.
297    #[must_use]
298    pub fn into_incremental(self) -> Self {
299        Self {
300            series: self.series,
301            data: self.data.into_incremental(),
302            metadata: self.metadata,
303        }
304    }
305
306    /// Creates a new metric from components specific to a metric emitted by `metrics`.
307    #[allow(clippy::cast_precision_loss)]
308    pub(crate) fn from_metric_kv(
309        key: &metrics::Key,
310        value: MetricValue,
311        timestamp: DateTime<Utc>,
312    ) -> Self {
313        let labels = key
314            .labels()
315            .map(|label| (String::from(label.key()), String::from(label.value())))
316            .collect::<MetricTags>();
317
318        Self::new(key.name().to_string(), MetricKind::Absolute, value)
319            .with_namespace(Some("vector"))
320            .with_timestamp(Some(timestamp))
321            .with_tags((!labels.is_empty()).then_some(labels))
322    }
323
324    /// Removes a tag from this metric, returning the value of the tag if the tag was previously in the metric.
325    pub fn remove_tag(&mut self, key: &str) -> Option<String> {
326        self.series.remove_tag(key)
327    }
328
329    /// Removes all the tags.
330    pub fn remove_tags(&mut self) {
331        self.series.remove_tags();
332    }
333
334    /// Returns `true` if `name` tag is present, and matches the provided `value`
335    pub fn tag_matches(&self, name: &str, value: &str) -> bool {
336        self.tags()
337            .filter(|t| t.get(name).filter(|v| *v == value).is_some())
338            .is_some()
339    }
340
341    /// Returns the string value of a tag, if it exists
342    pub fn tag_value(&self, name: &str) -> Option<String> {
343        self.tags().and_then(|t| t.get(name)).map(ToOwned::to_owned)
344    }
345
346    /// Inserts a tag into this metric.
347    ///
348    /// If the metric did not have this tag, `None` will be returned. Otherwise, `Some(String)` will be returned,
349    /// containing the previous value of the tag.
350    ///
351    /// *Note:* This will create the tags map if it is not present.
352    pub fn replace_tag(&mut self, name: String, value: String) -> Option<String> {
353        self.series.replace_tag(name, value)
354    }
355
356    pub fn set_multi_value_tag(
357        &mut self,
358        name: String,
359        values: impl IntoIterator<Item = TagValue>,
360    ) {
361        self.series.set_multi_value_tag(name, values);
362    }
363
364    /// Zeroes out the data in this metric.
365    pub fn zero(&mut self) {
366        self.data.zero();
367    }
368
369    /// Adds the data from the `other` metric to this one.
370    ///
371    /// The other metric must be incremental and contain the same value type as this one.
372    #[must_use]
373    pub fn add(&mut self, other: impl AsRef<MetricData>) -> bool {
374        self.data.add(other.as_ref())
375    }
376
377    /// Updates this metric by adding the data from `other`.
378    #[must_use]
379    pub fn update(&mut self, other: impl AsRef<MetricData>) -> bool {
380        self.data.update(other.as_ref())
381    }
382
383    /// Subtracts the data from the `other` metric from this one.
384    ///
385    /// The other metric must contain the same value type as this one.
386    #[must_use]
387    pub fn subtract(&mut self, other: impl AsRef<MetricData>) -> bool {
388        self.data.subtract(other.as_ref())
389    }
390
391    /// Reduces all the tag values to their single value, discarding any for which that value would
392    /// be null. If the result is empty, the tag set is dropped.
393    pub fn reduce_tags_to_single(&mut self) {
394        if let Some(tags) = &mut self.series.tags {
395            tags.reduce_to_single();
396            if tags.is_empty() {
397                self.series.tags = None;
398            }
399        }
400    }
401}
402
403impl AsRef<MetricData> for Metric {
404    fn as_ref(&self) -> &MetricData {
405        &self.data
406    }
407}
408
409impl AsRef<MetricValue> for Metric {
410    fn as_ref(&self) -> &MetricValue {
411        &self.data.value
412    }
413}
414
415impl Display for Metric {
416    /// Display a metric using something like Prometheus' text format:
417    ///
418    /// ```text
419    /// TIMESTAMP NAMESPACE_NAME{TAGS} KIND DATA
420    /// ```
421    ///
422    /// TIMESTAMP is in ISO 8601 format with UTC time zone.
423    ///
424    /// KIND is either `=` for absolute metrics, or `+` for incremental
425    /// metrics.
426    ///
427    /// DATA is dependent on the type of metric, and is a simplified
428    /// representation of the data contents. In particular,
429    /// distributions, histograms, and summaries are represented as a
430    /// list of `X@Y` words, where `X` is the rate, count, or quantile,
431    /// and `Y` is the value or bucket.
432    ///
433    /// example:
434    /// ```text
435    /// 2020-08-12T20:23:37.248661343Z vector_received_bytes_total{component_kind="sink",component_type="blackhole"} = 6391
436    /// ```
437    fn fmt(&self, fmt: &mut Formatter<'_>) -> Result<(), fmt::Error> {
438        if let Some(timestamp) = &self.data.time.timestamp {
439            write!(fmt, "{timestamp:?} ")?;
440        }
441        let kind = match self.data.kind {
442            MetricKind::Absolute => '=',
443            MetricKind::Incremental => '+',
444        };
445        self.series.fmt(fmt)?;
446        write!(fmt, " {kind} ")?;
447        self.data.value.fmt(fmt)
448    }
449}
450
451impl EventDataEq for Metric {
452    fn event_data_eq(&self, other: &Self) -> bool {
453        self.series == other.series
454            && self.data == other.data
455            && self.metadata.event_data_eq(&other.metadata)
456    }
457}
458
459impl ByteSizeOf for Metric {
460    fn allocated_bytes(&self) -> usize {
461        self.series.allocated_bytes()
462            + self.data.allocated_bytes()
463            + self.metadata.allocated_bytes()
464    }
465}
466
467impl EstimatedJsonEncodedSizeOf for Metric {
468    fn estimated_json_encoded_size_of(&self) -> JsonSize {
469        // TODO: For now we're using the in-memory representation of the metric, but we'll convert
470        // this to actually calculate the JSON encoded size in the near future.
471        self.size_of().into()
472    }
473}
474
475impl Finalizable for Metric {
476    fn take_finalizers(&mut self) -> EventFinalizers {
477        self.metadata.take_finalizers()
478    }
479}
480
481impl GetEventCountTags for Metric {
482    fn get_tags(&self) -> TaggedEventsSent {
483        let source = if telemetry().tags().emit_source {
484            self.metadata().source_id().cloned().into()
485        } else {
486            OptionalTag::Ignored
487        };
488
489        // Currently there is no way to specify a tag that means the service,
490        // so we will be hardcoding it to "service".
491        let service = if telemetry().tags().emit_service {
492            self.tags()
493                .and_then(|tags| tags.get("service").map(ToString::to_string))
494                .into()
495        } else {
496            OptionalTag::Ignored
497        };
498
499        TaggedEventsSent { source, service }
500    }
501}
502
503/// Metric kind.
504///
505/// Metrics can be either absolute or incremental. Absolute metrics represent a sort of "last write wins" scenario,
506/// where the latest absolute value seen is meant to be the actual metric value.  In contrast, and perhaps intuitively,
507/// incremental metrics are meant to be additive, such that we don't know what total value of the metric is, but we know
508/// that we'll be adding or subtracting the given value from it.
509///
510/// Generally speaking, most metrics storage systems deal with incremental updates. A notable exception is Prometheus,
511/// which deals with, and expects, absolute values from clients.
512#[configurable_component]
513#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, PartialOrd)]
514#[serde(rename_all = "snake_case")]
515pub enum MetricKind {
516    /// Incremental metric.
517    Incremental,
518
519    /// Absolute metric.
520    Absolute,
521}
522
523#[cfg(feature = "vrl")]
524impl TryFrom<vrl::value::Value> for MetricKind {
525    type Error = String;
526
527    fn try_from(value: vrl::value::Value) -> Result<Self, Self::Error> {
528        let value = value.try_bytes().map_err(|e| e.to_string())?;
529        match std::str::from_utf8(&value).map_err(|e| e.to_string())? {
530            "incremental" => Ok(Self::Incremental),
531            "absolute" => Ok(Self::Absolute),
532            value => Err(format!(
533                "invalid metric kind {value}, metric kind must be `absolute` or `incremental`"
534            )),
535        }
536    }
537}
538
539#[cfg(feature = "vrl")]
540impl From<MetricKind> for vrl::value::Value {
541    fn from(kind: MetricKind) -> Self {
542        match kind {
543            MetricKind::Incremental => "incremental".into(),
544            MetricKind::Absolute => "absolute".into(),
545        }
546    }
547}
548
549#[macro_export]
550macro_rules! samples {
551    ( $( $value:expr => $rate:expr ),* ) => {
552        vec![ $( $crate::event::metric::Sample { value: $value, rate: $rate }, )* ]
553    }
554}
555
556#[macro_export]
557macro_rules! buckets {
558    ( $( $limit:expr => $count:expr ),* ) => {
559        vec![ $( $crate::event::metric::Bucket { upper_limit: $limit, count: $count }, )* ]
560    }
561}
562
563#[macro_export]
564macro_rules! quantiles {
565    ( $( $q:expr => $value:expr ),* ) => {
566        vec![ $( $crate::event::metric::Quantile { quantile: $q, value: $value }, )* ]
567    }
568}
569
570#[inline]
571pub(crate) fn zip_samples(
572    values: impl IntoIterator<Item = f64>,
573    rates: impl IntoIterator<Item = u32>,
574) -> Vec<Sample> {
575    values
576        .into_iter()
577        .zip(rates)
578        .map(|(value, rate)| Sample { value, rate })
579        .collect()
580}
581
582#[inline]
583pub(crate) fn zip_buckets(
584    limits: impl IntoIterator<Item = f64>,
585    counts: impl IntoIterator<Item = u64>,
586) -> Vec<Bucket> {
587    limits
588        .into_iter()
589        .zip(counts)
590        .map(|(upper_limit, count)| Bucket { upper_limit, count })
591        .collect()
592}
593
594#[inline]
595pub(crate) fn zip_quantiles(
596    quantiles: impl IntoIterator<Item = f64>,
597    values: impl IntoIterator<Item = f64>,
598) -> Vec<Quantile> {
599    quantiles
600        .into_iter()
601        .zip(values)
602        .map(|(quantile, value)| Quantile { quantile, value })
603        .collect()
604}
605
606fn write_list<I, T, W>(
607    fmt: &mut Formatter<'_>,
608    sep: &str,
609    items: I,
610    writer: W,
611) -> Result<(), fmt::Error>
612where
613    I: IntoIterator<Item = T>,
614    W: Fn(&mut Formatter<'_>, T) -> Result<(), fmt::Error>,
615{
616    let mut this_sep = "";
617    for item in items {
618        write!(fmt, "{this_sep}")?;
619        writer(fmt, item)?;
620        this_sep = sep;
621    }
622    Ok(())
623}
624
625fn write_word(fmt: &mut Formatter<'_>, word: &str) -> Result<(), fmt::Error> {
626    if word.contains(|c: char| !c.is_ascii_alphanumeric() && c != '_') {
627        write!(fmt, "{word:?}")
628    } else {
629        write!(fmt, "{word}")
630    }
631}
632
633pub fn samples_to_buckets(samples: &[Sample], buckets: &[f64]) -> (Vec<Bucket>, u64, f64) {
634    let mut counts = vec![0; buckets.len()];
635    let mut sum = 0.0;
636    let mut count = 0;
637    for sample in samples {
638        let rate = u64::from(sample.rate);
639
640        if let Some((i, _)) = buckets
641            .iter()
642            .enumerate()
643            .find(|&(_, b)| *b >= sample.value)
644        {
645            counts[i] += rate;
646        }
647
648        sum += sample.value * f64::from(sample.rate);
649        count += rate;
650    }
651
652    let buckets = buckets
653        .iter()
654        .zip(counts.iter())
655        .map(|(b, c)| Bucket {
656            upper_limit: *b,
657            count: *c,
658        })
659        .collect();
660
661    (buckets, count, sum)
662}
663
664#[cfg(test)]
665mod test {
666    use std::collections::BTreeSet;
667
668    use chrono::{DateTime, Timelike, Utc, offset::TimeZone};
669    use similar_asserts::assert_eq;
670
671    use super::*;
672
673    fn ts() -> DateTime<Utc> {
674        Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
675            .single()
676            .and_then(|t| t.with_nanosecond(11))
677            .expect("invalid timestamp")
678    }
679
680    fn tags() -> MetricTags {
681        metric_tags!(
682            "normal_tag" => "value",
683            "true_tag" => "true",
684            "empty_tag" => "",
685        )
686    }
687
688    #[test]
689    fn merge_counters() {
690        let mut counter = Metric::new(
691            "counter",
692            MetricKind::Incremental,
693            MetricValue::Counter { value: 1.0 },
694        );
695
696        let delta = Metric::new(
697            "counter",
698            MetricKind::Incremental,
699            MetricValue::Counter { value: 2.0 },
700        )
701        .with_namespace(Some("vector"))
702        .with_tags(Some(tags()))
703        .with_timestamp(Some(ts()));
704
705        let expected = counter
706            .clone()
707            .with_value(MetricValue::Counter { value: 3.0 })
708            .with_timestamp(Some(ts()));
709
710        assert!(counter.data.add(&delta.data));
711        assert_eq!(counter, expected);
712    }
713
714    #[test]
715    fn merge_gauges() {
716        let mut gauge = Metric::new(
717            "gauge",
718            MetricKind::Incremental,
719            MetricValue::Gauge { value: 1.0 },
720        );
721
722        let delta = Metric::new(
723            "gauge",
724            MetricKind::Incremental,
725            MetricValue::Gauge { value: -2.0 },
726        )
727        .with_namespace(Some("vector"))
728        .with_tags(Some(tags()))
729        .with_timestamp(Some(ts()));
730
731        let expected = gauge
732            .clone()
733            .with_value(MetricValue::Gauge { value: -1.0 })
734            .with_timestamp(Some(ts()));
735
736        assert!(gauge.data.add(&delta.data));
737        assert_eq!(gauge, expected);
738    }
739
740    #[test]
741    fn merge_sets() {
742        let mut set = Metric::new(
743            "set",
744            MetricKind::Incremental,
745            MetricValue::Set {
746                values: vec!["old".into()].into_iter().collect(),
747            },
748        );
749
750        let delta = Metric::new(
751            "set",
752            MetricKind::Incremental,
753            MetricValue::Set {
754                values: vec!["new".into()].into_iter().collect(),
755            },
756        )
757        .with_namespace(Some("vector"))
758        .with_tags(Some(tags()))
759        .with_timestamp(Some(ts()));
760
761        let expected = set
762            .clone()
763            .with_value(MetricValue::Set {
764                values: vec!["old".into(), "new".into()].into_iter().collect(),
765            })
766            .with_timestamp(Some(ts()));
767
768        assert!(set.data.add(&delta.data));
769        assert_eq!(set, expected);
770    }
771
772    #[test]
773    fn merge_histograms() {
774        let mut dist = Metric::new(
775            "hist",
776            MetricKind::Incremental,
777            MetricValue::Distribution {
778                samples: samples![1.0 => 10],
779                statistic: StatisticKind::Histogram,
780            },
781        );
782
783        let delta = Metric::new(
784            "hist",
785            MetricKind::Incremental,
786            MetricValue::Distribution {
787                samples: samples![1.0 => 20],
788                statistic: StatisticKind::Histogram,
789            },
790        )
791        .with_namespace(Some("vector"))
792        .with_tags(Some(tags()))
793        .with_timestamp(Some(ts()));
794
795        let expected = dist
796            .clone()
797            .with_value(MetricValue::Distribution {
798                samples: samples![1.0 => 10, 1.0 => 20],
799                statistic: StatisticKind::Histogram,
800            })
801            .with_timestamp(Some(ts()));
802
803        assert!(dist.data.add(&delta.data));
804        assert_eq!(dist, expected);
805    }
806
807    #[test]
808    fn subtract_counters() {
809        // Make sure a newer/higher value counter can subtract an older/lesser value counter:
810        let old_counter = Metric::new(
811            "counter",
812            MetricKind::Absolute,
813            MetricValue::Counter { value: 4.0 },
814        );
815
816        let mut new_counter = Metric::new(
817            "counter",
818            MetricKind::Absolute,
819            MetricValue::Counter { value: 6.0 },
820        );
821
822        assert!(new_counter.subtract(&old_counter));
823        assert_eq!(new_counter.value(), &MetricValue::Counter { value: 2.0 });
824
825        // But not the other way around:
826        let old_counter = Metric::new(
827            "counter",
828            MetricKind::Absolute,
829            MetricValue::Counter { value: 6.0 },
830        );
831
832        let mut new_reset_counter = Metric::new(
833            "counter",
834            MetricKind::Absolute,
835            MetricValue::Counter { value: 1.0 },
836        );
837
838        assert!(!new_reset_counter.subtract(&old_counter));
839    }
840
841    #[test]
842    fn subtract_aggregated_histograms() {
843        // Make sure a newer/higher count aggregated histogram can subtract an older/lower count
844        // aggregated histogram:
845        let old_histogram = Metric::new(
846            "histogram",
847            MetricKind::Absolute,
848            MetricValue::AggregatedHistogram {
849                count: 1,
850                sum: 1.0,
851                buckets: buckets!(2.0 => 1),
852            },
853        );
854
855        let mut new_histogram = Metric::new(
856            "histogram",
857            MetricKind::Absolute,
858            MetricValue::AggregatedHistogram {
859                count: 3,
860                sum: 3.0,
861                buckets: buckets!(2.0 => 3),
862            },
863        );
864
865        assert!(new_histogram.subtract(&old_histogram));
866        assert_eq!(
867            new_histogram.value(),
868            &MetricValue::AggregatedHistogram {
869                count: 2,
870                sum: 2.0,
871                buckets: buckets!(2.0 => 2),
872            }
873        );
874
875        // But not the other way around:
876        let old_histogram = Metric::new(
877            "histogram",
878            MetricKind::Absolute,
879            MetricValue::AggregatedHistogram {
880                count: 3,
881                sum: 3.0,
882                buckets: buckets!(2.0 => 3),
883            },
884        );
885
886        let mut new_reset_histogram = Metric::new(
887            "histogram",
888            MetricKind::Absolute,
889            MetricValue::AggregatedHistogram {
890                count: 1,
891                sum: 1.0,
892                buckets: buckets!(2.0 => 1),
893            },
894        );
895
896        assert!(!new_reset_histogram.subtract(&old_histogram));
897    }
898
899    #[test]
900    // `too_many_lines` is mostly just useful for production code but we're not
901    // able to flag the lint on only for non-test.
902    #[allow(clippy::too_many_lines)]
903    fn display() {
904        assert_eq!(
905            format!(
906                "{}",
907                Metric::new(
908                    "one",
909                    MetricKind::Absolute,
910                    MetricValue::Counter { value: 1.23 },
911                )
912                .with_tags(Some(tags()))
913            ),
914            r#"one{empty_tag="",normal_tag="value",true_tag="true"} = 1.23"#
915        );
916
917        assert_eq!(
918            format!(
919                "{}",
920                Metric::new(
921                    "two word",
922                    MetricKind::Incremental,
923                    MetricValue::Gauge { value: 2.0 }
924                )
925                .with_timestamp(Some(ts()))
926            ),
927            r#"2018-11-14T08:09:10.000000011Z "two word"{} + 2"#
928        );
929
930        assert_eq!(
931            format!(
932                "{}",
933                Metric::new(
934                    "namespace",
935                    MetricKind::Absolute,
936                    MetricValue::Counter { value: 1.23 },
937                )
938                .with_namespace(Some("vector"))
939            ),
940            r"vector_namespace{} = 1.23"
941        );
942
943        assert_eq!(
944            format!(
945                "{}",
946                Metric::new(
947                    "namespace",
948                    MetricKind::Absolute,
949                    MetricValue::Counter { value: 1.23 },
950                )
951                .with_namespace(Some("vector host"))
952            ),
953            r#""vector host"_namespace{} = 1.23"#
954        );
955
956        let mut values = BTreeSet::<String>::new();
957        values.insert("v1".into());
958        values.insert("v2_two".into());
959        values.insert("thrəë".into());
960        values.insert("four=4".into());
961        assert_eq!(
962            format!(
963                "{}",
964                Metric::new("three", MetricKind::Absolute, MetricValue::Set { values })
965            ),
966            r#"three{} = "four=4" "thrəë" v1 v2_two"#
967        );
968
969        assert_eq!(
970            format!(
971                "{}",
972                Metric::new(
973                    "four",
974                    MetricKind::Absolute,
975                    MetricValue::Distribution {
976                        samples: samples![1.0 => 3, 2.0 => 4],
977                        statistic: StatisticKind::Histogram,
978                    }
979                )
980            ),
981            r"four{} = histogram 3@1 4@2"
982        );
983
984        assert_eq!(
985            format!(
986                "{}",
987                Metric::new(
988                    "five",
989                    MetricKind::Absolute,
990                    MetricValue::AggregatedHistogram {
991                        buckets: buckets![51.0 => 53, 52.0 => 54],
992                        count: 107,
993                        sum: 103.0,
994                    }
995                )
996            ),
997            r"five{} = count=107 sum=103 53@51 54@52"
998        );
999
1000        assert_eq!(
1001            format!(
1002                "{}",
1003                Metric::new(
1004                    "six",
1005                    MetricKind::Absolute,
1006                    MetricValue::AggregatedSummary {
1007                        quantiles: quantiles![1.0 => 63.0, 2.0 => 64.0],
1008                        count: 2,
1009                        sum: 127.0,
1010                    }
1011                )
1012            ),
1013            r"six{} = count=2 sum=127 1@63 2@64"
1014        );
1015    }
1016
1017    #[test]
1018    fn quantile_to_percentile_string() {
1019        let quantiles = [
1020            (-1.0, "0"),
1021            (0.0, "0"),
1022            (0.25, "25"),
1023            (0.50, "50"),
1024            (0.999, "999"),
1025            (0.9999, "9999"),
1026            (0.99999, "9999"),
1027            (1.0, "100"),
1028            (3.0, "100"),
1029        ];
1030
1031        for (quantile, expected) in quantiles {
1032            let quantile = Quantile {
1033                quantile,
1034                value: 1.0,
1035            };
1036            let result = quantile.to_percentile_string();
1037            assert_eq!(result, expected);
1038        }
1039    }
1040
1041    #[test]
1042    fn quantile_to_string() {
1043        let quantiles = [
1044            (-1.0, "0"),
1045            (0.0, "0"),
1046            (0.25, "0.25"),
1047            (0.50, "0.5"),
1048            (0.999, "0.999"),
1049            (0.9999, "0.9999"),
1050            (0.99999, "0.9999"),
1051            (1.0, "1"),
1052            (3.0, "1"),
1053        ];
1054
1055        for (quantile, expected) in quantiles {
1056            let quantile = Quantile {
1057                quantile,
1058                value: 1.0,
1059            };
1060            let result = quantile.to_quantile_string();
1061            assert_eq!(result, expected);
1062        }
1063    }
1064
1065    #[test]
1066    fn value_conversions() {
1067        let counter_value = MetricValue::Counter { value: 3.13 };
1068        assert_eq!(counter_value.distribution_to_agg_histogram(&[1.0]), None);
1069
1070        let counter_value = MetricValue::Counter { value: 3.13 };
1071        assert_eq!(counter_value.distribution_to_sketch(), None);
1072
1073        let distrib_value = MetricValue::Distribution {
1074            samples: samples!(1.0 => 10, 2.0 => 5, 5.0 => 2),
1075            statistic: StatisticKind::Summary,
1076        };
1077        let converted = distrib_value.distribution_to_agg_histogram(&[1.0, 5.0, 10.0]);
1078        assert_eq!(
1079            converted,
1080            Some(MetricValue::AggregatedHistogram {
1081                buckets: vec![
1082                    Bucket {
1083                        upper_limit: 1.0,
1084                        count: 10,
1085                    },
1086                    Bucket {
1087                        upper_limit: 5.0,
1088                        count: 7,
1089                    },
1090                    Bucket {
1091                        upper_limit: 10.0,
1092                        count: 0,
1093                    },
1094                ],
1095                sum: 30.0,
1096                count: 17,
1097            })
1098        );
1099
1100        let distrib_value = MetricValue::Distribution {
1101            samples: samples!(1.0 => 1),
1102            statistic: StatisticKind::Summary,
1103        };
1104        let converted = distrib_value.distribution_to_sketch();
1105        assert!(matches!(converted, Some(MetricValue::Sketch { .. })));
1106    }
1107
1108    #[test]
1109    fn merge_non_contiguous_interval() {
1110        let mut gauge = Metric::new(
1111            "gauge",
1112            MetricKind::Incremental,
1113            MetricValue::Gauge { value: 12.0 },
1114        )
1115        .with_timestamp(Some(ts()))
1116        .with_interval_ms(std::num::NonZeroU32::new(10));
1117
1118        let delta = Metric::new(
1119            "gauge",
1120            MetricKind::Incremental,
1121            MetricValue::Gauge { value: -5.0 },
1122        )
1123        .with_timestamp(Some(ts() + chrono::Duration::milliseconds(20)))
1124        .with_interval_ms(std::num::NonZeroU32::new(15));
1125
1126        let expected = gauge
1127            .clone()
1128            .with_value(MetricValue::Gauge { value: 7.0 })
1129            .with_timestamp(Some(ts()))
1130            .with_interval_ms(std::num::NonZeroU32::new(35));
1131
1132        assert!(gauge.data.add(&delta.data));
1133        assert_eq!(gauge, expected);
1134    }
1135
1136    #[test]
1137    fn merge_contiguous_interval() {
1138        let mut gauge = Metric::new(
1139            "gauge",
1140            MetricKind::Incremental,
1141            MetricValue::Gauge { value: 12.0 },
1142        )
1143        .with_timestamp(Some(ts()))
1144        .with_interval_ms(std::num::NonZeroU32::new(10));
1145
1146        let delta = Metric::new(
1147            "gauge",
1148            MetricKind::Incremental,
1149            MetricValue::Gauge { value: -5.0 },
1150        )
1151        .with_timestamp(Some(ts() + chrono::Duration::milliseconds(5)))
1152        .with_interval_ms(std::num::NonZeroU32::new(15));
1153
1154        let expected = gauge
1155            .clone()
1156            .with_value(MetricValue::Gauge { value: 7.0 })
1157            .with_timestamp(Some(ts()))
1158            .with_interval_ms(std::num::NonZeroU32::new(20));
1159
1160        assert!(gauge.data.add(&delta.data));
1161        assert_eq!(gauge, expected);
1162    }
1163}