vector/config/
watcher.rs

1use crate::config::{ComponentConfig, ComponentType};
2use std::collections::HashMap;
3use std::{
4    path::{Path, PathBuf},
5    time::Duration,
6};
7use std::{
8    sync::mpsc::{channel, Receiver},
9    thread,
10};
11
12use notify::{recommended_watcher, EventKind, RecursiveMode};
13
14use crate::Error;
15
/// Default debounce window, used when no explicit delay is passed to [`spawn_thread`].
///
/// Per notify's own documentation, a delay of more than 30 seconds is advised
/// to avoid receiving repetitions of previous events on macOS.
///
/// However, the config and topology reload logic can handle:
///  - invalid configs, caused either by the user or by a data race, and
///  - frequent changes, caused by a user/editor modifying or saving a file in small chunks,
///
/// so we can use a smaller, more responsive delay.
const CONFIG_WATCH_DELAY: Duration = Duration::from_secs(1);

/// How long the watcher thread sleeps before trying to (re)create the file
/// watcher after it stopped delivering events or could not be created.
const RETRY_TIMEOUT: Duration = Duration::from_secs(10);
26
/// Selects which file-watching backend is used to detect configuration changes.
///
/// Refer to [`crate::cli::WatchConfigMethod`] for details.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatcherConfig {
    /// Recommended watcher for the current OS.
    RecommendedWatcher,
    /// A poll-based watcher that checks for file changes at regular intervals.
    ///
    /// The payload is the poll interval, in seconds.
    PollWatcher(u64),
}
34
35enum Watcher {
36    /// recommended watcher for os, usually inotify for linux based systems
37    RecommendedWatcher(notify::RecommendedWatcher),
38    /// poll based watcher. for watching files from NFS.
39    PollWatcher(notify::PollWatcher),
40}
41
42impl Watcher {
43    fn add_paths(&mut self, config_paths: &[PathBuf]) -> Result<(), Error> {
44        for path in config_paths {
45            self.watch(path, RecursiveMode::Recursive)?;
46        }
47        Ok(())
48    }
49
50    fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<(), Error> {
51        use notify::Watcher as NotifyWatcher;
52        match self {
53            Watcher::RecommendedWatcher(watcher) => {
54                watcher.watch(path, recursive_mode)?;
55            }
56            Watcher::PollWatcher(watcher) => {
57                watcher.watch(path, recursive_mode)?;
58            }
59        }
60        Ok(())
61    }
62}
63
64/// Sends a ReloadFromDisk or ReloadEnrichmentTables on config_path changes.
65/// Accumulates file changes until no change for given duration has occurred.
66/// Has best effort guarantee of detecting all file changes from the end of
67/// this function until the main thread stops.
68pub fn spawn_thread<'a>(
69    watcher_conf: WatcherConfig,
70    signal_tx: crate::signal::SignalTx,
71    config_paths: impl IntoIterator<Item = &'a PathBuf> + 'a,
72    component_configs: Vec<ComponentConfig>,
73    delay: impl Into<Option<Duration>>,
74) -> Result<(), Error> {
75    let mut config_paths: Vec<_> = config_paths.into_iter().cloned().collect();
76    let mut component_config_paths: Vec<_> = component_configs
77        .clone()
78        .into_iter()
79        .flat_map(|p| p.config_paths.clone())
80        .collect();
81
82    config_paths.append(&mut component_config_paths);
83
84    let delay = delay.into().unwrap_or(CONFIG_WATCH_DELAY);
85
86    // Create watcher now so not to miss any changes happening between
87    // returning from this function and the thread starting.
88    let mut watcher = Some(create_watcher(&watcher_conf, &config_paths)?);
89
90    info!("Watching configuration files.");
91
92    thread::spawn(move || loop {
93        if let Some((mut watcher, receiver)) = watcher.take() {
94            while let Ok(Ok(event)) = receiver.recv() {
95                if matches!(
96                    event.kind,
97                    EventKind::Create(_) | EventKind::Remove(_) | EventKind::Modify(_)
98                ) {
99                    debug!(message = "Configuration file change detected.", event = ?event);
100
101                    // Consume events until delay amount of time has passed since the latest event.
102                    while receiver.recv_timeout(delay).is_ok() {}
103
104                    debug!(message = "Consumed file change events for delay.", delay = ?delay);
105
106                    let changed_components: HashMap<_, _> = component_configs
107                        .clone()
108                        .into_iter()
109                        .flat_map(|p| p.contains(&event.paths))
110                        .collect();
111
112                    // We need to read paths to resolve any inode changes that may have happened.
113                    // And we need to do it before raising sighup to avoid missing any change.
114                    if let Err(error) = watcher.add_paths(&config_paths) {
115                        error!(message = "Failed to read files to watch.", %error);
116                        break;
117                    }
118
119                    debug!(message = "Reloaded paths.");
120
121                    info!("Configuration file changed.");
122                    if !changed_components.is_empty() {
123                        info!(
124                            internal_log_rate_limit = true,
125                            "Component {:?} configuration changed.",
126                            changed_components.keys()
127                        );
128                        if changed_components
129                            .iter()
130                            .all(|(_, t)| *t == ComponentType::EnrichmentTable)
131                        {
132                            info!(
133                                internal_log_rate_limit = true,
134                                "Only enrichment tables have changed."
135                            );
136                            _ = signal_tx.send(crate::signal::SignalTo::ReloadEnrichmentTables).map_err(|error| {
137                                error!(message = "Unable to reload enrichment tables.", cause = %error, internal_log_rate_limit = true)
138                            });
139                        } else {
140                            _ = signal_tx.send(crate::signal::SignalTo::ReloadComponents(changed_components.into_keys().collect())).map_err(|error| {
141                                error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error, internal_log_rate_limit = true)
142                            });
143                        }
144                    } else {
145                        _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk)
146                            .map_err(|error| {
147                                error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error, internal_log_rate_limit = true)
148                            });
149                    }
150                } else {
151                    debug!(message = "Ignoring event.", event = ?event)
152                }
153            }
154        }
155
156        thread::sleep(RETRY_TIMEOUT);
157
158        watcher = create_watcher(&watcher_conf, &config_paths)
159            .map_err(|error| error!(message = "Failed to create file watcher.", %error))
160            .ok();
161
162        if watcher.is_some() {
163            // Config files could have changed while we weren't watching,
164            // so for a good measure raise SIGHUP and let reload logic
165            // determine if anything changed.
166            info!("Speculating that configuration files have changed.");
167            _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk).map_err(|error| {
168                error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
169            });
170        }
171    });
172
173    Ok(())
174}
175
176fn create_watcher(
177    watcher_conf: &WatcherConfig,
178    config_paths: &[PathBuf],
179) -> Result<(Watcher, Receiver<Result<notify::Event, notify::Error>>), Error> {
180    info!("Creating configuration file watcher.");
181
182    let (sender, receiver) = channel();
183    let mut watcher = match watcher_conf {
184        WatcherConfig::RecommendedWatcher => {
185            let recommended_watcher = recommended_watcher(sender)?;
186            Watcher::RecommendedWatcher(recommended_watcher)
187        }
188        WatcherConfig::PollWatcher(interval) => {
189            let config =
190                notify::Config::default().with_poll_interval(Duration::from_secs(*interval));
191            let poll_watcher = notify::PollWatcher::new(sender, config)?;
192            Watcher::PollWatcher(poll_watcher)
193        }
194    };
195    watcher.add_paths(config_paths)?;
196    Ok((watcher, receiver))
197}
198
#[cfg(all(test, unix, not(target_os = "macos")))] // https://github.com/vectordotdev/vector/issues/5000
mod tests {
    use super::*;
    use crate::{
        config::ComponentKey,
        signal::SignalRx,
        test_util::{temp_dir, temp_file, trace_init},
    };
    use std::{collections::HashSet, fs::File, io::Write, time::Duration};
    use tokio::sync::broadcast;

    /// Appends one byte to `file`, syncs it to disk, then waits up to `timeout`
    /// for the next signal on `receiver`.
    ///
    /// Returns `true` only if that signal equals `expected_signal`.
    async fn test_signal(
        file: &mut File,
        expected_signal: crate::signal::SignalTo,
        timeout: Duration,
        mut receiver: SignalRx,
    ) -> bool {
        file.write_all(&[0]).unwrap();
        file.sync_all().unwrap();

        match tokio::time::timeout(timeout, receiver.recv()).await {
            Ok(Ok(signal)) => signal == expected_signal,
            _ => false,
        }
    }

    #[tokio::test]
    async fn component_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let dir = temp_dir().to_path_buf();
        let watcher_conf = WatcherConfig::RecommendedWatcher;
        let component_file_path = vec![dir.join("tls.cert"), dir.join("tls.key")];
        let http_component = ComponentKey::from("http");

        std::fs::create_dir(&dir).unwrap();

        let mut component_files: Vec<File> = component_file_path
            .iter()
            .map(|file| File::create(file).unwrap())
            .collect();
        let component_config = ComponentConfig::new(
            component_file_path.clone(),
            http_component.clone(),
            ComponentType::Sink,
        );

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(
            watcher_conf,
            signal_tx,
            &[dir],
            vec![component_config],
            delay,
        )
        .unwrap();

        let expected =
            crate::signal::SignalTo::ReloadComponents(HashSet::from([http_component]));

        // Subscribe right before each write so that each receiver only observes
        // signals emitted after its own write. Subscribing both receivers up front
        // would let the second check pass on the signal caused by the first write.
        assert!(
            test_signal(
                &mut component_files[0],
                expected.clone(),
                delay * 5,
                signal_rx.resubscribe(),
            )
            .await,
            "Test timed out"
        );

        assert!(
            test_signal(
                &mut component_files[1],
                expected,
                delay * 5,
                signal_rx.resubscribe(),
            )
            .await,
            "Test timed out"
        );
    }

    #[tokio::test]
    async fn file_directory_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let dir = temp_dir().to_path_buf();
        let file_path = dir.join("vector.toml");
        let watcher_conf = WatcherConfig::RecommendedWatcher;

        std::fs::create_dir(&dir).unwrap();
        let mut file = File::create(&file_path).unwrap();

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[dir], vec![], delay).unwrap();

        assert!(
            test_signal(
                &mut file,
                crate::signal::SignalTo::ReloadFromDisk,
                delay * 5,
                signal_rx,
            )
            .await,
            "Test timed out"
        );
    }

    #[tokio::test]
    async fn file_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let file_path = temp_file();
        let mut file = File::create(&file_path).unwrap();
        let watcher_conf = WatcherConfig::RecommendedWatcher;

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[file_path], vec![], delay).unwrap();

        assert!(
            test_signal(
                &mut file,
                crate::signal::SignalTo::ReloadFromDisk,
                delay * 5,
                signal_rx,
            )
            .await,
            "Test timed out"
        );
    }

    #[tokio::test]
    #[cfg(unix)]
    async fn sym_file_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let file_path = temp_file();
        let sym_file = temp_file();
        let mut file = File::create(&file_path).unwrap();
        std::os::unix::fs::symlink(&file_path, &sym_file).unwrap();

        let watcher_conf = WatcherConfig::RecommendedWatcher;

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[sym_file], vec![], delay).unwrap();

        assert!(
            test_signal(
                &mut file,
                crate::signal::SignalTo::ReloadFromDisk,
                delay * 5,
                signal_rx,
            )
            .await,
            "Test timed out"
        );
    }

    #[tokio::test]
    async fn recursive_directory_file_update() {
        trace_init();

        let delay = Duration::from_secs(3);
        let dir = temp_dir().to_path_buf();
        let sub_dir = dir.join("sources");
        let file_path = sub_dir.join("input.toml");
        let watcher_conf = WatcherConfig::RecommendedWatcher;

        std::fs::create_dir_all(&sub_dir).unwrap();
        let mut file = File::create(&file_path).unwrap();

        let (signal_tx, signal_rx) = broadcast::channel(128);
        spawn_thread(watcher_conf, signal_tx, &[sub_dir], vec![], delay).unwrap();

        assert!(
            test_signal(
                &mut file,
                crate::signal::SignalTo::ReloadFromDisk,
                delay * 5,
                signal_rx,
            )
            .await,
            "Test timed out"
        );
    }
}
390        }
391    }
392}