vector/internal_events/
websocket.rs

1#![allow(dead_code)] // TODO requires optional feature compilation
2
3use std::{
4    error::Error,
5    fmt::{Debug, Display, Formatter, Result},
6};
7
8use metrics::{counter, histogram};
9use tokio_tungstenite::tungstenite::error::Error as TungsteniteError;
10use vector_common::{
11    internal_event::{error_stage, error_type},
12    json_size::JsonSize,
13};
14use vector_lib::internal_event::InternalEvent;
15
/// Protocol name attached as the `protocol` label on metrics emitted by the
/// WebSocket internal events in this file.
pub const PROTOCOL: &str = "websocket";
17
18#[derive(Debug)]
19pub struct WebSocketConnectionEstablished;
20
21impl InternalEvent for WebSocketConnectionEstablished {
22    fn emit(self) {
23        debug!(message = "Connected.");
24        counter!("connection_established_total").increment(1);
25    }
26
27    fn name(&self) -> Option<&'static str> {
28        Some("WebSocketConnectionEstablished")
29    }
30}
31
32#[derive(Debug)]
33pub struct WebSocketConnectionFailedError {
34    pub error: Box<dyn Error>,
35}
36
37impl InternalEvent for WebSocketConnectionFailedError {
38    fn emit(self) {
39        error!(
40            message = "WebSocket connection failed.",
41            error = %self.error,
42            error_code = "websocket_connection_error",
43            error_type = error_type::CONNECTION_FAILED,
44            stage = error_stage::SENDING,
45        );
46        counter!(
47            "component_errors_total",
48            "error_code" => "websocket_connection_failed",
49            "error_type" => error_type::CONNECTION_FAILED,
50            "stage" => error_stage::SENDING,
51        )
52        .increment(1);
53    }
54
55    fn name(&self) -> Option<&'static str> {
56        Some("WebSocketConnectionFailedError")
57    }
58}
59
60#[derive(Debug)]
61pub struct WebSocketConnectionShutdown;
62
63impl InternalEvent for WebSocketConnectionShutdown {
64    fn emit(self) {
65        warn!(message = "Closed by the server.");
66        counter!("connection_shutdown_total").increment(1);
67    }
68
69    fn name(&self) -> Option<&'static str> {
70        Some("WebSocketConnectionShutdown")
71    }
72}
73
74#[derive(Debug)]
75pub struct WebSocketConnectionError {
76    pub error: tokio_tungstenite::tungstenite::Error,
77}
78
79impl InternalEvent for WebSocketConnectionError {
80    fn emit(self) {
81        error!(
82            message = "WebSocket connection error.",
83            error = %self.error,
84            error_code = "websocket_connection_error",
85            error_type = error_type::WRITER_FAILED,
86            stage = error_stage::SENDING,
87        );
88        counter!(
89            "component_errors_total",
90            "protocol" => PROTOCOL,
91            "error_code" => "websocket_connection_error",
92            "error_type" => error_type::WRITER_FAILED,
93            "stage" => error_stage::SENDING,
94        )
95        .increment(1);
96    }
97
98    fn name(&self) -> Option<&'static str> {
99        Some("WebSocketConnectionError")
100    }
101}
102
/// Kind of WebSocket message, used as the `kind` field/label by the
/// received-bytes and received-events internal events.
#[allow(dead_code)]
#[derive(Debug, Copy, Clone)]
pub enum WebSocketKind {
    /// Ping message.
    Ping,
    /// Pong message.
    Pong,
    /// Text message.
    Text,
    /// Binary message.
    Binary,
    /// Close message.
    Close,
    /// Frame message.
    Frame,
}

impl Display for WebSocketKind {
    /// Writes the variant name (e.g. `Ping`), the same text the derived
    /// `Debug` implementation produces.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result {
        let variant = match self {
            Self::Ping => "Ping",
            Self::Pong => "Pong",
            Self::Text => "Text",
            Self::Binary => "Binary",
            Self::Close => "Close",
            Self::Frame => "Frame",
        };
        // `write_str` bypasses width/fill flags, matching the original
        // `write!(f, "{self:?}")`, which also ignored them.
        f.write_str(variant)
    }
}
119
120#[derive(Debug)]
121pub struct WebSocketBytesReceived<'a> {
122    pub byte_size: usize,
123    pub url: &'a str,
124    pub protocol: &'static str,
125    pub kind: WebSocketKind,
126}
127
128impl InternalEvent for WebSocketBytesReceived<'_> {
129    fn emit(self) {
130        trace!(
131            message = "Bytes received.",
132            byte_size = %self.byte_size,
133            url = %self.url,
134            protocol = %self.protocol,
135            kind = %self.kind
136        );
137        let counter = counter!(
138            "component_received_bytes_total",
139            "url" => self.url.to_string(),
140            "protocol" => self.protocol,
141            "kind" => self.kind.to_string()
142        );
143        counter.increment(self.byte_size as u64);
144    }
145}
146
147#[derive(Debug)]
148pub struct WebSocketMessageReceived<'a> {
149    pub count: usize,
150    pub byte_size: JsonSize,
151    pub url: &'a str,
152    pub protocol: &'static str,
153    pub kind: WebSocketKind,
154}
155
156impl InternalEvent for WebSocketMessageReceived<'_> {
157    fn emit(self) {
158        trace!(
159            message = "Events received.",
160            count = %self.count,
161            byte_size = %self.byte_size,
162            url =  %self.url,
163            protcol = %self.protocol,
164            kind = %self.kind
165        );
166
167        let histogram = histogram!("component_received_events_count");
168        histogram.record(self.count as f64);
169        let counter = counter!(
170            "component_received_events_total",
171            "uri" => self.url.to_string(),
172            "protocol" => PROTOCOL,
173            "kind" => self.kind.to_string()
174        );
175        counter.increment(self.count as u64);
176        let counter = counter!(
177            "component_received_event_bytes_total",
178            "url" => self.url.to_string(),
179            "protocol" => PROTOCOL,
180            "kind" => self.kind.to_string()
181        );
182        counter.increment(self.byte_size.get() as u64);
183    }
184
185    fn name(&self) -> Option<&'static str> {
186        Some("WebSocketMessageReceived")
187    }
188}
189
190#[derive(Debug)]
191pub struct WebSocketReceiveError<'a> {
192    pub error: &'a TungsteniteError,
193}
194
195impl InternalEvent for WebSocketReceiveError<'_> {
196    fn emit(self) {
197        error!(
198            message = "Error receiving message from websocket.",
199            error = %self.error,
200            error_code = "websocket_receive_error",
201            error_type = error_type::CONNECTION_FAILED,
202            stage = error_stage::PROCESSING,
203        );
204        counter!(
205            "component_errors_total",
206            "protocol" => PROTOCOL,
207            "error_code" => "websocket_receive_error",
208            "error_type" => error_type::CONNECTION_FAILED,
209            "stage" => error_stage::PROCESSING,
210        )
211        .increment(1);
212    }
213
214    fn name(&self) -> Option<&'static str> {
215        Some("WebSocketReceiveError")
216    }
217}
218
219#[derive(Debug)]
220pub struct WebSocketSendError<'a> {
221    pub error: &'a TungsteniteError,
222}
223
224impl InternalEvent for WebSocketSendError<'_> {
225    fn emit(self) {
226        error!(
227            message = "Error sending message to websocket.",
228            error = %self.error,
229            error_code = "websocket_send_error",
230            error_type = error_type::CONNECTION_FAILED,
231            stage = error_stage::PROCESSING,
232        );
233        counter!(
234            "component_errors_total",
235            "error_code" => "websocket_send_error",
236            "error_type" => error_type::CONNECTION_FAILED,
237            "stage" => error_stage::PROCESSING,
238        )
239        .increment(1);
240    }
241
242    fn name(&self) -> Option<&'static str> {
243        Some("WebSocketSendError")
244    }
245}