// vector/sinks/websocket_server/config.rs
use std::{collections::HashMap, net::SocketAddr};

use vector_lib::codecs::JsonSerializerConfig;
use vector_lib::configurable::configurable_component;

use crate::{
    codecs::EncodingConfig,
    common::http::server_auth::HttpServerAuthConfig,
    config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
    sinks::{Healthcheck, VectorSink},
    tls::TlsEnableableConfig,
};

use super::buffering::MessageBufferingConfig;
use super::sink::WebSocketListenerSink;

17#[configurable_component(sink(
19 "websocket_server",
20 "Deliver observability event data to websocket clients."
21))]
22#[derive(Clone, Debug)]
23pub struct WebSocketListenerSinkConfig {
24 #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
28 #[configurable(metadata(docs::examples = "localhost:80"))]
29 pub address: SocketAddr,
30
31 #[configurable(derived)]
32 pub tls: Option<TlsEnableableConfig>,
33
34 #[configurable(derived)]
35 pub encoding: EncodingConfig,
36
37 #[configurable(derived)]
38 #[serde(default)]
39 pub subprotocol: SubProtocolConfig,
40
41 #[configurable(derived)]
42 #[serde(
43 default,
44 deserialize_with = "crate::serde::bool_or_struct",
45 skip_serializing_if = "crate::serde::is_default"
46 )]
47 pub acknowledgements: AcknowledgementsConfig,
48
49 #[configurable(derived)]
50 pub message_buffering: Option<MessageBufferingConfig>,
51
52 #[configurable(derived)]
53 pub auth: Option<HttpServerAuthConfig>,
54
55 #[configurable(derived)]
57 #[serde(default)]
58 pub internal_metrics: InternalMetricsConfig,
59}
60
61#[configurable_component]
63#[derive(Clone, Debug, PartialEq, Eq, Default)]
64#[serde(deny_unknown_fields)]
65pub struct InternalMetricsConfig {
66 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
68 #[configurable(metadata(
69 docs::additional_props_description = "Maps extra tag keys to values."
70 ))]
71 pub extra_tags: HashMap<String, ExtraMetricTagsConfig>,
72}
73
74#[configurable_component]
76#[derive(Clone, Debug, PartialEq, Eq)]
77#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "type")]
78#[configurable(metadata(docs::enum_tag_description = "Enum for extra metric tag values."))]
79pub enum ExtraMetricTagsConfig {
80 Fixed {
82 value: String,
84 },
85 Header {
87 name: String,
89 },
90 Query {
92 name: String,
94 },
95 Url,
97 IpAddress {
99 #[serde(default = "crate::serde::default_false")]
103 with_port: bool,
104 },
105}
106
107#[configurable_component]
109#[derive(Clone, Debug, PartialEq, Eq)]
110#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "type")]
111#[configurable(metadata(docs::enum_tag_description = "Enum for websocket subprotocol handling."))]
112pub enum SubProtocolConfig {
113 Any,
115 Specific {
118 supported_subprotocols: Vec<String>,
121 },
122}
123
124impl Default for SubProtocolConfig {
125 fn default() -> Self {
126 Self::Specific {
127 supported_subprotocols: Vec::default(),
128 }
129 }
130}
131
132impl Default for WebSocketListenerSinkConfig {
133 fn default() -> Self {
134 Self {
135 address: "0.0.0.0:8080".parse().unwrap(),
136 encoding: JsonSerializerConfig::default().into(),
137 tls: None,
138 acknowledgements: Default::default(),
139 message_buffering: None,
140 subprotocol: Default::default(),
141 auth: None,
142 internal_metrics: InternalMetricsConfig::default(),
143 }
144 }
145}
146
147#[async_trait::async_trait]
148#[typetag::serde(name = "websocket_server")]
149impl SinkConfig for WebSocketListenerSinkConfig {
150 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
151 let ws_sink = WebSocketListenerSink::new(self.clone(), cx)?;
152
153 Ok((
154 VectorSink::from_event_streamsink(ws_sink),
155 Box::pin(async move { Ok(()) }),
156 ))
157 }
158
159 fn input(&self) -> Input {
160 Input::new(self.encoding.config().input_type())
161 }
162
163 fn acknowledgements(&self) -> &AcknowledgementsConfig {
164 &self.acknowledgements
165 }
166}
167
168impl_generate_config_from_default!(WebSocketListenerSinkConfig);
169
#[cfg(test)]
mod test {
    use super::*;

    /// Runs the shared `test_generate_config` helper against this sink's configuration type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<WebSocketListenerSinkConfig>();
    }
}