// vector/internal_events/tcp.rs
use std::net::SocketAddr;
2
3use metrics::counter;
4use vector_lib::internal_event::{error_stage, error_type, InternalEvent};
5
6use crate::{internal_events::SocketOutgoingConnectionError, tls::TlsError};
7
/// Internal event describing a newly established TCP connection.
///
/// Emitting it logs `"Connected."` at debug level and bumps the
/// `connection_established_total` counter.
#[derive(Debug)]
pub struct TcpSocketConnectionEstablished {
    /// Address of the remote peer; `None` is logged as `"unknown"`.
    pub peer_addr: Option<SocketAddr>,
}
12
13impl InternalEvent for TcpSocketConnectionEstablished {
14 fn emit(self) {
15 if let Some(peer_addr) = self.peer_addr {
16 debug!(message = "Connected.", %peer_addr);
17 } else {
18 debug!(message = "Connected.", peer_addr = "unknown");
19 }
20 counter!("connection_established_total", "mode" => "tcp").increment(1);
21 }
22}
23
/// Internal event wrapping an error for an outgoing TCP connection.
///
/// Emitting it forwards the wrapped error to `SocketOutgoingConnectionError`.
#[derive(Debug)]
pub struct TcpSocketOutgoingConnectionError<E> {
    /// The error handed over to `SocketOutgoingConnectionError` on emit.
    pub error: E,
}
28
29impl<E: std::error::Error> InternalEvent for TcpSocketOutgoingConnectionError<E> {
30 fn emit(self) {
31 emit!(SocketOutgoingConnectionError { error: self.error });
34 }
35}
36
/// Internal event for a connection shutdown.
///
/// Emitting it logs `"Received EOF from the server, shutdown."` as a warning
/// and bumps the `connection_shutdown_total` counter.
#[derive(Debug)]
pub struct TcpSocketConnectionShutdown;
39
40impl InternalEvent for TcpSocketConnectionShutdown {
41 fn emit(self) {
42 warn!(message = "Received EOF from the server, shutdown.");
43 counter!("connection_shutdown_total", "mode" => "tcp").increment(1);
44 }
45}
46
/// Internal event for an error on a TCP socket, tied to a specific peer.
///
/// Only compiled on Unix targets with the `sources-dnstap` feature enabled.
#[cfg(all(unix, feature = "sources-dnstap"))]
#[derive(Debug)]
pub struct TcpSocketError<'a, E> {
    /// Borrowed error; only its `Display` output is used when emitting.
    pub(crate) error: &'a E,
    /// Address of the peer the error is reported for.
    pub peer_addr: SocketAddr,
}
53
#[cfg(all(unix, feature = "sources-dnstap"))]
impl<E: std::fmt::Display> InternalEvent for TcpSocketError<'_, E> {
    /// Logs the socket error at error level (rate limited) and increments
    /// `component_errors_total` with the `CONNECTION_FAILED` error type and
    /// the `PROCESSING` stage.
    fn emit(self) {
        error!(
            message = "TCP socket error.",
            error = %self.error,
            peer_addr = ?self.peer_addr,
            error_type = error_type::CONNECTION_FAILED,
            stage = error_stage::PROCESSING,
            internal_log_rate_limit = true,
        );
        counter!(
            "component_errors_total",
            "error_type" => error_type::CONNECTION_FAILED,
            "stage" => error_stage::PROCESSING,
        )
        .increment(1);
    }
}
73
74#[derive(Debug)]
75pub struct TcpSocketTlsConnectionError {
76 pub error: TlsError,
77}
78
79impl InternalEvent for TcpSocketTlsConnectionError {
80 fn emit(self) {
81 match self.error {
82 TlsError::Handshake { ref source }
86 if source.code() == openssl::ssl::ErrorCode::SYSCALL
87 && source.io_error().is_none() =>
88 {
89 debug!(
90 message = "Connection error, probably a healthcheck.",
91 error = %self.error,
92 internal_log_rate_limit = true,
93 );
94 }
95 _ => {
96 error!(
97 message = "Connection error.",
98 error = %self.error,
99 error_code = "connection_failed",
100 error_type = error_type::WRITER_FAILED,
101 stage = error_stage::SENDING,
102 internal_log_rate_limit = true,
103 );
104 counter!(
105 "component_errors_total",
106 "error_code" => "connection_failed",
107 "error_type" => error_type::WRITER_FAILED,
108 "stage" => error_stage::SENDING,
109 "mode" => "tcp",
110 )
111 .increment(1);
112 }
113 }
114 }
115}
116
/// Internal event for a failure to write an acknowledgement.
///
/// Emitting it logs `"Error writing acknowledgement, dropping connection."`
/// at error level and bumps `component_errors_total`.
#[derive(Debug)]
pub struct TcpSendAckError {
    /// The I/O error returned by the failed write.
    pub error: std::io::Error,
}
121
122impl InternalEvent for TcpSendAckError {
123 fn emit(self) {
124 error!(
125 message = "Error writing acknowledgement, dropping connection.",
126 error = %self.error,
127 error_code = "ack_failed",
128 error_type = error_type::WRITER_FAILED,
129 stage = error_stage::SENDING,
130 internal_log_rate_limit = true,
131 );
132 counter!(
133 "component_errors_total",
134 "error_code" => "ack_failed",
135 "error_type" => error_type::WRITER_FAILED,
136 "stage" => error_stage::SENDING,
137 "mode" => "tcp",
138 )
139 .increment(1);
140 }
141}
142
/// Internal event recording bytes received over TCP from a peer.
///
/// Emitting it adds `byte_size` to the `component_received_bytes_total`
/// counter.
#[derive(Debug)]
pub struct TcpBytesReceived {
    /// Number of bytes received.
    pub byte_size: usize,
    /// Address of the peer the bytes came from.
    pub peer_addr: SocketAddr,
}
148
149impl InternalEvent for TcpBytesReceived {
150 fn emit(self) {
151 trace!(
152 message = "Bytes received.",
153 protocol = "tcp",
154 byte_size = %self.byte_size,
155 peer_addr = %self.peer_addr,
156 );
157 counter!(
158 "component_received_bytes_total", "protocol" => "tcp"
159 )
160 .increment(self.byte_size as u64);
161 }
162}