1#![allow(dead_code)] use std::{
4 error::Error,
5 fmt::{Debug, Display, Formatter, Result},
6};
7
8use metrics::{counter, histogram};
9use tokio_tungstenite::tungstenite::error::Error as TungsteniteError;
10use vector_common::{
11 internal_event::{error_stage, error_type},
12 json_size::JsonSize,
13};
14use vector_lib::internal_event::InternalEvent;
15
/// Value used for the `protocol` label on WebSocket metrics emitted from this module.
pub const PROTOCOL: &str = "websocket";
17
18#[derive(Debug)]
19pub struct WebSocketConnectionEstablished;
20
21impl InternalEvent for WebSocketConnectionEstablished {
22 fn emit(self) {
23 debug!(message = "Connected.");
24 counter!("connection_established_total").increment(1);
25 }
26
27 fn name(&self) -> Option<&'static str> {
28 Some("WebSocketConnectionEstablished")
29 }
30}
31
32#[derive(Debug)]
33pub struct WebSocketConnectionFailedError {
34 pub error: Box<dyn Error>,
35}
36
37impl InternalEvent for WebSocketConnectionFailedError {
38 fn emit(self) {
39 error!(
40 message = "WebSocket connection failed.",
41 error = %self.error,
42 error_code = "websocket_connection_error",
43 error_type = error_type::CONNECTION_FAILED,
44 stage = error_stage::SENDING,
45 );
46 counter!(
47 "component_errors_total",
48 "error_code" => "websocket_connection_failed",
49 "error_type" => error_type::CONNECTION_FAILED,
50 "stage" => error_stage::SENDING,
51 )
52 .increment(1);
53 }
54
55 fn name(&self) -> Option<&'static str> {
56 Some("WebSocketConnectionFailedError")
57 }
58}
59
60#[derive(Debug)]
61pub struct WebSocketConnectionShutdown;
62
63impl InternalEvent for WebSocketConnectionShutdown {
64 fn emit(self) {
65 warn!(message = "Closed by the server.");
66 counter!("connection_shutdown_total").increment(1);
67 }
68
69 fn name(&self) -> Option<&'static str> {
70 Some("WebSocketConnectionShutdown")
71 }
72}
73
74#[derive(Debug)]
75pub struct WebSocketConnectionError {
76 pub error: tokio_tungstenite::tungstenite::Error,
77}
78
79impl InternalEvent for WebSocketConnectionError {
80 fn emit(self) {
81 error!(
82 message = "WebSocket connection error.",
83 error = %self.error,
84 error_code = "websocket_connection_error",
85 error_type = error_type::WRITER_FAILED,
86 stage = error_stage::SENDING,
87 );
88 counter!(
89 "component_errors_total",
90 "protocol" => PROTOCOL,
91 "error_code" => "websocket_connection_error",
92 "error_type" => error_type::WRITER_FAILED,
93 "stage" => error_stage::SENDING,
94 )
95 .increment(1);
96 }
97
98 fn name(&self) -> Option<&'static str> {
99 Some("WebSocketConnectionError")
100 }
101}
102
/// Kind of WebSocket message an event refers to.
///
/// The `Display` output is the bare variant name and is what the events in
/// this module use for their `kind` field and label.
#[allow(dead_code)]
#[derive(Debug, Copy, Clone)]
pub enum WebSocketKind {
    Ping,
    Pong,
    Text,
    Binary,
    Close,
    Frame,
}

impl Display for WebSocketKind {
    /// Writes the variant name verbatim; formatter width and fill are not applied.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result {
        // Exhaustive on purpose: adding a variant must force a decision here.
        let label = match *self {
            WebSocketKind::Ping => "Ping",
            WebSocketKind::Pong => "Pong",
            WebSocketKind::Text => "Text",
            WebSocketKind::Binary => "Binary",
            WebSocketKind::Close => "Close",
            WebSocketKind::Frame => "Frame",
        };
        f.write_str(label)
    }
}
119
120#[derive(Debug)]
121pub struct WebSocketBytesReceived<'a> {
122 pub byte_size: usize,
123 pub url: &'a str,
124 pub protocol: &'static str,
125 pub kind: WebSocketKind,
126}
127
128impl InternalEvent for WebSocketBytesReceived<'_> {
129 fn emit(self) {
130 trace!(
131 message = "Bytes received.",
132 byte_size = %self.byte_size,
133 url = %self.url,
134 protocol = %self.protocol,
135 kind = %self.kind
136 );
137 let counter = counter!(
138 "component_received_bytes_total",
139 "url" => self.url.to_string(),
140 "protocol" => self.protocol,
141 "kind" => self.kind.to_string()
142 );
143 counter.increment(self.byte_size as u64);
144 }
145}
146
147#[derive(Debug)]
148pub struct WebSocketMessageReceived<'a> {
149 pub count: usize,
150 pub byte_size: JsonSize,
151 pub url: &'a str,
152 pub protocol: &'static str,
153 pub kind: WebSocketKind,
154}
155
156impl InternalEvent for WebSocketMessageReceived<'_> {
157 fn emit(self) {
158 trace!(
159 message = "Events received.",
160 count = %self.count,
161 byte_size = %self.byte_size,
162 url = %self.url,
163 protcol = %self.protocol,
164 kind = %self.kind
165 );
166
167 let histogram = histogram!("component_received_events_count");
168 histogram.record(self.count as f64);
169 let counter = counter!(
170 "component_received_events_total",
171 "uri" => self.url.to_string(),
172 "protocol" => PROTOCOL,
173 "kind" => self.kind.to_string()
174 );
175 counter.increment(self.count as u64);
176 let counter = counter!(
177 "component_received_event_bytes_total",
178 "url" => self.url.to_string(),
179 "protocol" => PROTOCOL,
180 "kind" => self.kind.to_string()
181 );
182 counter.increment(self.byte_size.get() as u64);
183 }
184
185 fn name(&self) -> Option<&'static str> {
186 Some("WebSocketMessageReceived")
187 }
188}
189
190#[derive(Debug)]
191pub struct WebSocketReceiveError<'a> {
192 pub error: &'a TungsteniteError,
193}
194
195impl InternalEvent for WebSocketReceiveError<'_> {
196 fn emit(self) {
197 error!(
198 message = "Error receiving message from websocket.",
199 error = %self.error,
200 error_code = "websocket_receive_error",
201 error_type = error_type::CONNECTION_FAILED,
202 stage = error_stage::PROCESSING,
203 );
204 counter!(
205 "component_errors_total",
206 "protocol" => PROTOCOL,
207 "error_code" => "websocket_receive_error",
208 "error_type" => error_type::CONNECTION_FAILED,
209 "stage" => error_stage::PROCESSING,
210 )
211 .increment(1);
212 }
213
214 fn name(&self) -> Option<&'static str> {
215 Some("WebSocketReceiveError")
216 }
217}
218
219#[derive(Debug)]
220pub struct WebSocketSendError<'a> {
221 pub error: &'a TungsteniteError,
222}
223
224impl InternalEvent for WebSocketSendError<'_> {
225 fn emit(self) {
226 error!(
227 message = "Error sending message to websocket.",
228 error = %self.error,
229 error_code = "websocket_send_error",
230 error_type = error_type::CONNECTION_FAILED,
231 stage = error_stage::PROCESSING,
232 );
233 counter!(
234 "component_errors_total",
235 "error_code" => "websocket_send_error",
236 "error_type" => error_type::CONNECTION_FAILED,
237 "stage" => error_stage::PROCESSING,
238 )
239 .increment(1);
240 }
241
242 fn name(&self) -> Option<&'static str> {
243 Some("WebSocketSendError")
244 }
245}