vector/internal_events/
tcp.rs1use std::net::SocketAddr;
2
3use metrics::counter;
4use vector_lib::internal_event::{InternalEvent, error_stage, error_type};
5
6use crate::{internal_events::SocketOutgoingConnectionError, tls::TlsError};
7
8#[derive(Debug)]
9pub struct TcpSocketConnectionEstablished {
10 pub peer_addr: Option<SocketAddr>,
11}
12
13impl InternalEvent for TcpSocketConnectionEstablished {
14 fn emit(self) {
15 if let Some(peer_addr) = self.peer_addr {
16 debug!(message = "Connected.", %peer_addr);
17 } else {
18 debug!(message = "Connected.", peer_addr = "unknown");
19 }
20 counter!("connection_established_total", "mode" => "tcp").increment(1);
21 }
22}
23
24#[derive(Debug)]
25pub struct TcpSocketOutgoingConnectionError<E> {
26 pub error: E,
27}
28
29impl<E: std::error::Error> InternalEvent for TcpSocketOutgoingConnectionError<E> {
30 fn emit(self) {
31 emit!(SocketOutgoingConnectionError { error: self.error });
34 }
35}
36
37#[derive(Debug)]
38pub struct TcpSocketConnectionShutdown;
39
40impl InternalEvent for TcpSocketConnectionShutdown {
41 fn emit(self) {
42 warn!(message = "Received EOF from the server, shutdown.");
43 counter!("connection_shutdown_total", "mode" => "tcp").increment(1);
44 }
45}
46
/// Internal event reporting an error on a TCP socket.
///
/// Only compiled on Unix targets with the `sources-dnstap` feature enabled.
#[cfg(all(unix, feature = "sources-dnstap"))]
#[derive(Debug)]
pub struct TcpSocketError<'a, E> {
    /// Borrowed error to report; rendered with its `Display` implementation.
    pub(crate) error: &'a E,
    /// Address of the remote peer.
    pub peer_addr: SocketAddr,
}

#[cfg(all(unix, feature = "sources-dnstap"))]
impl<E> InternalEvent for TcpSocketError<'_, E>
where
    E: std::fmt::Display,
{
    /// Logs the error and increments `component_errors_total`, both tagged
    /// with the `CONNECTION_FAILED` error type and the `PROCESSING` stage.
    fn emit(self) {
        let Self { error, peer_addr } = self;

        error!(
            message = "TCP socket error.",
            error = %error,
            peer_addr = ?peer_addr,
            error_type = error_type::CONNECTION_FAILED,
            stage = error_stage::PROCESSING,
        );

        counter!(
            "component_errors_total",
            "error_type" => error_type::CONNECTION_FAILED,
            "stage" => error_stage::PROCESSING,
        )
        .increment(1);
    }
}
72
73#[derive(Debug)]
74pub struct TcpSocketTlsConnectionError {
75 pub error: TlsError,
76}
77
78impl InternalEvent for TcpSocketTlsConnectionError {
79 fn emit(self) {
80 match self.error {
81 TlsError::Handshake { ref source }
85 if source.code() == openssl::ssl::ErrorCode::SYSCALL
86 && source.io_error().is_none() =>
87 {
88 debug!(
89 message = "Connection error, probably a healthcheck.",
90 error = %self.error,
91 );
92 }
93 _ => {
94 error!(
95 message = "Connection error.",
96 error = %self.error,
97 error_code = "connection_failed",
98 error_type = error_type::WRITER_FAILED,
99 stage = error_stage::SENDING,
100 );
101 counter!(
102 "component_errors_total",
103 "error_code" => "connection_failed",
104 "error_type" => error_type::WRITER_FAILED,
105 "stage" => error_stage::SENDING,
106 "mode" => "tcp",
107 )
108 .increment(1);
109 }
110 }
111 }
112}
113
114#[derive(Debug)]
115pub struct TcpSendAckError {
116 pub error: std::io::Error,
117}
118
119impl InternalEvent for TcpSendAckError {
120 fn emit(self) {
121 error!(
122 message = "Error writing acknowledgement, dropping connection.",
123 error = %self.error,
124 error_code = "ack_failed",
125 error_type = error_type::WRITER_FAILED,
126 stage = error_stage::SENDING,
127 );
128 counter!(
129 "component_errors_total",
130 "error_code" => "ack_failed",
131 "error_type" => error_type::WRITER_FAILED,
132 "stage" => error_stage::SENDING,
133 "mode" => "tcp",
134 )
135 .increment(1);
136 }
137}
138
139#[derive(Debug)]
140pub struct TcpBytesReceived {
141 pub byte_size: usize,
142 pub peer_addr: SocketAddr,
143}
144
145impl InternalEvent for TcpBytesReceived {
146 fn emit(self) {
147 trace!(
148 message = "Bytes received.",
149 protocol = "tcp",
150 byte_size = %self.byte_size,
151 peer_addr = %self.peer_addr,
152 );
153 counter!(
154 "component_received_bytes_total", "protocol" => "tcp"
155 )
156 .increment(self.byte_size as u64);
157 }
158}