vector/config/
watcher.rs

1use notify::{EventKind, RecursiveMode, recommended_watcher};
2use std::{
3    collections::{HashMap, HashSet},
4    path::{Path, PathBuf},
5    sync::mpsc::{Receiver, channel},
6    thread,
7    time::Duration,
8};
9
10use crate::{
11    Error,
12    config::{ComponentConfig, ComponentType},
13};
14
/// Quiet period that must elapse without new file events before a reload is
/// signalled.
///
/// The `notify` documentation advises a delay of more than 30 seconds to
/// avoid receiving repetitions of previous events on macOS.
///
/// However, the config and topology reload logic can already handle:
///  - Invalid config, caused either by the user or by a data race.
///  - Frequent changes, caused by a user/editor modifying/saving a file in small chunks.
///
/// So a smaller, more responsive delay can be used.
const CONFIG_WATCH_DELAY: Duration = Duration::from_secs(1);

/// Pause before trying to create a fresh watcher after the previous one
/// stopped delivering events.
const RETRY_TIMEOUT: Duration = Duration::from_secs(10);
25
/// Selects which file-watching backend is used for configuration files.
///
/// Refer to [`crate::cli::WatchConfigMethod`] for details.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatcherConfig {
    /// Recommended watcher for the current OS.
    RecommendedWatcher,
    /// A poll-based watcher that checks for file changes at regular intervals.
    ///
    /// The value is the polling interval in seconds.
    PollWatcher(u64),
}
33
34enum Watcher {
35    /// recommended watcher for os, usually inotify for linux based systems
36    RecommendedWatcher(notify::RecommendedWatcher),
37    /// poll based watcher. for watching files from NFS.
38    PollWatcher(notify::PollWatcher),
39}
40
41impl Watcher {
42    fn add_paths(&mut self, config_paths: &[PathBuf]) -> Result<(), Error> {
43        for path in config_paths {
44            if path.exists() {
45                self.watch(path, RecursiveMode::Recursive)?;
46            } else {
47                debug!(message = "Skipping non-existent path.", path = ?path);
48            }
49        }
50        Ok(())
51    }
52
53    fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<(), Error> {
54        use notify::Watcher as NotifyWatcher;
55        match self {
56            Watcher::RecommendedWatcher(watcher) => {
57                watcher.watch(path, recursive_mode)?;
58            }
59            Watcher::PollWatcher(watcher) => {
60                watcher.watch(path, recursive_mode)?;
61            }
62        }
63        Ok(())
64    }
65}
66
67/// Sends a ReloadFromDisk or ReloadEnrichmentTables on config_path changes.
68/// Accumulates file changes until no change for given duration has occurred.
69/// Has best effort guarantee of detecting all file changes from the end of
70/// this function until the main thread stops.
71pub fn spawn_thread<'a>(
72    watcher_conf: WatcherConfig,
73    signal_tx: crate::signal::SignalTx,
74    config_paths: impl IntoIterator<Item = &'a PathBuf> + 'a,
75    component_configs: Vec<ComponentConfig>,
76    delay: impl Into<Option<Duration>>,
77) -> Result<(), Error> {
78    let mut config_paths: Vec<_> = config_paths.into_iter().cloned().collect();
79    let mut component_config_paths: Vec<_> = component_configs
80        .clone()
81        .into_iter()
82        .flat_map(|p| p.config_paths.clone())
83        .collect();
84
85    config_paths.append(&mut component_config_paths);
86
87    let delay = delay.into().unwrap_or(CONFIG_WATCH_DELAY);
88
89    // Create watcher now so not to miss any changes happening between
90    // returning from this function and the thread starting.
91    let mut watcher = Some(create_watcher(&watcher_conf, &config_paths)?);
92
93    info!("Watching configuration files.");
94
95    thread::spawn(move || {
96        loop {
97            if let Some((mut watcher, receiver)) = watcher.take() {
98                while let Ok(Ok(event)) = receiver.recv() {
99                    if matches!(
100                        event.kind,
101                        EventKind::Create(_) | EventKind::Remove(_) | EventKind::Modify(_)
102                    ) {
103                        debug!(message = "Configuration file change detected.", event = ?event);
104
105                        // Collect paths from initial event
106                        let mut changed_paths: HashSet<PathBuf> = event.paths.into_iter().collect();
107
108                        // Collect paths from subsequent events until delay amount of time has passed
109                        while let Ok(Ok(subseq_event)) = receiver.recv_timeout(delay) {
110                            if matches!(
111                                subseq_event.kind,
112                                EventKind::Create(_) | EventKind::Remove(_) | EventKind::Modify(_)
113                            ) {
114                                changed_paths.extend(subseq_event.paths);
115                            }
116                        }
117
118                        debug!(
119                            message = "Collected file change events during delay period.",
120                            paths = changed_paths.len(),
121                            delay = ?delay
122                        );
123
124                        let changed_components: HashMap<_, _> = component_configs
125                            .clone()
126                            .into_iter()
127                            .flat_map(|p| p.contains(&changed_paths))
128                            .collect();
129
130                        // We need to read paths to resolve any inode changes that may have happened.
131                        // And we need to do it before raising sighup to avoid missing any change.
132                        if let Err(error) = watcher.add_paths(&config_paths) {
133                            error!(message = "Failed to read files to watch.", %error);
134                            break;
135                        }
136
137                        debug!(message = "Reloaded paths.");
138
139                        info!("Configuration file changed.");
140                        if !changed_components.is_empty() {
141                            info!(
142                                "Component {:?} configuration changed.",
143                                changed_components.keys()
144                            );
145                            if changed_components
146                                .iter()
147                                .all(|(_, t)| *t == ComponentType::EnrichmentTable)
148                            {
149                                info!("Only enrichment tables have changed.");
150                                _ = signal_tx
151                                    .send(crate::signal::SignalTo::ReloadEnrichmentTables)
152                                    .map_err(|error| {
153                                        error!(
154                                            message = "Unable to reload enrichment tables.",
155                                            cause = %error,
156                                            internal_log_rate_limit = false,
157                                        )
158                                    });
159                            } else {
160                                _ = signal_tx
161                                    .send(crate::signal::SignalTo::ReloadComponents(
162                                        changed_components.into_keys().collect(),
163                                    ))
164                                    .map_err(|error| {
165                                        error!(
166                                            message = "Unable to reload component configuration. Restart Vector to reload it.",
167                                            cause = %error,
168                                            internal_log_rate_limit = false,
169                                        )
170                                    });
171                            }
172                        } else {
173                            _ = signal_tx
174                                .send(crate::signal::SignalTo::ReloadFromDisk)
175                                .map_err(|error| {
176                                    error!(
177                                        message = "Unable to reload configuration file. Restart Vector to reload it.",
178                                        cause = %error,
179                                        internal_log_rate_limit = false,
180                                    )
181                                });
182                        }
183                    } else {
184                        debug!(message = "Ignoring event.", event = ?event)
185                    }
186                }
187            }
188
189            thread::sleep(RETRY_TIMEOUT);
190
191            watcher = create_watcher(&watcher_conf, &config_paths)
192                .map_err(|error| error!(message = "Failed to create file watcher.", %error))
193                .ok();
194
195            if watcher.is_some() {
196                // Config files could have changed while we weren't watching,
197                // so for a good measure raise SIGHUP and let reload logic
198                // determine if anything changed.
199                info!("Speculating that configuration files have changed.");
200                _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk).map_err(|error| {
201                error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
202            });
203            }
204        }
205    });
206
207    Ok(())
208}
209
210fn create_watcher(
211    watcher_conf: &WatcherConfig,
212    config_paths: &[PathBuf],
213) -> Result<(Watcher, Receiver<Result<notify::Event, notify::Error>>), Error> {
214    info!("Creating configuration file watcher.");
215
216    let (sender, receiver) = channel();
217    let mut watcher = match watcher_conf {
218        WatcherConfig::RecommendedWatcher => {
219            let recommended_watcher = recommended_watcher(sender)?;
220            Watcher::RecommendedWatcher(recommended_watcher)
221        }
222        WatcherConfig::PollWatcher(interval) => {
223            let config =
224                notify::Config::default().with_poll_interval(Duration::from_secs(*interval));
225            let poll_watcher = notify::PollWatcher::new(sender, config)?;
226            Watcher::PollWatcher(poll_watcher)
227        }
228    };
229    watcher.add_paths(config_paths)?;
230    Ok((watcher, receiver))
231}
232
#[cfg(all(test, unix, not(target_os = "macos")))] // https://github.com/vectordotdev/vector/issues/5000
mod tests {
    use std::{collections::HashSet, fs::File, io::Write, time::Duration};

    use tokio::sync::broadcast;

    use super::*;
    use crate::{
        config::ComponentKey,
        signal::SignalRx,
        test_util::{temp_dir, temp_file, trace_init},
    };

    /// Writes one byte to `file`, syncs it to disk, and reports whether the
    /// first signal received within `timeout` equals `expected_signal`.
    ///
    /// Returns `false` on timeout, on a receive error, or on a different signal.
    async fn test_signal(
        file: &mut File,
        expected_signal: crate::signal::SignalTo,
        timeout: Duration,
        mut receiver: SignalRx,
    ) -> bool {
        file.write_all(&[0]).unwrap();
        file.sync_all().unwrap();

        match tokio::time::timeout(timeout, receiver.recv()).await {
            Ok(Ok(signal)) => signal == expected_signal,
            _ => false,
        }
    }

    /// A change to any file owned by a component must reload that component.
    #[tokio::test]
    async fn component_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let dir = temp_dir().to_path_buf();
        let watcher_conf = WatcherConfig::RecommendedWatcher;
        let component_file_path = vec![dir.join("tls.cert"), dir.join("tls.key")];
        let http_component = ComponentKey::from("http");

        std::fs::create_dir(&dir).unwrap();

        let mut component_files: Vec<std::fs::File> = component_file_path
            .iter()
            .map(|file| File::create(file).unwrap())
            .collect();
        let component_config = ComponentConfig::new(
            component_file_path.clone(),
            http_component.clone(),
            ComponentType::Sink,
        );

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(
            watcher_conf,
            signal_tx,
            &[dir],
            vec![component_config],
            delay,
        )
        .unwrap();

        for file in component_files.iter_mut() {
            // Subscribe right before each write: a receiver created earlier
            // would still hold the signal caused by the previous file and
            // would pass without this file's change being detected.
            let receiver = signal_rx.resubscribe();

            assert!(
                test_signal(
                    file,
                    crate::signal::SignalTo::ReloadComponents(HashSet::from_iter(vec![
                        http_component.clone(),
                    ])),
                    delay * 5,
                    receiver,
                )
                .await,
                "Test timed out"
            );
        }
    }

    /// A change to a file inside a watched directory triggers a full reload.
    #[tokio::test]
    async fn file_directory_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let dir = temp_dir().to_path_buf();
        let file_path = dir.join("vector.toml");
        let watcher_conf = WatcherConfig::RecommendedWatcher;

        std::fs::create_dir(&dir).unwrap();
        let mut file = File::create(&file_path).unwrap();

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[dir], vec![], delay).unwrap();

        assert!(
            test_signal(
                &mut file,
                crate::signal::SignalTo::ReloadFromDisk,
                delay * 5,
                signal_rx,
            )
            .await,
            "Test timed out"
        );
    }

    /// A change to a directly watched file triggers a full reload.
    #[tokio::test]
    async fn file_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let file_path = temp_file();
        let mut file = File::create(&file_path).unwrap();
        let watcher_conf = WatcherConfig::RecommendedWatcher;

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[file_path], vec![], delay).unwrap();

        assert!(
            test_signal(
                &mut file,
                crate::signal::SignalTo::ReloadFromDisk,
                delay * 5,
                signal_rx,
            )
            .await,
            "Test timed out"
        );
    }

    /// Watching a symlink must detect changes made to the file it points to.
    #[tokio::test]
    async fn sym_file_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let file_path = temp_file();
        let sym_file = temp_file();
        let mut file = File::create(&file_path).unwrap();
        std::os::unix::fs::symlink(&file_path, &sym_file).unwrap();

        let watcher_conf = WatcherConfig::RecommendedWatcher;

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[sym_file], vec![], delay).unwrap();

        assert!(
            test_signal(
                &mut file,
                crate::signal::SignalTo::ReloadFromDisk,
                delay * 5,
                signal_rx,
            )
            .await,
            "Test timed out"
        );
    }

    /// A change to a file in a nested directory must be detected when only
    /// the parent directory is watched.
    #[tokio::test]
    async fn recursive_directory_file_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let dir = temp_dir().to_path_buf();
        let sub_dir = dir.join("sources");
        let file_path = sub_dir.join("input.toml");
        let watcher_conf = WatcherConfig::RecommendedWatcher;

        std::fs::create_dir_all(&sub_dir).unwrap();
        let mut file = File::create(&file_path).unwrap();

        let (signal_tx, signal_rx) = broadcast::channel(128);
        // Watch the parent, not `sub_dir`: the signal can then only arrive if
        // nested directories are watched recursively.
        spawn_thread(watcher_conf, signal_tx, &[dir], vec![], delay).unwrap();

        assert!(
            test_signal(
                &mut file,
                crate::signal::SignalTo::ReloadFromDisk,
                delay * 5,
                signal_rx,
            )
            .await,
            "Test timed out"
        );
    }
}
428}