vector/sources/websocket/
config.rs

1use std::time::Duration;
2
3use futures::TryFutureExt;
4use serde_with::serde_as;
5use snafu::ResultExt;
6use vector_config::configurable_component;
7use vector_lib::{
8    codecs::decoding::{DeserializerConfig, FramingConfig},
9    config::{LogNamespace, SourceOutput},
10};
11
12use super::source::{WebSocketSource, WebSocketSourceParams};
13use crate::{
14    codecs::DecodingConfig,
15    common::websocket::{ConnectSnafu, WebSocketCommonConfig, WebSocketConnector},
16    config::{SourceConfig, SourceContext},
17    serde::{default_decoding, default_framing_message_based},
18    sources::Source,
19    tls::MaybeTlsSettings,
20};
21
22/// Defines the different shapes the `pong_message` config can take.
23#[derive(Clone, Debug)]
24#[configurable_component]
25#[serde(untagged)]
26pub enum PongMessage {
27    /// For exact string matching.
28    /// e.g., pong_message: "pong"
29    Simple(String),
30
31    /// For advanced matching strategies.
32    /// e.g., pong_message: { type: contains, value: "pong" }
33    Advanced(PongValidation),
34}
35
36impl PongMessage {
37    pub fn matches(&self, msg: &str) -> bool {
38        match self {
39            PongMessage::Simple(expected) => msg == expected,
40            PongMessage::Advanced(validation) => validation.matches(msg),
41        }
42    }
43}
44
45/// Defines the advanced validation strategies for a pong message.
46#[derive(Clone, Debug)]
47#[configurable_component]
48#[serde(tag = "type")]
49#[serde(rename_all = "snake_case")]
50#[configurable(metadata(
51    docs::enum_tag_description = "The matching strategy to use for the pong message."
52))]
53pub enum PongValidation {
54    /// The entire message must be an exact match.
55    Exact {
56        /// The string value to match against.
57        value: String,
58    },
59
60    /// The message must contain the value as a substring.
61    Contains {
62        /// The string value to match against.
63        value: String,
64    },
65}
66
67impl PongValidation {
68    pub fn matches(&self, msg: &str) -> bool {
69        match self {
70            PongValidation::Exact { value: expected } => msg == expected,
71            PongValidation::Contains { value: substring } => msg.contains(substring),
72        }
73    }
74}
75
76/// Configuration for the `websocket` source.
77#[serde_as]
78#[configurable_component(source("websocket", "Collect events from a websocket endpoint.",))]
79#[derive(Clone, Debug)]
80pub struct WebSocketConfig {
81    #[serde(flatten)]
82    pub common: WebSocketCommonConfig,
83
84    /// Decoder to use on each received message.
85    #[configurable(derived)]
86    #[serde(default = "default_decoding")]
87    pub decoding: DeserializerConfig,
88
89    /// Framing to use in the decoding.
90    #[configurable(derived)]
91    #[serde(default = "default_framing_message_based")]
92    pub framing: FramingConfig,
93
94    /// Number of seconds before timing out while connecting.
95    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
96    #[serde(default = "default_connect_timeout_secs")]
97    #[configurable(metadata(docs::advanced))]
98    #[configurable(metadata(docs::examples = 10))]
99    pub connect_timeout_secs: Duration,
100
101    /// Number of seconds before timing out while waiting for a reply to the initial message.
102    /// This is only used when `initial_message` is also configured.
103    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
104    #[serde(default = "default_initial_message_timeout_secs")]
105    #[configurable(metadata(docs::advanced))]
106    #[configurable(metadata(docs::examples = 5))]
107    pub initial_message_timeout_secs: Duration,
108
109    /// An optional message to send to the server upon connection.
110    #[configurable(metadata(docs::advanced))]
111    #[configurable(metadata(docs::examples = "SUBSCRIBE logs"))]
112    #[serde(default)]
113    pub initial_message: Option<String>,
114
115    /// An optional application-level ping message to send over the WebSocket connection.
116    /// If not set, a standard WebSocket ping control frame is sent instead.
117    #[configurable(metadata(docs::advanced))]
118    #[serde(default)]
119    pub ping_message: Option<String>,
120
121    /// The expected application-level pong message to listen for as a response to a custom `ping_message`.
122    /// This is only used when `ping_message` is also configured. When a custom ping is sent,
123    /// receiving this specific message confirms that the connection is still alive.
124    #[configurable(metadata(docs::advanced))]
125    #[serde(default)]
126    pub pong_message: Option<PongMessage>,
127
128    /// The namespace to use for logs. This overrides the global setting.
129    #[configurable(metadata(docs::hidden))]
130    #[serde(default)]
131    pub log_namespace: Option<bool>,
132}
133
/// Default for `connect_timeout_secs`: 30 seconds.
const fn default_connect_timeout_secs() -> Duration {
    const CONNECT_TIMEOUT_SECS: u64 = 30;
    Duration::from_secs(CONNECT_TIMEOUT_SECS)
}
137
/// Default for `initial_message_timeout_secs`: 2 seconds.
const fn default_initial_message_timeout_secs() -> Duration {
    const INITIAL_MESSAGE_TIMEOUT_SECS: u64 = 2;
    Duration::from_secs(INITIAL_MESSAGE_TIMEOUT_SECS)
}
141
142impl Default for WebSocketConfig {
143    fn default() -> Self {
144        Self {
145            common: WebSocketCommonConfig::default(),
146            decoding: default_decoding(),
147            framing: default_framing_message_based(),
148            connect_timeout_secs: default_connect_timeout_secs(),
149            initial_message: None,
150            initial_message_timeout_secs: default_initial_message_timeout_secs(),
151            ping_message: None,
152            pong_message: None,
153            log_namespace: None,
154        }
155    }
156}
157
158impl_generate_config_from_default!(WebSocketConfig);
159
160#[async_trait::async_trait]
161#[typetag::serde(name = "websocket")]
162impl SourceConfig for WebSocketConfig {
163    async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
164        let tls =
165            MaybeTlsSettings::from_config(self.common.tls.as_ref(), false).context(ConnectSnafu)?;
166        let connector =
167            WebSocketConnector::new(self.common.uri.clone(), tls, self.common.auth.clone())?;
168
169        let log_namespace = cx.log_namespace(self.log_namespace);
170        let decoder =
171            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
172                .build()?;
173
174        let params = WebSocketSourceParams {
175            connector,
176            decoder,
177            log_namespace,
178        };
179
180        let source = WebSocketSource::new(self.clone(), params);
181
182        Ok(Box::pin(source.run(cx).map_err(|_err| ())))
183    }
184
185    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
186        let log_namespace = global_log_namespace.merge(self.log_namespace);
187
188        let schema_definition = self
189            .decoding
190            .schema_definition(log_namespace)
191            .with_standard_vector_source_metadata();
192
193        vec![SourceOutput::new_maybe_logs(
194            self.decoding.output_type(),
195            schema_definition,
196        )]
197    }
198
199    fn can_acknowledge(&self) -> bool {
200        false
201    }
202}
203
#[cfg(test)]
mod test {
    use vector_lib::{config::LogNamespace, lookup::OwnedTargetPath, schema, schema::Definition};
    use vrl::{
        owned_value_path,
        value::kind::{Collection, Kind},
    };

    use super::*;

    /// The default configuration must round-trip through config generation.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<WebSocketConfig>();
    }

    /// With the Vector namespace enabled, the event root is the message and
    /// the source metadata is stored under `vector.*` metadata fields.
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = WebSocketConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let mut outputs = config.outputs(LogNamespace::Vector);
        let actual = outputs.remove(0).schema_definition(true);

        let source_type_path = owned_value_path!("vector", "source_type");
        let ingest_timestamp_path = owned_value_path!("vector", "ingest_timestamp");
        let expected =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(&source_type_path, Kind::bytes(), None)
                .with_metadata_field(&ingest_timestamp_path, Kind::timestamp(), None);

        assert_eq!(actual, Some(expected));
    }

    /// With the legacy namespace, the message, timestamp and source type are
    /// all regular event fields.
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = WebSocketConfig::default();

        let mut outputs = config.outputs(LogNamespace::Legacy);
        let actual = outputs.remove(0).schema_definition(true);

        let message_path = owned_value_path!("message");
        let timestamp_path = owned_value_path!("timestamp");
        let source_type_path = owned_value_path!("source_type");
        let expected = schema::Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(&message_path, Kind::bytes(), Some("message"))
        .with_event_field(&timestamp_path, Kind::timestamp(), None)
        .with_event_field(&source_type_path, Kind::bytes(), None);

        assert_eq!(actual, Some(expected));
    }
}
271}