vector/sources/host_metrics/
tcp.rs1use std::{collections::HashMap, io, path::Path};
2
3use byteorder::{ByteOrder, NativeEndian};
4use netlink_packet_core::{
5 NLM_F_ACK, NLM_F_DUMP, NLM_F_REQUEST, NetlinkHeader, NetlinkMessage, NetlinkPayload,
6};
7use netlink_packet_sock_diag::{
8 SockDiagMessage,
9 constants::*,
10 inet::{ExtensionFlags, InetRequest, InetResponseHeader, SocketId, StateFlags},
11};
12use netlink_sys::{
13 AsyncSocket, AsyncSocketExt, SocketAddr, TokioSocket, protocols::NETLINK_SOCK_DIAG,
14};
15use snafu::{ResultExt, Snafu};
16use vector_lib::event::MetricTags;
17
18use super::HostMetrics;
19use crate::sources::host_metrics::HostMetricsScrapeDetailError;
20
/// Tag key under which each connection-count gauge reports its TCP state.
const STATE: &str = "state";

/// Name of the per-state connection-count gauge.
const TCP_CONNS_TOTAL: &str = "tcp_connections_total";
/// Name of the gauge holding the summed send-queue sizes of all sockets.
const TCP_TX_QUEUED_BYTES_TOTAL: &str = "tcp_tx_queued_bytes_total";
/// Name of the gauge holding the summed receive-queue sizes of all sockets.
const TCP_RX_QUEUED_BYTES_TOTAL: &str = "tcp_rx_queued_bytes_total";

/// Path whose existence decides whether IPv6 sockets are queried as well.
const PROC_IPV6_FILE: &str = "/proc/net/if_inet6";
26
27impl HostMetrics {
28 pub async fn tcp_metrics(&self, output: &mut super::MetricsBuffer) {
29 match build_tcp_stats().await {
30 Ok(stats) => {
31 output.name = "tcp";
32 for (state, count) in stats.conn_states {
33 let tags = metric_tags! {
34 STATE => state
35 };
36 output.gauge(TCP_CONNS_TOTAL, count, tags);
37 }
38
39 output.gauge(
40 TCP_TX_QUEUED_BYTES_TOTAL,
41 stats.tx_queued_bytes,
42 MetricTags::default(),
43 );
44 output.gauge(
45 TCP_RX_QUEUED_BYTES_TOTAL,
46 stats.rx_queued_bytes,
47 MetricTags::default(),
48 );
49 }
50 Err(error) => {
51 emit!(HostMetricsScrapeDetailError {
52 message: "Failed to load tcp connection info.",
53 error,
54 });
55 }
56 }
57 }
58}
59
60#[derive(Debug, Snafu)]
61enum TcpError {
62 #[snafu(display("Could not open new netlink socket: {}", source))]
63 NetlinkSocket { source: io::Error },
64 #[snafu(display("Could not send netlink message: {}", source))]
65 NetlinkSend { source: io::Error },
66 #[snafu(display("Could not parse netlink response: {}", source))]
67 NetlinkParse {
68 source: netlink_packet_utils::DecodeError,
69 },
70 #[snafu(display("Could not recognize TCP state {state}"))]
71 InvalidTcpState { state: u8 },
72 #[snafu(display("Received an error message from netlink; code: {code}"))]
73 NetlinkMsgError { code: i32 },
74 #[snafu(display("Invalid message length: {length}"))]
75 InvalidLength { length: usize },
76}
77
/// TCP connection states as reported in the `state` field of an
/// `InetResponseHeader`.
///
/// The discriminants follow the Linux kernel's TCP state numbering
/// (`include/net/tcp_states.h`), so they must stay explicit.
#[repr(u8)]
enum TcpState {
    /// Connection is open and data can flow.
    Established = 1,
    /// A SYN was sent; waiting for the peer's reply.
    SynSent = 2,
    /// A SYN was received; handshake not yet complete.
    SynRecv = 3,
    /// Local side closed; waiting for the peer's ACK or FIN.
    FinWait1 = 4,
    /// Local close acknowledged; waiting for the peer's FIN.
    FinWait2 = 5,
    /// Lingering after close.
    TimeWait = 6,
    /// Socket is not in use.
    Close = 7,
    /// Peer closed; waiting for the local side to close.
    CloseWait = 8,
    /// Waiting for the final ACK of the local FIN.
    LastAck = 9,
    /// Socket is listening for incoming connections.
    Listen = 10,
    /// Both sides closed simultaneously.
    Closing = 11,
}
92
93impl From<TcpState> for String {
94 fn from(val: TcpState) -> Self {
95 match val {
96 TcpState::Established => "established".into(),
97 TcpState::SynSent => "syn_sent".into(),
98 TcpState::SynRecv => "syn_recv".into(),
99 TcpState::FinWait1 => "fin_wait1".into(),
100 TcpState::FinWait2 => "fin_wait2".into(),
101 TcpState::TimeWait => "time_wait".into(),
102 TcpState::Close => "close".into(),
103 TcpState::CloseWait => "close_wait".into(),
104 TcpState::LastAck => "last_ack".into(),
105 TcpState::Listen => "listen".into(),
106 TcpState::Closing => "closing".into(),
107 }
108 }
109}
110
111impl TryFrom<u8> for TcpState {
112 type Error = TcpError;
113
114 fn try_from(value: u8) -> Result<Self, Self::Error> {
115 match value {
116 1 => Ok(TcpState::Established),
117 2 => Ok(TcpState::SynSent),
118 3 => Ok(TcpState::SynRecv),
119 4 => Ok(TcpState::FinWait1),
120 5 => Ok(TcpState::FinWait2),
121 6 => Ok(TcpState::TimeWait),
122 7 => Ok(TcpState::Close),
123 8 => Ok(TcpState::CloseWait),
124 9 => Ok(TcpState::LastAck),
125 10 => Ok(TcpState::Listen),
126 11 => Ok(TcpState::Closing),
127 _ => Err(TcpError::InvalidTcpState { state: value }),
128 }
129 }
130}
131
/// Aggregated TCP socket statistics for a single scrape.
#[derive(Debug, Default)]
struct TcpStats {
    /// Number of sockets seen per TCP state, keyed by the state's label.
    conn_states: HashMap<String, f64>,
    /// Sum of the receive-queue sizes of every socket seen.
    rx_queued_bytes: f64,
    /// Sum of the send-queue sizes of every socket seen.
    tx_queued_bytes: f64,
}
138
139fn parse_netlink_messages(
153 buffer: &[u8],
154 headers: &mut Vec<InetResponseHeader>,
155) -> Result<bool, TcpError> {
156 let mut offset = 0;
157 let mut done = false;
158
159 while offset < buffer.len() {
160 let remaining_bytes = &buffer[offset..];
161 if remaining_bytes.len() < 4 {
162 return Err(TcpError::InvalidLength {
164 length: remaining_bytes.len(),
165 });
166 }
167 let length = NativeEndian::read_u32(&remaining_bytes[0..4]) as usize;
169 if length == 0 {
170 break;
171 }
172 if length > remaining_bytes.len() {
173 return Err(TcpError::InvalidLength { length });
174 }
175
176 let msg_bytes = &remaining_bytes[..length];
177 let rx_packet =
178 <NetlinkMessage<SockDiagMessage>>::deserialize(msg_bytes).context(NetlinkParseSnafu)?;
179
180 match rx_packet.payload {
181 NetlinkPayload::InnerMessage(SockDiagMessage::InetResponse(response)) => {
182 headers.push(response.header);
183 }
184 NetlinkPayload::Done(_) => {
185 done = true;
186 break;
187 }
188 NetlinkPayload::Error(error) => {
189 if let Some(code) = error.code {
190 return Err(TcpError::NetlinkMsgError { code: code.get() });
191 }
192 }
193 _ => {}
194 }
195
196 offset += length;
197 }
198
199 Ok(done)
200}
201
202async fn fetch_netlink_inet_headers(addr_family: u8) -> Result<Vec<InetResponseHeader>, TcpError> {
217 let unicast_socket: SocketAddr = SocketAddr::new(0, 0);
218 let mut socket = TokioSocket::new(NETLINK_SOCK_DIAG).context(NetlinkSocketSnafu)?;
219
220 let mut inet_req = InetRequest {
221 family: addr_family,
222 protocol: IPPROTO_TCP,
223 extensions: ExtensionFlags::INFO,
224 states: StateFlags::all(),
225 socket_id: SocketId::new_v4(),
226 };
227 if addr_family == AF_INET6 {
228 inet_req.socket_id = SocketId::new_v6();
229 }
230
231 let mut hdr = NetlinkHeader::default();
232 hdr.flags = NLM_F_REQUEST | NLM_F_ACK | NLM_F_DUMP;
233 let mut msg = NetlinkMessage::new(hdr, SockDiagMessage::InetRequest(inet_req).into());
234 msg.finalize();
235
236 let mut buf = vec![0; msg.header.length as usize];
237 msg.serialize(&mut buf[..]);
238
239 socket
240 .send_to(&buf[..msg.buffer_len()], &unicast_socket)
241 .await
242 .context(NetlinkSendSnafu)?;
243
244 let mut inet_resp_hdrs = Vec::with_capacity(32); while let Ok((receive_buffer, _addr)) = socket.recv_from_full().await {
247 if receive_buffer.is_empty() {
248 break;
249 }
250 let done = parse_netlink_messages(&receive_buffer, &mut inet_resp_hdrs)?;
251 if done {
252 break;
253 }
254 }
255
256 Ok(inet_resp_hdrs)
257}
258
259fn parse_nl_inet_hdrs(
260 hdrs: Vec<InetResponseHeader>,
261 tcp_stats: &mut TcpStats,
262) -> Result<(), TcpError> {
263 for hdr in hdrs {
264 let state: TcpState = hdr.state.try_into()?;
265 let state_str: String = state.into();
266 *tcp_stats.conn_states.entry(state_str).or_insert(0.0) += 1.0;
267 tcp_stats.tx_queued_bytes += f64::from(hdr.send_queue);
268 tcp_stats.rx_queued_bytes += f64::from(hdr.recv_queue)
269 }
270
271 Ok(())
272}
273
274async fn build_tcp_stats() -> Result<TcpStats, TcpError> {
275 let mut tcp_stats = TcpStats::default();
276 let resp = fetch_netlink_inet_headers(AF_INET).await?;
277 parse_nl_inet_hdrs(resp, &mut tcp_stats)?;
278
279 if is_ipv6_enabled() {
280 let resp = fetch_netlink_inet_headers(AF_INET6).await?;
281 parse_nl_inet_hdrs(resp, &mut tcp_stats)?;
282 }
283
284 Ok(tcp_stats)
285}
286
287fn is_ipv6_enabled() -> bool {
288 Path::new(PROC_IPV6_FILE).exists()
289}
290
#[cfg(test)]
mod tests {
    use netlink_packet_sock_diag::{
        AF_INET,
        inet::{InetResponseHeader, SocketId},
    };
    use tokio::net::{TcpListener, TcpStream};

    use super::{
        STATE, TCP_CONNS_TOTAL, TCP_RX_QUEUED_BYTES_TOTAL, TCP_TX_QUEUED_BYTES_TOTAL, TcpStats,
        fetch_netlink_inet_headers, parse_nl_inet_hdrs,
    };
    use crate::{
        sources::host_metrics::{HostMetrics, HostMetricsConfig, MetricsBuffer},
        test_util::addr::next_addr,
    };

    /// Three headers in states 1, 2 and 3 must yield one connection per state
    /// and the summed queue sizes.
    #[test]
    fn parses_nl_inet_hdrs() {
        let mut hdrs: Vec<InetResponseHeader> = Vec::new();
        for i in 1..4 {
            let hdr = InetResponseHeader {
                family: 0,
                state: i,
                timer: None,
                socket_id: SocketId::new_v4(),
                recv_queue: 3,
                send_queue: 5,
                uid: 0,
                inode: 0,
            };
            hdrs.push(hdr);
        }

        let mut tcp_stats = TcpStats::default();
        parse_nl_inet_hdrs(hdrs, &mut tcp_stats).unwrap();

        assert_eq!(tcp_stats.tx_queued_bytes, 15.0);
        assert_eq!(tcp_stats.rx_queued_bytes, 9.0);
        assert_eq!(tcp_stats.conn_states.len(), 3);
        assert_eq!(*tcp_stats.conn_states.get("established").unwrap(), 1.0);
        assert_eq!(*tcp_stats.conn_states.get("syn_sent").unwrap(), 1.0);
        assert_eq!(*tcp_stats.conn_states.get("syn_recv").unwrap(), 1.0);
    }

    /// Runs the netlink-backed checks one after the other inside a single
    /// test. Ignored by default.
    // NOTE(review): presumably ignored because it needs a host that allows
    // `NETLINK_SOCK_DIAG` queries — confirm.
    #[ignore]
    #[tokio::test]
    async fn tcp_metrics_tests() {
        fetches_nl_net_hdrs().await;
        generates_tcp_metrics().await;
    }

    /// Opens a local TCP connection and checks that both of its endpoints
    /// show up in the headers returned for `AF_INET`.
    async fn fetches_nl_net_hdrs() {
        let (_guard, next_addr) = next_addr();
        let listener = TcpListener::bind(next_addr).await.unwrap();
        let addr = listener.local_addr().unwrap();
        // Accept in the background so that `connect` below can complete.
        tokio::spawn(async move {
            let (_stream, _socket) = listener.accept().await.unwrap();
        });
        let _stream = TcpStream::connect(addr).await.unwrap();

        let hdrs = fetch_netlink_inet_headers(AF_INET).await.unwrap();
        assert!(hdrs.len() >= 2);

        // The listener's port must appear once as a source port and once as
        // a destination port.
        let port = addr.port();
        assert!(hdrs.iter().any(|hdr| hdr.socket_id.source_port == port));
        assert!(
            hdrs.iter()
                .any(|hdr| hdr.socket_id.destination_port == port)
        );
    }

    /// Checks the shape of the metrics produced by `tcp_metrics`: every
    /// connection-count gauge carries a `state` tag and each queue gauge is
    /// emitted exactly once.
    async fn generates_tcp_metrics() {
        let (_guard, next_addr) = next_addr();
        // Keep a listening socket open so at least one TCP socket exists.
        let _listener = TcpListener::bind(next_addr).await.unwrap();

        let mut buffer = MetricsBuffer::new(None);
        HostMetrics::new(HostMetricsConfig::default())
            .tcp_metrics(&mut buffer)
            .await;
        let metrics = buffer.metrics;

        assert!(!metrics.is_empty());

        let mut n_tx_queued_bytes_metric = 0;
        let mut n_rx_queued_bytes_metric = 0;

        for metric in metrics {
            if metric.name() == TCP_CONNS_TOTAL {
                let tags = metric
                    .tags()
                    .expect("Metric tcp_connections_total must have a tag");
                assert!(
                    tags.contains_key(STATE),
                    "Metric tcp_connections_total must have a state tag"
                );
            } else if metric.name() == TCP_TX_QUEUED_BYTES_TOTAL {
                n_tx_queued_bytes_metric += 1;
            } else if metric.name() == TCP_RX_QUEUED_BYTES_TOTAL {
                n_rx_queued_bytes_metric += 1;
            } else {
                panic!("unrecognized metric name: {}", metric.name());
            }
        }

        assert_eq!(n_tx_queued_bytes_metric, 1);
        assert_eq!(n_rx_queued_bytes_metric, 1);
    }
}