1#![allow(dead_code)] use std::{
4 error::Error,
5 fmt::{Debug, Display, Formatter, Result},
6};
7
8use metrics::{counter, histogram};
9use tokio_tungstenite::tungstenite::error::Error as TungsteniteError;
10use vector_common::{
11 internal_event::{error_stage, error_type},
12 json_size::JsonSize,
13};
14use vector_lib::NamedInternalEvent;
15use vector_lib::internal_event::InternalEvent;
16
17pub const PROTOCOL: &str = "websocket";
18
19#[derive(Debug, NamedInternalEvent)]
20pub struct WebSocketConnectionEstablished;
21
22impl InternalEvent for WebSocketConnectionEstablished {
23 fn emit(self) {
24 debug!(message = "Connected.");
25 counter!("connection_established_total").increment(1);
26 }
27}
28
29#[derive(Debug, NamedInternalEvent)]
30pub struct WebSocketConnectionFailedError {
31 pub error: Box<dyn Error>,
32}
33
34impl InternalEvent for WebSocketConnectionFailedError {
35 fn emit(self) {
36 error!(
37 message = "WebSocket connection failed.",
38 error = %self.error,
39 error_code = "websocket_connection_error",
40 error_type = error_type::CONNECTION_FAILED,
41 stage = error_stage::SENDING,
42 );
43 counter!(
44 "component_errors_total",
45 "protocol" => PROTOCOL,
46 "error_code" => "websocket_connection_failed",
47 "error_type" => error_type::CONNECTION_FAILED,
48 "stage" => error_stage::SENDING,
49 )
50 .increment(1);
51 }
52}
53
54#[derive(Debug, NamedInternalEvent)]
55pub struct WebSocketConnectionShutdown;
56
57impl InternalEvent for WebSocketConnectionShutdown {
58 fn emit(self) {
59 warn!(message = "Closed by the server.");
60 counter!("connection_shutdown_total").increment(1);
61 }
62}
63
64#[derive(Debug, NamedInternalEvent)]
65pub struct WebSocketConnectionError {
66 pub error: tokio_tungstenite::tungstenite::Error,
67}
68
69impl InternalEvent for WebSocketConnectionError {
70 fn emit(self) {
71 error!(
72 message = "WebSocket connection error.",
73 error = %self.error,
74 error_code = "websocket_connection_error",
75 error_type = error_type::WRITER_FAILED,
76 stage = error_stage::SENDING,
77 );
78 counter!(
79 "component_errors_total",
80 "protocol" => PROTOCOL,
81 "error_code" => "websocket_connection_error",
82 "error_type" => error_type::WRITER_FAILED,
83 "stage" => error_stage::SENDING,
84 )
85 .increment(1);
86 }
87}
88
89#[allow(dead_code)]
90#[derive(Debug, Copy, Clone)]
91pub enum WebSocketKind {
92 Ping,
93 Pong,
94 Text,
95 Binary,
96 Close,
97 Frame,
98}
99
100impl Display for WebSocketKind {
101 fn fmt(&self, f: &mut Formatter<'_>) -> Result {
102 write!(f, "{self:?}")
103 }
104}
105
106#[derive(Debug, NamedInternalEvent)]
107pub struct WebSocketBytesReceived<'a> {
108 pub byte_size: usize,
109 pub url: &'a str,
110 pub protocol: &'static str,
111 pub kind: WebSocketKind,
112}
113
114impl InternalEvent for WebSocketBytesReceived<'_> {
115 fn emit(self) {
116 trace!(
117 message = "Bytes received.",
118 byte_size = %self.byte_size,
119 url = %self.url,
120 protocol = %self.protocol,
121 kind = %self.kind
122 );
123 let counter = counter!(
124 "component_received_bytes_total",
125 "url" => self.url.to_string(),
126 "protocol" => self.protocol,
127 "kind" => self.kind.to_string()
128 );
129 counter.increment(self.byte_size as u64);
130 }
131}
132
133#[derive(Debug, NamedInternalEvent)]
134pub struct WebSocketMessageReceived<'a> {
135 pub count: usize,
136 pub byte_size: JsonSize,
137 pub url: &'a str,
138 pub protocol: &'static str,
139 pub kind: WebSocketKind,
140}
141
142impl InternalEvent for WebSocketMessageReceived<'_> {
143 fn emit(self) {
144 trace!(
145 message = "Events received.",
146 count = %self.count,
147 byte_size = %self.byte_size,
148 url = %self.url,
149 protcol = %self.protocol,
150 kind = %self.kind
151 );
152
153 let histogram = histogram!("component_received_events_count");
154 histogram.record(self.count as f64);
155 let counter = counter!(
156 "component_received_events_total",
157 "uri" => self.url.to_string(),
158 "protocol" => PROTOCOL,
159 "kind" => self.kind.to_string()
160 );
161 counter.increment(self.count as u64);
162 let counter = counter!(
163 "component_received_event_bytes_total",
164 "url" => self.url.to_string(),
165 "protocol" => PROTOCOL,
166 "kind" => self.kind.to_string()
167 );
168 counter.increment(self.byte_size.get() as u64);
169 }
170}
171
172#[derive(Debug, NamedInternalEvent)]
173pub struct WebSocketReceiveError<'a> {
174 pub error: &'a TungsteniteError,
175}
176
177impl InternalEvent for WebSocketReceiveError<'_> {
178 fn emit(self) {
179 error!(
180 message = "Error receiving message from websocket.",
181 error = %self.error,
182 error_code = "websocket_receive_error",
183 error_type = error_type::CONNECTION_FAILED,
184 stage = error_stage::PROCESSING,
185 );
186 counter!(
187 "component_errors_total",
188 "protocol" => PROTOCOL,
189 "error_code" => "websocket_receive_error",
190 "error_type" => error_type::CONNECTION_FAILED,
191 "stage" => error_stage::PROCESSING,
192 )
193 .increment(1);
194 }
195}
196
197#[derive(Debug, NamedInternalEvent)]
198pub struct WebSocketSendError<'a> {
199 pub error: &'a TungsteniteError,
200}
201
202impl InternalEvent for WebSocketSendError<'_> {
203 fn emit(self) {
204 error!(
205 message = "Error sending message to websocket.",
206 error = %self.error,
207 error_code = "websocket_send_error",
208 error_type = error_type::CONNECTION_FAILED,
209 stage = error_stage::PROCESSING,
210 );
211 counter!(
212 "component_errors_total",
213 "protocol" => PROTOCOL,
214 "error_code" => "websocket_send_error",
215 "error_type" => error_type::CONNECTION_FAILED,
216 "stage" => error_stage::PROCESSING,
217 )
218 .increment(1);
219 }
220}