vector/sinks/websocket/
config.rs

1use snafu::ResultExt;
2use vector_lib::codecs::JsonSerializerConfig;
3use vector_lib::configurable::configurable_component;
4
5use crate::common::websocket::WebSocketCommonConfig;
6use crate::{
7    codecs::EncodingConfig,
8    common::websocket::{ConnectSnafu, WebSocketConnector, WebSocketError},
9    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
10    sinks::{websocket::sink::WebSocketSink, Healthcheck, VectorSink},
11    tls::MaybeTlsSettings,
12};
13
/// Configuration for the `websocket` sink.
#[configurable_component(sink(
    "websocket",
    "Deliver observability event data to a websocket listener."
))]
#[derive(Clone, Debug)]
pub struct WebSocketSinkConfig {
    // Settings held in `WebSocketCommonConfig`. `flatten` inlines its fields into this
    // struct's own (de)serialized form instead of nesting them under a `common` key.
    // Within this file, `uri`, `tls` and `auth` are read from it to build the connector.
    #[serde(flatten)]
    pub common: WebSocketCommonConfig,

    // Encoding configuration. Its `config().input_type()` is what `SinkConfig::input`
    // reports for this sink.
    #[configurable(derived)]
    pub encoding: EncodingConfig,

    // Acknowledgement settings, returned by reference from `SinkConfig::acknowledgements`.
    // Falls back to `Default` when absent, and is omitted from serialized output when
    // `crate::serde::is_default` returns true for it.
    // NOTE(review): `bool_or_struct` presumably accepts either a bare boolean or a full
    // struct on input — confirm against `crate::serde`.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
35
36impl GenerateConfig for WebSocketSinkConfig {
37    fn generate_config() -> toml::Value {
38        toml::Value::try_from(Self {
39            common: WebSocketCommonConfig {
40                ..Default::default()
41            },
42            encoding: JsonSerializerConfig::default().into(),
43            acknowledgements: Default::default(),
44        })
45        .unwrap()
46    }
47}
48
49#[async_trait::async_trait]
50#[typetag::serde(name = "websocket")]
51impl SinkConfig for WebSocketSinkConfig {
52    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
53        let connector = self.build_connector()?;
54        let ws_sink = WebSocketSink::new(self, connector.clone())?;
55
56        Ok((
57            VectorSink::from_event_streamsink(ws_sink),
58            Box::pin(async move { connector.healthcheck().await }),
59        ))
60    }
61
62    fn input(&self) -> Input {
63        Input::new(self.encoding.config().input_type())
64    }
65
66    fn acknowledgements(&self) -> &AcknowledgementsConfig {
67        &self.acknowledgements
68    }
69}
70
71impl WebSocketSinkConfig {
72    fn build_connector(&self) -> Result<WebSocketConnector, WebSocketError> {
73        let tls =
74            MaybeTlsSettings::from_config(self.common.tls.as_ref(), false).context(ConnectSnafu)?;
75        WebSocketConnector::new(self.common.uri.clone(), tls, self.common.auth.clone())
76    }
77}
78
#[cfg(test)]
mod test {
    use super::WebSocketSinkConfig;
    use crate::test_util::test_generate_config;

    /// Runs the shared generated-config check against `WebSocketSinkConfig`.
    #[test]
    fn generate_config() {
        test_generate_config::<WebSocketSinkConfig>();
    }
}