1use std::net::Ipv4Addr;
2
3use metrics::{counter, histogram};
4use vector_lib::{
5 internal_event::{
6 ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
7 },
8 json_size::JsonSize,
9};
10
11#[derive(Debug, Clone, Copy, Eq, PartialEq)]
12#[allow(dead_code)] pub enum SocketMode {
14 Tcp,
15 Udp,
16 Unix,
17}
18
19impl SocketMode {
20 pub const fn as_str(self) -> &'static str {
21 match self {
22 Self::Tcp => "tcp",
23 Self::Udp => "udp",
24 Self::Unix => "unix",
25 }
26 }
27}
28#[derive(Debug)]
29pub struct SocketBytesReceived {
30 pub mode: SocketMode,
31 pub byte_size: usize,
32}
33
34impl InternalEvent for SocketBytesReceived {
35 fn emit(self) {
36 let protocol = self.mode.as_str();
37 trace!(
38 message = "Bytes received.",
39 byte_size = %self.byte_size,
40 %protocol,
41 );
42 counter!(
43 "component_received_bytes_total",
44 "protocol" => protocol,
45 )
46 .increment(self.byte_size as u64);
47 histogram!("component_received_bytes").record(self.byte_size as f64);
48 }
49}
50
51#[derive(Debug)]
52pub struct SocketEventsReceived {
53 pub mode: SocketMode,
54 pub byte_size: JsonSize,
55 pub count: usize,
56}
57
58impl InternalEvent for SocketEventsReceived {
59 fn emit(self) {
60 let mode = self.mode.as_str();
61 trace!(
62 message = "Events received.",
63 count = self.count,
64 byte_size = self.byte_size.get(),
65 %mode,
66 );
67 counter!("component_received_events_total", "mode" => mode).increment(self.count as u64);
68 counter!("component_received_event_bytes_total", "mode" => mode)
69 .increment(self.byte_size.get() as u64);
70 histogram!("component_received_bytes", "mode" => mode).record(self.byte_size.get() as f64);
71 }
72}
73
74#[derive(Debug)]
75pub struct SocketBytesSent {
76 pub mode: SocketMode,
77 pub byte_size: usize,
78}
79
80impl InternalEvent for SocketBytesSent {
81 fn emit(self) {
82 let protocol = self.mode.as_str();
83 trace!(
84 message = "Bytes sent.",
85 byte_size = %self.byte_size,
86 %protocol,
87 );
88 counter!(
89 "component_sent_bytes_total",
90 "protocol" => protocol,
91 )
92 .increment(self.byte_size as u64);
93 }
94}
95
96#[derive(Debug)]
97pub struct SocketEventsSent {
98 pub mode: SocketMode,
99 pub count: u64,
100 pub byte_size: JsonSize,
101}
102
103impl InternalEvent for SocketEventsSent {
104 fn emit(self) {
105 trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size.get());
106 counter!("component_sent_events_total", "mode" => self.mode.as_str()).increment(self.count);
107 counter!("component_sent_event_bytes_total", "mode" => self.mode.as_str())
108 .increment(self.byte_size.get() as u64);
109 }
110}
111
112#[derive(Debug)]
113pub struct SocketBindError<E> {
114 pub mode: SocketMode,
115 pub error: E,
116}
117
118impl<E: std::fmt::Display> InternalEvent for SocketBindError<E> {
119 fn emit(self) {
120 let mode = self.mode.as_str();
121 error!(
122 message = "Error binding socket.",
123 error = %self.error,
124 error_code = "socket_bind",
125 error_type = error_type::IO_FAILED,
126 stage = error_stage::INITIALIZING,
127 %mode,
128 internal_log_rate_limit = true,
129 );
130 counter!(
131 "component_errors_total",
132 "error_code" => "socket_bind",
133 "error_type" => error_type::IO_FAILED,
134 "stage" => error_stage::INITIALIZING,
135 "mode" => mode,
136 )
137 .increment(1);
138 }
139}
140
141#[derive(Debug)]
142pub struct SocketMulticastGroupJoinError<E> {
143 pub error: E,
144 pub group_addr: Ipv4Addr,
145 pub interface: Ipv4Addr,
146}
147
148impl<E: std::fmt::Display> InternalEvent for SocketMulticastGroupJoinError<E> {
149 fn emit(self) {
150 let mode = SocketMode::Udp.as_str();
152 let group_addr = self.group_addr.to_string();
153 let interface = self.interface.to_string();
154
155 error!(
156 message = "Error joining multicast group.",
157 error = %self.error,
158 error_code = "socket_multicast_group_join",
159 error_type = error_type::IO_FAILED,
160 stage = error_stage::INITIALIZING,
161 %mode,
162 %group_addr,
163 %interface,
164 internal_log_rate_limit = true,
165 );
166 counter!(
167 "component_errors_total",
168 "error_code" => "socket_multicast_group_join",
169 "error_type" => error_type::IO_FAILED,
170 "stage" => error_stage::INITIALIZING,
171 "mode" => mode,
172 "group_addr" => group_addr,
173 "interface" => interface,
174 )
175 .increment(1);
176 }
177}
178
179#[derive(Debug)]
180pub struct SocketReceiveError<E> {
181 pub mode: SocketMode,
182 pub error: E,
183}
184
185impl<E: std::fmt::Display> InternalEvent for SocketReceiveError<E> {
186 fn emit(self) {
187 let mode = self.mode.as_str();
188 error!(
189 message = "Error receiving data.",
190 error = %self.error,
191 error_code = "socket_receive",
192 error_type = error_type::READER_FAILED,
193 stage = error_stage::RECEIVING,
194 %mode,
195 internal_log_rate_limit = true,
196 );
197 counter!(
198 "component_errors_total",
199 "error_code" => "socket_receive",
200 "error_type" => error_type::READER_FAILED,
201 "stage" => error_stage::RECEIVING,
202 "mode" => mode,
203 )
204 .increment(1);
205 }
206}
207
208#[derive(Debug)]
209pub struct SocketSendError<E> {
210 pub mode: SocketMode,
211 pub error: E,
212}
213
214impl<E: std::fmt::Display> InternalEvent for SocketSendError<E> {
215 fn emit(self) {
216 let mode = self.mode.as_str();
217 let reason = "Error sending data.";
218 error!(
219 message = reason,
220 error = %self.error,
221 error_code = "socket_send",
222 error_type = error_type::WRITER_FAILED,
223 stage = error_stage::SENDING,
224 %mode,
225 internal_log_rate_limit = true,
226 );
227 counter!(
228 "component_errors_total",
229 "error_code" => "socket_send",
230 "error_type" => error_type::WRITER_FAILED,
231 "stage" => error_stage::SENDING,
232 "mode" => mode,
233 )
234 .increment(1);
235
236 emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
237 }
238}