1#![allow(dead_code)] use std::{
4 error::Error,
5 fmt::{Debug, Display, Formatter, Result},
6};
7
8use metrics::{counter, histogram};
9use tokio_tungstenite::tungstenite::error::Error as TungsteniteError;
10use vector_common::{
11 internal_event::{error_stage, error_type},
12 json_size::JsonSize,
13};
14use vector_lib::internal_event::InternalEvent;
15
/// Protocol name used as the `protocol` label on metrics emitted by the
/// WebSocket internal events in this module.
pub const PROTOCOL: &str = "websocket";
17
18#[derive(Debug)]
19pub struct WebSocketConnectionEstablished;
20
21impl InternalEvent for WebSocketConnectionEstablished {
22 fn emit(self) {
23 debug!(message = "Connected.");
24 counter!("connection_established_total").increment(1);
25 }
26
27 fn name(&self) -> Option<&'static str> {
28 Some("WebSocketConnectionEstablished")
29 }
30}
31
32#[derive(Debug)]
33pub struct WebSocketConnectionFailedError {
34 pub error: Box<dyn Error>,
35}
36
37impl InternalEvent for WebSocketConnectionFailedError {
38 fn emit(self) {
39 error!(
40 message = "WebSocket connection failed.",
41 error = %self.error,
42 error_code = "websocket_connection_error",
43 error_type = error_type::CONNECTION_FAILED,
44 stage = error_stage::SENDING,
45 internal_log_rate_limit = true,
46 );
47 counter!(
48 "component_errors_total",
49 "error_code" => "websocket_connection_failed",
50 "error_type" => error_type::CONNECTION_FAILED,
51 "stage" => error_stage::SENDING,
52 )
53 .increment(1);
54 }
55
56 fn name(&self) -> Option<&'static str> {
57 Some("WebSocketConnectionFailedError")
58 }
59}
60
61#[derive(Debug)]
62pub struct WebSocketConnectionShutdown;
63
64impl InternalEvent for WebSocketConnectionShutdown {
65 fn emit(self) {
66 warn!(message = "Closed by the server.");
67 counter!("connection_shutdown_total").increment(1);
68 }
69
70 fn name(&self) -> Option<&'static str> {
71 Some("WebSocketConnectionShutdown")
72 }
73}
74
75#[derive(Debug)]
76pub struct WebSocketConnectionError {
77 pub error: tokio_tungstenite::tungstenite::Error,
78}
79
80impl InternalEvent for WebSocketConnectionError {
81 fn emit(self) {
82 error!(
83 message = "WebSocket connection error.",
84 error = %self.error,
85 error_code = "websocket_connection_error",
86 error_type = error_type::WRITER_FAILED,
87 stage = error_stage::SENDING,
88 internal_log_rate_limit = true,
89 );
90 counter!(
91 "component_errors_total",
92 "protocol" => PROTOCOL,
93 "error_code" => "websocket_connection_error",
94 "error_type" => error_type::WRITER_FAILED,
95 "stage" => error_stage::SENDING,
96 )
97 .increment(1);
98 }
99
100 fn name(&self) -> Option<&'static str> {
101 Some("WebSocketConnectionError")
102 }
103}
104
/// Kind of WebSocket message, used to label the received-data logs and
/// metrics emitted by this module.
#[allow(dead_code)]
#[derive(Debug, Copy, Clone)]
pub enum WebSocketKind {
    /// A ping message.
    Ping,
    /// A pong message.
    Pong,
    /// A text message.
    Text,
    /// A binary message.
    Binary,
    /// A close message.
    Close,
    /// A raw frame.
    Frame,
}

impl Display for WebSocketKind {
    /// Renders the kind as its variant name (for example `Ping`) by handing
    /// off to the derived `Debug` implementation.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result {
        Debug::fmt(self, f)
    }
}
121
122#[derive(Debug)]
123pub struct WebSocketBytesReceived<'a> {
124 pub byte_size: usize,
125 pub url: &'a str,
126 pub protocol: &'static str,
127 pub kind: WebSocketKind,
128}
129
130impl InternalEvent for WebSocketBytesReceived<'_> {
131 fn emit(self) {
132 trace!(
133 message = "Bytes received.",
134 byte_size = %self.byte_size,
135 url = %self.url,
136 protocol = %self.protocol,
137 kind = %self.kind
138 );
139 let counter = counter!(
140 "component_received_bytes_total",
141 "url" => self.url.to_string(),
142 "protocol" => self.protocol,
143 "kind" => self.kind.to_string()
144 );
145 counter.increment(self.byte_size as u64);
146 }
147}
148
149#[derive(Debug)]
150pub struct WebSocketMessageReceived<'a> {
151 pub count: usize,
152 pub byte_size: JsonSize,
153 pub url: &'a str,
154 pub protocol: &'static str,
155 pub kind: WebSocketKind,
156}
157
158impl InternalEvent for WebSocketMessageReceived<'_> {
159 fn emit(self) {
160 trace!(
161 message = "Events received.",
162 count = %self.count,
163 byte_size = %self.byte_size,
164 url = %self.url,
165 protcol = %self.protocol,
166 kind = %self.kind
167 );
168
169 let histogram = histogram!("component_received_events_count");
170 histogram.record(self.count as f64);
171 let counter = counter!(
172 "component_received_events_total",
173 "uri" => self.url.to_string(),
174 "protocol" => PROTOCOL,
175 "kind" => self.kind.to_string()
176 );
177 counter.increment(self.count as u64);
178 let counter = counter!(
179 "component_received_event_bytes_total",
180 "url" => self.url.to_string(),
181 "protocol" => PROTOCOL,
182 "kind" => self.kind.to_string()
183 );
184 counter.increment(self.byte_size.get() as u64);
185 }
186
187 fn name(&self) -> Option<&'static str> {
188 Some("WebSocketMessageReceived")
189 }
190}
191
192#[derive(Debug)]
193pub struct WebSocketReceiveError<'a> {
194 pub error: &'a TungsteniteError,
195}
196
197impl InternalEvent for WebSocketReceiveError<'_> {
198 fn emit(self) {
199 error!(
200 message = "Error receiving message from websocket.",
201 error = %self.error,
202 error_code = "websocket_receive_error",
203 error_type = error_type::CONNECTION_FAILED,
204 stage = error_stage::PROCESSING,
205 internal_log_rate_limit = true,
206 );
207 counter!(
208 "component_errors_total",
209 "protocol" => PROTOCOL,
210 "error_code" => "websocket_receive_error",
211 "error_type" => error_type::CONNECTION_FAILED,
212 "stage" => error_stage::PROCESSING,
213 )
214 .increment(1);
215 }
216
217 fn name(&self) -> Option<&'static str> {
218 Some("WebSocketReceiveError")
219 }
220}
221
222#[derive(Debug)]
223pub struct WebSocketSendError<'a> {
224 pub error: &'a TungsteniteError,
225}
226
227impl InternalEvent for WebSocketSendError<'_> {
228 fn emit(self) {
229 error!(
230 message = "Error sending message to websocket.",
231 error = %self.error,
232 error_code = "websocket_send_error",
233 error_type = error_type::CONNECTION_FAILED,
234 stage = error_stage::PROCESSING,
235 internal_log_rate_limit = true,
236 );
237 counter!(
238 "component_errors_total",
239 "error_code" => "websocket_send_error",
240 "error_type" => error_type::CONNECTION_FAILED,
241 "stage" => error_stage::PROCESSING,
242 )
243 .increment(1);
244 }
245
246 fn name(&self) -> Option<&'static str> {
247 Some("WebSocketSendError")
248 }
249}