vector/internal_events/
socket.rs

1use std::net::Ipv4Addr;
2
3use metrics::{counter, histogram};
4use vector_lib::{
5    NamedInternalEvent,
6    internal_event::{
7        ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
8    },
9    json_size::JsonSize,
10};
11
/// The kind of socket an internal event refers to.
///
/// Events in this module render the mode through [`SocketMode::as_str`] and
/// attach the result to their log output and metric tags.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
#[allow(dead_code)] // some features only use some variants
pub enum SocketMode {
    /// Rendered as `"tcp"`.
    Tcp,
    /// Rendered as `"udp"`.
    Udp,
    /// Rendered as `"unix"`.
    Unix,
}

impl SocketMode {
    /// Returns the static, lowercase name of this mode.
    ///
    /// Being a `const fn`, the name can also be computed in constant contexts.
    pub const fn as_str(self) -> &'static str {
        match self {
            SocketMode::Unix => "unix",
            SocketMode::Udp => "udp",
            SocketMode::Tcp => "tcp",
        }
    }
}
29
30#[derive(Debug, NamedInternalEvent)]
31pub struct SocketBytesReceived {
32    pub mode: SocketMode,
33    pub byte_size: usize,
34}
35
36impl InternalEvent for SocketBytesReceived {
37    fn emit(self) {
38        let protocol = self.mode.as_str();
39        trace!(
40            message = "Bytes received.",
41            byte_size = %self.byte_size,
42            %protocol,
43        );
44        counter!(
45            "component_received_bytes_total",
46            "protocol" => protocol,
47        )
48        .increment(self.byte_size as u64);
49        histogram!("component_received_bytes").record(self.byte_size as f64);
50    }
51}
52
53#[derive(Debug, NamedInternalEvent)]
54pub struct SocketEventsReceived {
55    pub mode: SocketMode,
56    pub byte_size: JsonSize,
57    pub count: usize,
58}
59
60impl InternalEvent for SocketEventsReceived {
61    fn emit(self) {
62        let mode = self.mode.as_str();
63        trace!(
64            message = "Events received.",
65            count = self.count,
66            byte_size = self.byte_size.get(),
67            %mode,
68        );
69        counter!("component_received_events_total", "mode" => mode).increment(self.count as u64);
70        counter!("component_received_event_bytes_total", "mode" => mode)
71            .increment(self.byte_size.get() as u64);
72        histogram!("component_received_bytes", "mode" => mode).record(self.byte_size.get() as f64);
73    }
74}
75
76#[derive(Debug, NamedInternalEvent)]
77pub struct SocketBytesSent {
78    pub mode: SocketMode,
79    pub byte_size: usize,
80}
81
82impl InternalEvent for SocketBytesSent {
83    fn emit(self) {
84        let protocol = self.mode.as_str();
85        trace!(
86            message = "Bytes sent.",
87            byte_size = %self.byte_size,
88            %protocol,
89        );
90        counter!(
91            "component_sent_bytes_total",
92            "protocol" => protocol,
93        )
94        .increment(self.byte_size as u64);
95    }
96}
97
98#[derive(Debug, NamedInternalEvent)]
99pub struct SocketEventsSent {
100    pub mode: SocketMode,
101    pub count: u64,
102    pub byte_size: JsonSize,
103}
104
105impl InternalEvent for SocketEventsSent {
106    fn emit(self) {
107        trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size.get());
108        counter!("component_sent_events_total", "mode" => self.mode.as_str()).increment(self.count);
109        counter!("component_sent_event_bytes_total", "mode" => self.mode.as_str())
110            .increment(self.byte_size.get() as u64);
111    }
112}
113
114#[derive(Debug, NamedInternalEvent)]
115pub struct SocketBindError<E> {
116    pub mode: SocketMode,
117    pub error: E,
118}
119
120impl<E: std::fmt::Display> InternalEvent for SocketBindError<E> {
121    fn emit(self) {
122        let mode = self.mode.as_str();
123        error!(
124            message = "Error binding socket.",
125            error = %self.error,
126            error_code = "socket_bind",
127            error_type = error_type::IO_FAILED,
128            stage = error_stage::INITIALIZING,
129            %mode,
130        );
131        counter!(
132            "component_errors_total",
133            "error_code" => "socket_bind",
134            "error_type" => error_type::IO_FAILED,
135            "stage" => error_stage::INITIALIZING,
136            "mode" => mode,
137        )
138        .increment(1);
139    }
140}
141
142#[derive(Debug, NamedInternalEvent)]
143pub struct SocketMulticastGroupJoinError<E> {
144    pub error: E,
145    pub group_addr: Ipv4Addr,
146    pub interface: Ipv4Addr,
147}
148
149impl<E: std::fmt::Display> InternalEvent for SocketMulticastGroupJoinError<E> {
150    fn emit(self) {
151        // Multicast groups are only used in UDP mode
152        let mode = SocketMode::Udp.as_str();
153        let group_addr = self.group_addr.to_string();
154        let interface = self.interface.to_string();
155
156        error!(
157            message = "Error joining multicast group.",
158            error = %self.error,
159            error_code = "socket_multicast_group_join",
160            error_type = error_type::IO_FAILED,
161            stage = error_stage::INITIALIZING,
162            %mode,
163            %group_addr,
164            %interface,
165        );
166        counter!(
167            "component_errors_total",
168            "error_code" => "socket_multicast_group_join",
169            "error_type" => error_type::IO_FAILED,
170            "stage" => error_stage::INITIALIZING,
171            "mode" => mode,
172            "group_addr" => group_addr,
173            "interface" => interface,
174        )
175        .increment(1);
176    }
177}
178
179#[derive(Debug, NamedInternalEvent)]
180pub struct SocketReceiveError<E> {
181    pub mode: SocketMode,
182    pub error: E,
183}
184
185impl<E: std::fmt::Display> InternalEvent for SocketReceiveError<E> {
186    fn emit(self) {
187        let mode = self.mode.as_str();
188        error!(
189            message = "Error receiving data.",
190            error = %self.error,
191            error_code = "socket_receive",
192            error_type = error_type::READER_FAILED,
193            stage = error_stage::RECEIVING,
194            %mode,
195        );
196        counter!(
197            "component_errors_total",
198            "error_code" => "socket_receive",
199            "error_type" => error_type::READER_FAILED,
200            "stage" => error_stage::RECEIVING,
201            "mode" => mode,
202        )
203        .increment(1);
204    }
205}
206
207#[derive(Debug, NamedInternalEvent)]
208pub struct SocketSendError<E> {
209    pub mode: SocketMode,
210    pub error: E,
211}
212
213impl<E: std::fmt::Display> InternalEvent for SocketSendError<E> {
214    fn emit(self) {
215        let mode = self.mode.as_str();
216        let reason = "Error sending data.";
217        error!(
218            message = reason,
219            error = %self.error,
220            error_code = "socket_send",
221            error_type = error_type::WRITER_FAILED,
222            stage = error_stage::SENDING,
223            %mode,
224        );
225        counter!(
226            "component_errors_total",
227            "error_code" => "socket_send",
228            "error_type" => error_type::WRITER_FAILED,
229            "stage" => error_stage::SENDING,
230            "mode" => mode,
231        )
232        .increment(1);
233
234        emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
235    }
236}