// file_source/file_server.rs

use std::{
    cmp,
    collections::{BTreeMap, HashMap},
    path::PathBuf,
    sync::Arc,
    time::{self, Duration},
};

use bytes::Bytes;
use chrono::{DateTime, Utc};
use file_source_common::{
    FileFingerprint, FileSourceInternalEvents, Fingerprinter, ReadFrom,
    checkpointer::{Checkpointer, CheckpointsView},
};
use futures::{
    Future, Sink, SinkExt,
    future::{Either, select},
};
use futures_util::future::join_all;
use indexmap::IndexMap;
use tokio::{
    fs::{self, remove_file},
    task::{Id, JoinSet},
    time::sleep,
};

use tracing::{debug, error, info, trace};

use crate::{
    file_watcher::{FileWatcher, RawLineResult},
    paths_provider::PathsProvider,
};

/// `FileServer` is a Source which cooperatively schedules reads over files,
/// converting the lines of said files into `Line` structures. As
/// `FileServer` is intended to be useful across multiple operating systems with
/// POSIX filesystem semantics, `FileServer` must poll for changes. That is, no
/// event notification is used by `FileServer`.
///
/// `FileServer` is configured on a path to watch. The files do _not_ need to
/// exist at startup. `FileServer` will discover new files which match
/// its path in at most 60 seconds.
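///
/// New paths are picked up by the periodic discovery pass in [`FileServer::run`],
/// which runs at most once per `glob_minimum_cooldown`.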
pub struct FileServer<PP, E: FileSourceInternalEvents>
where
    PP: PathsProvider,
{
    pub paths_provider: PP,
    pub max_read_bytes: usize,
    pub ignore_checkpoints: bool,
    pub read_from: ReadFrom,
    pub ignore_before: Option<DateTime<Utc>>,
    pub max_line_bytes: usize,
    pub line_delimiter: Bytes,
    pub data_dir: PathBuf,
    pub glob_minimum_cooldown: Duration,
    pub fingerprinter: Fingerprinter,
    pub oldest_first: bool,
    pub remove_after: Option<Duration>,
    pub emitter: E,
    pub rotate_wait: Duration,
}

/// `FileServer` as Source
///
/// The 'run' of `FileServer` performs the cooperative scheduling of reads over
/// `FileServer`'s configured files. Much care has been taken to make this
/// scheduling 'fair', meaning busy files do not drown out quiet files or vice
/// versa, but there's no one perfect approach. Very fast files _will_ be lost if
/// your system aggressively rolls log files. `FileServer` will keep a file
/// handle open, but should your system move so quickly that a file disappears
/// before `FileServer` is able to open it, the contents will be lost. This
/// should be a rare occurrence.
///
/// Specific operating systems support evented interfaces that correct this
/// problem but your intrepid authors know of no generic solution.
impl<PP, E> FileServer<PP, E>
where
    PP: PathsProvider,
    E: FileSourceInternalEvents,
{
    // The first `shutdown_data` signal here is to stop this file
    // server from outputting new data; the second
    // `shutdown_checkpointer` is for finishing the background
    // checkpoint writer task, which has to wait for all
    // acknowledgements to be completed.
    pub async fn run<C, S1, S2>(
        mut self,
        mut chans: C,
        mut shutdown_data: S1,
        shutdown_checkpointer: S2,
        mut checkpointer: Checkpointer,
    ) -> Result<Shutdown, <C as Sink<Vec<Line>>>::Error>
    where
        C: Sink<Vec<Line>> + Unpin,
        <C as Sink<Vec<Line>>>::Error: std::error::Error,
        S1: Future + Unpin + Send + 'static,
        S2: Future + Unpin + Send + 'static,
    {
        let mut fp_map: IndexMap<FileFingerprint, FileWatcher> = Default::default();

        let mut backoff_cap: usize = 1;
        let mut lines = Vec::new();

        checkpointer.read_checkpoints(self.ignore_before).await;

        let mut known_small_files = HashMap::new();

        let mut existing_files = Vec::new();
        for path in self.paths_provider.paths().into_iter() {
            if let Some(file_id) = self
                .fingerprinter
                .fingerprint_or_emit(&path, &mut known_small_files, &self.emitter)
                .await
            {
                existing_files.push((path, file_id));
            }
        }

        let metadata = join_all(
            existing_files
                .iter()
                .map(|(path, _file_id)| fs::metadata(path)),
        )
        .await;

        let created = metadata.into_iter().map(|m| {
            m.and_then(|m| m.created())
                .map(DateTime::<Utc>::from)
                .unwrap_or_else(|_| Utc::now())
        });

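        // Order the files discovered at startup by creation time so they are
        // inserted into the watch map (and therefore visited) oldest first;
        // files whose creation time cannot be read fall back to `Utc::now()`.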
        let mut existing_files: Vec<(DateTime<Utc>, PathBuf, FileFingerprint)> = existing_files
            .into_iter()
            .zip(created)
            .map(|((path, file_id), key)| (key, path, file_id))
            .collect();

        existing_files.sort_by_key(|(key, _, _)| *key);

        let checkpoints = checkpointer.view();

        for (_key, path, file_id) in existing_files {
            self.watch_new_file(path, file_id, &mut fp_map, &checkpoints, true)
                .await;
        }
        self.emitter.emit_files_open(fp_map.len());

        let mut stats = TimingStats::default();

        // Spawn the checkpoint writer task
        let checkpoint_task_handle = tokio::spawn(checkpoint_writer(
            checkpointer,
            self.glob_minimum_cooldown,
            shutdown_checkpointer,
            self.emitter.clone(),
        ));

        // Alright friends, how does this work?
        //
        // We want to avoid burning up users' CPUs. To do this we sleep after
        // reading lines out of files. But! We want to be responsive as well. We
        // keep track of a 'backoff_cap' to decide how long we'll wait in any
        // given loop. This cap grows exponentially each time we fail to read
        // any lines, up to a hard-coded maximum. To reduce time spent globbing,
        // we do not re-scan for major file changes (new files, moves, deletes),
        // or write new checkpoints, on every iteration.
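        //
        // For example, with a `glob_minimum_cooldown` of one second the discovery
        // pass below runs at most once per second, however quickly the read loop
        // itself iterates.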
        let mut next_glob_time = time::Instant::now();
        loop {
            // Glob to find files to follow, but not too often.
            let now_time = time::Instant::now();
            if next_glob_time <= now_time {
                // Schedule the next glob time.
                next_glob_time = now_time.checked_add(self.glob_minimum_cooldown).unwrap();

                if stats.started_at.elapsed() > Duration::from_secs(1) {
                    stats.report();
                }

                if stats.started_at.elapsed() > Duration::from_secs(10) {
                    stats = TimingStats::default();
                }

                // Search (glob) for files to detect major file changes.
                let start = time::Instant::now();
                for (_file_id, watcher) in &mut fp_map {
                    watcher.set_file_findable(false); // assume not findable until found
                }
                for path in self.paths_provider.paths().into_iter() {
                    if let Some(file_id) = self
                        .fingerprinter
                        .fingerprint_or_emit(&path, &mut known_small_files, &self.emitter)
                        .await
                    {
                        if let Some(watcher) = fp_map.get_mut(&file_id) {
                            // file fingerprint matches a watched file
                            let was_found_this_cycle = watcher.file_findable();
                            watcher.set_file_findable(true);
                            if watcher.path == path {
                                trace!(
                                    message = "Continue watching file.",
                                    path = ?path,
                                );
                            } else if !was_found_this_cycle {
                                // matches a file with a different path
                                info!(
                                    message = "Watched file has been renamed.",
                                    path = ?path,
                                    old_path = ?watcher.path
                                );
                                watcher.update_path(path).await.ok(); // ok if this fails: might fix next cycle
                            } else {
                                info!(
                                    message = "More than one file has the same fingerprint.",
                                    path = ?path,
                                    old_path = ?watcher.path
                                );
                                let (old_path, new_path) = (&watcher.path, &path);
                                if let (Ok(old_modified_time), Ok(new_modified_time)) = (
                                    fs::metadata(old_path).await.and_then(|m| m.modified()),
                                    fs::metadata(new_path).await.and_then(|m| m.modified()),
                                ) && old_modified_time < new_modified_time
                                {
                                    info!(
                                        message = "Switching to watch most recently modified file.",
                                        new_modified_time = ?new_modified_time,
                                        old_modified_time = ?old_modified_time,
                                    );
                                    watcher.update_path(path).await.ok(); // ok if this fails: might fix next cycle
                                }
                            }
                        } else {
                            // untracked file fingerprint
                            self.watch_new_file(path, file_id, &mut fp_map, &checkpoints, false)
                                .await;
                            self.emitter.emit_files_open(fp_map.len());
                        }
                    }
                }
                stats.record("discovery", start.elapsed());
            }

            // Clean up `known_small_files`: remove on disk any that have exceeded
            // the `remove_after` grace period.
            if let Some(grace_period) = self.remove_after {
                let mut set = JoinSet::new();

                let remove_file_tasks: HashMap<Id, PathBuf> = known_small_files
                    .iter()
                    .filter(|&(_path, last_time_open)| last_time_open.elapsed() >= grace_period)
                    .map(|(path, _last_time_open)| path.clone())
                    .map(|path| {
                        let path_ = path.clone();
                        let abort_handle =
                            set.spawn(async move { (path_.clone(), remove_file(&path_).await) });
                        (abort_handle.id(), path)
                    })
                    .collect();

                while let Some(res) = set.join_next().await {
                    match res {
                        Ok((path, Ok(()))) => {
                            let removed = known_small_files.remove(&path);

                            if removed.is_some() {
                                self.emitter.emit_file_deleted(&path);
                            }
                        }
                        Ok((path, Err(err))) => {
                            self.emitter.emit_file_delete_error(&path, err);
                        }
                        Err(join_err) => {
                            self.emitter.emit_file_delete_error(
                                remove_file_tasks
                                    .get(&join_err.id())
                                    .expect("panicked/cancelled task id not in task id pool"),
                                std::io::Error::other(join_err),
                            );
                        }
                    }
                }
            }

            // Collect lines by polling files.
            let mut global_bytes_read: usize = 0;
            let mut maxed_out_reading_single_file = false;
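            // Walk the watched files in `fp_map` order; each file may read up to
            // `max_read_bytes` in this pass before we move on. With `oldest_first`
            // set, the pass stops as soon as one file maxes out so newer files do
            // not advance while an older one is still behind.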
            for (&file_id, watcher) in &mut fp_map {
                if !watcher.should_read() {
                    continue;
                }

                let start = time::Instant::now();
                let mut bytes_read: usize = 0;
                while let Ok(RawLineResult {
                    raw_line: Some(line),
                    discarded_for_size_and_truncated,
                }) = watcher.read_line().await
                {
                    discarded_for_size_and_truncated.iter().for_each(|buf| {
                        self.emitter.emit_file_line_too_long(
                            &buf.clone(),
                            self.max_line_bytes,
                            buf.len(),
                        )
                    });

                    let sz = line.bytes.len();
                    trace!(
                        message = "Read bytes.",
                        path = ?watcher.path,
                        bytes = ?sz
                    );
                    stats.record_bytes(sz);

                    bytes_read += sz;

                    lines.push(Line {
                        text: line.bytes,
                        filename: watcher.path.to_str().expect("not a valid path").to_owned(),
                        file_id,
                        start_offset: line.offset,
                        end_offset: watcher.get_file_position(),
                    });

                    if bytes_read > self.max_read_bytes {
                        maxed_out_reading_single_file = true;
                        break;
                    }
                }
                stats.record("reading", start.elapsed());

                if bytes_read > 0 {
                    global_bytes_read = global_bytes_read.saturating_add(bytes_read);
                } else {
                    // Should the file be removed
                    if let Some(grace_period) = self.remove_after
                        && watcher.last_read_success().elapsed() >= grace_period
                    {
                        // Try to remove
                        match remove_file(&watcher.path).await {
                            Ok(()) => {
                                self.emitter.emit_file_deleted(&watcher.path);
                                watcher.set_dead();
                            }
                            Err(error) => {
                                // We will try again after some time.
                                self.emitter.emit_file_delete_error(&watcher.path, error);
                            }
                        }
                    }
                }

                // Do not move on to newer files if we are behind on an older file
                if self.oldest_first && maxed_out_reading_single_file {
                    break;
                }
            }

            for (_, watcher) in &mut fp_map {
                if !watcher.file_findable() && watcher.last_seen().elapsed() > self.rotate_wait {
                    watcher.set_dead();
                }
            }

            // A FileWatcher is dead when the underlying file has disappeared.
            // If the FileWatcher is dead we don't retain it; it will be deallocated.
            fp_map.retain(|file_id, watcher| {
                if watcher.dead() {
                    self.emitter
                        .emit_file_unwatched(&watcher.path, watcher.reached_eof());
                    checkpoints.set_dead(*file_id);
                    false
                } else {
                    true
                }
            });
            self.emitter.emit_files_open(fp_map.len());

            let start = time::Instant::now();
            let to_send = std::mem::take(&mut lines);

            let result = chans.send(to_send).await;
            match result {
                Ok(()) => {}
                Err(error) => {
                    error!(message = "Output channel closed.", %error);
                    return Err(error);
                }
            }
            stats.record("sending", start.elapsed());

            let start = time::Instant::now();
            // When no bytes have been read we double the backoff_cap, limited by
            // the hard-coded cap. Otherwise, we reset the backoff_cap to its
            // minimum on the assumption that next time through there will be
            // more lines to read promptly.
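            //
            // For example, consecutive idle passes grow the cap 1 -> 2 -> 4 -> ...
            // up to 2048 ms, while a single pass that reads any bytes resets it to 1.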
            backoff_cap = if global_bytes_read == 0 {
                cmp::min(2_048, backoff_cap.saturating_mul(2))
            } else {
                1
            };
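            // The sleep below is the cap minus the bytes just read, so a pass that
            // read anything (cap already reset to 1) effectively skips the sleep.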
            let backoff = backoff_cap.saturating_sub(global_bytes_read);

            // This works only when run inside a tokio context since we are using
            // tokio's timer. Outside of such a context, this will panic on the
            // first call.
            let sleep = async move {
                if backoff > 0 {
                    sleep(Duration::from_millis(backoff as u64)).await;
                }
            };
            futures::pin_mut!(sleep);
            match select(shutdown_data, sleep).await {
                Either::Left((_, _)) => {
                    chans
                        .close()
                        .await
                        .expect("error closing file_server data channel.");
                    let checkpointer = checkpoint_task_handle
                        .await
                        .expect("checkpoint task has panicked");
                    if let Err(error) = checkpointer.write_checkpoints().await {
                        error!(?error, "Error writing checkpoints before shutdown");
                    }
                    return Ok(Shutdown);
                }
                Either::Right((_, future)) => shutdown_data = future,
            }
            stats.record("sleeping", start.elapsed());
        }
    }

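    /// Begin watching `path` under fingerprint `file_id`, inserting the resulting
    /// `FileWatcher` into `fp_map`. The starting position is taken from the stored
    /// checkpoint when available (unless checkpoints are ignored); otherwise it
    /// falls back to the configured `read_from` at startup, or to the beginning of
    /// the file for files discovered while running.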
    async fn watch_new_file(
        &self,
        path: PathBuf,
        file_id: FileFingerprint,
        fp_map: &mut IndexMap<FileFingerprint, FileWatcher>,
        checkpoints: &CheckpointsView,
        startup: bool,
    ) {
        // Determine the initial _requested_ starting point in the file. This can be overridden
        // once the file is actually opened and we determine it is compressed, older than we're
        // configured to read, etc.
        let fallback = if startup {
            self.read_from
        } else {
            // Files that show up while we're running are always read from the beginning. There's
            // not a good way to determine if they were moved or just created and written very
            // quickly, so just make sure we're not missing any data.
            ReadFrom::Beginning
        };
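        // Net effect: files present at startup fall back to the configured `read_from`,
        // while files discovered while running fall back to reading from the beginning.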

        // Always prefer the stored checkpoint unless the user has opted out. Previously, the
        // checkpoint was only loaded for new files when Vector was started up, but the
        // `kubernetes_logs` source returns the files well after start-up, once it has populated
        // them from the k8s metadata, so we now just always use the checkpoints unless opted out.
        // https://github.com/vectordotdev/vector/issues/7139
        let read_from = if !self.ignore_checkpoints {
            checkpoints
                .get(file_id)
                .map(ReadFrom::Checkpoint)
                .unwrap_or(fallback)
        } else {
            fallback
        };

        match FileWatcher::new(
            path.clone(),
            read_from,
            self.ignore_before,
            self.max_line_bytes,
            self.line_delimiter.clone(),
        )
        .await
        {
            Ok(mut watcher) => {
                if let ReadFrom::Checkpoint(file_position) = read_from {
                    self.emitter.emit_file_resumed(&path, file_position);
                } else {
                    self.emitter.emit_file_added(&path);
                }
                watcher.set_file_findable(true);
                fp_map.insert(file_id, watcher);
            }
            Err(error) => self.emitter.emit_file_watch_error(&path, error),
        };
    }
}

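/// Background task that periodically writes checkpoints (once per `sleep_duration`)
/// until `shutdown` resolves, then returns the `Checkpointer` wrapped in an `Arc` so
/// the caller can perform a final write before exiting.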
async fn checkpoint_writer(
    checkpointer: Checkpointer,
    sleep_duration: Duration,
    mut shutdown: impl Future + Unpin,
    emitter: impl FileSourceInternalEvents,
) -> Arc<Checkpointer> {
    let checkpointer = Arc::new(checkpointer);
    loop {
        let sleep = sleep(sleep_duration);
        tokio::select! {
            _ = &mut shutdown => break,
            _ = sleep => {},
        }

        let emitter = emitter.clone();
        let checkpointer = Arc::clone(&checkpointer);
        let start = time::Instant::now();
        match checkpointer.write_checkpoints().await {
            Ok(count) => emitter.emit_file_checkpointed(count, start.elapsed()),
            Err(error) => emitter.emit_file_checkpoint_write_error(error),
        };
    }
    checkpointer
}

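/// Convert an optional `ignore_older_secs` into a cutoff timestamp; e.g. `Some(3600)`
/// yields a `DateTime` one hour in the past, while `None` disables the cutoff.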
pub fn calculate_ignore_before(ignore_older_secs: Option<u64>) -> Option<DateTime<Utc>> {
    ignore_older_secs.map(|secs| Utc::now() - chrono::Duration::seconds(secs as i64))
}

/// A sentinel type to signal that the file server was gracefully shut down.
///
/// The purpose of this type is to clarify the semantics of the result values
/// returned from [`FileServer::run`] for both the users of the file server
/// and the implementors.
#[derive(Debug)]
pub struct Shutdown;

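/// Wall-clock time accumulated per loop segment, plus event and byte counts, used by
/// `report` to log throughput and per-segment time ratios at DEBUG level.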
struct TimingStats {
    started_at: time::Instant,
    segments: BTreeMap<&'static str, Duration>,
    events: usize,
    bytes: usize,
}

impl TimingStats {
    fn record(&mut self, key: &'static str, duration: Duration) {
        let segment = self.segments.entry(key).or_default();
        *segment += duration;
    }

    fn record_bytes(&mut self, bytes: usize) {
        self.events += 1;
        self.bytes += bytes;
    }

    fn report(&self) {
        if !tracing::level_enabled!(tracing::Level::DEBUG) {
            return;
        }
        let total = self.started_at.elapsed();
        let counted: Duration = self.segments.values().sum();
        let other: Duration = total.saturating_sub(counted);
        let mut ratios = self
            .segments
            .iter()
            .map(|(k, v)| (*k, v.as_secs_f32() / total.as_secs_f32()))
            .collect::<BTreeMap<_, _>>();
        ratios.insert("other", other.as_secs_f32() / total.as_secs_f32());
        let (event_throughput, bytes_throughput) = if total.as_secs() > 0 {
            (
                self.events as u64 / total.as_secs(),
                self.bytes as u64 / total.as_secs(),
            )
        } else {
            (0, 0)
        };
        debug!(event_throughput = %scale(event_throughput), bytes_throughput = %scale(bytes_throughput), ?ratios);
    }
}

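/// Format a per-second rate with a metric suffix, e.g. `scale(1_500)` yields "1.500k/sec".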
fn scale(bytes: u64) -> String {
    let units = ["", "k", "m", "g"];
    let mut bytes = bytes as f32;
    let mut i = 0;
    // Stop at the last unit so the index can never run past `units`.
    while bytes > 1000.0 && i < units.len() - 1 {
        bytes /= 1000.0;
        i += 1;
    }
    format!("{:.3}{}/sec", bytes, units[i])
}

impl Default for TimingStats {
    fn default() -> Self {
        Self {
            started_at: time::Instant::now(),
            segments: Default::default(),
            events: Default::default(),
            bytes: Default::default(),
        }
    }
}

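/// A single delimited line read from a watched file, along with the file it came from
/// and the byte offsets it spans within that file.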
#[derive(Debug)]
pub struct Line {
    pub text: Bytes,
    pub filename: String,
    pub file_id: FileFingerprint,
    pub start_offset: u64,
    pub end_offset: u64,
}