vector/sources/websocket/
config.rs1use std::time::Duration;
2
3use futures::TryFutureExt;
4use serde_with::serde_as;
5use snafu::ResultExt;
6use vector_config::configurable_component;
7use vector_lib::{
8 codecs::decoding::{DeserializerConfig, FramingConfig},
9 config::{LogNamespace, SourceOutput},
10};
11
12use super::source::{WebSocketSource, WebSocketSourceParams};
13use crate::{
14 codecs::DecodingConfig,
15 common::websocket::{ConnectSnafu, WebSocketCommonConfig, WebSocketConnector},
16 config::{SourceConfig, SourceContext},
17 serde::{default_decoding, default_framing_message_based},
18 sources::Source,
19 tls::MaybeTlsSettings,
20};
21
/// The shapes the expected pong message can take in configuration.
///
/// The enum is `#[serde(untagged)]`, so a plain string deserializes to
/// [`PongMessage::Simple`], while a structured value carrying a `type` tag
/// deserializes to [`PongMessage::Advanced`].
#[derive(Clone, Debug)]
#[configurable_component]
#[serde(untagged)]
pub enum PongMessage {
    /// A literal string; a message matches only when it is exactly equal to it.
    Simple(String),

    /// A tagged matching strategy; matching is delegated to [`PongValidation`].
    Advanced(PongValidation),
}
35
36impl PongMessage {
37 pub fn matches(&self, msg: &str) -> bool {
38 match self {
39 PongMessage::Simple(expected) => msg == expected,
40 PongMessage::Advanced(validation) => validation.matches(msg),
41 }
42 }
43}
44
/// Strategy used to decide whether a message counts as the expected pong.
///
/// Serialized as an internally tagged value: the `type` key selects the
/// variant, with variant names in snake case (`exact`, `contains`).
#[derive(Clone, Debug)]
#[configurable_component]
#[serde(tag = "type")]
#[serde(rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The matching strategy to use for the pong message."
))]
pub enum PongValidation {
    /// The message must be exactly equal to `value`.
    Exact {
        /// The string the whole message is compared against.
        value: String,
    },

    /// The message must contain `value` as a substring.
    Contains {
        /// The substring to look for inside the message.
        value: String,
    },
}
66
67impl PongValidation {
68 pub fn matches(&self, msg: &str) -> bool {
69 match self {
70 PongValidation::Exact { value: expected } => msg == expected,
71 PongValidation::Contains { value: substring } => msg.contains(substring),
72 }
73 }
74}
75
/// Configuration for the `websocket` source.
#[serde_as]
#[configurable_component(source("websocket", "Collect events from a websocket endpoint.",))]
#[derive(Clone, Debug)]
pub struct WebSocketConfig {
    // Settings from `crate::common::websocket`, flattened into the top level of
    // this configuration. `build` reads `uri`, `tls`, and `auth` from here.
    #[serde(flatten)]
    pub common: WebSocketCommonConfig,

    /// Decoder configuration; falls back to `default_decoding()` when omitted.
    // `build` combines this with `framing` into a `DecodingConfig`.
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    pub decoding: DeserializerConfig,

    /// Framing configuration; falls back to `default_framing_message_based()` when omitted.
    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    pub framing: FramingConfig,

    /// Connect timeout, expressed as a whole number of seconds. Defaults to 30.
    // NOTE(review): the timeout is enforced by the source implementation, which
    // is not visible in this file — confirm exact semantics there.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(default = "default_connect_timeout_secs")]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::examples = 10))]
    pub connect_timeout_secs: Duration,

    /// Timeout tied to the initial message, expressed as a whole number of seconds.
    /// Defaults to 2.
    // NOTE(review): presumably the time to wait for a reply to `initial_message`;
    // verify against the source implementation.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(default = "default_initial_message_timeout_secs")]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::examples = 5))]
    pub initial_message_timeout_secs: Duration,

    /// Optional initial message. Unset by default.
    // NOTE(review): presumably sent to the server once connected — confirm.
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::examples = "SUBSCRIBE logs"))]
    #[serde(default)]
    pub initial_message: Option<String>,

    /// Optional custom ping message. Unset by default.
    // NOTE(review): how and when it is sent is decided by the source
    // implementation, not shown here — confirm.
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    pub ping_message: Option<String>,

    /// Optional expected pong message, given either as a plain string or as a
    /// tagged matching strategy (see [`PongMessage`]). Unset by default.
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    pub pong_message: Option<PongMessage>,

    /// Optional per-source log namespace setting.
    // Hidden from generated documentation (`docs::hidden`). `build` passes it to
    // `cx.log_namespace(..)` and `outputs` merges it into the global namespace.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,
}
133
/// Serde default for `connect_timeout_secs`: thirty seconds.
const fn default_connect_timeout_secs() -> Duration {
    const CONNECT_TIMEOUT_SECONDS: u64 = 30;
    Duration::from_secs(CONNECT_TIMEOUT_SECONDS)
}
137
/// Serde default for `initial_message_timeout_secs`: two seconds.
const fn default_initial_message_timeout_secs() -> Duration {
    const INITIAL_MESSAGE_TIMEOUT_SECONDS: u64 = 2;
    Duration::from_secs(INITIAL_MESSAGE_TIMEOUT_SECONDS)
}
141
142impl Default for WebSocketConfig {
143 fn default() -> Self {
144 Self {
145 common: WebSocketCommonConfig::default(),
146 decoding: default_decoding(),
147 framing: default_framing_message_based(),
148 connect_timeout_secs: default_connect_timeout_secs(),
149 initial_message: None,
150 initial_message_timeout_secs: default_initial_message_timeout_secs(),
151 ping_message: None,
152 pong_message: None,
153 log_namespace: None,
154 }
155 }
156}
157
// NOTE(review): macro defined elsewhere in the crate; judging by its name it
// derives the generated example config for `WebSocketConfig` from its `Default`
// impl — confirm against the macro definition.
impl_generate_config_from_default!(WebSocketConfig);
159
160#[async_trait::async_trait]
161#[typetag::serde(name = "websocket")]
162impl SourceConfig for WebSocketConfig {
163 async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
164 let tls =
165 MaybeTlsSettings::from_config(self.common.tls.as_ref(), false).context(ConnectSnafu)?;
166 let connector =
167 WebSocketConnector::new(self.common.uri.clone(), tls, self.common.auth.clone())?;
168
169 let log_namespace = cx.log_namespace(self.log_namespace);
170 let decoder =
171 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
172 .build()?;
173
174 let params = WebSocketSourceParams {
175 connector,
176 decoder,
177 log_namespace,
178 };
179
180 let source = WebSocketSource::new(self.clone(), params);
181
182 Ok(Box::pin(source.run(cx).map_err(|_err| ())))
183 }
184
185 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
186 let log_namespace = global_log_namespace.merge(self.log_namespace);
187
188 let schema_definition = self
189 .decoding
190 .schema_definition(log_namespace)
191 .with_standard_vector_source_metadata();
192
193 vec![SourceOutput::new_maybe_logs(
194 self.decoding.output_type(),
195 schema_definition,
196 )]
197 }
198
199 fn can_acknowledge(&self) -> bool {
200 false
201 }
202}
203
#[cfg(test)]
mod test {
    use vector_lib::{config::LogNamespace, lookup::OwnedTargetPath, schema, schema::Definition};
    use vrl::{
        owned_value_path,
        value::kind::{Collection, Kind},
    };

    use super::*;

    /// Returns the schema definition (with schemas enabled) of the first output
    /// that `config` declares under the given global namespace.
    fn first_output_definition(
        config: &WebSocketConfig,
        global_namespace: LogNamespace,
    ) -> Option<Definition> {
        let mut outputs = config.outputs(global_namespace);
        outputs.remove(0).schema_definition(true)
    }

    /// The generated example configuration must be valid for this source.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<WebSocketConfig>();
    }

    /// With `log_namespace: Some(true)` the output schema is the decoder's
    /// bytes payload plus the `vector` metadata fields.
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = WebSocketConfig {
            log_namespace: Some(true),
            ..Default::default()
        };
        let actual = first_output_definition(&config, LogNamespace::Vector);

        let source_type_path = owned_value_path!("vector", "source_type");
        let ingest_timestamp_path = owned_value_path!("vector", "ingest_timestamp");
        let expected =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(&source_type_path, Kind::bytes(), None)
                .with_metadata_field(&ingest_timestamp_path, Kind::timestamp(), None);

        assert_eq!(actual, Some(expected));
    }

    /// With the default configuration under the legacy namespace the output
    /// schema is an object with `message`, `timestamp`, and `source_type`.
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = WebSocketConfig::default();
        let actual = first_output_definition(&config, LogNamespace::Legacy);

        let message_path = owned_value_path!("message");
        let timestamp_path = owned_value_path!("timestamp");
        let source_type_path = owned_value_path!("source_type");
        let expected = schema::Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(&message_path, Kind::bytes(), Some("message"))
        .with_event_field(&timestamp_path, Kind::timestamp(), None)
        .with_event_field(&source_type_path, Kind::bytes(), None);

        assert_eq!(actual, Some(expected));
    }
}