vector/sources/host_metrics/
disk.rs1use crate::internal_events::HostMetricsScrapeDetailError;
2use futures::StreamExt;
3use heim::units::information::byte;
4use vector_lib::configurable::configurable_component;
5use vector_lib::metric_tags;
6
7use super::{default_all_devices, example_devices, filter_result, FilterList, HostMetrics};
8
9#[configurable_component]
11#[derive(Clone, Debug, Default)]
12pub struct DiskConfig {
13 #[configurable(metadata(docs::examples = "example_devices()"))]
16 #[serde(default = "default_all_devices")]
17 devices: FilterList,
18}
19
20impl HostMetrics {
21 pub async fn disk_metrics(&self, output: &mut super::MetricsBuffer) {
22 match heim::disk::io_counters().await {
23 Ok(counters) => {
24 for counter in counters
25 .filter_map(|result| {
26 filter_result(result, "Failed to load/parse disk I/O data.")
27 })
28 .map(|counter| {
29 self.config
30 .disk
31 .devices
32 .contains_path(Some(counter.device_name().as_ref()))
33 .then_some(counter)
34 })
35 .filter_map(|counter| async { counter })
36 .collect::<Vec<_>>()
37 .await
38 {
39 let tags = metric_tags! {
40 "device" => counter.device_name().to_string_lossy()
41 };
42 output.name = "disk";
43 output.counter(
44 "disk_read_bytes_total",
45 counter.read_bytes().get::<byte>() as f64,
46 tags.clone(),
47 );
48 output.counter(
49 "disk_reads_completed_total",
50 counter.read_count() as f64,
51 tags.clone(),
52 );
53 output.counter(
54 "disk_written_bytes_total",
55 counter.write_bytes().get::<byte>() as f64,
56 tags.clone(),
57 );
58 output.counter(
59 "disk_writes_completed_total",
60 counter.write_count() as f64,
61 tags,
62 );
63 }
64 }
65 Err(error) => {
66 emit!(HostMetricsScrapeDetailError {
67 message: "Failed to load disk I/O info.",
68 error,
69 });
70 }
71 }
72 }
73}
74
75#[cfg(test)]
76mod tests {
77 use super::{
78 super::{
79 tests::{all_counters, assert_filtered_metrics, count_name, count_tag},
80 HostMetrics, HostMetricsConfig, MetricsBuffer,
81 },
82 DiskConfig,
83 };
84
85 #[tokio::test]
86 async fn generates_disk_metrics() {
87 let mut buffer = MetricsBuffer::new(None);
88 HostMetrics::new(HostMetricsConfig::default())
89 .disk_metrics(&mut buffer)
90 .await;
91 let metrics = buffer.metrics;
92
93 #[cfg(not(windows))]
95 assert!(!metrics.is_empty());
96 assert!(metrics.len() % 4 == 0);
97 assert!(all_counters(&metrics));
98
99 for name in &[
101 "disk_read_bytes_total",
102 "disk_reads_completed_total",
103 "disk_written_bytes_total",
104 "disk_writes_completed_total",
105 ] {
106 assert_eq!(count_name(&metrics, name), metrics.len() / 4, "name={name}");
107 }
108
109 assert_eq!(count_tag(&metrics, "device"), metrics.len());
111 }
112
113 #[tokio::test]
114 async fn filters_disk_metrics_on_device() {
115 assert_filtered_metrics("device", |devices| async move {
116 let mut buffer = MetricsBuffer::new(None);
117 HostMetrics::new(HostMetricsConfig {
118 disk: DiskConfig { devices },
119 ..Default::default()
120 })
121 .disk_metrics(&mut buffer)
122 .await;
123 buffer.metrics
124 })
125 .await;
126 }
127}