vector/sinks/websocket_server/
config.rs1use std::{collections::HashMap, net::SocketAddr};
2
3use vector_lib::{codecs::JsonSerializerConfig, configurable::configurable_component};
4
5use super::{buffering::MessageBufferingConfig, sink::WebSocketListenerSink};
6use crate::{
7 codecs::EncodingConfig,
8 common::http::server_auth::HttpServerAuthConfig,
9 config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
10 sinks::{Healthcheck, VectorSink},
11 tls::TlsEnableableConfig,
12};
13
14#[configurable_component(sink(
16 "websocket_server",
17 "Deliver observability event data to websocket clients."
18))]
19#[derive(Clone, Debug)]
20pub struct WebSocketListenerSinkConfig {
21 #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
25 #[configurable(metadata(docs::examples = "localhost:80"))]
26 pub address: SocketAddr,
27
28 #[configurable(derived)]
29 pub tls: Option<TlsEnableableConfig>,
30
31 #[configurable(derived)]
32 pub encoding: EncodingConfig,
33
34 #[configurable(derived)]
35 #[serde(default)]
36 pub subprotocol: SubProtocolConfig,
37
38 #[configurable(derived)]
39 #[serde(
40 default,
41 deserialize_with = "crate::serde::bool_or_struct",
42 skip_serializing_if = "crate::serde::is_default"
43 )]
44 pub acknowledgements: AcknowledgementsConfig,
45
46 #[configurable(derived)]
47 pub message_buffering: Option<MessageBufferingConfig>,
48
49 #[configurable(derived)]
50 pub auth: Option<HttpServerAuthConfig>,
51
52 #[configurable(derived)]
54 #[serde(default)]
55 pub internal_metrics: InternalMetricsConfig,
56}
57
58#[configurable_component]
60#[derive(Clone, Debug, PartialEq, Eq, Default)]
61#[serde(deny_unknown_fields)]
62pub struct InternalMetricsConfig {
63 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
65 #[configurable(metadata(
66 docs::additional_props_description = "Maps extra tag keys to values."
67 ))]
68 pub extra_tags: HashMap<String, ExtraMetricTagsConfig>,
69}
70
71#[configurable_component]
73#[derive(Clone, Debug, PartialEq, Eq)]
74#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "type")]
75#[configurable(metadata(docs::enum_tag_description = "Enum for extra metric tag values."))]
76pub enum ExtraMetricTagsConfig {
77 Fixed {
79 value: String,
81 },
82 Header {
84 name: String,
86 },
87 Query {
89 name: String,
91 },
92 Url,
94 IpAddress {
96 #[serde(default = "crate::serde::default_false")]
100 with_port: bool,
101 },
102}
103
104#[configurable_component]
106#[derive(Clone, Debug, PartialEq, Eq)]
107#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "type")]
108#[configurable(metadata(docs::enum_tag_description = "Enum for websocket subprotocol handling."))]
109pub enum SubProtocolConfig {
110 Any,
112 Specific {
115 supported_subprotocols: Vec<String>,
118 },
119}
120
121impl Default for SubProtocolConfig {
122 fn default() -> Self {
123 Self::Specific {
124 supported_subprotocols: Vec::default(),
125 }
126 }
127}
128
129impl Default for WebSocketListenerSinkConfig {
130 fn default() -> Self {
131 Self {
132 address: "0.0.0.0:8080".parse().unwrap(),
133 encoding: JsonSerializerConfig::default().into(),
134 tls: None,
135 acknowledgements: Default::default(),
136 message_buffering: None,
137 subprotocol: Default::default(),
138 auth: None,
139 internal_metrics: InternalMetricsConfig::default(),
140 }
141 }
142}
143
144#[async_trait::async_trait]
145#[typetag::serde(name = "websocket_server")]
146impl SinkConfig for WebSocketListenerSinkConfig {
147 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
148 let ws_sink = WebSocketListenerSink::new(self.clone(), cx)?;
149
150 Ok((
151 VectorSink::from_event_streamsink(ws_sink),
152 Box::pin(async move { Ok(()) }),
153 ))
154 }
155
156 fn input(&self) -> Input {
157 Input::new(self.encoding.config().input_type())
158 }
159
160 fn acknowledgements(&self) -> &AcknowledgementsConfig {
161 &self.acknowledgements
162 }
163}
164
165impl_generate_config_from_default!(WebSocketListenerSinkConfig);
166
167#[cfg(test)]
168mod test {
169 use super::*;
170
171 #[test]
172 fn generate_config() {
173 crate::test_util::test_generate_config::<WebSocketListenerSinkConfig>();
174 }
175}