vector/sinks/websocket/
config.rs1use snafu::ResultExt;
2use vector_lib::codecs::JsonSerializerConfig;
3use vector_lib::configurable::configurable_component;
4
5use crate::common::websocket::WebSocketCommonConfig;
6use crate::{
7 codecs::EncodingConfig,
8 common::websocket::{ConnectSnafu, WebSocketConnector, WebSocketError},
9 config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
10 sinks::{websocket::sink::WebSocketSink, Healthcheck, VectorSink},
11 tls::MaybeTlsSettings,
12};
13
/// Configuration for the `websocket` sink.
#[configurable_component(sink(
    "websocket",
    "Deliver observability event data to a websocket listener."
))]
#[derive(Clone, Debug)]
pub struct WebSocketSinkConfig {
    // Shared websocket settings; `flatten` makes its fields appear at the top
    // level of this sink's configuration. The connector built by this sink
    // reads `uri`, `tls` and `auth` from here.
    #[serde(flatten)]
    pub common: WebSocketCommonConfig,

    // Encoding applied to outgoing events; it also determines which event
    // types the sink accepts (see `SinkConfig::input`).
    #[configurable(derived)]
    pub encoding: EncodingConfig,

    // End-to-end acknowledgement settings. Optional in the config (`default`)
    // and omitted from serialized output when equal to the default value.
    // NOTE(review): `bool_or_struct` presumably accepts either a bare boolean
    // or the full struct form -- confirm against `crate::serde`.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
35
36impl GenerateConfig for WebSocketSinkConfig {
37 fn generate_config() -> toml::Value {
38 toml::Value::try_from(Self {
39 common: WebSocketCommonConfig {
40 ..Default::default()
41 },
42 encoding: JsonSerializerConfig::default().into(),
43 acknowledgements: Default::default(),
44 })
45 .unwrap()
46 }
47}
48
49#[async_trait::async_trait]
50#[typetag::serde(name = "websocket")]
51impl SinkConfig for WebSocketSinkConfig {
52 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
53 let connector = self.build_connector()?;
54 let ws_sink = WebSocketSink::new(self, connector.clone())?;
55
56 Ok((
57 VectorSink::from_event_streamsink(ws_sink),
58 Box::pin(async move { connector.healthcheck().await }),
59 ))
60 }
61
62 fn input(&self) -> Input {
63 Input::new(self.encoding.config().input_type())
64 }
65
66 fn acknowledgements(&self) -> &AcknowledgementsConfig {
67 &self.acknowledgements
68 }
69}
70
71impl WebSocketSinkConfig {
72 fn build_connector(&self) -> Result<WebSocketConnector, WebSocketError> {
73 let tls =
74 MaybeTlsSettings::from_config(self.common.tls.as_ref(), false).context(ConnectSnafu)?;
75 WebSocketConnector::new(self.common.uri.clone(), tls, self.common.auth.clone())
76 }
77}
78
#[cfg(test)]
mod test {
    use super::WebSocketSinkConfig;
    use crate::test_util::test_generate_config;

    /// Runs the shared generate-config test helper against this sink's
    /// configuration type.
    #[test]
    fn generate_config() {
        test_generate_config::<WebSocketSinkConfig>();
    }
}