vector_core/event/metric/
data.rs

1use std::num::NonZeroU32;
2
3use chrono::{DateTime, Utc};
4use vector_common::byte_size_of::ByteSizeOf;
5use vector_config::configurable_component;
6
7use super::{MetricKind, MetricValue};
8
9/// Metric data.
10#[configurable_component]
11#[derive(Clone, Debug, PartialEq)]
12pub struct MetricData {
13    #[serde(flatten)]
14    pub time: MetricTime,
15
16    #[configurable(derived)]
17    pub kind: MetricKind,
18
19    #[serde(flatten)]
20    pub value: MetricValue,
21}
22
23/// Metric time.
24#[configurable_component]
25#[derive(Clone, Copy, Debug, PartialEq, Eq)]
26pub struct MetricTime {
27    /// The timestamp of when the metric was created.
28    ///
29    /// Metrics may sometimes have no timestamp, or have no meaningful value if the metric is an
30    /// aggregation or transformed heavily enough from its original form such that the original
31    /// timestamp would not represent a meaningful value.
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub timestamp: Option<DateTime<Utc>>,
34
35    /// The interval, in milliseconds, of this metric.
36    ///
37    /// Intervals represent the time window over which this metric applies, and is generally only
38    /// used for tracking rates (change over time) on counters.
39    #[serde(skip_serializing_if = "Option::is_none")]
40    pub interval_ms: Option<NonZeroU32>,
41}
42
43impl MetricData {
44    /// Gets a reference to the timestamp for this data, if available.
45    pub fn timestamp(&self) -> Option<&DateTime<Utc>> {
46        self.time.timestamp.as_ref()
47    }
48
49    /// Gets a reference to the value of this data.
50    pub fn value(&self) -> &MetricValue {
51        &self.value
52    }
53
54    /// Gets a mutable reference to the value of this data.
55    pub fn value_mut(&mut self) -> &mut MetricValue {
56        &mut self.value
57    }
58
59    /// Consumes this metric, returning it as an absolute metric.
60    ///
61    /// The `interval_ms` is set to `None`. If the metric was already absolute, nothing else is changed.
62    #[must_use]
63    pub fn into_absolute(self) -> Self {
64        Self {
65            time: MetricTime {
66                timestamp: self.time.timestamp,
67                interval_ms: None,
68            },
69            kind: MetricKind::Absolute,
70            value: self.value,
71        }
72    }
73
74    /// Consumes this metric, returning it as an incremental metric.
75    ///
76    /// If the metric was already incremental, nothing is changed.
77    #[must_use]
78    pub fn into_incremental(self) -> Self {
79        Self {
80            time: self.time,
81            kind: MetricKind::Incremental,
82            value: self.value,
83        }
84    }
85
86    /// Creates a `MetricData` directly from the raw components of another `MetricData`.
87    pub fn from_parts(time: MetricTime, kind: MetricKind, value: MetricValue) -> Self {
88        Self { time, kind, value }
89    }
90
91    /// Decomposes a `MetricData` into its individual parts.
92    pub fn into_parts(self) -> (MetricTime, MetricKind, MetricValue) {
93        (self.time, self.kind, self.value)
94    }
95
96    /// Updates this metric by adding the value from `other`.
97    #[must_use]
98    pub fn update(&mut self, other: &Self) -> bool {
99        let (new_ts, new_interval) = match (
100            self.time.timestamp,
101            self.time.interval_ms,
102            other.time.timestamp,
103            other.time.interval_ms,
104        ) {
105            (Some(t1), Some(i1), Some(t2), Some(i2)) => {
106                let Ok(delta_t) =
107                    TryInto::<u32>::try_into(t1.timestamp_millis().abs_diff(t2.timestamp_millis()))
108                else {
109                    return false;
110                };
111
112                if t1 > t2 {
113                    // The interval window starts from the beginning of `other` (aka `t2`)
114                    // and goes to the end of `self` (which is `t1 + i1`).
115                    (Some(t2), NonZeroU32::new(delta_t + i1.get()))
116                } else {
117                    // The interval window starts from the beginning of `self` (aka `t1`)
118                    // and goes to the end of `other` (which is `t2 + i2`).
119
120                    (Some(t1), NonZeroU32::new(delta_t + i2.get()))
121                }
122            }
123            (Some(t), _, None, _) | (None, _, Some(t), _) => (Some(t), None),
124            (Some(t1), _, Some(t2), _) => (Some(t1.max(t2)), None),
125            (_, _, _, _) => (None, None),
126        };
127
128        self.value.add(&other.value) && {
129            self.time.timestamp = new_ts;
130            self.time.interval_ms = new_interval;
131            true
132        }
133    }
134
135    /// Adds the data from the `other` metric to this one.
136    ///
137    /// The other metric must be incremental and contain the same value type as this one.
138    #[must_use]
139    pub fn add(&mut self, other: &Self) -> bool {
140        other.kind == MetricKind::Incremental && self.update(other)
141    }
142
143    /// Subtracts the data from the `other` metric from this one.
144    ///
145    /// The other metric must contain the same value type as this one.
146    #[must_use]
147    pub fn subtract(&mut self, other: &Self) -> bool {
148        self.value.subtract(&other.value)
149    }
150
151    /// Zeroes out the data in this metric.
152    pub fn zero(&mut self) {
153        self.value.zero();
154    }
155}
156
157impl AsRef<MetricData> for MetricData {
158    fn as_ref(&self) -> &Self {
159        self
160    }
161}
162
163impl PartialOrd for MetricData {
164    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
165        self.time.timestamp.partial_cmp(&other.time.timestamp)
166    }
167}
168
169impl ByteSizeOf for MetricData {
170    fn allocated_bytes(&self) -> usize {
171        self.value.allocated_bytes()
172    }
173}