file_source/file_watcher/mod.rs

use std::{
    fs::{self, File},
    io::{self, BufRead, Seek},
    path::PathBuf,
    time::{Duration, Instant},
};

use bytes::{Bytes, BytesMut};
use chrono::{DateTime, Utc};
use flate2::bufread::MultiGzDecoder;
use tracing::debug;
use vector_common::constants::GZIP_MAGIC;

use crate::{
    buffer::{read_until_with_max_size, ReadResult},
    metadata_ext::PortableFileExt,
    FilePosition, ReadFrom,
};
#[cfg(test)]
mod tests;

/// The `RawLine` struct is a thin wrapper around the bytes that have been read,
/// retaining the context of where in the file they were read from.
///
/// The `offset` field contains the byte offset of the beginning of the line
/// within the file it was read from.
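///
/// For example, in a file containing `foo\nbar\n`, the line `bar` would be
/// read with `offset == 4`.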
#[derive(Debug)]
pub(super) struct RawLine {
    pub offset: u64,
    pub bytes: Bytes,
}

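/// The outcome of a single `read_line` call: the line that was read, if any,
/// together with any data that was discarded (and truncated) for exceeding
/// the configured `max_line_bytes`.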
#[derive(Debug)]
pub struct RawLineResult {
    pub raw_line: Option<RawLine>,
    pub discarded_for_size_and_truncated: Vec<BytesMut>,
}

/// The `FileWatcher` struct defines the polling-based state machine that reads
/// from a file path, transparently updating the underlying file descriptor when
/// the file has been rolled over, as is common for logs.
///
/// The `FileWatcher` is expected to live for the lifetime of the file
/// path. `FileServer` is responsible for clearing away `FileWatcher`s whose
/// files no longer exist.
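///
/// A minimal usage sketch (illustrative only, not taken from the upstream
/// documentation); it assumes crate-internal access to these types, a readable
/// plain-text log at the given path, and newline-delimited lines:
///
/// ```ignore
/// let mut watcher = FileWatcher::new(
///     PathBuf::from("/var/log/app.log"),
///     ReadFrom::Beginning,
///     None,                      // ignore_before: read regardless of mtime
///     1024 * 1024,               // max_line_bytes
///     Bytes::from_static(b"\n"), // line_delimiter
/// )?;
///
/// let result = watcher.read_line()?;
/// if let Some(line) = result.raw_line {
///     // `line.offset` is the byte offset where this line starts in the file.
/// }
/// ```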
pub struct FileWatcher {
    pub path: PathBuf,
    findable: bool,
    reader: Box<dyn BufRead>,
    file_position: FilePosition,
    devno: u64,
    inode: u64,
    is_dead: bool,
    reached_eof: bool,
    last_read_attempt: Instant,
    last_read_success: Instant,
    last_seen: Instant,
    max_line_bytes: usize,
    line_delimiter: Bytes,
    buf: BytesMut,
}

impl FileWatcher {
    /// Create a new `FileWatcher`.
    ///
    /// The input path will be used by `FileWatcher` to prime its state
    /// machine. A `FileWatcher` tracks _only one_ file. This function returns
    /// an error if the path does not exist or is not readable by the current
    /// process.
    pub fn new(
        path: PathBuf,
        read_from: ReadFrom,
        ignore_before: Option<DateTime<Utc>>,
        max_line_bytes: usize,
        line_delimiter: Bytes,
    ) -> Result<FileWatcher, io::Error> {
        let f = fs::File::open(&path)?;
        let (devno, ino) = (f.portable_dev()?, f.portable_ino()?);
        let metadata = f.metadata()?;
        let mut reader = io::BufReader::new(f);

        let too_old = if let (Some(ignore_before), Ok(modified_time)) = (
            ignore_before,
            metadata.modified().map(DateTime::<Utc>::from),
        ) {
            modified_time < ignore_before
        } else {
            false
        };

        let gzipped = is_gzipped(&mut reader)?;

        // Determine the actual position at which we should start reading
        let (reader, file_position): (Box<dyn BufRead>, FilePosition) =
            match (gzipped, too_old, read_from) {
                (true, true, _) => {
                    debug!(
                        message = "Not reading gzipped file older than `ignore_older`.",
                        ?path,
                    );
                    (Box::new(null_reader()), 0)
                }
                (true, _, ReadFrom::Checkpoint(file_position)) => {
                    debug!(
                        message = "Not re-reading gzipped file with existing stored offset.",
                        ?path,
                        %file_position
                    );
                    (Box::new(null_reader()), file_position)
                }
                // TODO: This may become the default, leading us to stop reading gzipped files that
                // we were reading before. Should we merge this and the next branch to read
                // compressed files from the beginning even when `read_from = "end"` (implicitly via
                // default or explicitly via config)?
                (true, _, ReadFrom::End) => {
                    debug!(
                        message = "Can't read from the end of an already-compressed file.",
                        ?path,
                    );
                    (Box::new(null_reader()), 0)
                }
                (true, false, ReadFrom::Beginning) => {
                    (Box::new(io::BufReader::new(MultiGzDecoder::new(reader))), 0)
                }
                (false, true, _) => {
                    let pos = reader.seek(io::SeekFrom::End(0)).unwrap();
                    (Box::new(reader), pos)
                }
                (false, false, ReadFrom::Checkpoint(file_position)) => {
                    let pos = reader.seek(io::SeekFrom::Start(file_position)).unwrap();
                    (Box::new(reader), pos)
                }
                (false, false, ReadFrom::Beginning) => {
                    let pos = reader.seek(io::SeekFrom::Start(0)).unwrap();
                    (Box::new(reader), pos)
                }
                (false, false, ReadFrom::End) => {
                    let pos = reader.seek(io::SeekFrom::End(0)).unwrap();
                    (Box::new(reader), pos)
                }
            };

        // Translate the file's modification time into an `Instant`, so the
        // initial read/seen bookkeeping below reflects how stale the file
        // already is; fall back to "now" if that cannot be determined.
        let ts = metadata
            .modified()
            .ok()
            .and_then(|mtime| mtime.elapsed().ok())
            .and_then(|diff| Instant::now().checked_sub(diff))
            .unwrap_or_else(Instant::now);

        Ok(FileWatcher {
            path,
            findable: true,
            reader,
            file_position,
            devno,
            inode: ino,
            is_dead: false,
            reached_eof: false,
            last_read_attempt: ts,
            last_read_success: ts,
            last_seen: ts,
            max_line_bytes,
            line_delimiter,
            buf: BytesMut::new(),
        })
    }

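    /// Point this watcher at a new path.
    ///
    /// If the file at `path` is a different file from the one currently being
    /// watched (judged by device and inode), the reader is re-opened: plain
    /// files are seeked to the stored position, while gzipped files are only
    /// read if no bytes have been consumed yet. Otherwise only the stored path
    /// is updated.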
    pub fn update_path(&mut self, path: PathBuf) -> io::Result<()> {
        let file_handle = File::open(&path)?;
        if (file_handle.portable_dev()?, file_handle.portable_ino()?) != (self.devno, self.inode) {
            let mut reader = io::BufReader::new(fs::File::open(&path)?);
            let gzipped = is_gzipped(&mut reader)?;
            let new_reader: Box<dyn BufRead> = if gzipped {
                if self.file_position != 0 {
                    Box::new(null_reader())
                } else {
                    Box::new(io::BufReader::new(MultiGzDecoder::new(reader)))
                }
            } else {
                reader.seek(io::SeekFrom::Start(self.file_position))?;
                Box::new(reader)
            };
            self.reader = new_reader;
            self.devno = file_handle.portable_dev()?;
            self.inode = file_handle.portable_ino()?;
        }
        self.path = path;
        Ok(())
    }

    pub fn set_file_findable(&mut self, f: bool) {
        self.findable = f;
        if f {
            self.last_seen = Instant::now();
        }
    }

    pub fn file_findable(&self) -> bool {
        self.findable
    }

    pub fn set_dead(&mut self) {
        self.is_dead = true;
    }

    pub fn dead(&self) -> bool {
        self.is_dead
    }

    pub fn get_file_position(&self) -> FilePosition {
        self.file_position
    }

    /// Read a single line from the underlying file.
    ///
    /// This function will attempt to read a new line from its file, blocking,
    /// up to some maximum but unspecified amount of time. `read_line` will open
    /// a new file handle as needed, transparently to the caller.
    pub(super) fn read_line(&mut self) -> io::Result<RawLineResult> {
        self.track_read_attempt();

        let reader = &mut self.reader;
        let file_position = &mut self.file_position;
        let initial_position = *file_position;
        match read_until_with_max_size(
            reader,
            file_position,
            self.line_delimiter.as_ref(),
            &mut self.buf,
            self.max_line_bytes,
        ) {
            Ok(ReadResult {
                successfully_read: Some(_),
                discarded_for_size_and_truncated,
            }) => {
                self.track_read_success();
                Ok(RawLineResult {
                    raw_line: Some(RawLine {
                        offset: initial_position,
                        bytes: self.buf.split().freeze(),
                    }),
                    discarded_for_size_and_truncated,
                })
            }
            Ok(ReadResult {
                successfully_read: None,
                discarded_for_size_and_truncated,
            }) => {
                if !self.file_findable() {
                    self.set_dead();
                    // File has been deleted, so return what we have in the buffer, even though it
                    // didn't end with a newline. This is not a perfect signal for when we should
                    // give up waiting for a newline, but it's decent.
                    let buf = self.buf.split().freeze();
                    if buf.is_empty() {
                        // EOF
                        self.reached_eof = true;
                        Ok(RawLineResult {
                            raw_line: None,
                            discarded_for_size_and_truncated,
                        })
                    } else {
                        Ok(RawLineResult {
                            raw_line: Some(RawLine {
                                offset: initial_position,
                                bytes: buf,
                            }),
                            discarded_for_size_and_truncated,
                        })
                    }
                } else {
                    self.reached_eof = true;
                    Ok(RawLineResult {
                        raw_line: None,
                        discarded_for_size_and_truncated,
                    })
                }
            }
            Err(e) => {
                if let io::ErrorKind::NotFound = e.kind() {
                    self.set_dead();
                }
                Err(e)
            }
        }
    }

    #[inline]
    fn track_read_attempt(&mut self) {
        self.last_read_attempt = Instant::now();
    }

    #[inline]
    fn track_read_success(&mut self) {
        self.last_read_success = Instant::now();
    }

    #[inline]
    pub fn last_read_success(&self) -> Instant {
        self.last_read_success
    }

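    /// Whether this file is worth polling right now: keep reading eagerly if a
    /// read has succeeded within the last ten seconds, otherwise retry at most
    /// once every ten seconds.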
    #[inline]
    pub fn should_read(&self) -> bool {
        self.last_read_success.elapsed() < Duration::from_secs(10)
            || self.last_read_attempt.elapsed() > Duration::from_secs(10)
    }

    #[inline]
    pub fn last_seen(&self) -> Instant {
        self.last_seen
    }

    #[inline]
    pub fn reached_eof(&self) -> bool {
        self.reached_eof
    }
}

fn is_gzipped(r: &mut io::BufReader<fs::File>) -> io::Result<bool> {
    let header_bytes = r.fill_buf()?;
    // WARN: The paired `BufReader::consume` is intentionally not called. If we
    // called it, we would chop off a decent part of the potential gzip stream.
    Ok(header_bytes.starts_with(GZIP_MAGIC))
}

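/// A reader that is immediately at EOF, used when the watcher decides a file
/// should not be read at all (for example, gzipped files that are too old or
/// that already have a stored offset).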
fn null_reader() -> impl BufRead {
    io::Cursor::new(Vec::new())
}