vector/sources/file_descriptors/
file_descriptor.rs1use std::os::fd::{FromRawFd as _, IntoRawFd as _, RawFd};
2use std::{fs::File, io};
3
4use super::{outputs, FileDescriptorConfig};
5use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
6use vector_lib::config::LogNamespace;
7use vector_lib::configurable::configurable_component;
8use vector_lib::lookup::lookup_v2::OptionalValuePath;
9
10use crate::{
11 config::{GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput},
12 serde::default_decoding,
13};
14#[configurable_component(source("file_descriptor", "Collect logs from a file descriptor."))]
16#[derive(Clone, Debug)]
17#[serde(deny_unknown_fields)]
18pub struct FileDescriptorSourceConfig {
19 #[serde(default = "crate::serde::default_max_length")]
23 #[configurable(metadata(docs::type_unit = "bytes"))]
24 pub max_length: usize,
25
26 pub host_key: Option<OptionalValuePath>,
31
32 #[configurable(derived)]
33 pub framing: Option<FramingConfig>,
34
35 #[configurable(derived)]
36 #[serde(default = "default_decoding")]
37 pub decoding: DeserializerConfig,
38
39 #[configurable(metadata(docs::examples = 10))]
41 #[configurable(metadata(docs::human_name = "File Descriptor Number"))]
42 pub fd: u32,
43
44 #[configurable(metadata(docs::hidden))]
46 #[serde(default)]
47 log_namespace: Option<bool>,
48}
49
50impl FileDescriptorConfig for FileDescriptorSourceConfig {
51 fn host_key(&self) -> Option<OptionalValuePath> {
52 self.host_key.clone()
53 }
54
55 fn framing(&self) -> Option<FramingConfig> {
56 self.framing.clone()
57 }
58
59 fn decoding(&self) -> DeserializerConfig {
60 self.decoding.clone()
61 }
62
63 fn description(&self) -> String {
64 format!("file descriptor {}", self.fd)
65 }
66}
67
68impl GenerateConfig for FileDescriptorSourceConfig {
69 fn generate_config() -> toml::Value {
70 let fd = null_fd().unwrap();
71 toml::from_str(&format!(
72 r#"
73 fd = {fd}
74 "#
75 ))
76 .unwrap()
77 }
78}
79
80pub(crate) fn null_fd() -> crate::Result<RawFd> {
81 #[cfg(unix)]
82 const FILENAME: &str = "/dev/null";
83 #[cfg(windows)]
84 const FILENAME: &str = "C:\\NUL";
85 File::open(FILENAME)
86 .map_err(|error| format!("Could not open dummy file at {FILENAME:?}: {error}").into())
87 .map(|file| file.into_raw_fd())
88}
89
90#[async_trait::async_trait]
91#[typetag::serde(name = "file_descriptor")]
92impl SourceConfig for FileDescriptorSourceConfig {
93 async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
94 let pipe = io::BufReader::new(unsafe { File::from_raw_fd(self.fd as i32) });
95 let log_namespace = cx.log_namespace(self.log_namespace);
96
97 self.source(pipe, cx.shutdown, cx.out, log_namespace)
98 }
99
100 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
101 let log_namespace = global_log_namespace.merge(self.log_namespace);
102
103 outputs(log_namespace, &self.host_key, &self.decoding, Self::NAME)
104 }
105
106 fn resources(&self) -> Vec<Resource> {
107 vec![Resource::Fd(self.fd)]
108 }
109
110 fn can_acknowledge(&self) -> bool {
111 false
112 }
113}
114
115#[cfg(test)]
116mod tests {
117 use nix::unistd::{close, pipe, write};
118 use vector_lib::lookup::path;
119
120 use super::*;
121 use crate::{
122 config::log_schema,
123 test_util::components::{
124 assert_source_compliance, assert_source_error, COMPONENT_ERROR_TAGS, SOURCE_TAGS,
125 },
126 SourceSender,
127 };
128 use futures::StreamExt;
129 use vrl::value;
130
131 #[test]
132 fn generate_config() {
133 crate::test_util::test_generate_config::<FileDescriptorSourceConfig>();
134 }
135
136 #[tokio::test]
137 async fn file_descriptor_decodes_line() {
138 assert_source_compliance(&SOURCE_TAGS, async {
139 let (tx, rx) = SourceSender::new_test();
140 let (read_fd, write_fd) = pipe().unwrap();
141 let config = FileDescriptorSourceConfig {
142 max_length: crate::serde::default_max_length(),
143 host_key: Default::default(),
144 framing: None,
145 decoding: default_decoding(),
146 fd: read_fd as u32,
147 log_namespace: None,
148 };
149
150 let mut stream = rx;
151
152 write(write_fd, b"hello world\nhello world again\n").unwrap();
153 close(write_fd).unwrap();
154
155 let context = SourceContext::new_test(tx, None);
156 config.build(context).await.unwrap().await.unwrap();
157
158 let event = stream.next().await;
159 let message_key = log_schema().message_key().unwrap().to_string();
160 assert_eq!(
161 Some("hello world".into()),
162 event.map(|event| event.as_log()[&message_key].to_string_lossy().into_owned())
163 );
164
165 let event = stream.next().await;
166 assert_eq!(
167 Some("hello world again".into()),
168 event.map(|event| event.as_log()[message_key].to_string_lossy().into_owned())
169 );
170
171 let event = stream.next().await;
172 assert!(event.is_none());
173 })
174 .await;
175 }
176
177 #[tokio::test]
178 async fn file_descriptor_decodes_line_vector_namespace() {
179 assert_source_compliance(&SOURCE_TAGS, async {
180 let (tx, rx) = SourceSender::new_test();
181 let (read_fd, write_fd) = pipe().unwrap();
182 let config = FileDescriptorSourceConfig {
183 max_length: crate::serde::default_max_length(),
184 host_key: Default::default(),
185 framing: None,
186 decoding: default_decoding(),
187 fd: read_fd as u32,
188 log_namespace: Some(true),
189 };
190
191 let mut stream = rx;
192
193 write(write_fd, b"hello world\nhello world again\n").unwrap();
194 close(write_fd).unwrap();
195
196 let context = SourceContext::new_test(tx, None);
197 config.build(context).await.unwrap().await.unwrap();
198
199 let event = stream.next().await;
200 let event = event.unwrap();
201 let log = event.as_log();
202 let meta = log.metadata().value();
203
204 assert_eq!(&value!("hello world"), log.value());
205 assert_eq!(
206 meta.get(path!("vector", "source_type")).unwrap(),
207 &value!("file_descriptor")
208 );
209 assert!(meta
210 .get(path!("vector", "ingest_timestamp"))
211 .unwrap()
212 .is_timestamp());
213
214 let event = stream.next().await;
215 let event = event.unwrap();
216 let log = event.as_log();
217
218 assert_eq!(&value!("hello world again"), log.value());
219
220 let event = stream.next().await;
221 assert!(event.is_none());
222 })
223 .await;
224 }
225
226 #[tokio::test]
227 async fn file_descriptor_handles_invalid_fd() {
228 assert_source_error(&COMPONENT_ERROR_TAGS, async {
229 let (tx, rx) = SourceSender::new_test();
230 let (_read_fd, write_fd) = pipe().unwrap();
231 let config = FileDescriptorSourceConfig {
232 max_length: crate::serde::default_max_length(),
233 host_key: Default::default(),
234 framing: None,
235 decoding: default_decoding(),
236 fd: write_fd as u32, log_namespace: None,
238 };
239
240 let mut stream = rx;
241
242 write(write_fd, b"hello world\nhello world again\n").unwrap();
243
244 let context = SourceContext::new_test(tx, None);
245 config.build(context).await.unwrap().await.unwrap();
246
247 let event = stream.next().await;
250 assert!(event.is_none());
251 })
252 .await;
253 }
254}