file_source/file_watcher/
mod.rs1use async_compression::tokio::bufread::GzipDecoder;
2use bytes::{Bytes, BytesMut};
3use chrono::{DateTime, Utc};
4use std::{
5 io::{self, SeekFrom},
6 path::PathBuf,
7 time::Duration,
8};
9use tokio::{
10 fs::File,
11 io::{AsyncBufRead, AsyncBufReadExt, AsyncSeekExt, BufReader},
12 time::Instant,
13};
14use tracing::debug;
15use vector_common::constants::GZIP_MAGIC;
16
17use file_source_common::{
18 AsyncFileInfo, FilePosition, PortableFileExt, ReadFrom,
19 buffer::{ReadResult, read_until_with_max_size},
20};
21
22#[cfg(test)]
23mod tests;
24
25#[derive(Debug)]
31pub struct RawLine {
32 pub offset: u64,
33 pub bytes: Bytes,
34}
35
36#[derive(Debug)]
37pub struct RawLineResult {
38 pub raw_line: Option<RawLine>,
39 pub discarded_for_size_and_truncated: Vec<BytesMut>,
40}
41
42pub struct FileWatcher {
50 pub path: PathBuf,
51 findable: bool,
52 reader: Box<dyn AsyncBufRead + Send + Unpin>,
53 file_position: FilePosition,
54 devno: u64,
55 inode: u64,
56 is_dead: bool,
57 reached_eof: bool,
58 last_read_attempt: Instant,
59 last_read_success: Instant,
60 last_seen: Instant,
61 max_line_bytes: usize,
62 line_delimiter: Bytes,
63 buf: BytesMut,
64}
65
66impl FileWatcher {
67 pub async fn new(
73 path: PathBuf,
74 read_from: ReadFrom,
75 ignore_before: Option<DateTime<Utc>>,
76 max_line_bytes: usize,
77 line_delimiter: Bytes,
78 ) -> Result<FileWatcher, std::io::Error> {
79 let f = File::open(&path).await?;
80 let file_info = f.file_info().await?;
81 let (devno, ino) = (file_info.portable_dev(), file_info.portable_ino());
82
83 #[cfg(unix)]
84 let metadata = file_info;
85 #[cfg(windows)]
86 let metadata = f.metadata().await?;
87
88 let mut reader = BufReader::new(f);
89
90 let too_old = if let (Some(ignore_before), Ok(modified_time)) = (
91 ignore_before,
92 metadata.modified().map(DateTime::<Utc>::from),
93 ) {
94 modified_time < ignore_before
95 } else {
96 false
97 };
98
99 let gzipped = is_gzipped(&mut reader).await?;
100
101 let (reader, file_position): (Box<dyn AsyncBufRead + Send + Unpin>, FilePosition) =
103 match (gzipped, too_old, read_from) {
104 (true, true, _) => {
105 debug!(
106 message = "Not reading gzipped file older than `ignore_older`.",
107 ?path,
108 );
109 (Box::new(null_reader()), 0)
110 }
111 (true, _, ReadFrom::Checkpoint(file_position)) => {
112 debug!(
113 message = "Not re-reading gzipped file with existing stored offset.",
114 ?path,
115 %file_position
116 );
117 (Box::new(null_reader()), file_position)
118 }
119 (true, _, ReadFrom::End) => {
124 debug!(
125 message = "Can't read from the end of already-compressed file.",
126 ?path,
127 );
128 (Box::new(null_reader()), 0)
129 }
130 (true, false, ReadFrom::Beginning) => {
131 (Box::new(BufReader::new(GzipDecoder::new(reader))), 0)
132 }
133 (false, true, _) => {
134 let pos = reader.seek(SeekFrom::End(0)).await.unwrap();
135 (Box::new(reader), pos)
136 }
137 (false, false, ReadFrom::Checkpoint(file_position)) => {
138 let pos = reader.seek(SeekFrom::Start(file_position)).await.unwrap();
139 (Box::new(reader), pos)
140 }
141 (false, false, ReadFrom::Beginning) => {
142 let pos = reader.seek(SeekFrom::Start(0)).await.unwrap();
143 (Box::new(reader), pos)
144 }
145 (false, false, ReadFrom::End) => {
146 let pos = reader.seek(SeekFrom::End(0)).await.unwrap();
147 (Box::new(reader), pos)
148 }
149 };
150
151 let ts = metadata
152 .modified()
153 .ok()
154 .and_then(|mtime| mtime.elapsed().ok())
155 .and_then(|diff| Instant::now().checked_sub(diff))
156 .unwrap_or_else(Instant::now);
157
158 Ok(FileWatcher {
159 path,
160 findable: true,
161 reader,
162 file_position,
163 devno,
164 inode: ino,
165 is_dead: false,
166 reached_eof: false,
167 last_read_attempt: ts,
168 last_read_success: ts,
169 last_seen: ts,
170 max_line_bytes,
171 line_delimiter,
172 buf: BytesMut::new(),
173 })
174 }
175
176 pub async fn update_path(&mut self, path: PathBuf) -> io::Result<()> {
177 let file_handle = File::open(&path).await?;
178
179 let file_info = file_handle.file_info().await?;
180 if (file_info.portable_dev(), file_info.portable_ino()) != (self.devno, self.inode) {
181 let mut reader = BufReader::new(File::open(&path).await?);
182 let gzipped = is_gzipped(&mut reader).await?;
183 let new_reader: Box<dyn AsyncBufRead + Send + Unpin> = if gzipped {
184 if self.file_position != 0 {
185 Box::new(null_reader())
186 } else {
187 Box::new(BufReader::new(GzipDecoder::new(reader)))
188 }
189 } else {
190 reader.seek(io::SeekFrom::Start(self.file_position)).await?;
191 Box::new(reader)
192 };
193 self.reader = new_reader;
194
195 let file_info = file_handle.file_info().await?;
196 self.devno = file_info.portable_dev();
197 self.inode = file_info.portable_ino();
198 }
199 self.path = path;
200 Ok(())
201 }
202
203 pub fn set_file_findable(&mut self, f: bool) {
204 self.findable = f;
205 if f {
206 self.last_seen = Instant::now();
207 }
208 }
209
210 pub fn file_findable(&self) -> bool {
211 self.findable
212 }
213
214 pub fn set_dead(&mut self) {
215 self.is_dead = true;
216 }
217
218 pub fn dead(&self) -> bool {
219 self.is_dead
220 }
221
222 pub fn get_file_position(&self) -> FilePosition {
223 self.file_position
224 }
225
226 pub(super) async fn read_line(&mut self) -> io::Result<RawLineResult> {
232 self.track_read_attempt();
233
234 let reader = &mut self.reader;
235 let file_position = &mut self.file_position;
236 let initial_position = *file_position;
237 match read_until_with_max_size(
238 Box::pin(reader),
239 file_position,
240 self.line_delimiter.as_ref(),
241 &mut self.buf,
242 self.max_line_bytes,
243 )
244 .await
245 {
246 Ok(ReadResult {
247 successfully_read: Some(_),
248 discarded_for_size_and_truncated,
249 }) => {
250 self.track_read_success();
251 Ok(RawLineResult {
252 raw_line: Some(RawLine {
253 offset: initial_position,
254 bytes: self.buf.split().freeze(),
255 }),
256 discarded_for_size_and_truncated,
257 })
258 }
259 Ok(ReadResult {
260 successfully_read: None,
261 discarded_for_size_and_truncated,
262 }) => {
263 if !self.file_findable() {
264 self.set_dead();
265 let buf = self.buf.split().freeze();
269 if buf.is_empty() {
270 self.reached_eof = true;
272 Ok(RawLineResult {
273 raw_line: None,
274 discarded_for_size_and_truncated,
275 })
276 } else {
277 Ok(RawLineResult {
278 raw_line: Some(RawLine {
279 offset: initial_position,
280 bytes: buf,
281 }),
282 discarded_for_size_and_truncated,
283 })
284 }
285 } else {
286 self.reached_eof = true;
287 Ok(RawLineResult {
288 raw_line: None,
289 discarded_for_size_and_truncated,
290 })
291 }
292 }
293 Err(e) => {
294 if let io::ErrorKind::NotFound = e.kind() {
295 self.set_dead();
296 }
297 Err(e)
298 }
299 }
300 }
301
302 #[inline]
303 fn track_read_attempt(&mut self) {
304 self.last_read_attempt = Instant::now();
305 }
306
307 #[inline]
308 fn track_read_success(&mut self) {
309 self.last_read_success = Instant::now();
310 }
311
312 #[inline]
313 pub fn last_read_success(&self) -> Instant {
314 self.last_read_success
315 }
316
317 #[inline]
318 pub fn should_read(&self) -> bool {
319 self.last_read_success.elapsed() < Duration::from_secs(10)
320 || self.last_read_attempt.elapsed() > Duration::from_secs(10)
321 }
322
323 #[inline]
324 pub fn last_seen(&self) -> Instant {
325 self.last_seen
326 }
327
328 #[inline]
329 pub fn reached_eof(&self) -> bool {
330 self.reached_eof
331 }
332}
333
334async fn is_gzipped(r: &mut BufReader<File>) -> io::Result<bool> {
335 let header_bytes = r.fill_buf().await?;
336 Ok(header_bytes.starts_with(GZIP_MAGIC))
339}
340
341fn null_reader() -> impl AsyncBufRead {
342 io::Cursor::new(Vec::new())
343}