vector/internal_events/
websocket.rs

1#![allow(dead_code)] // TODO requires optional feature compilation
2
3use std::{
4    error::Error,
5    fmt::{Debug, Display, Formatter, Result},
6};
7
8use metrics::{counter, histogram};
9use tokio_tungstenite::tungstenite::error::Error as TungsteniteError;
10use vector_common::{
11    internal_event::{error_stage, error_type},
12    json_size::JsonSize,
13};
14use vector_lib::internal_event::InternalEvent;
15
16pub const PROTOCOL: &str = "websocket";
17
18#[derive(Debug)]
19pub struct WebSocketConnectionEstablished;
20
21impl InternalEvent for WebSocketConnectionEstablished {
22    fn emit(self) {
23        debug!(message = "Connected.");
24        counter!("connection_established_total").increment(1);
25    }
26
27    fn name(&self) -> Option<&'static str> {
28        Some("WebSocketConnectionEstablished")
29    }
30}
31
32#[derive(Debug)]
33pub struct WebSocketConnectionFailedError {
34    pub error: Box<dyn Error>,
35}
36
37impl InternalEvent for WebSocketConnectionFailedError {
38    fn emit(self) {
39        error!(
40            message = "WebSocket connection failed.",
41            error = %self.error,
42            error_code = "websocket_connection_error",
43            error_type = error_type::CONNECTION_FAILED,
44            stage = error_stage::SENDING,
45            internal_log_rate_limit = true,
46        );
47        counter!(
48            "component_errors_total",
49            "error_code" => "websocket_connection_failed",
50            "error_type" => error_type::CONNECTION_FAILED,
51            "stage" => error_stage::SENDING,
52        )
53        .increment(1);
54    }
55
56    fn name(&self) -> Option<&'static str> {
57        Some("WebSocketConnectionFailedError")
58    }
59}
60
61#[derive(Debug)]
62pub struct WebSocketConnectionShutdown;
63
64impl InternalEvent for WebSocketConnectionShutdown {
65    fn emit(self) {
66        warn!(message = "Closed by the server.");
67        counter!("connection_shutdown_total").increment(1);
68    }
69
70    fn name(&self) -> Option<&'static str> {
71        Some("WebSocketConnectionShutdown")
72    }
73}
74
75#[derive(Debug)]
76pub struct WebSocketConnectionError {
77    pub error: tokio_tungstenite::tungstenite::Error,
78}
79
80impl InternalEvent for WebSocketConnectionError {
81    fn emit(self) {
82        error!(
83            message = "WebSocket connection error.",
84            error = %self.error,
85            error_code = "websocket_connection_error",
86            error_type = error_type::WRITER_FAILED,
87            stage = error_stage::SENDING,
88            internal_log_rate_limit = true,
89        );
90        counter!(
91            "component_errors_total",
92            "protocol" => PROTOCOL,
93            "error_code" => "websocket_connection_error",
94            "error_type" => error_type::WRITER_FAILED,
95            "stage" => error_stage::SENDING,
96        )
97        .increment(1);
98    }
99
100    fn name(&self) -> Option<&'static str> {
101        Some("WebSocketConnectionError")
102    }
103}
104
105#[allow(dead_code)]
106#[derive(Debug, Copy, Clone)]
107pub enum WebSocketKind {
108    Ping,
109    Pong,
110    Text,
111    Binary,
112    Close,
113    Frame,
114}
115
116impl Display for WebSocketKind {
117    fn fmt(&self, f: &mut Formatter<'_>) -> Result {
118        write!(f, "{self:?}")
119    }
120}
121
122#[derive(Debug)]
123pub struct WebSocketBytesReceived<'a> {
124    pub byte_size: usize,
125    pub url: &'a str,
126    pub protocol: &'static str,
127    pub kind: WebSocketKind,
128}
129
130impl InternalEvent for WebSocketBytesReceived<'_> {
131    fn emit(self) {
132        trace!(
133            message = "Bytes received.",
134            byte_size = %self.byte_size,
135            url = %self.url,
136            protocol = %self.protocol,
137            kind = %self.kind
138        );
139        let counter = counter!(
140            "component_received_bytes_total",
141            "url" => self.url.to_string(),
142            "protocol" => self.protocol,
143            "kind" => self.kind.to_string()
144        );
145        counter.increment(self.byte_size as u64);
146    }
147}
148
149#[derive(Debug)]
150pub struct WebSocketMessageReceived<'a> {
151    pub count: usize,
152    pub byte_size: JsonSize,
153    pub url: &'a str,
154    pub protocol: &'static str,
155    pub kind: WebSocketKind,
156}
157
158impl InternalEvent for WebSocketMessageReceived<'_> {
159    fn emit(self) {
160        trace!(
161            message = "Events received.",
162            count = %self.count,
163            byte_size = %self.byte_size,
164            url =  %self.url,
165            protcol = %self.protocol,
166            kind = %self.kind
167        );
168
169        let histogram = histogram!("component_received_events_count");
170        histogram.record(self.count as f64);
171        let counter = counter!(
172            "component_received_events_total",
173            "uri" => self.url.to_string(),
174            "protocol" => PROTOCOL,
175            "kind" => self.kind.to_string()
176        );
177        counter.increment(self.count as u64);
178        let counter = counter!(
179            "component_received_event_bytes_total",
180            "url" => self.url.to_string(),
181            "protocol" => PROTOCOL,
182            "kind" => self.kind.to_string()
183        );
184        counter.increment(self.byte_size.get() as u64);
185    }
186
187    fn name(&self) -> Option<&'static str> {
188        Some("WebSocketMessageReceived")
189    }
190}
191
192#[derive(Debug)]
193pub struct WebSocketReceiveError<'a> {
194    pub error: &'a TungsteniteError,
195}
196
197impl InternalEvent for WebSocketReceiveError<'_> {
198    fn emit(self) {
199        error!(
200            message = "Error receiving message from websocket.",
201            error = %self.error,
202            error_code = "websocket_receive_error",
203            error_type = error_type::CONNECTION_FAILED,
204            stage = error_stage::PROCESSING,
205            internal_log_rate_limit = true,
206        );
207        counter!(
208            "component_errors_total",
209            "protocol" => PROTOCOL,
210            "error_code" => "websocket_receive_error",
211            "error_type" => error_type::CONNECTION_FAILED,
212            "stage" => error_stage::PROCESSING,
213        )
214        .increment(1);
215    }
216
217    fn name(&self) -> Option<&'static str> {
218        Some("WebSocketReceiveError")
219    }
220}
221
222#[derive(Debug)]
223pub struct WebSocketSendError<'a> {
224    pub error: &'a TungsteniteError,
225}
226
227impl InternalEvent for WebSocketSendError<'_> {
228    fn emit(self) {
229        error!(
230            message = "Error sending message to websocket.",
231            error = %self.error,
232            error_code = "websocket_send_error",
233            error_type = error_type::CONNECTION_FAILED,
234            stage = error_stage::PROCESSING,
235            internal_log_rate_limit = true,
236        );
237        counter!(
238            "component_errors_total",
239            "error_code" => "websocket_send_error",
240            "error_type" => error_type::CONNECTION_FAILED,
241            "stage" => error_stage::PROCESSING,
242        )
243        .increment(1);
244    }
245
246    fn name(&self) -> Option<&'static str> {
247        Some("WebSocketSendError")
248    }
249}