1use std::{
2 collections::BTreeSet,
3 fs, io,
4 path::{Path, PathBuf},
5 sync::{Arc, Mutex},
6};
7
8use chrono::{DateTime, Utc};
9use dashmap::DashMap;
10use glob::glob;
11use serde::{Deserialize, Serialize};
12use tracing::{error, info, warn};
13
14use super::{
15 fingerprinter::{FileFingerprint, Fingerprinter},
16 FilePosition,
17};
18
/// File name (inside the data directory) that new checkpoint state is written
/// and synced to before being renamed over [`CHECKPOINT_FILE_NAME`].
const TMP_FILE_NAME: &str = "checkpoints.new.json";
/// File name (inside the data directory) of the stable JSON checkpoint file.
pub const CHECKPOINT_FILE_NAME: &str = "checkpoints.json";
21
/// Serialized form of the checkpoint file.
///
/// Internally tagged by a `"version"` field, e.g.
/// `{"version":"1","checkpoints":[...]}`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "version", rename_all = "snake_case")]
enum State {
    /// Version `"1"`: a set of checkpoints. Using a `BTreeSet` keeps the
    /// serialized order deterministic (sorted by `Checkpoint`'s derived `Ord`).
    #[serde(rename = "1")]
    V1 { checkpoints: BTreeSet<Checkpoint> },
}
32
/// One persisted checkpoint: the read position for the file identified by
/// `fingerprint`, and when that position was last recorded.
///
/// NOTE: field declaration order determines both the derived `Ord` (and thus
/// the order inside `State::V1`'s `BTreeSet`) and the serialized field order.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
#[serde(rename_all = "snake_case")]
struct Checkpoint {
    fingerprint: FileFingerprint,
    position: FilePosition,
    /// Compared against `ignore_before` when checkpoints are loaded.
    modified: DateTime<Utc>,
}
42
/// Persists and restores per-file read positions (checkpoints).
///
/// The live state is held in a shared [`CheckpointsView`]; it is written to
/// `stable_file_path` by way of `tmp_file_path`, and legacy one-file-per-
/// checkpoint data under `directory` can be migrated on load.
pub struct Checkpointer {
    /// Legacy checkpoint directory (`<data_dir>/checkpoints`).
    directory: PathBuf,
    /// Scratch file written and synced before being renamed to `stable_file_path`.
    tmp_file_path: PathBuf,
    /// The JSON checkpoint file.
    stable_file_path: PathBuf,
    /// Glob pattern matching every entry of `directory`.
    glob_string: String,
    /// In-memory checkpoint state shared with callers of `view`.
    checkpoints: Arc<CheckpointsView>,
    /// Last state written to disk, used to skip redundant writes.
    last: Mutex<Option<State>>,
}
51
/// In-memory checkpoint state, backed by concurrent `DashMap`s and shared
/// through an `Arc` handed out by `Checkpointer::view`.
#[derive(Debug, Default)]
pub struct CheckpointsView {
    /// Current read position for each known file.
    checkpoints: DashMap<FileFingerprint, FilePosition>,
    /// When each position was last updated (or the persisted time it was loaded with).
    modified_times: DashMap<FileFingerprint, DateTime<Utc>>,
    /// When each file was marked dead via `set_dead`; consulted by `remove_expired`.
    removed_times: DashMap<FileFingerprint, DateTime<Utc>>,
}
60
61impl CheckpointsView {
62 pub fn update(&self, fng: FileFingerprint, pos: FilePosition) {
63 self.checkpoints.insert(fng, pos);
64 self.modified_times.insert(fng, Utc::now());
65 self.removed_times.remove(&fng);
66 }
67
68 pub fn get(&self, fng: FileFingerprint) -> Option<FilePosition> {
69 self.checkpoints.get(&fng).map(|r| *r.value())
70 }
71
72 pub fn set_dead(&self, fng: FileFingerprint) {
73 self.removed_times.insert(fng, Utc::now());
74 }
75
76 pub fn update_key(&self, old: FileFingerprint, new: FileFingerprint) {
77 if let Some((_, value)) = self.checkpoints.remove(&old) {
78 self.checkpoints.insert(new, value);
79 }
80
81 if let Some((_, value)) = self.modified_times.remove(&old) {
82 self.modified_times.insert(new, value);
83 }
84
85 if let Some((_, value)) = self.removed_times.remove(&old) {
86 self.removed_times.insert(new, value);
87 }
88 }
89
90 pub fn contains_bytes_checksums(&self) -> bool {
91 self.checkpoints
92 .iter()
93 .any(|entry| matches!(entry.key(), FileFingerprint::BytesChecksum(_)))
94 }
95
96 pub fn remove_expired(&self) {
97 let now = Utc::now();
98
99 let to_remove = self
103 .removed_times
104 .iter()
105 .filter(|entry| {
106 let ts = entry.value();
107 let duration = now - *ts;
108 duration >= chrono::Duration::seconds(60)
109 })
110 .map(|entry| *entry.key())
111 .collect::<Vec<FileFingerprint>>();
112
113 for fng in to_remove {
114 self.checkpoints.remove(&fng);
115 self.modified_times.remove(&fng);
116 self.removed_times.remove(&fng);
117 }
118 }
119
120 fn load(&self, checkpoint: Checkpoint) {
121 self.checkpoints
122 .insert(checkpoint.fingerprint, checkpoint.position);
123 self.modified_times
124 .insert(checkpoint.fingerprint, checkpoint.modified);
125 }
126
127 fn set_state(&self, state: State, ignore_before: Option<DateTime<Utc>>) {
128 match state {
129 State::V1 { checkpoints } => {
130 for checkpoint in checkpoints {
131 if let Some(ignore_before) = ignore_before {
132 if checkpoint.modified < ignore_before {
133 continue;
134 }
135 }
136 self.load(checkpoint);
137 }
138 }
139 }
140 }
141
142 fn get_state(&self) -> State {
143 State::V1 {
144 checkpoints: self
145 .checkpoints
146 .iter()
147 .map(|entry| {
148 let fingerprint = entry.key();
149 let position = entry.value();
150 Checkpoint {
151 fingerprint: *fingerprint,
152 position: *position,
153 modified: self
154 .modified_times
155 .get(fingerprint)
156 .map(|r| *r.value())
157 .unwrap_or_else(Utc::now),
158 }
159 })
160 .collect(),
161 }
162 }
163
164 fn maybe_upgrade(
165 &self,
166 path: &Path,
167 fng: FileFingerprint,
168 fingerprinter: &Fingerprinter,
169 fingerprint_buffer: &mut Vec<u8>,
170 ) {
171 if let Ok(Some(old_checksum)) = fingerprinter.get_bytes_checksum(path, fingerprint_buffer) {
172 self.update_key(old_checksum, fng)
173 }
174
175 if let Some((_, pos)) = self
176 .checkpoints
177 .remove(&FileFingerprint::Unknown(fng.as_legacy()))
178 {
179 self.update(fng, pos);
180 }
181
182 if self.checkpoints.get(&fng).is_none() {
183 if let Ok(Some(fingerprint)) =
184 fingerprinter.get_legacy_checksum(path, fingerprint_buffer)
185 {
186 if let Some((_, pos)) = self.checkpoints.remove(&fingerprint) {
187 self.update(fng, pos);
188 }
189 }
190 if let Ok(Some(fingerprint)) =
191 fingerprinter.get_legacy_first_lines_checksum(path, fingerprint_buffer)
192 {
193 if let Some((_, pos)) = self.checkpoints.remove(&fingerprint) {
194 self.update(fng, pos);
195 }
196 }
197 }
198 }
199}
200
201impl Checkpointer {
202 pub fn new(data_dir: &Path) -> Checkpointer {
203 let directory = data_dir.join("checkpoints");
204 let glob_string = directory.join("*").to_string_lossy().into_owned();
205 let tmp_file_path = data_dir.join(TMP_FILE_NAME);
206 let stable_file_path = data_dir.join(CHECKPOINT_FILE_NAME);
207
208 Checkpointer {
209 directory,
210 glob_string,
211 tmp_file_path,
212 stable_file_path,
213 checkpoints: Arc::new(CheckpointsView::default()),
214 last: Mutex::new(None),
215 }
216 }
217
218 pub fn view(&self) -> Arc<CheckpointsView> {
219 Arc::clone(&self.checkpoints)
220 }
221
222 #[cfg(test)]
228 fn encode(&self, fng: FileFingerprint, pos: FilePosition) -> PathBuf {
229 use FileFingerprint::*;
230
231 let path = match fng {
232 BytesChecksum(c) => format!("g{c:x}.{pos}"),
233 FirstLinesChecksum(c) => format!("h{c:x}.{pos}"),
234 DevInode(dev, ino) => format!("i{dev:x}.{ino:x}.{pos}"),
235 Unknown(x) => format!("{x:x}.{pos}"),
236 };
237 self.directory.join(path)
238 }
239
240 fn decode(&self, path: &Path) -> (FileFingerprint, FilePosition) {
248 use FileFingerprint::*;
249
250 let file_name = &path.file_name().unwrap().to_string_lossy();
251 match file_name.chars().next().expect("empty file name") {
252 'g' => {
253 let (c, pos) = scan_fmt!(file_name, "g{x}.{}", [hex u64], FilePosition).unwrap();
254 (BytesChecksum(c), pos)
255 }
256 'h' => {
257 let (c, pos) = scan_fmt!(file_name, "h{x}.{}", [hex u64], FilePosition).unwrap();
258 (FirstLinesChecksum(c), pos)
259 }
260 'i' => {
261 let (dev, ino, pos) =
262 scan_fmt!(file_name, "i{x}.{x}.{}", [hex u64], [hex u64], FilePosition)
263 .unwrap();
264 (DevInode(dev, ino), pos)
265 }
266 _ => {
267 let (c, pos) = scan_fmt!(file_name, "{x}.{}", [hex u64], FilePosition).unwrap();
268 (Unknown(c), pos)
269 }
270 }
271 }
272
273 #[cfg(test)]
274 pub fn update_checkpoint(&mut self, fng: FileFingerprint, pos: FilePosition) {
275 self.checkpoints.update(fng, pos);
276 }
277
278 #[cfg(test)]
279 pub fn get_checkpoint(&self, fng: FileFingerprint) -> Option<FilePosition> {
280 self.checkpoints.get(fng)
281 }
282
283 pub fn maybe_upgrade(
286 &mut self,
287 path: &Path,
288 fresh: FileFingerprint,
289 fingerprinter: &Fingerprinter,
290 fingerprint_buffer: &mut Vec<u8>,
291 ) {
292 self.checkpoints
293 .maybe_upgrade(path, fresh, fingerprinter, fingerprint_buffer)
294 }
295
296 pub fn write_checkpoints(&self) -> Result<usize, io::Error> {
300 self.checkpoints.remove_expired();
305
306 let current = self.checkpoints.get_state();
307
308 let mut last = self.last.lock().expect("Data poisoned.");
310 if last.as_ref() != Some(¤t) {
311 let mut f = io::BufWriter::new(fs::File::create(&self.tmp_file_path)?);
316 serde_json::to_writer(&mut f, ¤t)?;
317 f.into_inner()?.sync_all()?;
318
319 fs::rename(&self.tmp_file_path, &self.stable_file_path)?;
325
326 *last = Some(current);
327 }
328
329 Ok(self.checkpoints.checkpoints.len())
330 }
331
332 #[cfg(test)]
335 pub fn write_legacy_checkpoints(&mut self) -> Result<usize, io::Error> {
336 fs::remove_dir_all(&self.directory).ok();
337 fs::create_dir_all(&self.directory)?;
338 for c in self.checkpoints.checkpoints.iter() {
339 fs::File::create(self.encode(*c.key(), *c.value()))?;
340 }
341 Ok(self.checkpoints.checkpoints.len())
342 }
343
344 pub fn read_checkpoints(&mut self, ignore_before: Option<DateTime<Utc>>) {
348 match self.read_checkpoints_file(&self.tmp_file_path) {
353 Ok(state) => {
354 warn!(message = "Recovered checkpoint data from interrupted process.");
355 self.checkpoints.set_state(state, ignore_before);
356
357 if let Err(error) = fs::rename(&self.tmp_file_path, &self.stable_file_path) {
360 warn!(message = "Error persisting recovered checkpoint file.", %error);
361 }
362 return;
363 }
364 Err(error) if error.kind() == io::ErrorKind::NotFound => {
365 }
367 Err(error) => {
368 error!(message = "Unable to recover checkpoint data from interrupted process.", %error);
369 }
370 }
371
372 match self.read_checkpoints_file(&self.stable_file_path) {
376 Ok(state) => {
377 info!(message = "Loaded checkpoint data.");
378 self.checkpoints.set_state(state, ignore_before);
379 return;
380 }
381 Err(error) if error.kind() == io::ErrorKind::NotFound => {
382 }
384 Err(error) => {
385 warn!(message = "Unable to load checkpoint data.", %error);
386 return;
387 }
388 }
389
390 info!("Attempting to read legacy checkpoint files.");
393 self.read_legacy_checkpoints(ignore_before);
394
395 if self.write_checkpoints().is_ok() {
396 fs::remove_dir_all(&self.directory).ok();
397 }
398 }
399
400 fn read_checkpoints_file(&self, path: &Path) -> Result<State, io::Error> {
401 let reader = io::BufReader::new(fs::File::open(path)?);
402 serde_json::from_reader(reader).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
403 }
404
405 fn read_legacy_checkpoints(&mut self, ignore_before: Option<DateTime<Utc>>) {
406 for path in glob(&self.glob_string).unwrap().flatten() {
407 let mut mtime = None;
408 if let Some(ignore_before) = ignore_before {
409 if let Ok(Ok(modified)) = fs::metadata(&path).map(|metadata| metadata.modified()) {
410 let modified = DateTime::<Utc>::from(modified);
411 if modified < ignore_before {
412 fs::remove_file(path).ok();
413 continue;
414 }
415 mtime = Some(modified);
416 }
417 }
418 let (fng, pos) = self.decode(&path);
419 self.checkpoints.checkpoints.insert(fng, pos);
420 if let Some(mtime) = mtime {
421 self.checkpoints.modified_times.insert(fng, mtime);
422 }
423 }
424 }
425}
426
/// Unit tests for checkpoint encoding, persistence, expiration and
/// fingerprint upgrades.
#[cfg(test)]
mod test {
    use chrono::{Duration, Utc};
    use similar_asserts::assert_eq;
    use tempfile::tempdir;

    use super::{
        super::{FingerprintStrategy, Fingerprinter},
        Checkpoint, Checkpointer, FileFingerprint, FilePosition, CHECKPOINT_FILE_NAME,
        TMP_FILE_NAME,
    };

    /// Every fingerprint kind survives the legacy file-name round trip and can
    /// be stored in and read back from the in-memory view.
    #[test]
    fn test_checkpointer_basics() {
        let fingerprints = vec![
            FileFingerprint::DevInode(1, 2),
            FileFingerprint::BytesChecksum(3456),
            FileFingerprint::FirstLinesChecksum(78910),
            FileFingerprint::Unknown(1337),
        ];
        for fingerprint in fingerprints {
            let position: FilePosition = 1234;
            let data_dir = tempdir().unwrap();
            let mut chkptr = Checkpointer::new(data_dir.path());
            assert_eq!(
                chkptr.decode(&chkptr.encode(fingerprint, position)),
                (fingerprint, position)
            );
            chkptr.update_checkpoint(fingerprint, position);
            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
        }
    }

    /// Checkpoints modified before `ignore_before` are not loaded on restart.
    #[test]
    fn test_checkpointer_ignore_before() {
        let newer = (
            FileFingerprint::DevInode(1, 2),
            Utc::now() - Duration::seconds(5),
        );
        let newish = (
            FileFingerprint::BytesChecksum(3456),
            Utc::now() - Duration::seconds(10),
        );
        let oldish = (
            FileFingerprint::FirstLinesChecksum(78910),
            Utc::now() - Duration::seconds(15),
        );
        let older = (
            FileFingerprint::Unknown(1337),
            Utc::now() - Duration::seconds(20),
        );
        let ignore_before = Some(Utc::now() - Duration::seconds(12));

        let position: FilePosition = 1234;
        let data_dir = tempdir().unwrap();

        {
            // Persist all four checkpoints with their explicit modification times.
            let chkptr = Checkpointer::new(data_dir.path());

            for (fingerprint, modified) in &[&newer, &newish, &oldish, &older] {
                chkptr.checkpoints.load(Checkpoint {
                    fingerprint: *fingerprint,
                    position,
                    modified: *modified,
                });
                assert_eq!(chkptr.get_checkpoint(*fingerprint), Some(position));
                chkptr.write_checkpoints().unwrap();
            }
        }

        {
            // Reload with a cutoff between `newish` (10s) and `oldish` (15s).
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.read_checkpoints(ignore_before);

            assert_eq!(chkptr.get_checkpoint(newish.0), Some(position));
            assert_eq!(chkptr.get_checkpoint(newer.0), Some(position));
            assert_eq!(chkptr.get_checkpoint(oldish.0), None);
            assert_eq!(chkptr.get_checkpoint(older.0), None);
        }
    }

    /// A written checkpoint is not visible to a new checkpointer until
    /// `read_checkpoints` loads it.
    #[test]
    fn test_checkpointer_restart() {
        let fingerprints = vec![
            FileFingerprint::DevInode(1, 2),
            FileFingerprint::BytesChecksum(3456),
            FileFingerprint::FirstLinesChecksum(78910),
            FileFingerprint::Unknown(1337),
        ];
        for fingerprint in fingerprints {
            let position: FilePosition = 1234;
            let data_dir = tempdir().unwrap();
            {
                let mut chkptr = Checkpointer::new(data_dir.path());
                chkptr.update_checkpoint(fingerprint, position);
                assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
                chkptr.write_checkpoints().ok();
            }
            {
                let mut chkptr = Checkpointer::new(data_dir.path());
                assert_eq!(chkptr.get_checkpoint(fingerprint), None);
                chkptr.read_checkpoints(None);
                assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
            }
        }
    }

    /// A checkpoint stored as `Unknown(legacy)` is re-keyed to the matching
    /// `DevInode` fingerprint by `maybe_upgrade`.
    #[test]
    fn test_checkpointer_fingerprint_upgrades_unknown() {
        let log_dir = tempdir().unwrap();
        let path = log_dir.path().join("test.log");
        let data = "hello\n";
        std::fs::write(&path, data).unwrap();

        let new_fingerprint = FileFingerprint::DevInode(1, 2);
        let old_fingerprint = FileFingerprint::Unknown(new_fingerprint.as_legacy());
        let position: FilePosition = 1234;
        let fingerprinter = Fingerprinter {
            strategy: FingerprintStrategy::DevInode,
            max_line_length: 1000,
            ignore_not_found: false,
        };

        let mut buf = Vec::new();

        let data_dir = tempdir().unwrap();
        {
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.update_checkpoint(old_fingerprint, position);
            assert_eq!(chkptr.get_checkpoint(old_fingerprint), Some(position));
            chkptr.write_checkpoints().ok();
        }
        {
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.read_checkpoints(None);
            assert_eq!(chkptr.get_checkpoint(new_fingerprint), None);

            chkptr.maybe_upgrade(&path, new_fingerprint, &fingerprinter, &mut buf);

            assert_eq!(chkptr.get_checkpoint(new_fingerprint), Some(position));
            assert_eq!(chkptr.get_checkpoint(old_fingerprint), None);
        }
    }

    /// A checkpoint keyed by the legacy checksum of `test.log` is re-keyed to
    /// the newer first-lines checksum.
    #[test]
    fn test_checkpointer_fingerprint_upgrades_legacy_checksum() {
        let log_dir = tempdir().unwrap();
        let path = log_dir.path().join("test.log");
        let data = "hello\n";
        std::fs::write(&path, data).unwrap();

        // Checksum values are specific to the contents `"hello\n"`.
        let old_fingerprint = FileFingerprint::FirstLinesChecksum(18057733963141331840);
        let new_fingerprint = FileFingerprint::FirstLinesChecksum(17791311590754645022);
        let position: FilePosition = 6;

        let fingerprinter = Fingerprinter {
            strategy: FingerprintStrategy::FirstLinesChecksum {
                ignored_header_bytes: 0,
                lines: 1,
            },
            max_line_length: 102400,
            ignore_not_found: false,
        };

        let mut buf = Vec::new();

        let data_dir = tempdir().unwrap();
        {
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.update_checkpoint(old_fingerprint, position);
            assert_eq!(chkptr.get_checkpoint(old_fingerprint), Some(position));
            chkptr.write_checkpoints().ok();
        }
        {
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.read_checkpoints(None);
            assert_eq!(chkptr.get_checkpoint(new_fingerprint), None);

            chkptr.maybe_upgrade(&path, new_fingerprint, &fingerprinter, &mut buf);

            assert_eq!(chkptr.get_checkpoint(new_fingerprint), Some(position));
            assert_eq!(chkptr.get_checkpoint(old_fingerprint), None);
        }
    }

    /// A checkpoint keyed by the legacy first-lines checksum of `test.log` is
    /// re-keyed to the current first-lines checksum.
    #[test]
    fn test_checkpointer_fingerprint_upgrades_legacy_first_lines_checksum() {
        let log_dir = tempdir().unwrap();
        let path = log_dir.path().join("test.log");
        let data = "hello\n";
        std::fs::write(&path, data).unwrap();

        // Checksum values are specific to the contents `"hello\n"`.
        let old_fingerprint = FileFingerprint::FirstLinesChecksum(17791311590754645022);
        let new_fingerprint = FileFingerprint::FirstLinesChecksum(11081174131906673079);
        let position: FilePosition = 6;

        let fingerprinter = Fingerprinter {
            strategy: FingerprintStrategy::FirstLinesChecksum {
                ignored_header_bytes: 0,
                lines: 1,
            },
            max_line_length: 102400,
            ignore_not_found: false,
        };

        let mut buf = Vec::new();

        let data_dir = tempdir().unwrap();
        {
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.update_checkpoint(old_fingerprint, position);
            assert_eq!(chkptr.get_checkpoint(old_fingerprint), Some(position));
            chkptr.write_checkpoints().ok();
        }
        {
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.read_checkpoints(None);
            assert_eq!(chkptr.get_checkpoint(new_fingerprint), None);

            chkptr.maybe_upgrade(&path, new_fingerprint, &fingerprinter, &mut buf);

            assert_eq!(chkptr.get_checkpoint(new_fingerprint), Some(position));
            assert_eq!(chkptr.get_checkpoint(old_fingerprint), None);
        }
    }

    /// Legacy per-file checkpoints are migrated to the JSON file and the legacy
    /// directory is removed.
    #[test]
    fn test_checkpointer_file_upgrades() {
        let fingerprint = FileFingerprint::DevInode(1, 2);
        let position: FilePosition = 1234;

        let data_dir = tempdir().unwrap();

        {
            // Write only legacy-format checkpoints.
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.update_checkpoint(fingerprint, position);
            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
            chkptr.write_legacy_checkpoints().unwrap();
        }

        assert!(!data_dir.path().join(TMP_FILE_NAME).exists());
        assert!(!data_dir.path().join(CHECKPOINT_FILE_NAME).exists());
        assert!(data_dir.path().join("checkpoints").is_dir());

        {
            // Reading falls back to the legacy files and converts them.
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.read_checkpoints(None);
            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
            chkptr.write_checkpoints().unwrap();
        }

        assert!(!data_dir.path().join(TMP_FILE_NAME).exists());
        assert!(data_dir.path().join(CHECKPOINT_FILE_NAME).exists());
        assert!(!data_dir.path().join("checkpoints").is_dir());

        {
            // The converted JSON file is now the source of truth.
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.read_checkpoints(None);
            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
        }
    }

    /// Dead checkpoints are dropped on write once they are at least 60s old,
    /// unless they were updated again after being marked dead.
    #[test]
    fn test_checkpointer_expiration() {
        let cases = vec![
            // (fingerprint, position, seconds since marked dead)
            (FileFingerprint::BytesChecksum(123), 0, 30),
            (FileFingerprint::BytesChecksum(456), 1, 60),
            (FileFingerprint::BytesChecksum(789), 2, 90),
            (FileFingerprint::BytesChecksum(101112), 3, 120),
        ];

        let data_dir = tempdir().unwrap();
        let mut chkptr = Checkpointer::new(data_dir.path());

        for (fingerprint, position, removed) in cases.clone() {
            chkptr.update_checkpoint(fingerprint, position);

            // Backdate the removal time directly so the test needn't sleep.
            chkptr
                .checkpoints
                .removed_times
                .insert(fingerprint, Utc::now() - chrono::Duration::seconds(removed));

            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
        }

        // Updating an otherwise-expired checkpoint cancels its removal.
        chkptr.update_checkpoint(cases[2].0, 42);

        // Expiration runs as part of `write_checkpoints`.
        chkptr.write_checkpoints().unwrap();

        assert_eq!(chkptr.get_checkpoint(cases[0].0), Some(0));
        assert_eq!(chkptr.get_checkpoint(cases[1].0), None);
        assert_eq!(chkptr.get_checkpoint(cases[2].0), Some(42));
        assert_eq!(chkptr.get_checkpoint(cases[3].0), None);
    }

    /// A checkpoint keyed by the bytes checksum is moved to the first-lines
    /// checksum that the `Checksum` strategy now produces.
    #[test]
    fn test_checkpointer_checksum_updates() {
        let data_dir = tempdir().unwrap();

        let fingerprinter = crate::Fingerprinter {
            strategy: crate::FingerprintStrategy::Checksum {
                bytes: 16,
                ignored_header_bytes: 0,
                lines: 1,
            },
            max_line_length: 1024,
            ignore_not_found: false,
        };

        let log_path = data_dir.path().join("test.log");
        let contents = "hello i am a test log line that is just long enough but not super long\n";
        std::fs::write(&log_path, contents).expect("writing test data");

        let mut buf = vec![0; 1024];
        let old = fingerprinter
            .get_bytes_checksum(&log_path, &mut buf)
            .expect("getting old checksum")
            .expect("still getting old checksum");

        let new = fingerprinter
            .get_fingerprint_of_file(&log_path, &mut buf)
            .expect("getting new checksum");

        match (old, new) {
            // The two fingerprints must differ for the upgrade to be observable.
            (FileFingerprint::BytesChecksum(old), FileFingerprint::FirstLinesChecksum(new)) => {
                assert_ne!(old, new)
            }
            _ => panic!("unexpected checksum types"),
        }

        let mut chkptr = Checkpointer::new(data_dir.path());

        chkptr.update_checkpoint(old, 1234);

        assert!(chkptr.checkpoints.contains_bytes_checksums());

        chkptr.maybe_upgrade(&log_path, new, &fingerprinter, &mut buf);

        assert!(!chkptr.checkpoints.contains_bytes_checksums());
        assert_eq!(Some(1234), chkptr.get_checkpoint(new));
        assert_eq!(None, chkptr.get_checkpoint(old));
    }

    /// Pins the JSON layout of each fingerprint kind (ignoring `modified`,
    /// which is time-dependent).
    #[test]
    fn test_checkpointer_serialization() {
        let fingerprints = vec![
            (
                FileFingerprint::DevInode(1, 2),
                r#"{"version":"1","checkpoints":[{"fingerprint":{"dev_inode":[1,2]},"position":1234}]}"#,
            ),
            (
                FileFingerprint::BytesChecksum(3456),
                r#"{"version":"1","checkpoints":[{"fingerprint":{"checksum":3456},"position":1234}]}"#,
            ),
            (
                FileFingerprint::FirstLinesChecksum(78910),
                r#"{"version":"1","checkpoints":[{"fingerprint":{"first_lines_checksum":78910},"position":1234}]}"#,
            ),
            (
                FileFingerprint::Unknown(1337),
                r#"{"version":"1","checkpoints":[{"fingerprint":{"unknown":1337},"position":1234}]}"#,
            ),
        ];
        for (fingerprint, expected) in fingerprints {
            let expected: serde_json::Value = serde_json::from_str(expected).unwrap();

            let position: FilePosition = 1234;
            let data_dir = tempdir().unwrap();
            let mut chkptr = Checkpointer::new(data_dir.path());

            chkptr.update_checkpoint(fingerprint, position);
            chkptr.write_checkpoints().unwrap();

            let got: serde_json::Value = {
                let s =
                    std::fs::read_to_string(data_dir.path().join(CHECKPOINT_FILE_NAME)).unwrap();
                let mut checkpoints: serde_json::Value = serde_json::from_str(&s).unwrap();
                for checkpoint in checkpoints["checkpoints"].as_array_mut().unwrap() {
                    checkpoint.as_object_mut().unwrap().remove("modified");
                }
                checkpoints
            };

            assert_eq!(expected, got);
        }
    }

    /// A hand-written checkpoint file, including the `first_line_checksum`
    /// spelling, loads every fingerprint kind.
    #[test]
    fn test_checkpointer_deserialization() {
        let serialized_checkpoints = r#"
{
  "version": "1",
  "checkpoints": [
    {
      "fingerprint": { "dev_inode": [ 1, 2 ] },
      "position": 1234,
      "modified": "2021-07-12T18:19:11.769003Z"
    },
    {
      "fingerprint": { "checksum": 3456 },
      "position": 1234,
      "modified": "2021-07-12T18:19:11.769003Z"
    },
    {
      "fingerprint": { "first_line_checksum": 1234 },
      "position": 1234,
      "modified": "2021-07-12T18:19:11.769003Z"
    },
    {
      "fingerprint": { "first_lines_checksum": 78910 },
      "position": 1234,
      "modified": "2021-07-12T18:19:11.769003Z"
    },
    {
      "fingerprint": { "unknown": 1337 },
      "position": 1234,
      "modified": "2021-07-12T18:19:11.769003Z"
    }
  ]
}
        "#;
        let fingerprints = vec![
            FileFingerprint::DevInode(1, 2),
            FileFingerprint::BytesChecksum(3456),
            FileFingerprint::FirstLinesChecksum(1234),
            FileFingerprint::FirstLinesChecksum(78910),
            FileFingerprint::Unknown(1337),
        ];

        let data_dir = tempdir().unwrap();

        let mut chkptr = Checkpointer::new(data_dir.path());

        std::fs::write(
            data_dir.path().join(CHECKPOINT_FILE_NAME),
            serialized_checkpoints,
        )
        .unwrap();

        chkptr.read_checkpoints(None);

        for fingerprint in fingerprints {
            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(1234))
        }
    }
}