1use std::net::Ipv4Addr;
2
3use metrics::{counter, histogram};
4use vector_lib::{
5 internal_event::{
6 ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
7 },
8 json_size::JsonSize,
9};
10
/// Kind of socket a socket-related internal event refers to.
///
/// The value is turned into a lowercase string by [`SocketMode::as_str`] and
/// attached to log events and metrics by the events in this file.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
// NOTE(review): presumably not every variant is constructed in every build,
// hence the lint suppression — confirm and record the exact reason here.
#[allow(dead_code)]
pub enum SocketMode {
    /// Rendered as `"tcp"`.
    Tcp,
    /// Rendered as `"udp"`.
    Udp,
    /// Rendered as `"unix"`.
    Unix,
}

impl SocketMode {
    /// Returns the static, lowercase name of this mode.
    ///
    /// Being a `const fn`, it can also be evaluated in constant contexts.
    pub const fn as_str(self) -> &'static str {
        match self {
            SocketMode::Unix => "unix",
            SocketMode::Udp => "udp",
            SocketMode::Tcp => "tcp",
        }
    }
}
28#[derive(Debug)]
29pub struct SocketBytesReceived {
30 pub mode: SocketMode,
31 pub byte_size: usize,
32}
33
34impl InternalEvent for SocketBytesReceived {
35 fn emit(self) {
36 let protocol = self.mode.as_str();
37 trace!(
38 message = "Bytes received.",
39 byte_size = %self.byte_size,
40 %protocol,
41 );
42 counter!(
43 "component_received_bytes_total",
44 "protocol" => protocol,
45 )
46 .increment(self.byte_size as u64);
47 histogram!("component_received_bytes").record(self.byte_size as f64);
48 }
49}
50
51#[derive(Debug)]
52pub struct SocketEventsReceived {
53 pub mode: SocketMode,
54 pub byte_size: JsonSize,
55 pub count: usize,
56}
57
58impl InternalEvent for SocketEventsReceived {
59 fn emit(self) {
60 let mode = self.mode.as_str();
61 trace!(
62 message = "Events received.",
63 count = self.count,
64 byte_size = self.byte_size.get(),
65 %mode,
66 );
67 counter!("component_received_events_total", "mode" => mode).increment(self.count as u64);
68 counter!("component_received_event_bytes_total", "mode" => mode)
69 .increment(self.byte_size.get() as u64);
70 histogram!("component_received_bytes", "mode" => mode).record(self.byte_size.get() as f64);
71 }
72}
73
74#[derive(Debug)]
75pub struct SocketBytesSent {
76 pub mode: SocketMode,
77 pub byte_size: usize,
78}
79
80impl InternalEvent for SocketBytesSent {
81 fn emit(self) {
82 let protocol = self.mode.as_str();
83 trace!(
84 message = "Bytes sent.",
85 byte_size = %self.byte_size,
86 %protocol,
87 );
88 counter!(
89 "component_sent_bytes_total",
90 "protocol" => protocol,
91 )
92 .increment(self.byte_size as u64);
93 }
94}
95
96#[derive(Debug)]
97pub struct SocketEventsSent {
98 pub mode: SocketMode,
99 pub count: u64,
100 pub byte_size: JsonSize,
101}
102
103impl InternalEvent for SocketEventsSent {
104 fn emit(self) {
105 trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size.get());
106 counter!("component_sent_events_total", "mode" => self.mode.as_str()).increment(self.count);
107 counter!("component_sent_event_bytes_total", "mode" => self.mode.as_str())
108 .increment(self.byte_size.get() as u64);
109 }
110}
111
112#[derive(Debug)]
113pub struct SocketBindError<E> {
114 pub mode: SocketMode,
115 pub error: E,
116}
117
118impl<E: std::fmt::Display> InternalEvent for SocketBindError<E> {
119 fn emit(self) {
120 let mode = self.mode.as_str();
121 error!(
122 message = "Error binding socket.",
123 error = %self.error,
124 error_code = "socket_bind",
125 error_type = error_type::IO_FAILED,
126 stage = error_stage::INITIALIZING,
127 %mode,
128 );
129 counter!(
130 "component_errors_total",
131 "error_code" => "socket_bind",
132 "error_type" => error_type::IO_FAILED,
133 "stage" => error_stage::INITIALIZING,
134 "mode" => mode,
135 )
136 .increment(1);
137 }
138}
139
140#[derive(Debug)]
141pub struct SocketMulticastGroupJoinError<E> {
142 pub error: E,
143 pub group_addr: Ipv4Addr,
144 pub interface: Ipv4Addr,
145}
146
147impl<E: std::fmt::Display> InternalEvent for SocketMulticastGroupJoinError<E> {
148 fn emit(self) {
149 let mode = SocketMode::Udp.as_str();
151 let group_addr = self.group_addr.to_string();
152 let interface = self.interface.to_string();
153
154 error!(
155 message = "Error joining multicast group.",
156 error = %self.error,
157 error_code = "socket_multicast_group_join",
158 error_type = error_type::IO_FAILED,
159 stage = error_stage::INITIALIZING,
160 %mode,
161 %group_addr,
162 %interface,
163 );
164 counter!(
165 "component_errors_total",
166 "error_code" => "socket_multicast_group_join",
167 "error_type" => error_type::IO_FAILED,
168 "stage" => error_stage::INITIALIZING,
169 "mode" => mode,
170 "group_addr" => group_addr,
171 "interface" => interface,
172 )
173 .increment(1);
174 }
175}
176
177#[derive(Debug)]
178pub struct SocketReceiveError<E> {
179 pub mode: SocketMode,
180 pub error: E,
181}
182
183impl<E: std::fmt::Display> InternalEvent for SocketReceiveError<E> {
184 fn emit(self) {
185 let mode = self.mode.as_str();
186 error!(
187 message = "Error receiving data.",
188 error = %self.error,
189 error_code = "socket_receive",
190 error_type = error_type::READER_FAILED,
191 stage = error_stage::RECEIVING,
192 %mode,
193 );
194 counter!(
195 "component_errors_total",
196 "error_code" => "socket_receive",
197 "error_type" => error_type::READER_FAILED,
198 "stage" => error_stage::RECEIVING,
199 "mode" => mode,
200 )
201 .increment(1);
202 }
203}
204
205#[derive(Debug)]
206pub struct SocketSendError<E> {
207 pub mode: SocketMode,
208 pub error: E,
209}
210
211impl<E: std::fmt::Display> InternalEvent for SocketSendError<E> {
212 fn emit(self) {
213 let mode = self.mode.as_str();
214 let reason = "Error sending data.";
215 error!(
216 message = reason,
217 error = %self.error,
218 error_code = "socket_send",
219 error_type = error_type::WRITER_FAILED,
220 stage = error_stage::SENDING,
221 %mode,
222 );
223 counter!(
224 "component_errors_total",
225 "error_code" => "socket_send",
226 "error_type" => error_type::WRITER_FAILED,
227 "stage" => error_stage::SENDING,
228 "mode" => mode,
229 )
230 .increment(1);
231
232 emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
233 }
234}