1use crate::config::{ComponentConfig, ComponentType};
2use std::collections::HashMap;
3use std::{
4 path::{Path, PathBuf},
5 time::Duration,
6};
7use std::{
8 sync::mpsc::{channel, Receiver},
9 thread,
10};
11
12use notify::{recommended_watcher, EventKind, RecursiveMode};
13
14use crate::Error;
15
16const CONFIG_WATCH_DELAY: std::time::Duration = std::time::Duration::from_secs(1);
24
25const RETRY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
26
27pub enum WatcherConfig {
29 RecommendedWatcher,
31 PollWatcher(u64),
33}
34
35enum Watcher {
36 RecommendedWatcher(notify::RecommendedWatcher),
38 PollWatcher(notify::PollWatcher),
40}
41
42impl Watcher {
43 fn add_paths(&mut self, config_paths: &[PathBuf]) -> Result<(), Error> {
44 for path in config_paths {
45 self.watch(path, RecursiveMode::Recursive)?;
46 }
47 Ok(())
48 }
49
50 fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<(), Error> {
51 use notify::Watcher as NotifyWatcher;
52 match self {
53 Watcher::RecommendedWatcher(watcher) => {
54 watcher.watch(path, recursive_mode)?;
55 }
56 Watcher::PollWatcher(watcher) => {
57 watcher.watch(path, recursive_mode)?;
58 }
59 }
60 Ok(())
61 }
62}
63
64pub fn spawn_thread<'a>(
69 watcher_conf: WatcherConfig,
70 signal_tx: crate::signal::SignalTx,
71 config_paths: impl IntoIterator<Item = &'a PathBuf> + 'a,
72 component_configs: Vec<ComponentConfig>,
73 delay: impl Into<Option<Duration>>,
74) -> Result<(), Error> {
75 let mut config_paths: Vec<_> = config_paths.into_iter().cloned().collect();
76 let mut component_config_paths: Vec<_> = component_configs
77 .clone()
78 .into_iter()
79 .flat_map(|p| p.config_paths.clone())
80 .collect();
81
82 config_paths.append(&mut component_config_paths);
83
84 let delay = delay.into().unwrap_or(CONFIG_WATCH_DELAY);
85
86 let mut watcher = Some(create_watcher(&watcher_conf, &config_paths)?);
89
90 info!("Watching configuration files.");
91
92 thread::spawn(move || loop {
93 if let Some((mut watcher, receiver)) = watcher.take() {
94 while let Ok(Ok(event)) = receiver.recv() {
95 if matches!(
96 event.kind,
97 EventKind::Create(_) | EventKind::Remove(_) | EventKind::Modify(_)
98 ) {
99 debug!(message = "Configuration file change detected.", event = ?event);
100
101 while receiver.recv_timeout(delay).is_ok() {}
103
104 debug!(message = "Consumed file change events for delay.", delay = ?delay);
105
106 let changed_components: HashMap<_, _> = component_configs
107 .clone()
108 .into_iter()
109 .flat_map(|p| p.contains(&event.paths))
110 .collect();
111
112 if let Err(error) = watcher.add_paths(&config_paths) {
115 error!(message = "Failed to read files to watch.", %error);
116 break;
117 }
118
119 debug!(message = "Reloaded paths.");
120
121 info!("Configuration file changed.");
122 if !changed_components.is_empty() {
123 info!(
124 internal_log_rate_limit = true,
125 "Component {:?} configuration changed.",
126 changed_components.keys()
127 );
128 if changed_components
129 .iter()
130 .all(|(_, t)| *t == ComponentType::EnrichmentTable)
131 {
132 info!(
133 internal_log_rate_limit = true,
134 "Only enrichment tables have changed."
135 );
136 _ = signal_tx.send(crate::signal::SignalTo::ReloadEnrichmentTables).map_err(|error| {
137 error!(message = "Unable to reload enrichment tables.", cause = %error, internal_log_rate_limit = true)
138 });
139 } else {
140 _ = signal_tx.send(crate::signal::SignalTo::ReloadComponents(changed_components.into_keys().collect())).map_err(|error| {
141 error!(message = "Unable to reload component configuration. Restart Vector to reload it.", cause = %error, internal_log_rate_limit = true)
142 });
143 }
144 } else {
145 _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk)
146 .map_err(|error| {
147 error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error, internal_log_rate_limit = true)
148 });
149 }
150 } else {
151 debug!(message = "Ignoring event.", event = ?event)
152 }
153 }
154 }
155
156 thread::sleep(RETRY_TIMEOUT);
157
158 watcher = create_watcher(&watcher_conf, &config_paths)
159 .map_err(|error| error!(message = "Failed to create file watcher.", %error))
160 .ok();
161
162 if watcher.is_some() {
163 info!("Speculating that configuration files have changed.");
167 _ = signal_tx.send(crate::signal::SignalTo::ReloadFromDisk).map_err(|error| {
168 error!(message = "Unable to reload configuration file. Restart Vector to reload it.", cause = %error)
169 });
170 }
171 });
172
173 Ok(())
174}
175
176fn create_watcher(
177 watcher_conf: &WatcherConfig,
178 config_paths: &[PathBuf],
179) -> Result<(Watcher, Receiver<Result<notify::Event, notify::Error>>), Error> {
180 info!("Creating configuration file watcher.");
181
182 let (sender, receiver) = channel();
183 let mut watcher = match watcher_conf {
184 WatcherConfig::RecommendedWatcher => {
185 let recommended_watcher = recommended_watcher(sender)?;
186 Watcher::RecommendedWatcher(recommended_watcher)
187 }
188 WatcherConfig::PollWatcher(interval) => {
189 let config =
190 notify::Config::default().with_poll_interval(Duration::from_secs(*interval));
191 let poll_watcher = notify::PollWatcher::new(sender, config)?;
192 Watcher::PollWatcher(poll_watcher)
193 }
194 };
195 watcher.add_paths(config_paths)?;
196 Ok((watcher, receiver))
197}
198
199#[cfg(all(test, unix, not(target_os = "macos")))] mod tests {
201 use super::*;
202 use crate::{
203 config::ComponentKey,
204 signal::SignalRx,
205 test_util::{temp_dir, temp_file, trace_init},
206 };
207 use std::{collections::HashSet, fs::File, io::Write, time::Duration};
208 use tokio::sync::broadcast;
209
210 async fn test_signal(
211 file: &mut File,
212 expected_signal: crate::signal::SignalTo,
213 timeout: Duration,
214 mut receiver: SignalRx,
215 ) -> bool {
216 file.write_all(&[0]).unwrap();
217 file.sync_all().unwrap();
218
219 match tokio::time::timeout(timeout, receiver.recv()).await {
220 Ok(Ok(signal)) => signal == expected_signal,
221 _ => false,
222 }
223 }
224
225 #[tokio::test]
226 async fn component_update() {
227 trace_init();
228
229 let delay = Duration::from_secs(3);
230 let dir = temp_dir().to_path_buf();
231 let watcher_conf = WatcherConfig::RecommendedWatcher;
232 let component_file_path = vec![dir.join("tls.cert"), dir.join("tls.key")];
233 let http_component = ComponentKey::from("http");
234
235 std::fs::create_dir(&dir).unwrap();
236
237 let mut component_files: Vec<std::fs::File> = component_file_path
238 .iter()
239 .map(|file| File::create(file).unwrap())
240 .collect();
241 let component_config = ComponentConfig::new(
242 component_file_path.clone(),
243 http_component.clone(),
244 ComponentType::Sink,
245 );
246
247 let (signal_tx, signal_rx) = broadcast::channel(128);
248 spawn_thread(
249 watcher_conf,
250 signal_tx,
251 &[dir],
252 vec![component_config],
253 delay,
254 )
255 .unwrap();
256
257 let signal_rx = signal_rx.resubscribe();
258 let signal_rx2 = signal_rx.resubscribe();
259
260 if !test_signal(
261 &mut component_files[0],
262 crate::signal::SignalTo::ReloadComponents(HashSet::from_iter(vec![
263 http_component.clone()
264 ])),
265 delay * 5,
266 signal_rx,
267 )
268 .await
269 {
270 panic!("Test timed out");
271 }
272
273 if !test_signal(
274 &mut component_files[1],
275 crate::signal::SignalTo::ReloadComponents(HashSet::from_iter(vec![
276 http_component.clone()
277 ])),
278 delay * 5,
279 signal_rx2,
280 )
281 .await
282 {
283 panic!("Test timed out");
284 }
285 }
286 #[tokio::test]
287 async fn file_directory_update() {
288 trace_init();
289
290 let delay = Duration::from_secs(3);
291 let dir = temp_dir().to_path_buf();
292 let file_path = dir.join("vector.toml");
293 let watcher_conf = WatcherConfig::RecommendedWatcher;
294
295 std::fs::create_dir(&dir).unwrap();
296 let mut file = File::create(&file_path).unwrap();
297
298 let (signal_tx, signal_rx) = broadcast::channel(128);
299 spawn_thread(watcher_conf, signal_tx, &[dir], vec![], delay).unwrap();
300
301 if !test_signal(
302 &mut file,
303 crate::signal::SignalTo::ReloadFromDisk,
304 delay * 5,
305 signal_rx,
306 )
307 .await
308 {
309 panic!("Test timed out");
310 }
311 }
312
313 #[tokio::test]
314 async fn file_update() {
315 trace_init();
316
317 let delay = Duration::from_secs(3);
318 let file_path = temp_file();
319 let mut file = File::create(&file_path).unwrap();
320 let watcher_conf = WatcherConfig::RecommendedWatcher;
321
322 let (signal_tx, signal_rx) = broadcast::channel(128);
323 spawn_thread(watcher_conf, signal_tx, &[file_path], vec![], delay).unwrap();
324
325 if !test_signal(
326 &mut file,
327 crate::signal::SignalTo::ReloadFromDisk,
328 delay * 5,
329 signal_rx,
330 )
331 .await
332 {
333 panic!("Test timed out");
334 }
335 }
336
337 #[tokio::test]
338 #[cfg(unix)]
339 async fn sym_file_update() {
340 trace_init();
341
342 let delay = Duration::from_secs(3);
343 let file_path = temp_file();
344 let sym_file = temp_file();
345 let mut file = File::create(&file_path).unwrap();
346 std::os::unix::fs::symlink(&file_path, &sym_file).unwrap();
347
348 let watcher_conf = WatcherConfig::RecommendedWatcher;
349
350 let (signal_tx, signal_rx) = broadcast::channel(128);
351 spawn_thread(watcher_conf, signal_tx, &[sym_file], vec![], delay).unwrap();
352
353 if !test_signal(
354 &mut file,
355 crate::signal::SignalTo::ReloadFromDisk,
356 delay * 5,
357 signal_rx,
358 )
359 .await
360 {
361 panic!("Test timed out");
362 }
363 }
364
365 #[tokio::test]
366 async fn recursive_directory_file_update() {
367 trace_init();
368
369 let delay = Duration::from_secs(3);
370 let dir = temp_dir().to_path_buf();
371 let sub_dir = dir.join("sources");
372 let file_path = sub_dir.join("input.toml");
373 let watcher_conf = WatcherConfig::RecommendedWatcher;
374
375 std::fs::create_dir_all(&sub_dir).unwrap();
376 let mut file = File::create(&file_path).unwrap();
377
378 let (signal_tx, signal_rx) = broadcast::channel(128);
379 spawn_thread(watcher_conf, signal_tx, &[sub_dir], vec![], delay).unwrap();
380
381 if !test_signal(
382 &mut file,
383 crate::signal::SignalTo::ReloadFromDisk,
384 delay * 5,
385 signal_rx,
386 )
387 .await
388 {
389 panic!("Test timed out");
390 }
391 }
392}