vector/internal_events/
socket.rs

1use std::net::Ipv4Addr;
2
3use metrics::{counter, histogram};
4use vector_lib::internal_event::{ComponentEventsDropped, InternalEvent, UNINTENTIONAL};
5use vector_lib::{
6    internal_event::{error_stage, error_type},
7    json_size::JsonSize,
8};
9
10#[derive(Debug, Clone, Copy, Eq, PartialEq)]
11#[allow(dead_code)] // some features only use some variants
12pub enum SocketMode {
13    Tcp,
14    Udp,
15    Unix,
16}
17
18impl SocketMode {
19    pub const fn as_str(self) -> &'static str {
20        match self {
21            Self::Tcp => "tcp",
22            Self::Udp => "udp",
23            Self::Unix => "unix",
24        }
25    }
26}
27#[derive(Debug)]
28pub struct SocketBytesReceived {
29    pub mode: SocketMode,
30    pub byte_size: usize,
31}
32
33impl InternalEvent for SocketBytesReceived {
34    fn emit(self) {
35        let protocol = self.mode.as_str();
36        trace!(
37            message = "Bytes received.",
38            byte_size = %self.byte_size,
39            %protocol,
40        );
41        counter!(
42            "component_received_bytes_total",
43            "protocol" => protocol,
44        )
45        .increment(self.byte_size as u64);
46        histogram!("component_received_bytes").record(self.byte_size as f64);
47    }
48}
49
50#[derive(Debug)]
51pub struct SocketEventsReceived {
52    pub mode: SocketMode,
53    pub byte_size: JsonSize,
54    pub count: usize,
55}
56
57impl InternalEvent for SocketEventsReceived {
58    fn emit(self) {
59        let mode = self.mode.as_str();
60        trace!(
61            message = "Events received.",
62            count = self.count,
63            byte_size = self.byte_size.get(),
64            %mode,
65        );
66        counter!("component_received_events_total", "mode" => mode).increment(self.count as u64);
67        counter!("component_received_event_bytes_total", "mode" => mode)
68            .increment(self.byte_size.get() as u64);
69        histogram!("component_received_bytes", "mode" => mode).record(self.byte_size.get() as f64);
70    }
71}
72
73#[derive(Debug)]
74pub struct SocketBytesSent {
75    pub mode: SocketMode,
76    pub byte_size: usize,
77}
78
79impl InternalEvent for SocketBytesSent {
80    fn emit(self) {
81        let protocol = self.mode.as_str();
82        trace!(
83            message = "Bytes sent.",
84            byte_size = %self.byte_size,
85            %protocol,
86        );
87        counter!(
88            "component_sent_bytes_total",
89            "protocol" => protocol,
90        )
91        .increment(self.byte_size as u64);
92    }
93}
94
95#[derive(Debug)]
96pub struct SocketEventsSent {
97    pub mode: SocketMode,
98    pub count: u64,
99    pub byte_size: JsonSize,
100}
101
102impl InternalEvent for SocketEventsSent {
103    fn emit(self) {
104        trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size.get());
105        counter!("component_sent_events_total", "mode" => self.mode.as_str()).increment(self.count);
106        counter!("component_sent_event_bytes_total", "mode" => self.mode.as_str())
107            .increment(self.byte_size.get() as u64);
108    }
109}
110
111#[derive(Debug)]
112pub struct SocketBindError<E> {
113    pub mode: SocketMode,
114    pub error: E,
115}
116
117impl<E: std::fmt::Display> InternalEvent for SocketBindError<E> {
118    fn emit(self) {
119        let mode = self.mode.as_str();
120        error!(
121            message = "Error binding socket.",
122            error = %self.error,
123            error_code = "socket_bind",
124            error_type = error_type::IO_FAILED,
125            stage = error_stage::INITIALIZING,
126            %mode,
127            internal_log_rate_limit = true,
128        );
129        counter!(
130            "component_errors_total",
131            "error_code" => "socket_bind",
132            "error_type" => error_type::IO_FAILED,
133            "stage" => error_stage::INITIALIZING,
134            "mode" => mode,
135        )
136        .increment(1);
137    }
138}
139
140#[derive(Debug)]
141pub struct SocketMulticastGroupJoinError<E> {
142    pub error: E,
143    pub group_addr: Ipv4Addr,
144    pub interface: Ipv4Addr,
145}
146
147impl<E: std::fmt::Display> InternalEvent for SocketMulticastGroupJoinError<E> {
148    fn emit(self) {
149        // Multicast groups are only used in UDP mode
150        let mode = SocketMode::Udp.as_str();
151        let group_addr = self.group_addr.to_string();
152        let interface = self.interface.to_string();
153
154        error!(
155            message = "Error joining multicast group.",
156            error = %self.error,
157            error_code = "socket_multicast_group_join",
158            error_type = error_type::IO_FAILED,
159            stage = error_stage::INITIALIZING,
160            %mode,
161            %group_addr,
162            %interface,
163            internal_log_rate_limit = true,
164        );
165        counter!(
166            "component_errors_total",
167            "error_code" => "socket_multicast_group_join",
168            "error_type" => error_type::IO_FAILED,
169            "stage" => error_stage::INITIALIZING,
170            "mode" => mode,
171            "group_addr" => group_addr,
172            "interface" => interface,
173        )
174        .increment(1);
175    }
176}
177
178#[derive(Debug)]
179pub struct SocketReceiveError<E> {
180    pub mode: SocketMode,
181    pub error: E,
182}
183
184impl<E: std::fmt::Display> InternalEvent for SocketReceiveError<E> {
185    fn emit(self) {
186        let mode = self.mode.as_str();
187        error!(
188            message = "Error receiving data.",
189            error = %self.error,
190            error_code = "socket_receive",
191            error_type = error_type::READER_FAILED,
192            stage = error_stage::RECEIVING,
193            %mode,
194            internal_log_rate_limit = true,
195        );
196        counter!(
197            "component_errors_total",
198            "error_code" => "socket_receive",
199            "error_type" => error_type::READER_FAILED,
200            "stage" => error_stage::RECEIVING,
201            "mode" => mode,
202        )
203        .increment(1);
204    }
205}
206
207#[derive(Debug)]
208pub struct SocketSendError<E> {
209    pub mode: SocketMode,
210    pub error: E,
211}
212
213impl<E: std::fmt::Display> InternalEvent for SocketSendError<E> {
214    fn emit(self) {
215        let mode = self.mode.as_str();
216        let reason = "Error sending data.";
217        error!(
218            message = reason,
219            error = %self.error,
220            error_code = "socket_send",
221            error_type = error_type::WRITER_FAILED,
222            stage = error_stage::SENDING,
223            %mode,
224            internal_log_rate_limit = true,
225        );
226        counter!(
227            "component_errors_total",
228            "error_code" => "socket_send",
229            "error_type" => error_type::WRITER_FAILED,
230            "stage" => error_stage::SENDING,
231            "mode" => mode,
232        )
233        .increment(1);
234
235        emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
236    }
237}