vector/config/
watcher.rs

1use std::{
2    collections::HashMap,
3    path::{Path, PathBuf},
4    sync::mpsc::{Receiver, channel},
5    thread,
6    time::Duration,
7};
8
9use notify::{EventKind, RecursiveMode, recommended_watcher};
10
11use crate::{
12    Error,
13    config::{ComponentConfig, ComponentType},
14};
15
/// Debounce window used when the caller of `spawn_thread` does not supply one.
///
/// notify's own documentation advises a delay of more than 30 seconds, to
/// avoid receiving repetitions of previous events on macOS.
///
/// A smaller, more responsive delay is used here because the config and
/// topology reload logic can already handle:
///  - An invalid config, caused either by the user or by a data race.
///  - Frequent changes, caused by a user/editor modifying/saving a file in
///    small chunks.
const CONFIG_WATCH_DELAY: Duration = Duration::from_secs(1);

/// How long the watcher thread sleeps before it tries to create a new watcher
/// after the previous one stopped delivering events or could not be created.
const RETRY_TIMEOUT: Duration = Duration::from_secs(10);
26
/// Selects which kind of file watcher is created for the configuration paths.
///
/// Refer to [`crate::cli::WatchConfigMethod`] for details.
pub enum WatcherConfig {
    /// Recommended watcher for the current OS.
    RecommendedWatcher,
    /// A poll-based watcher that checks for file changes at regular intervals.
    /// The value is the poll interval, in seconds.
    PollWatcher(u64),
}
34
/// Wraps the two notify watcher implementations this module can create, so
/// that callers can hold either one behind a single type.
enum Watcher {
    /// Recommended watcher for the OS, usually inotify on Linux-based systems.
    RecommendedWatcher(notify::RecommendedWatcher),
    /// Poll-based watcher, for watching files from NFS.
    PollWatcher(notify::PollWatcher),
}
41
42impl Watcher {
43    fn add_paths(&mut self, config_paths: &[PathBuf]) -> Result<(), Error> {
44        for path in config_paths {
45            self.watch(path, RecursiveMode::Recursive)?;
46        }
47        Ok(())
48    }
49
50    fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<(), Error> {
51        use notify::Watcher as NotifyWatcher;
52        match self {
53            Watcher::RecommendedWatcher(watcher) => {
54                watcher.watch(path, recursive_mode)?;
55            }
56            Watcher::PollWatcher(watcher) => {
57                watcher.watch(path, recursive_mode)?;
58            }
59        }
60        Ok(())
61    }
62}
63
64/// Sends a ReloadFromDisk or ReloadEnrichmentTables on config_path changes.
65/// Accumulates file changes until no change for given duration has occurred.
66/// Has best effort guarantee of detecting all file changes from the end of
67/// this function until the main thread stops.
68pub fn spawn_thread<'a>(
69    watcher_conf: WatcherConfig,
70    signal_tx: crate::signal::SignalTx,
71    config_paths: impl IntoIterator<Item = &'a PathBuf> + 'a,
72    component_configs: Vec<ComponentConfig>,
73    delay: impl Into<Option<Duration>>,
74) -> Result<(), Error> {
75    let mut config_paths: Vec<_> = config_paths.into_iter().cloned().collect();
76    let mut component_config_paths: Vec<_> = component_configs
77        .clone()
78        .into_iter()
79        .flat_map(|p| p.config_paths.clone())
80        .collect();
81
82    config_paths.append(&mut component_config_paths);
83
84    let delay = delay.into().unwrap_or(CONFIG_WATCH_DELAY);
85
86    // Create watcher now so not to miss any changes happening between
87    // returning from this function and the thread starting.
88    let mut watcher = Some(create_watcher(&watcher_conf, &config_paths)?);
89
90    info!("Watching configuration files.");
91
92    thread::spawn(move || {
93        loop {
94            if let Some((mut watcher, receiver)) = watcher.take() {
95                while let Ok(Ok(event)) = receiver.recv() {
96                    if matches!(
97                        event.kind,
98                        EventKind::Create(_) | EventKind::Remove(_) | EventKind::Modify(_)
99                    ) {
100                        debug!(message = "Configuration file change detected.", event = ?event);
101
102                        // Consume events until delay amount of time has passed since the latest event.
103                        while receiver.recv_timeout(delay).is_ok() {}
104
105                        debug!(message = "Consumed file change events for delay.", delay = ?delay);
106
107                        let changed_components: HashMap<_, _> = component_configs
108                            .clone()
109                            .into_iter()
110                            .flat_map(|p| p.contains(&event.paths))
111                            .collect();
112
113                        // We need to read paths to resolve any inode changes that may have happened.
114                        // And we need to do it before raising sighup to avoid missing any change.
115                        if let Err(error) = watcher.add_paths(&config_paths) {
116                            error!(message = "Failed to read files to watch.", %error);
117                            break;
118                        }
119
120                        debug!(message = "Reloaded paths.");
121
122                        info!("Configuration file changed.");
123                        if !changed_components.is_empty() {
124                            info!(
125                                internal_log_rate_limit = true,
126                                "Component {:?} configuration changed.",
127                                changed_components.keys()
128                            );
129                            if changed_components
130                                .iter()
131                                .all(|(_, t)| *t == ComponentType::EnrichmentTable)
132                            {
133                                info!(
134                                    internal_log_rate_limit = true,
135                                    "Only enrichment tables have changed."
136                                );
137                                _ = signal_tx.send(crate::signal::SignalTo::ReloadEnrichmentTables).map_err(|error| {
138                                error!(message = "Unable to reload enrichment tables.", cause = %error, internal_log_rate_limit = true)
139                            });
140                            } else {
141                                _ = signal_tx.send(crate::signal::SignalTo::ReloadComponents(changed_components.into_keys().collect())).map_err(|error| {
142                                error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error, internal_log_rate_limit = true)
143                            });
144                            }
145                        } else {
146                            _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk)
147                            .map_err(|error| {
148                                error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error, internal_log_rate_limit = true)
149                            });
150                        }
151                    } else {
152                        debug!(message = "Ignoring event.", event = ?event)
153                    }
154                }
155            }
156
157            thread::sleep(RETRY_TIMEOUT);
158
159            watcher = create_watcher(&watcher_conf, &config_paths)
160                .map_err(|error| error!(message = "Failed to create file watcher.", %error))
161                .ok();
162
163            if watcher.is_some() {
164                // Config files could have changed while we weren't watching,
165                // so for a good measure raise SIGHUP and let reload logic
166                // determine if anything changed.
167                info!("Speculating that configuration files have changed.");
168                _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk).map_err(|error| {
169                error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
170            });
171            }
172        }
173    });
174
175    Ok(())
176}
177
178fn create_watcher(
179    watcher_conf: &WatcherConfig,
180    config_paths: &[PathBuf],
181) -> Result<(Watcher, Receiver<Result<notify::Event, notify::Error>>), Error> {
182    info!("Creating configuration file watcher.");
183
184    let (sender, receiver) = channel();
185    let mut watcher = match watcher_conf {
186        WatcherConfig::RecommendedWatcher => {
187            let recommended_watcher = recommended_watcher(sender)?;
188            Watcher::RecommendedWatcher(recommended_watcher)
189        }
190        WatcherConfig::PollWatcher(interval) => {
191            let config =
192                notify::Config::default().with_poll_interval(Duration::from_secs(*interval));
193            let poll_watcher = notify::PollWatcher::new(sender, config)?;
194            Watcher::PollWatcher(poll_watcher)
195        }
196    };
197    watcher.add_paths(config_paths)?;
198    Ok((watcher, receiver))
199}
200
#[cfg(all(test, unix, not(target_os = "macos")))] // https://github.com/vectordotdev/vector/issues/5000
mod tests {
    use std::{collections::HashSet, fs::File, io::Write, time::Duration};

    use tokio::sync::broadcast;

    use super::*;
    use crate::{
        config::ComponentKey,
        signal::SignalRx,
        test_util::{temp_dir, temp_file, trace_init},
    };

    /// Writes a single zero byte to `file`, syncs it to disk, then waits up to
    /// `timeout` for a signal on `receiver`.
    ///
    /// Returns `true` only when a signal arrived in time and it equals
    /// `expected_signal`; a timeout, a receive error or a different signal all
    /// yield `false`.
    async fn test_signal(
        file: &mut File,
        expected_signal: crate::signal::SignalTo,
        timeout: Duration,
        mut receiver: SignalRx,
    ) -> bool {
        file.write_all(&[0]).unwrap();
        file.sync_all().unwrap();

        let received = tokio::time::timeout(timeout, receiver.recv()).await;
        matches!(received, Ok(Ok(signal)) if signal == expected_signal)
    }

    /// Changing a file that belongs to a component must raise
    /// `ReloadComponents` naming that component.
    #[tokio::test]
    async fn component_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let dir = temp_dir().to_path_buf();
        let watcher_conf = WatcherConfig::RecommendedWatcher;
        let component_file_path = vec![dir.join("tls.cert"), dir.join("tls.key")];
        let http_component = ComponentKey::from("http");

        std::fs::create_dir(&dir).unwrap();

        let mut component_files: Vec<File> = component_file_path
            .iter()
            .map(|path| File::create(path).unwrap())
            .collect();
        let component_config = ComponentConfig::new(
            component_file_path.clone(),
            http_component.clone(),
            ComponentType::Sink,
        );

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(
            watcher_conf,
            signal_tx,
            &[dir],
            vec![component_config],
            delay,
        )
        .unwrap();

        // NOTE(review): `second_rx` is subscribed before the first write, so it
        // presumably also observes the first signal; the second check may then
        // pass without the second write being detected. Confirm whether that
        // is intended.
        let first_rx = signal_rx.resubscribe();
        let second_rx = first_rx.resubscribe();

        let expected =
            || crate::signal::SignalTo::ReloadComponents(HashSet::from([http_component.clone()]));

        let cert_detected =
            test_signal(&mut component_files[0], expected(), delay * 5, first_rx).await;
        assert!(cert_detected, "Test timed out");

        let key_detected =
            test_signal(&mut component_files[1], expected(), delay * 5, second_rx).await;
        assert!(key_detected, "Test timed out");
    }

    /// Changing a file inside a watched directory must raise `ReloadFromDisk`.
    #[tokio::test]
    async fn file_directory_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let dir = temp_dir().to_path_buf();
        let file_path = dir.join("vector.toml");
        let watcher_conf = WatcherConfig::RecommendedWatcher;

        std::fs::create_dir(&dir).unwrap();
        let mut file = File::create(&file_path).unwrap();

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[dir], vec![], delay).unwrap();

        let detected = test_signal(
            &mut file,
            crate::signal::SignalTo::ReloadFromDisk,
            delay * 5,
            signal_rx,
        )
        .await;
        assert!(detected, "Test timed out");
    }

    /// Changing a directly watched file must raise `ReloadFromDisk`.
    #[tokio::test]
    async fn file_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let file_path = temp_file();
        let mut file = File::create(&file_path).unwrap();
        let watcher_conf = WatcherConfig::RecommendedWatcher;

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[file_path], vec![], delay).unwrap();

        let detected = test_signal(
            &mut file,
            crate::signal::SignalTo::ReloadFromDisk,
            delay * 5,
            signal_rx,
        )
        .await;
        assert!(detected, "Test timed out");
    }

    /// Changing the target of a watched symlink must raise `ReloadFromDisk`.
    #[tokio::test]
    #[cfg(unix)]
    async fn sym_file_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let file_path = temp_file();
        let sym_file = temp_file();
        let mut file = File::create(&file_path).unwrap();
        std::os::unix::fs::symlink(&file_path, &sym_file).unwrap();

        let watcher_conf = WatcherConfig::RecommendedWatcher;

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[sym_file], vec![], delay).unwrap();

        let detected = test_signal(
            &mut file,
            crate::signal::SignalTo::ReloadFromDisk,
            delay * 5,
            signal_rx,
        )
        .await;
        assert!(detected, "Test timed out");
    }

    /// Changing a file inside a watched sub-directory must raise
    /// `ReloadFromDisk`.
    #[tokio::test]
    async fn recursive_directory_file_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let dir = temp_dir().to_path_buf();
        let sub_dir = dir.join("sources");
        let file_path = sub_dir.join("input.toml");
        let watcher_conf = WatcherConfig::RecommendedWatcher;

        std::fs::create_dir_all(&sub_dir).unwrap();
        let mut file = File::create(&file_path).unwrap();

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[sub_dir], vec![], delay).unwrap();

        let detected = test_signal(
            &mut file,
            crate::signal::SignalTo::ReloadFromDisk,
            delay * 5,
            signal_rx,
        )
        .await;
        assert!(detected, "Test timed out");
    }
}