use std::{
path::{Path, PathBuf},
time::Duration,
};
use std::{
sync::mpsc::{channel, Receiver},
thread,
};
use crate::config::ComponentConfig;
use notify::{recommended_watcher, EventKind, RecursiveMode};
use crate::Error;
const CONFIG_WATCH_DELAY: std::time::Duration = std::time::Duration::from_secs(1);
const RETRY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
pub enum WatcherConfig {
RecommendedWatcher,
PollWatcher(u64),
}
enum Watcher {
RecommendedWatcher(notify::RecommendedWatcher),
PollWatcher(notify::PollWatcher),
}
impl Watcher {
fn add_paths(&mut self, config_paths: &[PathBuf]) -> Result<(), Error> {
for path in config_paths {
self.watch(path, RecursiveMode::Recursive)?;
}
Ok(())
}
fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<(), Error> {
use notify::Watcher as NotifyWatcher;
match self {
Watcher::RecommendedWatcher(watcher) => {
watcher.watch(path, recursive_mode)?;
}
Watcher::PollWatcher(watcher) => {
watcher.watch(path, recursive_mode)?;
}
}
Ok(())
}
}
pub fn spawn_thread<'a>(
watcher_conf: WatcherConfig,
signal_tx: crate::signal::SignalTx,
config_paths: impl IntoIterator<Item = &'a PathBuf> + 'a,
component_configs: Vec<ComponentConfig>,
delay: impl Into<Option<Duration>>,
) -> Result<(), Error> {
let mut config_paths: Vec<_> = config_paths.into_iter().cloned().collect();
let mut component_config_paths: Vec<_> = component_configs
.clone()
.into_iter()
.flat_map(|p| p.config_paths.clone())
.collect();
config_paths.append(&mut component_config_paths);
let delay = delay.into().unwrap_or(CONFIG_WATCH_DELAY);
let mut watcher = Some(create_watcher(&watcher_conf, &config_paths)?);
info!("Watching configuration files.");
thread::spawn(move || loop {
if let Some((mut watcher, receiver)) = watcher.take() {
while let Ok(Ok(event)) = receiver.recv() {
if matches!(
event.kind,
EventKind::Create(_) | EventKind::Remove(_) | EventKind::Modify(_)
) {
debug!(message = "Configuration file change detected.", event = ?event);
while receiver.recv_timeout(delay).is_ok() {}
debug!(message = "Consumed file change events for delay.", delay = ?delay);
let component_keys: Vec<_> = component_configs
.clone()
.into_iter()
.flat_map(|p| p.contains(&event.paths))
.collect();
if let Err(error) = watcher.add_paths(&config_paths) {
error!(message = "Failed to read files to watch.", %error);
break;
}
debug!(message = "Reloaded paths.");
info!("Configuration file changed.");
if !component_keys.is_empty() {
info!("Component {:?} configuration changed.", component_keys);
_ = signal_tx.send(crate::signal::SignalTo::ReloadComponents(component_keys)).map_err(|error| {
error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error)
});
} else {
_ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk)
.map_err(|error| {
error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
});
}
} else {
debug!(message = "Ignoring event.", event = ?event)
}
}
}
thread::sleep(RETRY_TIMEOUT);
watcher = create_watcher(&watcher_conf, &config_paths)
.map_err(|error| error!(message = "Failed to create file watcher.", %error))
.ok();
if watcher.is_some() {
info!("Speculating that configuration files have changed.");
_ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk).map_err(|error| {
error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
});
}
});
Ok(())
}
fn create_watcher(
watcher_conf: &WatcherConfig,
config_paths: &[PathBuf],
) -> Result<(Watcher, Receiver<Result<notify::Event, notify::Error>>), Error> {
info!("Creating configuration file watcher.");
let (sender, receiver) = channel();
let mut watcher = match watcher_conf {
WatcherConfig::RecommendedWatcher => {
let recommended_watcher = recommended_watcher(sender)?;
Watcher::RecommendedWatcher(recommended_watcher)
}
WatcherConfig::PollWatcher(interval) => {
let config =
notify::Config::default().with_poll_interval(Duration::from_secs(*interval));
let poll_watcher = notify::PollWatcher::new(sender, config)?;
Watcher::PollWatcher(poll_watcher)
}
};
watcher.add_paths(config_paths)?;
Ok((watcher, receiver))
}
#[cfg(all(test, unix, not(target_os = "macos")))] mod tests {
use super::*;
use crate::{
config::ComponentKey,
signal::SignalRx,
test_util::{temp_dir, temp_file, trace_init},
};
use std::{fs::File, io::Write, time::Duration};
use tokio::sync::broadcast;
async fn test(file: &mut File, timeout: Duration, mut receiver: SignalRx) -> bool {
file.write_all(&[0]).unwrap();
file.sync_all().unwrap();
matches!(
tokio::time::timeout(timeout, receiver.recv()).await,
Ok(Ok(crate::signal::SignalTo::ReloadFromDisk))
)
}
async fn test_component_reload(
file: &mut File,
expected_component: &ComponentKey,
timeout: Duration,
mut receiver: SignalRx,
) -> bool {
file.write_all(&[0]).unwrap();
file.sync_all().unwrap();
matches!(
tokio::time::timeout(timeout, receiver.recv()).await,
Ok(Ok(crate::signal::SignalTo::ReloadComponents(components))) if components.contains(expected_component)
)
}
#[tokio::test]
async fn component_update() {
trace_init();
let delay = Duration::from_secs(3);
let dir = temp_dir().to_path_buf();
let watcher_conf = WatcherConfig::RecommendedWatcher;
let component_file_path = vec![dir.join("tls.cert"), dir.join("tls.key")];
let http_component = ComponentKey::from("http");
std::fs::create_dir(&dir).unwrap();
let mut component_files: Vec<std::fs::File> = component_file_path
.iter()
.map(|file| File::create(file).unwrap())
.collect();
let component_config =
ComponentConfig::new(component_file_path.clone(), http_component.clone());
let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(
watcher_conf,
signal_tx,
&[dir],
vec![component_config],
delay,
)
.unwrap();
let signal_rx = signal_rx.resubscribe();
let signal_rx2 = signal_rx.resubscribe();
if !test_component_reload(
&mut component_files[0],
&http_component,
delay * 5,
signal_rx,
)
.await
{
panic!("Test timed out");
}
if !test_component_reload(
&mut component_files[1],
&http_component,
delay * 5,
signal_rx2,
)
.await
{
panic!("Test timed out");
}
}
#[tokio::test]
async fn file_directory_update() {
trace_init();
let delay = Duration::from_secs(3);
let dir = temp_dir().to_path_buf();
let file_path = dir.join("vector.toml");
let watcher_conf = WatcherConfig::RecommendedWatcher;
std::fs::create_dir(&dir).unwrap();
let mut file = File::create(&file_path).unwrap();
let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(watcher_conf, signal_tx, &[dir], vec![], delay).unwrap();
if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
}
}
#[tokio::test]
async fn file_update() {
trace_init();
let delay = Duration::from_secs(3);
let file_path = temp_file();
let mut file = File::create(&file_path).unwrap();
let watcher_conf = WatcherConfig::RecommendedWatcher;
let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(watcher_conf, signal_tx, &[file_path], vec![], delay).unwrap();
if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
}
}
#[tokio::test]
#[cfg(unix)]
async fn sym_file_update() {
trace_init();
let delay = Duration::from_secs(3);
let file_path = temp_file();
let sym_file = temp_file();
let mut file = File::create(&file_path).unwrap();
std::os::unix::fs::symlink(&file_path, &sym_file).unwrap();
let watcher_conf = WatcherConfig::RecommendedWatcher;
let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(watcher_conf, signal_tx, &[sym_file], vec![], delay).unwrap();
if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
}
}
#[tokio::test]
async fn recursive_directory_file_update() {
trace_init();
let delay = Duration::from_secs(3);
let dir = temp_dir().to_path_buf();
let sub_dir = dir.join("sources");
let file_path = sub_dir.join("input.toml");
let watcher_conf = WatcherConfig::RecommendedWatcher;
std::fs::create_dir_all(&sub_dir).unwrap();
let mut file = File::create(&file_path).unwrap();
let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(watcher_conf, signal_tx, &[sub_dir], vec![], delay).unwrap();
if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
}
}
}