vector/sources/file_descriptors/
file_descriptor.rs1use std::{
2 fs::File,
3 io,
4 os::fd::{FromRawFd as _, IntoRawFd as _, RawFd},
5};
6
7use vector_lib::{
8 codecs::decoding::{DeserializerConfig, FramingConfig},
9 config::LogNamespace,
10 configurable::configurable_component,
11 lookup::lookup_v2::OptionalValuePath,
12};
13
14use super::{FileDescriptorConfig, outputs};
15use crate::{
16 config::{GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput},
17 serde::default_decoding,
18};
19#[configurable_component(source("file_descriptor", "Collect logs from a file descriptor."))]
21#[derive(Clone, Debug)]
22#[serde(deny_unknown_fields)]
23pub struct FileDescriptorSourceConfig {
24 #[serde(default = "crate::serde::default_max_length")]
28 #[configurable(metadata(docs::type_unit = "bytes"))]
29 pub max_length: usize,
30
31 pub host_key: Option<OptionalValuePath>,
36
37 #[configurable(derived)]
38 pub framing: Option<FramingConfig>,
39
40 #[configurable(derived)]
41 #[serde(default = "default_decoding")]
42 pub decoding: DeserializerConfig,
43
44 #[configurable(metadata(docs::examples = 10))]
46 #[configurable(metadata(docs::human_name = "File Descriptor Number"))]
47 pub fd: u32,
48
49 #[configurable(metadata(docs::hidden))]
51 #[serde(default)]
52 log_namespace: Option<bool>,
53}
54
55impl FileDescriptorConfig for FileDescriptorSourceConfig {
56 fn host_key(&self) -> Option<OptionalValuePath> {
57 self.host_key.clone()
58 }
59
60 fn framing(&self) -> Option<FramingConfig> {
61 self.framing.clone()
62 }
63
64 fn decoding(&self) -> DeserializerConfig {
65 self.decoding.clone()
66 }
67
68 fn description(&self) -> String {
69 format!("file descriptor {}", self.fd)
70 }
71}
72
73impl GenerateConfig for FileDescriptorSourceConfig {
74 fn generate_config() -> toml::Value {
75 let fd = null_fd().unwrap();
76 toml::from_str(&format!(
77 r#"
78 fd = {fd}
79 "#
80 ))
81 .unwrap()
82 }
83}
84
85pub(crate) fn null_fd() -> crate::Result<RawFd> {
86 #[cfg(unix)]
87 const FILENAME: &str = "/dev/null";
88 #[cfg(windows)]
89 const FILENAME: &str = "C:\\NUL";
90 File::open(FILENAME)
91 .map_err(|error| format!("Could not open dummy file at {FILENAME:?}: {error}").into())
92 .map(|file| file.into_raw_fd())
93}
94
95#[async_trait::async_trait]
96#[typetag::serde(name = "file_descriptor")]
97impl SourceConfig for FileDescriptorSourceConfig {
98 async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
99 let pipe = io::BufReader::new(unsafe { File::from_raw_fd(self.fd as i32) });
100 let log_namespace = cx.log_namespace(self.log_namespace);
101
102 self.source(pipe, cx.shutdown, cx.out, log_namespace)
103 }
104
105 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
106 let log_namespace = global_log_namespace.merge(self.log_namespace);
107
108 outputs(log_namespace, &self.host_key, &self.decoding, Self::NAME)
109 }
110
111 fn resources(&self) -> Vec<Resource> {
112 vec![Resource::Fd(self.fd)]
113 }
114
115 fn can_acknowledge(&self) -> bool {
116 false
117 }
118}
119
120#[cfg(test)]
121mod tests {
122 use futures::StreamExt;
123 use nix::unistd::{close, pipe, write};
124 use vector_lib::lookup::path;
125 use vrl::value;
126
127 use super::*;
128 use crate::{
129 SourceSender,
130 config::log_schema,
131 test_util::components::{
132 COMPONENT_ERROR_TAGS, SOURCE_TAGS, assert_source_compliance, assert_source_error,
133 },
134 };
135
136 #[test]
137 fn generate_config() {
138 crate::test_util::test_generate_config::<FileDescriptorSourceConfig>();
139 }
140
141 #[tokio::test]
142 async fn file_descriptor_decodes_line() {
143 assert_source_compliance(&SOURCE_TAGS, async {
144 let (tx, rx) = SourceSender::new_test();
145 let (read_fd, write_fd) = pipe().unwrap();
146 let config = FileDescriptorSourceConfig {
147 max_length: crate::serde::default_max_length(),
148 host_key: Default::default(),
149 framing: None,
150 decoding: default_decoding(),
151 fd: read_fd as u32,
152 log_namespace: None,
153 };
154
155 let mut stream = rx;
156
157 write(write_fd, b"hello world\nhello world again\n").unwrap();
158 close(write_fd).unwrap();
159
160 let context = SourceContext::new_test(tx, None);
161 config.build(context).await.unwrap().await.unwrap();
162
163 let event = stream.next().await;
164 let message_key = log_schema().message_key().unwrap().to_string();
165 assert_eq!(
166 Some("hello world".into()),
167 event.map(|event| event.as_log()[&message_key].to_string_lossy().into_owned())
168 );
169
170 let event = stream.next().await;
171 assert_eq!(
172 Some("hello world again".into()),
173 event.map(|event| event.as_log()[message_key].to_string_lossy().into_owned())
174 );
175
176 let event = stream.next().await;
177 assert!(event.is_none());
178 })
179 .await;
180 }
181
182 #[tokio::test]
183 async fn file_descriptor_decodes_line_vector_namespace() {
184 assert_source_compliance(&SOURCE_TAGS, async {
185 let (tx, rx) = SourceSender::new_test();
186 let (read_fd, write_fd) = pipe().unwrap();
187 let config = FileDescriptorSourceConfig {
188 max_length: crate::serde::default_max_length(),
189 host_key: Default::default(),
190 framing: None,
191 decoding: default_decoding(),
192 fd: read_fd as u32,
193 log_namespace: Some(true),
194 };
195
196 let mut stream = rx;
197
198 write(write_fd, b"hello world\nhello world again\n").unwrap();
199 close(write_fd).unwrap();
200
201 let context = SourceContext::new_test(tx, None);
202 config.build(context).await.unwrap().await.unwrap();
203
204 let event = stream.next().await;
205 let event = event.unwrap();
206 let log = event.as_log();
207 let meta = log.metadata().value();
208
209 assert_eq!(&value!("hello world"), log.value());
210 assert_eq!(
211 meta.get(path!("vector", "source_type")).unwrap(),
212 &value!("file_descriptor")
213 );
214 assert!(
215 meta.get(path!("vector", "ingest_timestamp"))
216 .unwrap()
217 .is_timestamp()
218 );
219
220 let event = stream.next().await;
221 let event = event.unwrap();
222 let log = event.as_log();
223
224 assert_eq!(&value!("hello world again"), log.value());
225
226 let event = stream.next().await;
227 assert!(event.is_none());
228 })
229 .await;
230 }
231
232 #[tokio::test]
233 async fn file_descriptor_handles_invalid_fd() {
234 assert_source_error(&COMPONENT_ERROR_TAGS, async {
235 let (tx, rx) = SourceSender::new_test();
236 let (_read_fd, write_fd) = pipe().unwrap();
237 let config = FileDescriptorSourceConfig {
238 max_length: crate::serde::default_max_length(),
239 host_key: Default::default(),
240 framing: None,
241 decoding: default_decoding(),
242 fd: write_fd as u32, log_namespace: None,
244 };
245
246 let mut stream = rx;
247
248 write(write_fd, b"hello world\nhello world again\n").unwrap();
249
250 let context = SourceContext::new_test(tx, None);
251 config.build(context).await.unwrap().await.unwrap();
252
253 let event = stream.next().await;
256 assert!(event.is_none());
257 })
258 .await;
259 }
260}