vector/internal_events/
tcp.rs

1use std::net::SocketAddr;
2
3use metrics::counter;
4use vector_lib::internal_event::{error_stage, error_type, InternalEvent};
5
6use crate::{internal_events::SocketOutgoingConnectionError, tls::TlsError};
7
8#[derive(Debug)]
9pub struct TcpSocketConnectionEstablished {
10    pub peer_addr: Option<SocketAddr>,
11}
12
13impl InternalEvent for TcpSocketConnectionEstablished {
14    fn emit(self) {
15        if let Some(peer_addr) = self.peer_addr {
16            debug!(message = "Connected.", %peer_addr);
17        } else {
18            debug!(message = "Connected.", peer_addr = "unknown");
19        }
20        counter!("connection_established_total", "mode" => "tcp").increment(1);
21    }
22}
23
24#[derive(Debug)]
25pub struct TcpSocketOutgoingConnectionError<E> {
26    pub error: E,
27}
28
29impl<E: std::error::Error> InternalEvent for TcpSocketOutgoingConnectionError<E> {
30    fn emit(self) {
31        // ## skip check-duplicate-events ##
32        // ## skip check-validity-events ##
33        emit!(SocketOutgoingConnectionError { error: self.error });
34    }
35}
36
37#[derive(Debug)]
38pub struct TcpSocketConnectionShutdown;
39
40impl InternalEvent for TcpSocketConnectionShutdown {
41    fn emit(self) {
42        warn!(message = "Received EOF from the server, shutdown.");
43        counter!("connection_shutdown_total", "mode" => "tcp").increment(1);
44    }
45}
46
/// Internal event for an error observed on a TCP socket.
///
/// Only compiled on Unix targets when the `sources-dnstap` feature is enabled.
#[cfg(all(unix, feature = "sources-dnstap"))]
#[derive(Debug)]
pub struct TcpSocketError<'a, E> {
    /// Borrowed error; rendered through its `Display` implementation.
    pub(crate) error: &'a E,
    /// Peer address attached to the log line (rendered with `Debug`).
    pub peer_addr: SocketAddr,
}

#[cfg(all(unix, feature = "sources-dnstap"))]
impl<E> InternalEvent for TcpSocketError<'_, E>
where
    E: std::fmt::Display,
{
    /// Logs the error at error level (rate limited) and increments
    /// `component_errors_total` with the `CONNECTION_FAILED` error type and
    /// the `PROCESSING` stage.
    fn emit(self) {
        let Self { error, peer_addr } = self;

        error!(
            message = "TCP socket error.",
            error = %error,
            peer_addr = ?peer_addr,
            error_type = error_type::CONNECTION_FAILED,
            stage = error_stage::PROCESSING,
            internal_log_rate_limit = true,
        );

        counter!(
            "component_errors_total",
            "error_type" => error_type::CONNECTION_FAILED,
            "stage" => error_stage::PROCESSING,
        )
        .increment(1);
    }
}
73
74#[derive(Debug)]
75pub struct TcpSocketTlsConnectionError {
76    pub error: TlsError,
77}
78
79impl InternalEvent for TcpSocketTlsConnectionError {
80    fn emit(self) {
81        match self.error {
82            // Specific error that occurs when the other side is only
83            // doing SYN/SYN-ACK connections for healthcheck.
84            // https://github.com/vectordotdev/vector/issues/7318
85            TlsError::Handshake { ref source }
86                if source.code() == openssl::ssl::ErrorCode::SYSCALL
87                    && source.io_error().is_none() =>
88            {
89                debug!(
90                    message = "Connection error, probably a healthcheck.",
91                    error = %self.error,
92                    internal_log_rate_limit = true,
93                );
94            }
95            _ => {
96                error!(
97                    message = "Connection error.",
98                    error = %self.error,
99                    error_code = "connection_failed",
100                    error_type = error_type::WRITER_FAILED,
101                    stage = error_stage::SENDING,
102                    internal_log_rate_limit = true,
103                );
104                counter!(
105                    "component_errors_total",
106                    "error_code" => "connection_failed",
107                    "error_type" => error_type::WRITER_FAILED,
108                    "stage" => error_stage::SENDING,
109                    "mode" => "tcp",
110                )
111                .increment(1);
112            }
113        }
114    }
115}
116
117#[derive(Debug)]
118pub struct TcpSendAckError {
119    pub error: std::io::Error,
120}
121
122impl InternalEvent for TcpSendAckError {
123    fn emit(self) {
124        error!(
125            message = "Error writing acknowledgement, dropping connection.",
126            error = %self.error,
127            error_code = "ack_failed",
128            error_type = error_type::WRITER_FAILED,
129            stage = error_stage::SENDING,
130            internal_log_rate_limit = true,
131        );
132        counter!(
133            "component_errors_total",
134            "error_code" => "ack_failed",
135            "error_type" => error_type::WRITER_FAILED,
136            "stage" => error_stage::SENDING,
137            "mode" => "tcp",
138        )
139        .increment(1);
140    }
141}
142
143#[derive(Debug)]
144pub struct TcpBytesReceived {
145    pub byte_size: usize,
146    pub peer_addr: SocketAddr,
147}
148
149impl InternalEvent for TcpBytesReceived {
150    fn emit(self) {
151        trace!(
152            message = "Bytes received.",
153            protocol = "tcp",
154            byte_size = %self.byte_size,
155            peer_addr = %self.peer_addr,
156        );
157        counter!(
158            "component_received_bytes_total", "protocol" => "tcp"
159        )
160        .increment(self.byte_size as u64);
161    }
162}