vector/sources/file_descriptors/
file_descriptor.rs

1use std::{
2    fs::File,
3    io,
4    os::fd::{FromRawFd as _, IntoRawFd as _, RawFd},
5};
6
7use vector_lib::{
8    codecs::decoding::{DeserializerConfig, FramingConfig},
9    config::LogNamespace,
10    configurable::configurable_component,
11    lookup::lookup_v2::OptionalValuePath,
12};
13
14use super::{FileDescriptorConfig, outputs};
15use crate::{
16    config::{GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput},
17    serde::default_decoding,
18};
19/// Configuration for the `file_descriptor` source.
20#[configurable_component(source("file_descriptor", "Collect logs from a file descriptor."))]
21#[derive(Clone, Debug)]
22#[serde(deny_unknown_fields)]
23pub struct FileDescriptorSourceConfig {
24    /// The maximum buffer size, in bytes, of incoming messages.
25    ///
26    /// Messages larger than this are truncated.
27    #[serde(default = "crate::serde::default_max_length")]
28    #[configurable(metadata(docs::type_unit = "bytes"))]
29    pub max_length: usize,
30
31    /// Overrides the name of the log field used to add the current hostname to each event.
32    ///
33    ///
34    /// By default, the [global `host_key` option](https://vector.dev/docs/reference/configuration//global-options#log_schema.host_key) is used.
35    pub host_key: Option<OptionalValuePath>,
36
37    #[configurable(derived)]
38    pub framing: Option<FramingConfig>,
39
40    #[configurable(derived)]
41    #[serde(default = "default_decoding")]
42    pub decoding: DeserializerConfig,
43
44    /// The file descriptor number to read from.
45    #[configurable(metadata(docs::examples = 10))]
46    #[configurable(metadata(docs::human_name = "File Descriptor Number"))]
47    pub fd: u32,
48
49    /// The namespace to use for logs. This overrides the global setting.
50    #[configurable(metadata(docs::hidden))]
51    #[serde(default)]
52    log_namespace: Option<bool>,
53}
54
55impl FileDescriptorConfig for FileDescriptorSourceConfig {
56    fn host_key(&self) -> Option<OptionalValuePath> {
57        self.host_key.clone()
58    }
59
60    fn framing(&self) -> Option<FramingConfig> {
61        self.framing.clone()
62    }
63
64    fn decoding(&self) -> DeserializerConfig {
65        self.decoding.clone()
66    }
67
68    fn description(&self) -> String {
69        format!("file descriptor {}", self.fd)
70    }
71}
72
73impl GenerateConfig for FileDescriptorSourceConfig {
74    fn generate_config() -> toml::Value {
75        let fd = null_fd().unwrap();
76        toml::from_str(&format!(
77            r#"
78            fd = {fd}
79            "#
80        ))
81        .unwrap()
82    }
83}
84
85pub(crate) fn null_fd() -> crate::Result<RawFd> {
86    #[cfg(unix)]
87    const FILENAME: &str = "/dev/null";
88    #[cfg(windows)]
89    const FILENAME: &str = "C:\\NUL";
90    File::open(FILENAME)
91        .map_err(|error| format!("Could not open dummy file at {FILENAME:?}: {error}").into())
92        .map(|file| file.into_raw_fd())
93}
94
95#[async_trait::async_trait]
96#[typetag::serde(name = "file_descriptor")]
97impl SourceConfig for FileDescriptorSourceConfig {
98    async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
99        let pipe = io::BufReader::new(unsafe { File::from_raw_fd(self.fd as i32) });
100        let log_namespace = cx.log_namespace(self.log_namespace);
101
102        self.source(pipe, cx.shutdown, cx.out, log_namespace)
103    }
104
105    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
106        let log_namespace = global_log_namespace.merge(self.log_namespace);
107
108        outputs(log_namespace, &self.host_key, &self.decoding, Self::NAME)
109    }
110
111    fn resources(&self) -> Vec<Resource> {
112        vec![Resource::Fd(self.fd)]
113    }
114
115    fn can_acknowledge(&self) -> bool {
116        false
117    }
118}
119
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use nix::unistd::{close, pipe, write};
    use vector_lib::lookup::path;
    use vrl::value;

    use super::*;
    use crate::{
        SourceSender,
        config::log_schema,
        test_util::components::{
            COMPONENT_ERROR_TAGS, SOURCE_TAGS, assert_source_compliance, assert_source_error,
        },
    };

    /// Two newline-terminated lines written into the pipe by every test.
    const TWO_LINES: &[u8] = b"hello world\nhello world again\n";

    /// Builds a source configuration with default settings for the given descriptor.
    fn config_for(fd: u32, log_namespace: Option<bool>) -> FileDescriptorSourceConfig {
        FileDescriptorSourceConfig {
            max_length: crate::serde::default_max_length(),
            host_key: Default::default(),
            framing: None,
            decoding: default_decoding(),
            fd,
            log_namespace,
        }
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<FileDescriptorSourceConfig>();
    }

    /// Reads two lines from a pipe and checks them under the message key.
    #[tokio::test]
    async fn file_descriptor_decodes_line() {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tx, mut events) = SourceSender::new_test();
            let (read_fd, write_fd) = pipe().unwrap();
            let config = config_for(read_fd as u32, None);

            // Fill the pipe and close the write end so the source sees EOF after both lines.
            write(write_fd, TWO_LINES).unwrap();
            close(write_fd).unwrap();

            let context = SourceContext::new_test(tx, None);
            let source = config.build(context).await.unwrap();
            source.await.unwrap();

            let message_key = log_schema().message_key().unwrap().to_string();

            let first = events
                .next()
                .await
                .map(|event| event.as_log()[&message_key].to_string_lossy().into_owned());
            assert_eq!(Some(String::from("hello world")), first);

            let second = events
                .next()
                .await
                .map(|event| event.as_log()[message_key].to_string_lossy().into_owned());
            assert_eq!(Some(String::from("hello world again")), second);

            assert!(events.next().await.is_none());
        })
        .await;
    }

    /// Same as above, but with the Vector log namespace enabled on the source.
    #[tokio::test]
    async fn file_descriptor_decodes_line_vector_namespace() {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tx, mut events) = SourceSender::new_test();
            let (read_fd, write_fd) = pipe().unwrap();
            let config = config_for(read_fd as u32, Some(true));

            // Fill the pipe and close the write end so the source sees EOF after both lines.
            write(write_fd, TWO_LINES).unwrap();
            close(write_fd).unwrap();

            let context = SourceContext::new_test(tx, None);
            let source = config.build(context).await.unwrap();
            source.await.unwrap();

            let first = events.next().await.unwrap();
            let first_log = first.as_log();
            let meta = first_log.metadata().value();

            assert_eq!(&value!("hello world"), first_log.value());
            assert_eq!(
                meta.get(path!("vector", "source_type")).unwrap(),
                &value!("file_descriptor")
            );
            assert!(
                meta.get(path!("vector", "ingest_timestamp"))
                    .unwrap()
                    .is_timestamp()
            );

            let second = events.next().await.unwrap();
            assert_eq!(&value!("hello world again"), second.as_log().value());

            assert!(events.next().await.is_none());
        })
        .await;
    }

    /// Hands the source the write end of a pipe and expects an error with no events.
    #[tokio::test]
    async fn file_descriptor_handles_invalid_fd() {
        assert_source_error(&COMPONENT_ERROR_TAGS, async {
            let (tx, mut events) = SourceSender::new_test();
            let (_read_fd, write_fd) = pipe().unwrap();
            // Intentionally giving the source a write-only fd.
            let config = config_for(write_fd as u32, None);

            write(write_fd, TWO_LINES).unwrap();

            let context = SourceContext::new_test(tx, None);
            let source = config.build(context).await.unwrap();
            source.await.unwrap();

            // The error "Bad file descriptor" will be logged when the source attempts to read
            // for the first time.
            assert!(events.next().await.is_none());
        })
        .await;
    }
}