vector/sources/host_metrics/
network.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
use futures::StreamExt;
#[cfg(target_os = "linux")]
use heim::net::os::linux::IoCountersExt;
#[cfg(windows)]
use heim::net::os::windows::IoCountersExt;
use heim::units::information::byte;
use vector_lib::configurable::configurable_component;
use vector_lib::metric_tags;

use crate::internal_events::HostMetricsScrapeDetailError;

use super::{default_all_devices, example_devices, filter_result, FilterList, HostMetrics};

/// Options for the network metrics collector.
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub struct NetworkConfig {
    /// Lists of device name patterns to include or exclude in gathering
    /// network utilization metrics.
    // When this key is missing from the deserialized input, serde fills the
    // field by calling `default_all_devices`.
    // NOTE(review): the derived `Default` impl goes through
    // `FilterList::default()` instead — confirm that both paths yield the
    // same filter, otherwise `NetworkConfig::default()` and an empty config
    // section behave differently.
    #[serde(default = "default_all_devices")]
    #[configurable(metadata(docs::examples = "example_devices()"))]
    devices: FilterList,
}

impl HostMetrics {
    /// Gathers network I/O counters and records them in `output`.
    ///
    /// The buffer's name is set to `"network"` before anything else. If the
    /// counters cannot be loaded at all, a `HostMetricsScrapeDetailError` is
    /// emitted and nothing is written. Otherwise, every interface accepted by
    /// the configured `devices` filter yields a set of `network_*` counters,
    /// each tagged with `device` set to the interface name.
    pub async fn network_metrics(&self, output: &mut super::MetricsBuffer) {
        output.name = "network";

        // Bail out early when the counter stream itself is unavailable.
        let stream = match heim::net::io_counters().await {
            Ok(stream) => stream,
            Err(error) => {
                emit!(HostMetricsScrapeDetailError {
                    message: "Failed to load network I/O counters.",
                    error,
                });
                return;
            }
        };

        let selected = stream
            // Entries for which `filter_result` returns `None` are skipped
            // (presumably the ones that failed to load — see `filter_result`).
            .filter_map(|entry| {
                filter_result(entry, "Failed to load/parse network data.")
            })
            // The device filter is deliberately split into a `.map` followed
            // by a trivial `.filter_map`: merging both into one `.filter_map`
            // triggers a strange "one type is more general than the other"
            // compile error.
            .map(|io| {
                let wanted = self
                    .config
                    .network
                    .devices
                    .contains_str(Some(io.interface()));
                wanted.then_some(io)
            })
            .filter_map(|kept| async { kept })
            .collect::<Vec<_>>()
            .await;

        for io in selected {
            let device = io.interface();
            let device_tags = metric_tags!("device" => device);

            // Receive side.
            output.counter(
                "network_receive_bytes_total",
                io.bytes_recv().get::<byte>() as f64,
                device_tags.clone(),
            );
            output.counter(
                "network_receive_errs_total",
                io.errors_recv() as f64,
                device_tags.clone(),
            );
            output.counter(
                "network_receive_packets_total",
                io.packets_recv() as f64,
                device_tags.clone(),
            );

            // Transmit side. The dropped-packet and packet counters are only
            // compiled in on Linux and Windows.
            output.counter(
                "network_transmit_bytes_total",
                io.bytes_sent().get::<byte>() as f64,
                device_tags.clone(),
            );
            #[cfg(any(target_os = "linux", windows))]
            output.counter(
                "network_transmit_packets_drop_total",
                io.drop_sent() as f64,
                device_tags.clone(),
            );
            #[cfg(any(target_os = "linux", windows))]
            output.counter(
                "network_transmit_packets_total",
                io.packets_sent() as f64,
                device_tags.clone(),
            );
            // Final use of the tag set, so it is moved rather than cloned.
            output.counter(
                "network_transmit_errs_total",
                io.errors_sent() as f64,
                device_tags,
            );
        }
    }
}

// These tests are disabled on Windows: the Windows CI environment produces
// zero network metrics, so they would always fail there.
#[cfg(all(test, not(windows)))]
mod tests {
    use super::super::tests::{all_counters, assert_filtered_metrics, count_tag};
    use super::super::{HostMetrics, HostMetricsConfig, MetricsBuffer};
    use super::NetworkConfig;

    /// With the default configuration the collector must emit a non-empty
    /// set of `network_*` counters, each carrying a `device` tag.
    #[tokio::test]
    async fn generates_network_metrics() {
        let mut buffer = MetricsBuffer::new(None);
        let host = HostMetrics::new(HostMetricsConfig::default());
        host.network_metrics(&mut buffer).await;

        let metrics = buffer.metrics;
        assert!(!metrics.is_empty());
        assert!(all_counters(&metrics));

        // Every metric name carries the network_ prefix.
        assert!(metrics
            .iter()
            .all(|metric| metric.name().starts_with("network_")));

        // The number of "device" tags matches the number of metrics.
        assert_eq!(count_tag(&metrics, "device"), metrics.len());
    }

    /// The `devices` filter list must be honoured when collecting metrics.
    #[tokio::test]
    async fn network_metrics_filters_on_device() {
        assert_filtered_metrics("device", |devices| async move {
            let config = HostMetricsConfig {
                network: NetworkConfig { devices },
                ..Default::default()
            };
            let host = HostMetrics::new(config);

            let mut buffer = MetricsBuffer::new(None);
            host.network_metrics(&mut buffer).await;
            buffer.metrics
        })
        .await;
    }
}