vector/sources/file_descriptors/
file_descriptor.rs

1use std::os::fd::{FromRawFd as _, IntoRawFd as _, RawFd};
2use std::{fs::File, io};
3
4use super::{outputs, FileDescriptorConfig};
5use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
6use vector_lib::config::LogNamespace;
7use vector_lib::configurable::configurable_component;
8use vector_lib::lookup::lookup_v2::OptionalValuePath;
9
10use crate::{
11    config::{GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput},
12    serde::default_decoding,
13};
/// Configuration for the `file_descriptor` source.
#[configurable_component(source("file_descriptor", "Collect logs from a file descriptor."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct FileDescriptorSourceConfig {
    /// The maximum buffer size, in bytes, of incoming messages.
    ///
    /// Messages larger than this are truncated.
    #[serde(default = "crate::serde::default_max_length")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_length: usize,

    /// Overrides the name of the log field used to add the current hostname to each event.
    ///
    /// By default, the [global `host_key` option](https://vector.dev/docs/reference/configuration/global-options#log_schema.host_key) is used.
    pub host_key: Option<OptionalValuePath>,

    // NOTE(review): `derived` presumably takes this field's documentation from `FramingConfig`
    // itself — confirm before adding a doc comment here.
    #[configurable(derived)]
    pub framing: Option<FramingConfig>,

    // NOTE(review): as above, documentation is presumably derived from `DeserializerConfig`.
    // Falls back to `default_decoding` when the option is omitted.
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    pub decoding: DeserializerConfig,

    /// The file descriptor number to read from.
    // Stored unsigned; `build` converts it to a raw descriptor before reading from it.
    #[configurable(metadata(docs::examples = 10))]
    #[configurable(metadata(docs::human_name = "File Descriptor Number"))]
    pub fd: u32,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
}
49
50impl FileDescriptorConfig for FileDescriptorSourceConfig {
51    fn host_key(&self) -> Option<OptionalValuePath> {
52        self.host_key.clone()
53    }
54
55    fn framing(&self) -> Option<FramingConfig> {
56        self.framing.clone()
57    }
58
59    fn decoding(&self) -> DeserializerConfig {
60        self.decoding.clone()
61    }
62
63    fn description(&self) -> String {
64        format!("file descriptor {}", self.fd)
65    }
66}
67
68impl GenerateConfig for FileDescriptorSourceConfig {
69    fn generate_config() -> toml::Value {
70        let fd = null_fd().unwrap();
71        toml::from_str(&format!(
72            r#"
73            fd = {fd}
74            "#
75        ))
76        .unwrap()
77    }
78}
79
80pub(crate) fn null_fd() -> crate::Result<RawFd> {
81    #[cfg(unix)]
82    const FILENAME: &str = "/dev/null";
83    #[cfg(windows)]
84    const FILENAME: &str = "C:\\NUL";
85    File::open(FILENAME)
86        .map_err(|error| format!("Could not open dummy file at {FILENAME:?}: {error}").into())
87        .map(|file| file.into_raw_fd())
88}
89
90#[async_trait::async_trait]
91#[typetag::serde(name = "file_descriptor")]
92impl SourceConfig for FileDescriptorSourceConfig {
93    async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
94        let pipe = io::BufReader::new(unsafe { File::from_raw_fd(self.fd as i32) });
95        let log_namespace = cx.log_namespace(self.log_namespace);
96
97        self.source(pipe, cx.shutdown, cx.out, log_namespace)
98    }
99
100    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
101        let log_namespace = global_log_namespace.merge(self.log_namespace);
102
103        outputs(log_namespace, &self.host_key, &self.decoding, Self::NAME)
104    }
105
106    fn resources(&self) -> Vec<Resource> {
107        vec![Resource::Fd(self.fd)]
108    }
109
110    fn can_acknowledge(&self) -> bool {
111        false
112    }
113}
114
#[cfg(test)]
mod tests {
    use nix::unistd::{close, pipe, write};
    use vector_lib::lookup::path;

    use super::*;
    use crate::{
        config::log_schema,
        test_util::components::{
            assert_source_compliance, assert_source_error, COMPONENT_ERROR_TAGS, SOURCE_TAGS,
        },
        SourceSender,
    };
    use futures::StreamExt;
    use vrl::value;

    /// Two newline-delimited messages that every pipe-based test writes.
    const INPUT: &[u8] = b"hello world\nhello world again\n";

    /// Builds a config that reads from `fd`, with every other option at its default value.
    fn config_for(fd: u32, log_namespace: Option<bool>) -> FileDescriptorSourceConfig {
        FileDescriptorSourceConfig {
            max_length: crate::serde::default_max_length(),
            host_key: Default::default(),
            framing: None,
            decoding: default_decoding(),
            fd,
            log_namespace,
        }
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<FileDescriptorSourceConfig>();
    }

    /// With the legacy namespace, each line of the pipe becomes one event whose message
    /// field holds the line's text.
    #[tokio::test]
    async fn file_descriptor_decodes_line() {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tx, mut rx) = SourceSender::new_test();
            let (read_fd, write_fd) = pipe().unwrap();
            let config = config_for(read_fd as u32, None);

            // Fill the pipe and close the writer so the source sees end-of-file.
            write(write_fd, INPUT).unwrap();
            close(write_fd).unwrap();

            let context = SourceContext::new_test(tx, None);
            let source = config.build(context).await.unwrap();
            source.await.unwrap();

            let message_key = log_schema().message_key().unwrap().to_string();

            let first = rx
                .next()
                .await
                .map(|event| event.as_log()[&message_key].to_string_lossy().into_owned());
            assert_eq!(Some("hello world".into()), first);

            let second = rx
                .next()
                .await
                .map(|event| event.as_log()[message_key].to_string_lossy().into_owned());
            assert_eq!(Some("hello world again".into()), second);

            assert!(rx.next().await.is_none());
        })
        .await;
    }

    /// With the Vector namespace enabled, the line is the event value itself and the
    /// source metadata is found under the `vector` metadata path.
    #[tokio::test]
    async fn file_descriptor_decodes_line_vector_namespace() {
        assert_source_compliance(&SOURCE_TAGS, async {
            let (tx, mut rx) = SourceSender::new_test();
            let (read_fd, write_fd) = pipe().unwrap();
            let config = config_for(read_fd as u32, Some(true));

            // Fill the pipe and close the writer so the source sees end-of-file.
            write(write_fd, INPUT).unwrap();
            close(write_fd).unwrap();

            let context = SourceContext::new_test(tx, None);
            let source = config.build(context).await.unwrap();
            source.await.unwrap();

            let first = rx.next().await.unwrap();
            let log = first.as_log();
            let meta = log.metadata().value();

            assert_eq!(&value!("hello world"), log.value());
            assert_eq!(
                meta.get(path!("vector", "source_type")).unwrap(),
                &value!("file_descriptor")
            );
            assert!(meta
                .get(path!("vector", "ingest_timestamp"))
                .unwrap()
                .is_timestamp());

            let second = rx.next().await.unwrap();
            assert_eq!(&value!("hello world again"), second.as_log().value());

            assert!(rx.next().await.is_none());
        })
        .await;
    }

    /// A descriptor that cannot be read from yields no events.
    #[tokio::test]
    async fn file_descriptor_handles_invalid_fd() {
        assert_source_error(&COMPONENT_ERROR_TAGS, async {
            let (tx, mut rx) = SourceSender::new_test();
            let (_read_fd, write_fd) = pipe().unwrap();
            // intentionally giving the source a write-only fd
            let config = config_for(write_fd as u32, None);

            write(write_fd, INPUT).unwrap();

            let context = SourceContext::new_test(tx, None);
            let source = config.build(context).await.unwrap();
            source.await.unwrap();

            // The error "Bad file descriptor" will be logged when the source attempts to read
            // for the first time.
            assert!(rx.next().await.is_none());
        })
        .await;
    }
}