// vector/sources/host_metrics/disk.rs
use futures::StreamExt;
use heim::units::information::byte;
use vector_lib::{configurable::configurable_component, metric_tags};

use super::{FilterList, HostMetrics, default_all_devices, example_devices, filter_result};
use crate::internal_events::HostMetricsScrapeDetailError;

8#[configurable_component]
10#[derive(Clone, Debug, Default)]
11pub struct DiskConfig {
12 #[configurable(metadata(docs::examples = "example_devices()"))]
15 #[serde(default = "default_all_devices")]
16 devices: FilterList,
17}
18
19impl HostMetrics {
20 pub async fn disk_metrics(&self, output: &mut super::MetricsBuffer) {
21 match heim::disk::io_counters().await {
22 Ok(counters) => {
23 for counter in counters
24 .filter_map(|result| {
25 filter_result(result, "Failed to load/parse disk I/O data.")
26 })
27 .map(|counter| {
28 self.config
29 .disk
30 .devices
31 .contains_path(Some(counter.device_name().as_ref()))
32 .then_some(counter)
33 })
34 .filter_map(|counter| async { counter })
35 .collect::<Vec<_>>()
36 .await
37 {
38 let tags = metric_tags! {
39 "device" => counter.device_name().to_string_lossy()
40 };
41 output.name = "disk";
42 output.counter(
43 "disk_read_bytes_total",
44 counter.read_bytes().get::<byte>() as f64,
45 tags.clone(),
46 );
47 output.counter(
48 "disk_reads_completed_total",
49 counter.read_count() as f64,
50 tags.clone(),
51 );
52 output.counter(
53 "disk_written_bytes_total",
54 counter.write_bytes().get::<byte>() as f64,
55 tags.clone(),
56 );
57 output.counter(
58 "disk_writes_completed_total",
59 counter.write_count() as f64,
60 tags,
61 );
62 }
63 }
64 Err(error) => {
65 emit!(HostMetricsScrapeDetailError {
66 message: "Failed to load disk I/O info.",
67 error,
68 });
69 }
70 }
71 }
72}
73
#[cfg(test)]
mod tests {
    use super::{
        super::{
            HostMetrics, HostMetricsConfig, MetricsBuffer,
            tests::{all_counters, assert_filtered_metrics, count_name, count_tag},
        },
        DiskConfig,
    };

    /// Counter names the disk collector emits for each reported device.
    const DISK_METRIC_NAMES: [&str; 4] = [
        "disk_read_bytes_total",
        "disk_reads_completed_total",
        "disk_written_bytes_total",
        "disk_writes_completed_total",
    ];

    /// With the default configuration the collector produces only counters,
    /// the same number of each expected name, and every one carries a
    /// `device` tag.
    #[tokio::test]
    async fn generates_disk_metrics() {
        let mut buffer = MetricsBuffer::new(None);
        let host = HostMetrics::new(HostMetricsConfig::default());
        host.disk_metrics(&mut buffer).await;

        let metrics = buffer.metrics;
        let total = metrics.len();

        // The non-empty check is compiled out on Windows targets.
        #[cfg(not(windows))]
        assert!(!metrics.is_empty());
        assert_eq!(total % DISK_METRIC_NAMES.len(), 0);
        assert!(all_counters(&metrics));

        // Each name must account for exactly one quarter of the metrics.
        let expected_per_name = total / DISK_METRIC_NAMES.len();
        for name in &DISK_METRIC_NAMES {
            assert_eq!(count_name(&metrics, name), expected_per_name, "name={name}");
        }

        assert_eq!(count_tag(&metrics, "device"), total);
    }

    /// Runs the shared filter assertions against the `device` tag, building a
    /// collector whose disk device filter is the list supplied by the helper.
    #[tokio::test]
    async fn filters_disk_metrics_on_device() {
        assert_filtered_metrics("device", |devices| async move {
            let config = HostMetricsConfig {
                disk: DiskConfig { devices },
                ..Default::default()
            };
            let mut buffer = MetricsBuffer::new(None);
            HostMetrics::new(config).disk_metrics(&mut buffer).await;
            buffer.metrics
        })
        .await;
    }
}