file_source/file_watcher/
mod.rs1use std::{
2 fs::{self, File},
3 io::{self, BufRead, Seek},
4 path::PathBuf,
5 time::{Duration, Instant},
6};
7
8use bytes::{Bytes, BytesMut};
9use chrono::{DateTime, Utc};
10use flate2::bufread::MultiGzDecoder;
11use tracing::debug;
12use vector_common::constants::GZIP_MAGIC;
13
14use crate::{
15 buffer::{read_until_with_max_size, ReadResult},
16 metadata_ext::PortableFileExt,
17 FilePosition, ReadFrom,
18};
19#[cfg(test)]
20mod tests;
21
22#[derive(Debug)]
28pub(super) struct RawLine {
29 pub offset: u64,
30 pub bytes: Bytes,
31}
32
33#[derive(Debug)]
34pub struct RawLineResult {
35 pub raw_line: Option<RawLine>,
36 pub discarded_for_size_and_truncated: Vec<BytesMut>,
37}
38
39pub struct FileWatcher {
47 pub path: PathBuf,
48 findable: bool,
49 reader: Box<dyn BufRead>,
50 file_position: FilePosition,
51 devno: u64,
52 inode: u64,
53 is_dead: bool,
54 reached_eof: bool,
55 last_read_attempt: Instant,
56 last_read_success: Instant,
57 last_seen: Instant,
58 max_line_bytes: usize,
59 line_delimiter: Bytes,
60 buf: BytesMut,
61}
62
63impl FileWatcher {
64 pub fn new(
70 path: PathBuf,
71 read_from: ReadFrom,
72 ignore_before: Option<DateTime<Utc>>,
73 max_line_bytes: usize,
74 line_delimiter: Bytes,
75 ) -> Result<FileWatcher, io::Error> {
76 let f = fs::File::open(&path)?;
77 let (devno, ino) = (f.portable_dev()?, f.portable_ino()?);
78 let metadata = f.metadata()?;
79 let mut reader = io::BufReader::new(f);
80
81 let too_old = if let (Some(ignore_before), Ok(modified_time)) = (
82 ignore_before,
83 metadata.modified().map(DateTime::<Utc>::from),
84 ) {
85 modified_time < ignore_before
86 } else {
87 false
88 };
89
90 let gzipped = is_gzipped(&mut reader)?;
91
92 let (reader, file_position): (Box<dyn BufRead>, FilePosition) =
94 match (gzipped, too_old, read_from) {
95 (true, true, _) => {
96 debug!(
97 message = "Not reading gzipped file older than `ignore_older`.",
98 ?path,
99 );
100 (Box::new(null_reader()), 0)
101 }
102 (true, _, ReadFrom::Checkpoint(file_position)) => {
103 debug!(
104 message = "Not re-reading gzipped file with existing stored offset.",
105 ?path,
106 %file_position
107 );
108 (Box::new(null_reader()), file_position)
109 }
110 (true, _, ReadFrom::End) => {
115 debug!(
116 message = "Can't read from the end of already-compressed file.",
117 ?path,
118 );
119 (Box::new(null_reader()), 0)
120 }
121 (true, false, ReadFrom::Beginning) => {
122 (Box::new(io::BufReader::new(MultiGzDecoder::new(reader))), 0)
123 }
124 (false, true, _) => {
125 let pos = reader.seek(io::SeekFrom::End(0)).unwrap();
126 (Box::new(reader), pos)
127 }
128 (false, false, ReadFrom::Checkpoint(file_position)) => {
129 let pos = reader.seek(io::SeekFrom::Start(file_position)).unwrap();
130 (Box::new(reader), pos)
131 }
132 (false, false, ReadFrom::Beginning) => {
133 let pos = reader.seek(io::SeekFrom::Start(0)).unwrap();
134 (Box::new(reader), pos)
135 }
136 (false, false, ReadFrom::End) => {
137 let pos = reader.seek(io::SeekFrom::End(0)).unwrap();
138 (Box::new(reader), pos)
139 }
140 };
141
142 let ts = metadata
143 .modified()
144 .ok()
145 .and_then(|mtime| mtime.elapsed().ok())
146 .and_then(|diff| Instant::now().checked_sub(diff))
147 .unwrap_or_else(Instant::now);
148
149 Ok(FileWatcher {
150 path,
151 findable: true,
152 reader,
153 file_position,
154 devno,
155 inode: ino,
156 is_dead: false,
157 reached_eof: false,
158 last_read_attempt: ts,
159 last_read_success: ts,
160 last_seen: ts,
161 max_line_bytes,
162 line_delimiter,
163 buf: BytesMut::new(),
164 })
165 }
166
167 pub fn update_path(&mut self, path: PathBuf) -> io::Result<()> {
168 let file_handle = File::open(&path)?;
169 if (file_handle.portable_dev()?, file_handle.portable_ino()?) != (self.devno, self.inode) {
170 let mut reader = io::BufReader::new(fs::File::open(&path)?);
171 let gzipped = is_gzipped(&mut reader)?;
172 let new_reader: Box<dyn BufRead> = if gzipped {
173 if self.file_position != 0 {
174 Box::new(null_reader())
175 } else {
176 Box::new(io::BufReader::new(MultiGzDecoder::new(reader)))
177 }
178 } else {
179 reader.seek(io::SeekFrom::Start(self.file_position))?;
180 Box::new(reader)
181 };
182 self.reader = new_reader;
183 self.devno = file_handle.portable_dev()?;
184 self.inode = file_handle.portable_ino()?;
185 }
186 self.path = path;
187 Ok(())
188 }
189
190 pub fn set_file_findable(&mut self, f: bool) {
191 self.findable = f;
192 if f {
193 self.last_seen = Instant::now();
194 }
195 }
196
197 pub fn file_findable(&self) -> bool {
198 self.findable
199 }
200
201 pub fn set_dead(&mut self) {
202 self.is_dead = true;
203 }
204
205 pub fn dead(&self) -> bool {
206 self.is_dead
207 }
208
209 pub fn get_file_position(&self) -> FilePosition {
210 self.file_position
211 }
212
213 pub(super) fn read_line(&mut self) -> io::Result<RawLineResult> {
219 self.track_read_attempt();
220
221 let reader = &mut self.reader;
222 let file_position = &mut self.file_position;
223 let initial_position = *file_position;
224 match read_until_with_max_size(
225 reader,
226 file_position,
227 self.line_delimiter.as_ref(),
228 &mut self.buf,
229 self.max_line_bytes,
230 ) {
231 Ok(ReadResult {
232 successfully_read: Some(_),
233 discarded_for_size_and_truncated,
234 }) => {
235 self.track_read_success();
236 Ok(RawLineResult {
237 raw_line: Some(RawLine {
238 offset: initial_position,
239 bytes: self.buf.split().freeze(),
240 }),
241 discarded_for_size_and_truncated,
242 })
243 }
244 Ok(ReadResult {
245 successfully_read: None,
246 discarded_for_size_and_truncated,
247 }) => {
248 if !self.file_findable() {
249 self.set_dead();
250 let buf = self.buf.split().freeze();
254 if buf.is_empty() {
255 self.reached_eof = true;
257 Ok(RawLineResult {
258 raw_line: None,
259 discarded_for_size_and_truncated,
260 })
261 } else {
262 Ok(RawLineResult {
263 raw_line: Some(RawLine {
264 offset: initial_position,
265 bytes: buf,
266 }),
267 discarded_for_size_and_truncated,
268 })
269 }
270 } else {
271 self.reached_eof = true;
272 Ok(RawLineResult {
273 raw_line: None,
274 discarded_for_size_and_truncated,
275 })
276 }
277 }
278 Err(e) => {
279 if let io::ErrorKind::NotFound = e.kind() {
280 self.set_dead();
281 }
282 Err(e)
283 }
284 }
285 }
286
287 #[inline]
288 fn track_read_attempt(&mut self) {
289 self.last_read_attempt = Instant::now();
290 }
291
292 #[inline]
293 fn track_read_success(&mut self) {
294 self.last_read_success = Instant::now();
295 }
296
297 #[inline]
298 pub fn last_read_success(&self) -> Instant {
299 self.last_read_success
300 }
301
302 #[inline]
303 pub fn should_read(&self) -> bool {
304 self.last_read_success.elapsed() < Duration::from_secs(10)
305 || self.last_read_attempt.elapsed() > Duration::from_secs(10)
306 }
307
308 #[inline]
309 pub fn last_seen(&self) -> Instant {
310 self.last_seen
311 }
312
313 #[inline]
314 pub fn reached_eof(&self) -> bool {
315 self.reached_eof
316 }
317}
318
319fn is_gzipped(r: &mut io::BufReader<fs::File>) -> io::Result<bool> {
320 let header_bytes = r.fill_buf()?;
321 Ok(header_bytes.starts_with(GZIP_MAGIC))
324}
325
/// Returns a reader that contains no data and is therefore always at EOF.
fn null_reader() -> impl BufRead {
    io::empty()
}