vector/internal_events/
socket.rs

1use std::net::Ipv4Addr;
2
3use metrics::{counter, histogram};
4use vector_lib::{
5    internal_event::{
6        ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
7    },
8    json_size::JsonSize,
9};
10
/// Transport flavour of a socket, used to tag socket-related internal events.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[allow(dead_code)] // some features only use some variants
pub enum SocketMode {
    /// Rendered as `"tcp"`.
    Tcp,
    /// Rendered as `"udp"`.
    Udp,
    /// Rendered as `"unix"`.
    Unix,
}

impl SocketMode {
    /// Returns the lowercase static label for this mode.
    ///
    /// The function is `const`, so the label can also be computed in constant
    /// contexts.
    pub const fn as_str(self) -> &'static str {
        match self {
            SocketMode::Tcp => "tcp",
            SocketMode::Udp => "udp",
            SocketMode::Unix => "unix",
        }
    }
}
28#[derive(Debug)]
29pub struct SocketBytesReceived {
30    pub mode: SocketMode,
31    pub byte_size: usize,
32}
33
34impl InternalEvent for SocketBytesReceived {
35    fn emit(self) {
36        let protocol = self.mode.as_str();
37        trace!(
38            message = "Bytes received.",
39            byte_size = %self.byte_size,
40            %protocol,
41        );
42        counter!(
43            "component_received_bytes_total",
44            "protocol" => protocol,
45        )
46        .increment(self.byte_size as u64);
47        histogram!("component_received_bytes").record(self.byte_size as f64);
48    }
49}
50
51#[derive(Debug)]
52pub struct SocketEventsReceived {
53    pub mode: SocketMode,
54    pub byte_size: JsonSize,
55    pub count: usize,
56}
57
58impl InternalEvent for SocketEventsReceived {
59    fn emit(self) {
60        let mode = self.mode.as_str();
61        trace!(
62            message = "Events received.",
63            count = self.count,
64            byte_size = self.byte_size.get(),
65            %mode,
66        );
67        counter!("component_received_events_total", "mode" => mode).increment(self.count as u64);
68        counter!("component_received_event_bytes_total", "mode" => mode)
69            .increment(self.byte_size.get() as u64);
70        histogram!("component_received_bytes", "mode" => mode).record(self.byte_size.get() as f64);
71    }
72}
73
74#[derive(Debug)]
75pub struct SocketBytesSent {
76    pub mode: SocketMode,
77    pub byte_size: usize,
78}
79
80impl InternalEvent for SocketBytesSent {
81    fn emit(self) {
82        let protocol = self.mode.as_str();
83        trace!(
84            message = "Bytes sent.",
85            byte_size = %self.byte_size,
86            %protocol,
87        );
88        counter!(
89            "component_sent_bytes_total",
90            "protocol" => protocol,
91        )
92        .increment(self.byte_size as u64);
93    }
94}
95
96#[derive(Debug)]
97pub struct SocketEventsSent {
98    pub mode: SocketMode,
99    pub count: u64,
100    pub byte_size: JsonSize,
101}
102
103impl InternalEvent for SocketEventsSent {
104    fn emit(self) {
105        trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size.get());
106        counter!("component_sent_events_total", "mode" => self.mode.as_str()).increment(self.count);
107        counter!("component_sent_event_bytes_total", "mode" => self.mode.as_str())
108            .increment(self.byte_size.get() as u64);
109    }
110}
111
112#[derive(Debug)]
113pub struct SocketBindError<E> {
114    pub mode: SocketMode,
115    pub error: E,
116}
117
118impl<E: std::fmt::Display> InternalEvent for SocketBindError<E> {
119    fn emit(self) {
120        let mode = self.mode.as_str();
121        error!(
122            message = "Error binding socket.",
123            error = %self.error,
124            error_code = "socket_bind",
125            error_type = error_type::IO_FAILED,
126            stage = error_stage::INITIALIZING,
127            %mode,
128        );
129        counter!(
130            "component_errors_total",
131            "error_code" => "socket_bind",
132            "error_type" => error_type::IO_FAILED,
133            "stage" => error_stage::INITIALIZING,
134            "mode" => mode,
135        )
136        .increment(1);
137    }
138}
139
140#[derive(Debug)]
141pub struct SocketMulticastGroupJoinError<E> {
142    pub error: E,
143    pub group_addr: Ipv4Addr,
144    pub interface: Ipv4Addr,
145}
146
147impl<E: std::fmt::Display> InternalEvent for SocketMulticastGroupJoinError<E> {
148    fn emit(self) {
149        // Multicast groups are only used in UDP mode
150        let mode = SocketMode::Udp.as_str();
151        let group_addr = self.group_addr.to_string();
152        let interface = self.interface.to_string();
153
154        error!(
155            message = "Error joining multicast group.",
156            error = %self.error,
157            error_code = "socket_multicast_group_join",
158            error_type = error_type::IO_FAILED,
159            stage = error_stage::INITIALIZING,
160            %mode,
161            %group_addr,
162            %interface,
163        );
164        counter!(
165            "component_errors_total",
166            "error_code" => "socket_multicast_group_join",
167            "error_type" => error_type::IO_FAILED,
168            "stage" => error_stage::INITIALIZING,
169            "mode" => mode,
170            "group_addr" => group_addr,
171            "interface" => interface,
172        )
173        .increment(1);
174    }
175}
176
177#[derive(Debug)]
178pub struct SocketReceiveError<E> {
179    pub mode: SocketMode,
180    pub error: E,
181}
182
183impl<E: std::fmt::Display> InternalEvent for SocketReceiveError<E> {
184    fn emit(self) {
185        let mode = self.mode.as_str();
186        error!(
187            message = "Error receiving data.",
188            error = %self.error,
189            error_code = "socket_receive",
190            error_type = error_type::READER_FAILED,
191            stage = error_stage::RECEIVING,
192            %mode,
193        );
194        counter!(
195            "component_errors_total",
196            "error_code" => "socket_receive",
197            "error_type" => error_type::READER_FAILED,
198            "stage" => error_stage::RECEIVING,
199            "mode" => mode,
200        )
201        .increment(1);
202    }
203}
204
205#[derive(Debug)]
206pub struct SocketSendError<E> {
207    pub mode: SocketMode,
208    pub error: E,
209}
210
211impl<E: std::fmt::Display> InternalEvent for SocketSendError<E> {
212    fn emit(self) {
213        let mode = self.mode.as_str();
214        let reason = "Error sending data.";
215        error!(
216            message = reason,
217            error = %self.error,
218            error_code = "socket_send",
219            error_type = error_type::WRITER_FAILED,
220            stage = error_stage::SENDING,
221            %mode,
222        );
223        counter!(
224            "component_errors_total",
225            "error_code" => "socket_send",
226            "error_type" => error_type::WRITER_FAILED,
227            "stage" => error_stage::SENDING,
228            "mode" => mode,
229        )
230        .increment(1);
231
232        emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
233    }
234}