vector/sources/file_descriptors/
file_descriptor.rs

1use std::{
2    fs::File,
3    io,
4    os::fd::{FromRawFd as _, IntoRawFd as _, RawFd},
5};
6
7use vector_lib::{
8    codecs::decoding::{DeserializerConfig, FramingConfig},
9    config::LogNamespace,
10    configurable::configurable_component,
11    lookup::lookup_v2::OptionalValuePath,
12};
13
14use super::{FileDescriptorConfig, outputs};
15use crate::{
16    config::{GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput},
17    serde::default_decoding,
18};
19/// Configuration for the `file_descriptor` source.
20#[configurable_component(source("file_descriptor", "Collect logs from a file descriptor."))]
21#[derive(Clone, Debug)]
22#[serde(deny_unknown_fields)]
23pub struct FileDescriptorSourceConfig {
24    /// The maximum buffer size, in bytes, of incoming messages.
25    ///
26    /// Messages larger than this are truncated.
27    #[serde(default = "crate::serde::default_max_length")]
28    #[configurable(metadata(docs::type_unit = "bytes"))]
29    pub max_length: usize,
30
31    /// Overrides the name of the log field used to add the current hostname to each event.
32    ///
33    ///
34    /// By default, the [global `host_key` option](https://vector.dev/docs/reference/configuration//global-options#log_schema.host_key) is used.
35    pub host_key: Option<OptionalValuePath>,
36
37    #[configurable(derived)]
38    pub framing: Option<FramingConfig>,
39
40    #[configurable(derived)]
41    #[serde(default = "default_decoding")]
42    pub decoding: DeserializerConfig,
43
44    /// The file descriptor number to read from.
45    #[configurable(metadata(docs::examples = 10))]
46    #[configurable(metadata(docs::human_name = "File Descriptor Number"))]
47    pub fd: u32,
48
49    /// The namespace to use for logs. This overrides the global setting.
50    #[configurable(metadata(docs::hidden))]
51    #[serde(default)]
52    log_namespace: Option<bool>,
53}
54
55impl FileDescriptorConfig for FileDescriptorSourceConfig {
56    fn host_key(&self) -> Option<OptionalValuePath> {
57        self.host_key.clone()
58    }
59
60    fn framing(&self) -> Option<FramingConfig> {
61        self.framing.clone()
62    }
63
64    fn decoding(&self) -> DeserializerConfig {
65        self.decoding.clone()
66    }
67
68    fn description(&self) -> String {
69        format!("file descriptor {}", self.fd)
70    }
71}
72
73impl GenerateConfig for FileDescriptorSourceConfig {
74    fn generate_config() -> toml::Value {
75        let fd = null_fd().unwrap();
76        toml::from_str(&format!(
77            r#"
78            fd = {fd}
79            "#
80        ))
81        .unwrap()
82    }
83}
84
85pub(crate) fn null_fd() -> crate::Result<RawFd> {
86    #[cfg(unix)]
87    const FILENAME: &str = "/dev/null";
88    #[cfg(windows)]
89    const FILENAME: &str = "C:\\NUL";
90    File::open(FILENAME)
91        .map_err(|error| format!("Could not open dummy file at {FILENAME:?}: {error}").into())
92        .map(|file| file.into_raw_fd())
93}
94
95#[async_trait::async_trait]
96#[typetag::serde(name = "file_descriptor")]
97impl SourceConfig for FileDescriptorSourceConfig {
98    async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
99        let pipe = io::BufReader::new(unsafe { File::from_raw_fd(self.fd as i32) });
100        let log_namespace = cx.log_namespace(self.log_namespace);
101
102        self.source(pipe, cx.shutdown, cx.out, log_namespace)
103    }
104
105    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
106        let log_namespace = global_log_namespace.merge(self.log_namespace);
107
108        outputs(log_namespace, &self.host_key, &self.decoding, Self::NAME)
109    }
110
111    fn resources(&self) -> Vec<Resource> {
112        vec![Resource::Fd(self.fd)]
113    }
114
115    fn can_acknowledge(&self) -> bool {
116        false
117    }
118}
119
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use nix::unistd::{close, pipe, write};
    use std::os::fd::AsRawFd;
    use vector_lib::lookup::path;
    use vrl::value;

    use super::*;
    use crate::{
        SourceSender,
        config::log_schema,
        test_util::components::{
            COMPONENT_ERROR_TAGS, SOURCE_TAGS, assert_source_compliance, assert_source_error,
        },
    };

    /// Two newline-delimited messages written into the pipe by every test below.
    const PAYLOAD: &[u8] = b"hello world\nhello world again\n";

    /// Builds a source configuration with default settings that reads from `fd`.
    fn config_for(fd: RawFd, log_namespace: Option<bool>) -> FileDescriptorSourceConfig {
        FileDescriptorSourceConfig {
            max_length: crate::serde::default_max_length(),
            host_key: Default::default(),
            framing: None,
            decoding: default_decoding(),
            fd: fd as u32,
            log_namespace,
        }
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<FileDescriptorSourceConfig>();
    }

    /// Both lines written to the pipe come out as separate events in the legacy namespace.
    #[tokio::test]
    async fn file_descriptor_decodes_line() {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tx, mut events) = SourceSender::new_test();
            let (reader, writer) = pipe().unwrap();
            // Ownership of the read end moves to the `File` created in `build()`.
            let config = config_for(reader.into_raw_fd(), None);

            write(&writer, PAYLOAD).unwrap();
            close(writer).unwrap();

            let source = config
                .build(SourceContext::new_test(tx, None))
                .await
                .unwrap();
            source.await.unwrap();

            let first = events.next().await;
            let message_key = log_schema().message_key().unwrap().to_string();
            let first =
                first.map(|event| event.as_log()[&message_key].to_string_lossy().into_owned());
            assert_eq!(Some(String::from("hello world")), first);

            let second = events
                .next()
                .await
                .map(|event| event.as_log()[message_key].to_string_lossy().into_owned());
            assert_eq!(Some(String::from("hello world again")), second);

            // The stream ends after the two lines.
            assert!(events.next().await.is_none());
        })
        .await;
    }

    /// Same input as above, but checked against the Vector log namespace layout.
    #[tokio::test]
    async fn file_descriptor_decodes_line_vector_namespace() {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tx, mut events) = SourceSender::new_test();
            let (reader, writer) = pipe().unwrap();
            let config = config_for(reader.into_raw_fd(), Some(true));

            write(&writer, PAYLOAD).unwrap();
            close(writer).unwrap();

            let source = config
                .build(SourceContext::new_test(tx, None))
                .await
                .unwrap();
            source.await.unwrap();

            let first = events.next().await.unwrap();
            let first_log = first.as_log();
            let metadata = first_log.metadata().value();

            assert_eq!(&value!("hello world"), first_log.value());
            assert_eq!(
                metadata.get(path!("vector", "source_type")).unwrap(),
                &value!("file_descriptor")
            );
            assert!(
                metadata
                    .get(path!("vector", "ingest_timestamp"))
                    .unwrap()
                    .is_timestamp()
            );

            let second = events.next().await.unwrap();
            assert_eq!(&value!("hello world again"), second.as_log().value());

            assert!(events.next().await.is_none());
        })
        .await;
    }

    /// Handing the source a write-only descriptor yields an error and no events.
    #[tokio::test]
    async fn file_descriptor_handles_invalid_fd() {
        assert_source_error(&COMPONENT_ERROR_TAGS, async {
            let (tx, mut events) = SourceSender::new_test();
            let (_reader, writer) = pipe().unwrap();
            // intentionally giving the source a write-only fd
            let config = config_for(writer.as_raw_fd(), None);

            write(&writer, PAYLOAD).unwrap();
            // Consume the OwnedFd without closing it to avoid double-close
            // with the File created in build().
            let _ = writer.into_raw_fd();

            let source = config
                .build(SourceContext::new_test(tx, None))
                .await
                .unwrap();
            source.await.unwrap();

            // The error "Bad file descriptor" will be logged when the source attempts to read
            // for the first time.
            assert!(events.next().await.is_none());
        })
        .await;
    }
}