vector/internal_events/
websocket.rs

1#![allow(dead_code)] // TODO requires optional feature compilation
2
3use std::{
4    error::Error,
5    fmt::{Debug, Display, Formatter, Result},
6};
7
8use metrics::{counter, histogram};
9use tokio_tungstenite::tungstenite::error::Error as TungsteniteError;
10use vector_common::{
11    internal_event::{error_stage, error_type},
12    json_size::JsonSize,
13};
14use vector_lib::NamedInternalEvent;
15use vector_lib::internal_event::InternalEvent;
16
17pub const PROTOCOL: &str = "websocket";
18
19#[derive(Debug, NamedInternalEvent)]
20pub struct WebSocketConnectionEstablished;
21
22impl InternalEvent for WebSocketConnectionEstablished {
23    fn emit(self) {
24        debug!(message = "Connected.");
25        counter!("connection_established_total").increment(1);
26    }
27}
28
29#[derive(Debug, NamedInternalEvent)]
30pub struct WebSocketConnectionFailedError {
31    pub error: Box<dyn Error>,
32}
33
34impl InternalEvent for WebSocketConnectionFailedError {
35    fn emit(self) {
36        error!(
37            message = "WebSocket connection failed.",
38            error = %self.error,
39            error_code = "websocket_connection_error",
40            error_type = error_type::CONNECTION_FAILED,
41            stage = error_stage::SENDING,
42        );
43        counter!(
44            "component_errors_total",
45            "protocol" => PROTOCOL,
46            "error_code" => "websocket_connection_failed",
47            "error_type" => error_type::CONNECTION_FAILED,
48            "stage" => error_stage::SENDING,
49        )
50        .increment(1);
51    }
52}
53
54#[derive(Debug, NamedInternalEvent)]
55pub struct WebSocketConnectionShutdown;
56
57impl InternalEvent for WebSocketConnectionShutdown {
58    fn emit(self) {
59        warn!(message = "Closed by the server.");
60        counter!("connection_shutdown_total").increment(1);
61    }
62}
63
64#[derive(Debug, NamedInternalEvent)]
65pub struct WebSocketConnectionError {
66    pub error: tokio_tungstenite::tungstenite::Error,
67}
68
69impl InternalEvent for WebSocketConnectionError {
70    fn emit(self) {
71        error!(
72            message = "WebSocket connection error.",
73            error = %self.error,
74            error_code = "websocket_connection_error",
75            error_type = error_type::WRITER_FAILED,
76            stage = error_stage::SENDING,
77        );
78        counter!(
79            "component_errors_total",
80            "protocol" => PROTOCOL,
81            "error_code" => "websocket_connection_error",
82            "error_type" => error_type::WRITER_FAILED,
83            "stage" => error_stage::SENDING,
84        )
85        .increment(1);
86    }
87}
88
89#[allow(dead_code)]
90#[derive(Debug, Copy, Clone)]
91pub enum WebSocketKind {
92    Ping,
93    Pong,
94    Text,
95    Binary,
96    Close,
97    Frame,
98}
99
100impl Display for WebSocketKind {
101    fn fmt(&self, f: &mut Formatter<'_>) -> Result {
102        write!(f, "{self:?}")
103    }
104}
105
106#[derive(Debug, NamedInternalEvent)]
107pub struct WebSocketBytesReceived<'a> {
108    pub byte_size: usize,
109    pub url: &'a str,
110    pub protocol: &'static str,
111    pub kind: WebSocketKind,
112}
113
114impl InternalEvent for WebSocketBytesReceived<'_> {
115    fn emit(self) {
116        trace!(
117            message = "Bytes received.",
118            byte_size = %self.byte_size,
119            url = %self.url,
120            protocol = %self.protocol,
121            kind = %self.kind
122        );
123        let counter = counter!(
124            "component_received_bytes_total",
125            "url" => self.url.to_string(),
126            "protocol" => self.protocol,
127            "kind" => self.kind.to_string()
128        );
129        counter.increment(self.byte_size as u64);
130    }
131}
132
133#[derive(Debug, NamedInternalEvent)]
134pub struct WebSocketMessageReceived<'a> {
135    pub count: usize,
136    pub byte_size: JsonSize,
137    pub url: &'a str,
138    pub protocol: &'static str,
139    pub kind: WebSocketKind,
140}
141
142impl InternalEvent for WebSocketMessageReceived<'_> {
143    fn emit(self) {
144        trace!(
145            message = "Events received.",
146            count = %self.count,
147            byte_size = %self.byte_size,
148            url =  %self.url,
149            protcol = %self.protocol,
150            kind = %self.kind
151        );
152
153        let histogram = histogram!("component_received_events_count");
154        histogram.record(self.count as f64);
155        let counter = counter!(
156            "component_received_events_total",
157            "uri" => self.url.to_string(),
158            "protocol" => PROTOCOL,
159            "kind" => self.kind.to_string()
160        );
161        counter.increment(self.count as u64);
162        let counter = counter!(
163            "component_received_event_bytes_total",
164            "url" => self.url.to_string(),
165            "protocol" => PROTOCOL,
166            "kind" => self.kind.to_string()
167        );
168        counter.increment(self.byte_size.get() as u64);
169    }
170}
171
172#[derive(Debug, NamedInternalEvent)]
173pub struct WebSocketReceiveError<'a> {
174    pub error: &'a TungsteniteError,
175}
176
177impl InternalEvent for WebSocketReceiveError<'_> {
178    fn emit(self) {
179        error!(
180            message = "Error receiving message from websocket.",
181            error = %self.error,
182            error_code = "websocket_receive_error",
183            error_type = error_type::CONNECTION_FAILED,
184            stage = error_stage::PROCESSING,
185        );
186        counter!(
187            "component_errors_total",
188            "protocol" => PROTOCOL,
189            "error_code" => "websocket_receive_error",
190            "error_type" => error_type::CONNECTION_FAILED,
191            "stage" => error_stage::PROCESSING,
192        )
193        .increment(1);
194    }
195}
196
197#[derive(Debug, NamedInternalEvent)]
198pub struct WebSocketSendError<'a> {
199    pub error: &'a TungsteniteError,
200}
201
202impl InternalEvent for WebSocketSendError<'_> {
203    fn emit(self) {
204        error!(
205            message = "Error sending message to websocket.",
206            error = %self.error,
207            error_code = "websocket_send_error",
208            error_type = error_type::CONNECTION_FAILED,
209            stage = error_stage::PROCESSING,
210        );
211        counter!(
212            "component_errors_total",
213            "protocol" => PROTOCOL,
214            "error_code" => "websocket_send_error",
215            "error_type" => error_type::CONNECTION_FAILED,
216            "stage" => error_stage::PROCESSING,
217        )
218        .increment(1);
219    }
220}