1use std::net::Ipv4Addr;
2
3use metrics::{counter, histogram};
4use vector_lib::internal_event::{ComponentEventsDropped, InternalEvent, UNINTENTIONAL};
5use vector_lib::{
6 internal_event::{error_stage, error_type},
7 json_size::JsonSize,
8};
9
/// Kind of socket an internal event refers to.
///
/// The value is turned into a short lowercase name by [`SocketMode::as_str`].
// NOTE(review): `dead_code` is allowed presumably because not every build
// configuration constructs every variant — confirm and keep this note accurate.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SocketMode {
    /// Rendered as `"tcp"`.
    Tcp,
    /// Rendered as `"udp"`.
    Udp,
    /// Rendered as `"unix"`.
    Unix,
}

impl SocketMode {
    /// Returns the static lowercase name of this mode: `"tcp"`, `"udp"` or
    /// `"unix"`.
    ///
    /// The function is `const`, so it can be evaluated in constant contexts.
    pub const fn as_str(self) -> &'static str {
        // Exhaustive on purpose: adding a variant must force an update here.
        match self {
            SocketMode::Unix => "unix",
            SocketMode::Udp => "udp",
            SocketMode::Tcp => "tcp",
        }
    }
}
27#[derive(Debug)]
28pub struct SocketBytesReceived {
29 pub mode: SocketMode,
30 pub byte_size: usize,
31}
32
33impl InternalEvent for SocketBytesReceived {
34 fn emit(self) {
35 let protocol = self.mode.as_str();
36 trace!(
37 message = "Bytes received.",
38 byte_size = %self.byte_size,
39 %protocol,
40 );
41 counter!(
42 "component_received_bytes_total",
43 "protocol" => protocol,
44 )
45 .increment(self.byte_size as u64);
46 histogram!("component_received_bytes").record(self.byte_size as f64);
47 }
48}
49
50#[derive(Debug)]
51pub struct SocketEventsReceived {
52 pub mode: SocketMode,
53 pub byte_size: JsonSize,
54 pub count: usize,
55}
56
57impl InternalEvent for SocketEventsReceived {
58 fn emit(self) {
59 let mode = self.mode.as_str();
60 trace!(
61 message = "Events received.",
62 count = self.count,
63 byte_size = self.byte_size.get(),
64 %mode,
65 );
66 counter!("component_received_events_total", "mode" => mode).increment(self.count as u64);
67 counter!("component_received_event_bytes_total", "mode" => mode)
68 .increment(self.byte_size.get() as u64);
69 histogram!("component_received_bytes", "mode" => mode).record(self.byte_size.get() as f64);
70 }
71}
72
73#[derive(Debug)]
74pub struct SocketBytesSent {
75 pub mode: SocketMode,
76 pub byte_size: usize,
77}
78
79impl InternalEvent for SocketBytesSent {
80 fn emit(self) {
81 let protocol = self.mode.as_str();
82 trace!(
83 message = "Bytes sent.",
84 byte_size = %self.byte_size,
85 %protocol,
86 );
87 counter!(
88 "component_sent_bytes_total",
89 "protocol" => protocol,
90 )
91 .increment(self.byte_size as u64);
92 }
93}
94
95#[derive(Debug)]
96pub struct SocketEventsSent {
97 pub mode: SocketMode,
98 pub count: u64,
99 pub byte_size: JsonSize,
100}
101
102impl InternalEvent for SocketEventsSent {
103 fn emit(self) {
104 trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size.get());
105 counter!("component_sent_events_total", "mode" => self.mode.as_str()).increment(self.count);
106 counter!("component_sent_event_bytes_total", "mode" => self.mode.as_str())
107 .increment(self.byte_size.get() as u64);
108 }
109}
110
111#[derive(Debug)]
112pub struct SocketBindError<E> {
113 pub mode: SocketMode,
114 pub error: E,
115}
116
117impl<E: std::fmt::Display> InternalEvent for SocketBindError<E> {
118 fn emit(self) {
119 let mode = self.mode.as_str();
120 error!(
121 message = "Error binding socket.",
122 error = %self.error,
123 error_code = "socket_bind",
124 error_type = error_type::IO_FAILED,
125 stage = error_stage::INITIALIZING,
126 %mode,
127 internal_log_rate_limit = true,
128 );
129 counter!(
130 "component_errors_total",
131 "error_code" => "socket_bind",
132 "error_type" => error_type::IO_FAILED,
133 "stage" => error_stage::INITIALIZING,
134 "mode" => mode,
135 )
136 .increment(1);
137 }
138}
139
140#[derive(Debug)]
141pub struct SocketMulticastGroupJoinError<E> {
142 pub error: E,
143 pub group_addr: Ipv4Addr,
144 pub interface: Ipv4Addr,
145}
146
147impl<E: std::fmt::Display> InternalEvent for SocketMulticastGroupJoinError<E> {
148 fn emit(self) {
149 let mode = SocketMode::Udp.as_str();
151 let group_addr = self.group_addr.to_string();
152 let interface = self.interface.to_string();
153
154 error!(
155 message = "Error joining multicast group.",
156 error = %self.error,
157 error_code = "socket_multicast_group_join",
158 error_type = error_type::IO_FAILED,
159 stage = error_stage::INITIALIZING,
160 %mode,
161 %group_addr,
162 %interface,
163 internal_log_rate_limit = true,
164 );
165 counter!(
166 "component_errors_total",
167 "error_code" => "socket_multicast_group_join",
168 "error_type" => error_type::IO_FAILED,
169 "stage" => error_stage::INITIALIZING,
170 "mode" => mode,
171 "group_addr" => group_addr,
172 "interface" => interface,
173 )
174 .increment(1);
175 }
176}
177
178#[derive(Debug)]
179pub struct SocketReceiveError<E> {
180 pub mode: SocketMode,
181 pub error: E,
182}
183
184impl<E: std::fmt::Display> InternalEvent for SocketReceiveError<E> {
185 fn emit(self) {
186 let mode = self.mode.as_str();
187 error!(
188 message = "Error receiving data.",
189 error = %self.error,
190 error_code = "socket_receive",
191 error_type = error_type::READER_FAILED,
192 stage = error_stage::RECEIVING,
193 %mode,
194 internal_log_rate_limit = true,
195 );
196 counter!(
197 "component_errors_total",
198 "error_code" => "socket_receive",
199 "error_type" => error_type::READER_FAILED,
200 "stage" => error_stage::RECEIVING,
201 "mode" => mode,
202 )
203 .increment(1);
204 }
205}
206
207#[derive(Debug)]
208pub struct SocketSendError<E> {
209 pub mode: SocketMode,
210 pub error: E,
211}
212
213impl<E: std::fmt::Display> InternalEvent for SocketSendError<E> {
214 fn emit(self) {
215 let mode = self.mode.as_str();
216 let reason = "Error sending data.";
217 error!(
218 message = reason,
219 error = %self.error,
220 error_code = "socket_send",
221 error_type = error_type::WRITER_FAILED,
222 stage = error_stage::SENDING,
223 %mode,
224 internal_log_rate_limit = true,
225 );
226 counter!(
227 "component_errors_total",
228 "error_code" => "socket_send",
229 "error_type" => error_type::WRITER_FAILED,
230 "stage" => error_stage::SENDING,
231 "mode" => mode,
232 )
233 .increment(1);
234
235 emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
236 }
237}