vector/sources/host_metrics/
network.rs1use futures::StreamExt;
2#[cfg(target_os = "linux")]
3use heim::net::os::linux::IoCountersExt;
4#[cfg(windows)]
5use heim::net::os::windows::IoCountersExt;
6use heim::units::information::byte;
7use vector_lib::{configurable::configurable_component, metric_tags};
8
9use super::{FilterList, HostMetrics, default_all_devices, example_devices, filter_result};
10use crate::internal_events::HostMetricsScrapeDetailError;
11
12#[configurable_component]
14#[derive(Clone, Debug, Default)]
15pub struct NetworkConfig {
16 #[serde(default = "default_all_devices")]
19 #[configurable(metadata(docs::examples = "example_devices()"))]
20 devices: FilterList,
21}
22
23impl HostMetrics {
24 pub async fn network_metrics(&self, output: &mut super::MetricsBuffer) {
25 output.name = "network";
26 match heim::net::io_counters().await {
27 Ok(counters) => {
28 for counter in counters
29 .filter_map(|result| {
30 filter_result(result, "Failed to load/parse network data.")
31 })
32 .map(|counter| {
36 self.config
37 .network
38 .devices
39 .contains_str(Some(counter.interface()))
40 .then_some(counter)
41 })
42 .filter_map(|counter| async { counter })
43 .collect::<Vec<_>>()
44 .await
45 {
46 let interface = counter.interface();
47 let tags = metric_tags!("device" => interface);
48 output.counter(
49 "network_receive_bytes_total",
50 counter.bytes_recv().get::<byte>() as f64,
51 tags.clone(),
52 );
53 output.counter(
54 "network_receive_errs_total",
55 counter.errors_recv() as f64,
56 tags.clone(),
57 );
58 output.counter(
59 "network_receive_packets_total",
60 counter.packets_recv() as f64,
61 tags.clone(),
62 );
63 output.counter(
64 "network_transmit_bytes_total",
65 counter.bytes_sent().get::<byte>() as f64,
66 tags.clone(),
67 );
68 #[cfg(any(target_os = "linux", windows))]
69 output.counter(
70 "network_transmit_packets_drop_total",
71 counter.drop_sent() as f64,
72 tags.clone(),
73 );
74 #[cfg(any(target_os = "linux", windows))]
75 output.counter(
76 "network_transmit_packets_total",
77 counter.packets_sent() as f64,
78 tags.clone(),
79 );
80 output.counter(
81 "network_transmit_errs_total",
82 counter.errors_sent() as f64,
83 tags,
84 );
85 }
86 }
87 Err(error) => {
88 emit!(HostMetricsScrapeDetailError {
89 message: "Failed to load network I/O counters.",
90 error,
91 });
92 }
93 }
94 }
95}
96
// NOTE(review): these tests are compiled out on Windows; the reason is not
// visible in this file -- confirm before changing the gate.
#[cfg(all(test, not(windows)))]
mod tests {
    use super::{
        super::{
            HostMetrics, HostMetricsConfig, MetricsBuffer,
            tests::{all_counters, assert_filtered_metrics, count_tag},
        },
        NetworkConfig,
    };

    /// Scrapes network metrics with the default configuration and checks
    /// that the result is non-empty, passes `all_counters`, uses only
    /// `network_`-prefixed names, and that `count_tag` for `device` equals
    /// the number of metrics.
    #[tokio::test]
    async fn generates_network_metrics() {
        let mut buffer = MetricsBuffer::new(None);
        HostMetrics::new(HostMetricsConfig::default())
            .network_metrics(&mut buffer)
            .await;
        let metrics = buffer.metrics;
        assert!(!metrics.is_empty());
        assert!(all_counters(&metrics));

        // Every metric name starts with `network_`.
        assert!(
            metrics
                .iter()
                .all(|metric| metric.name().starts_with("network_"))
        );

        // The `device` tag count must match the metric count, i.e.
        // (presumably) every metric carries a `device` tag.
        assert_eq!(count_tag(&metrics, "device"), metrics.len());
    }

    /// Runs the shared `assert_filtered_metrics` check against the `device`
    /// tag, building the collector from the device filter it supplies.
    #[tokio::test]
    async fn network_metrics_filters_on_device() {
        assert_filtered_metrics("device", |devices| async move {
            let mut buffer = MetricsBuffer::new(None);
            HostMetrics::new(HostMetricsConfig {
                network: NetworkConfig { devices },
                ..Default::default()
            })
            .network_metrics(&mut buffer)
            .await;
            buffer.metrics
        })
        .await;
    }
}