vector_core/event/metric/
data.rs1use std::num::NonZeroU32;
2
3use chrono::{DateTime, Utc};
4use vector_common::byte_size_of::ByteSizeOf;
5use vector_config::configurable_component;
6
7use super::{MetricKind, MetricValue};
8
#[configurable_component]
/// The data of a metric: its time information, its kind, and its value.
#[derive(Clone, Debug, PartialEq)]
pub struct MetricData {
    // Time information (timestamp and interval); flattened into the parent
    // when (de)serialized.
    #[serde(flatten)]
    pub time: MetricTime,

    // The kind of the metric; the code in this file uses the `Absolute` and
    // `Incremental` variants.
    #[configurable(derived)]
    pub kind: MetricKind,

    // The metric's value; flattened into the parent when (de)serialized.
    #[serde(flatten)]
    pub value: MetricValue,
}
22
#[configurable_component]
/// The time information of a metric: an optional timestamp and an optional interval.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct MetricTime {
    /// The timestamp (UTC) associated with the metric, if any.
    ///
    /// The field is omitted from serialized output when it is not set.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timestamp: Option<DateTime<Utc>>,

    /// The interval, in milliseconds, associated with the metric, if any.
    ///
    /// A zero interval is not representable. The field is omitted from serialized output when
    /// it is not set.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub interval_ms: Option<NonZeroU32>,
}
42
43impl MetricData {
44 pub fn timestamp(&self) -> Option<&DateTime<Utc>> {
46 self.time.timestamp.as_ref()
47 }
48
49 pub fn value(&self) -> &MetricValue {
51 &self.value
52 }
53
54 pub fn value_mut(&mut self) -> &mut MetricValue {
56 &mut self.value
57 }
58
59 #[must_use]
63 pub fn into_absolute(self) -> Self {
64 Self {
65 time: self.time,
66 kind: MetricKind::Absolute,
67 value: self.value,
68 }
69 }
70
71 #[must_use]
75 pub fn into_incremental(self) -> Self {
76 Self {
77 time: self.time,
78 kind: MetricKind::Incremental,
79 value: self.value,
80 }
81 }
82
83 pub fn from_parts(time: MetricTime, kind: MetricKind, value: MetricValue) -> Self {
85 Self { time, kind, value }
86 }
87
88 pub fn into_parts(self) -> (MetricTime, MetricKind, MetricValue) {
90 (self.time, self.kind, self.value)
91 }
92
93 #[must_use]
95 pub fn update(&mut self, other: &Self) -> bool {
96 let (new_ts, new_interval) = match (
97 self.time.timestamp,
98 self.time.interval_ms,
99 other.time.timestamp,
100 other.time.interval_ms,
101 ) {
102 (Some(t1), Some(i1), Some(t2), Some(i2)) => {
103 let Ok(delta_t) =
104 TryInto::<u32>::try_into(t1.timestamp_millis().abs_diff(t2.timestamp_millis()))
105 else {
106 return false;
107 };
108
109 if t1 > t2 {
110 (Some(t2), NonZeroU32::new(delta_t + i1.get()))
113 } else {
114 (Some(t1), NonZeroU32::new(delta_t + i2.get()))
118 }
119 }
120 (Some(t), _, None, _) | (None, _, Some(t), _) => (Some(t), None),
121 (Some(t1), _, Some(t2), _) => (Some(t1.max(t2)), None),
122 (_, _, _, _) => (None, None),
123 };
124
125 self.value.add(&other.value) && {
126 self.time.timestamp = new_ts;
127 self.time.interval_ms = new_interval;
128 true
129 }
130 }
131
132 #[must_use]
136 pub fn add(&mut self, other: &Self) -> bool {
137 other.kind == MetricKind::Incremental && self.update(other)
138 }
139
140 #[must_use]
144 pub fn subtract(&mut self, other: &Self) -> bool {
145 self.value.subtract(&other.value)
146 }
147
148 pub fn zero(&mut self) {
150 self.value.zero();
151 }
152}
153
154impl AsRef<MetricData> for MetricData {
155 fn as_ref(&self) -> &Self {
156 self
157 }
158}
159
160impl PartialOrd for MetricData {
161 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
162 self.time.timestamp.partial_cmp(&other.time.timestamp)
163 }
164}
165
166impl ByteSizeOf for MetricData {
167 fn allocated_bytes(&self) -> usize {
168 self.value.allocated_bytes()
169 }
170}