vector_core/event/metric/
data.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
use std::num::NonZeroU32;

use chrono::{DateTime, Utc};
use vector_common::byte_size_of::ByteSizeOf;
use vector_config::configurable_component;

use super::{MetricKind, MetricValue};

/// Metric data.
#[configurable_component]
#[derive(Clone, Debug, PartialEq)]
pub struct MetricData {
    #[serde(flatten)]
    pub time: MetricTime,

    #[configurable(derived)]
    pub kind: MetricKind,

    #[serde(flatten)]
    pub value: MetricValue,
}

/// Metric time.
#[configurable_component]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct MetricTime {
    /// The timestamp of when the metric was created.
    ///
    /// Metrics may sometimes have no timestamp, or have no meaningful value if the metric is an
    /// aggregation or transformed heavily enough from its original form such that the original
    /// timestamp would not represent a meaningful value.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timestamp: Option<DateTime<Utc>>,

    /// The interval, in milliseconds, of this metric.
    ///
    /// Intervals represent the time window over which this metric applies, and is generally only
    /// used for tracking rates (change over time) on counters.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub interval_ms: Option<NonZeroU32>,
}

impl MetricData {
    /// Gets a reference to the timestamp for this data, if available.
    pub fn timestamp(&self) -> Option<&DateTime<Utc>> {
        self.time.timestamp.as_ref()
    }

    /// Gets a reference to the value of this data.
    pub fn value(&self) -> &MetricValue {
        &self.value
    }

    /// Gets a mutable reference to the value of this data.
    pub fn value_mut(&mut self) -> &mut MetricValue {
        &mut self.value
    }

    /// Consumes this metric, returning it as an absolute metric.
    ///
    /// If the metric was already absolute, nothing is changed.
    #[must_use]
    pub fn into_absolute(self) -> Self {
        Self {
            time: self.time,
            kind: MetricKind::Absolute,
            value: self.value,
        }
    }

    /// Consumes this metric, returning it as an incremental metric.
    ///
    /// If the metric was already incremental, nothing is changed.
    #[must_use]
    pub fn into_incremental(self) -> Self {
        Self {
            time: self.time,
            kind: MetricKind::Incremental,
            value: self.value,
        }
    }

    /// Creates a `MetricData` directly from the raw components of another `MetricData`.
    pub fn from_parts(time: MetricTime, kind: MetricKind, value: MetricValue) -> Self {
        Self { time, kind, value }
    }

    /// Decomposes a `MetricData` into its individual parts.
    pub fn into_parts(self) -> (MetricTime, MetricKind, MetricValue) {
        (self.time, self.kind, self.value)
    }

    /// Updates this metric by adding the value from `other`.
    #[must_use]
    pub fn update(&mut self, other: &Self) -> bool {
        let (new_ts, new_interval) = match (
            self.time.timestamp,
            self.time.interval_ms,
            other.time.timestamp,
            other.time.interval_ms,
        ) {
            (Some(t1), Some(i1), Some(t2), Some(i2)) => {
                let Ok(delta_t) =
                    TryInto::<u32>::try_into(t1.timestamp_millis().abs_diff(t2.timestamp_millis()))
                else {
                    return false;
                };

                if t1 > t2 {
                    // The interval window starts from the beginning of `other` (aka `t2`)
                    // and goes to the end of `self` (which is `t1 + i1`).
                    (Some(t2), NonZeroU32::new(delta_t + i1.get()))
                } else {
                    // The interval window starts from the beginning of `self` (aka `t1`)
                    // and goes to the end of `other` (which is `t2 + i2`).

                    (Some(t1), NonZeroU32::new(delta_t + i2.get()))
                }
            }
            (Some(t), _, None, _) | (None, _, Some(t), _) => (Some(t), None),
            (Some(t1), _, Some(t2), _) => (Some(t1.max(t2)), None),
            (_, _, _, _) => (None, None),
        };

        self.value.add(&other.value) && {
            self.time.timestamp = new_ts;
            self.time.interval_ms = new_interval;
            true
        }
    }

    /// Adds the data from the `other` metric to this one.
    ///
    /// The other metric must be incremental and contain the same value type as this one.
    #[must_use]
    pub fn add(&mut self, other: &Self) -> bool {
        other.kind == MetricKind::Incremental && self.update(other)
    }

    /// Subtracts the data from the `other` metric from this one.
    ///
    /// The other metric must contain the same value type as this one.
    #[must_use]
    pub fn subtract(&mut self, other: &Self) -> bool {
        self.value.subtract(&other.value)
    }

    /// Zeroes out the data in this metric.
    pub fn zero(&mut self) {
        self.value.zero();
    }
}

impl AsRef<MetricData> for MetricData {
    fn as_ref(&self) -> &Self {
        self
    }
}

impl PartialOrd for MetricData {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        self.time.timestamp.partial_cmp(&other.time.timestamp)
    }
}

impl ByteSizeOf for MetricData {
    fn allocated_bytes(&self) -> usize {
        self.value.allocated_bytes()
    }
}