vector/sources/file_descriptors/
stdin.rs

1use std::io;
2
3use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
4use vector_lib::config::LogNamespace;
5use vector_lib::configurable::configurable_component;
6use vector_lib::lookup::lookup_v2::OptionalValuePath;
7
8use crate::{
9    config::{Resource, SourceConfig, SourceContext, SourceOutput},
10    serde::default_decoding,
11};
12
13use super::{outputs, FileDescriptorConfig};
14
15/// Configuration for the `stdin` source.
16#[configurable_component(source("stdin", "Collect logs sent via stdin."))]
17#[derive(Clone, Debug)]
18#[serde(deny_unknown_fields, default)]
19pub struct StdinConfig {
20    /// The maximum buffer size, in bytes, of incoming messages.
21    ///
22    /// Messages larger than this are truncated.
23    #[configurable(metadata(docs::type_unit = "bytes"))]
24    #[serde(default = "crate::serde::default_max_length")]
25    pub max_length: usize,
26
27    /// Overrides the name of the log field used to add the current hostname to each event.
28    ///
29    ///
30    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
31    ///
32    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
33    pub host_key: Option<OptionalValuePath>,
34
35    #[configurable(derived)]
36    pub framing: Option<FramingConfig>,
37
38    #[configurable(derived)]
39    #[serde(default = "default_decoding")]
40    pub decoding: DeserializerConfig,
41
42    /// The namespace to use for logs. This overrides the global setting.
43    #[configurable(metadata(docs::hidden))]
44    #[serde(default)]
45    log_namespace: Option<bool>,
46}
47
48impl FileDescriptorConfig for StdinConfig {
49    fn host_key(&self) -> Option<OptionalValuePath> {
50        self.host_key.clone()
51    }
52
53    fn framing(&self) -> Option<FramingConfig> {
54        self.framing.clone()
55    }
56
57    fn decoding(&self) -> DeserializerConfig {
58        self.decoding.clone()
59    }
60
61    fn description(&self) -> String {
62        Self::NAME.to_string()
63    }
64}
65
66impl Default for StdinConfig {
67    fn default() -> Self {
68        StdinConfig {
69            max_length: crate::serde::default_max_length(),
70            host_key: Default::default(),
71            framing: None,
72            decoding: default_decoding(),
73            log_namespace: None,
74        }
75    }
76}
77
78impl_generate_config_from_default!(StdinConfig);
79
80#[async_trait::async_trait]
81#[typetag::serde(name = "stdin")]
82impl SourceConfig for StdinConfig {
83    async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
84        let log_namespace = cx.log_namespace(self.log_namespace);
85        self.source(
86            io::BufReader::new(io::stdin()),
87            cx.shutdown,
88            cx.out,
89            log_namespace,
90        )
91    }
92
93    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
94        let log_namespace = global_log_namespace.merge(self.log_namespace);
95
96        outputs(log_namespace, &self.host_key, &self.decoding, Self::NAME)
97    }
98
99    fn resources(&self) -> Vec<Resource> {
100        vec![Resource::Fd(0)]
101    }
102
103    fn can_acknowledge(&self) -> bool {
104        false
105    }
106}
107
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use futures::StreamExt;
    use vector_lib::lookup::path;
    use vrl::value;

    use super::*;
    use crate::{
        config::log_schema,
        shutdown::ShutdownSignal,
        test_util::components::{assert_source_compliance, SOURCE_TAGS},
        SourceSender,
    };

    /// Two newline-separated messages; the second one has no trailing newline.
    const INPUT: &str = "hello world\nhello world again";

    /// The default configuration must be usable for config generation.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<StdinConfig>();
    }

    /// Legacy namespace: each input line becomes one event whose message field
    /// holds the line, and nothing else is emitted.
    #[tokio::test]
    async fn stdin_decodes_line() {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let config = StdinConfig::default();

            // Run the source to completion against the in-memory input.
            config
                .source(
                    Cursor::new(INPUT),
                    ShutdownSignal::noop(),
                    tx,
                    LogNamespace::Legacy,
                )
                .unwrap()
                .await
                .unwrap();

            // Drain the receiver and keep only each event's message field.
            let messages: Vec<String> = rx
                .map(|event| {
                    event.as_log()[log_schema().message_key().unwrap().to_string()]
                        .to_string_lossy()
                        .into_owned()
                })
                .collect()
                .await;

            // Exactly two events, in input order, then end of stream.
            assert_eq!(messages, ["hello world", "hello world again"]);
        })
        .await;
    }

    /// Vector namespace: the line is the event value itself, and the source
    /// type and ingest timestamp are recorded in the event metadata.
    #[tokio::test]
    async fn stdin_decodes_line_vector_namespace() {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let config = StdinConfig::default();

            // Run the source to completion against the in-memory input.
            config
                .source(
                    Cursor::new(INPUT),
                    ShutdownSignal::noop(),
                    tx,
                    LogNamespace::Vector,
                )
                .unwrap()
                .await
                .unwrap();

            // Exactly two events are expected before the stream ends.
            let events: Vec<_> = rx.collect().await;
            assert_eq!(events.len(), 2);

            let first = events[0].as_log();
            let metadata = first.metadata().value();

            assert_eq!(first.value(), &value!("hello world"));
            assert_eq!(
                metadata.get(path!("vector", "source_type")).unwrap(),
                &value!("stdin")
            );
            assert!(metadata
                .get(path!("vector", "ingest_timestamp"))
                .unwrap()
                .is_timestamp());

            let second = events[1].as_log();
            assert_eq!(second.value(), &value!("hello world again"));
        })
        .await;
    }
}