vector/internal_events/
tcp.rs1use std::net::SocketAddr;
2
3use metrics::counter;
4use vector_lib::NamedInternalEvent;
5use vector_lib::internal_event::{InternalEvent, error_stage, error_type};
6
7use crate::{internal_events::SocketOutgoingConnectionError, tls::TlsError};
8
9#[derive(Debug, NamedInternalEvent)]
10pub struct TcpSocketConnectionEstablished {
11 pub peer_addr: Option<SocketAddr>,
12}
13
14impl InternalEvent for TcpSocketConnectionEstablished {
15 fn emit(self) {
16 if let Some(peer_addr) = self.peer_addr {
17 debug!(message = "Connected.", %peer_addr);
18 } else {
19 debug!(message = "Connected.", peer_addr = "unknown");
20 }
21 counter!("connection_established_total", "mode" => "tcp").increment(1);
22 }
23}
24
25#[derive(Debug, NamedInternalEvent)]
26pub struct TcpSocketOutgoingConnectionError<E> {
27 pub error: E,
28}
29
30impl<E: std::error::Error> InternalEvent for TcpSocketOutgoingConnectionError<E> {
31 fn emit(self) {
32 emit!(SocketOutgoingConnectionError { error: self.error });
35 }
36}
37
38#[derive(Debug, NamedInternalEvent)]
39pub struct TcpSocketConnectionShutdown;
40
41impl InternalEvent for TcpSocketConnectionShutdown {
42 fn emit(self) {
43 warn!(message = "Received EOF from the server, shutdown.");
44 counter!("connection_shutdown_total", "mode" => "tcp").increment(1);
45 }
46}
47
48#[cfg(all(unix, feature = "sources-dnstap"))]
49#[derive(Debug, NamedInternalEvent)]
50pub struct TcpSocketError<'a, E> {
51 pub(crate) error: &'a E,
52 pub peer_addr: SocketAddr,
53}
54
55#[cfg(all(unix, feature = "sources-dnstap"))]
56impl<E: std::fmt::Display> InternalEvent for TcpSocketError<'_, E> {
57 fn emit(self) {
58 error!(
59 message = "TCP socket error.",
60 error = %self.error,
61 peer_addr = ?self.peer_addr,
62 error_type = error_type::CONNECTION_FAILED,
63 stage = error_stage::PROCESSING,
64 );
65 counter!(
66 "component_errors_total",
67 "error_type" => error_type::CONNECTION_FAILED,
68 "stage" => error_stage::PROCESSING,
69 )
70 .increment(1);
71 }
72}
73
74#[derive(Debug, NamedInternalEvent)]
75pub struct TcpSocketTlsConnectionError {
76 pub error: TlsError,
77}
78
79impl InternalEvent for TcpSocketTlsConnectionError {
80 fn emit(self) {
81 match self.error {
82 TlsError::Handshake { ref source }
86 if source.code() == openssl::ssl::ErrorCode::SYSCALL
87 && source.io_error().is_none() =>
88 {
89 debug!(
90 message = "Connection error, probably a healthcheck.",
91 error = %self.error,
92 );
93 }
94 _ => {
95 error!(
96 message = "Connection error.",
97 error = %self.error,
98 error_code = "connection_failed",
99 error_type = error_type::WRITER_FAILED,
100 stage = error_stage::SENDING,
101 );
102 counter!(
103 "component_errors_total",
104 "error_code" => "connection_failed",
105 "error_type" => error_type::WRITER_FAILED,
106 "stage" => error_stage::SENDING,
107 "mode" => "tcp",
108 )
109 .increment(1);
110 }
111 }
112 }
113}
114
115#[derive(Debug, NamedInternalEvent)]
116pub struct TcpSendAckError {
117 pub error: std::io::Error,
118}
119
120impl InternalEvent for TcpSendAckError {
121 fn emit(self) {
122 error!(
123 message = "Error writing acknowledgement, dropping connection.",
124 error = %self.error,
125 error_code = "ack_failed",
126 error_type = error_type::WRITER_FAILED,
127 stage = error_stage::SENDING,
128 );
129 counter!(
130 "component_errors_total",
131 "error_code" => "ack_failed",
132 "error_type" => error_type::WRITER_FAILED,
133 "stage" => error_stage::SENDING,
134 "mode" => "tcp",
135 )
136 .increment(1);
137 }
138}
139
140#[derive(Debug, NamedInternalEvent)]
141pub struct TcpBytesReceived {
142 pub byte_size: usize,
143 pub peer_addr: SocketAddr,
144}
145
146impl InternalEvent for TcpBytesReceived {
147 fn emit(self) {
148 trace!(
149 message = "Bytes received.",
150 protocol = "tcp",
151 byte_size = %self.byte_size,
152 peer_addr = %self.peer_addr,
153 );
154 counter!(
155 "component_received_bytes_total", "protocol" => "tcp"
156 )
157 .increment(self.byte_size as u64);
158 }
159}