vector_core/event/metric/
data.rs

1use std::num::NonZeroU32;
2
3use chrono::{DateTime, Utc};
4use vector_common::byte_size_of::ByteSizeOf;
5use vector_config::configurable_component;
6
7use super::{MetricKind, MetricValue};
8
9/// Metric data.
10#[configurable_component]
11#[derive(Clone, Debug, PartialEq)]
12pub struct MetricData {
13    #[serde(flatten)]
14    pub time: MetricTime,
15
16    #[configurable(derived)]
17    pub kind: MetricKind,
18
19    #[serde(flatten)]
20    pub value: MetricValue,
21}
22
23/// Metric time.
24#[configurable_component]
25#[derive(Clone, Copy, Debug, PartialEq, Eq)]
26pub struct MetricTime {
27    /// The timestamp of when the metric was created.
28    ///
29    /// Metrics may sometimes have no timestamp, or have no meaningful value if the metric is an
30    /// aggregation or transformed heavily enough from its original form such that the original
31    /// timestamp would not represent a meaningful value.
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub timestamp: Option<DateTime<Utc>>,
34
35    /// The interval, in milliseconds, of this metric.
36    ///
37    /// Intervals represent the time window over which this metric applies, and is generally only
38    /// used for tracking rates (change over time) on counters.
39    #[serde(skip_serializing_if = "Option::is_none")]
40    pub interval_ms: Option<NonZeroU32>,
41}
42
43impl MetricData {
44    /// Gets a reference to the timestamp for this data, if available.
45    pub fn timestamp(&self) -> Option<&DateTime<Utc>> {
46        self.time.timestamp.as_ref()
47    }
48
49    /// Gets a reference to the value of this data.
50    pub fn value(&self) -> &MetricValue {
51        &self.value
52    }
53
54    /// Gets a mutable reference to the value of this data.
55    pub fn value_mut(&mut self) -> &mut MetricValue {
56        &mut self.value
57    }
58
59    /// Consumes this metric, returning it as an absolute metric.
60    ///
61    /// If the metric was already absolute, nothing is changed.
62    #[must_use]
63    pub fn into_absolute(self) -> Self {
64        Self {
65            time: self.time,
66            kind: MetricKind::Absolute,
67            value: self.value,
68        }
69    }
70
71    /// Consumes this metric, returning it as an incremental metric.
72    ///
73    /// If the metric was already incremental, nothing is changed.
74    #[must_use]
75    pub fn into_incremental(self) -> Self {
76        Self {
77            time: self.time,
78            kind: MetricKind::Incremental,
79            value: self.value,
80        }
81    }
82
83    /// Creates a `MetricData` directly from the raw components of another `MetricData`.
84    pub fn from_parts(time: MetricTime, kind: MetricKind, value: MetricValue) -> Self {
85        Self { time, kind, value }
86    }
87
88    /// Decomposes a `MetricData` into its individual parts.
89    pub fn into_parts(self) -> (MetricTime, MetricKind, MetricValue) {
90        (self.time, self.kind, self.value)
91    }
92
93    /// Updates this metric by adding the value from `other`.
94    #[must_use]
95    pub fn update(&mut self, other: &Self) -> bool {
96        let (new_ts, new_interval) = match (
97            self.time.timestamp,
98            self.time.interval_ms,
99            other.time.timestamp,
100            other.time.interval_ms,
101        ) {
102            (Some(t1), Some(i1), Some(t2), Some(i2)) => {
103                let Ok(delta_t) =
104                    TryInto::<u32>::try_into(t1.timestamp_millis().abs_diff(t2.timestamp_millis()))
105                else {
106                    return false;
107                };
108
109                if t1 > t2 {
110                    // The interval window starts from the beginning of `other` (aka `t2`)
111                    // and goes to the end of `self` (which is `t1 + i1`).
112                    (Some(t2), NonZeroU32::new(delta_t + i1.get()))
113                } else {
114                    // The interval window starts from the beginning of `self` (aka `t1`)
115                    // and goes to the end of `other` (which is `t2 + i2`).
116
117                    (Some(t1), NonZeroU32::new(delta_t + i2.get()))
118                }
119            }
120            (Some(t), _, None, _) | (None, _, Some(t), _) => (Some(t), None),
121            (Some(t1), _, Some(t2), _) => (Some(t1.max(t2)), None),
122            (_, _, _, _) => (None, None),
123        };
124
125        self.value.add(&other.value) && {
126            self.time.timestamp = new_ts;
127            self.time.interval_ms = new_interval;
128            true
129        }
130    }
131
132    /// Adds the data from the `other` metric to this one.
133    ///
134    /// The other metric must be incremental and contain the same value type as this one.
135    #[must_use]
136    pub fn add(&mut self, other: &Self) -> bool {
137        other.kind == MetricKind::Incremental && self.update(other)
138    }
139
140    /// Subtracts the data from the `other` metric from this one.
141    ///
142    /// The other metric must contain the same value type as this one.
143    #[must_use]
144    pub fn subtract(&mut self, other: &Self) -> bool {
145        self.value.subtract(&other.value)
146    }
147
148    /// Zeroes out the data in this metric.
149    pub fn zero(&mut self) {
150        self.value.zero();
151    }
152}
153
154impl AsRef<MetricData> for MetricData {
155    fn as_ref(&self) -> &Self {
156        self
157    }
158}
159
160impl PartialOrd for MetricData {
161    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
162        self.time.timestamp.partial_cmp(&other.time.timestamp)
163    }
164}
165
166impl ByteSizeOf for MetricData {
167    fn allocated_bytes(&self) -> usize {
168        self.value.allocated_bytes()
169    }
170}