vector/sources/host_metrics/
disk.rs

1use futures::StreamExt;
2use heim::units::information::byte;
3use vector_lib::{configurable::configurable_component, metric_tags};
4
5use super::{FilterList, HostMetrics, default_all_devices, example_devices, filter_result};
6use crate::internal_events::HostMetricsScrapeDetailError;
7
/// Options for the disk metrics collector.
// Consulted by `HostMetrics::disk_metrics`, which reports a device only when
// `devices.contains_path(..)` accepts its name.
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub struct DiskConfig {
    /// Lists of device name patterns to include or exclude in gathering
    /// I/O utilization metrics.
    #[configurable(metadata(docs::examples = "example_devices()"))]
    // When the `devices` key is missing from the deserialized input, serde
    // fills the field by calling `default_all_devices`.
    // NOTE(review): `#[derive(Default)]` builds this field with
    // `FilterList::default()` instead; confirm both defaults select the same
    // set of devices.
    #[serde(default = "default_all_devices")]
    devices: FilterList,
}
18
19impl HostMetrics {
20    pub async fn disk_metrics(&self, output: &mut super::MetricsBuffer) {
21        match heim::disk::io_counters().await {
22            Ok(counters) => {
23                for counter in counters
24                    .filter_map(|result| {
25                        filter_result(result, "Failed to load/parse disk I/O data.")
26                    })
27                    .map(|counter| {
28                        self.config
29                            .disk
30                            .devices
31                            .contains_path(Some(counter.device_name().as_ref()))
32                            .then_some(counter)
33                    })
34                    .filter_map(|counter| async { counter })
35                    .collect::<Vec<_>>()
36                    .await
37                {
38                    let tags = metric_tags! {
39                        "device" => counter.device_name().to_string_lossy()
40                    };
41                    output.name = "disk";
42                    output.counter(
43                        "disk_read_bytes_total",
44                        counter.read_bytes().get::<byte>() as f64,
45                        tags.clone(),
46                    );
47                    output.counter(
48                        "disk_reads_completed_total",
49                        counter.read_count() as f64,
50                        tags.clone(),
51                    );
52                    output.counter(
53                        "disk_written_bytes_total",
54                        counter.write_bytes().get::<byte>() as f64,
55                        tags.clone(),
56                    );
57                    output.counter(
58                        "disk_writes_completed_total",
59                        counter.write_count() as f64,
60                        tags,
61                    );
62                }
63            }
64            Err(error) => {
65                emit!(HostMetricsScrapeDetailError {
66                    message: "Failed to load disk I/O info.",
67                    error,
68                });
69            }
70        }
71    }
72}
73
#[cfg(test)]
mod tests {
    use super::super::{
        HostMetrics, HostMetricsConfig, MetricsBuffer,
        tests::{all_counters, assert_filtered_metrics, count_name, count_tag},
    };
    use super::DiskConfig;

    /// Metric names the disk collector records for each reported device.
    const DISK_METRIC_NAMES: [&str; 4] = [
        "disk_read_bytes_total",
        "disk_reads_completed_total",
        "disk_written_bytes_total",
        "disk_writes_completed_total",
    ];

    /// With the default configuration, the collector yields only counters, in
    /// equal numbers for each of the four metric names, all tagged `device`.
    #[tokio::test]
    async fn generates_disk_metrics() {
        let mut output = MetricsBuffer::new(None);
        let source = HostMetrics::new(HostMetricsConfig::default());
        source.disk_metrics(&mut output).await;
        let metrics = output.metrics;

        // The Windows test runner doesn't generate any disk metrics on the VM.
        #[cfg(not(windows))]
        assert!(!metrics.is_empty());
        assert!(metrics.len() % DISK_METRIC_NAMES.len() == 0);
        assert!(all_counters(&metrics));

        // Every one of the four disk_* names accounts for an equal share.
        let expected_per_name = metrics.len() / DISK_METRIC_NAMES.len();
        for name in &DISK_METRIC_NAMES {
            assert_eq!(count_name(&metrics, name), expected_per_name, "name={name}");
        }

        // Each metric must carry a "device" tag.
        assert_eq!(count_tag(&metrics, "device"), metrics.len());
    }

    /// The `devices` filter list controls which `device` tags are produced.
    #[tokio::test]
    async fn filters_disk_metrics_on_device() {
        assert_filtered_metrics("device", |filter| async move {
            let config = HostMetricsConfig {
                disk: DiskConfig { devices: filter },
                ..Default::default()
            };
            let mut output = MetricsBuffer::new(None);
            HostMetrics::new(config).disk_metrics(&mut output).await;
            output.metrics
        })
        .await;
    }
}