file_source/file_watcher/
mod.rs

1use async_compression::tokio::bufread::GzipDecoder;
2use bytes::{Bytes, BytesMut};
3use chrono::{DateTime, Utc};
4use std::{
5    io::{self, SeekFrom},
6    path::PathBuf,
7    time::Duration,
8};
9use tokio::{
10    fs::File,
11    io::{AsyncBufRead, AsyncBufReadExt, AsyncSeekExt, BufReader},
12    time::Instant,
13};
14use tracing::debug;
15use vector_common::constants::GZIP_MAGIC;
16
17use file_source_common::{
18    AsyncFileInfo, FilePosition, PortableFileExt, ReadFrom,
19    buffer::{ReadResult, read_until_with_max_size},
20};
21
/// Shortest delay before a watcher that has just hit EOF is polled again.
const EOF_READ_BACKOFF_MIN: Duration = Duration::from_millis(1);
/// Ceiling for the EOF retry delay; consecutive EOFs double the delay until
/// it saturates at this value.
const EOF_READ_BACKOFF_MAX: Duration = Duration::from_millis(250);
24
25#[cfg(test)]
26mod tests;
27
28/// The `RawLine` struct is a thin wrapper around the bytes that have been read
29/// in order to retain the context of where in the file they have been read from.
30///
31/// The offset field contains the byte offset of the beginning of the line within
32/// the file that it was read from.
33#[derive(Debug)]
34pub struct RawLine {
35    pub offset: u64,
36    pub bytes: Bytes,
37}
38
39#[derive(Debug)]
40pub struct RawLineResult {
41    pub raw_line: Option<RawLine>,
42    pub discarded_for_size_and_truncated: Vec<BytesMut>,
43}
44
45/// The `FileWatcher` struct defines the polling based state machine which reads
46/// from a file path, transparently updating the underlying file descriptor when
47/// the file has been rolled over, as is common for logs.
48///
49/// The `FileWatcher` is expected to live for the lifetime of the file
50/// path. `FileServer` is responsible for clearing away `FileWatchers` which no
51/// longer exist.
52pub struct FileWatcher {
53    pub path: PathBuf,
54    findable: bool,
55    reader: Box<dyn AsyncBufRead + Send + Unpin>,
56    file_position: FilePosition,
57    devno: u64,
58    inode: u64,
59    is_dead: bool,
60    reached_eof: bool,
61    last_read_attempt: Instant,
62    last_read_success: Instant,
63    read_retry_delay: Duration,
64    last_seen: Instant,
65    max_line_bytes: usize,
66    line_delimiter: Bytes,
67    buf: BytesMut,
68}
69
70impl FileWatcher {
71    /// Create a new `FileWatcher`
72    ///
73    /// The input path will be used by `FileWatcher` to prime its state
74    /// machine. A `FileWatcher` tracks _only one_ file. This function returns
75    /// None if the path does not exist or is not readable by the current process.
76    pub async fn new(
77        path: PathBuf,
78        read_from: ReadFrom,
79        ignore_before: Option<DateTime<Utc>>,
80        max_line_bytes: usize,
81        line_delimiter: Bytes,
82    ) -> Result<FileWatcher, std::io::Error> {
83        let f = File::open(&path).await?;
84        let file_info = f.file_info().await?;
85        let (devno, ino) = (file_info.portable_dev(), file_info.portable_ino());
86
87        #[cfg(unix)]
88        let metadata = file_info;
89        #[cfg(windows)]
90        let metadata = f.metadata().await?;
91
92        let mut reader = BufReader::new(f);
93
94        let too_old = if let (Some(ignore_before), Ok(modified_time)) = (
95            ignore_before,
96            metadata.modified().map(DateTime::<Utc>::from),
97        ) {
98            modified_time < ignore_before
99        } else {
100            false
101        };
102
103        let gzipped = is_gzipped(&mut reader).await?;
104
105        // Determine the actual position at which we should start reading
106        let (reader, file_position): (Box<dyn AsyncBufRead + Send + Unpin>, FilePosition) =
107            match (gzipped, too_old, read_from) {
108                (true, true, _) => {
109                    debug!(
110                        message = "Not reading gzipped file older than `ignore_older`.",
111                        ?path,
112                    );
113                    (Box::new(null_reader()), 0)
114                }
115                (true, _, ReadFrom::Checkpoint(file_position)) => {
116                    debug!(
117                        message = "Not re-reading gzipped file with existing stored offset.",
118                        ?path,
119                        %file_position
120                    );
121                    (Box::new(null_reader()), file_position)
122                }
123                // TODO: This may become the default, leading us to stop reading gzipped files that
124                // we were reading before. Should we merge this and the next branch to read
125                // compressed file from the beginning even when `read_from = "end"` (implicitly via
126                // default or explicitly via config)?
127                (true, _, ReadFrom::End) => {
128                    debug!(
129                        message = "Can't read from the end of already-compressed file.",
130                        ?path,
131                    );
132                    (Box::new(null_reader()), 0)
133                }
134                (true, false, ReadFrom::Beginning) => {
135                    (Box::new(BufReader::new(GzipDecoder::new(reader))), 0)
136                }
137                (false, true, _) => {
138                    let pos = reader.seek(SeekFrom::End(0)).await.unwrap();
139                    (Box::new(reader), pos)
140                }
141                (false, false, ReadFrom::Checkpoint(file_position)) => {
142                    let pos = reader.seek(SeekFrom::Start(file_position)).await.unwrap();
143                    (Box::new(reader), pos)
144                }
145                (false, false, ReadFrom::Beginning) => {
146                    let pos = reader.seek(SeekFrom::Start(0)).await.unwrap();
147                    (Box::new(reader), pos)
148                }
149                (false, false, ReadFrom::End) => {
150                    let pos = reader.seek(SeekFrom::End(0)).await.unwrap();
151                    (Box::new(reader), pos)
152                }
153            };
154
155        let ts = metadata
156            .modified()
157            .ok()
158            .and_then(|mtime| mtime.elapsed().ok())
159            .and_then(|diff| Instant::now().checked_sub(diff))
160            .unwrap_or_else(Instant::now);
161
162        Ok(FileWatcher {
163            path,
164            findable: true,
165            reader,
166            file_position,
167            devno,
168            inode: ino,
169            is_dead: false,
170            reached_eof: false,
171            last_read_attempt: ts,
172            last_read_success: ts,
173            read_retry_delay: EOF_READ_BACKOFF_MIN,
174            last_seen: ts,
175            max_line_bytes,
176            line_delimiter,
177            buf: BytesMut::new(),
178        })
179    }
180
181    pub async fn update_path(&mut self, path: PathBuf) -> io::Result<()> {
182        let file_handle = File::open(&path).await?;
183
184        let file_info = file_handle.file_info().await?;
185        if (file_info.portable_dev(), file_info.portable_ino()) != (self.devno, self.inode) {
186            let mut reader = BufReader::new(File::open(&path).await?);
187            let gzipped = is_gzipped(&mut reader).await?;
188            let new_reader: Box<dyn AsyncBufRead + Send + Unpin> = if gzipped {
189                if self.file_position != 0 {
190                    Box::new(null_reader())
191                } else {
192                    Box::new(BufReader::new(GzipDecoder::new(reader)))
193                }
194            } else {
195                reader.seek(io::SeekFrom::Start(self.file_position)).await?;
196                Box::new(reader)
197            };
198            self.reader = new_reader;
199
200            let file_info = file_handle.file_info().await?;
201            self.devno = file_info.portable_dev();
202            self.inode = file_info.portable_ino();
203        }
204        self.reached_eof = false;
205        self.read_retry_delay = EOF_READ_BACKOFF_MIN;
206        self.path = path;
207        Ok(())
208    }
209
210    pub fn set_file_findable(&mut self, f: bool) {
211        self.findable = f;
212        if f {
213            self.last_seen = Instant::now();
214        }
215    }
216
217    pub fn file_findable(&self) -> bool {
218        self.findable
219    }
220
221    pub fn set_dead(&mut self) {
222        self.is_dead = true;
223    }
224
225    pub fn dead(&self) -> bool {
226        self.is_dead
227    }
228
229    pub fn get_file_position(&self) -> FilePosition {
230        self.file_position
231    }
232
233    /// Read a single line from the underlying file
234    ///
235    /// This function will attempt to read a new line from its file, blocking,
236    /// up to some maximum but unspecified amount of time. `read_line` will open
237    /// a new file handler as needed, transparently to the caller.
238    pub(super) async fn read_line(&mut self) -> io::Result<RawLineResult> {
239        self.track_read_attempt();
240
241        let reader = &mut self.reader;
242        let file_position = &mut self.file_position;
243        let initial_position = *file_position;
244        match read_until_with_max_size(
245            reader.as_mut(),
246            file_position,
247            self.line_delimiter.as_ref(),
248            &mut self.buf,
249            self.max_line_bytes,
250        )
251        .await
252        {
253            Ok(ReadResult {
254                successfully_read: Some(_),
255                discarded_for_size_and_truncated,
256            }) => {
257                self.track_read_success();
258                Ok(RawLineResult {
259                    raw_line: Some(RawLine {
260                        offset: initial_position,
261                        bytes: self.buf.split().freeze(),
262                    }),
263                    discarded_for_size_and_truncated,
264                })
265            }
266            Ok(ReadResult {
267                successfully_read: None,
268                discarded_for_size_and_truncated,
269            }) => {
270                if !self.file_findable() {
271                    self.set_dead();
272                    // File has been deleted, so return what we have in the buffer, even though it
273                    // didn't end with a newline. This is not a perfect signal for when we should
274                    // give up waiting for a newline, but it's decent.
275                    let buf = self.buf.split().freeze();
276                    if buf.is_empty() {
277                        // EOF
278                        self.reached_eof = true;
279                        Ok(RawLineResult {
280                            raw_line: None,
281                            discarded_for_size_and_truncated,
282                        })
283                    } else {
284                        Ok(RawLineResult {
285                            raw_line: Some(RawLine {
286                                offset: initial_position,
287                                bytes: buf,
288                            }),
289                            discarded_for_size_and_truncated,
290                        })
291                    }
292                } else {
293                    self.track_read_eof();
294                    Ok(RawLineResult {
295                        raw_line: None,
296                        discarded_for_size_and_truncated,
297                    })
298                }
299            }
300            Err(e) => {
301                if let io::ErrorKind::NotFound = e.kind() {
302                    self.set_dead();
303                }
304                Err(e)
305            }
306        }
307    }
308
309    #[inline]
310    fn track_read_attempt(&mut self) {
311        self.last_read_attempt = Instant::now();
312    }
313
314    #[inline]
315    fn track_read_success(&mut self) {
316        self.reached_eof = false;
317        self.read_retry_delay = EOF_READ_BACKOFF_MIN;
318        self.last_read_success = Instant::now();
319    }
320
321    #[inline]
322    fn track_read_eof(&mut self) {
323        self.read_retry_delay = if self.reached_eof {
324            std::cmp::min(
325                self.read_retry_delay.saturating_mul(2),
326                EOF_READ_BACKOFF_MAX,
327            )
328        } else {
329            EOF_READ_BACKOFF_MIN
330        };
331        self.reached_eof = true;
332    }
333
334    #[inline]
335    pub fn last_read_success(&self) -> Instant {
336        self.last_read_success
337    }
338
339    #[inline]
340    pub fn should_read(&self) -> bool {
341        if self.reached_eof && self.last_read_attempt.elapsed() < self.read_retry_delay {
342            return false;
343        }
344
345        self.last_read_success.elapsed() < Duration::from_secs(10)
346            || self.last_read_attempt.elapsed() > Duration::from_secs(10)
347    }
348
349    #[inline]
350    pub fn last_seen(&self) -> Instant {
351        self.last_seen
352    }
353
354    #[inline]
355    pub fn reached_eof(&self) -> bool {
356        self.reached_eof
357    }
358}
359
360async fn is_gzipped(r: &mut BufReader<File>) -> io::Result<bool> {
361    let header_bytes = r.fill_buf().await?;
362    // WARN: The paired `BufReader::consume` is not called intentionally. If we
363    // do we'll chop a decent part of the potential gzip stream off.
364    Ok(header_bytes.starts_with(GZIP_MAGIC))
365}
366
367fn null_reader() -> impl AsyncBufRead {
368    io::Cursor::new(Vec::new())
369}