vector/sources/websocket/
config.rs

1use futures::TryFutureExt;
2use serde_with::serde_as;
3use snafu::ResultExt;
4use std::time::Duration;
5use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
6
7use super::source::{WebSocketSource, WebSocketSourceParams};
8use crate::common::websocket::WebSocketCommonConfig;
9use crate::{
10    codecs::DecodingConfig,
11    common::websocket::{ConnectSnafu, WebSocketConnector},
12    config::{SourceConfig, SourceContext},
13    serde::{default_decoding, default_framing_message_based},
14    sources::Source,
15    tls::MaybeTlsSettings,
16};
17use vector_config::configurable_component;
18use vector_lib::config::{LogNamespace, SourceOutput};
19
20/// Defines the different shapes the `pong_message` config can take.
21#[derive(Clone, Debug)]
22#[configurable_component]
23#[serde(untagged)]
24pub enum PongMessage {
25    /// For exact string matching.
26    /// e.g., pong_message: "pong"
27    Simple(String),
28
29    /// For advanced matching strategies.
30    /// e.g., pong_message: { type: contains, value: "pong" }
31    Advanced(PongValidation),
32}
33
34impl PongMessage {
35    pub fn matches(&self, msg: &str) -> bool {
36        match self {
37            PongMessage::Simple(expected) => msg == expected,
38            PongMessage::Advanced(validation) => validation.matches(msg),
39        }
40    }
41}
42
43/// Defines the advanced validation strategies for a pong message.
44#[derive(Clone, Debug)]
45#[configurable_component]
46#[serde(tag = "type")]
47#[serde(rename_all = "snake_case")]
48#[configurable(metadata(
49    docs::enum_tag_description = "The matching strategy to use for the pong message."
50))]
51pub enum PongValidation {
52    /// The entire message must be an exact match.
53    Exact {
54        /// The string value to match against.
55        value: String,
56    },
57
58    /// The message must contain the value as a substring.
59    Contains {
60        /// The string value to match against.
61        value: String,
62    },
63}
64
65impl PongValidation {
66    pub fn matches(&self, msg: &str) -> bool {
67        match self {
68            PongValidation::Exact { value: expected } => msg == expected,
69            PongValidation::Contains { value: substring } => msg.contains(substring),
70        }
71    }
72}
73
74/// Configuration for the `websocket` source.
75#[serde_as]
76#[configurable_component(source("websocket", "Collect events from a websocket endpoint.",))]
77#[derive(Clone, Debug)]
78pub struct WebSocketConfig {
79    #[serde(flatten)]
80    pub common: WebSocketCommonConfig,
81
82    /// Decoder to use on each received message.
83    #[configurable(derived)]
84    #[serde(default = "default_decoding")]
85    pub decoding: DeserializerConfig,
86
87    /// Framing to use in the decoding.
88    #[configurable(derived)]
89    #[serde(default = "default_framing_message_based")]
90    pub framing: FramingConfig,
91
92    /// Number of seconds before timing out while connecting.
93    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
94    #[serde(default = "default_connect_timeout_secs")]
95    #[configurable(metadata(docs::advanced))]
96    #[configurable(metadata(docs::examples = 10))]
97    pub connect_timeout_secs: Duration,
98
99    /// Number of seconds before timing out while waiting for a reply to the initial message.
100    /// This is only used when `initial_message` is also configured.
101    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
102    #[serde(default = "default_initial_message_timeout_secs")]
103    #[configurable(metadata(docs::advanced))]
104    #[configurable(metadata(docs::examples = 5))]
105    pub initial_message_timeout_secs: Duration,
106
107    /// An optional message to send to the server upon connection.
108    #[configurable(metadata(docs::advanced))]
109    #[configurable(metadata(docs::examples = "SUBSCRIBE logs"))]
110    #[serde(default)]
111    pub initial_message: Option<String>,
112
113    /// An optional application-level ping message to send over the WebSocket connection.
114    /// If not set, a standard WebSocket ping control frame is sent instead.
115    #[configurable(metadata(docs::advanced))]
116    #[serde(default)]
117    pub ping_message: Option<String>,
118
119    /// The expected application-level pong message to listen for as a response to a custom `ping_message`.
120    /// This is only used when `ping_message` is also configured. When a custom ping is sent,
121    /// receiving this specific message confirms that the connection is still alive.
122    #[configurable(metadata(docs::advanced))]
123    #[serde(default)]
124    pub pong_message: Option<PongMessage>,
125
126    /// The namespace to use for logs. This overrides the global setting.
127    #[configurable(metadata(docs::hidden))]
128    #[serde(default)]
129    pub log_namespace: Option<bool>,
130}
131
/// Default for `connect_timeout_secs`: 30 seconds.
const fn default_connect_timeout_secs() -> Duration {
    Duration::from_secs(30)
}
135
/// Default for `initial_message_timeout_secs`: 2 seconds.
const fn default_initial_message_timeout_secs() -> Duration {
    Duration::from_secs(2)
}
139
140impl Default for WebSocketConfig {
141    fn default() -> Self {
142        Self {
143            common: WebSocketCommonConfig::default(),
144            decoding: default_decoding(),
145            framing: default_framing_message_based(),
146            connect_timeout_secs: default_connect_timeout_secs(),
147            initial_message: None,
148            initial_message_timeout_secs: default_initial_message_timeout_secs(),
149            ping_message: None,
150            pong_message: None,
151            log_namespace: None,
152        }
153    }
154}
155
156impl_generate_config_from_default!(WebSocketConfig);
157
158#[async_trait::async_trait]
159#[typetag::serde(name = "websocket")]
160impl SourceConfig for WebSocketConfig {
161    async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
162        let tls =
163            MaybeTlsSettings::from_config(self.common.tls.as_ref(), false).context(ConnectSnafu)?;
164        let connector =
165            WebSocketConnector::new(self.common.uri.clone(), tls, self.common.auth.clone())?;
166
167        let log_namespace = cx.log_namespace(self.log_namespace);
168        let decoder =
169            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
170                .build()?;
171
172        let params = WebSocketSourceParams {
173            connector,
174            decoder,
175            log_namespace,
176        };
177
178        let source = WebSocketSource::new(self.clone(), params);
179
180        Ok(Box::pin(source.run(cx).map_err(|_err| ())))
181    }
182
183    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
184        let log_namespace = global_log_namespace.merge(self.log_namespace);
185
186        let schema_definition = self
187            .decoding
188            .schema_definition(log_namespace)
189            .with_standard_vector_source_metadata();
190
191        vec![SourceOutput::new_maybe_logs(
192            self.decoding.output_type(),
193            schema_definition,
194        )]
195    }
196
197    fn can_acknowledge(&self) -> bool {
198        false
199    }
200}
201
#[cfg(test)]
mod test {
    use super::*;
    use vector_lib::schema::Definition;
    use vector_lib::{config::LogNamespace, lookup::OwnedTargetPath};
    use vrl::owned_value_path;
    use vrl::value::kind::{Collection, Kind};

    /// The generated example configuration for the source must be valid.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<WebSocketConfig>();
    }

    /// With the Vector namespace, the event root is the message itself and the
    /// source metadata lives under the `vector` metadata path.
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = WebSocketConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let definition = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                );

        assert_eq!(definition, Some(expected_definition));
    }

    /// With the legacy namespace, the event is an object carrying `message`,
    /// `timestamp` and `source_type` fields.
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = WebSocketConfig::default();

        let definition = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None);

        assert_eq!(definition, Some(expected_definition));
    }
}