vector/sources/host_metrics/
network.rs1use futures::StreamExt;
2#[cfg(target_os = "linux")]
3use heim::net::os::linux::IoCountersExt;
4#[cfg(windows)]
5use heim::net::os::windows::IoCountersExt;
6use heim::units::information::byte;
7use vector_lib::configurable::configurable_component;
8use vector_lib::metric_tags;
9
10use crate::internal_events::HostMetricsScrapeDetailError;
11
12use super::{default_all_devices, example_devices, filter_result, FilterList, HostMetrics};
13
14#[configurable_component]
16#[derive(Clone, Debug, Default)]
17pub struct NetworkConfig {
18 #[serde(default = "default_all_devices")]
21 #[configurable(metadata(docs::examples = "example_devices()"))]
22 devices: FilterList,
23}
24
25impl HostMetrics {
26 pub async fn network_metrics(&self, output: &mut super::MetricsBuffer) {
27 output.name = "network";
28 match heim::net::io_counters().await {
29 Ok(counters) => {
30 for counter in counters
31 .filter_map(|result| {
32 filter_result(result, "Failed to load/parse network data.")
33 })
34 .map(|counter| {
38 self.config
39 .network
40 .devices
41 .contains_str(Some(counter.interface()))
42 .then_some(counter)
43 })
44 .filter_map(|counter| async { counter })
45 .collect::<Vec<_>>()
46 .await
47 {
48 let interface = counter.interface();
49 let tags = metric_tags!("device" => interface);
50 output.counter(
51 "network_receive_bytes_total",
52 counter.bytes_recv().get::<byte>() as f64,
53 tags.clone(),
54 );
55 output.counter(
56 "network_receive_errs_total",
57 counter.errors_recv() as f64,
58 tags.clone(),
59 );
60 output.counter(
61 "network_receive_packets_total",
62 counter.packets_recv() as f64,
63 tags.clone(),
64 );
65 output.counter(
66 "network_transmit_bytes_total",
67 counter.bytes_sent().get::<byte>() as f64,
68 tags.clone(),
69 );
70 #[cfg(any(target_os = "linux", windows))]
71 output.counter(
72 "network_transmit_packets_drop_total",
73 counter.drop_sent() as f64,
74 tags.clone(),
75 );
76 #[cfg(any(target_os = "linux", windows))]
77 output.counter(
78 "network_transmit_packets_total",
79 counter.packets_sent() as f64,
80 tags.clone(),
81 );
82 output.counter(
83 "network_transmit_errs_total",
84 counter.errors_sent() as f64,
85 tags,
86 );
87 }
88 }
89 Err(error) => {
90 emit!(HostMetricsScrapeDetailError {
91 message: "Failed to load network I/O counters.",
92 error,
93 });
94 }
95 }
96 }
97}
98
#[cfg(all(test, not(windows)))]
mod tests {
    use super::super::tests::{all_counters, assert_filtered_metrics, count_tag};
    use super::super::{HostMetrics, HostMetricsConfig, MetricsBuffer};
    use super::NetworkConfig;

    /// With the default configuration the collector must yield a non-empty
    /// set of counters, all named `network_*` and all carrying a `device` tag.
    #[tokio::test]
    async fn generates_network_metrics() {
        let mut buffer = MetricsBuffer::new(None);
        let host = HostMetrics::new(HostMetricsConfig::default());
        host.network_metrics(&mut buffer).await;

        let collected = buffer.metrics;
        assert!(!collected.is_empty());
        assert!(all_counters(&collected));

        // Every metric name carries the `network_` prefix.
        assert!(collected
            .iter()
            .all(|metric| metric.name().starts_with("network_")));

        // Every metric is tagged with the device it was read from.
        let tagged = count_tag(&collected, "device");
        assert_eq!(tagged, collected.len());
    }

    /// Runs the collector with the `devices` filter supplied by the shared
    /// `assert_filtered_metrics` helper, which checks the `device` tag.
    #[tokio::test]
    async fn network_metrics_filters_on_device() {
        assert_filtered_metrics("device", |devices| async move {
            let mut buffer = MetricsBuffer::new(None);
            let config = HostMetricsConfig {
                network: NetworkConfig { devices },
                ..Default::default()
            };
            HostMetrics::new(config).network_metrics(&mut buffer).await;
            buffer.metrics
        })
        .await;
    }
}