file_source/file_watcher/
mod.rs

1use async_compression::tokio::bufread::GzipDecoder;
2use bytes::{Bytes, BytesMut};
3use chrono::{DateTime, Utc};
4use std::{
5    io::{self, SeekFrom},
6    path::PathBuf,
7    time::Duration,
8};
9use tokio::{
10    fs::File,
11    io::{AsyncBufRead, AsyncBufReadExt, AsyncSeekExt, BufReader},
12    time::Instant,
13};
14use tracing::debug;
15use vector_common::constants::GZIP_MAGIC;
16
17use file_source_common::{
18    AsyncFileInfo, FilePosition, PortableFileExt, ReadFrom,
19    buffer::{ReadResult, read_until_with_max_size},
20};
21
22#[cfg(test)]
23mod tests;
24
25/// The `RawLine` struct is a thin wrapper around the bytes that have been read
26/// in order to retain the context of where in the file they have been read from.
27///
28/// The offset field contains the byte offset of the beginning of the line within
29/// the file that it was read from.
30#[derive(Debug)]
31pub struct RawLine {
32    pub offset: u64,
33    pub bytes: Bytes,
34}
35
36#[derive(Debug)]
37pub struct RawLineResult {
38    pub raw_line: Option<RawLine>,
39    pub discarded_for_size_and_truncated: Vec<BytesMut>,
40}
41
42/// The `FileWatcher` struct defines the polling based state machine which reads
43/// from a file path, transparently updating the underlying file descriptor when
44/// the file has been rolled over, as is common for logs.
45///
46/// The `FileWatcher` is expected to live for the lifetime of the file
47/// path. `FileServer` is responsible for clearing away `FileWatchers` which no
48/// longer exist.
49pub struct FileWatcher {
50    pub path: PathBuf,
51    findable: bool,
52    reader: Box<dyn AsyncBufRead + Send + Unpin>,
53    file_position: FilePosition,
54    devno: u64,
55    inode: u64,
56    is_dead: bool,
57    reached_eof: bool,
58    last_read_attempt: Instant,
59    last_read_success: Instant,
60    last_seen: Instant,
61    max_line_bytes: usize,
62    line_delimiter: Bytes,
63    buf: BytesMut,
64}
65
66impl FileWatcher {
67    /// Create a new `FileWatcher`
68    ///
69    /// The input path will be used by `FileWatcher` to prime its state
70    /// machine. A `FileWatcher` tracks _only one_ file. This function returns
71    /// None if the path does not exist or is not readable by the current process.
72    pub async fn new(
73        path: PathBuf,
74        read_from: ReadFrom,
75        ignore_before: Option<DateTime<Utc>>,
76        max_line_bytes: usize,
77        line_delimiter: Bytes,
78    ) -> Result<FileWatcher, std::io::Error> {
79        let f = File::open(&path).await?;
80        let file_info = f.file_info().await?;
81        let (devno, ino) = (file_info.portable_dev(), file_info.portable_ino());
82
83        #[cfg(unix)]
84        let metadata = file_info;
85        #[cfg(windows)]
86        let metadata = f.metadata().await?;
87
88        let mut reader = BufReader::new(f);
89
90        let too_old = if let (Some(ignore_before), Ok(modified_time)) = (
91            ignore_before,
92            metadata.modified().map(DateTime::<Utc>::from),
93        ) {
94            modified_time < ignore_before
95        } else {
96            false
97        };
98
99        let gzipped = is_gzipped(&mut reader).await?;
100
101        // Determine the actual position at which we should start reading
102        let (reader, file_position): (Box<dyn AsyncBufRead + Send + Unpin>, FilePosition) =
103            match (gzipped, too_old, read_from) {
104                (true, true, _) => {
105                    debug!(
106                        message = "Not reading gzipped file older than `ignore_older`.",
107                        ?path,
108                    );
109                    (Box::new(null_reader()), 0)
110                }
111                (true, _, ReadFrom::Checkpoint(file_position)) => {
112                    debug!(
113                        message = "Not re-reading gzipped file with existing stored offset.",
114                        ?path,
115                        %file_position
116                    );
117                    (Box::new(null_reader()), file_position)
118                }
119                // TODO: This may become the default, leading us to stop reading gzipped files that
120                // we were reading before. Should we merge this and the next branch to read
121                // compressed file from the beginning even when `read_from = "end"` (implicitly via
122                // default or explicitly via config)?
123                (true, _, ReadFrom::End) => {
124                    debug!(
125                        message = "Can't read from the end of already-compressed file.",
126                        ?path,
127                    );
128                    (Box::new(null_reader()), 0)
129                }
130                (true, false, ReadFrom::Beginning) => {
131                    (Box::new(BufReader::new(GzipDecoder::new(reader))), 0)
132                }
133                (false, true, _) => {
134                    let pos = reader.seek(SeekFrom::End(0)).await.unwrap();
135                    (Box::new(reader), pos)
136                }
137                (false, false, ReadFrom::Checkpoint(file_position)) => {
138                    let pos = reader.seek(SeekFrom::Start(file_position)).await.unwrap();
139                    (Box::new(reader), pos)
140                }
141                (false, false, ReadFrom::Beginning) => {
142                    let pos = reader.seek(SeekFrom::Start(0)).await.unwrap();
143                    (Box::new(reader), pos)
144                }
145                (false, false, ReadFrom::End) => {
146                    let pos = reader.seek(SeekFrom::End(0)).await.unwrap();
147                    (Box::new(reader), pos)
148                }
149            };
150
151        let ts = metadata
152            .modified()
153            .ok()
154            .and_then(|mtime| mtime.elapsed().ok())
155            .and_then(|diff| Instant::now().checked_sub(diff))
156            .unwrap_or_else(Instant::now);
157
158        Ok(FileWatcher {
159            path,
160            findable: true,
161            reader,
162            file_position,
163            devno,
164            inode: ino,
165            is_dead: false,
166            reached_eof: false,
167            last_read_attempt: ts,
168            last_read_success: ts,
169            last_seen: ts,
170            max_line_bytes,
171            line_delimiter,
172            buf: BytesMut::new(),
173        })
174    }
175
176    pub async fn update_path(&mut self, path: PathBuf) -> io::Result<()> {
177        let file_handle = File::open(&path).await?;
178
179        let file_info = file_handle.file_info().await?;
180        if (file_info.portable_dev(), file_info.portable_ino()) != (self.devno, self.inode) {
181            let mut reader = BufReader::new(File::open(&path).await?);
182            let gzipped = is_gzipped(&mut reader).await?;
183            let new_reader: Box<dyn AsyncBufRead + Send + Unpin> = if gzipped {
184                if self.file_position != 0 {
185                    Box::new(null_reader())
186                } else {
187                    Box::new(BufReader::new(GzipDecoder::new(reader)))
188                }
189            } else {
190                reader.seek(io::SeekFrom::Start(self.file_position)).await?;
191                Box::new(reader)
192            };
193            self.reader = new_reader;
194
195            let file_info = file_handle.file_info().await?;
196            self.devno = file_info.portable_dev();
197            self.inode = file_info.portable_ino();
198        }
199        self.path = path;
200        Ok(())
201    }
202
203    pub fn set_file_findable(&mut self, f: bool) {
204        self.findable = f;
205        if f {
206            self.last_seen = Instant::now();
207        }
208    }
209
210    pub fn file_findable(&self) -> bool {
211        self.findable
212    }
213
214    pub fn set_dead(&mut self) {
215        self.is_dead = true;
216    }
217
218    pub fn dead(&self) -> bool {
219        self.is_dead
220    }
221
222    pub fn get_file_position(&self) -> FilePosition {
223        self.file_position
224    }
225
226    /// Read a single line from the underlying file
227    ///
228    /// This function will attempt to read a new line from its file, blocking,
229    /// up to some maximum but unspecified amount of time. `read_line` will open
230    /// a new file handler as needed, transparently to the caller.
231    pub(super) async fn read_line(&mut self) -> io::Result<RawLineResult> {
232        self.track_read_attempt();
233
234        let reader = &mut self.reader;
235        let file_position = &mut self.file_position;
236        let initial_position = *file_position;
237        match read_until_with_max_size(
238            Box::pin(reader),
239            file_position,
240            self.line_delimiter.as_ref(),
241            &mut self.buf,
242            self.max_line_bytes,
243        )
244        .await
245        {
246            Ok(ReadResult {
247                successfully_read: Some(_),
248                discarded_for_size_and_truncated,
249            }) => {
250                self.track_read_success();
251                Ok(RawLineResult {
252                    raw_line: Some(RawLine {
253                        offset: initial_position,
254                        bytes: self.buf.split().freeze(),
255                    }),
256                    discarded_for_size_and_truncated,
257                })
258            }
259            Ok(ReadResult {
260                successfully_read: None,
261                discarded_for_size_and_truncated,
262            }) => {
263                if !self.file_findable() {
264                    self.set_dead();
265                    // File has been deleted, so return what we have in the buffer, even though it
266                    // didn't end with a newline. This is not a perfect signal for when we should
267                    // give up waiting for a newline, but it's decent.
268                    let buf = self.buf.split().freeze();
269                    if buf.is_empty() {
270                        // EOF
271                        self.reached_eof = true;
272                        Ok(RawLineResult {
273                            raw_line: None,
274                            discarded_for_size_and_truncated,
275                        })
276                    } else {
277                        Ok(RawLineResult {
278                            raw_line: Some(RawLine {
279                                offset: initial_position,
280                                bytes: buf,
281                            }),
282                            discarded_for_size_and_truncated,
283                        })
284                    }
285                } else {
286                    self.reached_eof = true;
287                    Ok(RawLineResult {
288                        raw_line: None,
289                        discarded_for_size_and_truncated,
290                    })
291                }
292            }
293            Err(e) => {
294                if let io::ErrorKind::NotFound = e.kind() {
295                    self.set_dead();
296                }
297                Err(e)
298            }
299        }
300    }
301
302    #[inline]
303    fn track_read_attempt(&mut self) {
304        self.last_read_attempt = Instant::now();
305    }
306
307    #[inline]
308    fn track_read_success(&mut self) {
309        self.last_read_success = Instant::now();
310    }
311
312    #[inline]
313    pub fn last_read_success(&self) -> Instant {
314        self.last_read_success
315    }
316
317    #[inline]
318    pub fn should_read(&self) -> bool {
319        self.last_read_success.elapsed() < Duration::from_secs(10)
320            || self.last_read_attempt.elapsed() > Duration::from_secs(10)
321    }
322
323    #[inline]
324    pub fn last_seen(&self) -> Instant {
325        self.last_seen
326    }
327
328    #[inline]
329    pub fn reached_eof(&self) -> bool {
330        self.reached_eof
331    }
332}
333
334async fn is_gzipped(r: &mut BufReader<File>) -> io::Result<bool> {
335    let header_bytes = r.fill_buf().await?;
336    // WARN: The paired `BufReader::consume` is not called intentionally. If we
337    // do we'll chop a decent part of the potential gzip stream off.
338    Ok(header_bytes.starts_with(GZIP_MAGIC))
339}
340
341fn null_reader() -> impl AsyncBufRead {
342    io::Cursor::new(Vec::new())
343}