1use std::{
2 collections::HashMap,
3 path::{Path, PathBuf},
4 sync::mpsc::{Receiver, channel},
5 thread,
6 time::Duration,
7};
8
9use notify::{EventKind, RecursiveMode, recommended_watcher};
10
11use crate::{
12 Error,
13 config::{ComponentConfig, ComponentType},
14};
15
/// Default quiet period used by `spawn_thread` when the caller does not supply a delay:
/// after a relevant file event, further events are consumed until this much time passes
/// without a new one.
const CONFIG_WATCH_DELAY: Duration = Duration::from_secs(1);

/// How long the watcher thread sleeps before it tries to create a new watcher after the
/// previous one stopped delivering events (or could not be created).
const RETRY_TIMEOUT: Duration = Duration::from_secs(10);
26
/// Selects which `notify` backend is used to watch configuration files.
///
/// The common traits are derived so the value can be logged, compared and passed around
/// freely by callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WatcherConfig {
    /// Use the watcher returned by `notify::recommended_watcher`.
    RecommendedWatcher,
    /// Use `notify::PollWatcher`; the payload is the polling interval in seconds.
    PollWatcher(u64),
}
34
35enum Watcher {
36 RecommendedWatcher(notify::RecommendedWatcher),
38 PollWatcher(notify::PollWatcher),
40}
41
42impl Watcher {
43 fn add_paths(&mut self, config_paths: &[PathBuf]) -> Result<(), Error> {
44 for path in config_paths {
45 self.watch(path, RecursiveMode::Recursive)?;
46 }
47 Ok(())
48 }
49
50 fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<(), Error> {
51 use notify::Watcher as NotifyWatcher;
52 match self {
53 Watcher::RecommendedWatcher(watcher) => {
54 watcher.watch(path, recursive_mode)?;
55 }
56 Watcher::PollWatcher(watcher) => {
57 watcher.watch(path, recursive_mode)?;
58 }
59 }
60 Ok(())
61 }
62}
63
64pub fn spawn_thread<'a>(
69 watcher_conf: WatcherConfig,
70 signal_tx: crate::signal::SignalTx,
71 config_paths: impl IntoIterator<Item = &'a PathBuf> + 'a,
72 component_configs: Vec<ComponentConfig>,
73 delay: impl Into<Option<Duration>>,
74) -> Result<(), Error> {
75 let mut config_paths: Vec<_> = config_paths.into_iter().cloned().collect();
76 let mut component_config_paths: Vec<_> = component_configs
77 .clone()
78 .into_iter()
79 .flat_map(|p| p.config_paths.clone())
80 .collect();
81
82 config_paths.append(&mut component_config_paths);
83
84 let delay = delay.into().unwrap_or(CONFIG_WATCH_DELAY);
85
86 let mut watcher = Some(create_watcher(&watcher_conf, &config_paths)?);
89
90 info!("Watching configuration files.");
91
92 thread::spawn(move || {
93 loop {
94 if let Some((mut watcher, receiver)) = watcher.take() {
95 while let Ok(Ok(event)) = receiver.recv() {
96 if matches!(
97 event.kind,
98 EventKind::Create(_) | EventKind::Remove(_) | EventKind::Modify(_)
99 ) {
100 debug!(message = "Configuration file change detected.", event = ?event);
101
102 while receiver.recv_timeout(delay).is_ok() {}
104
105 debug!(message = "Consumed file change events for delay.", delay = ?delay);
106
107 let changed_components: HashMap<_, _> = component_configs
108 .clone()
109 .into_iter()
110 .flat_map(|p| p.contains(&event.paths))
111 .collect();
112
113 if let Err(error) = watcher.add_paths(&config_paths) {
116 error!(message = "Failed to read files to watch.", %error);
117 break;
118 }
119
120 debug!(message = "Reloaded paths.");
121
122 info!("Configuration file changed.");
123 if !changed_components.is_empty() {
124 info!(
125 internal_log_rate_limit = true,
126 "Component {:?} configuration changed.",
127 changed_components.keys()
128 );
129 if changed_components
130 .iter()
131 .all(|(_, t)| *t == ComponentType::EnrichmentTable)
132 {
133 info!(
134 internal_log_rate_limit = true,
135 "Only enrichment tables have changed."
136 );
137 _ = signal_tx.send(crate::signal::SignalTo::ReloadEnrichmentTables).map_err(|error| {
138 error!(message = "Unable to reload enrichment tables.", cause = %error, internal_log_rate_limit = true)
139 });
140 } else {
141 _ = signal_tx.send(crate::signal::SignalTo::ReloadComponents(changed_components.into_keys().collect())).map_err(|error| {
142 error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error, internal_log_rate_limit = true)
143 });
144 }
145 } else {
146 _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk)
147 .map_err(|error| {
148 error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error, internal_log_rate_limit = true)
149 });
150 }
151 } else {
152 debug!(message = "Ignoring event.", event = ?event)
153 }
154 }
155 }
156
157 thread::sleep(RETRY_TIMEOUT);
158
159 watcher = create_watcher(&watcher_conf, &config_paths)
160 .map_err(|error| error!(message = "Failed to create file watcher.", %error))
161 .ok();
162
163 if watcher.is_some() {
164 info!("Speculating that configuration files have changed.");
168 _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk).map_err(|error| {
169 error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
170 });
171 }
172 }
173 });
174
175 Ok(())
176}
177
178fn create_watcher(
179 watcher_conf: &WatcherConfig,
180 config_paths: &[PathBuf],
181) -> Result<(Watcher, Receiver<Result<notify::Event, notify::Error>>), Error> {
182 info!("Creating configuration file watcher.");
183
184 let (sender, receiver) = channel();
185 let mut watcher = match watcher_conf {
186 WatcherConfig::RecommendedWatcher => {
187 let recommended_watcher = recommended_watcher(sender)?;
188 Watcher::RecommendedWatcher(recommended_watcher)
189 }
190 WatcherConfig::PollWatcher(interval) => {
191 let config =
192 notify::Config::default().with_poll_interval(Duration::from_secs(*interval));
193 let poll_watcher = notify::PollWatcher::new(sender, config)?;
194 Watcher::PollWatcher(poll_watcher)
195 }
196 };
197 watcher.add_paths(config_paths)?;
198 Ok((watcher, receiver))
199}
200
#[cfg(all(test, unix, not(target_os = "macos")))]
mod tests {
    use std::{collections::HashSet, fs::File, io::Write, time::Duration};

    use tokio::sync::broadcast;

    use super::*;
    use crate::{
        config::ComponentKey,
        signal::SignalRx,
        test_util::{temp_dir, temp_file, trace_init},
    };

    /// Failure message shared by every test. `test_signal` returns `false` both when nothing
    /// arrives in time and when a different signal arrives, so the message covers both.
    const NO_EXPECTED_SIGNAL: &str = "Did not receive the expected signal before the timeout";

    /// Writes one byte to `file`, syncs it to disk, then waits up to `timeout` for the next
    /// signal on `receiver`.
    ///
    /// Returns `true` only if a signal arrived in time and equals `expected_signal`.
    async fn test_signal(
        file: &mut File,
        expected_signal: crate::signal::SignalTo,
        timeout: Duration,
        mut receiver: SignalRx,
    ) -> bool {
        file.write_all(&[0]).unwrap();
        file.sync_all().unwrap();

        match tokio::time::timeout(timeout, receiver.recv()).await {
            Ok(Ok(signal)) => signal == expected_signal,
            _ => false,
        }
    }

    /// Changing any file referenced by a component must request a reload of that component.
    #[tokio::test]
    async fn component_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let dir = temp_dir().to_path_buf();
        let watcher_conf = WatcherConfig::RecommendedWatcher;
        let component_file_path = vec![dir.join("tls.cert"), dir.join("tls.key")];
        let http_component = ComponentKey::from("http");

        std::fs::create_dir(&dir).unwrap();

        let mut component_files: Vec<std::fs::File> = component_file_path
            .iter()
            .map(|file| File::create(file).unwrap())
            .collect();
        let component_config = ComponentConfig::new(
            component_file_path.clone(),
            http_component.clone(),
            ComponentType::Sink,
        );

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(
            watcher_conf,
            signal_tx,
            &[dir],
            vec![component_config],
            delay,
        )
        .unwrap();

        for component_file in &mut component_files {
            // Subscribe right before each write: a resubscribed receiver only sees signals
            // sent after this point, so a signal caused by an earlier write cannot satisfy
            // the check for this file.
            let receiver = signal_rx.resubscribe();

            assert!(
                test_signal(
                    component_file,
                    crate::signal::SignalTo::ReloadComponents(HashSet::from_iter(vec![
                        http_component.clone(),
                    ])),
                    delay * 5,
                    receiver,
                )
                .await,
                "{}",
                NO_EXPECTED_SIGNAL
            );
        }
    }

    /// Changing a file inside a watched directory must request a reload from disk.
    #[tokio::test]
    async fn file_directory_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let dir = temp_dir().to_path_buf();
        let file_path = dir.join("vector.toml");
        let watcher_conf = WatcherConfig::RecommendedWatcher;

        std::fs::create_dir(&dir).unwrap();
        let mut file = File::create(&file_path).unwrap();

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[dir], vec![], delay).unwrap();

        assert!(
            test_signal(
                &mut file,
                crate::signal::SignalTo::ReloadFromDisk,
                delay * 5,
                signal_rx,
            )
            .await,
            "{}",
            NO_EXPECTED_SIGNAL
        );
    }

    /// Changing a directly watched file must request a reload from disk.
    #[tokio::test]
    async fn file_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let file_path = temp_file();
        let mut file = File::create(&file_path).unwrap();
        let watcher_conf = WatcherConfig::RecommendedWatcher;

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[file_path], vec![], delay).unwrap();

        assert!(
            test_signal(
                &mut file,
                crate::signal::SignalTo::ReloadFromDisk,
                delay * 5,
                signal_rx,
            )
            .await,
            "{}",
            NO_EXPECTED_SIGNAL
        );
    }

    /// Watching a symlink must report changes made through the link's target file.
    #[tokio::test]
    async fn sym_file_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let file_path = temp_file();
        let sym_file = temp_file();
        let mut file = File::create(&file_path).unwrap();
        std::os::unix::fs::symlink(&file_path, &sym_file).unwrap();

        let watcher_conf = WatcherConfig::RecommendedWatcher;

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[sym_file], vec![], delay).unwrap();

        assert!(
            test_signal(
                &mut file,
                crate::signal::SignalTo::ReloadFromDisk,
                delay * 5,
                signal_rx,
            )
            .await,
            "{}",
            NO_EXPECTED_SIGNAL
        );
    }

    /// Changing a file inside a watched sub-directory must request a reload from disk.
    #[tokio::test]
    async fn recursive_directory_file_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let dir = temp_dir().to_path_buf();
        let sub_dir = dir.join("sources");
        let file_path = sub_dir.join("input.toml");
        let watcher_conf = WatcherConfig::RecommendedWatcher;

        std::fs::create_dir_all(&sub_dir).unwrap();
        let mut file = File::create(&file_path).unwrap();

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[sub_dir], vec![], delay).unwrap();

        assert!(
            test_signal(
                &mut file,
                crate::signal::SignalTo::ReloadFromDisk,
                delay * 5,
                signal_rx,
            )
            .await,
            "{}",
            NO_EXPECTED_SIGNAL
        );
    }
}