vector/sinks/websocket/
config.rs

1use snafu::ResultExt;
2use vector_lib::{codecs::JsonSerializerConfig, configurable::configurable_component};
3
4use crate::{
5    codecs::EncodingConfig,
6    common::websocket::{ConnectSnafu, WebSocketCommonConfig, WebSocketConnector, WebSocketError},
7    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
8    sinks::{Healthcheck, VectorSink, websocket::sink::WebSocketSink},
9    tls::MaybeTlsSettings,
10};
11
/// Configuration for the `websocket` sink.
#[configurable_component(sink(
    "websocket",
    "Deliver observability event data to a websocket listener."
))]
#[derive(Clone, Debug)]
pub struct WebSocketSinkConfig {
    // Shared WebSocket settings. `flatten` places these fields at the same
    // level as this struct's own fields in the serialized form, rather than
    // nesting them under a `common` key. `build_connector` reads `uri`, `tls`
    // and `auth` from here.
    #[serde(flatten)]
    pub common: WebSocketCommonConfig,

    // Encoding configuration handed to the sink; its serializer also
    // determines the accepted input type (see `SinkConfig::input`).
    #[configurable(derived)]
    pub encoding: EncodingConfig,

    // Acknowledgement settings.
    // - `default`: falls back to `AcknowledgementsConfig::default()` when the
    //   field is absent.
    // - `deserialize_with`: custom deserializer; judging by its name it
    //   accepts either a bare boolean or a full struct -- confirm against
    //   `crate::serde::bool_or_struct`.
    // - `skip_serializing_if`: the field is omitted from serialized output
    //   when `crate::serde::is_default` returns true for it.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
33
34impl GenerateConfig for WebSocketSinkConfig {
35    fn generate_config() -> toml::Value {
36        toml::Value::try_from(Self {
37            common: WebSocketCommonConfig {
38                ..Default::default()
39            },
40            encoding: JsonSerializerConfig::default().into(),
41            acknowledgements: Default::default(),
42        })
43        .unwrap()
44    }
45}
46
47#[async_trait::async_trait]
48#[typetag::serde(name = "websocket")]
49impl SinkConfig for WebSocketSinkConfig {
50    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
51        let connector = self.build_connector()?;
52        let ws_sink = WebSocketSink::new(self, connector.clone())?;
53
54        Ok((
55            VectorSink::from_event_streamsink(ws_sink),
56            Box::pin(async move { connector.healthcheck().await }),
57        ))
58    }
59
60    fn input(&self) -> Input {
61        Input::new(self.encoding.config().input_type())
62    }
63
64    fn acknowledgements(&self) -> &AcknowledgementsConfig {
65        &self.acknowledgements
66    }
67}
68
69impl WebSocketSinkConfig {
70    fn build_connector(&self) -> Result<WebSocketConnector, WebSocketError> {
71        let tls =
72            MaybeTlsSettings::from_config(self.common.tls.as_ref(), false).context(ConnectSnafu)?;
73        WebSocketConnector::new(self.common.uri.clone(), tls, self.common.auth.clone())
74    }
75}
76
#[cfg(test)]
mod test {
    use super::WebSocketSinkConfig;
    use crate::test_util::test_generate_config;

    /// Runs the shared generated-config test helper against this sink's
    /// configuration type.
    #[test]
    fn generate_config() {
        test_generate_config::<WebSocketSinkConfig>();
    }
}