// vector/sinks/websocket_server/config.rs

use std::{collections::HashMap, net::SocketAddr};

use vector_lib::codecs::JsonSerializerConfig;
use vector_lib::configurable::configurable_component;

use crate::{
    codecs::EncodingConfig,
    common::http::server_auth::HttpServerAuthConfig,
    config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
    sinks::{Healthcheck, VectorSink},
    tls::TlsEnableableConfig,
};

use super::buffering::MessageBufferingConfig;
use super::sink::WebSocketListenerSink;
16
17/// Configuration for the `websocket_server` sink.
18#[configurable_component(sink(
19    "websocket_server",
20    "Deliver observability event data to websocket clients."
21))]
22#[derive(Clone, Debug)]
23pub struct WebSocketListenerSinkConfig {
24    /// The socket address to listen for connections on.
25    ///
26    /// This value _must_ include a port.
27    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
28    #[configurable(metadata(docs::examples = "localhost:80"))]
29    pub address: SocketAddr,
30
31    #[configurable(derived)]
32    pub tls: Option<TlsEnableableConfig>,
33
34    #[configurable(derived)]
35    pub encoding: EncodingConfig,
36
37    #[configurable(derived)]
38    #[serde(default)]
39    pub subprotocol: SubProtocolConfig,
40
41    #[configurable(derived)]
42    #[serde(
43        default,
44        deserialize_with = "crate::serde::bool_or_struct",
45        skip_serializing_if = "crate::serde::is_default"
46    )]
47    pub acknowledgements: AcknowledgementsConfig,
48
49    #[configurable(derived)]
50    pub message_buffering: Option<MessageBufferingConfig>,
51
52    #[configurable(derived)]
53    pub auth: Option<HttpServerAuthConfig>,
54
55    /// Configuration of internal metrics
56    #[configurable(derived)]
57    #[serde(default)]
58    pub internal_metrics: InternalMetricsConfig,
59}
60
61/// Configuration of internal metrics for websocket server.
62#[configurable_component]
63#[derive(Clone, Debug, PartialEq, Eq, Default)]
64#[serde(deny_unknown_fields)]
65pub struct InternalMetricsConfig {
66    /// Extra tags to add to all metrics generated by this component
67    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
68    #[configurable(metadata(
69        docs::additional_props_description = "Maps extra tag keys to values."
70    ))]
71    pub extra_tags: HashMap<String, ExtraMetricTagsConfig>,
72}
73
74/// Configuration of extra metrics tags
75#[configurable_component]
76#[derive(Clone, Debug, PartialEq, Eq)]
77#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "type")]
78#[configurable(metadata(docs::enum_tag_description = "Enum for extra metric tag values."))]
79pub enum ExtraMetricTagsConfig {
80    /// Hard-coded extra metric tag for all clients
81    Fixed {
82        /// Tag value
83        value: String,
84    },
85    /// Extra metric tag that takes on the value of a header
86    Header {
87        /// Name of the header to use as value
88        name: String,
89    },
90    /// Extra metric tag that takes on the value of a query parameter
91    Query {
92        /// Name of the query parameter to use as value
93        name: String,
94    },
95    /// Extra metric tag that takes full request URL as value
96    Url,
97    /// Extra metric tag that uses client ip address as value
98    IpAddress {
99        /// Set to true if port should be included with the ip address.
100        ///
101        /// By default port is not included
102        #[serde(default = "crate::serde::default_false")]
103        with_port: bool,
104    },
105}
106
107/// Configuration of websocket subprotocol handling.
108#[configurable_component]
109#[derive(Clone, Debug, PartialEq, Eq)]
110#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "type")]
111#[configurable(metadata(docs::enum_tag_description = "Enum for websocket subprotocol handling."))]
112pub enum SubProtocolConfig {
113    /// Supports any subprotocol that the client sends. First of the requested subprotocols will be accepted.
114    Any,
115    /// Supports only listed subprotocols. If client doesn't send any of these, server will skip
116    /// `Sec-WebSocket-Protocol` header and the client can choose to close the connection then.
117    Specific {
118        /// List of supported `Sec-WebSocket-Protocol` values. First match out of requested
119        /// subprotocols will be accepted.
120        supported_subprotocols: Vec<String>,
121    },
122}
123
124impl Default for SubProtocolConfig {
125    fn default() -> Self {
126        Self::Specific {
127            supported_subprotocols: Vec::default(),
128        }
129    }
130}
131
132impl Default for WebSocketListenerSinkConfig {
133    fn default() -> Self {
134        Self {
135            address: "0.0.0.0:8080".parse().unwrap(),
136            encoding: JsonSerializerConfig::default().into(),
137            tls: None,
138            acknowledgements: Default::default(),
139            message_buffering: None,
140            subprotocol: Default::default(),
141            auth: None,
142            internal_metrics: InternalMetricsConfig::default(),
143        }
144    }
145}
146
147#[async_trait::async_trait]
148#[typetag::serde(name = "websocket_server")]
149impl SinkConfig for WebSocketListenerSinkConfig {
150    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
151        let ws_sink = WebSocketListenerSink::new(self.clone(), cx)?;
152
153        Ok((
154            VectorSink::from_event_streamsink(ws_sink),
155            Box::pin(async move { Ok(()) }),
156        ))
157    }
158
159    fn input(&self) -> Input {
160        Input::new(self.encoding.config().input_type())
161    }
162
163    fn acknowledgements(&self) -> &AcknowledgementsConfig {
164        &self.acknowledgements
165    }
166}
167
168impl_generate_config_from_default!(WebSocketListenerSinkConfig);
169
#[cfg(test)]
mod test {
    use super::*;

    // Runs the crate's shared config-generation check for this sink's config type; what
    // exactly it validates is defined in `crate::test_util` (not visible here).
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<WebSocketListenerSinkConfig>();
    }
}