vector/sinks/websocket_server/
config.rs

1use std::{collections::HashMap, net::SocketAddr};
2
3use vector_lib::{codecs::JsonSerializerConfig, configurable::configurable_component};
4
5use super::{buffering::MessageBufferingConfig, sink::WebSocketListenerSink};
6use crate::{
7    codecs::EncodingConfig,
8    common::http::server_auth::HttpServerAuthConfig,
9    config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
10    sinks::{Healthcheck, VectorSink},
11    tls::TlsEnableableConfig,
12};
13
14/// Configuration for the `websocket_server` sink.
15#[configurable_component(sink(
16    "websocket_server",
17    "Deliver observability event data to websocket clients."
18))]
19#[derive(Clone, Debug)]
20pub struct WebSocketListenerSinkConfig {
21    /// The socket address to listen for connections on.
22    ///
23    /// This value _must_ include a port.
24    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
25    #[configurable(metadata(docs::examples = "localhost:80"))]
26    pub address: SocketAddr,
27
28    #[configurable(derived)]
29    pub tls: Option<TlsEnableableConfig>,
30
31    #[configurable(derived)]
32    pub encoding: EncodingConfig,
33
34    #[configurable(derived)]
35    #[serde(default)]
36    pub subprotocol: SubProtocolConfig,
37
38    #[configurable(derived)]
39    #[serde(
40        default,
41        deserialize_with = "crate::serde::bool_or_struct",
42        skip_serializing_if = "crate::serde::is_default"
43    )]
44    pub acknowledgements: AcknowledgementsConfig,
45
46    #[configurable(derived)]
47    pub message_buffering: Option<MessageBufferingConfig>,
48
49    #[configurable(derived)]
50    pub auth: Option<HttpServerAuthConfig>,
51
52    /// Configuration of internal metrics
53    #[configurable(derived)]
54    #[serde(default)]
55    pub internal_metrics: InternalMetricsConfig,
56}
57
/// Configuration of internal metrics for websocket server.
// `deny_unknown_fields` makes deserialization fail on any key other than `extra_tags`.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq, Default)]
#[serde(deny_unknown_fields)]
pub struct InternalMetricsConfig {
    /// Extra tags to add to all metrics generated by this component.
    // Map from tag key to a description of where that tag's value comes from
    // (see `ExtraMetricTagsConfig`). An absent key deserializes to an empty map.
    // NOTE(review): `crate::serde::is_default` is defined elsewhere; presumably it keeps the
    // field out of serialized output while the map is still empty — confirm.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    #[configurable(metadata(
        docs::additional_props_description = "Maps extra tag keys to values."
    ))]
    pub extra_tags: HashMap<String, ExtraMetricTagsConfig>,
}
70
/// Configuration of extra metrics tags.
// Internally tagged serde enum: the `type` key selects the variant by its snake_case name
// (`fixed`, `header`, `query`, `url`, `ip_address`), and unknown keys are rejected.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "type")]
#[configurable(metadata(docs::enum_tag_description = "Enum for extra metric tag values."))]
pub enum ExtraMetricTagsConfig {
    /// Hard-coded extra metric tag for all clients.
    Fixed {
        /// Tag value.
        value: String,
    },
    /// Extra metric tag that takes on the value of a header.
    // NOTE(review): which request the header is read from is decided by the code consuming
    // this config, which is not in this file — presumably the client's connection request.
    Header {
        /// Name of the header to use as value.
        name: String,
    },
    /// Extra metric tag that takes on the value of a query parameter.
    Query {
        /// Name of the query parameter to use as value.
        name: String,
    },
    /// Extra metric tag that takes full request URL as value.
    Url,
    /// Extra metric tag that uses client ip address as value.
    IpAddress {
        /// Set to true if port should be included with the ip address.
        ///
        /// By default port is not included.
        // The default comes from `crate::serde::default_false` when the key is omitted.
        #[serde(default = "crate::serde::default_false")]
        with_port: bool,
    },
}
103
/// Configuration of websocket subprotocol handling.
// Internally tagged serde enum: the `type` key selects the variant by its snake_case name
// (`any` or `specific`), and unknown keys are rejected.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "type")]
#[configurable(metadata(docs::enum_tag_description = "Enum for websocket subprotocol handling."))]
pub enum SubProtocolConfig {
    /// Supports any subprotocol that the client sends. The first of the requested subprotocols
    /// is accepted.
    Any,
    /// Supports only the listed subprotocols. If the client doesn't request any of these, the
    /// server omits the `Sec-WebSocket-Protocol` header and the client can then choose to
    /// close the connection.
    Specific {
        /// List of supported `Sec-WebSocket-Protocol` values. The first match among the
        /// requested subprotocols is accepted.
        supported_subprotocols: Vec<String>,
    },
}
120
121impl Default for SubProtocolConfig {
122    fn default() -> Self {
123        Self::Specific {
124            supported_subprotocols: Vec::default(),
125        }
126    }
127}
128
129impl Default for WebSocketListenerSinkConfig {
130    fn default() -> Self {
131        Self {
132            address: "0.0.0.0:8080".parse().unwrap(),
133            encoding: JsonSerializerConfig::default().into(),
134            tls: None,
135            acknowledgements: Default::default(),
136            message_buffering: None,
137            subprotocol: Default::default(),
138            auth: None,
139            internal_metrics: InternalMetricsConfig::default(),
140        }
141    }
142}
143
144#[async_trait::async_trait]
145#[typetag::serde(name = "websocket_server")]
146impl SinkConfig for WebSocketListenerSinkConfig {
147    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
148        let ws_sink = WebSocketListenerSink::new(self.clone(), cx)?;
149
150        Ok((
151            VectorSink::from_event_streamsink(ws_sink),
152            Box::pin(async move { Ok(()) }),
153        ))
154    }
155
156    fn input(&self) -> Input {
157        Input::new(self.encoding.config().input_type())
158    }
159
160    fn acknowledgements(&self) -> &AcknowledgementsConfig {
161        &self.acknowledgements
162    }
163}
164
// Invokes the crate's `impl_generate_config_from_default!` macro for this config type.
// NOTE(review): the macro is defined outside this file; by its name it presumably derives the
// generated example configuration from `Default::default()` — confirm against its definition.
impl_generate_config_from_default!(WebSocketListenerSinkConfig);
166
#[cfg(test)]
mod test {
    use super::WebSocketListenerSinkConfig;
    use crate::test_util::test_generate_config;

    /// Runs the crate's shared generated-config check against this sink's configuration type.
    #[test]
    fn generate_config() {
        test_generate_config::<WebSocketListenerSinkConfig>();
    }
}
175}