vector/sinks/websocket/
config.rs1use snafu::ResultExt;
2use vector_lib::{codecs::JsonSerializerConfig, configurable::configurable_component};
3
4use crate::{
5 codecs::EncodingConfig,
6 common::websocket::{ConnectSnafu, WebSocketCommonConfig, WebSocketConnector, WebSocketError},
7 config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
8 sinks::{Healthcheck, VectorSink, websocket::sink::WebSocketSink},
9 tls::MaybeTlsSettings,
10};
11
12#[configurable_component(sink(
14 "websocket",
15 "Deliver observability event data to a websocket listener."
16))]
17#[derive(Clone, Debug)]
18pub struct WebSocketSinkConfig {
19 #[serde(flatten)]
20 pub common: WebSocketCommonConfig,
21
22 #[configurable(derived)]
23 pub encoding: EncodingConfig,
24
25 #[configurable(derived)]
26 #[serde(
27 default,
28 deserialize_with = "crate::serde::bool_or_struct",
29 skip_serializing_if = "crate::serde::is_default"
30 )]
31 pub acknowledgements: AcknowledgementsConfig,
32}
33
34impl GenerateConfig for WebSocketSinkConfig {
35 fn generate_config() -> toml::Value {
36 toml::Value::try_from(Self {
37 common: WebSocketCommonConfig {
38 ..Default::default()
39 },
40 encoding: JsonSerializerConfig::default().into(),
41 acknowledgements: Default::default(),
42 })
43 .unwrap()
44 }
45}
46
47#[async_trait::async_trait]
48#[typetag::serde(name = "websocket")]
49impl SinkConfig for WebSocketSinkConfig {
50 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
51 let connector = self.build_connector()?;
52 let ws_sink = WebSocketSink::new(self, connector.clone())?;
53
54 Ok((
55 VectorSink::from_event_streamsink(ws_sink),
56 Box::pin(async move { connector.healthcheck().await }),
57 ))
58 }
59
60 fn input(&self) -> Input {
61 Input::new(self.encoding.config().input_type())
62 }
63
64 fn acknowledgements(&self) -> &AcknowledgementsConfig {
65 &self.acknowledgements
66 }
67}
68
69impl WebSocketSinkConfig {
70 fn build_connector(&self) -> Result<WebSocketConnector, WebSocketError> {
71 let tls =
72 MaybeTlsSettings::from_config(self.common.tls.as_ref(), false).context(ConnectSnafu)?;
73 WebSocketConnector::new(self.common.uri.clone(), tls, self.common.auth.clone())
74 }
75}
76
#[cfg(test)]
mod test {
    use super::WebSocketSinkConfig;
    use crate::test_util::test_generate_config;

    /// Runs the shared `test_generate_config` helper against this sink's
    /// configuration type.
    #[test]
    fn generate_config() {
        test_generate_config::<WebSocketSinkConfig>();
    }
}