vector/sinks/console/
config.rs

1use futures::{FutureExt, future};
2use tokio::io;
3use vector_lib::{
4    codecs::{
5        JsonSerializerConfig,
6        encoding::{Framer, FramingConfig},
7    },
8    configurable::configurable_component,
9};
10
11use crate::{
12    codecs::{Encoder, EncodingConfigWithFraming, SinkType},
13    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
14    sinks::{Healthcheck, VectorSink, console::sink::WriterSink},
15};
16
17/// The [standard stream][standard_streams] to write to.
18///
19/// [standard_streams]: https://en.wikipedia.org/wiki/Standard_streams
20#[configurable_component]
21#[derive(Clone, Debug, Derivative)]
22#[derivative(Default)]
23#[serde(rename_all = "lowercase")]
24pub enum Target {
25    /// Write output to [STDOUT][stdout].
26    ///
27    /// [stdout]: https://en.wikipedia.org/wiki/Standard_streams#Standard_output_(stdout)
28    #[derivative(Default)]
29    Stdout,
30
31    /// Write output to [STDERR][stderr].
32    ///
33    /// [stderr]: https://en.wikipedia.org/wiki/Standard_streams#Standard_error_(stderr)
34    Stderr,
35}
36
37/// Configuration for the `console` sink.
38#[configurable_component(sink(
39    "console",
40    "Display observability events in the console, which can be useful for debugging purposes."
41))]
42#[derive(Clone, Debug)]
43#[serde(deny_unknown_fields)]
44pub struct ConsoleSinkConfig {
45    #[configurable(derived)]
46    #[serde(default = "default_target")]
47    pub target: Target,
48
49    #[serde(flatten)]
50    pub encoding: EncodingConfigWithFraming,
51
52    #[configurable(derived)]
53    #[serde(
54        default,
55        deserialize_with = "crate::serde::bool_or_struct",
56        skip_serializing_if = "crate::serde::is_default"
57    )]
58    pub acknowledgements: AcknowledgementsConfig,
59}
60
61const fn default_target() -> Target {
62    Target::Stdout
63}
64
65impl GenerateConfig for ConsoleSinkConfig {
66    fn generate_config() -> toml::Value {
67        toml::Value::try_from(Self {
68            target: Target::Stdout,
69            encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
70            acknowledgements: Default::default(),
71        })
72        .unwrap()
73    }
74}
75
76#[async_trait::async_trait]
77#[typetag::serde(name = "console")]
78impl SinkConfig for ConsoleSinkConfig {
79    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
80        let transformer = self.encoding.transformer();
81        let (framer, serializer) = self.encoding.build(SinkType::StreamBased)?;
82        let encoder = Encoder::<Framer>::new(framer, serializer);
83
84        let sink: VectorSink = match self.target {
85            Target::Stdout => VectorSink::from_event_streamsink(WriterSink {
86                output: io::stdout(),
87                transformer,
88                encoder,
89            }),
90            Target::Stderr => VectorSink::from_event_streamsink(WriterSink {
91                output: io::stderr(),
92                transformer,
93                encoder,
94            }),
95        };
96
97        Ok((sink, future::ok(()).boxed()))
98    }
99
100    fn input(&self) -> Input {
101        Input::new(self.encoding.config().1.input_type())
102    }
103
104    fn acknowledgements(&self) -> &AcknowledgementsConfig {
105        &self.acknowledgements
106    }
107}
108
109#[cfg(test)]
110mod tests {
111    use super::*;
112
113    #[test]
114    fn generate_config() {
115        crate::test_util::test_generate_config::<ConsoleSinkConfig>();
116    }
117}