1use std::{
2 collections::HashMap,
3 io::{ErrorKind, Result, SeekFrom},
4 path::{Path, PathBuf},
5 time,
6};
7
8use async_compression::tokio::bufread::GzipDecoder;
9use crc::Crc;
10use serde::{Deserialize, Serialize};
11use tokio::{
12 fs::{self, File},
13 io::{
14 AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt,
15 BufReader,
16 },
17};
18use vector_common::constants::GZIP_MAGIC;
19
20use crate::{
21 AsyncFileInfo, internal_events::FileSourceInternalEvents, metadata_ext::PortableFileExt,
22};
23
/// CRC-64 built from the `CRC_64_ECMA_182` parameters. Within this file it checksums the
/// current line-based and byte-based fingerprints, the legacy first-lines checksum, and the
/// bytes of a device/inode pair in `FileFingerprint::as_legacy`.
const FINGERPRINT_CRC: Crc<u64> = Crc::<u64>::new(&crc::CRC_64_ECMA_182);
/// CRC-64 built from the `CRC_64_XZ` parameters. Within this file it is only used by
/// `Fingerprinter::get_legacy_checksum`.
const LEGACY_FINGERPRINT_CRC: Crc<u64> = Crc::<u64>::new(&crc::CRC_64_XZ);
26
/// Computes [`FileFingerprint`]s for files according to a configured [`FingerprintStrategy`].
#[derive(Debug, Clone)]
pub struct Fingerprinter {
    /// Selects how a fingerprint is derived from a file.
    pub strategy: FingerprintStrategy,
    /// Length, in bytes, that the caller-provided scratch buffer is resized to before a
    /// line-based checksum is read; at most this many bytes are read and checksummed.
    pub max_line_length: usize,
    /// When `true`, `get_fingerprint_or_log_error` does not emit a read-error event for
    /// failures whose kind is `ErrorKind::NotFound`.
    pub ignore_not_found: bool,
}
33
/// The ways a [`Fingerprinter`] can identify a file.
#[derive(Debug, Clone)]
pub enum FingerprintStrategy {
    /// Checksum-based identification.
    ///
    /// `get_fingerprint_of_file` treats this exactly like `FirstLinesChecksum` (it ignores
    /// `bytes`); `bytes` is only consulted by `get_bytes_checksum`.
    Checksum {
        /// Number of bytes read and checksummed by `get_bytes_checksum`.
        bytes: usize,
        /// Number of leading bytes skipped before checksumming starts.
        ignored_header_bytes: usize,
        /// Number of newline-terminated lines included in a line-based checksum.
        lines: usize,
    },
    /// Checksum of the first `lines` newline-terminated lines of the file.
    FirstLinesChecksum {
        /// Number of leading bytes skipped before checksumming starts.
        ignored_header_bytes: usize,
        /// Number of newline-terminated lines included in the checksum.
        lines: usize,
    },
    /// Identification by the file's device and inode numbers.
    DevInode,
}
47
/// The identity computed for a file by a [`Fingerprinter`].
///
/// Variants are (de)serialized with snake_case names unless overridden per variant below.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize, Ord, PartialOrd)]
#[serde(rename_all = "snake_case")]
pub enum FileFingerprint {
    /// Checksum over a fixed number of bytes (produced by `get_bytes_checksum`).
    /// Serialized under the name `checksum`.
    #[serde(rename = "checksum")]
    BytesChecksum(u64),
    /// Checksum over the first lines of a file. The name `first_line_checksum` is also
    /// accepted when deserializing.
    #[serde(alias = "first_line_checksum")]
    FirstLinesChecksum(u64),
    /// Device number and inode number of the file, in that order.
    DevInode(u64, u64),
    /// A bare checksum value of unspecified origin (produced by `From<u64>`).
    Unknown(u64),
}
58
59impl FileFingerprint {
60 pub async fn as_legacy(&self) -> u64 {
61 use FileFingerprint::*;
62
63 match self {
64 BytesChecksum(c) => *c,
65 FirstLinesChecksum(c) => *c,
66 DevInode(dev, ino) => {
67 let mut buf = Vec::with_capacity(std::mem::size_of_val(dev) * 2);
68 buf.write_all(&dev.to_be_bytes())
69 .await
70 .expect("writing to array");
71 buf.write_all(&ino.to_be_bytes())
72 .await
73 .expect("writing to array");
74 FINGERPRINT_CRC.checksum(&buf[..])
75 }
76 Unknown(c) => *c,
77 }
78 }
79}
80
81impl From<u64> for FileFingerprint {
82 fn from(c: u64) -> Self {
83 FileFingerprint::Unknown(c)
84 }
85}
86
/// Compression formats that can be detected from a file's leading magic bytes.
#[derive(Debug, Copy, Clone)]
enum SupportedCompressionAlgorithms {
    /// Gzip, recognised by the `GZIP_MAGIC` header.
    Gzip,
}
91
92impl SupportedCompressionAlgorithms {
93 fn values() -> Vec<SupportedCompressionAlgorithms> {
94 vec![SupportedCompressionAlgorithms::Gzip]
96 }
97
98 fn magic_header_bytes(&self) -> &'static [u8] {
99 match self {
100 SupportedCompressionAlgorithms::Gzip => GZIP_MAGIC,
101 }
102 }
103}
104
/// Gives access to the uncompressed content of a file that may or may not be compressed.
trait UncompressedReader {
    /// Determines which supported compression algorithm, if any, `fp` is compressed with.
    ///
    /// Returns `Ok(None)` when no supported algorithm is detected.
    async fn check(fp: &mut File) -> Result<Option<SupportedCompressionAlgorithms>>;
    /// Returns a buffered reader over `fp` that yields uncompressed bytes.
    async fn reader<'a>(fp: &'a mut File) -> Result<Box<dyn AsyncBufRead + Unpin + Send + 'a>>;
}
109
110struct UncompressedReaderImpl;
111impl UncompressedReader for UncompressedReaderImpl {
112 async fn check(fp: &mut File) -> Result<Option<SupportedCompressionAlgorithms>> {
126 let mut algorithm: Option<SupportedCompressionAlgorithms> = None;
127 for compression_algorithm in SupportedCompressionAlgorithms::values() {
128 let magic_header_bytes = compression_algorithm.magic_header_bytes();
131
132 let mut magic = vec![0u8; magic_header_bytes.len()];
133
134 fp.seek(SeekFrom::Start(0)).await?;
135 let result = fp.read_exact(&mut magic).await;
136
137 if let Err(err) = result {
138 fp.seek(SeekFrom::Start(0)).await?;
139 return Err(err);
140 }
141
142 if magic == magic_header_bytes {
143 algorithm = Some(compression_algorithm);
144 break;
145 }
146 }
147 fp.seek(SeekFrom::Start(0)).await?;
148 Ok(algorithm)
149 }
150
151 async fn reader<'a>(fp: &'a mut File) -> Result<Box<dyn AsyncBufRead + Unpin + Send + 'a>> {
152 match Self::check(fp).await? {
154 Some(SupportedCompressionAlgorithms::Gzip) => Ok(Box::new(BufReader::new(
155 GzipDecoder::new(BufReader::new(fp)),
156 ))),
157 None => Ok(Box::new(BufReader::new(fp))),
159 }
160 }
161}
162
163async fn skip_first_n_bytes<R: AsyncBufRead + Unpin + Send>(
164 reader: &mut R,
165 n: usize,
166) -> Result<()> {
167 let mut skipped_bytes = 0;
170 while skipped_bytes < n {
171 let chunk = reader.fill_buf().await?;
172 let bytes_to_skip = std::cmp::min(chunk.len(), n - skipped_bytes);
173 reader.consume(bytes_to_skip);
174 skipped_bytes += bytes_to_skip;
175 }
176 Ok(())
177}
178
179impl Fingerprinter {
180 pub async fn get_fingerprint_of_file(
181 &self,
182 path: &Path,
183 buffer: &mut Vec<u8>,
184 ) -> Result<FileFingerprint> {
185 use FileFingerprint::*;
186
187 match self.strategy {
188 FingerprintStrategy::DevInode => {
189 let file_handle = File::open(path).await?;
190 let file_info = file_handle.file_info().await?;
191 let dev = file_info.portable_dev();
192 let ino = file_info.portable_ino();
193 Ok(DevInode(dev, ino))
194 }
195 FingerprintStrategy::Checksum {
196 ignored_header_bytes,
197 bytes: _,
198 lines,
199 }
200 | FingerprintStrategy::FirstLinesChecksum {
201 ignored_header_bytes,
202 lines,
203 } => {
204 buffer.resize(self.max_line_length, 0u8);
205 let mut fp = File::open(path).await?;
206 let mut reader = UncompressedReaderImpl::reader(&mut fp).await?;
207
208 skip_first_n_bytes(&mut reader, ignored_header_bytes).await?;
209 let bytes_read = fingerprinter_read_until(reader, b'\n', lines, buffer).await?;
210 let fingerprint = FINGERPRINT_CRC.checksum(&buffer[..bytes_read]);
211 Ok(FirstLinesChecksum(fingerprint))
212 }
213 }
214 }
215
216 pub async fn get_fingerprint_or_log_error(
217 &self,
218 path: &Path,
219 buffer: &mut Vec<u8>,
220 known_small_files: &mut HashMap<PathBuf, time::Instant>,
221 emitter: &impl FileSourceInternalEvents,
222 ) -> Option<FileFingerprint> {
223 let metadata = match fs::metadata(path).await {
224 Ok(metadata) => {
225 if !metadata.is_dir() {
226 self.get_fingerprint_of_file(path, buffer).await.map(Some)
227 } else {
228 Ok(None)
229 }
230 }
231 Err(e) => Err(e),
232 };
233
234 metadata
235 .inspect(|_| {
236 known_small_files.remove(&path.to_path_buf());
238 })
239 .map_err(|error| {
240 match error.kind() {
241 ErrorKind::UnexpectedEof => {
242 if !known_small_files.contains_key(path) {
243 emitter.emit_file_checksum_failed(path);
244 known_small_files.insert(path.to_path_buf(), time::Instant::now());
245 }
246 return;
247 }
248 ErrorKind::NotFound => {
249 if !self.ignore_not_found {
250 emitter.emit_file_fingerprint_read_error(path, error);
251 }
252 }
253 _ => {
254 emitter.emit_file_fingerprint_read_error(path, error);
255 }
256 };
257 known_small_files.remove(&path.to_path_buf());
259 })
260 .ok()
261 .flatten()
262 }
263
264 pub async fn get_bytes_checksum(
265 &self,
266 path: &Path,
267 buffer: &mut Vec<u8>,
268 ) -> Result<Option<FileFingerprint>> {
269 match self.strategy {
270 FingerprintStrategy::Checksum {
271 bytes,
272 ignored_header_bytes,
273 lines: _,
274 } => {
275 buffer.resize(bytes, 0u8);
276 let mut fp = File::open(path).await?;
277 fp.seek(SeekFrom::Start(ignored_header_bytes as u64))
278 .await?;
279 fp.read_exact(&mut buffer[..bytes]).await?;
280 let fingerprint = FINGERPRINT_CRC.checksum(&buffer[..]);
281 Ok(Some(FileFingerprint::BytesChecksum(fingerprint)))
282 }
283 _ => Ok(None),
284 }
285 }
286
287 pub async fn get_legacy_checksum(
290 &self,
291 path: &Path,
292 buffer: &mut Vec<u8>,
293 ) -> Result<Option<FileFingerprint>> {
294 match self.strategy {
295 FingerprintStrategy::Checksum {
296 ignored_header_bytes,
297 bytes: _,
298 lines,
299 }
300 | FingerprintStrategy::FirstLinesChecksum {
301 ignored_header_bytes,
302 lines,
303 } => {
304 buffer.resize(self.max_line_length, 0u8);
305 let mut fp = File::open(path).await?;
306 fp.seek(SeekFrom::Start(ignored_header_bytes as u64))
307 .await?;
308 fingerprinter_read_until_and_zerofill_buf(fp, b'\n', lines, buffer).await?;
309 let fingerprint = LEGACY_FINGERPRINT_CRC.checksum(&buffer[..]);
310 Ok(Some(FileFingerprint::FirstLinesChecksum(fingerprint)))
311 }
312 _ => Ok(None),
313 }
314 }
315 pub async fn get_legacy_first_lines_checksum(
318 &self,
319 path: &Path,
320 buffer: &mut Vec<u8>,
321 ) -> Result<Option<FileFingerprint>> {
322 match self.strategy {
323 FingerprintStrategy::Checksum {
324 ignored_header_bytes,
325 bytes: _,
326 lines,
327 }
328 | FingerprintStrategy::FirstLinesChecksum {
329 ignored_header_bytes,
330 lines,
331 } => {
332 buffer.resize(self.max_line_length, 0u8);
333 let mut fp = File::open(path).await?;
334 fp.seek(SeekFrom::Start(ignored_header_bytes as u64))
335 .await?;
336 fingerprinter_read_until_and_zerofill_buf(fp, b'\n', lines, buffer).await?;
337 let fingerprint = FINGERPRINT_CRC.checksum(&buffer[..]);
338 Ok(Some(FileFingerprint::FirstLinesChecksum(fingerprint)))
339 }
340 _ => Ok(None),
341 }
342 }
343}
344
345async fn fingerprinter_read_until_and_zerofill_buf(
347 mut r: impl AsyncRead + Unpin + Send,
348 delim: u8,
349 mut count: usize,
350 mut buf: &mut [u8],
351) -> Result<()> {
352 'main: while !buf.is_empty() {
353 let read = match r.read(buf).await {
354 Ok(0) => return Err(std::io::Error::new(ErrorKind::UnexpectedEof, "EOF reached")),
355 Ok(n) => n,
356 Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
357 Err(e) => return Err(e),
358 };
359
360 for (pos, &c) in buf[..read].iter().enumerate() {
361 if c == delim {
362 if count <= 1 {
363 for el in &mut buf[(pos + 1)..] {
364 *el = 0;
365 }
366 break 'main;
367 } else {
368 count -= 1;
369 }
370 }
371 }
372 buf = &mut buf[read..];
373 }
374 Ok(())
375}
376
377async fn fingerprinter_read_until(
378 mut r: impl AsyncRead + Unpin + Send,
379 delim: u8,
380 mut count: usize,
381 mut buf: &mut [u8],
382) -> Result<usize> {
383 let mut total_read = 0;
384 'main: while !buf.is_empty() {
385 let read = match r.read(buf).await {
386 Ok(0) => return Err(std::io::Error::new(ErrorKind::UnexpectedEof, "EOF reached")),
387 Ok(n) => n,
388 Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
389 Err(e) => return Err(e),
390 };
391
392 for (pos, &c) in buf[..read].iter().enumerate() {
393 if c == delim {
394 if count <= 1 {
395 total_read += pos + 1;
396 break 'main;
397 } else {
398 count -= 1;
399 }
400 }
401 }
402 total_read += read;
403 buf = &mut buf[read..];
404 }
405 Ok(total_read)
406}
407
#[cfg(test)]
mod test {
    use std::{collections::HashMap, fs, io::Error, path::Path, time::Duration};

    use async_compression::tokio::bufread::GzipEncoder;
    use bytes::BytesMut;
    use tempfile::{TempDir, tempdir};

    use super::{FileSourceInternalEvents, FingerprintStrategy, Fingerprinter};

    use tokio::io::AsyncReadExt;

    /// Gzip-compresses `data` in memory and returns the compressed bytes.
    pub async fn gzip(data: &[u8]) -> Vec<u8> {
        let mut encoder = GzipEncoder::new(data);

        let mut out = Vec::new();
        encoder.read_to_end(&mut out).await.expect("Failed to read");
        out
    }

    /// Returns the raw on-disk bytes of `file` inside `target_dir`.
    fn read_byte_content(target_dir: &TempDir, file: &str) -> Vec<u8> {
        use std::{fs::File, io::Read};

        let path = target_dir.path().join(file);
        let mut file = File::open(path).unwrap();
        let mut content = Vec::new();
        file.read_to_end(&mut content).unwrap();
        content
    }

    /// The `Checksum` strategy fails on empty files and on files without a complete line,
    /// and gives identical fingerprints to files with identical content.
    #[tokio::test]
    async fn test_checksum_fingerprint() {
        let fingerprinter = Fingerprinter {
            strategy: FingerprintStrategy::Checksum {
                bytes: 256,
                ignored_header_bytes: 0,
                lines: 1,
            },
            max_line_length: 1024,
            ignore_not_found: false,
        };

        let target_dir = tempdir().unwrap();
        let mut full_line_data = vec![b'x'; 256];
        full_line_data.push(b'\n');
        let not_full_line_data = vec![b'x'; 199];
        let empty_path = target_dir.path().join("empty.log");
        let full_line_path = target_dir.path().join("full_line.log");
        let duplicate_path = target_dir.path().join("duplicate.log");
        let not_full_line_path = target_dir.path().join("not_full_line.log");
        fs::write(&empty_path, []).unwrap();
        fs::write(&full_line_path, &full_line_data).unwrap();
        fs::write(&duplicate_path, &full_line_data).unwrap();
        fs::write(&not_full_line_path, not_full_line_data).unwrap();

        let mut buf = Vec::new();
        assert!(
            fingerprinter
                .get_fingerprint_of_file(&empty_path, &mut buf)
                .await
                .is_err()
        );
        assert!(
            fingerprinter
                .get_fingerprint_of_file(&full_line_path, &mut buf)
                .await
                .is_ok()
        );
        assert!(
            fingerprinter
                .get_fingerprint_of_file(&not_full_line_path, &mut buf)
                .await
                .is_err()
        );
        assert_eq!(
            fingerprinter
                .get_fingerprint_of_file(&full_line_path, &mut buf)
                .await
                .unwrap(),
            fingerprinter
                .get_fingerprint_of_file(&duplicate_path, &mut buf)
                .await
                .unwrap(),
        );
    }

    /// With `lines: 1`, only the first line determines the fingerprint, whether the file is
    /// stored plain or gzip-compressed, and lines longer than `max_line_length` are
    /// truncated to it.
    #[tokio::test]
    async fn test_first_line_checksum_fingerprint() {
        let max_line_length = 64;
        let fingerprinter = Fingerprinter {
            strategy: FingerprintStrategy::FirstLinesChecksum {
                ignored_header_bytes: 0,
                lines: 1,
            },
            max_line_length,
            ignore_not_found: false,
        };

        let target_dir = tempdir().unwrap();
        let prepare_test = |file: &str, contents: &[u8]| {
            let path = target_dir.path().join(file);
            fs::write(&path, contents).unwrap();
            path
        };
        // Writes `amount` bytes of a repeating pattern that contains no newline.
        let prepare_test_long = |file: &str, amount| {
            let contents: Vec<u8> = b"hello world "
                .iter()
                .copied()
                .cycle()
                .take(amount)
                .collect();
            prepare_test(file, &contents)
        };

        let empty = prepare_test("empty.log", b"");
        let incomplete_line = prepare_test("incomplete_line.log", b"missing newline char");
        // The baseline is a plain file with its own path, so the comparisons below really
        // are plain-vs-plain and plain-vs-compressed.
        let one_line = prepare_test("one_line.log", b"hello world\n");
        let one_line_duplicate = prepare_test("one_line_duplicate.log", b"hello world\n");
        let one_line_duplicate_compressed = prepare_test(
            "one_line_duplicate_compressed.log",
            &gzip(b"hello world\n").await,
        );
        let one_line_continued =
            prepare_test("one_line_continued.log", b"hello world\nthe next line\n");
        let one_line_continued_compressed = prepare_test(
            "one_line_continued_compressed.log",
            &gzip(b"hello world\nthe next line\n").await,
        );
        let different_two_lines = prepare_test("different_two_lines.log", b"line one\nline two\n");

        let exactly_max_line_length =
            prepare_test_long("exactly_max_line_length.log", max_line_length);
        let exceeding_max_line_length =
            prepare_test_long("exceeding_max_line_length.log", max_line_length + 1);
        let incomplete_under_max_line_length_by_one = prepare_test_long(
            "incomplete_under_max_line_length_by_one.log",
            max_line_length - 1,
        );

        let mut buf = Vec::new();
        let mut run = async |path| fingerprinter.get_fingerprint_of_file(path, &mut buf).await;

        assert!(run(&empty).await.is_err());
        assert!(run(&incomplete_line).await.is_err());
        assert!(run(&incomplete_under_max_line_length_by_one).await.is_err());

        assert!(run(&one_line).await.is_ok());
        assert!(run(&one_line_duplicate).await.is_ok());
        assert!(run(&one_line_continued).await.is_ok());
        assert!(run(&different_two_lines).await.is_ok());
        assert!(run(&exactly_max_line_length).await.is_ok());
        assert!(run(&exceeding_max_line_length).await.is_ok());

        assert_eq!(
            run(&one_line).await.unwrap(),
            run(&one_line_duplicate).await.unwrap()
        );
        assert_eq!(
            run(&one_line).await.unwrap(),
            run(&one_line_continued).await.unwrap()
        );
        assert_eq!(
            run(&one_line).await.unwrap(),
            run(&one_line_duplicate_compressed).await.unwrap()
        );
        assert_eq!(
            run(&one_line).await.unwrap(),
            run(&one_line_continued_compressed).await.unwrap()
        );

        assert_ne!(
            run(&one_line).await.unwrap(),
            run(&different_two_lines).await.unwrap()
        );

        assert_eq!(
            run(&exactly_max_line_length).await.unwrap(),
            run(&exceeding_max_line_length).await.unwrap()
        );

        // Sanity check: the compressed fixtures really differ on disk from the plain ones.
        assert_ne!(
            read_byte_content(&target_dir, "one_line_duplicate.log"),
            read_byte_content(&target_dir, "one_line_duplicate_compressed.log")
        );

        assert_ne!(
            read_byte_content(&target_dir, "one_line_continued.log"),
            read_byte_content(&target_dir, "one_line_continued_compressed.log")
        );
    }

    /// With `lines: 2`, the first two lines determine the fingerprint, whether the file is
    /// stored plain or gzip-compressed.
    #[tokio::test]
    async fn test_first_two_lines_checksum_fingerprint() {
        let max_line_length = 64;
        let fingerprinter = Fingerprinter {
            strategy: FingerprintStrategy::FirstLinesChecksum {
                ignored_header_bytes: 0,
                lines: 2,
            },
            max_line_length,
            ignore_not_found: false,
        };

        let target_dir = tempdir().unwrap();
        let prepare_test = |file: &str, contents: &[u8]| {
            let path = target_dir.path().join(file);
            fs::write(&path, contents).unwrap();
            path
        };

        let incomplete_lines = prepare_test(
            "incomplete_lines.log",
            b"missing newline char\non second line",
        );
        let two_lines = prepare_test("two_lines.log", b"hello world\nfrom vector\n");
        let two_lines_duplicate =
            prepare_test("two_lines_duplicate.log", b"hello world\nfrom vector\n");
        let two_lines_continued = prepare_test(
            "two_lines_continued.log",
            b"hello world\nfrom vector\nthe next line\n",
        );
        let two_lines_duplicate_compressed = prepare_test(
            "two_lines_duplicate_compressed.log",
            &gzip(b"hello world\nfrom vector\n").await,
        );
        let two_lines_continued_compressed = prepare_test(
            "two_lines_continued_compressed.log",
            &gzip(b"hello world\nfrom vector\nthe next line\n").await,
        );

        let different_three_lines = prepare_test(
            "different_three_lines.log",
            b"line one\nline two\nine three\n",
        );

        let mut buf = Vec::new();
        let mut run = async move |path| fingerprinter.get_fingerprint_of_file(path, &mut buf).await;

        assert!(run(&incomplete_lines).await.is_err());

        assert!(run(&two_lines).await.is_ok());
        assert!(run(&two_lines_duplicate).await.is_ok());
        assert!(run(&two_lines_continued).await.is_ok());
        assert!(run(&different_three_lines).await.is_ok());

        assert_eq!(
            run(&two_lines).await.unwrap(),
            run(&two_lines_duplicate).await.unwrap()
        );
        assert_eq!(
            run(&two_lines).await.unwrap(),
            run(&two_lines_continued).await.unwrap()
        );
        assert_eq!(
            run(&two_lines).await.unwrap(),
            run(&two_lines_duplicate_compressed).await.unwrap()
        );
        assert_eq!(
            run(&two_lines).await.unwrap(),
            run(&two_lines_continued_compressed).await.unwrap()
        );

        assert_ne!(
            run(&two_lines).await.unwrap(),
            run(&different_three_lines).await.unwrap()
        );

        // Sanity check: the compressed fixtures really differ on disk from the plain ones.
        assert_ne!(
            read_byte_content(&target_dir, "two_lines_duplicate.log"),
            read_byte_content(&target_dir, "two_lines_duplicate_compressed.log")
        );
        assert_ne!(
            read_byte_content(&target_dir, "two_lines_continued.log"),
            read_byte_content(&target_dir, "two_lines_continued_compressed.log")
        );
    }

    /// `ignored_header_bytes` is applied to the uncompressed content: headers of the same
    /// length are skipped regardless of their bytes, while a header of a different length
    /// changes the fingerprint.
    #[tokio::test]
    async fn test_first_two_lines_checksum_fingerprint_with_headers() {
        let max_line_length = 64;
        let fingerprinter = Fingerprinter {
            strategy: FingerprintStrategy::FirstLinesChecksum {
                ignored_header_bytes: 14,
                lines: 2,
            },
            max_line_length,
            ignore_not_found: false,
        };

        let target_dir = tempdir().unwrap();
        let prepare_test = |file: &str, contents: &[u8]| {
            let path = target_dir.path().join(file);
            fs::write(&path, contents).unwrap();
            path
        };

        let two_lines = prepare_test(
            "two_lines.log",
            b"some-header-1\nhello world\nfrom vector\n",
        );
        let two_lines_compressed_same_header = prepare_test(
            "two_lines_compressed_same_header.log",
            &gzip(b"some-header-1\nhello world\nfrom vector\n").await,
        );
        let two_lines_compressed_same_header_size = prepare_test(
            "two_lines_compressed_same_header_size.log",
            &gzip(b"some-header-2\nhello world\nfrom vector\n").await,
        );
        let two_lines_compressed_different_header_size = prepare_test(
            "two_lines_compressed_different_header_size.log",
            &gzip(b"some-header-22\nhellow world\nfrom vector\n").await,
        );

        let mut buf = Vec::new();
        let mut run = async move |path| fingerprinter.get_fingerprint_of_file(path, &mut buf).await;

        assert!(run(&two_lines).await.is_ok());
        assert_eq!(
            run(&two_lines).await.unwrap(),
            run(&two_lines_compressed_same_header).await.unwrap()
        );
        assert_eq!(
            run(&two_lines).await.unwrap(),
            run(&two_lines_compressed_same_header_size).await.unwrap()
        );
        assert_ne!(
            run(&two_lines).await.unwrap(),
            run(&two_lines_compressed_different_header_size)
                .await
                .unwrap()
        );
    }

    /// The `DevInode` strategy works for files of any size, including empty ones, and
    /// distinguishes distinct files even when their content is identical.
    #[tokio::test]
    async fn test_inode_fingerprint() {
        let fingerprinter = Fingerprinter {
            strategy: FingerprintStrategy::DevInode,
            max_line_length: 42,
            ignore_not_found: false,
        };

        let target_dir = tempdir().unwrap();
        let small_data = vec![b'x'; 1];
        let medium_data = vec![b'x'; 256];
        let empty_path = target_dir.path().join("empty.log");
        let small_path = target_dir.path().join("small.log");
        let medium_path = target_dir.path().join("medium.log");
        let duplicate_path = target_dir.path().join("duplicate.log");
        fs::write(&empty_path, []).unwrap();
        fs::write(&small_path, small_data).unwrap();
        fs::write(&medium_path, &medium_data).unwrap();
        fs::write(&duplicate_path, &medium_data).unwrap();

        let mut buf = Vec::new();
        assert!(
            fingerprinter
                .get_fingerprint_of_file(&empty_path, &mut buf)
                .await
                .is_ok()
        );
        assert!(
            fingerprinter
                .get_fingerprint_of_file(&small_path, &mut buf)
                .await
                .is_ok()
        );
        assert_ne!(
            fingerprinter
                .get_fingerprint_of_file(&medium_path, &mut buf)
                .await
                .unwrap(),
            fingerprinter
                .get_fingerprint_of_file(&duplicate_path, &mut buf)
                .await
                .unwrap()
        );
    }

    /// Fingerprinting a directory yields `None` without emitting any error event
    /// (`NoErrors` panics on every error event).
    #[tokio::test]
    async fn no_error_on_dir() {
        let target_dir = tempdir().unwrap();
        let fingerprinter = Fingerprinter {
            strategy: FingerprintStrategy::Checksum {
                bytes: 256,
                ignored_header_bytes: 0,
                lines: 1,
            },
            max_line_length: 1024,
            ignore_not_found: false,
        };

        let mut buf = Vec::new();
        let mut small_files = HashMap::new();
        assert!(
            fingerprinter
                .get_fingerprint_or_log_error(
                    target_dir.path(),
                    &mut buf,
                    &mut small_files,
                    &NoErrors
                )
                .await
                .is_none()
        );
    }

    /// The supported algorithms must be listed in order of non-decreasing magic header
    /// length.
    #[test]
    fn test_monotonic_compression_algorithms() {
        let algos = super::SupportedCompressionAlgorithms::values();
        let mut smallest_byte_length = 0;
        for algo in algos {
            let magic_header_bytes = algo.magic_header_bytes();
            assert!(smallest_byte_length <= magic_header_bytes.len());
            smallest_byte_length = magic_header_bytes.len();
        }
    }

    /// Event sink that panics on every error-type event, so a test fails if one is emitted.
    #[derive(Clone)]
    struct NoErrors;

    impl FileSourceInternalEvents for NoErrors {
        fn emit_file_added(&self, _: &Path) {}

        fn emit_file_resumed(&self, _: &Path, _: u64) {}

        fn emit_file_watch_error(&self, _: &Path, _: Error) {
            panic!();
        }

        fn emit_file_unwatched(&self, _: &Path, _: bool) {}

        fn emit_file_deleted(&self, _: &Path) {}

        fn emit_file_delete_error(&self, _: &Path, _: Error) {
            panic!();
        }

        fn emit_file_fingerprint_read_error(&self, _: &Path, _: Error) {
            panic!();
        }

        fn emit_file_checkpointed(&self, _: usize, _: Duration) {}

        fn emit_file_checksum_failed(&self, _: &Path) {
            panic!();
        }

        fn emit_file_checkpoint_write_error(&self, _: Error) {
            panic!();
        }

        fn emit_files_open(&self, _: usize) {}

        fn emit_path_globbing_failed(&self, _: &Path, _: &Error) {}

        fn emit_file_line_too_long(&self, _: &BytesMut, _: usize, _: usize) {}
    }
}