vector/sources/file_descriptors/
stdin.rs1use std::io;
2
3use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
4use vector_lib::config::LogNamespace;
5use vector_lib::configurable::configurable_component;
6use vector_lib::lookup::lookup_v2::OptionalValuePath;
7
8use crate::{
9 config::{Resource, SourceConfig, SourceContext, SourceOutput},
10 serde::default_decoding,
11};
12
13use super::{outputs, FileDescriptorConfig};
14
15#[configurable_component(source("stdin", "Collect logs sent via stdin."))]
17#[derive(Clone, Debug)]
18#[serde(deny_unknown_fields, default)]
19pub struct StdinConfig {
20 #[configurable(metadata(docs::type_unit = "bytes"))]
24 #[serde(default = "crate::serde::default_max_length")]
25 pub max_length: usize,
26
27 pub host_key: Option<OptionalValuePath>,
34
35 #[configurable(derived)]
36 pub framing: Option<FramingConfig>,
37
38 #[configurable(derived)]
39 #[serde(default = "default_decoding")]
40 pub decoding: DeserializerConfig,
41
42 #[configurable(metadata(docs::hidden))]
44 #[serde(default)]
45 log_namespace: Option<bool>,
46}
47
48impl FileDescriptorConfig for StdinConfig {
49 fn host_key(&self) -> Option<OptionalValuePath> {
50 self.host_key.clone()
51 }
52
53 fn framing(&self) -> Option<FramingConfig> {
54 self.framing.clone()
55 }
56
57 fn decoding(&self) -> DeserializerConfig {
58 self.decoding.clone()
59 }
60
61 fn description(&self) -> String {
62 Self::NAME.to_string()
63 }
64}
65
66impl Default for StdinConfig {
67 fn default() -> Self {
68 StdinConfig {
69 max_length: crate::serde::default_max_length(),
70 host_key: Default::default(),
71 framing: None,
72 decoding: default_decoding(),
73 log_namespace: None,
74 }
75 }
76}
77
78impl_generate_config_from_default!(StdinConfig);
79
80#[async_trait::async_trait]
81#[typetag::serde(name = "stdin")]
82impl SourceConfig for StdinConfig {
83 async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
84 let log_namespace = cx.log_namespace(self.log_namespace);
85 self.source(
86 io::BufReader::new(io::stdin()),
87 cx.shutdown,
88 cx.out,
89 log_namespace,
90 )
91 }
92
93 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
94 let log_namespace = global_log_namespace.merge(self.log_namespace);
95
96 outputs(log_namespace, &self.host_key, &self.decoding, Self::NAME)
97 }
98
99 fn resources(&self) -> Vec<Resource> {
100 vec![Resource::Fd(0)]
101 }
102
103 fn can_acknowledge(&self) -> bool {
104 false
105 }
106}
107
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;
    use crate::{
        config::log_schema,
        shutdown::ShutdownSignal,
        test_util::components::{assert_source_compliance, SOURCE_TAGS},
        SourceSender,
    };
    use futures::StreamExt;
    use vector_lib::lookup::path;
    use vrl::value;

    /// The generated example configuration must be producible for `StdinConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<StdinConfig>();
    }

    /// In the legacy namespace, each input line becomes one event whose
    /// message field holds the line; the stream then ends.
    #[tokio::test]
    async fn stdin_decodes_line() {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tx, mut rx) = SourceSender::new_test();
            let config = StdinConfig::default();
            let input = Cursor::new("hello world\nhello world again");

            config
                .source(input, ShutdownSignal::noop(), tx, LogNamespace::Legacy)
                .unwrap()
                .await
                .unwrap();

            // One event per input line, in input order.
            for expected in ["hello world", "hello world again"] {
                let message = rx.next().await.map(|event| {
                    event.as_log()[log_schema().message_key().unwrap().to_string()]
                        .to_string_lossy()
                        .into_owned()
                });
                assert_eq!(Some(expected.into()), message);
            }

            // Nothing is left once both lines have been consumed.
            assert!(rx.next().await.is_none());
        })
        .await;
    }

    /// In the Vector namespace, the line is the event value itself and the
    /// source type and ingest timestamp are stored in event metadata.
    #[tokio::test]
    async fn stdin_decodes_line_vector_namespace() {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tx, mut rx) = SourceSender::new_test();
            let config = StdinConfig::default();
            let input = Cursor::new("hello world\nhello world again");

            config
                .source(input, ShutdownSignal::noop(), tx, LogNamespace::Vector)
                .unwrap()
                .await
                .unwrap();

            let first = rx.next().await.unwrap();
            let first_log = first.as_log();
            let metadata = first_log.metadata().value();

            assert_eq!(&value!("hello world"), first_log.value());
            assert_eq!(
                metadata.get(path!("vector", "source_type")).unwrap(),
                &value!("stdin")
            );
            assert!(metadata
                .get(path!("vector", "ingest_timestamp"))
                .unwrap()
                .is_timestamp());

            let second = rx.next().await.unwrap();
            assert_eq!(&value!("hello world again"), second.as_log().value());

            // Nothing is left once both lines have been consumed.
            assert!(rx.next().await.is_none());
        })
        .await;
    }
}