use std::os::fd::{FromRawFd as _, IntoRawFd as _, RawFd};
use std::{fs::File, io};
use super::{outputs, FileDescriptorConfig};
use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
use vector_lib::config::LogNamespace;
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::lookup_v2::OptionalValuePath;
use crate::{
config::{GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput},
serde::default_decoding,
};
#[configurable_component(source("file_descriptor", "Collect logs from a file descriptor."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct FileDescriptorSourceConfig {
#[serde(default = "crate::serde::default_max_length")]
#[configurable(metadata(docs::type_unit = "bytes"))]
pub max_length: usize,
pub host_key: Option<OptionalValuePath>,
#[configurable(derived)]
pub framing: Option<FramingConfig>,
#[configurable(derived)]
#[serde(default = "default_decoding")]
pub decoding: DeserializerConfig,
#[configurable(metadata(docs::examples = 10))]
#[configurable(metadata(docs::human_name = "File Descriptor Number"))]
pub fd: u32,
#[configurable(metadata(docs::hidden))]
#[serde(default)]
log_namespace: Option<bool>,
}
impl FileDescriptorConfig for FileDescriptorSourceConfig {
fn host_key(&self) -> Option<OptionalValuePath> {
self.host_key.clone()
}
fn framing(&self) -> Option<FramingConfig> {
self.framing.clone()
}
fn decoding(&self) -> DeserializerConfig {
self.decoding.clone()
}
fn description(&self) -> String {
format!("file descriptor {}", self.fd)
}
}
impl GenerateConfig for FileDescriptorSourceConfig {
fn generate_config() -> toml::Value {
let fd = null_fd().unwrap();
toml::from_str(&format!(
r#"
fd = {fd}
"#
))
.unwrap()
}
}
pub(crate) fn null_fd() -> crate::Result<RawFd> {
#[cfg(unix)]
const FILENAME: &str = "/dev/null";
#[cfg(windows)]
const FILENAME: &str = "C:\\NUL";
File::open(FILENAME)
.map_err(|error| format!("Could not open dummy file at {FILENAME:?}: {error}").into())
.map(|file| file.into_raw_fd())
}
#[async_trait::async_trait]
#[typetag::serde(name = "file_descriptor")]
impl SourceConfig for FileDescriptorSourceConfig {
async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
let pipe = io::BufReader::new(unsafe { File::from_raw_fd(self.fd as i32) });
let log_namespace = cx.log_namespace(self.log_namespace);
self.source(pipe, cx.shutdown, cx.out, log_namespace)
}
fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
let log_namespace = global_log_namespace.merge(self.log_namespace);
outputs(log_namespace, &self.host_key, &self.decoding, Self::NAME)
}
fn resources(&self) -> Vec<Resource> {
vec![Resource::Fd(self.fd)]
}
fn can_acknowledge(&self) -> bool {
false
}
}
#[cfg(test)]
mod tests {
use nix::unistd::{close, pipe, write};
use vector_lib::lookup::path;
use super::*;
use crate::{
config::log_schema,
test_util::components::{
assert_source_compliance, assert_source_error, COMPONENT_ERROR_TAGS, SOURCE_TAGS,
},
SourceSender,
};
use futures::StreamExt;
use vrl::value;
#[test]
fn generate_config() {
crate::test_util::test_generate_config::<FileDescriptorSourceConfig>();
}
#[tokio::test]
async fn file_descriptor_decodes_line() {
assert_source_compliance(&SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let (read_fd, write_fd) = pipe().unwrap();
let config = FileDescriptorSourceConfig {
max_length: crate::serde::default_max_length(),
host_key: Default::default(),
framing: None,
decoding: default_decoding(),
fd: read_fd as u32,
log_namespace: None,
};
let mut stream = rx;
write(write_fd, b"hello world\nhello world again\n").unwrap();
close(write_fd).unwrap();
let context = SourceContext::new_test(tx, None);
config.build(context).await.unwrap().await.unwrap();
let event = stream.next().await;
let message_key = log_schema().message_key().unwrap().to_string();
assert_eq!(
Some("hello world".into()),
event.map(|event| event.as_log()[&message_key].to_string_lossy().into_owned())
);
let event = stream.next().await;
assert_eq!(
Some("hello world again".into()),
event.map(|event| event.as_log()[message_key].to_string_lossy().into_owned())
);
let event = stream.next().await;
assert!(event.is_none());
})
.await;
}
#[tokio::test]
async fn file_descriptor_decodes_line_vector_namespace() {
assert_source_compliance(&SOURCE_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let (read_fd, write_fd) = pipe().unwrap();
let config = FileDescriptorSourceConfig {
max_length: crate::serde::default_max_length(),
host_key: Default::default(),
framing: None,
decoding: default_decoding(),
fd: read_fd as u32,
log_namespace: Some(true),
};
let mut stream = rx;
write(write_fd, b"hello world\nhello world again\n").unwrap();
close(write_fd).unwrap();
let context = SourceContext::new_test(tx, None);
config.build(context).await.unwrap().await.unwrap();
let event = stream.next().await;
let event = event.unwrap();
let log = event.as_log();
let meta = log.metadata().value();
assert_eq!(&value!("hello world"), log.value());
assert_eq!(
meta.get(path!("vector", "source_type")).unwrap(),
&value!("file_descriptor")
);
assert!(meta
.get(path!("vector", "ingest_timestamp"))
.unwrap()
.is_timestamp());
let event = stream.next().await;
let event = event.unwrap();
let log = event.as_log();
assert_eq!(&value!("hello world again"), log.value());
let event = stream.next().await;
assert!(event.is_none());
})
.await;
}
#[tokio::test]
async fn file_descriptor_handles_invalid_fd() {
assert_source_error(&COMPONENT_ERROR_TAGS, async {
let (tx, rx) = SourceSender::new_test();
let (_read_fd, write_fd) = pipe().unwrap();
let config = FileDescriptorSourceConfig {
max_length: crate::serde::default_max_length(),
host_key: Default::default(),
framing: None,
decoding: default_decoding(),
fd: write_fd as u32, log_namespace: None,
};
let mut stream = rx;
write(write_fd, b"hello world\nhello world again\n").unwrap();
let context = SourceContext::new_test(tx, None);
config.build(context).await.unwrap().await.unwrap();
let event = stream.next().await;
assert!(event.is_none());
})
.await;
}
}