vector/internal_events/
tcp.rs

1use std::net::SocketAddr;
2
3use metrics::counter;
4use vector_lib::NamedInternalEvent;
5use vector_lib::internal_event::{InternalEvent, error_stage, error_type};
6
7use crate::{internal_events::SocketOutgoingConnectionError, tls::TlsError};
8
9#[derive(Debug, NamedInternalEvent)]
10pub struct TcpSocketConnectionEstablished {
11    pub peer_addr: Option<SocketAddr>,
12}
13
14impl InternalEvent for TcpSocketConnectionEstablished {
15    fn emit(self) {
16        if let Some(peer_addr) = self.peer_addr {
17            debug!(message = "Connected.", %peer_addr);
18        } else {
19            debug!(message = "Connected.", peer_addr = "unknown");
20        }
21        counter!("connection_established_total", "mode" => "tcp").increment(1);
22    }
23}
24
25#[derive(Debug, NamedInternalEvent)]
26pub struct TcpSocketOutgoingConnectionError<E> {
27    pub error: E,
28}
29
30impl<E: std::error::Error> InternalEvent for TcpSocketOutgoingConnectionError<E> {
31    fn emit(self) {
32        // ## skip check-duplicate-events ##
33        // ## skip check-validity-events ##
34        emit!(SocketOutgoingConnectionError { error: self.error });
35    }
36}
37
38#[derive(Debug, NamedInternalEvent)]
39pub struct TcpSocketConnectionShutdown;
40
41impl InternalEvent for TcpSocketConnectionShutdown {
42    fn emit(self) {
43        warn!(message = "Received EOF from the server, shutdown.");
44        counter!("connection_shutdown_total", "mode" => "tcp").increment(1);
45    }
46}
47
/// Internal event for an error on a TCP socket.
///
/// Only compiled on Unix targets with the `sources-dnstap` feature enabled.
/// Emitting it writes an error-level log line and increments
/// `component_errors_total` with the `CONNECTION_FAILED` error type and the
/// `PROCESSING` stage.
#[cfg(all(unix, feature = "sources-dnstap"))]
#[derive(Debug, NamedInternalEvent)]
pub struct TcpSocketError<'a, E> {
    /// Borrowed error; rendered with its `Display` implementation in the log.
    pub(crate) error: &'a E,
    /// Peer address; rendered with its `Debug` implementation in the log.
    pub peer_addr: SocketAddr,
}

#[cfg(all(unix, feature = "sources-dnstap"))]
impl<E: std::fmt::Display> InternalEvent for TcpSocketError<'_, E> {
    fn emit(self) {
        let Self { error, peer_addr } = self;

        error!(
            message = "TCP socket error.",
            error = %error,
            peer_addr = ?peer_addr,
            error_type = error_type::CONNECTION_FAILED,
            stage = error_stage::PROCESSING,
        );
        counter!(
            "component_errors_total",
            "error_type" => error_type::CONNECTION_FAILED,
            "stage" => error_stage::PROCESSING,
        )
        .increment(1);
    }
}
73
74#[derive(Debug, NamedInternalEvent)]
75pub struct TcpSocketTlsConnectionError {
76    pub error: TlsError,
77}
78
79impl InternalEvent for TcpSocketTlsConnectionError {
80    fn emit(self) {
81        match self.error {
82            // Specific error that occurs when the other side is only
83            // doing SYN/SYN-ACK connections for healthcheck.
84            // https://github.com/vectordotdev/vector/issues/7318
85            TlsError::Handshake { ref source }
86                if source.code() == openssl::ssl::ErrorCode::SYSCALL
87                    && source.io_error().is_none() =>
88            {
89                debug!(
90                    message = "Connection error, probably a healthcheck.",
91                    error = %self.error,
92                );
93            }
94            _ => {
95                error!(
96                    message = "Connection error.",
97                    error = %self.error,
98                    error_code = "connection_failed",
99                    error_type = error_type::WRITER_FAILED,
100                    stage = error_stage::SENDING,
101                );
102                counter!(
103                    "component_errors_total",
104                    "error_code" => "connection_failed",
105                    "error_type" => error_type::WRITER_FAILED,
106                    "stage" => error_stage::SENDING,
107                    "mode" => "tcp",
108                )
109                .increment(1);
110            }
111        }
112    }
113}
114
115#[derive(Debug, NamedInternalEvent)]
116pub struct TcpSendAckError {
117    pub error: std::io::Error,
118}
119
120impl InternalEvent for TcpSendAckError {
121    fn emit(self) {
122        error!(
123            message = "Error writing acknowledgement, dropping connection.",
124            error = %self.error,
125            error_code = "ack_failed",
126            error_type = error_type::WRITER_FAILED,
127            stage = error_stage::SENDING,
128        );
129        counter!(
130            "component_errors_total",
131            "error_code" => "ack_failed",
132            "error_type" => error_type::WRITER_FAILED,
133            "stage" => error_stage::SENDING,
134            "mode" => "tcp",
135        )
136        .increment(1);
137    }
138}
139
140#[derive(Debug, NamedInternalEvent)]
141pub struct TcpBytesReceived {
142    pub byte_size: usize,
143    pub peer_addr: SocketAddr,
144}
145
146impl InternalEvent for TcpBytesReceived {
147    fn emit(self) {
148        trace!(
149            message = "Bytes received.",
150            protocol = "tcp",
151            byte_size = %self.byte_size,
152            peer_addr = %self.peer_addr,
153        );
154        counter!(
155            "component_received_bytes_total", "protocol" => "tcp"
156        )
157        .increment(self.byte_size as u64);
158    }
159}