1use std::{
2 collections::HashMap,
3 io::{ErrorKind, Result, SeekFrom},
4 path::{Path, PathBuf},
5 time,
6};
7
8use async_compression::tokio::bufread::GzipDecoder;
9use crc::Crc;
10use serde::{Deserialize, Serialize};
11use tokio::{
12 fs::{self, File},
13 io::{AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeekExt, BufReader},
14};
15use vector_common::constants::GZIP_MAGIC;
16
17use crate::{
18 AsyncFileInfo, internal_events::FileSourceInternalEvents, metadata_ext::PortableFileExt,
19};
20
/// CRC-64 calculator configured with the `crc` crate's `CRC_64_ECMA_182` parameters.
///
/// Used to checksum the bytes read from the start of a file when the
/// `FirstLinesChecksum` strategy is selected.
const FINGERPRINT_CRC: Crc<u64> = Crc::<u64>::new(&crc::CRC_64_ECMA_182);
22
/// Computes [`FileFingerprint`]s for files according to a configured
/// [`FingerprintStrategy`].
#[derive(Debug, Clone)]
pub struct Fingerprinter {
    /// How fingerprints are derived (content checksum or device/inode pair).
    strategy: FingerprintStrategy,
    /// Size, in bytes, of the window read from a file when checksumming; also the
    /// initial length of `buffer`.
    max_line_length: usize,
    /// When set, `NotFound` errors are not reported through the event emitter.
    ignore_not_found: bool,
    /// Scratch space reused between calls to hold the bytes being checksummed.
    buffer: Vec<u8>,
}
30
/// Borrows the leading part of a growable buffer as a mutable slice.
trait ResizeSlice<T> {
    /// Returns a mutable slice over the first `size` elements, growing the underlying
    /// storage first when it currently holds fewer than `size` elements.
    fn resize_slice_mut(&mut self, size: usize) -> &mut [T];
}

impl ResizeSlice<u8> for Vec<u8> {
    fn resize_slice_mut(&mut self, size: usize) -> &mut [u8] {
        // Only ever grow: a request shorter than the current length must leave the
        // vector (and its existing contents) untouched.
        let shortfall = size.saturating_sub(self.len());
        self.extend(std::iter::repeat(0u8).take(shortfall));

        &mut self[..size]
    }
}
45
/// Selects how a file's fingerprint is computed.
#[derive(Debug, Clone)]
pub enum FingerprintStrategy {
    /// Checksum the beginning of the file's content (decompressed first when a
    /// supported compression format is detected).
    FirstLinesChecksum {
        /// Number of leading bytes skipped before checksumming starts.
        ignored_header_bytes: usize,
        /// Number of newline-terminated lines included in the checksum.
        lines: usize,
    },
    /// Identify the file by the device and inode values reported for it.
    DevInode,
}
54
/// The identity computed for a file by a [`Fingerprinter`].
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize, Ord, PartialOrd)]
#[serde(rename_all = "snake_case")]
pub enum FileFingerprint {
    /// CRC-64 checksum of the first lines of the file. The alternative name
    /// `first_line_checksum` is also accepted when deserializing.
    #[serde(alias = "first_line_checksum")]
    FirstLinesChecksum(u64),
    /// Device and inode values, in that order.
    DevInode(u64, u64),
}
62
63#[derive(Debug, Copy, Clone)]
64enum SupportedCompressionAlgorithms {
65 Gzip,
66}
67
68impl SupportedCompressionAlgorithms {
69 fn values() -> Vec<SupportedCompressionAlgorithms> {
70 vec![SupportedCompressionAlgorithms::Gzip]
72 }
73
74 fn magic_header_bytes(&self) -> &'static [u8] {
75 match self {
76 SupportedCompressionAlgorithms::Gzip => GZIP_MAGIC,
77 }
78 }
79}
80
/// Access to a file's content with any supported compression removed.
trait UncompressedReader {
    /// Reports which supported compression algorithm, if any, `fp` is encoded with.
    async fn check(fp: &mut File) -> Result<Option<SupportedCompressionAlgorithms>>;
    /// Returns a buffered reader over `fp`; intended to yield decompressed bytes when
    /// the file is compressed with a supported algorithm.
    async fn reader<'a>(fp: &'a mut File) -> Result<Box<dyn AsyncBufRead + Unpin + Send + 'a>>;
}
85
86struct UncompressedReaderImpl;
87impl UncompressedReader for UncompressedReaderImpl {
88 async fn check(fp: &mut File) -> Result<Option<SupportedCompressionAlgorithms>> {
102 let mut algorithm: Option<SupportedCompressionAlgorithms> = None;
103 for compression_algorithm in SupportedCompressionAlgorithms::values() {
104 let magic_header_bytes = compression_algorithm.magic_header_bytes();
107
108 let mut magic = vec![0u8; magic_header_bytes.len()];
109
110 fp.seek(SeekFrom::Start(0)).await?;
111 let result = fp.read_exact(&mut magic).await;
112
113 if let Err(err) = result {
114 fp.seek(SeekFrom::Start(0)).await?;
115 return Err(err);
116 }
117
118 if magic == magic_header_bytes {
119 algorithm = Some(compression_algorithm);
120 break;
121 }
122 }
123 fp.seek(SeekFrom::Start(0)).await?;
124 Ok(algorithm)
125 }
126
127 async fn reader<'a>(fp: &'a mut File) -> Result<Box<dyn AsyncBufRead + Unpin + Send + 'a>> {
128 match Self::check(fp).await? {
130 Some(SupportedCompressionAlgorithms::Gzip) => Ok(Box::new(BufReader::new(
131 GzipDecoder::new(BufReader::new(fp)),
132 ))),
133 None => Ok(Box::new(BufReader::new(fp))),
135 }
136 }
137}
138
139async fn skip_first_n_bytes<R: AsyncBufRead + Unpin + Send>(
140 reader: &mut R,
141 n: usize,
142) -> Result<()> {
143 let mut skipped_bytes = 0;
146 while skipped_bytes < n {
147 let chunk = reader.fill_buf().await?;
148 let bytes_to_skip = std::cmp::min(chunk.len(), n - skipped_bytes);
149 reader.consume(bytes_to_skip);
150 skipped_bytes += bytes_to_skip;
151 }
152 Ok(())
153}
154
155impl Fingerprinter {
156 pub fn new(
157 strategy: FingerprintStrategy,
158 max_line_length: usize,
159 ignore_not_found: bool,
160 ) -> Fingerprinter {
161 let buffer = vec![0u8; max_line_length];
162
163 Fingerprinter {
164 strategy,
165 max_line_length,
166 ignore_not_found,
167 buffer,
168 }
169 }
170
171 pub(crate) async fn fingerprint(&mut self, path: &Path) -> Result<FileFingerprint> {
173 use FileFingerprint::*;
174
175 match self.strategy {
176 FingerprintStrategy::DevInode => {
177 let file_handle = File::open(path).await?;
178 let file_info = file_handle.file_info().await?;
179 let dev = file_info.portable_dev();
180 let ino = file_info.portable_ino();
181 Ok(DevInode(dev, ino))
182 }
183 FingerprintStrategy::FirstLinesChecksum {
184 ignored_header_bytes,
185 lines,
186 } => {
187 let buffer = self.buffer.resize_slice_mut(self.max_line_length);
188 let mut fp = File::open(path).await?;
189 let mut reader = UncompressedReaderImpl::reader(&mut fp).await?;
190
191 skip_first_n_bytes(&mut reader, ignored_header_bytes).await?;
192 let bytes_read = fingerprinter_read_until(reader, b'\n', lines, buffer).await?;
193 let fingerprint = FINGERPRINT_CRC.checksum(&buffer[..bytes_read]);
194 Ok(FirstLinesChecksum(fingerprint))
195 }
196 }
197 }
198
199 pub async fn fingerprint_or_emit(
200 &mut self,
201 path: &Path,
202 known_small_files: &mut HashMap<PathBuf, time::Instant>,
203 emitter: &impl FileSourceInternalEvents,
204 ) -> Option<FileFingerprint> {
205 let metadata = match fs::metadata(path).await {
206 Ok(metadata) => {
207 if !metadata.is_dir() {
208 self.fingerprint(path).await.map(Some)
209 } else {
210 Ok(None)
211 }
212 }
213 Err(e) => Err(e),
214 };
215
216 metadata
217 .inspect(|_| {
218 known_small_files.remove(&path.to_path_buf());
220 })
221 .map_err(|error| {
222 match error.kind() {
223 ErrorKind::UnexpectedEof => {
224 if !known_small_files.contains_key(path) {
225 emitter.emit_file_checksum_failed(path);
226 known_small_files.insert(path.to_path_buf(), time::Instant::now());
227 }
228 return;
229 }
230 ErrorKind::NotFound => {
231 if !self.ignore_not_found {
232 emitter.emit_file_fingerprint_read_error(path, error);
233 }
234 }
235 _ => {
236 emitter.emit_file_fingerprint_read_error(path, error);
237 }
238 };
239 known_small_files.remove(&path.to_path_buf());
241 })
242 .ok()
243 .flatten()
244 }
245}
246
247async fn fingerprinter_read_until(
248 mut r: impl AsyncRead + Unpin + Send,
249 delim: u8,
250 mut count: usize,
251 mut buf: &mut [u8],
252) -> Result<usize> {
253 let mut total_read = 0;
254 'main: while !buf.is_empty() {
255 let read = match r.read(buf).await {
256 Ok(0) => return Err(std::io::Error::new(ErrorKind::UnexpectedEof, "EOF reached")),
257 Ok(n) => n,
258 Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
259 Err(e) => return Err(e),
260 };
261
262 for (pos, &c) in buf[..read].iter().enumerate() {
263 if c == delim {
264 if count <= 1 {
265 total_read += pos + 1;
266 break 'main;
267 } else {
268 count -= 1;
269 }
270 }
271 }
272 total_read += read;
273 buf = &mut buf[read..];
274 }
275 Ok(total_read)
276}
277
278#[cfg(test)]
279mod test {
280 use std::{collections::HashMap, fs, io::Error, path::Path, time::Duration};
281
282 use async_compression::tokio::bufread::GzipEncoder;
283 use bytes::BytesMut;
284 use tempfile::{TempDir, tempdir};
285
286 use super::{FileSourceInternalEvents, FingerprintStrategy, Fingerprinter};
287
288 use tokio::io::AsyncReadExt;
289
290 pub async fn gzip(data: &[u8]) -> Vec<u8> {
291 let mut encoder = GzipEncoder::new(data);
292
293 let mut out = Vec::new();
294 encoder.read_to_end(&mut out).await.expect("Failed to read");
295 out
296 }
297 fn read_byte_content(target_dir: &TempDir, file: &str) -> Vec<u8> {
298 use std::{fs::File, io::Read};
299
300 let path = target_dir.path().join(file);
301 let mut file = File::open(path).unwrap();
302 let mut content = Vec::new();
303 file.read_to_end(&mut content).unwrap();
304 content
305 }
306
307 #[tokio::test]
308 async fn test_checksum_fingerprint() {
309 let mut fingerprinter = Fingerprinter::new(
310 FingerprintStrategy::FirstLinesChecksum {
311 ignored_header_bytes: 0,
312 lines: 1,
313 },
314 1024,
315 false,
316 );
317
318 let target_dir = tempdir().unwrap();
319 let mut full_line_data = vec![b'x'; 256];
320 full_line_data.push(b'\n');
321 let not_full_line_data = vec![b'x'; 199];
322 let empty_path = target_dir.path().join("empty.log");
323 let full_line_path = target_dir.path().join("full_line.log");
324 let duplicate_path = target_dir.path().join("duplicate.log");
325 let not_full_line_path = target_dir.path().join("not_full_line.log");
326 fs::write(&empty_path, []).unwrap();
327 fs::write(&full_line_path, &full_line_data).unwrap();
328 fs::write(&duplicate_path, &full_line_data).unwrap();
329 fs::write(¬_full_line_path, not_full_line_data).unwrap();
330
331 assert!(fingerprinter.fingerprint(&empty_path).await.is_err());
332 assert!(fingerprinter.fingerprint(&full_line_path).await.is_ok());
333 assert!(
334 fingerprinter
335 .fingerprint(¬_full_line_path)
336 .await
337 .is_err()
338 );
339 assert_eq!(
340 fingerprinter.fingerprint(&full_line_path).await.unwrap(),
341 fingerprinter.fingerprint(&duplicate_path).await.unwrap(),
342 );
343 }
344
345 #[tokio::test]
346 async fn test_first_line_checksum_fingerprint() {
347 let max_line_length = 64;
348 let mut fingerprinter = Fingerprinter::new(
349 FingerprintStrategy::FirstLinesChecksum {
350 ignored_header_bytes: 0,
351 lines: 1,
352 },
353 max_line_length,
354 false,
355 );
356
357 let target_dir = tempdir().unwrap();
358 let prepare_test = |file: &str, contents: &[u8]| {
359 let path = target_dir.path().join(file);
360 fs::write(&path, contents).unwrap();
361 path
362 };
363 let prepare_test_long = |file: &str, amount| {
364 prepare_test(
365 file,
366 b"hello world "
367 .iter()
368 .cloned()
369 .cycle()
370 .clone()
371 .take(amount)
372 .collect::<Box<_>>()
373 .as_ref(),
374 )
375 };
376
377 let empty = prepare_test("empty.log", b"");
378 let incomplete_line = prepare_test("incomplete_line.log", b"missing newline char");
379 let one_line = prepare_test(
380 "one_line_duplicate_compressed.log",
381 &gzip(b"hello world\n").await,
382 );
383 let one_line_duplicate = prepare_test("one_line_duplicate.log", b"hello world\n");
384 let one_line_duplicate_compressed = prepare_test(
385 "one_line_duplicate_compressed.log",
386 &gzip(b"hello world\n").await,
387 );
388 let one_line_continued =
389 prepare_test("one_line_continued.log", b"hello world\nthe next line\n");
390 let one_line_continued_compressed = prepare_test(
391 "one_line_continued_compressed.log",
392 &gzip(b"hello world\nthe next line\n").await,
393 );
394 let different_two_lines = prepare_test("different_two_lines.log", b"line one\nline two\n");
395
396 let exactly_max_line_length =
397 prepare_test_long("exactly_max_line_length.log", max_line_length);
398 let exceeding_max_line_length =
399 prepare_test_long("exceeding_max_line_length.log", max_line_length + 1);
400 let incomplete_under_max_line_length_by_one = prepare_test_long(
401 "incomplete_under_max_line_length_by_one.log",
402 max_line_length - 1,
403 );
404
405 let mut run = async |path| fingerprinter.fingerprint(path).await;
406
407 assert!(run(&empty).await.is_err());
408 assert!(run(&incomplete_line).await.is_err());
409 assert!(run(&incomplete_under_max_line_length_by_one).await.is_err());
410
411 assert!(run(&one_line).await.is_ok());
412 assert!(run(&one_line_duplicate).await.is_ok());
413 assert!(run(&one_line_continued).await.is_ok());
414 assert!(run(&different_two_lines).await.is_ok());
415 assert!(run(&exactly_max_line_length).await.is_ok());
416 assert!(run(&exceeding_max_line_length).await.is_ok());
417
418 assert_eq!(
419 run(&one_line).await.unwrap(),
420 run(&one_line_duplicate_compressed).await.unwrap()
421 );
422 assert_eq!(
423 run(&one_line).await.unwrap(),
424 run(&one_line_continued_compressed).await.unwrap()
425 );
426 assert_eq!(
427 run(&one_line).await.unwrap(),
428 run(&one_line_duplicate_compressed).await.unwrap()
429 );
430 assert_eq!(
431 run(&one_line).await.unwrap(),
432 run(&one_line_continued_compressed).await.unwrap()
433 );
434
435 assert_ne!(
436 run(&one_line).await.unwrap(),
437 run(&different_two_lines).await.unwrap()
438 );
439
440 assert_eq!(
441 run(&exactly_max_line_length).await.unwrap(),
442 run(&exceeding_max_line_length).await.unwrap()
443 );
444
445 assert_ne!(
446 read_byte_content(&target_dir, "one_line_duplicate.log"),
447 read_byte_content(&target_dir, "one_line_duplicate_compressed.log")
448 );
449
450 assert_ne!(
451 read_byte_content(&target_dir, "one_line_continued.log"),
452 read_byte_content(&target_dir, "one_line_continued_compressed.log")
453 );
454 }
455
456 #[tokio::test]
457 async fn test_first_two_lines_checksum_fingerprint() {
458 let max_line_length = 64;
459 let mut fingerprinter = Fingerprinter::new(
460 FingerprintStrategy::FirstLinesChecksum {
461 ignored_header_bytes: 0,
462 lines: 2,
463 },
464 max_line_length,
465 false,
466 );
467
468 let target_dir = tempdir().unwrap();
469 let prepare_test = |file: &str, contents: &[u8]| {
470 let path = target_dir.path().join(file);
471 fs::write(&path, contents).unwrap();
472 path
473 };
474
475 let incomplete_lines = prepare_test(
476 "incomplete_lines.log",
477 b"missing newline char\non second line",
478 );
479 let two_lines = prepare_test("two_lines.log", b"hello world\nfrom vector\n");
480 let two_lines_duplicate =
481 prepare_test("two_lines_duplicate.log", b"hello world\nfrom vector\n");
482 let two_lines_continued = prepare_test(
483 "two_lines_continued.log",
484 b"hello world\nfrom vector\nthe next line\n",
485 );
486 let two_lines_duplicate_compressed = prepare_test(
487 "two_lines_duplicate_compressed.log",
488 &gzip(b"hello world\nfrom vector\n").await,
489 );
490 let two_lines_continued_compressed = prepare_test(
491 "two_lines_continued_compressed.log",
492 &gzip(b"hello world\nfrom vector\nthe next line\n").await,
493 );
494
495 let different_three_lines = prepare_test(
496 "different_three_lines.log",
497 b"line one\nline two\nine three\n",
498 );
499
500 let mut run = async move |path| fingerprinter.fingerprint(path).await;
501
502 assert!(run(&incomplete_lines).await.is_err());
503
504 assert!(run(&two_lines).await.is_ok());
505 assert!(run(&two_lines_duplicate).await.is_ok());
506 assert!(run(&two_lines_continued).await.is_ok());
507 assert!(run(&different_three_lines).await.is_ok());
508
509 assert_eq!(
510 run(&two_lines).await.unwrap(),
511 run(&two_lines_duplicate).await.unwrap()
512 );
513 assert_eq!(
514 run(&two_lines).await.unwrap(),
515 run(&two_lines_continued).await.unwrap()
516 );
517 assert_eq!(
518 run(&two_lines).await.unwrap(),
519 run(&two_lines_duplicate_compressed).await.unwrap()
520 );
521 assert_eq!(
522 run(&two_lines).await.unwrap(),
523 run(&two_lines_continued_compressed).await.unwrap()
524 );
525
526 assert_ne!(
527 run(&two_lines).await.unwrap(),
528 run(&different_three_lines).await.unwrap()
529 );
530
531 assert_ne!(
532 read_byte_content(&target_dir, "two_lines_duplicate.log"),
533 read_byte_content(&target_dir, "two_lines_duplicate_compressed.log")
534 );
535 assert_ne!(
536 read_byte_content(&target_dir, "two_lines_continued.log"),
537 read_byte_content(&target_dir, "two_lines_continued_compressed.log")
538 );
539 }
540
541 #[tokio::test]
542 async fn test_first_two_lines_checksum_fingerprint_with_headers() {
543 let max_line_length = 64;
544 let mut fingerprinter = Fingerprinter::new(
545 FingerprintStrategy::FirstLinesChecksum {
546 ignored_header_bytes: 14,
547 lines: 2,
548 },
549 max_line_length,
550 false,
551 );
552
553 let target_dir = tempdir().unwrap();
554 let prepare_test = |file: &str, contents: &[u8]| {
555 let path = target_dir.path().join(file);
556 fs::write(&path, contents).unwrap();
557 path
558 };
559
560 let two_lines = prepare_test(
561 "two_lines.log",
562 b"some-header-1\nhello world\nfrom vector\n",
563 );
564 let two_lines_compressed_same_header = prepare_test(
565 "two_lines_compressed_same_header.log",
566 &gzip(b"some-header-1\nhello world\nfrom vector\n").await,
567 );
568 let two_lines_compressed_same_header_size = prepare_test(
569 "two_lines_compressed_same_header_size.log",
570 &gzip(b"some-header-2\nhello world\nfrom vector\n").await,
571 );
572 let two_lines_compressed_different_header_size = prepare_test(
573 "two_lines_compressed_different_header_size.log",
574 &gzip(b"some-header-22\nhellow world\nfrom vector\n").await,
575 );
576
577 let mut run = async move |path| fingerprinter.fingerprint(path).await;
578
579 assert!(run(&two_lines).await.is_ok());
580 assert_eq!(
581 run(&two_lines).await.unwrap(),
582 run(&two_lines_compressed_same_header).await.unwrap()
583 );
584 assert_eq!(
585 run(&two_lines).await.unwrap(),
586 run(&two_lines_compressed_same_header_size).await.unwrap()
587 );
588 assert_ne!(
589 run(&two_lines).await.unwrap(),
590 run(&two_lines_compressed_different_header_size)
591 .await
592 .unwrap()
593 );
594 }
595
596 #[tokio::test]
597 async fn test_inode_fingerprint() {
598 let mut fingerprinter = Fingerprinter::new(FingerprintStrategy::DevInode, 42, false);
599
600 let target_dir = tempdir().unwrap();
601 let small_data = vec![b'x'; 1];
602 let medium_data = vec![b'x'; 256];
603 let empty_path = target_dir.path().join("empty.log");
604 let small_path = target_dir.path().join("small.log");
605 let medium_path = target_dir.path().join("medium.log");
606 let duplicate_path = target_dir.path().join("duplicate.log");
607 fs::write(&empty_path, []).unwrap();
608 fs::write(&small_path, small_data).unwrap();
609 fs::write(&medium_path, &medium_data).unwrap();
610 fs::write(&duplicate_path, &medium_data).unwrap();
611
612 assert!(fingerprinter.fingerprint(&empty_path).await.is_ok());
613 assert!(fingerprinter.fingerprint(&small_path).await.is_ok());
614 assert_ne!(
615 fingerprinter.fingerprint(&medium_path).await.unwrap(),
616 fingerprinter.fingerprint(&duplicate_path).await.unwrap()
617 );
618 }
619
620 #[tokio::test]
621 async fn no_error_on_dir() {
622 let target_dir = tempdir().unwrap();
623 let mut fingerprinter = Fingerprinter::new(
624 FingerprintStrategy::FirstLinesChecksum {
625 ignored_header_bytes: 0,
626 lines: 1,
627 },
628 1024,
629 false,
630 );
631
632 let mut small_files = HashMap::new();
633 assert!(
634 fingerprinter
635 .fingerprint_or_emit(target_dir.path(), &mut small_files, &NoErrors)
636 .await
637 .is_none()
638 );
639 }
640
641 #[test]
642 fn test_monotonic_compression_algorithms() {
643 let algos = super::SupportedCompressionAlgorithms::values();
650 let mut smallest_byte_length = 0;
651 for algo in algos {
652 let magic_header_bytes = algo.magic_header_bytes();
653 assert!(smallest_byte_length <= magic_header_bytes.len());
654 smallest_byte_length = magic_header_bytes.len();
655 }
656 }
657 #[derive(Clone)]
658 struct NoErrors;
659
660 impl FileSourceInternalEvents for NoErrors {
661 fn emit_file_added(&self, _: &Path) {}
662
663 fn emit_file_resumed(&self, _: &Path, _: u64) {}
664
665 fn emit_file_watch_error(&self, _: &Path, _: Error) {
666 panic!();
667 }
668
669 fn emit_file_unwatched(&self, _: &Path, _: bool) {}
670
671 fn emit_file_deleted(&self, _: &Path) {}
672
673 fn emit_file_delete_error(&self, _: &Path, _: Error) {
674 panic!();
675 }
676
677 fn emit_file_fingerprint_read_error(&self, _: &Path, _: Error) {
678 panic!();
679 }
680
681 fn emit_file_checkpointed(&self, _: usize, _: Duration) {}
682
683 fn emit_file_checksum_failed(&self, _: &Path) {
684 panic!();
685 }
686
687 fn emit_file_checkpoint_write_error(&self, _: Error) {
688 panic!();
689 }
690
691 fn emit_files_open(&self, _: usize) {}
692
693 fn emit_path_globbing_failed(&self, _: &Path, _: &Error) {
694 panic!()
695 }
696
697 fn emit_file_line_too_long(&self, _: &BytesMut, _: usize, _: usize) {
698 panic!()
699 }
700 }
701}