file_source_common/
fingerprinter.rs

1use std::{
2    collections::HashMap,
3    io::{ErrorKind, Result, SeekFrom},
4    path::{Path, PathBuf},
5    time,
6};
7
8use async_compression::tokio::bufread::GzipDecoder;
9use crc::Crc;
10use serde::{Deserialize, Serialize};
11use tokio::{
12    fs::{self, File},
13    io::{
14        AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt,
15        BufReader,
16    },
17};
18use vector_common::constants::GZIP_MAGIC;
19
20use crate::{
21    AsyncFileInfo, internal_events::FileSourceInternalEvents, metadata_ext::PortableFileExt,
22};
23
/// CRC-64/ECMA-182 checksum used for all current (non-legacy) fingerprints.
const FINGERPRINT_CRC: Crc<u64> = Crc::<u64>::new(&crc::CRC_64_ECMA_182);
/// CRC-64/XZ checksum used only by `Fingerprinter::get_legacy_checksum`, which
/// reproduces the pre-0.14.0 fingerprinting scheme.
const LEGACY_FINGERPRINT_CRC: Crc<u64> = Crc::<u64>::new(&crc::CRC_64_XZ);
26
/// Computes [`FileFingerprint`]s for files according to a [`FingerprintStrategy`].
#[derive(Debug, Clone)]
pub struct Fingerprinter {
    /// Which fingerprinting strategy to apply.
    pub strategy: FingerprintStrategy,
    /// Size of the scratch buffer used by the line-based checksums; at most this
    /// many bytes of line data contribute to a line-based fingerprint.
    pub max_line_length: usize,
    /// When `true`, `NotFound` errors are not reported through the event emitter
    /// in `get_fingerprint_or_log_error`.
    pub ignore_not_found: bool,
}
33
/// How a [`Fingerprinter`] identifies files.
#[derive(Debug, Clone)]
pub enum FingerprintStrategy {
    /// Checksum-based identification.
    ///
    /// `get_fingerprint_of_file` treats this exactly like `FirstLinesChecksum`
    /// (using `ignored_header_bytes` and `lines`); `bytes` is only consulted by
    /// `get_bytes_checksum`.
    Checksum {
        /// Number of raw bytes hashed by `get_bytes_checksum`.
        bytes: usize,
        /// Number of leading bytes skipped before hashing.
        ignored_header_bytes: usize,
        /// Number of newline-terminated lines hashed.
        lines: usize,
    },
    /// CRC of the first `lines` lines after skipping `ignored_header_bytes` bytes.
    FirstLinesChecksum {
        /// Number of leading bytes skipped before hashing.
        ignored_header_bytes: usize,
        /// Number of newline-terminated lines hashed.
        lines: usize,
    },
    /// Identify files by their device and inode numbers.
    DevInode,
}
47
/// Identity of a file as computed by a [`Fingerprinter`].
///
/// The derived `Ord` depends on variant order, and the serde names below are the
/// serialized form, so neither the order nor the names should be changed casually.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize, Ord, PartialOrd)]
#[serde(rename_all = "snake_case")]
pub enum FileFingerprint {
    /// CRC of a fixed number of raw bytes (see `Fingerprinter::get_bytes_checksum`).
    /// Serialized as `checksum`.
    #[serde(rename = "checksum")]
    BytesChecksum(u64),
    /// CRC of the first lines of the file. Serialized as `first_lines_checksum`;
    /// `first_line_checksum` is also accepted when deserializing.
    #[serde(alias = "first_line_checksum")]
    FirstLinesChecksum(u64),
    /// Device and inode numbers of the file.
    DevInode(u64, u64),
    /// A bare checksum of unknown origin, e.g. one created via `From<u64>`.
    Unknown(u64),
}
58
59impl FileFingerprint {
60    pub async fn as_legacy(&self) -> u64 {
61        use FileFingerprint::*;
62
63        match self {
64            BytesChecksum(c) => *c,
65            FirstLinesChecksum(c) => *c,
66            DevInode(dev, ino) => {
67                let mut buf = Vec::with_capacity(std::mem::size_of_val(dev) * 2);
68                buf.write_all(&dev.to_be_bytes())
69                    .await
70                    .expect("writing to array");
71                buf.write_all(&ino.to_be_bytes())
72                    .await
73                    .expect("writing to array");
74                FINGERPRINT_CRC.checksum(&buf[..])
75            }
76            Unknown(c) => *c,
77        }
78    }
79}
80
81impl From<u64> for FileFingerprint {
82    fn from(c: u64) -> Self {
83        FileFingerprint::Unknown(c)
84    }
85}
86
/// Compression formats that are transparently decompressed before the leading
/// lines of a file are hashed.
#[derive(Debug, Copy, Clone)]
enum SupportedCompressionAlgorithms {
    /// Gzip, recognized by the `GZIP_MAGIC` header bytes.
    Gzip,
}
91
92impl SupportedCompressionAlgorithms {
93    fn values() -> Vec<SupportedCompressionAlgorithms> {
94        // Enumerate these from smallest magic_header_bytes to largest
95        vec![SupportedCompressionAlgorithms::Gzip]
96    }
97
98    fn magic_header_bytes(&self) -> &'static [u8] {
99        match self {
100            SupportedCompressionAlgorithms::Gzip => GZIP_MAGIC,
101        }
102    }
103}
104
/// Detects whether a file is compressed and opens a reader over its decompressed
/// contents.
trait UncompressedReader {
    /// Detects which supported compression algorithm, if any, `fp` starts with.
    async fn check(fp: &mut File) -> Result<Option<SupportedCompressionAlgorithms>>;
    /// Returns a buffered reader over the decompressed (or raw) contents of `fp`.
    async fn reader<'a>(fp: &'a mut File) -> Result<Box<dyn AsyncBufRead + Unpin + Send + 'a>>;
}
109
110struct UncompressedReaderImpl;
111impl UncompressedReader for UncompressedReaderImpl {
112    /// Checks a file for supported compression algorithms by searching for
113    /// supported magic header bytes.
114    ///
115    /// If an error occurs during reading, the file handler may become unusable,
116    /// as the cursor position of the file may not be reset.
117    ///
118    /// # Arguments
119    /// - `fp`: A mutable reference to the file to check.
120    ///
121    /// # Returns
122    /// - `Ok(Some(algorithm))` if a supported compression algorithm is detected.
123    /// - `Ok(None)` if no supported compression algorithm is detected.
124    /// - `Err(std::io::Error)` if an I/O error occurs.
125    async fn check(fp: &mut File) -> Result<Option<SupportedCompressionAlgorithms>> {
126        let mut algorithm: Option<SupportedCompressionAlgorithms> = None;
127        for compression_algorithm in SupportedCompressionAlgorithms::values() {
128            // magic headers for algorithms can be of different lengths, and using a buffer too long could exceed the length of the file
129            // so instantiate and check the various sizes in monotonically increasing order
130            let magic_header_bytes = compression_algorithm.magic_header_bytes();
131
132            let mut magic = vec![0u8; magic_header_bytes.len()];
133
134            fp.seek(SeekFrom::Start(0)).await?;
135            let result = fp.read_exact(&mut magic).await;
136
137            if let Err(err) = result {
138                fp.seek(SeekFrom::Start(0)).await?;
139                return Err(err);
140            }
141
142            if magic == magic_header_bytes {
143                algorithm = Some(compression_algorithm);
144                break;
145            }
146        }
147        fp.seek(SeekFrom::Start(0)).await?;
148        Ok(algorithm)
149    }
150
151    async fn reader<'a>(fp: &'a mut File) -> Result<Box<dyn AsyncBufRead + Unpin + Send + 'a>> {
152        // To support new compression algorithms, add them below
153        match Self::check(fp).await? {
154            Some(SupportedCompressionAlgorithms::Gzip) => Ok(Box::new(BufReader::new(
155                GzipDecoder::new(BufReader::new(fp)),
156            ))),
157            // No compression, or read the raw bytes
158            None => Ok(Box::new(BufReader::new(fp))),
159        }
160    }
161}
162
163async fn skip_first_n_bytes<R: AsyncBufRead + Unpin + Send>(
164    reader: &mut R,
165    n: usize,
166) -> Result<()> {
167    // We cannot simply seek the file by n because the file may be compressed;
168    // to skip the first n decompressed bytes, we decompress up to n and discard the output.
169    let mut skipped_bytes = 0;
170    while skipped_bytes < n {
171        let chunk = reader.fill_buf().await?;
172        let bytes_to_skip = std::cmp::min(chunk.len(), n - skipped_bytes);
173        reader.consume(bytes_to_skip);
174        skipped_bytes += bytes_to_skip;
175    }
176    Ok(())
177}
178
179impl Fingerprinter {
180    pub async fn get_fingerprint_of_file(
181        &self,
182        path: &Path,
183        buffer: &mut Vec<u8>,
184    ) -> Result<FileFingerprint> {
185        use FileFingerprint::*;
186
187        match self.strategy {
188            FingerprintStrategy::DevInode => {
189                let file_handle = File::open(path).await?;
190                let file_info = file_handle.file_info().await?;
191                let dev = file_info.portable_dev();
192                let ino = file_info.portable_ino();
193                Ok(DevInode(dev, ino))
194            }
195            FingerprintStrategy::Checksum {
196                ignored_header_bytes,
197                bytes: _,
198                lines,
199            }
200            | FingerprintStrategy::FirstLinesChecksum {
201                ignored_header_bytes,
202                lines,
203            } => {
204                buffer.resize(self.max_line_length, 0u8);
205                let mut fp = File::open(path).await?;
206                let mut reader = UncompressedReaderImpl::reader(&mut fp).await?;
207
208                skip_first_n_bytes(&mut reader, ignored_header_bytes).await?;
209                let bytes_read = fingerprinter_read_until(reader, b'\n', lines, buffer).await?;
210                let fingerprint = FINGERPRINT_CRC.checksum(&buffer[..bytes_read]);
211                Ok(FirstLinesChecksum(fingerprint))
212            }
213        }
214    }
215
216    pub async fn get_fingerprint_or_log_error(
217        &self,
218        path: &Path,
219        buffer: &mut Vec<u8>,
220        known_small_files: &mut HashMap<PathBuf, time::Instant>,
221        emitter: &impl FileSourceInternalEvents,
222    ) -> Option<FileFingerprint> {
223        let metadata = match fs::metadata(path).await {
224            Ok(metadata) => {
225                if !metadata.is_dir() {
226                    self.get_fingerprint_of_file(path, buffer).await.map(Some)
227                } else {
228                    Ok(None)
229                }
230            }
231            Err(e) => Err(e),
232        };
233
234        metadata
235            .inspect(|_| {
236                // Drop the path from the small files map if we've got enough data to fingerprint it.
237                known_small_files.remove(&path.to_path_buf());
238            })
239            .map_err(|error| {
240                match error.kind() {
241                    ErrorKind::UnexpectedEof => {
242                        if !known_small_files.contains_key(path) {
243                            emitter.emit_file_checksum_failed(path);
244                            known_small_files.insert(path.to_path_buf(), time::Instant::now());
245                        }
246                        return;
247                    }
248                    ErrorKind::NotFound => {
249                        if !self.ignore_not_found {
250                            emitter.emit_file_fingerprint_read_error(path, error);
251                        }
252                    }
253                    _ => {
254                        emitter.emit_file_fingerprint_read_error(path, error);
255                    }
256                };
257                // For scenarios other than UnexpectedEOF, remove the path from the small files map.
258                known_small_files.remove(&path.to_path_buf());
259            })
260            .ok()
261            .flatten()
262    }
263
264    pub async fn get_bytes_checksum(
265        &self,
266        path: &Path,
267        buffer: &mut Vec<u8>,
268    ) -> Result<Option<FileFingerprint>> {
269        match self.strategy {
270            FingerprintStrategy::Checksum {
271                bytes,
272                ignored_header_bytes,
273                lines: _,
274            } => {
275                buffer.resize(bytes, 0u8);
276                let mut fp = File::open(path).await?;
277                fp.seek(SeekFrom::Start(ignored_header_bytes as u64))
278                    .await?;
279                fp.read_exact(&mut buffer[..bytes]).await?;
280                let fingerprint = FINGERPRINT_CRC.checksum(&buffer[..]);
281                Ok(Some(FileFingerprint::BytesChecksum(fingerprint)))
282            }
283            _ => Ok(None),
284        }
285    }
286
287    /// Calculates checksums using strategy pre-0.14.0
288    /// <https://github.com/vectordotdev/vector/issues/8182>
289    pub async fn get_legacy_checksum(
290        &self,
291        path: &Path,
292        buffer: &mut Vec<u8>,
293    ) -> Result<Option<FileFingerprint>> {
294        match self.strategy {
295            FingerprintStrategy::Checksum {
296                ignored_header_bytes,
297                bytes: _,
298                lines,
299            }
300            | FingerprintStrategy::FirstLinesChecksum {
301                ignored_header_bytes,
302                lines,
303            } => {
304                buffer.resize(self.max_line_length, 0u8);
305                let mut fp = File::open(path).await?;
306                fp.seek(SeekFrom::Start(ignored_header_bytes as u64))
307                    .await?;
308                fingerprinter_read_until_and_zerofill_buf(fp, b'\n', lines, buffer).await?;
309                let fingerprint = LEGACY_FINGERPRINT_CRC.checksum(&buffer[..]);
310                Ok(Some(FileFingerprint::FirstLinesChecksum(fingerprint)))
311            }
312            _ => Ok(None),
313        }
314    }
315    /// For upgrades from legacy strategy version
316    /// <https://github.com/vectordotdev/vector/issues/15700>
317    pub async fn get_legacy_first_lines_checksum(
318        &self,
319        path: &Path,
320        buffer: &mut Vec<u8>,
321    ) -> Result<Option<FileFingerprint>> {
322        match self.strategy {
323            FingerprintStrategy::Checksum {
324                ignored_header_bytes,
325                bytes: _,
326                lines,
327            }
328            | FingerprintStrategy::FirstLinesChecksum {
329                ignored_header_bytes,
330                lines,
331            } => {
332                buffer.resize(self.max_line_length, 0u8);
333                let mut fp = File::open(path).await?;
334                fp.seek(SeekFrom::Start(ignored_header_bytes as u64))
335                    .await?;
336                fingerprinter_read_until_and_zerofill_buf(fp, b'\n', lines, buffer).await?;
337                let fingerprint = FINGERPRINT_CRC.checksum(&buffer[..]);
338                Ok(Some(FileFingerprint::FirstLinesChecksum(fingerprint)))
339            }
340            _ => Ok(None),
341        }
342    }
343}
344
/// Saved for backwards compatibility.
///
/// Reads from `r` into `buf` until the `count`-th occurrence of `delim` has been
/// seen or `buf` is full. After the final delimiter, every remaining byte of `buf`
/// is set to zero.
///
/// The legacy checksums hash the *whole* buffer, so the zero fill makes the result
/// independent of whatever followed the last counted line. Any change here
/// changes the legacy fingerprints.
///
/// # Errors
/// Returns [`ErrorKind::UnexpectedEof`] if `r` ends before enough delimiters are
/// found and before `buf` is full. `Interrupted` reads are retried; other I/O errors
/// are propagated.
async fn fingerprinter_read_until_and_zerofill_buf(
    mut r: impl AsyncRead + Unpin + Send,
    delim: u8,
    mut count: usize,
    mut buf: &mut [u8],
) -> Result<()> {
    'main: while !buf.is_empty() {
        let read = match r.read(buf).await {
            Ok(0) => return Err(std::io::Error::new(ErrorKind::UnexpectedEof, "EOF reached")),
            Ok(n) => n,
            // Retry reads interrupted before any data was transferred.
            Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };

        for (pos, &c) in buf[..read].iter().enumerate() {
            if c == delim {
                if count <= 1 {
                    // Last wanted delimiter: clear everything after it. This covers bytes
                    // read past the delimiter and any stale contents further along.
                    for el in &mut buf[(pos + 1)..] {
                        *el = 0;
                    }
                    break 'main;
                } else {
                    count -= 1;
                }
            }
        }
        // Advance past the bytes just read so the next read lands after them.
        buf = &mut buf[read..];
    }
    Ok(())
}
376
377async fn fingerprinter_read_until(
378    mut r: impl AsyncRead + Unpin + Send,
379    delim: u8,
380    mut count: usize,
381    mut buf: &mut [u8],
382) -> Result<usize> {
383    let mut total_read = 0;
384    'main: while !buf.is_empty() {
385        let read = match r.read(buf).await {
386            Ok(0) => return Err(std::io::Error::new(ErrorKind::UnexpectedEof, "EOF reached")),
387            Ok(n) => n,
388            Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
389            Err(e) => return Err(e),
390        };
391
392        for (pos, &c) in buf[..read].iter().enumerate() {
393            if c == delim {
394                if count <= 1 {
395                    total_read += pos + 1;
396                    break 'main;
397                } else {
398                    count -= 1;
399                }
400            }
401        }
402        total_read += read;
403        buf = &mut buf[read..];
404    }
405    Ok(total_read)
406}
407
#[cfg(test)]
mod test {
    use std::{collections::HashMap, fs, io::Error, path::Path, time::Duration};

    use async_compression::tokio::bufread::GzipEncoder;
    use bytes::BytesMut;
    use tempfile::{TempDir, tempdir};
    use tokio::io::AsyncReadExt;

    use super::{FileSourceInternalEvents, FingerprintStrategy, Fingerprinter};

    /// Gzip-compresses `data` entirely in memory.
    pub async fn gzip(data: &[u8]) -> Vec<u8> {
        let mut encoder = GzipEncoder::new(data);

        let mut out = Vec::new();
        encoder.read_to_end(&mut out).await.expect("Failed to read");
        out
    }

    /// Returns the raw on-disk bytes of `file` inside `target_dir` (no decompression).
    fn read_byte_content(target_dir: &TempDir, file: &str) -> Vec<u8> {
        fs::read(target_dir.path().join(file)).unwrap()
    }

    #[tokio::test]
    async fn test_checksum_fingerprint() {
        let fingerprinter = Fingerprinter {
            strategy: FingerprintStrategy::Checksum {
                bytes: 256,
                ignored_header_bytes: 0,
                lines: 1,
            },
            max_line_length: 1024,
            ignore_not_found: false,
        };

        let target_dir = tempdir().unwrap();
        let mut full_line_data = vec![b'x'; 256];
        full_line_data.push(b'\n');
        let not_full_line_data = vec![b'x'; 199];
        let empty_path = target_dir.path().join("empty.log");
        let full_line_path = target_dir.path().join("full_line.log");
        let duplicate_path = target_dir.path().join("duplicate.log");
        let not_full_line_path = target_dir.path().join("not_full_line.log");
        fs::write(&empty_path, []).unwrap();
        fs::write(&full_line_path, &full_line_data).unwrap();
        fs::write(&duplicate_path, &full_line_data).unwrap();
        fs::write(&not_full_line_path, not_full_line_data).unwrap();

        let mut buf = Vec::new();
        assert!(
            fingerprinter
                .get_fingerprint_of_file(&empty_path, &mut buf)
                .await
                .is_err()
        );
        assert!(
            fingerprinter
                .get_fingerprint_of_file(&full_line_path, &mut buf)
                .await
                .is_ok()
        );
        assert!(
            fingerprinter
                .get_fingerprint_of_file(&not_full_line_path, &mut buf)
                .await
                .is_err()
        );
        assert_eq!(
            fingerprinter
                .get_fingerprint_of_file(&full_line_path, &mut buf)
                .await
                .unwrap(),
            fingerprinter
                .get_fingerprint_of_file(&duplicate_path, &mut buf)
                .await
                .unwrap(),
        );
    }

    #[tokio::test]
    async fn test_first_line_checksum_fingerprint() {
        let max_line_length = 64;
        let fingerprinter = Fingerprinter {
            strategy: FingerprintStrategy::FirstLinesChecksum {
                ignored_header_bytes: 0,
                lines: 1,
            },
            max_line_length,
            ignore_not_found: false,
        };

        let target_dir = tempdir().unwrap();
        let prepare_test = |file: &str, contents: &[u8]| {
            let path = target_dir.path().join(file);
            fs::write(&path, contents).unwrap();
            path
        };
        // Writes `amount` bytes of repeating "hello world " without any newline.
        let prepare_test_long = |file: &str, amount| {
            let contents: Vec<u8> = b"hello world "
                .iter()
                .copied()
                .cycle()
                .take(amount)
                .collect();
            prepare_test(file, &contents[..])
        };

        let empty = prepare_test("empty.log", b"");
        let incomplete_line = prepare_test("incomplete_line.log", b"missing newline char");
        // Plain, uncompressed reference file. It previously reused the compressed
        // file's name and contents, so plain-vs-compressed equality was never tested.
        let one_line = prepare_test("one_line.log", b"hello world\n");
        let one_line_duplicate = prepare_test("one_line_duplicate.log", b"hello world\n");
        let one_line_duplicate_compressed = prepare_test(
            "one_line_duplicate_compressed.log",
            &gzip(b"hello world\n").await,
        );
        let one_line_continued =
            prepare_test("one_line_continued.log", b"hello world\nthe next line\n");
        let one_line_continued_compressed = prepare_test(
            "one_line_continued_compressed.log",
            &gzip(b"hello world\nthe next line\n").await,
        );
        let different_two_lines = prepare_test("different_two_lines.log", b"line one\nline two\n");

        let exactly_max_line_length =
            prepare_test_long("exactly_max_line_length.log", max_line_length);
        let exceeding_max_line_length =
            prepare_test_long("exceeding_max_line_length.log", max_line_length + 1);
        let incomplete_under_max_line_length_by_one = prepare_test_long(
            "incomplete_under_max_line_length_by_one.log",
            max_line_length - 1,
        );

        let mut buf = Vec::new();
        let mut run = async |path| fingerprinter.get_fingerprint_of_file(path, &mut buf).await;

        assert!(run(&empty).await.is_err());
        assert!(run(&incomplete_line).await.is_err());
        assert!(run(&incomplete_under_max_line_length_by_one).await.is_err());

        assert!(run(&one_line).await.is_ok());
        assert!(run(&one_line_duplicate).await.is_ok());
        assert!(run(&one_line_continued).await.is_ok());
        assert!(run(&different_two_lines).await.is_ok());
        assert!(run(&exactly_max_line_length).await.is_ok());
        assert!(run(&exceeding_max_line_length).await.is_ok());

        // Identical first lines fingerprint identically, whether plain or compressed and
        // regardless of what follows the first line.
        assert_eq!(
            run(&one_line).await.unwrap(),
            run(&one_line_duplicate).await.unwrap()
        );
        assert_eq!(
            run(&one_line).await.unwrap(),
            run(&one_line_continued).await.unwrap()
        );
        assert_eq!(
            run(&one_line).await.unwrap(),
            run(&one_line_duplicate_compressed).await.unwrap()
        );
        assert_eq!(
            run(&one_line).await.unwrap(),
            run(&one_line_continued_compressed).await.unwrap()
        );

        assert_ne!(
            run(&one_line).await.unwrap(),
            run(&different_two_lines).await.unwrap()
        );

        assert_eq!(
            run(&exactly_max_line_length).await.unwrap(),
            run(&exceeding_max_line_length).await.unwrap()
        );

        // Sanity check: the compressed fixtures really differ on disk.
        assert_ne!(
            read_byte_content(&target_dir, "one_line_duplicate.log"),
            read_byte_content(&target_dir, "one_line_duplicate_compressed.log")
        );

        assert_ne!(
            read_byte_content(&target_dir, "one_line_continued.log"),
            read_byte_content(&target_dir, "one_line_continued_compressed.log")
        );
    }

    #[tokio::test]
    async fn test_first_two_lines_checksum_fingerprint() {
        let max_line_length = 64;
        let fingerprinter = Fingerprinter {
            strategy: FingerprintStrategy::FirstLinesChecksum {
                ignored_header_bytes: 0,
                lines: 2,
            },
            max_line_length,
            ignore_not_found: false,
        };

        let target_dir = tempdir().unwrap();
        let prepare_test = |file: &str, contents: &[u8]| {
            let path = target_dir.path().join(file);
            fs::write(&path, contents).unwrap();
            path
        };

        let incomplete_lines = prepare_test(
            "incomplete_lines.log",
            b"missing newline char\non second line",
        );
        let two_lines = prepare_test("two_lines.log", b"hello world\nfrom vector\n");
        let two_lines_duplicate =
            prepare_test("two_lines_duplicate.log", b"hello world\nfrom vector\n");
        let two_lines_continued = prepare_test(
            "two_lines_continued.log",
            b"hello world\nfrom vector\nthe next line\n",
        );
        let two_lines_duplicate_compressed = prepare_test(
            "two_lines_duplicate_compressed.log",
            &gzip(b"hello world\nfrom vector\n").await,
        );
        let two_lines_continued_compressed = prepare_test(
            "two_lines_continued_compressed.log",
            &gzip(b"hello world\nfrom vector\nthe next line\n").await,
        );

        let different_three_lines = prepare_test(
            "different_three_lines.log",
            b"line one\nline two\nline three\n",
        );

        let mut buf = Vec::new();
        let mut run = async move |path| fingerprinter.get_fingerprint_of_file(path, &mut buf).await;

        assert!(run(&incomplete_lines).await.is_err());

        assert!(run(&two_lines).await.is_ok());
        assert!(run(&two_lines_duplicate).await.is_ok());
        assert!(run(&two_lines_continued).await.is_ok());
        assert!(run(&different_three_lines).await.is_ok());

        assert_eq!(
            run(&two_lines).await.unwrap(),
            run(&two_lines_duplicate).await.unwrap()
        );
        assert_eq!(
            run(&two_lines).await.unwrap(),
            run(&two_lines_continued).await.unwrap()
        );
        assert_eq!(
            run(&two_lines).await.unwrap(),
            run(&two_lines_duplicate_compressed).await.unwrap()
        );
        assert_eq!(
            run(&two_lines).await.unwrap(),
            run(&two_lines_continued_compressed).await.unwrap()
        );

        assert_ne!(
            run(&two_lines).await.unwrap(),
            run(&different_three_lines).await.unwrap()
        );

        assert_ne!(
            read_byte_content(&target_dir, "two_lines_duplicate.log"),
            read_byte_content(&target_dir, "two_lines_duplicate_compressed.log")
        );
        assert_ne!(
            read_byte_content(&target_dir, "two_lines_continued.log"),
            read_byte_content(&target_dir, "two_lines_continued_compressed.log")
        );
    }

    #[tokio::test]
    async fn test_first_two_lines_checksum_fingerprint_with_headers() {
        let max_line_length = 64;
        let fingerprinter = Fingerprinter {
            strategy: FingerprintStrategy::FirstLinesChecksum {
                ignored_header_bytes: 14,
                lines: 2,
            },
            max_line_length,
            ignore_not_found: false,
        };

        let target_dir = tempdir().unwrap();
        let prepare_test = |file: &str, contents: &[u8]| {
            let path = target_dir.path().join(file);
            fs::write(&path, contents).unwrap();
            path
        };

        // "some-header-1\n" is exactly 14 bytes, i.e. `ignored_header_bytes`.
        let two_lines = prepare_test(
            "two_lines.log",
            b"some-header-1\nhello world\nfrom vector\n",
        );
        let two_lines_compressed_same_header = prepare_test(
            "two_lines_compressed_same_header.log",
            &gzip(b"some-header-1\nhello world\nfrom vector\n").await,
        );
        let two_lines_compressed_same_header_size = prepare_test(
            "two_lines_compressed_same_header_size.log",
            &gzip(b"some-header-2\nhello world\nfrom vector\n").await,
        );
        let two_lines_compressed_different_header_size = prepare_test(
            "two_lines_compressed_different_header_size.log",
            &gzip(b"some-header-22\nhellow world\nfrom vector\n").await,
        );

        let mut buf = Vec::new();
        let mut run = async move |path| fingerprinter.get_fingerprint_of_file(path, &mut buf).await;

        assert!(run(&two_lines).await.is_ok());
        assert_eq!(
            run(&two_lines).await.unwrap(),
            run(&two_lines_compressed_same_header).await.unwrap()
        );
        assert_eq!(
            run(&two_lines).await.unwrap(),
            run(&two_lines_compressed_same_header_size).await.unwrap()
        );
        assert_ne!(
            run(&two_lines).await.unwrap(),
            run(&two_lines_compressed_different_header_size)
                .await
                .unwrap()
        );
    }

    #[tokio::test]
    async fn test_inode_fingerprint() {
        let fingerprinter = Fingerprinter {
            strategy: FingerprintStrategy::DevInode,
            max_line_length: 42,
            ignore_not_found: false,
        };

        let target_dir = tempdir().unwrap();
        let small_data = vec![b'x'; 1];
        let medium_data = vec![b'x'; 256];
        let empty_path = target_dir.path().join("empty.log");
        let small_path = target_dir.path().join("small.log");
        let medium_path = target_dir.path().join("medium.log");
        let duplicate_path = target_dir.path().join("duplicate.log");
        fs::write(&empty_path, []).unwrap();
        fs::write(&small_path, small_data).unwrap();
        fs::write(&medium_path, &medium_data).unwrap();
        fs::write(&duplicate_path, &medium_data).unwrap();

        let mut buf = Vec::new();
        assert!(
            fingerprinter
                .get_fingerprint_of_file(&empty_path, &mut buf)
                .await
                .is_ok()
        );
        assert!(
            fingerprinter
                .get_fingerprint_of_file(&small_path, &mut buf)
                .await
                .is_ok()
        );
        // Same contents, different files: the inode strategy must tell them apart.
        assert_ne!(
            fingerprinter
                .get_fingerprint_of_file(&medium_path, &mut buf)
                .await
                .unwrap(),
            fingerprinter
                .get_fingerprint_of_file(&duplicate_path, &mut buf)
                .await
                .unwrap()
        );
    }

    #[tokio::test]
    async fn no_error_on_dir() {
        let target_dir = tempdir().unwrap();
        let fingerprinter = Fingerprinter {
            strategy: FingerprintStrategy::Checksum {
                bytes: 256,
                ignored_header_bytes: 0,
                lines: 1,
            },
            max_line_length: 1024,
            ignore_not_found: false,
        };

        let mut buf = Vec::new();
        let mut small_files = HashMap::new();
        assert!(
            fingerprinter
                .get_fingerprint_or_log_error(
                    target_dir.path(),
                    &mut buf,
                    &mut small_files,
                    &NoErrors
                )
                .await
                .is_none()
        );
    }

    #[test]
    fn test_monotonic_compression_algorithms() {
        // This test covers an edge case in magic-header detection. A file can be shorter
        // than the magic header it is checked against. That might mean the file is simply
        // too small, or only that the file's actual header is shorter than the one being
        // checked. Requiring monotonically increasing header lengths rules out the
        // second case.
        let algos = super::SupportedCompressionAlgorithms::values();
        let mut smallest_byte_length = 0;
        for algo in algos {
            let magic_header_bytes = algo.magic_header_bytes();
            assert!(smallest_byte_length <= magic_header_bytes.len());
            smallest_byte_length = magic_header_bytes.len();
        }
    }

    /// Event emitter that panics on every error-reporting event.
    #[derive(Clone)]
    struct NoErrors;

    impl FileSourceInternalEvents for NoErrors {
        fn emit_file_added(&self, _: &Path) {}

        fn emit_file_resumed(&self, _: &Path, _: u64) {}

        fn emit_file_watch_error(&self, _: &Path, _: Error) {
            panic!();
        }

        fn emit_file_unwatched(&self, _: &Path, _: bool) {}

        fn emit_file_deleted(&self, _: &Path) {}

        fn emit_file_delete_error(&self, _: &Path, _: Error) {
            panic!();
        }

        fn emit_file_fingerprint_read_error(&self, _: &Path, _: Error) {
            panic!();
        }

        fn emit_file_checkpointed(&self, _: usize, _: Duration) {}

        fn emit_file_checksum_failed(&self, _: &Path) {
            panic!();
        }

        fn emit_file_checkpoint_write_error(&self, _: Error) {
            panic!();
        }

        fn emit_files_open(&self, _: usize) {}

        fn emit_path_globbing_failed(&self, _: &Path, _: &Error) {}

        fn emit_file_line_too_long(&self, _: &BytesMut, _: usize, _: usize) {}
    }
}