file_source/
fingerprinter.rs

1use std::{
2    collections::HashMap,
3    fs::{self, metadata, File},
4    io::{self, BufRead, BufReader, Read, Seek, SeekFrom, Write},
5    path::{Path, PathBuf},
6    time,
7};
8
9use crc::Crc;
10use flate2::bufread::GzDecoder;
11use serde::{Deserialize, Serialize};
12use vector_common::constants::GZIP_MAGIC;
13
14use crate::{metadata_ext::PortableFileExt, FileSourceInternalEvents};
15
/// CRC-64/ECMA-182 engine used for current checksum fingerprints (and for folding a
/// `DevInode` pair into a single legacy `u64`).
const FINGERPRINT_CRC: Crc<u64> = Crc::<u64>::new(&crc::CRC_64_ECMA_182);
/// CRC-64/XZ engine used only to reproduce checksums computed by the pre-0.14.0 strategy
/// (`Fingerprinter::get_legacy_checksum`).
const LEGACY_FINGERPRINT_CRC: Crc<u64> = Crc::<u64>::new(&crc::CRC_64_XZ);
18
/// Computes [`FileFingerprint`]s for files according to a [`FingerprintStrategy`].
#[derive(Debug, Clone)]
pub struct Fingerprinter {
    /// The strategy used to fingerprint files.
    pub strategy: FingerprintStrategy,
    /// Size, in bytes, of the scratch buffer used by the line-based checksum strategies;
    /// at most this many bytes (after any ignored header) contribute to such a checksum.
    pub max_line_length: usize,
    /// When `true`, `NotFound` errors are not reported through the event emitter by
    /// `get_fingerprint_or_log_error`.
    pub ignore_not_found: bool,
}
25
/// How a [`Fingerprinter`] computes a file's fingerprint.
#[derive(Debug, Clone)]
pub enum FingerprintStrategy {
    /// Checksum-based fingerprinting.
    ///
    /// For the primary fingerprint this behaves exactly like `FirstLinesChecksum`
    /// (`ignored_header_bytes` and `lines` are used); `bytes` is only read by
    /// `Fingerprinter::get_bytes_checksum`.
    Checksum {
        /// Number of raw bytes checksummed by `get_bytes_checksum`.
        bytes: usize,
        /// Number of leading bytes to skip before checksumming.
        ignored_header_bytes: usize,
        /// Number of newline-terminated lines to checksum.
        lines: usize,
    },
    /// Checksum of the first `lines` lines, after skipping `ignored_header_bytes` bytes.
    FirstLinesChecksum {
        /// Number of leading bytes to skip before checksumming.
        ignored_header_bytes: usize,
        /// Number of newline-terminated lines to checksum.
        lines: usize,
    },
    /// Fingerprint by the file's device and inode numbers.
    DevInode,
}
39
/// A computed file identity.
///
/// Variants serialize with snake_case names (`checksum`, `first_lines_checksum`,
/// `dev_inode`, `unknown`). NOTE(review): these names are presumably persisted (e.g. in
/// checkpoints), so renaming variants likely breaks compatibility — confirm before changing.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize, Ord, PartialOrd)]
#[serde(rename_all = "snake_case")]
pub enum FileFingerprint {
    /// CRC of a fixed number of raw bytes; serialized as `checksum`.
    #[serde(rename = "checksum")]
    BytesChecksum(u64),
    /// CRC of the first line(s) of a file; `first_line_checksum` is also accepted when
    /// deserializing.
    #[serde(alias = "first_line_checksum")]
    FirstLinesChecksum(u64),
    /// Device number and inode number of the file.
    DevInode(u64, u64),
    /// A bare `u64` whose originating strategy is not recorded (built via `From<u64>`).
    Unknown(u64),
}
50
51impl FileFingerprint {
52    pub fn as_legacy(&self) -> u64 {
53        use FileFingerprint::*;
54
55        match self {
56            BytesChecksum(c) => *c,
57            FirstLinesChecksum(c) => *c,
58            DevInode(dev, ino) => {
59                let mut buf = Vec::with_capacity(std::mem::size_of_val(dev) * 2);
60                buf.write_all(&dev.to_be_bytes()).expect("writing to array");
61                buf.write_all(&ino.to_be_bytes()).expect("writing to array");
62                FINGERPRINT_CRC.checksum(&buf[..])
63            }
64            Unknown(c) => *c,
65        }
66    }
67}
68
69impl From<u64> for FileFingerprint {
70    fn from(c: u64) -> Self {
71        FileFingerprint::Unknown(c)
72    }
73}
74
75#[derive(Debug, Copy, Clone)]
76enum SupportedCompressionAlgorithms {
77    Gzip,
78}
79
80impl SupportedCompressionAlgorithms {
81    fn values() -> Vec<SupportedCompressionAlgorithms> {
82        // Enumerate these from smallest magic_header_bytes to largest
83        vec![SupportedCompressionAlgorithms::Gzip]
84    }
85
86    fn magic_header_bytes(&self) -> &'static [u8] {
87        match self {
88            SupportedCompressionAlgorithms::Gzip => GZIP_MAGIC,
89        }
90    }
91}
92
93trait UncompressedReader {
94    fn check(fp: &mut File) -> Result<Option<SupportedCompressionAlgorithms>, std::io::Error>;
95    fn reader<'a>(fp: &'a mut File) -> Result<Box<dyn BufRead + 'a>, std::io::Error>;
96}
97
98struct UncompressedReaderImpl;
99impl UncompressedReader for UncompressedReaderImpl {
100    /// Checks a file for supported compression algorithms by searching for
101    /// supported magic header bytes.
102    ///
103    /// If an error occurs during reading, the file handler may become unusable,
104    /// as the cursor position of the file may not be reset.
105    ///
106    /// # Arguments
107    /// - `fp`: A mutable reference to the file to check.
108    ///
109    /// # Returns
110    /// - `Ok(Some(algorithm))` if a supported compression algorithm is detected.
111    /// - `Ok(None)` if no supported compression algorithm is detected.
112    /// - `Err(std::io::Error)` if an I/O error occurs.
113    fn check(fp: &mut File) -> Result<Option<SupportedCompressionAlgorithms>, std::io::Error> {
114        let mut algorithm: Option<SupportedCompressionAlgorithms> = None;
115        for compression_algorithm in SupportedCompressionAlgorithms::values() {
116            // magic headers for algorithms can be of different lengths, and using a buffer too long could exceed the length of the file
117            // so instantiate and check the various sizes in monotonically increasing order
118            let magic_header_bytes = compression_algorithm.magic_header_bytes();
119
120            let mut magic = vec![0u8; magic_header_bytes.len()];
121
122            fp.seek(SeekFrom::Start(0))?;
123            let result = fp.read_exact(&mut magic);
124
125            if result.is_err() {
126                fp.seek(SeekFrom::Start(0))?;
127                return Err(result.unwrap_err());
128            }
129
130            if magic == magic_header_bytes {
131                algorithm = Some(compression_algorithm);
132                break;
133            }
134        }
135        fp.seek(SeekFrom::Start(0))?;
136        Ok(algorithm)
137    }
138
139    fn reader<'a>(fp: &'a mut File) -> Result<Box<dyn BufRead + 'a>, std::io::Error> {
140        // To support new compression algorithms, add them below
141        match Self::check(fp)? {
142            Some(SupportedCompressionAlgorithms::Gzip) => {
143                Ok(Box::new(BufReader::new(GzDecoder::new(BufReader::new(fp)))))
144            }
145            // No compression, or read the raw bytes
146            None => Ok(Box::new(BufReader::new(fp))),
147        }
148    }
149}
150
/// Skips the first `n` bytes produced by `reader`.
///
/// We cannot simply seek the file by `n` because the file may be compressed; to skip the
/// first `n` decompressed bytes, we pull them through the reader and discard them.
///
/// # Errors
///
/// Returns [`io::ErrorKind::UnexpectedEof`] if the reader is exhausted before `n` bytes
/// have been skipped (the data is shorter than the ignored header). Any other read error
/// is propagated; `Interrupted` reads are retried.
fn skip_first_n_bytes<R: BufRead>(reader: &mut R, n: usize) -> io::Result<()> {
    let mut skipped_bytes = 0;
    while skipped_bytes < n {
        let available = match reader.fill_buf() {
            Ok(chunk) => chunk.len(),
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        // An empty `fill_buf` result signals EOF; without this check the loop would spin
        // forever on input shorter than `n` bytes.
        if available == 0 {
            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "EOF reached"));
        }
        let bytes_to_skip = available.min(n - skipped_bytes);
        reader.consume(bytes_to_skip);
        skipped_bytes += bytes_to_skip;
    }
    Ok(())
}
163
164impl Fingerprinter {
165    pub fn get_fingerprint_of_file(
166        &self,
167        path: &Path,
168        buffer: &mut Vec<u8>,
169    ) -> Result<FileFingerprint, io::Error> {
170        use FileFingerprint::*;
171
172        match self.strategy {
173            FingerprintStrategy::DevInode => {
174                let file_handle = File::open(path)?;
175                let dev = file_handle.portable_dev()?;
176                let ino = file_handle.portable_ino()?;
177                Ok(DevInode(dev, ino))
178            }
179            FingerprintStrategy::Checksum {
180                ignored_header_bytes,
181                bytes: _,
182                lines,
183            }
184            | FingerprintStrategy::FirstLinesChecksum {
185                ignored_header_bytes,
186                lines,
187            } => {
188                buffer.resize(self.max_line_length, 0u8);
189                let mut fp = fs::File::open(path)?;
190                let mut reader = UncompressedReaderImpl::reader(&mut fp)?;
191
192                skip_first_n_bytes(&mut reader, ignored_header_bytes)?;
193                let bytes_read = fingerprinter_read_until(reader, b'\n', lines, buffer)?;
194                let fingerprint = FINGERPRINT_CRC.checksum(&buffer[..bytes_read]);
195                Ok(FirstLinesChecksum(fingerprint))
196            }
197        }
198    }
199
200    pub fn get_fingerprint_or_log_error(
201        &self,
202        path: &Path,
203        buffer: &mut Vec<u8>,
204        known_small_files: &mut HashMap<PathBuf, time::Instant>,
205        emitter: &impl FileSourceInternalEvents,
206    ) -> Option<FileFingerprint> {
207        metadata(path)
208            .and_then(|metadata| {
209                if metadata.is_dir() {
210                    Ok(None)
211                } else {
212                    self.get_fingerprint_of_file(path, buffer).map(Some)
213                }
214            })
215            .inspect(|_| {
216                // Drop the path from the small files map if we've got enough data to fingerprint it.
217                known_small_files.remove(&path.to_path_buf());
218            })
219            .map_err(|error| {
220                match error.kind() {
221                    io::ErrorKind::UnexpectedEof => {
222                        if !known_small_files.contains_key(path) {
223                            emitter.emit_file_checksum_failed(path);
224                            known_small_files.insert(path.to_path_buf(), time::Instant::now());
225                        }
226                        return;
227                    }
228                    io::ErrorKind::NotFound => {
229                        if !self.ignore_not_found {
230                            emitter.emit_file_fingerprint_read_error(path, error);
231                        }
232                    }
233                    _ => {
234                        emitter.emit_file_fingerprint_read_error(path, error);
235                    }
236                };
237                // For scenarios other than UnexpectedEOF, remove the path from the small files map.
238                known_small_files.remove(&path.to_path_buf());
239            })
240            .ok()
241            .flatten()
242    }
243
244    pub fn get_bytes_checksum(
245        &self,
246        path: &Path,
247        buffer: &mut Vec<u8>,
248    ) -> Result<Option<FileFingerprint>, io::Error> {
249        match self.strategy {
250            FingerprintStrategy::Checksum {
251                bytes,
252                ignored_header_bytes,
253                lines: _,
254            } => {
255                buffer.resize(bytes, 0u8);
256                let mut fp = fs::File::open(path)?;
257                fp.seek(io::SeekFrom::Start(ignored_header_bytes as u64))?;
258                fp.read_exact(&mut buffer[..bytes])?;
259                let fingerprint = FINGERPRINT_CRC.checksum(&buffer[..]);
260                Ok(Some(FileFingerprint::BytesChecksum(fingerprint)))
261            }
262            _ => Ok(None),
263        }
264    }
265
266    /// Calculates checksums using strategy pre-0.14.0
267    /// <https://github.com/vectordotdev/vector/issues/8182>
268    pub fn get_legacy_checksum(
269        &self,
270        path: &Path,
271        buffer: &mut Vec<u8>,
272    ) -> Result<Option<FileFingerprint>, io::Error> {
273        match self.strategy {
274            FingerprintStrategy::Checksum {
275                ignored_header_bytes,
276                bytes: _,
277                lines,
278            }
279            | FingerprintStrategy::FirstLinesChecksum {
280                ignored_header_bytes,
281                lines,
282            } => {
283                buffer.resize(self.max_line_length, 0u8);
284                let mut fp = fs::File::open(path)?;
285                fp.seek(SeekFrom::Start(ignored_header_bytes as u64))?;
286                fingerprinter_read_until_and_zerofill_buf(fp, b'\n', lines, buffer)?;
287                let fingerprint = LEGACY_FINGERPRINT_CRC.checksum(&buffer[..]);
288                Ok(Some(FileFingerprint::FirstLinesChecksum(fingerprint)))
289            }
290            _ => Ok(None),
291        }
292    }
293    /// For upgrades from legacy strategy version
294    /// <https://github.com/vectordotdev/vector/issues/15700>
295    pub fn get_legacy_first_lines_checksum(
296        &self,
297        path: &Path,
298        buffer: &mut Vec<u8>,
299    ) -> Result<Option<FileFingerprint>, io::Error> {
300        match self.strategy {
301            FingerprintStrategy::Checksum {
302                ignored_header_bytes,
303                bytes: _,
304                lines,
305            }
306            | FingerprintStrategy::FirstLinesChecksum {
307                ignored_header_bytes,
308                lines,
309            } => {
310                buffer.resize(self.max_line_length, 0u8);
311                let mut fp = fs::File::open(path)?;
312                fp.seek(SeekFrom::Start(ignored_header_bytes as u64))?;
313                fingerprinter_read_until_and_zerofill_buf(fp, b'\n', lines, buffer)?;
314                let fingerprint = FINGERPRINT_CRC.checksum(&buffer[..]);
315                Ok(Some(FileFingerprint::FirstLinesChecksum(fingerprint)))
316            }
317            _ => Ok(None),
318        }
319    }
320}
321
/// Legacy line reader, kept so that fingerprints computed by older versions can still be
/// reproduced.
///
/// Fills `buf` from `r` until either `buf` is full or `count` occurrences of `delim` have
/// been read (a `count` of 0 behaves like 1). In the latter case every byte of `buf` after
/// the final delimiter is overwritten with zeros, so the whole buffer can be checksummed
/// regardless of what data followed the line(s).
///
/// # Errors
///
/// Returns [`io::ErrorKind::UnexpectedEof`] if the reader runs dry before either condition
/// is met. `Interrupted` reads are retried, and any other read error is propagated.
fn fingerprinter_read_until_and_zerofill_buf(
    mut r: impl Read,
    delim: u8,
    mut count: usize,
    mut buf: &mut [u8],
) -> io::Result<()> {
    while !buf.is_empty() {
        let filled = match r.read(buf) {
            Ok(0) => return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "EOF reached")),
            Ok(n) => n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };

        // Find the delimiter that completes the requested number of lines, if it lies in
        // this chunk; earlier delimiters only count down.
        let final_delim = buf[..filled].iter().position(|&byte| {
            if byte != delim {
                false
            } else if count <= 1 {
                true
            } else {
                count -= 1;
                false
            }
        });

        if let Some(pos) = final_delim {
            buf[pos + 1..].fill(0);
            return Ok(());
        }
        buf = &mut buf[filled..];
    }
    Ok(())
}
353
/// Reads from `r` into `buf` until `count` occurrences of `delim` have been read (a `count`
/// of 0 behaves like 1) or `buf` is full, whichever comes first.
///
/// Returns the number of meaningful bytes at the start of `buf`. When the delimiter limit
/// is hit, that number includes the final delimiter; when the buffer fills first, it is
/// `buf.len()`.
///
/// # Errors
///
/// Returns [`io::ErrorKind::UnexpectedEof`] if the reader runs dry before either condition
/// is met. `Interrupted` reads are retried, and any other read error is propagated.
fn fingerprinter_read_until(
    mut r: impl Read,
    delim: u8,
    mut count: usize,
    mut buf: &mut [u8],
) -> io::Result<usize> {
    let mut consumed = 0;
    while !buf.is_empty() {
        let filled = match r.read(buf) {
            Ok(0) => return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "EOF reached")),
            Ok(n) => n,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };

        // Earlier delimiters only count down; the one reached with `count <= 1` ends the read.
        let final_delim = buf[..filled].iter().position(|&byte| {
            if byte != delim {
                false
            } else if count <= 1 {
                true
            } else {
                count -= 1;
                false
            }
        });

        if let Some(pos) = final_delim {
            return Ok(consumed + pos + 1);
        }
        consumed += filled;
        buf = &mut buf[filled..];
    }
    Ok(consumed)
}
384
#[cfg(test)]
mod test {
    use std::{
        collections::HashMap,
        fs,
        io::{Error, Read, Write},
        path::Path,
        time::Duration,
    };

    use bytes::BytesMut;
    use flate2::write::GzEncoder;
    use tempfile::{tempdir, TempDir};

    use super::{FileSourceInternalEvents, FingerprintStrategy, Fingerprinter};

    /// Returns `data` compressed as a complete gzip stream.
    fn gzip(data: &[u8]) -> Vec<u8> {
        let mut buffer = vec![];
        let mut encoder = GzEncoder::new(&mut buffer, flate2::Compression::default());
        encoder.write_all(data).expect("Failed to write data");
        encoder
            .finish()
            .expect("Failed to finish encoding with gzip footer");
        buffer
    }

    /// Reads the raw (on-disk) bytes of `file` inside `target_dir`.
    fn read_byte_content(target_dir: &TempDir, file: &str) -> Vec<u8> {
        let path = target_dir.path().join(file);
        let mut file = fs::File::open(path).unwrap();
        let mut content = Vec::new();
        file.read_to_end(&mut content).unwrap();
        content
    }

    #[test]
    fn test_checksum_fingerprint() {
        let fingerprinter = Fingerprinter {
            strategy: FingerprintStrategy::Checksum {
                bytes: 256,
                ignored_header_bytes: 0,
                lines: 1,
            },
            max_line_length: 1024,
            ignore_not_found: false,
        };

        let target_dir = tempdir().unwrap();
        let mut full_line_data = vec![b'x'; 256];
        full_line_data.push(b'\n');
        let not_full_line_data = vec![b'x'; 199];
        let empty_path = target_dir.path().join("empty.log");
        let full_line_path = target_dir.path().join("full_line.log");
        let duplicate_path = target_dir.path().join("duplicate.log");
        let not_full_line_path = target_dir.path().join("not_full_line.log");
        fs::write(&empty_path, []).unwrap();
        fs::write(&full_line_path, &full_line_data).unwrap();
        fs::write(&duplicate_path, &full_line_data).unwrap();
        fs::write(&not_full_line_path, not_full_line_data).unwrap();

        let mut buf = Vec::new();
        assert!(fingerprinter
            .get_fingerprint_of_file(&empty_path, &mut buf)
            .is_err());
        assert!(fingerprinter
            .get_fingerprint_of_file(&full_line_path, &mut buf)
            .is_ok());
        assert!(fingerprinter
            .get_fingerprint_of_file(&not_full_line_path, &mut buf)
            .is_err());
        assert_eq!(
            fingerprinter
                .get_fingerprint_of_file(&full_line_path, &mut buf)
                .unwrap(),
            fingerprinter
                .get_fingerprint_of_file(&duplicate_path, &mut buf)
                .unwrap(),
        );
    }

    #[test]
    fn test_first_line_checksum_fingerprint() {
        let max_line_length = 64;
        let fingerprinter = Fingerprinter {
            strategy: FingerprintStrategy::FirstLinesChecksum {
                ignored_header_bytes: 0,
                lines: 1,
            },
            max_line_length,
            ignore_not_found: false,
        };

        let target_dir = tempdir().unwrap();
        let prepare_test = |file: &str, contents: &[u8]| {
            let path = target_dir.path().join(file);
            fs::write(&path, contents).unwrap();
            path
        };
        let prepare_test_long = |file: &str, amount| {
            prepare_test(
                file,
                b"hello world "
                    .iter()
                    .copied()
                    .cycle()
                    .take(amount)
                    .collect::<Box<_>>()
                    .as_ref(),
            )
        };

        let empty = prepare_test("empty.log", b"");
        let incomplete_line = prepare_test("incomplete_line.log", b"missing newline char");
        // Plain, uncompressed reference file; it must not share a path with any other fixture.
        let one_line = prepare_test("one_line.log", b"hello world\n");
        let one_line_duplicate = prepare_test("one_line_duplicate.log", b"hello world\n");
        let one_line_duplicate_compressed = prepare_test(
            "one_line_duplicate_compressed.log",
            &gzip(b"hello world\n"),
        );
        let one_line_continued =
            prepare_test("one_line_continued.log", b"hello world\nthe next line\n");
        let one_line_continued_compressed = prepare_test(
            "one_line_continued_compressed.log",
            &gzip(b"hello world\nthe next line\n"),
        );
        let different_two_lines = prepare_test("different_two_lines.log", b"line one\nline two\n");

        let exactly_max_line_length =
            prepare_test_long("exactly_max_line_length.log", max_line_length);
        let exceeding_max_line_length =
            prepare_test_long("exceeding_max_line_length.log", max_line_length + 1);
        let incomplete_under_max_line_length_by_one = prepare_test_long(
            "incomplete_under_max_line_length_by_one.log",
            max_line_length - 1,
        );

        let mut buf = Vec::new();
        let mut run = move |path| fingerprinter.get_fingerprint_of_file(path, &mut buf);

        assert!(run(&empty).is_err());
        assert!(run(&incomplete_line).is_err());
        assert!(run(&incomplete_under_max_line_length_by_one).is_err());

        assert!(run(&one_line).is_ok());
        assert!(run(&one_line_duplicate).is_ok());
        assert!(run(&one_line_duplicate_compressed).is_ok());
        assert!(run(&one_line_continued).is_ok());
        assert!(run(&one_line_continued_compressed).is_ok());
        assert!(run(&different_two_lines).is_ok());
        assert!(run(&exactly_max_line_length).is_ok());
        assert!(run(&exceeding_max_line_length).is_ok());

        assert_eq!(run(&one_line).unwrap(), run(&one_line_duplicate).unwrap());
        assert_eq!(run(&one_line).unwrap(), run(&one_line_continued).unwrap());
        assert_eq!(
            run(&one_line).unwrap(),
            run(&one_line_duplicate_compressed).unwrap()
        );
        assert_eq!(
            run(&one_line).unwrap(),
            run(&one_line_continued_compressed).unwrap()
        );

        assert_ne!(run(&one_line).unwrap(), run(&different_two_lines).unwrap());

        assert_eq!(
            run(&exactly_max_line_length).unwrap(),
            run(&exceeding_max_line_length).unwrap()
        );

        assert_ne!(
            read_byte_content(&target_dir, "one_line_duplicate.log"),
            read_byte_content(&target_dir, "one_line_duplicate_compressed.log")
        );

        assert_ne!(
            read_byte_content(&target_dir, "one_line_continued.log"),
            read_byte_content(&target_dir, "one_line_continued_compressed.log")
        );
    }

    #[test]
    fn test_first_two_lines_checksum_fingerprint() {
        let max_line_length = 64;
        let fingerprinter = Fingerprinter {
            strategy: FingerprintStrategy::FirstLinesChecksum {
                ignored_header_bytes: 0,
                lines: 2,
            },
            max_line_length,
            ignore_not_found: false,
        };

        let target_dir = tempdir().unwrap();
        let prepare_test = |file: &str, contents: &[u8]| {
            let path = target_dir.path().join(file);
            fs::write(&path, contents).unwrap();
            path
        };

        let incomplete_lines = prepare_test(
            "incomplete_lines.log",
            b"missing newline char\non second line",
        );
        let two_lines = prepare_test("two_lines.log", b"hello world\nfrom vector\n");
        let two_lines_duplicate =
            prepare_test("two_lines_duplicate.log", b"hello world\nfrom vector\n");
        let two_lines_continued = prepare_test(
            "two_lines_continued.log",
            b"hello world\nfrom vector\nthe next line\n",
        );
        let two_lines_duplicate_compressed = prepare_test(
            "two_lines_duplicate_compressed.log",
            &gzip(b"hello world\nfrom vector\n"),
        );
        let two_lines_continued_compressed = prepare_test(
            "two_lines_continued_compressed.log",
            &gzip(b"hello world\nfrom vector\nthe next line\n"),
        );

        let different_three_lines = prepare_test(
            "different_three_lines.log",
            b"line one\nline two\nline three\n",
        );

        let mut buf = Vec::new();
        let mut run = move |path| fingerprinter.get_fingerprint_of_file(path, &mut buf);

        assert!(run(&incomplete_lines).is_err());

        assert!(run(&two_lines).is_ok());
        assert!(run(&two_lines_duplicate).is_ok());
        assert!(run(&two_lines_continued).is_ok());
        assert!(run(&different_three_lines).is_ok());

        assert_eq!(run(&two_lines).unwrap(), run(&two_lines_duplicate).unwrap());
        assert_eq!(run(&two_lines).unwrap(), run(&two_lines_continued).unwrap());
        assert_eq!(
            run(&two_lines).unwrap(),
            run(&two_lines_duplicate_compressed).unwrap()
        );
        assert_eq!(
            run(&two_lines).unwrap(),
            run(&two_lines_continued_compressed).unwrap()
        );

        assert_ne!(
            run(&two_lines).unwrap(),
            run(&different_three_lines).unwrap()
        );

        assert_ne!(
            read_byte_content(&target_dir, "two_lines_duplicate.log"),
            read_byte_content(&target_dir, "two_lines_duplicate_compressed.log")
        );
        assert_ne!(
            read_byte_content(&target_dir, "two_lines_continued.log"),
            read_byte_content(&target_dir, "two_lines_continued_compressed.log")
        );
    }

    #[test]
    fn test_first_two_lines_checksum_fingerprint_with_headers() {
        let max_line_length = 64;
        let fingerprinter = Fingerprinter {
            strategy: FingerprintStrategy::FirstLinesChecksum {
                ignored_header_bytes: 14,
                lines: 2,
            },
            max_line_length,
            ignore_not_found: false,
        };

        let target_dir = tempdir().unwrap();
        let prepare_test = |file: &str, contents: &[u8]| {
            let path = target_dir.path().join(file);
            fs::write(&path, contents).unwrap();
            path
        };

        let two_lines = prepare_test(
            "two_lines.log",
            b"some-header-1\nhello world\nfrom vector\n",
        );
        let two_lines_compressed_same_header = prepare_test(
            "two_lines_compressed_same_header.log",
            &gzip(b"some-header-1\nhello world\nfrom vector\n"),
        );
        let two_lines_compressed_same_header_size = prepare_test(
            "two_lines_compressed_same_header_size.log",
            &gzip(b"some-header-2\nhello world\nfrom vector\n"),
        );
        let two_lines_compressed_different_header_size = prepare_test(
            "two_lines_compressed_different_header_size.log",
            &gzip(b"some-header-22\nhellow world\nfrom vector\n"),
        );

        let mut buf = Vec::new();
        let mut run = move |path| fingerprinter.get_fingerprint_of_file(path, &mut buf);

        assert!(run(&two_lines).is_ok());
        assert_eq!(
            run(&two_lines).unwrap(),
            run(&two_lines_compressed_same_header).unwrap()
        );
        assert_eq!(
            run(&two_lines).unwrap(),
            run(&two_lines_compressed_same_header_size).unwrap()
        );
        assert_ne!(
            run(&two_lines).unwrap(),
            run(&two_lines_compressed_different_header_size).unwrap()
        );
    }

    #[test]
    fn test_inode_fingerprint() {
        let fingerprinter = Fingerprinter {
            strategy: FingerprintStrategy::DevInode,
            max_line_length: 42,
            ignore_not_found: false,
        };

        let target_dir = tempdir().unwrap();
        let small_data = vec![b'x'; 1];
        let medium_data = vec![b'x'; 256];
        let empty_path = target_dir.path().join("empty.log");
        let small_path = target_dir.path().join("small.log");
        let medium_path = target_dir.path().join("medium.log");
        let duplicate_path = target_dir.path().join("duplicate.log");
        fs::write(&empty_path, []).unwrap();
        fs::write(&small_path, small_data).unwrap();
        fs::write(&medium_path, &medium_data).unwrap();
        fs::write(&duplicate_path, &medium_data).unwrap();

        let mut buf = Vec::new();
        assert!(fingerprinter
            .get_fingerprint_of_file(&empty_path, &mut buf)
            .is_ok());
        assert!(fingerprinter
            .get_fingerprint_of_file(&small_path, &mut buf)
            .is_ok());
        assert_ne!(
            fingerprinter
                .get_fingerprint_of_file(&medium_path, &mut buf)
                .unwrap(),
            fingerprinter
                .get_fingerprint_of_file(&duplicate_path, &mut buf)
                .unwrap()
        );
    }

    #[test]
    fn no_error_on_dir() {
        let target_dir = tempdir().unwrap();
        let fingerprinter = Fingerprinter {
            strategy: FingerprintStrategy::Checksum {
                bytes: 256,
                ignored_header_bytes: 0,
                lines: 1,
            },
            max_line_length: 1024,
            ignore_not_found: false,
        };

        let mut buf = Vec::new();
        let mut small_files = HashMap::new();
        assert!(fingerprinter
            .get_fingerprint_or_log_error(target_dir.path(), &mut buf, &mut small_files, &NoErrors)
            .is_none());
    }

    #[test]
    fn test_monotonic_compression_algorithms() {
        // This test is necessary to handle an edge case: when assessing the magic header
        // bytes of a file to determine the compression algorithm, the file may be shorter
        // than the magic header of the algorithm being assessed. That could mean the file is
        // simply too small, or only that its compression header is a shorter one than the
        // assessed algorithm's. Guaranteeing a monotonically increasing order of header
        // lengths ensures this edge case doesn't happen.
        let algos = super::SupportedCompressionAlgorithms::values();
        let mut smallest_byte_length = 0;
        for algo in algos {
            let magic_header_bytes = algo.magic_header_bytes();
            assert!(smallest_byte_length <= magic_header_bytes.len());
            smallest_byte_length = magic_header_bytes.len();
        }
    }

    /// Event sink that panics on any reported error, so tests fail loudly.
    #[derive(Clone)]
    struct NoErrors;

    impl FileSourceInternalEvents for NoErrors {
        fn emit_file_added(&self, _: &Path) {}

        fn emit_file_resumed(&self, _: &Path, _: u64) {}

        fn emit_file_watch_error(&self, _: &Path, _: Error) {
            panic!();
        }

        fn emit_file_unwatched(&self, _: &Path, _: bool) {}

        fn emit_file_deleted(&self, _: &Path) {}

        fn emit_file_delete_error(&self, _: &Path, _: Error) {
            panic!();
        }

        fn emit_file_fingerprint_read_error(&self, _: &Path, _: Error) {
            panic!();
        }

        fn emit_file_checkpointed(&self, _: usize, _: Duration) {}

        fn emit_file_checksum_failed(&self, _: &Path) {
            panic!();
        }

        fn emit_file_checkpoint_write_error(&self, _: Error) {
            panic!();
        }

        fn emit_files_open(&self, _: usize) {}

        fn emit_path_globbing_failed(&self, _: &Path, _: &Error) {}

        fn emit_file_line_too_long(&self, _: &BytesMut, _: usize, _: usize) {}
    }
}