// vector/sources/websocket/config.rs
use futures::TryFutureExt;
use serde_with::serde_as;
use snafu::ResultExt;
use std::time::Duration;
use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};

use super::source::{WebSocketSource, WebSocketSourceParams};
use crate::common::websocket::WebSocketCommonConfig;
use crate::{
    codecs::DecodingConfig,
    common::websocket::{ConnectSnafu, WebSocketConnector},
    config::{SourceConfig, SourceContext},
    serde::{default_decoding, default_framing_message_based},
    sources::Source,
    tls::MaybeTlsSettings,
};
use vector_config::configurable_component;
use vector_lib::config::{LogNamespace, SourceOutput};

20#[derive(Clone, Debug)]
22#[configurable_component]
23#[serde(untagged)]
24pub enum PongMessage {
25 Simple(String),
28
29 Advanced(PongValidation),
32}
33
34impl PongMessage {
35 pub fn matches(&self, msg: &str) -> bool {
36 match self {
37 PongMessage::Simple(expected) => msg == expected,
38 PongMessage::Advanced(validation) => validation.matches(msg),
39 }
40 }
41}
42
43#[derive(Clone, Debug)]
45#[configurable_component]
46#[serde(tag = "type")]
47#[serde(rename_all = "snake_case")]
48#[configurable(metadata(
49 docs::enum_tag_description = "The matching strategy to use for the pong message."
50))]
51pub enum PongValidation {
52 Exact {
54 value: String,
56 },
57
58 Contains {
60 value: String,
62 },
63}
64
65impl PongValidation {
66 pub fn matches(&self, msg: &str) -> bool {
67 match self {
68 PongValidation::Exact { value: expected } => msg == expected,
69 PongValidation::Contains { value: substring } => msg.contains(substring),
70 }
71 }
72}
73
74#[serde_as]
76#[configurable_component(source("websocket", "Collect events from a websocket endpoint.",))]
77#[derive(Clone, Debug)]
78pub struct WebSocketConfig {
79 #[serde(flatten)]
80 pub common: WebSocketCommonConfig,
81
82 #[configurable(derived)]
84 #[serde(default = "default_decoding")]
85 pub decoding: DeserializerConfig,
86
87 #[configurable(derived)]
89 #[serde(default = "default_framing_message_based")]
90 pub framing: FramingConfig,
91
92 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
94 #[serde(default = "default_connect_timeout_secs")]
95 #[configurable(metadata(docs::advanced))]
96 #[configurable(metadata(docs::examples = 10))]
97 pub connect_timeout_secs: Duration,
98
99 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
102 #[serde(default = "default_initial_message_timeout_secs")]
103 #[configurable(metadata(docs::advanced))]
104 #[configurable(metadata(docs::examples = 5))]
105 pub initial_message_timeout_secs: Duration,
106
107 #[configurable(metadata(docs::advanced))]
109 #[configurable(metadata(docs::examples = "SUBSCRIBE logs"))]
110 #[serde(default)]
111 pub initial_message: Option<String>,
112
113 #[configurable(metadata(docs::advanced))]
116 #[serde(default)]
117 pub ping_message: Option<String>,
118
119 #[configurable(metadata(docs::advanced))]
123 #[serde(default)]
124 pub pong_message: Option<PongMessage>,
125
126 #[configurable(metadata(docs::hidden))]
128 #[serde(default)]
129 pub log_namespace: Option<bool>,
130}
131
/// Default value for `WebSocketConfig::connect_timeout_secs`: 30 seconds.
///
/// Referenced by name from the field's `#[serde(default = "...")]` attribute and
/// from `WebSocketConfig::default`.
const fn default_connect_timeout_secs() -> Duration {
    Duration::from_secs(30)
}

/// Default value for `WebSocketConfig::initial_message_timeout_secs`: 2 seconds.
///
/// Referenced by name from the field's `#[serde(default = "...")]` attribute and
/// from `WebSocketConfig::default`.
const fn default_initial_message_timeout_secs() -> Duration {
    Duration::from_secs(2)
}

140impl Default for WebSocketConfig {
141 fn default() -> Self {
142 Self {
143 common: WebSocketCommonConfig::default(),
144 decoding: default_decoding(),
145 framing: default_framing_message_based(),
146 connect_timeout_secs: default_connect_timeout_secs(),
147 initial_message: None,
148 initial_message_timeout_secs: default_initial_message_timeout_secs(),
149 ping_message: None,
150 pong_message: None,
151 log_namespace: None,
152 }
153 }
154}
155
156impl_generate_config_from_default!(WebSocketConfig);
157
158#[async_trait::async_trait]
159#[typetag::serde(name = "websocket")]
160impl SourceConfig for WebSocketConfig {
161 async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
162 let tls =
163 MaybeTlsSettings::from_config(self.common.tls.as_ref(), false).context(ConnectSnafu)?;
164 let connector =
165 WebSocketConnector::new(self.common.uri.clone(), tls, self.common.auth.clone())?;
166
167 let log_namespace = cx.log_namespace(self.log_namespace);
168 let decoder =
169 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
170 .build()?;
171
172 let params = WebSocketSourceParams {
173 connector,
174 decoder,
175 log_namespace,
176 };
177
178 let source = WebSocketSource::new(self.clone(), params);
179
180 Ok(Box::pin(source.run(cx).map_err(|_err| ())))
181 }
182
183 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
184 let log_namespace = global_log_namespace.merge(self.log_namespace);
185
186 let schema_definition = self
187 .decoding
188 .schema_definition(log_namespace)
189 .with_standard_vector_source_metadata();
190
191 vec![SourceOutput::new_maybe_logs(
192 self.decoding.output_type(),
193 schema_definition,
194 )]
195 }
196
197 fn can_acknowledge(&self) -> bool {
198 false
199 }
200}
201
#[cfg(test)]
mod test {
    use super::*;
    use vector_lib::schema::Definition;
    use vector_lib::{config::LogNamespace, lookup::OwnedTargetPath, schema};
    use vrl::owned_value_path;
    use vrl::value::kind::{Collection, Kind};

    /// The generated configuration for `WebSocketConfig` passes the shared check.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<WebSocketConfig>();
    }

    /// With the Vector namespace enabled, the event root is `bytes` carrying the
    /// `message` meaning, and the source metadata lives under `vector.*`.
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = WebSocketConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let definition = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                );

        assert_eq!(definition, Some(expected_definition));
    }

    /// With the legacy namespace, the event is an object holding `message`,
    /// `timestamp` and `source_type` fields.
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = WebSocketConfig::default();

        let definition = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = schema::Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None);

        assert_eq!(definition, Some(expected_definition));
    }
}