1use std::net::Ipv4Addr;
2
3use metrics::{counter, histogram};
4use vector_lib::{
5 NamedInternalEvent,
6 internal_event::{
7 ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
8 },
9 json_size::JsonSize,
10};
11
12#[derive(Debug, Clone, Copy, Eq, PartialEq)]
13#[allow(dead_code)] pub enum SocketMode {
15 Tcp,
16 Udp,
17 Unix,
18}
19
20impl SocketMode {
21 pub const fn as_str(self) -> &'static str {
22 match self {
23 Self::Tcp => "tcp",
24 Self::Udp => "udp",
25 Self::Unix => "unix",
26 }
27 }
28}
29
30#[derive(Debug, NamedInternalEvent)]
31pub struct SocketBytesReceived {
32 pub mode: SocketMode,
33 pub byte_size: usize,
34}
35
36impl InternalEvent for SocketBytesReceived {
37 fn emit(self) {
38 let protocol = self.mode.as_str();
39 trace!(
40 message = "Bytes received.",
41 byte_size = %self.byte_size,
42 %protocol,
43 );
44 counter!(
45 "component_received_bytes_total",
46 "protocol" => protocol,
47 )
48 .increment(self.byte_size as u64);
49 histogram!("component_received_bytes").record(self.byte_size as f64);
50 }
51}
52
53#[derive(Debug, NamedInternalEvent)]
54pub struct SocketEventsReceived {
55 pub mode: SocketMode,
56 pub byte_size: JsonSize,
57 pub count: usize,
58}
59
60impl InternalEvent for SocketEventsReceived {
61 fn emit(self) {
62 let mode = self.mode.as_str();
63 trace!(
64 message = "Events received.",
65 count = self.count,
66 byte_size = self.byte_size.get(),
67 %mode,
68 );
69 counter!("component_received_events_total", "mode" => mode).increment(self.count as u64);
70 counter!("component_received_event_bytes_total", "mode" => mode)
71 .increment(self.byte_size.get() as u64);
72 histogram!("component_received_bytes", "mode" => mode).record(self.byte_size.get() as f64);
73 }
74}
75
76#[derive(Debug, NamedInternalEvent)]
77pub struct SocketBytesSent {
78 pub mode: SocketMode,
79 pub byte_size: usize,
80}
81
82impl InternalEvent for SocketBytesSent {
83 fn emit(self) {
84 let protocol = self.mode.as_str();
85 trace!(
86 message = "Bytes sent.",
87 byte_size = %self.byte_size,
88 %protocol,
89 );
90 counter!(
91 "component_sent_bytes_total",
92 "protocol" => protocol,
93 )
94 .increment(self.byte_size as u64);
95 }
96}
97
98#[derive(Debug, NamedInternalEvent)]
99pub struct SocketEventsSent {
100 pub mode: SocketMode,
101 pub count: u64,
102 pub byte_size: JsonSize,
103}
104
105impl InternalEvent for SocketEventsSent {
106 fn emit(self) {
107 trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size.get());
108 counter!("component_sent_events_total", "mode" => self.mode.as_str()).increment(self.count);
109 counter!("component_sent_event_bytes_total", "mode" => self.mode.as_str())
110 .increment(self.byte_size.get() as u64);
111 }
112}
113
114#[derive(Debug, NamedInternalEvent)]
115pub struct SocketBindError<E> {
116 pub mode: SocketMode,
117 pub error: E,
118}
119
120impl<E: std::fmt::Display> InternalEvent for SocketBindError<E> {
121 fn emit(self) {
122 let mode = self.mode.as_str();
123 error!(
124 message = "Error binding socket.",
125 error = %self.error,
126 error_code = "socket_bind",
127 error_type = error_type::IO_FAILED,
128 stage = error_stage::INITIALIZING,
129 %mode,
130 );
131 counter!(
132 "component_errors_total",
133 "error_code" => "socket_bind",
134 "error_type" => error_type::IO_FAILED,
135 "stage" => error_stage::INITIALIZING,
136 "mode" => mode,
137 )
138 .increment(1);
139 }
140}
141
142#[derive(Debug, NamedInternalEvent)]
143pub struct SocketMulticastGroupJoinError<E> {
144 pub error: E,
145 pub group_addr: Ipv4Addr,
146 pub interface: Ipv4Addr,
147}
148
149impl<E: std::fmt::Display> InternalEvent for SocketMulticastGroupJoinError<E> {
150 fn emit(self) {
151 let mode = SocketMode::Udp.as_str();
153 let group_addr = self.group_addr.to_string();
154 let interface = self.interface.to_string();
155
156 error!(
157 message = "Error joining multicast group.",
158 error = %self.error,
159 error_code = "socket_multicast_group_join",
160 error_type = error_type::IO_FAILED,
161 stage = error_stage::INITIALIZING,
162 %mode,
163 %group_addr,
164 %interface,
165 );
166 counter!(
167 "component_errors_total",
168 "error_code" => "socket_multicast_group_join",
169 "error_type" => error_type::IO_FAILED,
170 "stage" => error_stage::INITIALIZING,
171 "mode" => mode,
172 "group_addr" => group_addr,
173 "interface" => interface,
174 )
175 .increment(1);
176 }
177}
178
179#[derive(Debug, NamedInternalEvent)]
180pub struct SocketReceiveError<E> {
181 pub mode: SocketMode,
182 pub error: E,
183}
184
185impl<E: std::fmt::Display> InternalEvent for SocketReceiveError<E> {
186 fn emit(self) {
187 let mode = self.mode.as_str();
188 error!(
189 message = "Error receiving data.",
190 error = %self.error,
191 error_code = "socket_receive",
192 error_type = error_type::READER_FAILED,
193 stage = error_stage::RECEIVING,
194 %mode,
195 );
196 counter!(
197 "component_errors_total",
198 "error_code" => "socket_receive",
199 "error_type" => error_type::READER_FAILED,
200 "stage" => error_stage::RECEIVING,
201 "mode" => mode,
202 )
203 .increment(1);
204 }
205}
206
207#[derive(Debug, NamedInternalEvent)]
208pub struct SocketSendError<E> {
209 pub mode: SocketMode,
210 pub error: E,
211}
212
213impl<E: std::fmt::Display> InternalEvent for SocketSendError<E> {
214 fn emit(self) {
215 let mode = self.mode.as_str();
216 let reason = "Error sending data.";
217 error!(
218 message = reason,
219 error = %self.error,
220 error_code = "socket_send",
221 error_type = error_type::WRITER_FAILED,
222 stage = error_stage::SENDING,
223 %mode,
224 );
225 counter!(
226 "component_errors_total",
227 "error_code" => "socket_send",
228 "error_type" => error_type::WRITER_FAILED,
229 "stage" => error_stage::SENDING,
230 "mode" => mode,
231 )
232 .increment(1);
233
234 emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
235 }
236}