vector/sinks/websocket_server/
config.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
use std::net::SocketAddr;

use vector_lib::codecs::JsonSerializerConfig;
use vector_lib::configurable::configurable_component;

use crate::{
    codecs::EncodingConfig,
    common::http::server_auth::HttpServerAuthConfig,
    config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
    sinks::{Healthcheck, VectorSink},
    tls::TlsEnableableConfig,
};

use super::sink::WebSocketListenerSink;

/// Configuration for the `websocket_server` sink.
#[configurable_component(sink(
    "websocket_server",
    "Deliver observability event data to websocket clients."
))]
#[derive(Clone, Debug)]
pub struct WebSocketListenerSinkConfig {
    /// The socket address to listen for connections on.
    ///
    /// This value _must_ include a port.
    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
    #[configurable(metadata(docs::examples = "localhost:80"))]
    pub address: SocketAddr,

    #[configurable(derived)]
    pub tls: Option<TlsEnableableConfig>,

    #[configurable(derived)]
    pub encoding: EncodingConfig,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    #[configurable(derived)]
    pub auth: Option<HttpServerAuthConfig>,
}

impl Default for WebSocketListenerSinkConfig {
    fn default() -> Self {
        Self {
            address: "0.0.0.0:8080".parse().unwrap(),
            encoding: JsonSerializerConfig::default().into(),
            tls: None,
            acknowledgements: Default::default(),
            auth: None,
        }
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "websocket_server")]
impl SinkConfig for WebSocketListenerSinkConfig {
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let ws_sink = WebSocketListenerSink::new(self.clone(), cx)?;

        Ok((
            VectorSink::from_event_streamsink(ws_sink),
            Box::pin(async move { Ok(()) }),
        ))
    }

    fn input(&self) -> Input {
        Input::new(self.encoding.config().input_type())
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}

impl_generate_config_from_default!(WebSocketListenerSinkConfig);

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<WebSocketListenerSinkConfig>();
    }
}