vector/sources/file_descriptors/
stdin.rs

1use std::io;
2
3use vector_lib::{
4    codecs::decoding::{DeserializerConfig, FramingConfig},
5    config::LogNamespace,
6    configurable::configurable_component,
7    lookup::lookup_v2::OptionalValuePath,
8};
9
10use super::{FileDescriptorConfig, outputs};
11use crate::{
12    config::{Resource, SourceConfig, SourceContext, SourceOutput},
13    serde::default_decoding,
14};
15
16/// Configuration for the `stdin` source.
17#[configurable_component(source("stdin", "Collect logs sent via stdin."))]
18#[derive(Clone, Debug)]
19#[serde(deny_unknown_fields, default)]
20pub struct StdinConfig {
21    /// The maximum buffer size, in bytes, of incoming messages.
22    ///
23    /// Messages larger than this are truncated.
24    #[configurable(metadata(docs::type_unit = "bytes"))]
25    #[serde(default = "crate::serde::default_max_length")]
26    pub max_length: usize,
27
28    /// Overrides the name of the log field used to add the current hostname to each event.
29    ///
30    ///
31    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
32    ///
33    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
34    pub host_key: Option<OptionalValuePath>,
35
36    #[configurable(derived)]
37    pub framing: Option<FramingConfig>,
38
39    #[configurable(derived)]
40    #[serde(default = "default_decoding")]
41    pub decoding: DeserializerConfig,
42
43    /// The namespace to use for logs. This overrides the global setting.
44    #[configurable(metadata(docs::hidden))]
45    #[serde(default)]
46    log_namespace: Option<bool>,
47}
48
49impl FileDescriptorConfig for StdinConfig {
50    fn host_key(&self) -> Option<OptionalValuePath> {
51        self.host_key.clone()
52    }
53
54    fn framing(&self) -> Option<FramingConfig> {
55        self.framing.clone()
56    }
57
58    fn decoding(&self) -> DeserializerConfig {
59        self.decoding.clone()
60    }
61
62    fn description(&self) -> String {
63        Self::NAME.to_string()
64    }
65}
66
67impl Default for StdinConfig {
68    fn default() -> Self {
69        StdinConfig {
70            max_length: crate::serde::default_max_length(),
71            host_key: Default::default(),
72            framing: None,
73            decoding: default_decoding(),
74            log_namespace: None,
75        }
76    }
77}
78
79impl_generate_config_from_default!(StdinConfig);
80
81#[async_trait::async_trait]
82#[typetag::serde(name = "stdin")]
83impl SourceConfig for StdinConfig {
84    async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
85        let log_namespace = cx.log_namespace(self.log_namespace);
86        self.source(
87            io::BufReader::new(io::stdin()),
88            cx.shutdown,
89            cx.out,
90            log_namespace,
91        )
92    }
93
94    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
95        let log_namespace = global_log_namespace.merge(self.log_namespace);
96
97        outputs(log_namespace, &self.host_key, &self.decoding, Self::NAME)
98    }
99
100    fn resources(&self) -> Vec<Resource> {
101        vec![Resource::Fd(0)]
102    }
103
104    fn can_acknowledge(&self) -> bool {
105        false
106    }
107}
108
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use futures::StreamExt;
    use vector_lib::lookup::path;
    use vrl::value;

    use super::*;
    use crate::{
        SourceSender,
        config::log_schema,
        shutdown::ShutdownSignal,
        test_util::components::{SOURCE_TAGS, assert_source_compliance},
    };

    /// Two lines of input; the second one has no trailing newline.
    const INPUT: &str = "hello world\nhello world again";

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<StdinConfig>();
    }

    /// Legacy namespace: each input line ends up under the schema's message key.
    #[tokio::test]
    async fn stdin_decodes_line() {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tx, mut rx) = SourceSender::new_test();
            let config = StdinConfig::default();

            // Run the source to completion before inspecting what it emitted.
            config
                .source(Cursor::new(INPUT), ShutdownSignal::noop(), tx, LogNamespace::Legacy)
                .unwrap()
                .await
                .unwrap();

            let message_key = log_schema().message_key().unwrap().to_string();

            for expected in ["hello world", "hello world again"] {
                let event = rx.next().await;
                assert_eq!(
                    Some(expected.to_owned()),
                    event.map(|event| event.as_log()[message_key.clone()]
                        .to_string_lossy()
                        .into_owned())
                );
            }

            // Nothing may follow the two lines.
            assert!(rx.next().await.is_none());
        })
        .await;
    }

    /// Vector namespace: the line is the event value itself and the source
    /// details are recorded in the event metadata.
    #[tokio::test]
    async fn stdin_decodes_line_vector_namespace() {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tx, mut rx) = SourceSender::new_test();
            let config = StdinConfig::default();

            // Run the source to completion before inspecting what it emitted.
            config
                .source(Cursor::new(INPUT), ShutdownSignal::noop(), tx, LogNamespace::Vector)
                .unwrap()
                .await
                .unwrap();

            let first = rx.next().await.unwrap();
            let first_log = first.as_log();
            let metadata = first_log.metadata().value();

            assert_eq!(&value!("hello world"), first_log.value());
            assert_eq!(
                metadata.get(path!("vector", "source_type")).unwrap(),
                &value!("stdin")
            );
            assert!(
                metadata
                    .get(path!("vector", "ingest_timestamp"))
                    .unwrap()
                    .is_timestamp()
            );

            let second = rx.next().await.unwrap();
            assert_eq!(&value!("hello world again"), second.as_log().value());

            // Nothing may follow the two lines.
            assert!(rx.next().await.is_none());
        })
        .await;
    }
}
213}