1use std::error::Error;
2use std::fmt::{Debug, Display, Formatter, Result};
3
4use metrics::{counter, histogram};
5use tokio_tungstenite::tungstenite::error::Error as TungsteniteError;
6use vector_lib::internal_event::InternalEvent;
7
8use vector_common::{
9 internal_event::{error_stage, error_type},
10 json_size::JsonSize,
11};
12
13pub const PROTOCOL: &str = "websocket";
14
15#[derive(Debug)]
16pub struct WebSocketConnectionEstablished;
17
18impl InternalEvent for WebSocketConnectionEstablished {
19 fn emit(self) {
20 debug!(message = "Connected.");
21 counter!("connection_established_total").increment(1);
22 }
23
24 fn name(&self) -> Option<&'static str> {
25 Some("WebSocketConnectionEstablished")
26 }
27}
28
29#[derive(Debug)]
30pub struct WebSocketConnectionFailedError {
31 pub error: Box<dyn Error>,
32}
33
34impl InternalEvent for WebSocketConnectionFailedError {
35 fn emit(self) {
36 error!(
37 message = "WebSocket connection failed.",
38 error = %self.error,
39 error_code = "websocket_connection_error",
40 error_type = error_type::CONNECTION_FAILED,
41 stage = error_stage::SENDING,
42 internal_log_rate_limit = true,
43 );
44 counter!(
45 "component_errors_total",
46 "error_code" => "websocket_connection_failed",
47 "error_type" => error_type::CONNECTION_FAILED,
48 "stage" => error_stage::SENDING,
49 )
50 .increment(1);
51 }
52
53 fn name(&self) -> Option<&'static str> {
54 Some("WebSocketConnectionFailedError")
55 }
56}
57
58#[derive(Debug)]
59pub struct WebSocketConnectionShutdown;
60
61impl InternalEvent for WebSocketConnectionShutdown {
62 fn emit(self) {
63 warn!(message = "Closed by the server.");
64 counter!("connection_shutdown_total").increment(1);
65 }
66
67 fn name(&self) -> Option<&'static str> {
68 Some("WebSocketConnectionShutdown")
69 }
70}
71
72#[derive(Debug)]
73pub struct WebSocketConnectionError {
74 pub error: tokio_tungstenite::tungstenite::Error,
75}
76
77impl InternalEvent for WebSocketConnectionError {
78 fn emit(self) {
79 error!(
80 message = "WebSocket connection error.",
81 error = %self.error,
82 error_code = "websocket_connection_error",
83 error_type = error_type::WRITER_FAILED,
84 stage = error_stage::SENDING,
85 internal_log_rate_limit = true,
86 );
87 counter!(
88 "component_errors_total",
89 "protocol" => PROTOCOL,
90 "error_code" => "websocket_connection_error",
91 "error_type" => error_type::WRITER_FAILED,
92 "stage" => error_stage::SENDING,
93 )
94 .increment(1);
95 }
96
97 fn name(&self) -> Option<&'static str> {
98 Some("WebSocketConnectionError")
99 }
100}
101
102#[allow(dead_code)]
103#[derive(Debug, Copy, Clone)]
104pub enum WebSocketKind {
105 Ping,
106 Pong,
107 Text,
108 Binary,
109 Close,
110 Frame,
111}
112
113impl Display for WebSocketKind {
114 fn fmt(&self, f: &mut Formatter<'_>) -> Result {
115 write!(f, "{self:?}")
116 }
117}
118
119#[derive(Debug)]
120pub struct WebSocketBytesReceived<'a> {
121 pub byte_size: usize,
122 pub url: &'a str,
123 pub protocol: &'static str,
124 pub kind: WebSocketKind,
125}
126
127impl InternalEvent for WebSocketBytesReceived<'_> {
128 fn emit(self) {
129 trace!(
130 message = "Bytes received.",
131 byte_size = %self.byte_size,
132 url = %self.url,
133 protocol = %self.protocol,
134 kind = %self.kind
135 );
136 let counter = counter!(
137 "component_received_bytes_total",
138 "url" => self.url.to_string(),
139 "protocol" => self.protocol,
140 "kind" => self.kind.to_string()
141 );
142 counter.increment(self.byte_size as u64);
143 }
144}
145
146#[derive(Debug)]
147pub struct WebSocketMessageReceived<'a> {
148 pub count: usize,
149 pub byte_size: JsonSize,
150 pub url: &'a str,
151 pub protocol: &'static str,
152 pub kind: WebSocketKind,
153}
154
155impl InternalEvent for WebSocketMessageReceived<'_> {
156 fn emit(self) {
157 trace!(
158 message = "Events received.",
159 count = %self.count,
160 byte_size = %self.byte_size,
161 url = %self.url,
162 protcol = %self.protocol,
163 kind = %self.kind
164 );
165
166 let histogram = histogram!("component_received_events_count");
167 histogram.record(self.count as f64);
168 let counter = counter!(
169 "component_received_events_total",
170 "uri" => self.url.to_string(),
171 "protocol" => PROTOCOL,
172 "kind" => self.kind.to_string()
173 );
174 counter.increment(self.count as u64);
175 let counter = counter!(
176 "component_received_event_bytes_total",
177 "url" => self.url.to_string(),
178 "protocol" => PROTOCOL,
179 "kind" => self.kind.to_string()
180 );
181 counter.increment(self.byte_size.get() as u64);
182 }
183
184 fn name(&self) -> Option<&'static str> {
185 Some("WebSocketMessageReceived")
186 }
187}
188
189#[derive(Debug)]
190pub struct WebSocketReceiveError<'a> {
191 pub error: &'a TungsteniteError,
192}
193
194impl InternalEvent for WebSocketReceiveError<'_> {
195 fn emit(self) {
196 error!(
197 message = "Error receiving message from websocket.",
198 error = %self.error,
199 error_code = "websocket_receive_error",
200 error_type = error_type::CONNECTION_FAILED,
201 stage = error_stage::PROCESSING,
202 internal_log_rate_limit = true,
203 );
204 counter!(
205 "component_errors_total",
206 "protocol" => PROTOCOL,
207 "error_code" => "websocket_receive_error",
208 "error_type" => error_type::CONNECTION_FAILED,
209 "stage" => error_stage::PROCESSING,
210 )
211 .increment(1);
212 }
213
214 fn name(&self) -> Option<&'static str> {
215 Some("WebSocketReceiveError")
216 }
217}
218
219#[derive(Debug)]
220pub struct WebSocketSendError<'a> {
221 pub error: &'a TungsteniteError,
222}
223
224impl InternalEvent for WebSocketSendError<'_> {
225 fn emit(self) {
226 error!(
227 message = "Error sending message to websocket.",
228 error = %self.error,
229 error_code = "websocket_send_error",
230 error_type = error_type::CONNECTION_FAILED,
231 stage = error_stage::PROCESSING,
232 internal_log_rate_limit = true,
233 );
234 counter!(
235 "component_errors_total",
236 "error_code" => "websocket_send_error",
237 "error_type" => error_type::CONNECTION_FAILED,
238 "stage" => error_stage::PROCESSING,
239 )
240 .increment(1);
241 }
242
243 fn name(&self) -> Option<&'static str> {
244 Some("WebSocketSendError")
245 }
246}