vector_core/event/metric/data.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
use std::num::NonZeroU32;
use chrono::{DateTime, Utc};
use vector_common::byte_size_of::ByteSizeOf;
use vector_config::configurable_component;
use super::{MetricKind, MetricValue};
/// Metric data.
#[configurable_component]
#[derive(Clone, Debug, PartialEq)]
pub struct MetricData {
#[serde(flatten)]
pub time: MetricTime,
#[configurable(derived)]
pub kind: MetricKind,
#[serde(flatten)]
pub value: MetricValue,
}
/// Metric time.
#[configurable_component]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct MetricTime {
/// The timestamp of when the metric was created.
///
/// Metrics may sometimes have no timestamp, or have no meaningful value if the metric is an
/// aggregation or transformed heavily enough from its original form such that the original
/// timestamp would not represent a meaningful value.
#[serde(skip_serializing_if = "Option::is_none")]
pub timestamp: Option<DateTime<Utc>>,
/// The interval, in milliseconds, of this metric.
///
/// Intervals represent the time window over which this metric applies, and is generally only
/// used for tracking rates (change over time) on counters.
#[serde(skip_serializing_if = "Option::is_none")]
pub interval_ms: Option<NonZeroU32>,
}
impl MetricData {
/// Gets a reference to the timestamp for this data, if available.
pub fn timestamp(&self) -> Option<&DateTime<Utc>> {
self.time.timestamp.as_ref()
}
/// Gets a reference to the value of this data.
pub fn value(&self) -> &MetricValue {
&self.value
}
/// Gets a mutable reference to the value of this data.
pub fn value_mut(&mut self) -> &mut MetricValue {
&mut self.value
}
/// Consumes this metric, returning it as an absolute metric.
///
/// If the metric was already absolute, nothing is changed.
#[must_use]
pub fn into_absolute(self) -> Self {
Self {
time: self.time,
kind: MetricKind::Absolute,
value: self.value,
}
}
/// Consumes this metric, returning it as an incremental metric.
///
/// If the metric was already incremental, nothing is changed.
#[must_use]
pub fn into_incremental(self) -> Self {
Self {
time: self.time,
kind: MetricKind::Incremental,
value: self.value,
}
}
/// Creates a `MetricData` directly from the raw components of another `MetricData`.
pub fn from_parts(time: MetricTime, kind: MetricKind, value: MetricValue) -> Self {
Self { time, kind, value }
}
/// Decomposes a `MetricData` into its individual parts.
pub fn into_parts(self) -> (MetricTime, MetricKind, MetricValue) {
(self.time, self.kind, self.value)
}
/// Updates this metric by adding the value from `other`.
#[must_use]
pub fn update(&mut self, other: &Self) -> bool {
let (new_ts, new_interval) = match (
self.time.timestamp,
self.time.interval_ms,
other.time.timestamp,
other.time.interval_ms,
) {
(Some(t1), Some(i1), Some(t2), Some(i2)) => {
let Ok(delta_t) =
TryInto::<u32>::try_into(t1.timestamp_millis().abs_diff(t2.timestamp_millis()))
else {
return false;
};
if t1 > t2 {
// The interval window starts from the beginning of `other` (aka `t2`)
// and goes to the end of `self` (which is `t1 + i1`).
(Some(t2), NonZeroU32::new(delta_t + i1.get()))
} else {
// The interval window starts from the beginning of `self` (aka `t1`)
// and goes to the end of `other` (which is `t2 + i2`).
(Some(t1), NonZeroU32::new(delta_t + i2.get()))
}
}
(Some(t), _, None, _) | (None, _, Some(t), _) => (Some(t), None),
(Some(t1), _, Some(t2), _) => (Some(t1.max(t2)), None),
(_, _, _, _) => (None, None),
};
self.value.add(&other.value) && {
self.time.timestamp = new_ts;
self.time.interval_ms = new_interval;
true
}
}
/// Adds the data from the `other` metric to this one.
///
/// The other metric must be incremental and contain the same value type as this one.
#[must_use]
pub fn add(&mut self, other: &Self) -> bool {
other.kind == MetricKind::Incremental && self.update(other)
}
/// Subtracts the data from the `other` metric from this one.
///
/// The other metric must contain the same value type as this one.
#[must_use]
pub fn subtract(&mut self, other: &Self) -> bool {
self.value.subtract(&other.value)
}
/// Zeroes out the data in this metric.
pub fn zero(&mut self) {
self.value.zero();
}
}
impl AsRef<MetricData> for MetricData {
fn as_ref(&self) -> &Self {
self
}
}
impl PartialOrd for MetricData {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.time.timestamp.partial_cmp(&other.time.timestamp)
}
}
impl ByteSizeOf for MetricData {
fn allocated_bytes(&self) -> usize {
self.value.allocated_bytes()
}
}