vector/sources/host_metrics/
disk.rs

1use crate::internal_events::HostMetricsScrapeDetailError;
2use futures::StreamExt;
3use heim::units::information::byte;
4use vector_lib::configurable::configurable_component;
5use vector_lib::metric_tags;
6
7use super::{default_all_devices, example_devices, filter_result, FilterList, HostMetrics};
8
/// Options for the disk metrics collector.
// NOTE(review): `Default` is derived, so `DiskConfig::default()` uses
// `FilterList::default()`, whereas deserialization of a config that omits
// `devices` uses `default_all_devices()` (see the serde attribute below).
// Confirm that these two defaults are meant to agree.
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub struct DiskConfig {
    /// Lists of device name patterns to include or exclude in gathering
    /// I/O utilization metrics.
    // Consumed by `HostMetrics::disk_metrics`, which keeps an I/O counter only
    // when `devices.contains_path(..)` accepts the counter's device name.
    #[configurable(metadata(docs::examples = "example_devices()"))]
    #[serde(default = "default_all_devices")]
    devices: FilterList,
}
19
20impl HostMetrics {
21    pub async fn disk_metrics(&self, output: &mut super::MetricsBuffer) {
22        match heim::disk::io_counters().await {
23            Ok(counters) => {
24                for counter in counters
25                    .filter_map(|result| {
26                        filter_result(result, "Failed to load/parse disk I/O data.")
27                    })
28                    .map(|counter| {
29                        self.config
30                            .disk
31                            .devices
32                            .contains_path(Some(counter.device_name().as_ref()))
33                            .then_some(counter)
34                    })
35                    .filter_map(|counter| async { counter })
36                    .collect::<Vec<_>>()
37                    .await
38                {
39                    let tags = metric_tags! {
40                        "device" => counter.device_name().to_string_lossy()
41                    };
42                    output.name = "disk";
43                    output.counter(
44                        "disk_read_bytes_total",
45                        counter.read_bytes().get::<byte>() as f64,
46                        tags.clone(),
47                    );
48                    output.counter(
49                        "disk_reads_completed_total",
50                        counter.read_count() as f64,
51                        tags.clone(),
52                    );
53                    output.counter(
54                        "disk_written_bytes_total",
55                        counter.write_bytes().get::<byte>() as f64,
56                        tags.clone(),
57                    );
58                    output.counter(
59                        "disk_writes_completed_total",
60                        counter.write_count() as f64,
61                        tags,
62                    );
63                }
64            }
65            Err(error) => {
66                emit!(HostMetricsScrapeDetailError {
67                    message: "Failed to load disk I/O info.",
68                    error,
69                });
70            }
71        }
72    }
73}
74
#[cfg(test)]
mod tests {
    use super::{
        super::{
            tests::{all_counters, assert_filtered_metrics, count_name, count_tag},
            HostMetrics, HostMetricsConfig, MetricsBuffer,
        },
        DiskConfig,
    };

    /// Metric names the disk collector is expected to emit once per device.
    const DISK_METRIC_NAMES: [&str; 4] = [
        "disk_read_bytes_total",
        "disk_reads_completed_total",
        "disk_written_bytes_total",
        "disk_writes_completed_total",
    ];

    /// Runs the disk collector once with `config` and hands back the buffer
    /// it wrote into.
    async fn scrape(config: HostMetricsConfig) -> MetricsBuffer {
        let mut output = MetricsBuffer::new(None);
        HostMetrics::new(config).disk_metrics(&mut output).await;
        output
    }

    #[tokio::test]
    async fn generates_disk_metrics() {
        let metrics = scrape(HostMetricsConfig::default()).await.metrics;

        // The Windows test runner doesn't generate any disk metrics on the VM.
        #[cfg(not(windows))]
        assert!(!metrics.is_empty());
        assert!(metrics.len() % 4 == 0);
        assert!(all_counters(&metrics));

        // There are exactly four disk_* names, each appearing once per device.
        let per_name = metrics.len() / 4;
        for name in DISK_METRIC_NAMES {
            assert_eq!(count_name(&metrics, name), per_name, "name={name}");
        }

        // They should all have a "device" tag
        assert_eq!(count_tag(&metrics, "device"), metrics.len());
    }

    #[tokio::test]
    async fn filters_disk_metrics_on_device() {
        assert_filtered_metrics("device", |devices| async move {
            let config = HostMetricsConfig {
                disk: DiskConfig { devices },
                ..Default::default()
            };
            scrape(config).await.metrics
        })
        .await;
    }
}
127}