1use std::{
2 collections::BTreeSet,
3 io,
4 path::{Path, PathBuf},
5 sync::Arc,
6};
7
8use chrono::{DateTime, Utc};
9use dashmap::DashMap;
10use glob::glob;
11use serde::{Deserialize, Serialize};
12use tokio::{
13 fs::{self, File},
14 io::{AsyncReadExt, BufReader},
15 sync::Mutex,
16};
17use tracing::{error, info, warn};
18
19use super::{
20 FilePosition,
21 fingerprinter::{FileFingerprint, Fingerprinter},
22};
23
/// File name, relative to the data directory, of the temporary file that
/// `Checkpointer::write_checkpoints` writes before renaming it into place.
const TMP_FILE_NAME: &str = "checkpoints.new.json";
/// File name, relative to the data directory, of the stable checkpoint file.
pub const CHECKPOINT_FILE_NAME: &str = "checkpoints.json";
26
/// Serializable snapshot of all checkpoints.
///
/// Serialized with an internal `version` tag so the stored format carries
/// its own version marker.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "version", rename_all = "snake_case")]
enum State {
    /// Format version `"1"`: an ordered set of [`Checkpoint`] records.
    #[serde(rename = "1")]
    V1 { checkpoints: BTreeSet<Checkpoint> },
}
37
/// One persisted checkpoint record.
///
/// The derived ordering compares fields in declaration order, so the field
/// order below also determines how records sort inside `State::V1`.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
#[serde(rename_all = "snake_case")]
struct Checkpoint {
    /// Fingerprint identifying the file this record belongs to.
    fingerprint: FileFingerprint,
    /// Position stored for that file.
    position: FilePosition,
    /// Time the record was last modified.
    modified: DateTime<Utc>,
}
47
/// Owns the on-disk locations of checkpoint data and the shared in-memory
/// [`CheckpointsView`] that is loaded from and written to them.
pub struct Checkpointer {
    /// `<data_dir>/checkpoints`, the directory holding legacy per-file checkpoints.
    directory: PathBuf,
    /// Path of the temporary file written before the rename into place.
    tmp_file_path: PathBuf,
    /// Path of the stable checkpoint file.
    stable_file_path: PathBuf,
    /// Glob pattern (`<directory>/*`) used to enumerate legacy checkpoint files.
    glob_string: String,
    /// Shared in-memory view of the checkpoints.
    checkpoints: Arc<CheckpointsView>,
    /// Last state successfully written, used to skip writes when nothing changed.
    last: Mutex<Option<State>>,
}
56
/// In-memory checkpoint data, keyed by file fingerprint.
#[derive(Debug, Default)]
pub struct CheckpointsView {
    /// Stored position for each fingerprint.
    checkpoints: DashMap<FileFingerprint, FilePosition>,
    /// Time each fingerprint's checkpoint was last modified.
    modified_times: DashMap<FileFingerprint, DateTime<Utc>>,
    /// Time each fingerprint was marked dead via `set_dead`; consulted by
    /// `remove_expired` to decide which entries to drop.
    removed_times: DashMap<FileFingerprint, DateTime<Utc>>,
}
65
66impl CheckpointsView {
67 pub fn update(&self, fng: FileFingerprint, pos: FilePosition) {
68 self.checkpoints.insert(fng, pos);
69 self.modified_times.insert(fng, Utc::now());
70 self.removed_times.remove(&fng);
71 }
72
73 pub fn get(&self, fng: FileFingerprint) -> Option<FilePosition> {
74 self.checkpoints.get(&fng).map(|r| *r.value())
75 }
76
77 pub fn set_dead(&self, fng: FileFingerprint) {
78 self.removed_times.insert(fng, Utc::now());
79 }
80
81 pub fn update_key(&self, old: FileFingerprint, new: FileFingerprint) {
82 if let Some((_, value)) = self.checkpoints.remove(&old) {
83 self.checkpoints.insert(new, value);
84 }
85
86 if let Some((_, value)) = self.modified_times.remove(&old) {
87 self.modified_times.insert(new, value);
88 }
89
90 if let Some((_, value)) = self.removed_times.remove(&old) {
91 self.removed_times.insert(new, value);
92 }
93 }
94
95 pub fn contains_bytes_checksums(&self) -> bool {
96 self.checkpoints
97 .iter()
98 .any(|entry| matches!(entry.key(), FileFingerprint::BytesChecksum(_)))
99 }
100
101 pub fn remove_expired(&self) {
102 let now = Utc::now();
103
104 let to_remove = self
108 .removed_times
109 .iter()
110 .filter(|entry| {
111 let ts = entry.value();
112 let duration = now - *ts;
113 duration >= chrono::Duration::seconds(60)
114 })
115 .map(|entry| *entry.key())
116 .collect::<Vec<FileFingerprint>>();
117
118 for fng in to_remove {
119 self.checkpoints.remove(&fng);
120 self.modified_times.remove(&fng);
121 self.removed_times.remove(&fng);
122 }
123 }
124
125 fn load(&self, checkpoint: Checkpoint) {
126 self.checkpoints
127 .insert(checkpoint.fingerprint, checkpoint.position);
128 self.modified_times
129 .insert(checkpoint.fingerprint, checkpoint.modified);
130 }
131
132 fn set_state(&self, state: State, ignore_before: Option<DateTime<Utc>>) {
133 match state {
134 State::V1 { checkpoints } => {
135 for checkpoint in checkpoints {
136 if let Some(ignore_before) = ignore_before
137 && checkpoint.modified < ignore_before
138 {
139 continue;
140 }
141 self.load(checkpoint);
142 }
143 }
144 }
145 }
146
147 fn get_state(&self) -> State {
148 State::V1 {
149 checkpoints: self
150 .checkpoints
151 .iter()
152 .map(|entry| {
153 let fingerprint = entry.key();
154 let position = entry.value();
155 Checkpoint {
156 fingerprint: *fingerprint,
157 position: *position,
158 modified: self
159 .modified_times
160 .get(fingerprint)
161 .map(|r| *r.value())
162 .unwrap_or_else(Utc::now),
163 }
164 })
165 .collect(),
166 }
167 }
168
169 async fn maybe_upgrade(
170 &self,
171 path: &Path,
172 fng: FileFingerprint,
173 fingerprinter: &Fingerprinter,
174 fingerprint_buffer: &mut Vec<u8>,
175 ) {
176 if let Ok(Some(old_checksum)) = fingerprinter
177 .get_bytes_checksum(path, fingerprint_buffer)
178 .await
179 {
180 self.update_key(old_checksum, fng)
181 }
182
183 if let Some((_, pos)) = self
184 .checkpoints
185 .remove(&FileFingerprint::Unknown(fng.as_legacy().await))
186 {
187 self.update(fng, pos);
188 }
189
190 if self.checkpoints.get(&fng).is_none() {
191 if let Ok(Some(fingerprint)) = fingerprinter
192 .get_legacy_checksum(path, fingerprint_buffer)
193 .await
194 && let Some((_, pos)) = self.checkpoints.remove(&fingerprint)
195 {
196 self.update(fng, pos);
197 }
198 if let Ok(Some(fingerprint)) = fingerprinter
199 .get_legacy_first_lines_checksum(path, fingerprint_buffer)
200 .await
201 && let Some((_, pos)) = self.checkpoints.remove(&fingerprint)
202 {
203 self.update(fng, pos);
204 }
205 }
206 }
207}
208
209impl Checkpointer {
210 pub fn new(data_dir: &Path) -> Checkpointer {
211 let directory = data_dir.join("checkpoints");
212 let glob_string = directory.join("*").to_string_lossy().into_owned();
213 let tmp_file_path = data_dir.join(TMP_FILE_NAME);
214 let stable_file_path = data_dir.join(CHECKPOINT_FILE_NAME);
215
216 Checkpointer {
217 directory,
218 glob_string,
219 tmp_file_path,
220 stable_file_path,
221 checkpoints: Arc::new(CheckpointsView::default()),
222 last: Mutex::new(None),
223 }
224 }
225
226 pub fn view(&self) -> Arc<CheckpointsView> {
227 Arc::clone(&self.checkpoints)
228 }
229
230 #[cfg(test)]
236 fn encode(&self, fng: FileFingerprint, pos: FilePosition) -> PathBuf {
237 use FileFingerprint::*;
238
239 let path = match fng {
240 BytesChecksum(c) => format!("g{c:x}.{pos}"),
241 FirstLinesChecksum(c) => format!("h{c:x}.{pos}"),
242 DevInode(dev, ino) => format!("i{dev:x}.{ino:x}.{pos}"),
243 Unknown(x) => format!("{x:x}.{pos}"),
244 };
245 self.directory.join(path)
246 }
247
248 fn decode(&self, path: &Path) -> (FileFingerprint, FilePosition) {
256 use FileFingerprint::*;
257
258 let file_name = &path.file_name().unwrap().to_string_lossy();
259 match file_name.chars().next().expect("empty file name") {
260 'g' => {
261 let (c, pos) = scan_fmt!(file_name, "g{x}.{}", [hex u64], FilePosition).unwrap();
262 (BytesChecksum(c), pos)
263 }
264 'h' => {
265 let (c, pos) = scan_fmt!(file_name, "h{x}.{}", [hex u64], FilePosition).unwrap();
266 (FirstLinesChecksum(c), pos)
267 }
268 'i' => {
269 let (dev, ino, pos) =
270 scan_fmt!(file_name, "i{x}.{x}.{}", [hex u64], [hex u64], FilePosition)
271 .unwrap();
272 (DevInode(dev, ino), pos)
273 }
274 _ => {
275 let (c, pos) = scan_fmt!(file_name, "{x}.{}", [hex u64], FilePosition).unwrap();
276 (Unknown(c), pos)
277 }
278 }
279 }
280
281 #[cfg(test)]
282 pub fn update_checkpoint(&mut self, fng: FileFingerprint, pos: FilePosition) {
283 self.checkpoints.update(fng, pos);
284 }
285
286 #[cfg(test)]
287 pub fn get_checkpoint(&self, fng: FileFingerprint) -> Option<FilePosition> {
288 self.checkpoints.get(fng)
289 }
290
291 pub async fn maybe_upgrade(
294 &mut self,
295 path: &Path,
296 fresh: FileFingerprint,
297 fingerprinter: &Fingerprinter,
298 fingerprint_buffer: &mut Vec<u8>,
299 ) {
300 self.checkpoints
301 .maybe_upgrade(path, fresh, fingerprinter, fingerprint_buffer)
302 .await
303 }
304
305 pub async fn write_checkpoints(&self) -> Result<usize, io::Error> {
309 self.checkpoints.remove_expired();
314
315 let current = self.checkpoints.get_state();
316
317 let mut last = self.last.lock().await;
319 if last.as_ref() != Some(¤t) {
320 let tmp_file_path = self.tmp_file_path.clone();
325
326 let current = tokio::task::spawn_blocking(move || -> Result<State, io::Error> {
328 let mut f = std::io::BufWriter::new(std::fs::File::create(tmp_file_path)?);
329 serde_json::to_writer(&mut f, ¤t)?;
330 f.into_inner()?.sync_all()?;
331 Ok(current)
332 })
333 .await
334 .map_err(io::Error::other)??;
335
336 fs::rename(&self.tmp_file_path, &self.stable_file_path).await?;
342
343 *last = Some(current);
344 }
345
346 Ok(self.checkpoints.checkpoints.len())
347 }
348
349 #[cfg(test)]
352 pub async fn write_legacy_checkpoints(&mut self) -> Result<usize, io::Error> {
353 use tokio::fs::File;
354
355 fs::remove_dir_all(&self.directory).await.ok();
356 fs::create_dir_all(&self.directory).await?;
357 for c in self.checkpoints.checkpoints.iter() {
358 File::create(self.encode(*c.key(), *c.value())).await?;
359 }
360 Ok(self.checkpoints.checkpoints.len())
361 }
362
363 pub async fn read_checkpoints(&mut self, ignore_before: Option<DateTime<Utc>>) {
367 match self.read_checkpoints_file(&self.tmp_file_path).await {
372 Ok(state) => {
373 warn!(message = "Recovered checkpoint data from interrupted process.");
374 self.checkpoints.set_state(state, ignore_before);
375
376 if let Err(error) = fs::rename(&self.tmp_file_path, &self.stable_file_path).await {
379 warn!(message = "Error persisting recovered checkpoint file.", %error);
380 }
381 return;
382 }
383 Err(error) if error.kind() == io::ErrorKind::NotFound => {
384 }
386 Err(error) => {
387 error!(message = "Unable to recover checkpoint data from interrupted process.", %error);
388 }
389 }
390
391 match self.read_checkpoints_file(&self.stable_file_path).await {
395 Ok(state) => {
396 info!(message = "Loaded checkpoint data.");
397 self.checkpoints.set_state(state, ignore_before);
398 return;
399 }
400 Err(error) if error.kind() == io::ErrorKind::NotFound => {
401 }
403 Err(error) => {
404 warn!(message = "Unable to load checkpoint data.", %error);
405 return;
406 }
407 }
408
409 info!("Attempting to read legacy checkpoint files.");
412 self.read_legacy_checkpoints(ignore_before).await;
413
414 if self.write_checkpoints().await.is_ok() {
415 fs::remove_dir_all(&self.directory).await.ok();
416 }
417 }
418
419 async fn read_checkpoints_file(&self, path: &Path) -> Result<State, io::Error> {
420 let mut reader = BufReader::new(File::open(path).await?);
424 let mut output = Vec::new();
425 reader.read_to_end(&mut output).await?;
426
427 serde_json::from_slice(&output[..])
428 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
429 }
430
431 async fn read_legacy_checkpoints(&mut self, ignore_before: Option<DateTime<Utc>>) {
432 for path in glob(&self.glob_string).unwrap().flatten() {
433 let mut mtime = None;
434 if let Some(ignore_before) = ignore_before
435 && let Ok(Ok(modified)) = fs::metadata(&path)
436 .await
437 .map(|metadata| metadata.modified())
438 {
439 let modified = DateTime::<Utc>::from(modified);
440 if modified < ignore_before {
441 fs::remove_file(path).await.ok();
442 continue;
443 }
444 mtime = Some(modified);
445 }
446 let (fng, pos) = self.decode(&path);
447 self.checkpoints.checkpoints.insert(fng, pos);
448 if let Some(mtime) = mtime {
449 self.checkpoints.modified_times.insert(fng, mtime);
450 }
451 }
452 }
453}
454
455#[cfg(test)]
456mod test {
457 use chrono::{Duration, Utc};
458 use similar_asserts::assert_eq;
459 use tempfile::tempdir;
460 use tokio::fs;
461
462 use super::{
463 super::{FingerprintStrategy, Fingerprinter},
464 CHECKPOINT_FILE_NAME, Checkpoint, Checkpointer, FileFingerprint, FilePosition,
465 TMP_FILE_NAME,
466 };
467
468 #[test]
469 fn test_checkpointer_basics() {
470 let fingerprints = vec![
471 FileFingerprint::DevInode(1, 2),
472 FileFingerprint::BytesChecksum(3456),
473 FileFingerprint::FirstLinesChecksum(78910),
474 FileFingerprint::Unknown(1337),
475 ];
476 for fingerprint in fingerprints {
477 let position: FilePosition = 1234;
478 let data_dir = tempdir().unwrap();
479 let mut chkptr = Checkpointer::new(data_dir.path());
480 assert_eq!(
481 chkptr.decode(&chkptr.encode(fingerprint, position)),
482 (fingerprint, position)
483 );
484 chkptr.update_checkpoint(fingerprint, position);
485 assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
486 }
487 }
488
489 #[tokio::test]
490 async fn test_checkpointer_ignore_before() {
491 let now = Utc::now();
492 let newer = (FileFingerprint::DevInode(1, 2), now - Duration::seconds(5));
493 let newish = (
494 FileFingerprint::BytesChecksum(3456),
495 now - Duration::seconds(10),
496 );
497 let oldish = (
498 FileFingerprint::FirstLinesChecksum(78910),
499 now - Duration::seconds(15),
500 );
501 let older = (FileFingerprint::Unknown(1337), now - Duration::seconds(20));
502 let ignore_before = Some(now - Duration::seconds(12));
503
504 let position: FilePosition = 1234;
505 let data_dir = tempdir().unwrap();
506
507 {
509 let chkptr = Checkpointer::new(data_dir.path());
510
511 for (fingerprint, modified) in &[&newer, &newish, &oldish, &older] {
512 chkptr.checkpoints.load(Checkpoint {
513 fingerprint: *fingerprint,
514 position,
515 modified: *modified,
516 });
517 assert_eq!(chkptr.get_checkpoint(*fingerprint), Some(position));
518 chkptr.write_checkpoints().await.unwrap();
519 }
520 }
521
522 {
524 let mut chkptr = Checkpointer::new(data_dir.path());
525 chkptr.read_checkpoints(ignore_before).await;
526
527 assert_eq!(chkptr.get_checkpoint(newish.0), Some(position));
528 assert_eq!(chkptr.get_checkpoint(newer.0), Some(position));
529 assert_eq!(chkptr.get_checkpoint(oldish.0), None);
530 assert_eq!(chkptr.get_checkpoint(older.0), None);
531 }
532 }
533
534 #[tokio::test]
535 async fn test_checkpointer_restart() {
536 let fingerprints = vec![
537 FileFingerprint::DevInode(1, 2),
538 FileFingerprint::BytesChecksum(3456),
539 FileFingerprint::FirstLinesChecksum(78910),
540 FileFingerprint::Unknown(1337),
541 ];
542 for fingerprint in fingerprints {
543 let position: FilePosition = 1234;
544 let data_dir = tempdir().unwrap();
545 {
546 let mut chkptr = Checkpointer::new(data_dir.path());
547 chkptr.update_checkpoint(fingerprint, position);
548 assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
549 chkptr.write_checkpoints().await.unwrap();
550 }
551 {
552 let mut chkptr = Checkpointer::new(data_dir.path());
553 assert_eq!(chkptr.get_checkpoint(fingerprint), None);
554 chkptr.read_checkpoints(None).await;
555 assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
556 }
557 }
558 }
559
560 #[tokio::test]
561 async fn test_checkpointer_fingerprint_upgrades_unknown() {
562 let log_dir = tempdir().unwrap();
563 let path = log_dir.path().join("test.log");
564 let data = "hello\n";
565 fs::write(&path, data).await.unwrap();
566
567 let new_fingerprint = FileFingerprint::DevInode(1, 2);
568 let old_fingerprint = FileFingerprint::Unknown(new_fingerprint.as_legacy().await);
569 let position: FilePosition = 1234;
570 let fingerprinter = Fingerprinter {
571 strategy: FingerprintStrategy::DevInode,
572 max_line_length: 1000,
573 ignore_not_found: false,
574 };
575
576 let mut buf = Vec::new();
577
578 let data_dir = tempdir().unwrap();
579 {
580 let mut chkptr = Checkpointer::new(data_dir.path());
581 chkptr.update_checkpoint(old_fingerprint, position);
582 assert_eq!(chkptr.get_checkpoint(old_fingerprint), Some(position));
583 chkptr.write_checkpoints().await.unwrap();
584 }
585 {
586 let mut chkptr = Checkpointer::new(data_dir.path());
587 chkptr.read_checkpoints(None).await;
588 assert_eq!(chkptr.get_checkpoint(new_fingerprint), None);
589
590 chkptr
591 .maybe_upgrade(&path, new_fingerprint, &fingerprinter, &mut buf)
592 .await;
593
594 assert_eq!(chkptr.get_checkpoint(new_fingerprint), Some(position));
595 assert_eq!(chkptr.get_checkpoint(old_fingerprint), None);
596 }
597 }
598
599 #[tokio::test]
600 async fn test_checkpointer_fingerprint_upgrades_legacy_checksum() {
601 let log_dir = tempdir().unwrap();
602 let path = log_dir.path().join("test.log");
603 let data = "hello\n";
604 fs::write(&path, data).await.unwrap();
605
606 let old_fingerprint = FileFingerprint::FirstLinesChecksum(18057733963141331840);
607 let new_fingerprint = FileFingerprint::FirstLinesChecksum(17791311590754645022);
608 let position: FilePosition = 6;
609
610 let fingerprinter = Fingerprinter {
611 strategy: FingerprintStrategy::FirstLinesChecksum {
612 ignored_header_bytes: 0,
613 lines: 1,
614 },
615 max_line_length: 102400,
616 ignore_not_found: false,
617 };
618
619 let mut buf = Vec::new();
620
621 let data_dir = tempdir().unwrap();
622 {
623 let mut chkptr = Checkpointer::new(data_dir.path());
624 chkptr.update_checkpoint(old_fingerprint, position);
625 assert_eq!(chkptr.get_checkpoint(old_fingerprint), Some(position));
626 chkptr.write_checkpoints().await.unwrap();
627 }
628 {
629 let mut chkptr = Checkpointer::new(data_dir.path());
630 chkptr.read_checkpoints(None).await;
631 assert_eq!(chkptr.get_checkpoint(new_fingerprint), None);
632
633 chkptr
634 .maybe_upgrade(&path, new_fingerprint, &fingerprinter, &mut buf)
635 .await;
636
637 assert_eq!(chkptr.get_checkpoint(new_fingerprint), Some(position));
638 assert_eq!(chkptr.get_checkpoint(old_fingerprint), None);
639 }
640 }
641
642 #[tokio::test]
643 async fn test_checkpointer_fingerprint_upgrades_legacy_first_lines_checksum() {
644 let log_dir = tempdir().unwrap();
645 let path = log_dir.path().join("test.log");
646 let data = "hello\n";
647 fs::write(&path, data).await.unwrap();
648
649 let old_fingerprint = FileFingerprint::FirstLinesChecksum(17791311590754645022);
650 let new_fingerprint = FileFingerprint::FirstLinesChecksum(11081174131906673079);
651 let position: FilePosition = 6;
652
653 let fingerprinter = Fingerprinter {
654 strategy: FingerprintStrategy::FirstLinesChecksum {
655 ignored_header_bytes: 0,
656 lines: 1,
657 },
658 max_line_length: 102400,
659 ignore_not_found: false,
660 };
661
662 let mut buf = Vec::new();
663
664 let data_dir = tempdir().unwrap();
665 {
666 let mut chkptr = Checkpointer::new(data_dir.path());
667 chkptr.update_checkpoint(old_fingerprint, position);
668 assert_eq!(chkptr.get_checkpoint(old_fingerprint), Some(position));
669 chkptr.write_checkpoints().await.unwrap();
670 }
671 {
672 let mut chkptr = Checkpointer::new(data_dir.path());
673 chkptr.read_checkpoints(None).await;
674 assert_eq!(chkptr.get_checkpoint(new_fingerprint), None);
675
676 chkptr
677 .maybe_upgrade(&path, new_fingerprint, &fingerprinter, &mut buf)
678 .await;
679
680 assert_eq!(chkptr.get_checkpoint(new_fingerprint), Some(position));
681 assert_eq!(chkptr.get_checkpoint(old_fingerprint), None);
682 }
683 }
684
685 #[tokio::test]
686 async fn test_checkpointer_file_upgrades() {
687 let fingerprint = FileFingerprint::DevInode(1, 2);
688 let position: FilePosition = 1234;
689
690 let data_dir = tempdir().unwrap();
691
692 {
694 let mut chkptr = Checkpointer::new(data_dir.path());
695 chkptr.update_checkpoint(fingerprint, position);
696 assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
697 chkptr.write_legacy_checkpoints().await.unwrap();
698 }
699
700 assert!(!data_dir.path().join(TMP_FILE_NAME).exists());
702 assert!(!data_dir.path().join(CHECKPOINT_FILE_NAME).exists());
703 assert!(data_dir.path().join("checkpoints").is_dir());
704
705 {
708 let mut chkptr = Checkpointer::new(data_dir.path());
709 chkptr.read_checkpoints(None).await;
710 assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
711 chkptr.write_checkpoints().await.unwrap();
712 }
713
714 assert!(!data_dir.path().join(TMP_FILE_NAME).exists());
717 assert!(data_dir.path().join(CHECKPOINT_FILE_NAME).exists());
718 assert!(!data_dir.path().join("checkpoints").is_dir());
719
720 {
722 let mut chkptr = Checkpointer::new(data_dir.path());
723 chkptr.read_checkpoints(None).await;
724 assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
725 }
726 }
727
728 #[tokio::test]
729 async fn test_checkpointer_expiration() {
730 let cases = vec![
731 (FileFingerprint::BytesChecksum(123), 0, 30),
733 (FileFingerprint::BytesChecksum(456), 1, 60),
734 (FileFingerprint::BytesChecksum(789), 2, 90),
735 (FileFingerprint::BytesChecksum(101112), 3, 120),
736 ];
737
738 let data_dir = tempdir().unwrap();
739 let mut chkptr = Checkpointer::new(data_dir.path());
740
741 for (fingerprint, position, removed) in cases.clone() {
742 chkptr.update_checkpoint(fingerprint, position);
743
744 chkptr
746 .checkpoints
747 .removed_times
748 .insert(fingerprint, Utc::now() - chrono::Duration::seconds(removed));
749
750 assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
751 }
752
753 chkptr.update_checkpoint(cases[2].0, 42);
755
756 chkptr.write_checkpoints().await.unwrap();
758
759 assert_eq!(chkptr.get_checkpoint(cases[0].0), Some(0));
760 assert_eq!(chkptr.get_checkpoint(cases[1].0), None);
761 assert_eq!(chkptr.get_checkpoint(cases[2].0), Some(42));
762 assert_eq!(chkptr.get_checkpoint(cases[3].0), None);
763 }
764
765 #[tokio::test]
766 async fn test_checkpointer_checksum_updates() {
767 let data_dir = tempdir().unwrap();
768
769 let fingerprinter = crate::Fingerprinter {
770 strategy: crate::FingerprintStrategy::Checksum {
771 bytes: 16,
772 ignored_header_bytes: 0,
773 lines: 1,
774 },
775 max_line_length: 1024,
776 ignore_not_found: false,
777 };
778
779 let log_path = data_dir.path().join("test.log");
780 let contents = "hello i am a test log line that is just long enough but not super long\n";
781 fs::write(&log_path, contents)
782 .await
783 .expect("writing test data");
784
785 let mut buf = vec![0; 1024];
786 let old = fingerprinter
787 .get_bytes_checksum(&log_path, &mut buf)
788 .await
789 .expect("getting old checksum")
790 .expect("still getting old checksum");
791
792 let new = fingerprinter
793 .get_fingerprint_of_file(&log_path, &mut buf)
794 .await
795 .expect("getting new checksum");
796
797 match (old, new) {
799 (FileFingerprint::BytesChecksum(old), FileFingerprint::FirstLinesChecksum(new)) => {
800 assert_ne!(old, new)
801 }
802 _ => panic!("unexpected checksum types"),
803 }
804
805 let mut chkptr = Checkpointer::new(data_dir.path());
806
807 chkptr.update_checkpoint(old, 1234);
809
810 assert!(chkptr.checkpoints.contains_bytes_checksums());
811
812 chkptr
813 .maybe_upgrade(&log_path, new, &fingerprinter, &mut buf)
814 .await;
815
816 assert!(!chkptr.checkpoints.contains_bytes_checksums());
817 assert_eq!(Some(1234), chkptr.get_checkpoint(new));
818 assert_eq!(None, chkptr.get_checkpoint(old));
819 }
820
821 #[tokio::test]
823 async fn test_checkpointer_serialization() {
824 let fingerprints = vec![
825 (
826 FileFingerprint::DevInode(1, 2),
827 r#"{"version":"1","checkpoints":[{"fingerprint":{"dev_inode":[1,2]},"position":1234}]}"#,
828 ),
829 (
830 FileFingerprint::BytesChecksum(3456),
831 r#"{"version":"1","checkpoints":[{"fingerprint":{"checksum":3456},"position":1234}]}"#,
832 ),
833 (
834 FileFingerprint::FirstLinesChecksum(78910),
835 r#"{"version":"1","checkpoints":[{"fingerprint":{"first_lines_checksum":78910},"position":1234}]}"#,
836 ),
837 (
838 FileFingerprint::Unknown(1337),
839 r#"{"version":"1","checkpoints":[{"fingerprint":{"unknown":1337},"position":1234}]}"#,
840 ),
841 ];
842 for (fingerprint, expected) in fingerprints {
843 let expected: serde_json::Value = serde_json::from_str(expected).unwrap();
844
845 let position: FilePosition = 1234;
846 let data_dir = tempdir().unwrap();
847 let mut chkptr = Checkpointer::new(data_dir.path());
848
849 chkptr.update_checkpoint(fingerprint, position);
850 chkptr.write_checkpoints().await.unwrap();
851
852 let got: serde_json::Value = {
853 let s = fs::read_to_string(data_dir.path().join(CHECKPOINT_FILE_NAME))
854 .await
855 .unwrap();
856 let mut checkpoints: serde_json::Value = serde_json::from_str(&s).unwrap();
857 for checkpoint in checkpoints["checkpoints"].as_array_mut().unwrap() {
858 checkpoint.as_object_mut().unwrap().remove("modified");
859 }
860 checkpoints
861 };
862
863 assert_eq!(expected, got);
864 }
865 }
866
    /// A hand-written version-1 checkpoint file loads every fingerprint kind.
    ///
    /// The fixture includes a `first_line_checksum` entry, which the test
    /// expects to load as `FileFingerprint::FirstLinesChecksum`.
    #[tokio::test]
    async fn test_checkpointer_deserialization() {
        let serialized_checkpoints = r#"
{
  "version": "1",
  "checkpoints": [
    {
      "fingerprint": { "dev_inode": [ 1, 2 ] },
      "position": 1234,
      "modified": "2021-07-12T18:19:11.769003Z"
    },
    {
      "fingerprint": { "checksum": 3456 },
      "position": 1234,
      "modified": "2021-07-12T18:19:11.769003Z"
    },
    {
      "fingerprint": { "first_line_checksum": 1234 },
      "position": 1234,
      "modified": "2021-07-12T18:19:11.769003Z"
    },
    {
      "fingerprint": { "first_lines_checksum": 78910 },
      "position": 1234,
      "modified": "2021-07-12T18:19:11.769003Z"
    },
    {
      "fingerprint": { "unknown": 1337 },
      "position": 1234,
      "modified": "2021-07-12T18:19:11.769003Z"
    }
  ]
}
        "#;
        // One expected fingerprint per fixture entry, in the same order.
        let fingerprints = vec![
            FileFingerprint::DevInode(1, 2),
            FileFingerprint::BytesChecksum(3456),
            FileFingerprint::FirstLinesChecksum(1234),
            FileFingerprint::FirstLinesChecksum(78910),
            FileFingerprint::Unknown(1337),
        ];

        let data_dir = tempdir().unwrap();

        let mut chkptr = Checkpointer::new(data_dir.path());

        // Place the fixture where the stable checkpoint file is expected.
        fs::write(
            data_dir.path().join(CHECKPOINT_FILE_NAME),
            serialized_checkpoints,
        )
        .await
        .unwrap();

        chkptr.read_checkpoints(None).await;

        for fingerprint in fingerprints {
            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(1234))
        }
    }
928}