vector/sources/file_descriptors/
file_descriptor.rs1use std::{
2 fs::File,
3 io,
4 os::fd::{FromRawFd as _, IntoRawFd as _, RawFd},
5};
6
7use vector_lib::{
8 codecs::decoding::{DeserializerConfig, FramingConfig},
9 config::LogNamespace,
10 configurable::configurable_component,
11 lookup::lookup_v2::OptionalValuePath,
12};
13
14use super::{FileDescriptorConfig, outputs};
15use crate::{
16 config::{GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput},
17 serde::default_decoding,
18};
/// Configuration for the `file_descriptor` source.
#[configurable_component(source("file_descriptor", "Collect logs from a file descriptor."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct FileDescriptorSourceConfig {
    /// Maximum length, in bytes.
    ///
    /// Defaults to `crate::serde::default_max_length()` when omitted.
    // NOTE(review): nothing in the visible part of this file reads `max_length`;
    // confirm where (or whether) the limit is applied.
    #[serde(default = "crate::serde::default_max_length")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_length: usize,

    /// Optional path of the event field used for the host key.
    ///
    /// The value is handed to the shared file-descriptor source logic and to the
    /// output schema definition.
    pub host_key: Option<OptionalValuePath>,

    // Optional framing configuration; exposed to the shared source logic through
    // `FileDescriptorConfig::framing`.
    #[configurable(derived)]
    pub framing: Option<FramingConfig>,

    // Decoder configuration; falls back to `default_decoding()` when omitted.
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    pub decoding: DeserializerConfig,

    /// The file descriptor number to read from.
    #[configurable(metadata(docs::examples = 10))]
    #[configurable(metadata(docs::human_name = "File Descriptor Number"))]
    pub fd: u32,

    /// Optional per-source log namespace flag, combined with the global log namespace setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
}
54
55impl FileDescriptorConfig for FileDescriptorSourceConfig {
56 fn host_key(&self) -> Option<OptionalValuePath> {
57 self.host_key.clone()
58 }
59
60 fn framing(&self) -> Option<FramingConfig> {
61 self.framing.clone()
62 }
63
64 fn decoding(&self) -> DeserializerConfig {
65 self.decoding.clone()
66 }
67
68 fn description(&self) -> String {
69 format!("file descriptor {}", self.fd)
70 }
71}
72
73impl GenerateConfig for FileDescriptorSourceConfig {
74 fn generate_config() -> toml::Value {
75 let fd = null_fd().unwrap();
76 toml::from_str(&format!(
77 r#"
78 fd = {fd}
79 "#
80 ))
81 .unwrap()
82 }
83}
84
85pub(crate) fn null_fd() -> crate::Result<RawFd> {
86 #[cfg(unix)]
87 const FILENAME: &str = "/dev/null";
88 #[cfg(windows)]
89 const FILENAME: &str = "C:\\NUL";
90 File::open(FILENAME)
91 .map_err(|error| format!("Could not open dummy file at {FILENAME:?}: {error}").into())
92 .map(|file| file.into_raw_fd())
93}
94
95#[async_trait::async_trait]
96#[typetag::serde(name = "file_descriptor")]
97impl SourceConfig for FileDescriptorSourceConfig {
98 async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
99 let pipe = io::BufReader::new(unsafe { File::from_raw_fd(self.fd as i32) });
100 let log_namespace = cx.log_namespace(self.log_namespace);
101
102 self.source(pipe, cx.shutdown, cx.out, log_namespace)
103 }
104
105 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
106 let log_namespace = global_log_namespace.merge(self.log_namespace);
107
108 outputs(log_namespace, &self.host_key, &self.decoding, Self::NAME)
109 }
110
111 fn resources(&self) -> Vec<Resource> {
112 vec![Resource::Fd(self.fd)]
113 }
114
115 fn can_acknowledge(&self) -> bool {
116 false
117 }
118}
119
// Tests for the `file_descriptor` source. Each async test feeds the source
// through an OS pipe created with `nix::unistd::pipe`.
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use nix::unistd::{close, pipe, write};
    use std::os::fd::AsRawFd;
    use vector_lib::lookup::path;
    use vrl::value;

    use super::*;
    use crate::{
        SourceSender,
        config::log_schema,
        test_util::components::{
            COMPONENT_ERROR_TAGS, SOURCE_TAGS, assert_source_compliance, assert_source_error,
        },
    };

    /// Runs the shared `test_generate_config` check against this source's example config.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<FileDescriptorSourceConfig>();
    }

    /// Two newline-terminated lines written to the pipe come out as two log
    /// events whose message field holds each line, followed by end of stream.
    #[tokio::test]
    async fn file_descriptor_decodes_line() {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let (read_fd, write_fd) = pipe().unwrap();
            let config = FileDescriptorSourceConfig {
                max_length: crate::serde::default_max_length(),
                host_key: Default::default(),
                framing: None,
                decoding: default_decoding(),
                // `into_raw_fd` releases ownership of the read end; the `File`
                // created in `build` becomes its owner.
                fd: read_fd.into_raw_fd() as u32,
                log_namespace: None,
            };

            let mut stream = rx;

            // Write everything and close the write end before running the
            // source, so the source can reach end of input and finish.
            write(&write_fd, b"hello world\nhello world again\n").unwrap();
            close(write_fd).unwrap();

            let context = SourceContext::new_test(tx, None);
            // First `await` builds the source, second runs it to completion.
            config.build(context).await.unwrap().await.unwrap();

            let event = stream.next().await;
            let message_key = log_schema().message_key().unwrap().to_string();
            assert_eq!(
                Some("hello world".into()),
                event.map(|event| event.as_log()[&message_key].to_string_lossy().into_owned())
            );

            let event = stream.next().await;
            assert_eq!(
                Some("hello world again".into()),
                event.map(|event| event.as_log()[message_key].to_string_lossy().into_owned())
            );

            // No further events once both lines have been consumed.
            let event = stream.next().await;
            assert!(event.is_none());
        })
        .await;
    }

    /// Same input as `file_descriptor_decodes_line`, but with
    /// `log_namespace: Some(true)`: the line is the log value itself and the
    /// `vector` metadata carries the source type and an ingest timestamp.
    #[tokio::test]
    async fn file_descriptor_decodes_line_vector_namespace() {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            let (read_fd, write_fd) = pipe().unwrap();
            let config = FileDescriptorSourceConfig {
                max_length: crate::serde::default_max_length(),
                host_key: Default::default(),
                framing: None,
                decoding: default_decoding(),
                // Ownership of the read end moves to the source (see `build`).
                fd: read_fd.into_raw_fd() as u32,
                log_namespace: Some(true),
            };

            let mut stream = rx;

            // Write, then close the write end so the source can reach end of input.
            write(&write_fd, b"hello world\nhello world again\n").unwrap();
            close(write_fd).unwrap();

            let context = SourceContext::new_test(tx, None);
            config.build(context).await.unwrap().await.unwrap();

            let event = stream.next().await;
            let event = event.unwrap();
            let log = event.as_log();
            let meta = log.metadata().value();

            assert_eq!(&value!("hello world"), log.value());
            assert_eq!(
                meta.get(path!("vector", "source_type")).unwrap(),
                &value!("file_descriptor")
            );
            assert!(
                meta.get(path!("vector", "ingest_timestamp"))
                    .unwrap()
                    .is_timestamp()
            );

            let event = stream.next().await;
            let event = event.unwrap();
            let log = event.as_log();

            assert_eq!(&value!("hello world again"), log.value());

            // No further events once both lines have been consumed.
            let event = stream.next().await;
            assert!(event.is_none());
        })
        .await;
    }

    /// Gives the source the *write* end of a pipe and checks that no events are
    /// produced. `assert_source_error` with `COMPONENT_ERROR_TAGS` presumably
    /// also verifies that the expected error telemetry is emitted — confirm
    /// against the test utilities.
    #[tokio::test]
    async fn file_descriptor_handles_invalid_fd() {
        assert_source_error(&COMPONENT_ERROR_TAGS, async {
            let (tx, rx) = SourceSender::new_test();
            // `_read_fd` is a named binding, so the read end stays open until
            // the end of this block.
            let (_read_fd, write_fd) = pipe().unwrap();
            let config = FileDescriptorSourceConfig {
                max_length: crate::serde::default_max_length(),
                host_key: Default::default(),
                framing: None,
                decoding: default_decoding(),
                // Deliberately the write end; `as_raw_fd` only borrows the
                // number, so `write_fd` is still usable below.
                fd: write_fd.as_raw_fd() as u32,
                log_namespace: None,
            };

            let mut stream = rx;

            write(&write_fd, b"hello world\nhello world again\n").unwrap();
            // Release ownership without closing: the `File` created in `build`
            // now owns this descriptor and will close it, so closing it here
            // as well would close it twice.
            let _ = write_fd.into_raw_fd();

            let context = SourceContext::new_test(tx, None);
            config.build(context).await.unwrap().await.unwrap();

            // The source cannot read from the write end of a pipe, so no
            // events are expected.
            let event = stream.next().await;
            assert!(event.is_none());
        })
        .await;
    }
}