vector/sources/eventstoredb_metrics/
types.rs

1use std::sync::LazyLock;
2
3use regex::Regex;
4use serde::{
5    de::{self, Error, MapAccess, Unexpected, Visitor},
6    Deserialize, Deserializer,
7};
8
9use crate::event::{Metric, MetricKind, MetricTags, MetricValue};
10
/// Root of the deserialized stats payload, split into a process-level
/// section (`proc`) and a system-level section (`sys`).
///
/// Field names are matched against camelCase keys in the input.
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Stats {
    /// Process-level statistics (memory, disk I/O, process id, ...).
    pub proc: Proc,
    /// System-level statistics (free memory, load average, optional drive).
    pub sys: Sys,
}
17
18impl Stats {
19    pub fn metrics(&self, namespace: Option<String>) -> Vec<Metric> {
20        let mut result = Vec::new();
21        let mut tags = MetricTags::default();
22        let now = chrono::Utc::now();
23        let namespace = namespace.unwrap_or_else(|| "eventstoredb".to_string());
24
25        tags.replace("id".to_string(), self.proc.id.to_string());
26
27        result.push(
28            Metric::new(
29                "process_memory_used_bytes",
30                MetricKind::Absolute,
31                MetricValue::Gauge {
32                    value: self.proc.mem as f64,
33                },
34            )
35            .with_namespace(Some(namespace.clone()))
36            .with_tags(Some(tags.clone()))
37            .with_timestamp(Some(now)),
38        );
39
40        result.push(
41            Metric::new(
42                "disk_read_bytes_total",
43                MetricKind::Absolute,
44                MetricValue::Counter {
45                    value: self.proc.disk_io.read_bytes as f64,
46                },
47            )
48            .with_namespace(Some(namespace.clone()))
49            .with_tags(Some(tags.clone()))
50            .with_timestamp(Some(now)),
51        );
52
53        result.push(
54            Metric::new(
55                "disk_written_bytes_total",
56                MetricKind::Absolute,
57                MetricValue::Counter {
58                    value: self.proc.disk_io.written_bytes as f64,
59                },
60            )
61            .with_namespace(Some(namespace.clone()))
62            .with_tags(Some(tags.clone()))
63            .with_timestamp(Some(now)),
64        );
65
66        result.push(
67            Metric::new(
68                "disk_read_ops_total",
69                MetricKind::Absolute,
70                MetricValue::Counter {
71                    value: self.proc.disk_io.read_ops as f64,
72                },
73            )
74            .with_namespace(Some(namespace.clone()))
75            .with_tags(Some(tags.clone()))
76            .with_timestamp(Some(now)),
77        );
78
79        result.push(
80            Metric::new(
81                "disk_write_ops_total",
82                MetricKind::Absolute,
83                MetricValue::Counter {
84                    value: self.proc.disk_io.write_ops as f64,
85                },
86            )
87            .with_namespace(Some(namespace.clone()))
88            .with_tags(Some(tags.clone()))
89            .with_timestamp(Some(now)),
90        );
91
92        result.push(
93            Metric::new(
94                "memory_free_bytes",
95                MetricKind::Absolute,
96                MetricValue::Gauge {
97                    value: self.sys.free_mem as f64,
98                },
99            )
100            .with_namespace(Some(namespace.clone()))
101            .with_tags(Some(tags.clone()))
102            .with_timestamp(Some(now)),
103        );
104
105        if let Some(drive) = self.sys.drive.as_ref() {
106            tags.replace("path".to_string(), drive.path.clone());
107
108            result.push(
109                Metric::new(
110                    "disk_total_bytes",
111                    MetricKind::Absolute,
112                    MetricValue::Gauge {
113                        value: drive.stats.total_bytes as f64,
114                    },
115                )
116                .with_namespace(Some(namespace.clone()))
117                .with_tags(Some(tags.clone()))
118                .with_timestamp(Some(now)),
119            );
120
121            result.push(
122                Metric::new(
123                    "disk_free_bytes",
124                    MetricKind::Absolute,
125                    MetricValue::Gauge {
126                        value: drive.stats.available_bytes as f64,
127                    },
128                )
129                .with_namespace(Some(namespace.clone()))
130                .with_tags(Some(tags.clone()))
131                .with_timestamp(Some(now)),
132            );
133
134            result.push(
135                Metric::new(
136                    "disk_used_bytes",
137                    MetricKind::Absolute,
138                    MetricValue::Gauge {
139                        value: drive.stats.used_bytes as f64,
140                    },
141                )
142                .with_namespace(Some(namespace))
143                .with_tags(Some(tags))
144                .with_timestamp(Some(now)),
145            );
146        }
147
148        result
149    }
150}
151
/// Process-level section of the stats payload (camelCase keys in the input).
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Proc {
    /// Process id; attached to every emitted metric as the `id` tag.
    pub id: usize,
    /// Emitted as the `process_memory_used_bytes` gauge.
    pub mem: usize,
    /// Deserialized but not turned into a metric by `Stats::metrics`.
    /// NOTE(review): presumably process CPU usage; unit not visible here.
    pub cpu: f64,
    /// Deserialized but not turned into a metric by `Stats::metrics`.
    pub threads_count: i64,
    /// Deserialized but not turned into a metric by `Stats::metrics`.
    pub thrown_exceptions_rate: f64,
    /// Disk I/O figures, emitted as the `disk_*_total` counters.
    pub disk_io: DiskIo,
}
162
/// Disk I/O figures of the process (camelCase keys in the input).
///
/// Each field is emitted by `Stats::metrics` as an absolute counter.
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct DiskIo {
    /// Emitted as `disk_read_bytes_total`.
    pub read_bytes: usize,
    /// Emitted as `disk_written_bytes_total`.
    pub written_bytes: usize,
    /// Emitted as `disk_read_ops_total`.
    pub read_ops: usize,
    /// Emitted as `disk_write_ops_total`.
    pub write_ops: usize,
}
171
/// System-level section of the stats payload (camelCase keys in the input).
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Sys {
    /// Emitted as the `memory_free_bytes` gauge.
    pub free_mem: usize,
    /// Deserialized but not turned into a metric by `Stats::metrics`.
    pub loadavg: LoadAvg,
    /// Optional drive information; when present, the `disk_total_bytes`,
    /// `disk_free_bytes` and `disk_used_bytes` gauges are emitted for it.
    pub drive: Option<Drive>,
}
179
/// Load-average triple read from the keys `1m`, `5m` and `15m`.
///
/// The explicit per-field renames are needed because those keys are not valid
/// Rust identifiers.
/// NOTE(review): presumably the 1/5/15-minute system load averages; confirm
/// against the upstream stats format.
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct LoadAvg {
    /// Value of the `1m` key.
    #[serde(rename = "1m")]
    pub one_m: f64,
    /// Value of the `5m` key.
    #[serde(rename = "5m")]
    pub five_m: f64,
    /// Value of the `15m` key.
    #[serde(rename = "15m")]
    pub fifteen_m: f64,
}
190
/// A single drive entry: the drive path together with its statistics.
///
/// Deserialization is hand-written (via `DriveVisitor`) because the input is
/// a map whose key is the path and whose value is the stats object.
#[derive(Debug)]
pub struct Drive {
    /// Key of the map entry; used as the `path` tag on the drive metrics.
    pub path: String,
    /// Value of the map entry.
    pub stats: DriveStats,
}
196
197impl<'de> Deserialize<'de> for Drive {
198    fn deserialize<D>(deserializer: D) -> Result<Self, <D as Deserializer<'de>>::Error>
199    where
200        D: Deserializer<'de>,
201    {
202        deserializer.deserialize_map(DriveVisitor)
203    }
204}
205
/// Statistics of one drive (camelCase keys in the input).
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct DriveStats {
    /// Emitted as the `disk_free_bytes` gauge.
    pub available_bytes: usize,
    /// Emitted as the `disk_total_bytes` gauge.
    pub total_bytes: usize,
    // EventStoreDB v24.2 reports this value as a string holding the percentage
    // (e.g. "30%"), while v24.6 reports it as a plain integer (e.g. 30).
    // `percent_or_integer` accepts both encodings.
    /// Usage percentage; deserialized but not turned into a metric by
    /// `Stats::metrics`.
    #[serde(deserialize_with = "percent_or_integer")]
    pub usage: usize,
    /// Emitted as the `disk_used_bytes` gauge.
    pub used_bytes: usize,
}
217
218struct DriveVisitor;
219
220impl<'de> Visitor<'de> for DriveVisitor {
221    type Value = Drive;
222
223    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224        write!(formatter, "DriveStats object")
225    }
226
227    fn visit_map<A>(self, mut map: A) -> Result<Self::Value, <A as MapAccess<'de>>::Error>
228    where
229        A: MapAccess<'de>,
230    {
231        if let Some(key) = map.next_key()? {
232            return Ok(Drive {
233                path: key,
234                stats: map.next_value()?,
235            });
236        }
237
238        Err(serde::de::Error::missing_field("<Drive path>"))
239    }
240}
241
242// Can be either an integer or a string like 30%
243fn percent_or_integer<'de, D>(deserializer: D) -> Result<usize, D::Error>
244where
245    D: Deserializer<'de>,
246{
247    struct PercentOrInteger;
248    static PERCENT_REGEX: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"(\d+)%").unwrap());
249
250    impl Visitor<'_> for PercentOrInteger {
251        type Value = usize;
252
253        fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
254            formatter.write_str("string or map")
255        }
256
257        fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
258        where
259            E: de::Error,
260        {
261            if let Some(caps) = PERCENT_REGEX.captures(value) {
262                caps[1].parse::<usize>().map_err(|err| {
263                    Error::custom(format!("could not parse percent value into usize: {err}"))
264                })
265            } else {
266                Err(de::Error::invalid_value(
267                    Unexpected::Str(value),
268                    &"string did not contain a percent value like 30%",
269                ))
270            }
271        }
272
273        fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
274        where
275            E: de::Error,
276        {
277            usize::try_from(v).map_err(Error::custom)
278        }
279
280        fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
281        where
282            E: de::Error,
283        {
284            usize::try_from(v).map_err(Error::custom)
285        }
286    }
287
288    deserializer.deserialize_any(PercentOrInteger)
289}