vector/internal_events/
socket.rs

1use std::net::Ipv4Addr;
2
3use metrics::{counter, histogram};
4use vector_lib::{
5    internal_event::{
6        ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
7    },
8    json_size::JsonSize,
9};
10
/// Socket mode attached to the socket internal events in this module.
///
/// The textual form returned by [`SocketMode::as_str`] is what the events use
/// for their `mode` / `protocol` log fields and metric labels.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
#[allow(dead_code)] // some features only use some variants
pub enum SocketMode {
    /// Rendered as `"tcp"`.
    Tcp,
    /// Rendered as `"udp"`.
    Udp,
    /// Rendered as `"unix"`.
    Unix,
}

impl SocketMode {
    /// Returns the lowercase name of this mode.
    ///
    /// The match is exhaustive on purpose: adding a variant without a name
    /// here is a compile error.
    pub const fn as_str(self) -> &'static str {
        match self {
            SocketMode::Unix => "unix",
            SocketMode::Udp => "udp",
            SocketMode::Tcp => "tcp",
        }
    }
}
28#[derive(Debug)]
29pub struct SocketBytesReceived {
30    pub mode: SocketMode,
31    pub byte_size: usize,
32}
33
34impl InternalEvent for SocketBytesReceived {
35    fn emit(self) {
36        let protocol = self.mode.as_str();
37        trace!(
38            message = "Bytes received.",
39            byte_size = %self.byte_size,
40            %protocol,
41        );
42        counter!(
43            "component_received_bytes_total",
44            "protocol" => protocol,
45        )
46        .increment(self.byte_size as u64);
47        histogram!("component_received_bytes").record(self.byte_size as f64);
48    }
49}
50
51#[derive(Debug)]
52pub struct SocketEventsReceived {
53    pub mode: SocketMode,
54    pub byte_size: JsonSize,
55    pub count: usize,
56}
57
58impl InternalEvent for SocketEventsReceived {
59    fn emit(self) {
60        let mode = self.mode.as_str();
61        trace!(
62            message = "Events received.",
63            count = self.count,
64            byte_size = self.byte_size.get(),
65            %mode,
66        );
67        counter!("component_received_events_total", "mode" => mode).increment(self.count as u64);
68        counter!("component_received_event_bytes_total", "mode" => mode)
69            .increment(self.byte_size.get() as u64);
70        histogram!("component_received_bytes", "mode" => mode).record(self.byte_size.get() as f64);
71    }
72}
73
74#[derive(Debug)]
75pub struct SocketBytesSent {
76    pub mode: SocketMode,
77    pub byte_size: usize,
78}
79
80impl InternalEvent for SocketBytesSent {
81    fn emit(self) {
82        let protocol = self.mode.as_str();
83        trace!(
84            message = "Bytes sent.",
85            byte_size = %self.byte_size,
86            %protocol,
87        );
88        counter!(
89            "component_sent_bytes_total",
90            "protocol" => protocol,
91        )
92        .increment(self.byte_size as u64);
93    }
94}
95
96#[derive(Debug)]
97pub struct SocketEventsSent {
98    pub mode: SocketMode,
99    pub count: u64,
100    pub byte_size: JsonSize,
101}
102
103impl InternalEvent for SocketEventsSent {
104    fn emit(self) {
105        trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size.get());
106        counter!("component_sent_events_total", "mode" => self.mode.as_str()).increment(self.count);
107        counter!("component_sent_event_bytes_total", "mode" => self.mode.as_str())
108            .increment(self.byte_size.get() as u64);
109    }
110}
111
112#[derive(Debug)]
113pub struct SocketBindError<E> {
114    pub mode: SocketMode,
115    pub error: E,
116}
117
118impl<E: std::fmt::Display> InternalEvent for SocketBindError<E> {
119    fn emit(self) {
120        let mode = self.mode.as_str();
121        error!(
122            message = "Error binding socket.",
123            error = %self.error,
124            error_code = "socket_bind",
125            error_type = error_type::IO_FAILED,
126            stage = error_stage::INITIALIZING,
127            %mode,
128            internal_log_rate_limit = true,
129        );
130        counter!(
131            "component_errors_total",
132            "error_code" => "socket_bind",
133            "error_type" => error_type::IO_FAILED,
134            "stage" => error_stage::INITIALIZING,
135            "mode" => mode,
136        )
137        .increment(1);
138    }
139}
140
141#[derive(Debug)]
142pub struct SocketMulticastGroupJoinError<E> {
143    pub error: E,
144    pub group_addr: Ipv4Addr,
145    pub interface: Ipv4Addr,
146}
147
148impl<E: std::fmt::Display> InternalEvent for SocketMulticastGroupJoinError<E> {
149    fn emit(self) {
150        // Multicast groups are only used in UDP mode
151        let mode = SocketMode::Udp.as_str();
152        let group_addr = self.group_addr.to_string();
153        let interface = self.interface.to_string();
154
155        error!(
156            message = "Error joining multicast group.",
157            error = %self.error,
158            error_code = "socket_multicast_group_join",
159            error_type = error_type::IO_FAILED,
160            stage = error_stage::INITIALIZING,
161            %mode,
162            %group_addr,
163            %interface,
164            internal_log_rate_limit = true,
165        );
166        counter!(
167            "component_errors_total",
168            "error_code" => "socket_multicast_group_join",
169            "error_type" => error_type::IO_FAILED,
170            "stage" => error_stage::INITIALIZING,
171            "mode" => mode,
172            "group_addr" => group_addr,
173            "interface" => interface,
174        )
175        .increment(1);
176    }
177}
178
179#[derive(Debug)]
180pub struct SocketReceiveError<E> {
181    pub mode: SocketMode,
182    pub error: E,
183}
184
185impl<E: std::fmt::Display> InternalEvent for SocketReceiveError<E> {
186    fn emit(self) {
187        let mode = self.mode.as_str();
188        error!(
189            message = "Error receiving data.",
190            error = %self.error,
191            error_code = "socket_receive",
192            error_type = error_type::READER_FAILED,
193            stage = error_stage::RECEIVING,
194            %mode,
195            internal_log_rate_limit = true,
196        );
197        counter!(
198            "component_errors_total",
199            "error_code" => "socket_receive",
200            "error_type" => error_type::READER_FAILED,
201            "stage" => error_stage::RECEIVING,
202            "mode" => mode,
203        )
204        .increment(1);
205    }
206}
207
208#[derive(Debug)]
209pub struct SocketSendError<E> {
210    pub mode: SocketMode,
211    pub error: E,
212}
213
214impl<E: std::fmt::Display> InternalEvent for SocketSendError<E> {
215    fn emit(self) {
216        let mode = self.mode.as_str();
217        let reason = "Error sending data.";
218        error!(
219            message = reason,
220            error = %self.error,
221            error_code = "socket_send",
222            error_type = error_type::WRITER_FAILED,
223            stage = error_stage::SENDING,
224            %mode,
225            internal_log_rate_limit = true,
226        );
227        counter!(
228            "component_errors_total",
229            "error_code" => "socket_send",
230            "error_type" => error_type::WRITER_FAILED,
231            "stage" => error_stage::SENDING,
232            "mode" => mode,
233        )
234        .increment(1);
235
236        emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
237    }
238}