vector/sources/host_metrics/
tcp.rs

1use std::{collections::HashMap, path::Path};
2
3use procfs::net::{TcpNetEntry, TcpState};
4use vector_lib::event::MetricTags;
5
6use super::HostMetrics;
7use crate::sources::host_metrics::HostMetricsScrapeDetailError;
8
/// Procfs file whose existence is used to decide whether IPv6 sockets are read.
const PROC_IPV6_FILE: &str = "/proc/net/if_inet6";
/// Name of the gauge reporting the number of TCP connections, tagged by `state`.
const TCP_CONNS_TOTAL: &str = "tcp_connections_total";
/// Name of the gauge reporting the summed transmit-queue sizes of all parsed sockets.
const TCP_TX_QUEUED_BYTES_TOTAL: &str = "tcp_tx_queued_bytes_total";
/// Name of the gauge reporting the summed receive-queue sizes of all parsed sockets.
const TCP_RX_QUEUED_BYTES_TOTAL: &str = "tcp_rx_queued_bytes_total";
/// Tag key that carries the connection state on `tcp_connections_total`.
const STATE: &str = "state";
14
15impl HostMetrics {
16    pub async fn tcp_metrics(&self, output: &mut super::MetricsBuffer) {
17        // Spawn blocking task to avoid blocking the async runtime with synchronous I/O
18        let result = tokio::task::spawn_blocking(build_tcp_stats)
19            .await
20            .unwrap_or_else(|join_error| {
21                Err(procfs::ProcError::Other(format!(
22                    "Failed to join blocking task: {}",
23                    join_error
24                )))
25            });
26
27        match result {
28            Ok(stats) => {
29                output.name = "tcp";
30                for (state, count) in stats.conn_states {
31                    let tags = metric_tags! {
32                        STATE => state
33                    };
34                    output.gauge(TCP_CONNS_TOTAL, count, tags);
35                }
36
37                output.gauge(
38                    TCP_TX_QUEUED_BYTES_TOTAL,
39                    stats.tx_queued_bytes,
40                    MetricTags::default(),
41                );
42                output.gauge(
43                    TCP_RX_QUEUED_BYTES_TOTAL,
44                    stats.rx_queued_bytes,
45                    MetricTags::default(),
46                );
47            }
48            Err(error) => {
49                emit!(HostMetricsScrapeDetailError {
50                    message: "Failed to load tcp connection info.",
51                    error,
52                });
53            }
54        }
55    }
56}
57
/// Aggregated TCP socket statistics accumulated by `parse_tcp_entries`.
#[derive(Debug, Default)]
struct TcpStats {
    /// Number of parsed sockets per connection state, keyed by the state's label.
    conn_states: HashMap<String, f64>,
    /// Sum of the `rx_queue` values of all parsed socket entries.
    rx_queued_bytes: f64,
    /// Sum of the `tx_queue` values of all parsed socket entries.
    tx_queued_bytes: f64,
}
64
65const fn tcp_state_to_string(state: TcpState) -> &'static str {
66    match state {
67        TcpState::Established => "established",
68        TcpState::SynSent => "syn_sent",
69        TcpState::SynRecv => "syn_recv",
70        TcpState::FinWait1 => "fin_wait1",
71        TcpState::FinWait2 => "fin_wait2",
72        TcpState::TimeWait => "time_wait",
73        TcpState::Close => "close",
74        TcpState::CloseWait => "close_wait",
75        TcpState::LastAck => "last_ack",
76        TcpState::Listen => "listen",
77        TcpState::Closing => "closing",
78        TcpState::NewSynRecv => "new_syn_recv",
79    }
80}
81
82fn parse_tcp_entries(entries: Vec<TcpNetEntry>, tcp_stats: &mut TcpStats) {
83    for entry in entries {
84        let state_str = tcp_state_to_string(entry.state);
85        *tcp_stats
86            .conn_states
87            .entry(state_str.to_string())
88            .or_insert(0.0) += 1.0;
89        tcp_stats.tx_queued_bytes += f64::from(entry.tx_queue);
90        tcp_stats.rx_queued_bytes += f64::from(entry.rx_queue);
91    }
92}
93
94/// Collects TCP socket statistics from `/proc/net/tcp` and `/proc/net/tcp6`.
95///
96/// # Behavior
97///
98/// IPv4 and IPv6 statistics are **merged together** into a single set of metrics.
99/// This means connection counts and queue bytes are aggregated across both address families.
100///
101/// **Note:** This merging behavior preserves compatibility with the previous netlink-based
102/// implementation. While not ideal (it prevents distinguishing IPv4 vs IPv6 traffic),
103/// changing this would alter the existing metric semantics that users may depend on.
104/// We can consider changing this behavior in the future.
105///
106/// # Error Handling
107///
108/// Both IPv4 and IPv6 read errors are **fatal**: if either fails, no metrics are emitted.
109/// When IPv6 is detected via `/proc/net/if_inet6`, a failure to read `/proc/net/tcp6` is
110/// treated as a hard error rather than a degraded fallback, because emitting IPv4-only
111/// totals on an IPv6-enabled host would silently undercount connections.
112fn build_tcp_stats() -> Result<TcpStats, procfs::ProcError> {
113    let mut tcp_stats = TcpStats::default();
114
115    // Read IPv4 TCP sockets
116    let tcp_entries = procfs::net::tcp()?;
117    parse_tcp_entries(tcp_entries, &mut tcp_stats);
118
119    // Read IPv6 TCP sockets if IPv6 is enabled.
120    // Failure here is fatal: silently returning IPv4-only metrics on an IPv6-enabled host
121    // would undercount connections, matching the behavior of the prior implementation.
122    if is_ipv6_enabled() {
123        let tcp6_entries = procfs::net::tcp6()?;
124        parse_tcp_entries(tcp6_entries, &mut tcp_stats);
125    }
126
127    Ok(tcp_stats)
128}
129
130fn is_ipv6_enabled() -> bool {
131    Path::new(PROC_IPV6_FILE).exists()
132}
133
#[cfg(test)]
mod tests {
    use procfs::net::TcpState;

    use super::{
        STATE, TCP_CONNS_TOTAL, TCP_RX_QUEUED_BYTES_TOTAL, TCP_TX_QUEUED_BYTES_TOTAL,
        tcp_state_to_string,
    };
    use crate::sources::host_metrics::{HostMetrics, HostMetricsConfig, MetricsBuffer};

    /// Every TCP state maps to its expected tag label.
    #[test]
    fn tcp_state_to_string_handles_all_variants() {
        // All 12 TCP states and the label each one must produce.
        let expected = [
            (TcpState::Established, "established"),
            (TcpState::SynSent, "syn_sent"),
            (TcpState::SynRecv, "syn_recv"),
            (TcpState::FinWait1, "fin_wait1"),
            (TcpState::FinWait2, "fin_wait2"),
            (TcpState::TimeWait, "time_wait"),
            (TcpState::Close, "close"),
            (TcpState::CloseWait, "close_wait"),
            (TcpState::LastAck, "last_ack"),
            (TcpState::Listen, "listen"),
            (TcpState::Closing, "closing"),
            (TcpState::NewSynRecv, "new_syn_recv"),
        ];

        for (state, label) in expected {
            assert_eq!(tcp_state_to_string(state), label);
        }
    }

    /// Parsing no entries leaves the statistics at their defaults.
    #[test]
    fn parse_tcp_entries_handles_empty_list() {
        use super::{TcpStats, parse_tcp_entries};

        let mut stats = TcpStats::default();
        parse_tcp_entries(vec![], &mut stats);

        // Empty input should result in zero metrics
        assert_eq!(stats.tx_queued_bytes, 0.0);
        assert_eq!(stats.rx_queued_bytes, 0.0);
        assert!(stats.conn_states.is_empty());
    }

    /// Scrapes the real host and checks the shape of the emitted metrics.
    #[tokio::test]
    async fn generates_tcp_metrics() {
        let mut buffer = MetricsBuffer::new(None);
        HostMetrics::new(HostMetricsConfig::default())
            .tcp_metrics(&mut buffer)
            .await;
        let metrics = buffer.metrics;

        assert!(!metrics.is_empty());

        let mut n_tx_queued_bytes_metric = 0;
        let mut n_rx_queued_bytes_metric = 0;

        // Single pass: validate the tags of every connection gauge and count the queue gauges.
        // Connection gauges depend on the sockets present on the host, so zero of them is
        // valid in minimal test environments; each one that is present must carry a state tag.
        for metric in &metrics {
            match metric.name() {
                TCP_CONNS_TOTAL => {
                    let tags = metric
                        .tags()
                        .expect("Metric tcp_connections_total must have a tag");
                    assert!(
                        tags.contains_key(STATE),
                        "Metric tcp_connections_total must have a state tag"
                    );
                }
                TCP_TX_QUEUED_BYTES_TOTAL => n_tx_queued_bytes_metric += 1,
                TCP_RX_QUEUED_BYTES_TOTAL => n_rx_queued_bytes_metric += 1,
                other => panic!("unrecognized metric name: {}", other),
            }
        }

        // Queue metrics should always be present (even if zero)
        assert_eq!(
            n_tx_queued_bytes_metric, 1,
            "Expected exactly one tcp_tx_queued_bytes_total metric"
        );
        assert_eq!(
            n_rx_queued_bytes_metric, 1,
            "Expected exactly one tcp_rx_queued_bytes_total metric"
        );
    }
}