use std::{
path::{Path, PathBuf},
time::Duration,
};
use std::{
sync::mpsc::{channel, Receiver},
thread,
};
use notify::{recommended_watcher, EventKind, RecursiveMode};
use crate::Error;
/// Quiet period used by `spawn_thread` when the caller does not supply a delay.
const CONFIG_WATCH_DELAY: Duration = Duration::from_secs(1);

/// How long the watcher thread sleeps before it tries to build a new watcher.
const RETRY_TIMEOUT: Duration = Duration::from_secs(10);
/// Selects which file-watching backend is used for configuration files.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatcherConfig {
    /// Use the watcher returned by `notify::recommended_watcher`.
    RecommendedWatcher,
    /// Use `notify::PollWatcher`; the value is the poll interval in seconds.
    PollWatcher(u64),
}
/// Internal wrapper over the two `notify` watcher types this module can create,
/// so the rest of the code can handle either one through a single type.
enum Watcher {
/// Holds a `notify::RecommendedWatcher`.
RecommendedWatcher(notify::RecommendedWatcher),
/// Holds a `notify::PollWatcher`.
PollWatcher(notify::PollWatcher),
}
impl Watcher {
    /// Registers every entry of `config_paths` with the underlying watcher,
    /// recursively, stopping at the first path that fails.
    fn add_paths(&mut self, config_paths: &[PathBuf]) -> Result<(), Error> {
        config_paths
            .iter()
            .try_for_each(|config_path| self.watch(config_path, RecursiveMode::Recursive))
    }

    /// Forwards a single `watch` request to whichever backend is wrapped,
    /// converting the backend's error into the crate-level `Error`.
    fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<(), Error> {
        // The trait must be in scope for the `watch` method to resolve.
        use notify::Watcher as NotifyWatcher;

        let outcome = match self {
            Watcher::RecommendedWatcher(inner) => inner.watch(path, recursive_mode),
            Watcher::PollWatcher(inner) => inner.watch(path, recursive_mode),
        };
        outcome.map_err(Into::into)
    }
}
pub fn spawn_thread<'a>(
watcher_conf: WatcherConfig,
signal_tx: crate::signal::SignalTx,
config_paths: impl IntoIterator<Item = &'a PathBuf> + 'a,
delay: impl Into<Option<Duration>>,
) -> Result<(), Error> {
let config_paths: Vec<_> = config_paths.into_iter().cloned().collect();
let delay = delay.into().unwrap_or(CONFIG_WATCH_DELAY);
let mut watcher = Some(create_watcher(&watcher_conf, &config_paths)?);
info!("Watching configuration files.");
thread::spawn(move || loop {
if let Some((mut watcher, receiver)) = watcher.take() {
while let Ok(Ok(event)) = receiver.recv() {
if matches!(
event.kind,
EventKind::Create(_) | EventKind::Remove(_) | EventKind::Modify(_)
) {
debug!(message = "Configuration file change detected.", event = ?event);
while receiver.recv_timeout(delay).is_ok() {}
debug!(message = "Consumed file change events for delay.", delay = ?delay);
if let Err(error) = watcher.add_paths(&config_paths) {
error!(message = "Failed to read files to watch.", %error);
break;
}
debug!(message = "Reloaded paths.");
info!("Configuration file changed.");
_ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk).map_err(|error| {
error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
});
} else {
debug!(message = "Ignoring event.", event = ?event)
}
}
}
thread::sleep(RETRY_TIMEOUT);
watcher = create_watcher(&watcher_conf, &config_paths)
.map_err(|error| error!(message = "Failed to create file watcher.", %error))
.ok();
if watcher.is_some() {
info!("Speculating that configuration files have changed.");
_ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk).map_err(|error| {
error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
});
}
});
Ok(())
}
/// Builds the watcher selected by `watcher_conf`, registers `config_paths`
/// with it, and returns it together with the receiving end of the channel on
/// which the watcher delivers its events.
///
/// Fails if the backend cannot be constructed or any path cannot be watched.
fn create_watcher(
    watcher_conf: &WatcherConfig,
    config_paths: &[PathBuf],
) -> Result<(Watcher, Receiver<Result<notify::Event, notify::Error>>), Error> {
    info!("Creating configuration file watcher.");

    let (event_tx, event_rx) = channel();

    let mut watcher = match watcher_conf {
        WatcherConfig::RecommendedWatcher => {
            recommended_watcher(event_tx).map(Watcher::RecommendedWatcher)?
        }
        WatcherConfig::PollWatcher(interval) => {
            // The configured interval is expressed in whole seconds.
            let poll_interval = Duration::from_secs(*interval);
            let poll_config = notify::Config::default().with_poll_interval(poll_interval);
            notify::PollWatcher::new(event_tx, poll_config).map(Watcher::PollWatcher)?
        }
    };

    watcher.add_paths(config_paths)?;

    Ok((watcher, event_rx))
}
// The module is only compiled for tests on non-macOS Unix targets.
#[cfg(all(test, unix, not(target_os = "macos")))]
mod tests {
    use super::*;
    use crate::{
        signal::SignalRx,
        test_util::{temp_dir, temp_file, trace_init},
    };
    use std::{fs::File, io::Write, time::Duration};
    use tokio::sync::broadcast;

    /// Writes one byte to `file`, syncs it, and reports whether a
    /// `ReloadFromDisk` signal arrives on `receiver` within `timeout`.
    async fn test(file: &mut File, timeout: Duration, mut receiver: SignalRx) -> bool {
        file.write_all(&[0]).unwrap();
        file.sync_all().unwrap();

        matches!(
            tokio::time::timeout(timeout, receiver.recv()).await,
            Ok(Ok(crate::signal::SignalTo::ReloadFromDisk))
        )
    }

    /// Watching a directory detects a change to a file directly inside it.
    #[tokio::test]
    async fn file_directory_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let dir = temp_dir().to_path_buf();
        let file_path = dir.join("vector.toml");
        let watcher_conf = WatcherConfig::RecommendedWatcher;

        std::fs::create_dir(&dir).unwrap();
        let mut file = File::create(&file_path).unwrap();

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[dir], delay).unwrap();

        assert!(
            test(&mut file, delay * 5, signal_rx).await,
            "Test timed out"
        );
    }

    /// Watching a file path detects a change to that file.
    #[tokio::test]
    async fn file_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let file_path = temp_file();
        let mut file = File::create(&file_path).unwrap();
        let watcher_conf = WatcherConfig::RecommendedWatcher;

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[file_path], delay).unwrap();

        assert!(
            test(&mut file, delay * 5, signal_rx).await,
            "Test timed out"
        );
    }

    /// Watching a symlink detects a change written through the link's target.
    #[tokio::test]
    async fn sym_file_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let file_path = temp_file();
        let sym_file = temp_file();
        let mut file = File::create(&file_path).unwrap();
        std::os::unix::fs::symlink(&file_path, &sym_file).unwrap();
        let watcher_conf = WatcherConfig::RecommendedWatcher;

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[sym_file], delay).unwrap();

        assert!(
            test(&mut file, delay * 5, signal_rx).await,
            "Test timed out"
        );
    }

    /// Watching a directory detects a change to a file in a nested
    /// subdirectory.
    #[tokio::test]
    async fn recursive_directory_file_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let dir = temp_dir().to_path_buf();
        let sub_dir = dir.join("sources");
        let file_path = sub_dir.join("input.toml");
        let watcher_conf = WatcherConfig::RecommendedWatcher;

        std::fs::create_dir_all(&sub_dir).unwrap();
        let mut file = File::create(&file_path).unwrap();

        let (signal_tx, signal_rx) = broadcast::channel(128);
        // Watch the top-level directory, not `sub_dir`, so that the event can
        // only be observed if the watch is truly recursive.
        spawn_thread(watcher_conf, signal_tx, &[dir], delay).unwrap();

        assert!(
            test(&mut file, delay * 5, signal_rx).await,
            "Test timed out"
        );
    }
}