vector/sources/host_metrics/
tcp.rs1use std::{collections::HashMap, path::Path};
2
3use procfs::net::{TcpNetEntry, TcpState};
4use vector_lib::event::MetricTags;
5
6use super::HostMetrics;
7use crate::sources::host_metrics::HostMetricsScrapeDetailError;
8
9const PROC_IPV6_FILE: &str = "/proc/net/if_inet6";
10const TCP_CONNS_TOTAL: &str = "tcp_connections_total";
11const TCP_TX_QUEUED_BYTES_TOTAL: &str = "tcp_tx_queued_bytes_total";
12const TCP_RX_QUEUED_BYTES_TOTAL: &str = "tcp_rx_queued_bytes_total";
13const STATE: &str = "state";
14
15impl HostMetrics {
16 pub async fn tcp_metrics(&self, output: &mut super::MetricsBuffer) {
17 let result = tokio::task::spawn_blocking(build_tcp_stats)
19 .await
20 .unwrap_or_else(|join_error| {
21 Err(procfs::ProcError::Other(format!(
22 "Failed to join blocking task: {}",
23 join_error
24 )))
25 });
26
27 match result {
28 Ok(stats) => {
29 output.name = "tcp";
30 for (state, count) in stats.conn_states {
31 let tags = metric_tags! {
32 STATE => state
33 };
34 output.gauge(TCP_CONNS_TOTAL, count, tags);
35 }
36
37 output.gauge(
38 TCP_TX_QUEUED_BYTES_TOTAL,
39 stats.tx_queued_bytes,
40 MetricTags::default(),
41 );
42 output.gauge(
43 TCP_RX_QUEUED_BYTES_TOTAL,
44 stats.rx_queued_bytes,
45 MetricTags::default(),
46 );
47 }
48 Err(error) => {
49 emit!(HostMetricsScrapeDetailError {
50 message: "Failed to load tcp connection info.",
51 error,
52 });
53 }
54 }
55 }
56}
57
58#[derive(Debug, Default)]
59struct TcpStats {
60 conn_states: HashMap<String, f64>,
61 rx_queued_bytes: f64,
62 tx_queued_bytes: f64,
63}
64
65const fn tcp_state_to_string(state: TcpState) -> &'static str {
66 match state {
67 TcpState::Established => "established",
68 TcpState::SynSent => "syn_sent",
69 TcpState::SynRecv => "syn_recv",
70 TcpState::FinWait1 => "fin_wait1",
71 TcpState::FinWait2 => "fin_wait2",
72 TcpState::TimeWait => "time_wait",
73 TcpState::Close => "close",
74 TcpState::CloseWait => "close_wait",
75 TcpState::LastAck => "last_ack",
76 TcpState::Listen => "listen",
77 TcpState::Closing => "closing",
78 TcpState::NewSynRecv => "new_syn_recv",
79 }
80}
81
82fn parse_tcp_entries(entries: Vec<TcpNetEntry>, tcp_stats: &mut TcpStats) {
83 for entry in entries {
84 let state_str = tcp_state_to_string(entry.state);
85 *tcp_stats
86 .conn_states
87 .entry(state_str.to_string())
88 .or_insert(0.0) += 1.0;
89 tcp_stats.tx_queued_bytes += f64::from(entry.tx_queue);
90 tcp_stats.rx_queued_bytes += f64::from(entry.rx_queue);
91 }
92}
93
94fn build_tcp_stats() -> Result<TcpStats, procfs::ProcError> {
113 let mut tcp_stats = TcpStats::default();
114
115 let tcp_entries = procfs::net::tcp()?;
117 parse_tcp_entries(tcp_entries, &mut tcp_stats);
118
119 if is_ipv6_enabled() {
123 let tcp6_entries = procfs::net::tcp6()?;
124 parse_tcp_entries(tcp6_entries, &mut tcp_stats);
125 }
126
127 Ok(tcp_stats)
128}
129
130fn is_ipv6_enabled() -> bool {
131 Path::new(PROC_IPV6_FILE).exists()
132}
133
134#[cfg(test)]
135mod tests {
136 use procfs::net::TcpState;
137
138 use super::{
139 STATE, TCP_CONNS_TOTAL, TCP_RX_QUEUED_BYTES_TOTAL, TCP_TX_QUEUED_BYTES_TOTAL,
140 tcp_state_to_string,
141 };
142 use crate::sources::host_metrics::{HostMetrics, HostMetricsConfig, MetricsBuffer};
143
144 #[test]
145 fn tcp_state_to_string_handles_all_variants() {
146 assert_eq!(tcp_state_to_string(TcpState::Established), "established");
148 assert_eq!(tcp_state_to_string(TcpState::SynSent), "syn_sent");
149 assert_eq!(tcp_state_to_string(TcpState::SynRecv), "syn_recv");
150 assert_eq!(tcp_state_to_string(TcpState::FinWait1), "fin_wait1");
151 assert_eq!(tcp_state_to_string(TcpState::FinWait2), "fin_wait2");
152 assert_eq!(tcp_state_to_string(TcpState::TimeWait), "time_wait");
153 assert_eq!(tcp_state_to_string(TcpState::Close), "close");
154 assert_eq!(tcp_state_to_string(TcpState::CloseWait), "close_wait");
155 assert_eq!(tcp_state_to_string(TcpState::LastAck), "last_ack");
156 assert_eq!(tcp_state_to_string(TcpState::Listen), "listen");
157 assert_eq!(tcp_state_to_string(TcpState::Closing), "closing");
158 assert_eq!(tcp_state_to_string(TcpState::NewSynRecv), "new_syn_recv");
159 }
160
161 #[test]
162 fn parse_tcp_entries_handles_empty_list() {
163 use super::{TcpStats, parse_tcp_entries};
164
165 let mut stats = TcpStats::default();
166 parse_tcp_entries(vec![], &mut stats);
167
168 assert_eq!(stats.tx_queued_bytes, 0.0);
170 assert_eq!(stats.rx_queued_bytes, 0.0);
171 assert!(stats.conn_states.is_empty());
172 }
173
174 #[tokio::test]
175 async fn generates_tcp_metrics() {
176 let mut buffer = MetricsBuffer::new(None);
177 HostMetrics::new(HostMetricsConfig::default())
178 .tcp_metrics(&mut buffer)
179 .await;
180 let metrics = buffer.metrics;
181
182 assert!(!metrics.is_empty());
183
184 let mut n_tx_queued_bytes_metric = 0;
185 let mut n_rx_queued_bytes_metric = 0;
186
187 for metric in &metrics {
188 if metric.name() == TCP_CONNS_TOTAL {
189 let tags = metric.tags();
190 assert!(
191 tags.is_some(),
192 "Metric tcp_connections_total must have a tag"
193 );
194 let tags = tags.unwrap();
195 assert!(
196 tags.contains_key(STATE),
197 "Metric tcp_connections_total must have a state tag"
198 );
199 } else if metric.name() == TCP_TX_QUEUED_BYTES_TOTAL {
200 n_tx_queued_bytes_metric += 1;
201 } else if metric.name() == TCP_RX_QUEUED_BYTES_TOTAL {
202 n_rx_queued_bytes_metric += 1;
203 } else {
204 panic!("unrecognized metric name: {}", metric.name());
205 }
206 }
207
208 assert_eq!(
210 n_tx_queued_bytes_metric, 1,
211 "Expected exactly one tcp_tx_queued_bytes_total metric"
212 );
213 assert_eq!(
214 n_rx_queued_bytes_metric, 1,
215 "Expected exactly one tcp_rx_queued_bytes_total metric"
216 );
217
218 for metric in metrics {
222 if metric.name() == TCP_CONNS_TOTAL {
223 let tags = metric.tags();
224 assert!(
225 tags.is_some(),
226 "tcp_connections_total metric must have tags"
227 );
228 assert!(
229 tags.unwrap().contains_key(STATE),
230 "tcp_connections_total metric must have a state tag"
231 );
232 }
233 }
234 }
235}