vector/sinks/console/
config.rs

1use futures::{future, FutureExt};
2use tokio::io;
3use vector_lib::codecs::{
4    encoding::{Framer, FramingConfig},
5    JsonSerializerConfig,
6};
7use vector_lib::configurable::configurable_component;
8
9use crate::{
10    codecs::{Encoder, EncodingConfigWithFraming, SinkType},
11    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
12    sinks::{console::sink::WriterSink, Healthcheck, VectorSink},
13};
14
15/// The [standard stream][standard_streams] to write to.
16///
17/// [standard_streams]: https://en.wikipedia.org/wiki/Standard_streams
18#[configurable_component]
19#[derive(Clone, Debug, Derivative)]
20#[derivative(Default)]
21#[serde(rename_all = "lowercase")]
22pub enum Target {
23    /// Write output to [STDOUT][stdout].
24    ///
25    /// [stdout]: https://en.wikipedia.org/wiki/Standard_streams#Standard_output_(stdout)
26    #[derivative(Default)]
27    Stdout,
28
29    /// Write output to [STDERR][stderr].
30    ///
31    /// [stderr]: https://en.wikipedia.org/wiki/Standard_streams#Standard_error_(stderr)
32    Stderr,
33}
34
35/// Configuration for the `console` sink.
36#[configurable_component(sink(
37    "console",
38    "Display observability events in the console, which can be useful for debugging purposes."
39))]
40#[derive(Clone, Debug)]
41#[serde(deny_unknown_fields)]
42pub struct ConsoleSinkConfig {
43    #[configurable(derived)]
44    #[serde(default = "default_target")]
45    pub target: Target,
46
47    #[serde(flatten)]
48    pub encoding: EncodingConfigWithFraming,
49
50    #[configurable(derived)]
51    #[serde(
52        default,
53        deserialize_with = "crate::serde::bool_or_struct",
54        skip_serializing_if = "crate::serde::is_default"
55    )]
56    pub acknowledgements: AcknowledgementsConfig,
57}
58
59const fn default_target() -> Target {
60    Target::Stdout
61}
62
63impl GenerateConfig for ConsoleSinkConfig {
64    fn generate_config() -> toml::Value {
65        toml::Value::try_from(Self {
66            target: Target::Stdout,
67            encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
68            acknowledgements: Default::default(),
69        })
70        .unwrap()
71    }
72}
73
74#[async_trait::async_trait]
75#[typetag::serde(name = "console")]
76impl SinkConfig for ConsoleSinkConfig {
77    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
78        let transformer = self.encoding.transformer();
79        let (framer, serializer) = self.encoding.build(SinkType::StreamBased)?;
80        let encoder = Encoder::<Framer>::new(framer, serializer);
81
82        let sink: VectorSink = match self.target {
83            Target::Stdout => VectorSink::from_event_streamsink(WriterSink {
84                output: io::stdout(),
85                transformer,
86                encoder,
87            }),
88            Target::Stderr => VectorSink::from_event_streamsink(WriterSink {
89                output: io::stderr(),
90                transformer,
91                encoder,
92            }),
93        };
94
95        Ok((sink, future::ok(()).boxed()))
96    }
97
98    fn input(&self) -> Input {
99        Input::new(self.encoding.config().1.input_type())
100    }
101
102    fn acknowledgements(&self) -> &AcknowledgementsConfig {
103        &self.acknowledgements
104    }
105}
106
#[cfg(test)]
mod tests {
    use super::ConsoleSinkConfig;

    /// Runs the crate's shared `test_generate_config` check against the
    /// `console` sink configuration type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<ConsoleSinkConfig>();
    }
}