vector/internal_events/
websocket.rs

1use std::error::Error;
2use std::fmt::{Debug, Display, Formatter, Result};
3
4use metrics::{counter, histogram};
5use tokio_tungstenite::tungstenite::error::Error as TungsteniteError;
6use vector_lib::internal_event::InternalEvent;
7
8use vector_common::{
9    internal_event::{error_stage, error_type},
10    json_size::JsonSize,
11};
12
13pub const PROTOCOL: &str = "websocket";
14
15#[derive(Debug)]
16pub struct WebSocketConnectionEstablished;
17
18impl InternalEvent for WebSocketConnectionEstablished {
19    fn emit(self) {
20        debug!(message = "Connected.");
21        counter!("connection_established_total").increment(1);
22    }
23
24    fn name(&self) -> Option<&'static str> {
25        Some("WebSocketConnectionEstablished")
26    }
27}
28
29#[derive(Debug)]
30pub struct WebSocketConnectionFailedError {
31    pub error: Box<dyn Error>,
32}
33
34impl InternalEvent for WebSocketConnectionFailedError {
35    fn emit(self) {
36        error!(
37            message = "WebSocket connection failed.",
38            error = %self.error,
39            error_code = "websocket_connection_error",
40            error_type = error_type::CONNECTION_FAILED,
41            stage = error_stage::SENDING,
42            internal_log_rate_limit = true,
43        );
44        counter!(
45            "component_errors_total",
46            "error_code" => "websocket_connection_failed",
47            "error_type" => error_type::CONNECTION_FAILED,
48            "stage" => error_stage::SENDING,
49        )
50        .increment(1);
51    }
52
53    fn name(&self) -> Option<&'static str> {
54        Some("WebSocketConnectionFailedError")
55    }
56}
57
58#[derive(Debug)]
59pub struct WebSocketConnectionShutdown;
60
61impl InternalEvent for WebSocketConnectionShutdown {
62    fn emit(self) {
63        warn!(message = "Closed by the server.");
64        counter!("connection_shutdown_total").increment(1);
65    }
66
67    fn name(&self) -> Option<&'static str> {
68        Some("WebSocketConnectionShutdown")
69    }
70}
71
72#[derive(Debug)]
73pub struct WebSocketConnectionError {
74    pub error: tokio_tungstenite::tungstenite::Error,
75}
76
77impl InternalEvent for WebSocketConnectionError {
78    fn emit(self) {
79        error!(
80            message = "WebSocket connection error.",
81            error = %self.error,
82            error_code = "websocket_connection_error",
83            error_type = error_type::WRITER_FAILED,
84            stage = error_stage::SENDING,
85            internal_log_rate_limit = true,
86        );
87        counter!(
88            "component_errors_total",
89            "protocol" => PROTOCOL,
90            "error_code" => "websocket_connection_error",
91            "error_type" => error_type::WRITER_FAILED,
92            "stage" => error_stage::SENDING,
93        )
94        .increment(1);
95    }
96
97    fn name(&self) -> Option<&'static str> {
98        Some("WebSocketConnectionError")
99    }
100}
101
/// Kind of WebSocket message, used as the `kind` field/label by the
/// received-data events in this file.
// NOTE(review): `dead_code` is presumably allowed because not every variant is
// constructed in every build — confirm before removing.
#[allow(dead_code)]
#[derive(Debug, Copy, Clone)]
pub enum WebSocketKind {
    Ping,
    Pong,
    Text,
    Binary,
    Close,
    Frame,
}

impl Display for WebSocketKind {
    /// Writes the bare variant name, the same text the derived `Debug`
    /// implementation produces.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result {
        let label = match self {
            Self::Ping => "Ping",
            Self::Pong => "Pong",
            Self::Text => "Text",
            Self::Binary => "Binary",
            Self::Close => "Close",
            Self::Frame => "Frame",
        };
        f.write_str(label)
    }
}
118
119#[derive(Debug)]
120pub struct WebSocketBytesReceived<'a> {
121    pub byte_size: usize,
122    pub url: &'a str,
123    pub protocol: &'static str,
124    pub kind: WebSocketKind,
125}
126
127impl InternalEvent for WebSocketBytesReceived<'_> {
128    fn emit(self) {
129        trace!(
130            message = "Bytes received.",
131            byte_size = %self.byte_size,
132            url = %self.url,
133            protocol = %self.protocol,
134            kind = %self.kind
135        );
136        let counter = counter!(
137            "component_received_bytes_total",
138            "url" => self.url.to_string(),
139            "protocol" => self.protocol,
140            "kind" => self.kind.to_string()
141        );
142        counter.increment(self.byte_size as u64);
143    }
144}
145
146#[derive(Debug)]
147pub struct WebSocketMessageReceived<'a> {
148    pub count: usize,
149    pub byte_size: JsonSize,
150    pub url: &'a str,
151    pub protocol: &'static str,
152    pub kind: WebSocketKind,
153}
154
155impl InternalEvent for WebSocketMessageReceived<'_> {
156    fn emit(self) {
157        trace!(
158            message = "Events received.",
159            count = %self.count,
160            byte_size = %self.byte_size,
161            url =  %self.url,
162            protcol = %self.protocol,
163            kind = %self.kind
164        );
165
166        let histogram = histogram!("component_received_events_count");
167        histogram.record(self.count as f64);
168        let counter = counter!(
169            "component_received_events_total",
170            "uri" => self.url.to_string(),
171            "protocol" => PROTOCOL,
172            "kind" => self.kind.to_string()
173        );
174        counter.increment(self.count as u64);
175        let counter = counter!(
176            "component_received_event_bytes_total",
177            "url" => self.url.to_string(),
178            "protocol" => PROTOCOL,
179            "kind" => self.kind.to_string()
180        );
181        counter.increment(self.byte_size.get() as u64);
182    }
183
184    fn name(&self) -> Option<&'static str> {
185        Some("WebSocketMessageReceived")
186    }
187}
188
189#[derive(Debug)]
190pub struct WebSocketReceiveError<'a> {
191    pub error: &'a TungsteniteError,
192}
193
194impl InternalEvent for WebSocketReceiveError<'_> {
195    fn emit(self) {
196        error!(
197            message = "Error receiving message from websocket.",
198            error = %self.error,
199            error_code = "websocket_receive_error",
200            error_type = error_type::CONNECTION_FAILED,
201            stage = error_stage::PROCESSING,
202            internal_log_rate_limit = true,
203        );
204        counter!(
205            "component_errors_total",
206            "protocol" => PROTOCOL,
207            "error_code" => "websocket_receive_error",
208            "error_type" => error_type::CONNECTION_FAILED,
209            "stage" => error_stage::PROCESSING,
210        )
211        .increment(1);
212    }
213
214    fn name(&self) -> Option<&'static str> {
215        Some("WebSocketReceiveError")
216    }
217}
218
219#[derive(Debug)]
220pub struct WebSocketSendError<'a> {
221    pub error: &'a TungsteniteError,
222}
223
224impl InternalEvent for WebSocketSendError<'_> {
225    fn emit(self) {
226        error!(
227            message = "Error sending message to websocket.",
228            error = %self.error,
229            error_code = "websocket_send_error",
230            error_type = error_type::CONNECTION_FAILED,
231            stage = error_stage::PROCESSING,
232            internal_log_rate_limit = true,
233        );
234        counter!(
235            "component_errors_total",
236            "error_code" => "websocket_send_error",
237            "error_type" => error_type::CONNECTION_FAILED,
238            "stage" => error_stage::PROCESSING,
239        )
240        .increment(1);
241    }
242
243    fn name(&self) -> Option<&'static str> {
244        Some("WebSocketSendError")
245    }
246}