vector/sources/eventstoredb_metrics/
types.rs1use std::sync::LazyLock;
2
3use regex::Regex;
4use serde::{
5 de::{self, Error, MapAccess, Unexpected, Visitor},
6 Deserialize, Deserializer,
7};
8
9use crate::event::{Metric, MetricKind, MetricTags, MetricValue};
10
11#[derive(Deserialize, Debug)]
12#[serde(rename_all = "camelCase")]
13pub struct Stats {
14 pub proc: Proc,
15 pub sys: Sys,
16}
17
18impl Stats {
19 pub fn metrics(&self, namespace: Option<String>) -> Vec<Metric> {
20 let mut result = Vec::new();
21 let mut tags = MetricTags::default();
22 let now = chrono::Utc::now();
23 let namespace = namespace.unwrap_or_else(|| "eventstoredb".to_string());
24
25 tags.replace("id".to_string(), self.proc.id.to_string());
26
27 result.push(
28 Metric::new(
29 "process_memory_used_bytes",
30 MetricKind::Absolute,
31 MetricValue::Gauge {
32 value: self.proc.mem as f64,
33 },
34 )
35 .with_namespace(Some(namespace.clone()))
36 .with_tags(Some(tags.clone()))
37 .with_timestamp(Some(now)),
38 );
39
40 result.push(
41 Metric::new(
42 "disk_read_bytes_total",
43 MetricKind::Absolute,
44 MetricValue::Counter {
45 value: self.proc.disk_io.read_bytes as f64,
46 },
47 )
48 .with_namespace(Some(namespace.clone()))
49 .with_tags(Some(tags.clone()))
50 .with_timestamp(Some(now)),
51 );
52
53 result.push(
54 Metric::new(
55 "disk_written_bytes_total",
56 MetricKind::Absolute,
57 MetricValue::Counter {
58 value: self.proc.disk_io.written_bytes as f64,
59 },
60 )
61 .with_namespace(Some(namespace.clone()))
62 .with_tags(Some(tags.clone()))
63 .with_timestamp(Some(now)),
64 );
65
66 result.push(
67 Metric::new(
68 "disk_read_ops_total",
69 MetricKind::Absolute,
70 MetricValue::Counter {
71 value: self.proc.disk_io.read_ops as f64,
72 },
73 )
74 .with_namespace(Some(namespace.clone()))
75 .with_tags(Some(tags.clone()))
76 .with_timestamp(Some(now)),
77 );
78
79 result.push(
80 Metric::new(
81 "disk_write_ops_total",
82 MetricKind::Absolute,
83 MetricValue::Counter {
84 value: self.proc.disk_io.write_ops as f64,
85 },
86 )
87 .with_namespace(Some(namespace.clone()))
88 .with_tags(Some(tags.clone()))
89 .with_timestamp(Some(now)),
90 );
91
92 result.push(
93 Metric::new(
94 "memory_free_bytes",
95 MetricKind::Absolute,
96 MetricValue::Gauge {
97 value: self.sys.free_mem as f64,
98 },
99 )
100 .with_namespace(Some(namespace.clone()))
101 .with_tags(Some(tags.clone()))
102 .with_timestamp(Some(now)),
103 );
104
105 if let Some(drive) = self.sys.drive.as_ref() {
106 tags.replace("path".to_string(), drive.path.clone());
107
108 result.push(
109 Metric::new(
110 "disk_total_bytes",
111 MetricKind::Absolute,
112 MetricValue::Gauge {
113 value: drive.stats.total_bytes as f64,
114 },
115 )
116 .with_namespace(Some(namespace.clone()))
117 .with_tags(Some(tags.clone()))
118 .with_timestamp(Some(now)),
119 );
120
121 result.push(
122 Metric::new(
123 "disk_free_bytes",
124 MetricKind::Absolute,
125 MetricValue::Gauge {
126 value: drive.stats.available_bytes as f64,
127 },
128 )
129 .with_namespace(Some(namespace.clone()))
130 .with_tags(Some(tags.clone()))
131 .with_timestamp(Some(now)),
132 );
133
134 result.push(
135 Metric::new(
136 "disk_used_bytes",
137 MetricKind::Absolute,
138 MetricValue::Gauge {
139 value: drive.stats.used_bytes as f64,
140 },
141 )
142 .with_namespace(Some(namespace))
143 .with_tags(Some(tags))
144 .with_timestamp(Some(now)),
145 );
146 }
147
148 result
149 }
150}
151
152#[derive(Deserialize, Debug)]
153#[serde(rename_all = "camelCase")]
154pub struct Proc {
155 pub id: usize,
156 pub mem: usize,
157 pub cpu: f64,
158 pub threads_count: i64,
159 pub thrown_exceptions_rate: f64,
160 pub disk_io: DiskIo,
161}
162
163#[derive(Deserialize, Debug)]
164#[serde(rename_all = "camelCase")]
165pub struct DiskIo {
166 pub read_bytes: usize,
167 pub written_bytes: usize,
168 pub read_ops: usize,
169 pub write_ops: usize,
170}
171
172#[derive(Deserialize, Debug)]
173#[serde(rename_all = "camelCase")]
174pub struct Sys {
175 pub free_mem: usize,
176 pub loadavg: LoadAvg,
177 pub drive: Option<Drive>,
178}
179
180#[derive(Deserialize, Debug)]
181#[serde(rename_all = "camelCase")]
182pub struct LoadAvg {
183 #[serde(rename = "1m")]
184 pub one_m: f64,
185 #[serde(rename = "5m")]
186 pub five_m: f64,
187 #[serde(rename = "15m")]
188 pub fifteen_m: f64,
189}
190
191#[derive(Debug)]
192pub struct Drive {
193 pub path: String,
194 pub stats: DriveStats,
195}
196
197impl<'de> Deserialize<'de> for Drive {
198 fn deserialize<D>(deserializer: D) -> Result<Self, <D as Deserializer<'de>>::Error>
199 where
200 D: Deserializer<'de>,
201 {
202 deserializer.deserialize_map(DriveVisitor)
203 }
204}
205
206#[derive(Deserialize, Debug)]
207#[serde(rename_all = "camelCase")]
208pub struct DriveStats {
209 pub available_bytes: usize,
210 pub total_bytes: usize,
211 #[serde(deserialize_with = "percent_or_integer")]
214 pub usage: usize,
215 pub used_bytes: usize,
216}
217
218struct DriveVisitor;
219
220impl<'de> Visitor<'de> for DriveVisitor {
221 type Value = Drive;
222
223 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224 write!(formatter, "DriveStats object")
225 }
226
227 fn visit_map<A>(self, mut map: A) -> Result<Self::Value, <A as MapAccess<'de>>::Error>
228 where
229 A: MapAccess<'de>,
230 {
231 if let Some(key) = map.next_key()? {
232 return Ok(Drive {
233 path: key,
234 stats: map.next_value()?,
235 });
236 }
237
238 Err(serde::de::Error::missing_field("<Drive path>"))
239 }
240}
241
242fn percent_or_integer<'de, D>(deserializer: D) -> Result<usize, D::Error>
244where
245 D: Deserializer<'de>,
246{
247 struct PercentOrInteger;
248 static PERCENT_REGEX: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"(\d+)%").unwrap());
249
250 impl Visitor<'_> for PercentOrInteger {
251 type Value = usize;
252
253 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
254 formatter.write_str("string or map")
255 }
256
257 fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
258 where
259 E: de::Error,
260 {
261 if let Some(caps) = PERCENT_REGEX.captures(value) {
262 caps[1].parse::<usize>().map_err(|err| {
263 Error::custom(format!("could not parse percent value into usize: {err}"))
264 })
265 } else {
266 Err(de::Error::invalid_value(
267 Unexpected::Str(value),
268 &"string did not contain a percent value like 30%",
269 ))
270 }
271 }
272
273 fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
274 where
275 E: de::Error,
276 {
277 usize::try_from(v).map_err(Error::custom)
278 }
279
280 fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
281 where
282 E: de::Error,
283 {
284 usize::try_from(v).map_err(Error::custom)
285 }
286 }
287
288 deserializer.deserialize_any(PercentOrInteger)
289}