vector_core/event/metric/
data.rs1use std::num::NonZeroU32;
2
3use chrono::{DateTime, Utc};
4use vector_common::byte_size_of::ByteSizeOf;
5use vector_config::configurable_component;
6
7use super::{MetricKind, MetricValue};
8
9#[configurable_component]
11#[derive(Clone, Debug, PartialEq)]
12pub struct MetricData {
13 #[serde(flatten)]
14 pub time: MetricTime,
15
16 #[configurable(derived)]
17 pub kind: MetricKind,
18
19 #[serde(flatten)]
20 pub value: MetricValue,
21}
22
23#[configurable_component]
25#[derive(Clone, Copy, Debug, PartialEq, Eq)]
26pub struct MetricTime {
27 #[serde(skip_serializing_if = "Option::is_none")]
33 pub timestamp: Option<DateTime<Utc>>,
34
35 #[serde(skip_serializing_if = "Option::is_none")]
40 pub interval_ms: Option<NonZeroU32>,
41}
42
43impl MetricData {
44 pub fn timestamp(&self) -> Option<&DateTime<Utc>> {
46 self.time.timestamp.as_ref()
47 }
48
49 pub fn value(&self) -> &MetricValue {
51 &self.value
52 }
53
54 pub fn value_mut(&mut self) -> &mut MetricValue {
56 &mut self.value
57 }
58
59 #[must_use]
63 pub fn into_absolute(self) -> Self {
64 Self {
65 time: MetricTime {
66 timestamp: self.time.timestamp,
67 interval_ms: None,
68 },
69 kind: MetricKind::Absolute,
70 value: self.value,
71 }
72 }
73
74 #[must_use]
78 pub fn into_incremental(self) -> Self {
79 Self {
80 time: self.time,
81 kind: MetricKind::Incremental,
82 value: self.value,
83 }
84 }
85
86 pub fn from_parts(time: MetricTime, kind: MetricKind, value: MetricValue) -> Self {
88 Self { time, kind, value }
89 }
90
91 pub fn into_parts(self) -> (MetricTime, MetricKind, MetricValue) {
93 (self.time, self.kind, self.value)
94 }
95
96 #[must_use]
98 pub fn update(&mut self, other: &Self) -> bool {
99 let (new_ts, new_interval) = match (
100 self.time.timestamp,
101 self.time.interval_ms,
102 other.time.timestamp,
103 other.time.interval_ms,
104 ) {
105 (Some(t1), Some(i1), Some(t2), Some(i2)) => {
106 let Ok(delta_t) =
107 TryInto::<u32>::try_into(t1.timestamp_millis().abs_diff(t2.timestamp_millis()))
108 else {
109 return false;
110 };
111
112 if t1 > t2 {
113 (Some(t2), NonZeroU32::new(delta_t + i1.get()))
116 } else {
117 (Some(t1), NonZeroU32::new(delta_t + i2.get()))
121 }
122 }
123 (Some(t), _, None, _) | (None, _, Some(t), _) => (Some(t), None),
124 (Some(t1), _, Some(t2), _) => (Some(t1.max(t2)), None),
125 (_, _, _, _) => (None, None),
126 };
127
128 self.value.add(&other.value) && {
129 self.time.timestamp = new_ts;
130 self.time.interval_ms = new_interval;
131 true
132 }
133 }
134
135 #[must_use]
139 pub fn add(&mut self, other: &Self) -> bool {
140 other.kind == MetricKind::Incremental && self.update(other)
141 }
142
143 #[must_use]
147 pub fn subtract(&mut self, other: &Self) -> bool {
148 self.value.subtract(&other.value)
149 }
150
151 pub fn zero(&mut self) {
153 self.value.zero();
154 }
155}
156
157impl AsRef<MetricData> for MetricData {
158 fn as_ref(&self) -> &Self {
159 self
160 }
161}
162
163impl PartialOrd for MetricData {
164 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
165 self.time.timestamp.partial_cmp(&other.time.timestamp)
166 }
167}
168
169impl ByteSizeOf for MetricData {
170 fn allocated_bytes(&self) -> usize {
171 self.value.allocated_bytes()
172 }
173}