file_source/file_watcher/
mod.rs1use async_compression::tokio::bufread::GzipDecoder;
2use bytes::{Bytes, BytesMut};
3use chrono::{DateTime, Utc};
4use std::{
5 io::{self, SeekFrom},
6 path::PathBuf,
7 time::Duration,
8};
9use tokio::{
10 fs::File,
11 io::{AsyncBufRead, AsyncBufReadExt, AsyncSeekExt, BufReader},
12 time::Instant,
13};
14use tracing::debug;
15use vector_common::constants::GZIP_MAGIC;
16
17use file_source_common::{
18 AsyncFileInfo, FilePosition, PortableFileExt, ReadFrom,
19 buffer::{ReadResult, read_until_with_max_size},
20};
21
/// Shortest delay applied between read attempts once a watcher has hit EOF;
/// also the value the retry delay is reset to after a successful read.
const EOF_READ_BACKOFF_MIN: Duration = Duration::from_millis(1);
/// Upper bound for the EOF retry delay; the doubling backoff never exceeds it.
const EOF_READ_BACKOFF_MAX: Duration = Duration::from_millis(250);
24
25#[cfg(test)]
26mod tests;
27
28#[derive(Debug)]
34pub struct RawLine {
35 pub offset: u64,
36 pub bytes: Bytes,
37}
38
39#[derive(Debug)]
40pub struct RawLineResult {
41 pub raw_line: Option<RawLine>,
42 pub discarded_for_size_and_truncated: Vec<BytesMut>,
43}
44
45pub struct FileWatcher {
53 pub path: PathBuf,
54 findable: bool,
55 reader: Box<dyn AsyncBufRead + Send + Unpin>,
56 file_position: FilePosition,
57 devno: u64,
58 inode: u64,
59 is_dead: bool,
60 reached_eof: bool,
61 last_read_attempt: Instant,
62 last_read_success: Instant,
63 read_retry_delay: Duration,
64 last_seen: Instant,
65 max_line_bytes: usize,
66 line_delimiter: Bytes,
67 buf: BytesMut,
68}
69
70impl FileWatcher {
71 pub async fn new(
77 path: PathBuf,
78 read_from: ReadFrom,
79 ignore_before: Option<DateTime<Utc>>,
80 max_line_bytes: usize,
81 line_delimiter: Bytes,
82 ) -> Result<FileWatcher, std::io::Error> {
83 let f = File::open(&path).await?;
84 let file_info = f.file_info().await?;
85 let (devno, ino) = (file_info.portable_dev(), file_info.portable_ino());
86
87 #[cfg(unix)]
88 let metadata = file_info;
89 #[cfg(windows)]
90 let metadata = f.metadata().await?;
91
92 let mut reader = BufReader::new(f);
93
94 let too_old = if let (Some(ignore_before), Ok(modified_time)) = (
95 ignore_before,
96 metadata.modified().map(DateTime::<Utc>::from),
97 ) {
98 modified_time < ignore_before
99 } else {
100 false
101 };
102
103 let gzipped = is_gzipped(&mut reader).await?;
104
105 let (reader, file_position): (Box<dyn AsyncBufRead + Send + Unpin>, FilePosition) =
107 match (gzipped, too_old, read_from) {
108 (true, true, _) => {
109 debug!(
110 message = "Not reading gzipped file older than `ignore_older`.",
111 ?path,
112 );
113 (Box::new(null_reader()), 0)
114 }
115 (true, _, ReadFrom::Checkpoint(file_position)) => {
116 debug!(
117 message = "Not re-reading gzipped file with existing stored offset.",
118 ?path,
119 %file_position
120 );
121 (Box::new(null_reader()), file_position)
122 }
123 (true, _, ReadFrom::End) => {
128 debug!(
129 message = "Can't read from the end of already-compressed file.",
130 ?path,
131 );
132 (Box::new(null_reader()), 0)
133 }
134 (true, false, ReadFrom::Beginning) => {
135 (Box::new(BufReader::new(GzipDecoder::new(reader))), 0)
136 }
137 (false, true, _) => {
138 let pos = reader.seek(SeekFrom::End(0)).await.unwrap();
139 (Box::new(reader), pos)
140 }
141 (false, false, ReadFrom::Checkpoint(file_position)) => {
142 let pos = reader.seek(SeekFrom::Start(file_position)).await.unwrap();
143 (Box::new(reader), pos)
144 }
145 (false, false, ReadFrom::Beginning) => {
146 let pos = reader.seek(SeekFrom::Start(0)).await.unwrap();
147 (Box::new(reader), pos)
148 }
149 (false, false, ReadFrom::End) => {
150 let pos = reader.seek(SeekFrom::End(0)).await.unwrap();
151 (Box::new(reader), pos)
152 }
153 };
154
155 let ts = metadata
156 .modified()
157 .ok()
158 .and_then(|mtime| mtime.elapsed().ok())
159 .and_then(|diff| Instant::now().checked_sub(diff))
160 .unwrap_or_else(Instant::now);
161
162 Ok(FileWatcher {
163 path,
164 findable: true,
165 reader,
166 file_position,
167 devno,
168 inode: ino,
169 is_dead: false,
170 reached_eof: false,
171 last_read_attempt: ts,
172 last_read_success: ts,
173 read_retry_delay: EOF_READ_BACKOFF_MIN,
174 last_seen: ts,
175 max_line_bytes,
176 line_delimiter,
177 buf: BytesMut::new(),
178 })
179 }
180
181 pub async fn update_path(&mut self, path: PathBuf) -> io::Result<()> {
182 let file_handle = File::open(&path).await?;
183
184 let file_info = file_handle.file_info().await?;
185 if (file_info.portable_dev(), file_info.portable_ino()) != (self.devno, self.inode) {
186 let mut reader = BufReader::new(File::open(&path).await?);
187 let gzipped = is_gzipped(&mut reader).await?;
188 let new_reader: Box<dyn AsyncBufRead + Send + Unpin> = if gzipped {
189 if self.file_position != 0 {
190 Box::new(null_reader())
191 } else {
192 Box::new(BufReader::new(GzipDecoder::new(reader)))
193 }
194 } else {
195 reader.seek(io::SeekFrom::Start(self.file_position)).await?;
196 Box::new(reader)
197 };
198 self.reader = new_reader;
199
200 let file_info = file_handle.file_info().await?;
201 self.devno = file_info.portable_dev();
202 self.inode = file_info.portable_ino();
203 }
204 self.reached_eof = false;
205 self.read_retry_delay = EOF_READ_BACKOFF_MIN;
206 self.path = path;
207 Ok(())
208 }
209
210 pub fn set_file_findable(&mut self, f: bool) {
211 self.findable = f;
212 if f {
213 self.last_seen = Instant::now();
214 }
215 }
216
217 pub fn file_findable(&self) -> bool {
218 self.findable
219 }
220
221 pub fn set_dead(&mut self) {
222 self.is_dead = true;
223 }
224
225 pub fn dead(&self) -> bool {
226 self.is_dead
227 }
228
229 pub fn get_file_position(&self) -> FilePosition {
230 self.file_position
231 }
232
233 pub(super) async fn read_line(&mut self) -> io::Result<RawLineResult> {
239 self.track_read_attempt();
240
241 let reader = &mut self.reader;
242 let file_position = &mut self.file_position;
243 let initial_position = *file_position;
244 match read_until_with_max_size(
245 reader.as_mut(),
246 file_position,
247 self.line_delimiter.as_ref(),
248 &mut self.buf,
249 self.max_line_bytes,
250 )
251 .await
252 {
253 Ok(ReadResult {
254 successfully_read: Some(_),
255 discarded_for_size_and_truncated,
256 }) => {
257 self.track_read_success();
258 Ok(RawLineResult {
259 raw_line: Some(RawLine {
260 offset: initial_position,
261 bytes: self.buf.split().freeze(),
262 }),
263 discarded_for_size_and_truncated,
264 })
265 }
266 Ok(ReadResult {
267 successfully_read: None,
268 discarded_for_size_and_truncated,
269 }) => {
270 if !self.file_findable() {
271 self.set_dead();
272 let buf = self.buf.split().freeze();
276 if buf.is_empty() {
277 self.reached_eof = true;
279 Ok(RawLineResult {
280 raw_line: None,
281 discarded_for_size_and_truncated,
282 })
283 } else {
284 Ok(RawLineResult {
285 raw_line: Some(RawLine {
286 offset: initial_position,
287 bytes: buf,
288 }),
289 discarded_for_size_and_truncated,
290 })
291 }
292 } else {
293 self.track_read_eof();
294 Ok(RawLineResult {
295 raw_line: None,
296 discarded_for_size_and_truncated,
297 })
298 }
299 }
300 Err(e) => {
301 if let io::ErrorKind::NotFound = e.kind() {
302 self.set_dead();
303 }
304 Err(e)
305 }
306 }
307 }
308
309 #[inline]
310 fn track_read_attempt(&mut self) {
311 self.last_read_attempt = Instant::now();
312 }
313
314 #[inline]
315 fn track_read_success(&mut self) {
316 self.reached_eof = false;
317 self.read_retry_delay = EOF_READ_BACKOFF_MIN;
318 self.last_read_success = Instant::now();
319 }
320
321 #[inline]
322 fn track_read_eof(&mut self) {
323 self.read_retry_delay = if self.reached_eof {
324 std::cmp::min(
325 self.read_retry_delay.saturating_mul(2),
326 EOF_READ_BACKOFF_MAX,
327 )
328 } else {
329 EOF_READ_BACKOFF_MIN
330 };
331 self.reached_eof = true;
332 }
333
334 #[inline]
335 pub fn last_read_success(&self) -> Instant {
336 self.last_read_success
337 }
338
339 #[inline]
340 pub fn should_read(&self) -> bool {
341 if self.reached_eof && self.last_read_attempt.elapsed() < self.read_retry_delay {
342 return false;
343 }
344
345 self.last_read_success.elapsed() < Duration::from_secs(10)
346 || self.last_read_attempt.elapsed() > Duration::from_secs(10)
347 }
348
349 #[inline]
350 pub fn last_seen(&self) -> Instant {
351 self.last_seen
352 }
353
354 #[inline]
355 pub fn reached_eof(&self) -> bool {
356 self.reached_eof
357 }
358}
359
360async fn is_gzipped(r: &mut BufReader<File>) -> io::Result<bool> {
361 let header_bytes = r.fill_buf().await?;
362 Ok(header_bytes.starts_with(GZIP_MAGIC))
365}
366
367fn null_reader() -> impl AsyncBufRead {
368 io::Cursor::new(Vec::new())
369}