1use notify::{EventKind, RecursiveMode, recommended_watcher};
2use std::{
3 collections::{HashMap, HashSet},
4 path::{Path, PathBuf},
5 sync::mpsc::{Receiver, channel},
6 thread,
7 time::Duration,
8};
9
10use crate::{
11 Error,
12 config::{ComponentConfig, ComponentType},
13};
14
/// Default quiet period used by `spawn_thread` when the caller supplies no delay.
///
/// After a first change event, further events are gathered until this much time passes
/// without a new one, so that a burst of file changes produces a single reload.
const CONFIG_WATCH_DELAY: Duration = Duration::from_secs(1);
23
/// How long the watcher thread sleeps after its event loop ends before it tries to create a
/// fresh watcher.
const RETRY_TIMEOUT: Duration = Duration::from_secs(10);
25
/// Selects which `notify` backend is used to watch configuration files.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WatcherConfig {
    /// Use the watcher returned by `notify::recommended_watcher`.
    RecommendedWatcher,
    /// Use `notify::PollWatcher`; the value is the poll interval in seconds.
    PollWatcher(u64),
}
33
34enum Watcher {
35 RecommendedWatcher(notify::RecommendedWatcher),
37 PollWatcher(notify::PollWatcher),
39}
40
41impl Watcher {
42 fn add_paths(&mut self, config_paths: &[PathBuf]) -> Result<(), Error> {
43 for path in config_paths {
44 if path.exists() {
45 self.watch(path, RecursiveMode::Recursive)?;
46 } else {
47 debug!(message = "Skipping non-existent path.", path = ?path);
48 }
49 }
50 Ok(())
51 }
52
53 fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<(), Error> {
54 use notify::Watcher as NotifyWatcher;
55 match self {
56 Watcher::RecommendedWatcher(watcher) => {
57 watcher.watch(path, recursive_mode)?;
58 }
59 Watcher::PollWatcher(watcher) => {
60 watcher.watch(path, recursive_mode)?;
61 }
62 }
63 Ok(())
64 }
65}
66
67pub fn spawn_thread<'a>(
72 watcher_conf: WatcherConfig,
73 signal_tx: crate::signal::SignalTx,
74 config_paths: impl IntoIterator<Item = &'a PathBuf> + 'a,
75 component_configs: Vec<ComponentConfig>,
76 delay: impl Into<Option<Duration>>,
77) -> Result<(), Error> {
78 let mut config_paths: Vec<_> = config_paths.into_iter().cloned().collect();
79 let mut component_config_paths: Vec<_> = component_configs
80 .clone()
81 .into_iter()
82 .flat_map(|p| p.config_paths.clone())
83 .collect();
84
85 config_paths.append(&mut component_config_paths);
86
87 let delay = delay.into().unwrap_or(CONFIG_WATCH_DELAY);
88
89 let mut watcher = Some(create_watcher(&watcher_conf, &config_paths)?);
92
93 info!("Watching configuration files.");
94
95 thread::spawn(move || {
96 loop {
97 if let Some((mut watcher, receiver)) = watcher.take() {
98 while let Ok(Ok(event)) = receiver.recv() {
99 if matches!(
100 event.kind,
101 EventKind::Create(_) | EventKind::Remove(_) | EventKind::Modify(_)
102 ) {
103 debug!(message = "Configuration file change detected.", event = ?event);
104
105 let mut changed_paths: HashSet<PathBuf> = event.paths.into_iter().collect();
107
108 while let Ok(Ok(subseq_event)) = receiver.recv_timeout(delay) {
110 if matches!(
111 subseq_event.kind,
112 EventKind::Create(_) | EventKind::Remove(_) | EventKind::Modify(_)
113 ) {
114 changed_paths.extend(subseq_event.paths);
115 }
116 }
117
118 debug!(
119 message = "Collected file change events during delay period.",
120 paths = changed_paths.len(),
121 delay = ?delay
122 );
123
124 let changed_components: HashMap<_, _> = component_configs
125 .clone()
126 .into_iter()
127 .flat_map(|p| p.contains(&changed_paths))
128 .collect();
129
130 if let Err(error) = watcher.add_paths(&config_paths) {
133 error!(message = "Failed to read files to watch.", %error);
134 break;
135 }
136
137 debug!(message = "Reloaded paths.");
138
139 info!("Configuration file changed.");
140 if !changed_components.is_empty() {
141 info!(
142 "Component {:?} configuration changed.",
143 changed_components.keys()
144 );
145 if changed_components
146 .iter()
147 .all(|(_, t)| *t == ComponentType::EnrichmentTable)
148 {
149 info!("Only enrichment tables have changed.");
150 _ = signal_tx
151 .send(crate::signal::SignalTo::ReloadEnrichmentTables)
152 .map_err(|error| {
153 error!(
154 message = "Unable to reload enrichment tables.",
155 cause = %error,
156 internal_log_rate_limit = false,
157 )
158 });
159 } else {
160 _ = signal_tx
161 .send(crate::signal::SignalTo::ReloadComponents(
162 changed_components.into_keys().collect(),
163 ))
164 .map_err(|error| {
165 error!(
166 message = "Unable to reload component configuration. Restart Vector to reload it.",
167 cause = %error,
168 internal_log_rate_limit = false,
169 )
170 });
171 }
172 } else {
173 _ = signal_tx
174 .send(crate::signal::SignalTo::ReloadFromDisk)
175 .map_err(|error| {
176 error!(
177 message = "Unable to reload configuration file. Restart Vector to reload it.",
178 cause = %error,
179 internal_log_rate_limit = false,
180 )
181 });
182 }
183 } else {
184 debug!(message = "Ignoring event.", event = ?event)
185 }
186 }
187 }
188
189 thread::sleep(RETRY_TIMEOUT);
190
191 watcher = create_watcher(&watcher_conf, &config_paths)
192 .map_err(|error| error!(message = "Failed to create file watcher.", %error))
193 .ok();
194
195 if watcher.is_some() {
196 info!("Speculating that configuration files have changed.");
200 _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk).map_err(|error| {
201 error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
202 });
203 }
204 }
205 });
206
207 Ok(())
208}
209
210fn create_watcher(
211 watcher_conf: &WatcherConfig,
212 config_paths: &[PathBuf],
213) -> Result<(Watcher, Receiver<Result<notify::Event, notify::Error>>), Error> {
214 info!("Creating configuration file watcher.");
215
216 let (sender, receiver) = channel();
217 let mut watcher = match watcher_conf {
218 WatcherConfig::RecommendedWatcher => {
219 let recommended_watcher = recommended_watcher(sender)?;
220 Watcher::RecommendedWatcher(recommended_watcher)
221 }
222 WatcherConfig::PollWatcher(interval) => {
223 let config =
224 notify::Config::default().with_poll_interval(Duration::from_secs(*interval));
225 let poll_watcher = notify::PollWatcher::new(sender, config)?;
226 Watcher::PollWatcher(poll_watcher)
227 }
228 };
229 watcher.add_paths(config_paths)?;
230 Ok((watcher, receiver))
231}
232
#[cfg(all(test, unix, not(target_os = "macos")))]
mod tests {
    use std::{collections::HashSet, fs::File, io::Write, time::Duration};

    use tokio::sync::broadcast;

    use super::*;
    use crate::{
        config::ComponentKey,
        signal::SignalRx,
        test_util::{temp_dir, temp_file, trace_init},
    };

    /// Writes a single zero byte to `file`, syncs it, and reports whether the next signal
    /// received within `timeout` equals `expected_signal`.
    async fn test_signal(
        file: &mut File,
        expected_signal: crate::signal::SignalTo,
        timeout: Duration,
        mut receiver: SignalRx,
    ) -> bool {
        file.write_all(&[0]).unwrap();
        file.sync_all().unwrap();

        let outcome = tokio::time::timeout(timeout, receiver.recv()).await;
        matches!(outcome, Ok(Ok(signal)) if signal == expected_signal)
    }

    /// Changing either file owned by a component yields `ReloadComponents` for that component.
    #[tokio::test]
    async fn component_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let dir = temp_dir().to_path_buf();
        let watcher_conf = WatcherConfig::RecommendedWatcher;
        let component_file_path = vec![dir.join("tls.cert"), dir.join("tls.key")];
        let http_component = ComponentKey::from("http");

        std::fs::create_dir(&dir).unwrap();

        let mut component_files: Vec<std::fs::File> = component_file_path
            .iter()
            .map(|path| File::create(path).unwrap())
            .collect();
        let component_config = ComponentConfig::new(
            component_file_path.clone(),
            http_component.clone(),
            ComponentType::Sink,
        );

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(
            watcher_conf,
            signal_tx,
            &[dir],
            vec![component_config],
            delay,
        )
        .unwrap();

        // `signal_rx` is left in scope; the two checks each get their own receiver.
        let first_rx = signal_rx.resubscribe();
        let second_rx = first_rx.resubscribe();

        let first_ok = test_signal(
            &mut component_files[0],
            crate::signal::SignalTo::ReloadComponents(HashSet::from([http_component.clone()])),
            delay * 5,
            first_rx,
        )
        .await;
        assert!(first_ok, "Test timed out");

        let second_ok = test_signal(
            &mut component_files[1],
            crate::signal::SignalTo::ReloadComponents(HashSet::from([http_component.clone()])),
            delay * 5,
            second_rx,
        )
        .await;
        assert!(second_ok, "Test timed out");
    }

    /// Changing a file inside a watched directory yields `ReloadFromDisk`.
    #[tokio::test]
    async fn file_directory_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let dir = temp_dir().to_path_buf();
        let file_path = dir.join("vector.toml");
        let watcher_conf = WatcherConfig::RecommendedWatcher;

        std::fs::create_dir(&dir).unwrap();
        let mut file = File::create(&file_path).unwrap();

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[dir], vec![], delay).unwrap();

        let signalled = test_signal(
            &mut file,
            crate::signal::SignalTo::ReloadFromDisk,
            delay * 5,
            signal_rx,
        )
        .await;
        assert!(signalled, "Test timed out");
    }

    /// Changing a directly watched file yields `ReloadFromDisk`.
    #[tokio::test]
    async fn file_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let file_path = temp_file();
        let mut file = File::create(&file_path).unwrap();
        let watcher_conf = WatcherConfig::RecommendedWatcher;

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[file_path], vec![], delay).unwrap();

        let signalled = test_signal(
            &mut file,
            crate::signal::SignalTo::ReloadFromDisk,
            delay * 5,
            signal_rx,
        )
        .await;
        assert!(signalled, "Test timed out");
    }

    /// Changing the target of a watched symlink yields `ReloadFromDisk`.
    #[tokio::test]
    #[cfg(unix)]
    async fn sym_file_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let file_path = temp_file();
        let sym_file = temp_file();
        let mut file = File::create(&file_path).unwrap();
        std::os::unix::fs::symlink(&file_path, &sym_file).unwrap();

        let watcher_conf = WatcherConfig::RecommendedWatcher;

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[sym_file], vec![], delay).unwrap();

        let signalled = test_signal(
            &mut file,
            crate::signal::SignalTo::ReloadFromDisk,
            delay * 5,
            signal_rx,
        )
        .await;
        assert!(signalled, "Test timed out");
    }

    /// Changing a file in a watched nested directory yields `ReloadFromDisk`.
    #[tokio::test]
    async fn recursive_directory_file_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let dir = temp_dir().to_path_buf();
        let sub_dir = dir.join("sources");
        let file_path = sub_dir.join("input.toml");
        let watcher_conf = WatcherConfig::RecommendedWatcher;

        std::fs::create_dir_all(&sub_dir).unwrap();
        let mut file = File::create(&file_path).unwrap();

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[sub_dir], vec![], delay).unwrap();

        let signalled = test_signal(
            &mut file,
            crate::signal::SignalTo::ReloadFromDisk,
            delay * 5,
            signal_rx,
        )
        .await;
        assert!(signalled, "Test timed out");
    }
}