vector/sources/file_descriptors/
stdin.rs1use std::io;
2
3use vector_lib::{
4 codecs::decoding::{DeserializerConfig, FramingConfig},
5 config::LogNamespace,
6 configurable::configurable_component,
7 lookup::lookup_v2::OptionalValuePath,
8};
9
10use super::{FileDescriptorConfig, outputs};
11use crate::{
12 config::{Resource, SourceConfig, SourceContext, SourceOutput},
13 serde::default_decoding,
14};
15
16#[configurable_component(source("stdin", "Collect logs sent via stdin."))]
18#[derive(Clone, Debug)]
19#[serde(deny_unknown_fields, default)]
20pub struct StdinConfig {
21 #[configurable(metadata(docs::type_unit = "bytes"))]
25 #[serde(default = "crate::serde::default_max_length")]
26 pub max_length: usize,
27
28 pub host_key: Option<OptionalValuePath>,
35
36 #[configurable(derived)]
37 pub framing: Option<FramingConfig>,
38
39 #[configurable(derived)]
40 #[serde(default = "default_decoding")]
41 pub decoding: DeserializerConfig,
42
43 #[configurable(metadata(docs::hidden))]
45 #[serde(default)]
46 log_namespace: Option<bool>,
47}
48
49impl FileDescriptorConfig for StdinConfig {
50 fn host_key(&self) -> Option<OptionalValuePath> {
51 self.host_key.clone()
52 }
53
54 fn framing(&self) -> Option<FramingConfig> {
55 self.framing.clone()
56 }
57
58 fn decoding(&self) -> DeserializerConfig {
59 self.decoding.clone()
60 }
61
62 fn description(&self) -> String {
63 Self::NAME.to_string()
64 }
65}
66
67impl Default for StdinConfig {
68 fn default() -> Self {
69 StdinConfig {
70 max_length: crate::serde::default_max_length(),
71 host_key: Default::default(),
72 framing: None,
73 decoding: default_decoding(),
74 log_namespace: None,
75 }
76 }
77}
78
79impl_generate_config_from_default!(StdinConfig);
80
81#[async_trait::async_trait]
82#[typetag::serde(name = "stdin")]
83impl SourceConfig for StdinConfig {
84 async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
85 let log_namespace = cx.log_namespace(self.log_namespace);
86 self.source(
87 io::BufReader::new(io::stdin()),
88 cx.shutdown,
89 cx.out,
90 log_namespace,
91 )
92 }
93
94 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
95 let log_namespace = global_log_namespace.merge(self.log_namespace);
96
97 outputs(log_namespace, &self.host_key, &self.decoding, Self::NAME)
98 }
99
100 fn resources(&self) -> Vec<Resource> {
101 vec![Resource::Fd(0)]
102 }
103
104 fn can_acknowledge(&self) -> bool {
105 false
106 }
107}
108
109#[cfg(test)]
110mod tests {
111 use std::io::Cursor;
112
113 use futures::StreamExt;
114 use vector_lib::lookup::path;
115 use vrl::value;
116
117 use super::*;
118 use crate::{
119 SourceSender,
120 config::log_schema,
121 shutdown::ShutdownSignal,
122 test_util::components::{SOURCE_TAGS, assert_source_compliance},
123 };
124
125 #[test]
126 fn generate_config() {
127 crate::test_util::test_generate_config::<StdinConfig>();
128 }
129
130 #[tokio::test]
131 async fn stdin_decodes_line() {
132 assert_source_compliance(&SOURCE_TAGS, async {
133 let (tx, rx) = SourceSender::new_test();
134 let config = StdinConfig::default();
135 let buf = Cursor::new("hello world\nhello world again");
136
137 config
138 .source(buf, ShutdownSignal::noop(), tx, LogNamespace::Legacy)
139 .unwrap()
140 .await
141 .unwrap();
142
143 let mut stream = rx;
144
145 let event = stream.next().await;
146 assert_eq!(
147 Some("hello world".into()),
148 event.map(
149 |event| event.as_log()[log_schema().message_key().unwrap().to_string()]
150 .to_string_lossy()
151 .into_owned()
152 )
153 );
154
155 let event = stream.next().await;
156 assert_eq!(
157 Some("hello world again".into()),
158 event.map(
159 |event| event.as_log()[log_schema().message_key().unwrap().to_string()]
160 .to_string_lossy()
161 .into_owned()
162 )
163 );
164
165 let event = stream.next().await;
166 assert!(event.is_none());
167 })
168 .await;
169 }
170
171 #[tokio::test]
172 async fn stdin_decodes_line_vector_namespace() {
173 assert_source_compliance(&SOURCE_TAGS, async {
174 let (tx, rx) = SourceSender::new_test();
175 let config = StdinConfig::default();
176 let buf = Cursor::new("hello world\nhello world again");
177
178 config
179 .source(buf, ShutdownSignal::noop(), tx, LogNamespace::Vector)
180 .unwrap()
181 .await
182 .unwrap();
183
184 let mut stream = rx;
185
186 let event = stream.next().await;
187 let event = event.unwrap();
188 let log = event.as_log();
189 let meta = log.metadata().value();
190
191 assert_eq!(&value!("hello world"), log.value());
192 assert_eq!(
193 meta.get(path!("vector", "source_type")).unwrap(),
194 &value!("stdin")
195 );
196 assert!(
197 meta.get(path!("vector", "ingest_timestamp"))
198 .unwrap()
199 .is_timestamp()
200 );
201
202 let event = stream.next().await;
203 let event = event.unwrap();
204 let log = event.as_log();
205
206 assert_eq!(&value!("hello world again"), log.value());
207
208 let event = stream.next().await;
209 assert!(event.is_none());
210 })
211 .await;
212 }
213}