file_source/
checkpointer.rs

1use std::{
2    collections::BTreeSet,
3    fs, io,
4    path::{Path, PathBuf},
5    sync::{Arc, Mutex},
6};
7
8use chrono::{DateTime, Utc};
9use dashmap::DashMap;
10use glob::glob;
11use serde::{Deserialize, Serialize};
12use tracing::{error, info, warn};
13
14use super::{
15    fingerprinter::{FileFingerprint, Fingerprinter},
16    FilePosition,
17};
18
/// Name of the temporary file inside the data directory that new checkpoint
/// state is written to before it is renamed over the stable file.
const TMP_FILE_NAME: &str = "checkpoints.new.json";
/// Name of the stable checkpoint file inside the data directory.
pub const CHECKPOINT_FILE_NAME: &str = "checkpoints.json";
21
/// This enum represents the file format of checkpoints persisted to disk. Right
/// now there is only one variant, but any incompatible changes will require an
/// additional variant to be added here and handled anywhere that we read or
/// write this format.
///
/// The serde attributes make this an internally tagged value: the variant is
/// identified by a `"version"` field, and `V1` is written as `"version": "1"`.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "version", rename_all = "snake_case")]
enum State {
    #[serde(rename = "1")]
    V1 { checkpoints: BTreeSet<Checkpoint> },
}
32
/// A simple JSON-friendly struct of the fingerprint/position pair, since
/// fingerprints as objects cannot be keys in a plain JSON map.
///
/// The derived `Ord`/`PartialOrd` compare fields in declaration order, which
/// is what orders the entries of the `BTreeSet` stored in `State::V1`.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
#[serde(rename_all = "snake_case")]
struct Checkpoint {
    /// Identity of the file this checkpoint belongs to.
    fingerprint: FileFingerprint,
    /// Position recorded for that file.
    position: FilePosition,
    /// Time at which this checkpoint was last updated.
    modified: DateTime<Utc>,
}
42
/// Owns the on-disk locations of checkpoint data and the shared in-memory
/// view of the checkpoints, and moves state between the two.
pub struct Checkpointer {
    /// `<data_dir>/checkpoints`: directory holding legacy per-checkpoint files.
    directory: PathBuf,
    /// `<data_dir>/checkpoints.new.json`: written first, then renamed.
    tmp_file_path: PathBuf,
    /// `<data_dir>/checkpoints.json`: the stable checkpoint file.
    stable_file_path: PathBuf,
    /// Glob pattern matching every entry directly inside `directory`.
    glob_string: String,
    /// In-memory checkpoints, shareable with other threads via `view()`.
    checkpoints: Arc<CheckpointsView>,
    /// The state most recently written by `write_checkpoints`, used to skip
    /// rewriting the file when nothing has changed.
    last: Mutex<Option<State>>,
}
51
/// A thread-safe handle for reading and writing checkpoints in-memory across
/// multiple threads.
#[derive(Debug, Default)]
pub struct CheckpointsView {
    /// Current position for each known fingerprint.
    checkpoints: DashMap<FileFingerprint, FilePosition>,
    /// Time each fingerprint's checkpoint was last updated (or the persisted
    /// modification time for checkpoints loaded from disk).
    modified_times: DashMap<FileFingerprint, DateTime<Utc>>,
    /// Time each fingerprint was marked dead via `set_dead`; entries are
    /// dropped by `remove_expired` once the mark is old enough.
    removed_times: DashMap<FileFingerprint, DateTime<Utc>>,
}
60
61impl CheckpointsView {
    /// Record `pos` as the current position for `fng`, stamp the entry with
    /// the current time, and clear any removal mark previously set for it.
    pub fn update(&self, fng: FileFingerprint, pos: FilePosition) {
        self.checkpoints.insert(fng, pos);
        self.modified_times.insert(fng, Utc::now());
        // A file that is being updated again is no longer considered dead.
        self.removed_times.remove(&fng);
    }
67
68    pub fn get(&self, fng: FileFingerprint) -> Option<FilePosition> {
69        self.checkpoints.get(&fng).map(|r| *r.value())
70    }
71
    /// Mark `fng` as removed as of now. The checkpoint itself is kept until
    /// `remove_expired` finds the mark to be old enough; a later `update` for
    /// the same fingerprint clears the mark.
    pub fn set_dead(&self, fng: FileFingerprint) {
        self.removed_times.insert(fng, Utc::now());
    }
75
    /// Re-key everything stored under `old` to `new`.
    ///
    /// Each of the three maps is handled independently: when an entry exists
    /// for `old` it is removed and re-inserted under `new` with its value
    /// unchanged (the modified and removed timestamps are carried over, not
    /// refreshed). Maps that have no entry for `old` are left untouched.
    pub fn update_key(&self, old: FileFingerprint, new: FileFingerprint) {
        if let Some((_, value)) = self.checkpoints.remove(&old) {
            self.checkpoints.insert(new, value);
        }

        if let Some((_, value)) = self.modified_times.remove(&old) {
            self.modified_times.insert(new, value);
        }

        if let Some((_, value)) = self.removed_times.remove(&old) {
            self.removed_times.insert(new, value);
        }
    }
89
90    pub fn contains_bytes_checksums(&self) -> bool {
91        self.checkpoints
92            .iter()
93            .any(|entry| matches!(entry.key(), FileFingerprint::BytesChecksum(_)))
94    }
95
96    pub fn remove_expired(&self) {
97        let now = Utc::now();
98
99        // Collect all of the expired keys. Removing them while iterating can
100        // lead to deadlocks, the set should be small, and this is not a
101        // performance-sensitive path.
102        let to_remove = self
103            .removed_times
104            .iter()
105            .filter(|entry| {
106                let ts = entry.value();
107                let duration = now - *ts;
108                duration >= chrono::Duration::seconds(60)
109            })
110            .map(|entry| *entry.key())
111            .collect::<Vec<FileFingerprint>>();
112
113        for fng in to_remove {
114            self.checkpoints.remove(&fng);
115            self.modified_times.remove(&fng);
116            self.removed_times.remove(&fng);
117        }
118    }
119
120    fn load(&self, checkpoint: Checkpoint) {
121        self.checkpoints
122            .insert(checkpoint.fingerprint, checkpoint.position);
123        self.modified_times
124            .insert(checkpoint.fingerprint, checkpoint.modified);
125    }
126
127    fn set_state(&self, state: State, ignore_before: Option<DateTime<Utc>>) {
128        match state {
129            State::V1 { checkpoints } => {
130                for checkpoint in checkpoints {
131                    if let Some(ignore_before) = ignore_before {
132                        if checkpoint.modified < ignore_before {
133                            continue;
134                        }
135                    }
136                    self.load(checkpoint);
137                }
138            }
139        }
140    }
141
142    fn get_state(&self) -> State {
143        State::V1 {
144            checkpoints: self
145                .checkpoints
146                .iter()
147                .map(|entry| {
148                    let fingerprint = entry.key();
149                    let position = entry.value();
150                    Checkpoint {
151                        fingerprint: *fingerprint,
152                        position: *position,
153                        modified: self
154                            .modified_times
155                            .get(fingerprint)
156                            .map(|r| *r.value())
157                            .unwrap_or_else(Utc::now),
158                    }
159                })
160                .collect(),
161        }
162    }
163
164    fn maybe_upgrade(
165        &self,
166        path: &Path,
167        fng: FileFingerprint,
168        fingerprinter: &Fingerprinter,
169        fingerprint_buffer: &mut Vec<u8>,
170    ) {
171        if let Ok(Some(old_checksum)) = fingerprinter.get_bytes_checksum(path, fingerprint_buffer) {
172            self.update_key(old_checksum, fng)
173        }
174
175        if let Some((_, pos)) = self
176            .checkpoints
177            .remove(&FileFingerprint::Unknown(fng.as_legacy()))
178        {
179            self.update(fng, pos);
180        }
181
182        if self.checkpoints.get(&fng).is_none() {
183            if let Ok(Some(fingerprint)) =
184                fingerprinter.get_legacy_checksum(path, fingerprint_buffer)
185            {
186                if let Some((_, pos)) = self.checkpoints.remove(&fingerprint) {
187                    self.update(fng, pos);
188                }
189            }
190            if let Ok(Some(fingerprint)) =
191                fingerprinter.get_legacy_first_lines_checksum(path, fingerprint_buffer)
192            {
193                if let Some((_, pos)) = self.checkpoints.remove(&fingerprint) {
194                    self.update(fng, pos);
195                }
196            }
197        }
198    }
199}
200
201impl Checkpointer {
202    pub fn new(data_dir: &Path) -> Checkpointer {
203        let directory = data_dir.join("checkpoints");
204        let glob_string = directory.join("*").to_string_lossy().into_owned();
205        let tmp_file_path = data_dir.join(TMP_FILE_NAME);
206        let stable_file_path = data_dir.join(CHECKPOINT_FILE_NAME);
207
208        Checkpointer {
209            directory,
210            glob_string,
211            tmp_file_path,
212            stable_file_path,
213            checkpoints: Arc::new(CheckpointsView::default()),
214            last: Mutex::new(None),
215        }
216    }
217
    /// Return another handle to the shared in-memory checkpoints. The handle
    /// refers to the same underlying maps as this checkpointer.
    pub fn view(&self) -> Arc<CheckpointsView> {
        Arc::clone(&self.checkpoints)
    }
221
222    /// Encode a fingerprint to a file name, including legacy Unknown values
223    ///
224    /// For each of the non-legacy variants, prepend an identifier byte that
225    /// falls outside of the hex range used by the legacy implementation. This
226    /// allows them to be differentiated by simply peeking at the first byte.
227    #[cfg(test)]
228    fn encode(&self, fng: FileFingerprint, pos: FilePosition) -> PathBuf {
229        use FileFingerprint::*;
230
231        let path = match fng {
232            BytesChecksum(c) => format!("g{c:x}.{pos}"),
233            FirstLinesChecksum(c) => format!("h{c:x}.{pos}"),
234            DevInode(dev, ino) => format!("i{dev:x}.{ino:x}.{pos}"),
235            Unknown(x) => format!("{x:x}.{pos}"),
236        };
237        self.directory.join(path)
238    }
239
    /// Decode a fingerprint from a file name, accounting for unknowns due to the legacy
    /// implementation.
    ///
    /// The trick here is to rely on the hex encoding of the legacy
    /// format. Because hex encoding only allows [0-9a-f], we can use any
    /// character outside of that range as a magic byte identifier for the newer
    /// formats.
    ///
    /// # Panics
    ///
    /// Panics if `path` has no file name component, if the file name is empty,
    /// or if the file name does not match the format expected for its leading
    /// character (every parse result is unwrapped).
    fn decode(&self, path: &Path) -> (FileFingerprint, FilePosition) {
        use FileFingerprint::*;

        let file_name = &path.file_name().unwrap().to_string_lossy();
        // The first character selects the fingerprint variant.
        match file_name.chars().next().expect("empty file name") {
            'g' => {
                let (c, pos) = scan_fmt!(file_name, "g{x}.{}", [hex u64], FilePosition).unwrap();
                (BytesChecksum(c), pos)
            }
            'h' => {
                let (c, pos) = scan_fmt!(file_name, "h{x}.{}", [hex u64], FilePosition).unwrap();
                (FirstLinesChecksum(c), pos)
            }
            'i' => {
                let (dev, ino, pos) =
                    scan_fmt!(file_name, "i{x}.{x}.{}", [hex u64], [hex u64], FilePosition)
                        .unwrap();
                (DevInode(dev, ino), pos)
            }
            // Anything else is treated as the legacy, prefix-less hex format.
            _ => {
                let (c, pos) = scan_fmt!(file_name, "{x}.{}", [hex u64], FilePosition).unwrap();
                (Unknown(c), pos)
            }
        }
    }
272
    /// Test-only helper: store `pos` for `fng` in the in-memory view.
    #[cfg(test)]
    pub fn update_checkpoint(&mut self, fng: FileFingerprint, pos: FilePosition) {
        self.checkpoints.update(fng, pos);
    }
277
    /// Test-only helper: read the position stored for `fng` in the in-memory
    /// view, if any.
    #[cfg(test)]
    pub fn get_checkpoint(&self, fng: FileFingerprint) -> Option<FilePosition> {
        self.checkpoints.get(fng)
    }
282
    /// Check whether the file at `path`, whose fresh fingerprint is `fresh`,
    /// has a checkpoint stored under an older or legacy fingerprint. If so,
    /// re-key that checkpoint to `fresh`.
    ///
    /// This delegates to the in-memory view; `fingerprint_buffer` is scratch
    /// space handed to the fingerprinter.
    pub fn maybe_upgrade(
        &mut self,
        path: &Path,
        fresh: FileFingerprint,
        fingerprinter: &Fingerprinter,
        fingerprint_buffer: &mut Vec<u8>,
    ) {
        self.checkpoints
            .maybe_upgrade(path, fresh, fingerprinter, fingerprint_buffer)
    }
295
296    /// Persist the current checkpoints state to disk, making our best effort to
297    /// do so in an atomic way that allow for recovering the previous state in
298    /// the event of a crash.
299    pub fn write_checkpoints(&self) -> Result<usize, io::Error> {
300        // First drop any checkpoints for files that were removed more than 60
301        // seconds ago. This keeps our working set as small as possible and
302        // makes sure we don't spend time and IO writing checkpoints that don't
303        // matter anymore.
304        self.checkpoints.remove_expired();
305
306        let current = self.checkpoints.get_state();
307
308        // Fetch last written state.
309        let mut last = self.last.lock().expect("Data poisoned.");
310        if last.as_ref() != Some(&current) {
311            // Write the new checkpoints to a tmp file and flush it fully to
312            // disk. If vector dies anywhere during this section, the existing
313            // stable file will still be in its current valid state and we'll be
314            // able to recover.
315            let mut f = io::BufWriter::new(fs::File::create(&self.tmp_file_path)?);
316            serde_json::to_writer(&mut f, &current)?;
317            f.into_inner()?.sync_all()?;
318
319            // Once the temp file is fully flushed, rename the tmp file to replace
320            // the previous stable file. This is an atomic operation on POSIX
321            // systems (and the stdlib claims to provide equivalent behavior on
322            // Windows), which should prevent scenarios where we don't have at least
323            // one full valid file to recover from.
324            fs::rename(&self.tmp_file_path, &self.stable_file_path)?;
325
326            *last = Some(current);
327        }
328
329        Ok(self.checkpoints.checkpoints.len())
330    }
331
    /// Write checkpoints to disk in the legacy format. Used for compatibility
    /// testing only.
    ///
    /// The legacy directory is wiped and recreated, then one empty file is
    /// created per checkpoint; the fingerprint and position are carried
    /// entirely by the file name (see `encode`). Returns the number of
    /// checkpoints held in memory.
    #[cfg(test)]
    pub fn write_legacy_checkpoints(&mut self) -> Result<usize, io::Error> {
        // Removal failure (e.g. the directory does not exist yet) is ignored.
        fs::remove_dir_all(&self.directory).ok();
        fs::create_dir_all(&self.directory)?;
        for c in self.checkpoints.checkpoints.iter() {
            fs::File::create(self.encode(*c.key(), *c.value()))?;
        }
        Ok(self.checkpoints.checkpoints.len())
    }
343
344    /// Read persisted checkpoints from disk, preferring the new JSON file
345    /// format but falling back to the legacy system when those files are found
346    /// instead.
347    pub fn read_checkpoints(&mut self, ignore_before: Option<DateTime<Utc>>) {
348        // First try reading from the tmp file location. If this works, it means
349        // that the previous process was interrupted in the process of
350        // checkpointing and the tmp file should contain more recent data that
351        // should be preferred.
352        match self.read_checkpoints_file(&self.tmp_file_path) {
353            Ok(state) => {
354                warn!(message = "Recovered checkpoint data from interrupted process.");
355                self.checkpoints.set_state(state, ignore_before);
356
357                // Try to move this tmp file to the stable location so we don't
358                // immediately overwrite it when we next persist checkpoints.
359                if let Err(error) = fs::rename(&self.tmp_file_path, &self.stable_file_path) {
360                    warn!(message = "Error persisting recovered checkpoint file.", %error);
361                }
362                return;
363            }
364            Err(error) if error.kind() == io::ErrorKind::NotFound => {
365                // This is expected, so no warning needed
366            }
367            Err(error) => {
368                error!(message = "Unable to recover checkpoint data from interrupted process.", %error);
369            }
370        }
371
372        // Next, attempt to read checkpoints from the stable file location. This
373        // is the expected location, so warn more aggressively if something goes
374        // wrong.
375        match self.read_checkpoints_file(&self.stable_file_path) {
376            Ok(state) => {
377                info!(message = "Loaded checkpoint data.");
378                self.checkpoints.set_state(state, ignore_before);
379                return;
380            }
381            Err(error) if error.kind() == io::ErrorKind::NotFound => {
382                // This is expected, so no warning needed
383            }
384            Err(error) => {
385                warn!(message = "Unable to load checkpoint data.", %error);
386                return;
387            }
388        }
389
390        // If we haven't returned yet, go ahead and look for the legacy files
391        // and try to read them.
392        info!("Attempting to read legacy checkpoint files.");
393        self.read_legacy_checkpoints(ignore_before);
394
395        if self.write_checkpoints().is_ok() {
396            fs::remove_dir_all(&self.directory).ok();
397        }
398    }
399
400    fn read_checkpoints_file(&self, path: &Path) -> Result<State, io::Error> {
401        let reader = io::BufReader::new(fs::File::open(path)?);
402        serde_json::from_reader(reader).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
403    }
404
405    fn read_legacy_checkpoints(&mut self, ignore_before: Option<DateTime<Utc>>) {
406        for path in glob(&self.glob_string).unwrap().flatten() {
407            let mut mtime = None;
408            if let Some(ignore_before) = ignore_before {
409                if let Ok(Ok(modified)) = fs::metadata(&path).map(|metadata| metadata.modified()) {
410                    let modified = DateTime::<Utc>::from(modified);
411                    if modified < ignore_before {
412                        fs::remove_file(path).ok();
413                        continue;
414                    }
415                    mtime = Some(modified);
416                }
417            }
418            let (fng, pos) = self.decode(&path);
419            self.checkpoints.checkpoints.insert(fng, pos);
420            if let Some(mtime) = mtime {
421                self.checkpoints.modified_times.insert(fng, mtime);
422            }
423        }
424    }
425}
426
427#[cfg(test)]
428mod test {
429    use chrono::{Duration, Utc};
430    use similar_asserts::assert_eq;
431    use tempfile::tempdir;
432
433    use super::{
434        super::{FingerprintStrategy, Fingerprinter},
435        Checkpoint, Checkpointer, FileFingerprint, FilePosition, CHECKPOINT_FILE_NAME,
436        TMP_FILE_NAME,
437    };
438
439    #[test]
440    fn test_checkpointer_basics() {
441        let fingerprints = vec![
442            FileFingerprint::DevInode(1, 2),
443            FileFingerprint::BytesChecksum(3456),
444            FileFingerprint::FirstLinesChecksum(78910),
445            FileFingerprint::Unknown(1337),
446        ];
447        for fingerprint in fingerprints {
448            let position: FilePosition = 1234;
449            let data_dir = tempdir().unwrap();
450            let mut chkptr = Checkpointer::new(data_dir.path());
451            assert_eq!(
452                chkptr.decode(&chkptr.encode(fingerprint, position)),
453                (fingerprint, position)
454            );
455            chkptr.update_checkpoint(fingerprint, position);
456            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
457        }
458    }
459
    /// Checkpoints whose persisted modification time is older than the
    /// `ignore_before` cutoff are dropped when reading; newer ones are kept.
    #[test]
    fn test_checkpointer_ignore_before() {
        let newer = (
            FileFingerprint::DevInode(1, 2),
            Utc::now() - Duration::seconds(5),
        );
        let newish = (
            FileFingerprint::BytesChecksum(3456),
            Utc::now() - Duration::seconds(10),
        );
        let oldish = (
            FileFingerprint::FirstLinesChecksum(78910),
            Utc::now() - Duration::seconds(15),
        );
        let older = (
            FileFingerprint::Unknown(1337),
            Utc::now() - Duration::seconds(20),
        );
        // The cutoff falls between `newish` (10s old) and `oldish` (15s old).
        let ignore_before = Some(Utc::now() - Duration::seconds(12));

        let position: FilePosition = 1234;
        let data_dir = tempdir().unwrap();

        // load and persist the checkpoints
        {
            let chkptr = Checkpointer::new(data_dir.path());

            for (fingerprint, modified) in &[&newer, &newish, &oldish, &older] {
                // `load` is used so that the chosen modification time is kept.
                chkptr.checkpoints.load(Checkpoint {
                    fingerprint: *fingerprint,
                    position,
                    modified: *modified,
                });
                assert_eq!(chkptr.get_checkpoint(*fingerprint), Some(position));
                chkptr.write_checkpoints().unwrap();
            }
        }

        // read them back and assert old are removed
        {
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.read_checkpoints(ignore_before);

            assert_eq!(chkptr.get_checkpoint(newish.0), Some(position));
            assert_eq!(chkptr.get_checkpoint(newer.0), Some(position));
            assert_eq!(chkptr.get_checkpoint(oldish.0), None);
            assert_eq!(chkptr.get_checkpoint(older.0), None);
        }
    }
509
510    #[test]
511    fn test_checkpointer_restart() {
512        let fingerprints = vec![
513            FileFingerprint::DevInode(1, 2),
514            FileFingerprint::BytesChecksum(3456),
515            FileFingerprint::FirstLinesChecksum(78910),
516            FileFingerprint::Unknown(1337),
517        ];
518        for fingerprint in fingerprints {
519            let position: FilePosition = 1234;
520            let data_dir = tempdir().unwrap();
521            {
522                let mut chkptr = Checkpointer::new(data_dir.path());
523                chkptr.update_checkpoint(fingerprint, position);
524                assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
525                chkptr.write_checkpoints().ok();
526            }
527            {
528                let mut chkptr = Checkpointer::new(data_dir.path());
529                assert_eq!(chkptr.get_checkpoint(fingerprint), None);
530                chkptr.read_checkpoints(None);
531                assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
532            }
533        }
534    }
535
536    #[test]
537    fn test_checkpointer_fingerprint_upgrades_unknown() {
538        let log_dir = tempdir().unwrap();
539        let path = log_dir.path().join("test.log");
540        let data = "hello\n";
541        std::fs::write(&path, data).unwrap();
542
543        let new_fingerprint = FileFingerprint::DevInode(1, 2);
544        let old_fingerprint = FileFingerprint::Unknown(new_fingerprint.as_legacy());
545        let position: FilePosition = 1234;
546        let fingerprinter = Fingerprinter {
547            strategy: FingerprintStrategy::DevInode,
548            max_line_length: 1000,
549            ignore_not_found: false,
550        };
551
552        let mut buf = Vec::new();
553
554        let data_dir = tempdir().unwrap();
555        {
556            let mut chkptr = Checkpointer::new(data_dir.path());
557            chkptr.update_checkpoint(old_fingerprint, position);
558            assert_eq!(chkptr.get_checkpoint(old_fingerprint), Some(position));
559            chkptr.write_checkpoints().ok();
560        }
561        {
562            let mut chkptr = Checkpointer::new(data_dir.path());
563            chkptr.read_checkpoints(None);
564            assert_eq!(chkptr.get_checkpoint(new_fingerprint), None);
565
566            chkptr.maybe_upgrade(&path, new_fingerprint, &fingerprinter, &mut buf);
567
568            assert_eq!(chkptr.get_checkpoint(new_fingerprint), Some(position));
569            assert_eq!(chkptr.get_checkpoint(old_fingerprint), None);
570        }
571    }
572
573    #[test]
574    fn test_checkpointer_fingerprint_upgrades_legacy_checksum() {
575        let log_dir = tempdir().unwrap();
576        let path = log_dir.path().join("test.log");
577        let data = "hello\n";
578        std::fs::write(&path, data).unwrap();
579
580        let old_fingerprint = FileFingerprint::FirstLinesChecksum(18057733963141331840);
581        let new_fingerprint = FileFingerprint::FirstLinesChecksum(17791311590754645022);
582        let position: FilePosition = 6;
583
584        let fingerprinter = Fingerprinter {
585            strategy: FingerprintStrategy::FirstLinesChecksum {
586                ignored_header_bytes: 0,
587                lines: 1,
588            },
589            max_line_length: 102400,
590            ignore_not_found: false,
591        };
592
593        let mut buf = Vec::new();
594
595        let data_dir = tempdir().unwrap();
596        {
597            let mut chkptr = Checkpointer::new(data_dir.path());
598            chkptr.update_checkpoint(old_fingerprint, position);
599            assert_eq!(chkptr.get_checkpoint(old_fingerprint), Some(position));
600            chkptr.write_checkpoints().ok();
601        }
602        {
603            let mut chkptr = Checkpointer::new(data_dir.path());
604            chkptr.read_checkpoints(None);
605            assert_eq!(chkptr.get_checkpoint(new_fingerprint), None);
606
607            chkptr.maybe_upgrade(&path, new_fingerprint, &fingerprinter, &mut buf);
608
609            assert_eq!(chkptr.get_checkpoint(new_fingerprint), Some(position));
610            assert_eq!(chkptr.get_checkpoint(old_fingerprint), None);
611        }
612    }
613
614    #[test]
615    fn test_checkpointer_fingerprint_upgrades_legacy_first_lines_checksum() {
616        let log_dir = tempdir().unwrap();
617        let path = log_dir.path().join("test.log");
618        let data = "hello\n";
619        std::fs::write(&path, data).unwrap();
620
621        let old_fingerprint = FileFingerprint::FirstLinesChecksum(17791311590754645022);
622        let new_fingerprint = FileFingerprint::FirstLinesChecksum(11081174131906673079);
623        let position: FilePosition = 6;
624
625        let fingerprinter = Fingerprinter {
626            strategy: FingerprintStrategy::FirstLinesChecksum {
627                ignored_header_bytes: 0,
628                lines: 1,
629            },
630            max_line_length: 102400,
631            ignore_not_found: false,
632        };
633
634        let mut buf = Vec::new();
635
636        let data_dir = tempdir().unwrap();
637        {
638            let mut chkptr = Checkpointer::new(data_dir.path());
639            chkptr.update_checkpoint(old_fingerprint, position);
640            assert_eq!(chkptr.get_checkpoint(old_fingerprint), Some(position));
641            chkptr.write_checkpoints().ok();
642        }
643        {
644            let mut chkptr = Checkpointer::new(data_dir.path());
645            chkptr.read_checkpoints(None);
646            assert_eq!(chkptr.get_checkpoint(new_fingerprint), None);
647
648            chkptr.maybe_upgrade(&path, new_fingerprint, &fingerprinter, &mut buf);
649
650            assert_eq!(chkptr.get_checkpoint(new_fingerprint), Some(position));
651            assert_eq!(chkptr.get_checkpoint(old_fingerprint), None);
652        }
653    }
654
    /// Checkpoints stored in the legacy directory layout are read, migrated
    /// to the JSON file, and the legacy directory is cleaned up afterwards.
    #[test]
    fn test_checkpointer_file_upgrades() {
        let fingerprint = FileFingerprint::DevInode(1, 2);
        let position: FilePosition = 1234;

        let data_dir = tempdir().unwrap();

        // Write out checkpoints in the legacy file format
        {
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.update_checkpoint(fingerprint, position);
            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
            chkptr.write_legacy_checkpoints().unwrap();
        }

        // Ensure that the new files were not written but the old style of files were
        assert!(!data_dir.path().join(TMP_FILE_NAME).exists());
        assert!(!data_dir.path().join(CHECKPOINT_FILE_NAME).exists());
        assert!(data_dir.path().join("checkpoints").is_dir());

        // Read from those old files, ensure the checkpoints were loaded properly, and then write
        // them normally (i.e. in the new format)
        {
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.read_checkpoints(None);
            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
            chkptr.write_checkpoints().unwrap();
        }

        // Ensure that the stable file is present, the tmp file is not, and the legacy files have
        // been cleaned up
        assert!(!data_dir.path().join(TMP_FILE_NAME).exists());
        assert!(data_dir.path().join(CHECKPOINT_FILE_NAME).exists());
        assert!(!data_dir.path().join("checkpoints").is_dir());

        // Ensure one last time that we can reread from the new files and get the same result
        {
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.read_checkpoints(None);
            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
        }
    }
697
    /// Checkpoints marked as removed at least 60 seconds ago are dropped on
    /// the next write, unless they were updated again in the meantime.
    #[test]
    fn test_checkpointer_expiration() {
        let cases = vec![
            // (checkpoint, position, seconds since removed)
            (FileFingerprint::BytesChecksum(123), 0, 30),
            (FileFingerprint::BytesChecksum(456), 1, 60),
            (FileFingerprint::BytesChecksum(789), 2, 90),
            (FileFingerprint::BytesChecksum(101112), 3, 120),
        ];

        let data_dir = tempdir().unwrap();
        let mut chkptr = Checkpointer::new(data_dir.path());

        for (fingerprint, position, removed) in cases.clone() {
            chkptr.update_checkpoint(fingerprint, position);

            // slide these in manually so we don't have to sleep for a long time
            chkptr
                .checkpoints
                .removed_times
                .insert(fingerprint, Utc::now() - chrono::Duration::seconds(removed));

            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
        }

        // Update one that would otherwise be expired to ensure it sticks around
        chkptr.update_checkpoint(cases[2].0, 42);

        // Expiration is piggybacked on the persistence interval, so do a write to trigger it
        chkptr.write_checkpoints().unwrap();

        // Only the 30-second-old removal and the re-updated entry survive.
        assert_eq!(chkptr.get_checkpoint(cases[0].0), Some(0));
        assert_eq!(chkptr.get_checkpoint(cases[1].0), None);
        assert_eq!(chkptr.get_checkpoint(cases[2].0), Some(42));
        assert_eq!(chkptr.get_checkpoint(cases[3].0), None);
    }
734
    /// A checkpoint keyed by an old-style bytes checksum is re-keyed to the
    /// file's current fingerprint by `maybe_upgrade`.
    #[test]
    fn test_checkpointer_checksum_updates() {
        let data_dir = tempdir().unwrap();

        let fingerprinter = crate::Fingerprinter {
            strategy: crate::FingerprintStrategy::Checksum {
                bytes: 16,
                ignored_header_bytes: 0,
                lines: 1,
            },
            max_line_length: 1024,
            ignore_not_found: false,
        };

        let log_path = data_dir.path().join("test.log");
        let contents = "hello i am a test log line that is just long enough but not super long\n";
        std::fs::write(&log_path, contents).expect("writing test data");

        let mut buf = vec![0; 1024];
        let old = fingerprinter
            .get_bytes_checksum(&log_path, &mut buf)
            .expect("getting old checksum")
            .expect("still getting old checksum");

        let new = fingerprinter
            .get_fingerprint_of_file(&log_path, &mut buf)
            .expect("getting new checksum");

        // make sure each is of the expected type and that the inner values are not the same
        match (old, new) {
            (FileFingerprint::BytesChecksum(old), FileFingerprint::FirstLinesChecksum(new)) => {
                assert_ne!(old, new)
            }
            _ => panic!("unexpected checksum types"),
        }

        let mut chkptr = Checkpointer::new(data_dir.path());

        // pretend that we had loaded this old style checksum from disk after an upgrade
        chkptr.update_checkpoint(old, 1234);

        assert!(chkptr.checkpoints.contains_bytes_checksums());

        chkptr.maybe_upgrade(&log_path, new, &fingerprinter, &mut buf);

        // The position must have moved from the old key to the new one.
        assert!(!chkptr.checkpoints.contains_bytes_checksums());
        assert_eq!(Some(1234), chkptr.get_checkpoint(new));
        assert_eq!(None, chkptr.get_checkpoint(old));
    }
784
    // guards against accidental changes to the checkpoint serialization
    //
    // Each fingerprint variant is written to disk and the resulting JSON is
    // compared against a fixed expectation.
    #[test]
    fn test_checkpointer_serialization() {
        let fingerprints = vec![
            (
                FileFingerprint::DevInode(1, 2),
                r#"{"version":"1","checkpoints":[{"fingerprint":{"dev_inode":[1,2]},"position":1234}]}"#,
            ),
            (
                FileFingerprint::BytesChecksum(3456),
                r#"{"version":"1","checkpoints":[{"fingerprint":{"checksum":3456},"position":1234}]}"#,
            ),
            (
                FileFingerprint::FirstLinesChecksum(78910),
                r#"{"version":"1","checkpoints":[{"fingerprint":{"first_lines_checksum":78910},"position":1234}]}"#,
            ),
            (
                FileFingerprint::Unknown(1337),
                r#"{"version":"1","checkpoints":[{"fingerprint":{"unknown":1337},"position":1234}]}"#,
            ),
        ];
        for (fingerprint, expected) in fingerprints {
            let expected: serde_json::Value = serde_json::from_str(expected).unwrap();

            let position: FilePosition = 1234;
            let data_dir = tempdir().unwrap();
            let mut chkptr = Checkpointer::new(data_dir.path());

            chkptr.update_checkpoint(fingerprint, position);
            chkptr.write_checkpoints().unwrap();

            let got: serde_json::Value = {
                let s =
                    std::fs::read_to_string(data_dir.path().join(CHECKPOINT_FILE_NAME)).unwrap();
                let mut checkpoints: serde_json::Value = serde_json::from_str(&s).unwrap();
                // "modified" is set from the current time, so it is stripped
                // before comparing against the fixed expectation.
                for checkpoint in checkpoints["checkpoints"].as_array_mut().unwrap() {
                    checkpoint.as_object_mut().unwrap().remove("modified");
                }
                checkpoints
            };

            assert_eq!(expected, got);
        }
    }
829
    // guards against accidental changes to the checkpoint deserialization and tests deserializing
    // old checkpoint versions
    //
    // Note that the fixture contains both the `first_line_checksum` and the
    // `first_lines_checksum` keys; both are expected to load as
    // `FileFingerprint::FirstLinesChecksum`.
    #[test]
    fn test_checkpointer_deserialization() {
        let serialized_checkpoints = r#"
{
  "version": "1",
  "checkpoints": [
    {
      "fingerprint": { "dev_inode": [ 1, 2 ] },
      "position": 1234,
      "modified": "2021-07-12T18:19:11.769003Z"
    },
    {
      "fingerprint": { "checksum": 3456 },
      "position": 1234,
      "modified": "2021-07-12T18:19:11.769003Z"
    },
    {
      "fingerprint": { "first_line_checksum": 1234 },
      "position": 1234,
      "modified": "2021-07-12T18:19:11.769003Z"
    },
    {
      "fingerprint": { "first_lines_checksum": 78910 },
      "position": 1234,
      "modified": "2021-07-12T18:19:11.769003Z"
    },
    {
      "fingerprint": { "unknown": 1337 },
      "position": 1234,
      "modified": "2021-07-12T18:19:11.769003Z"
    }
  ]
}
        "#;
        let fingerprints = vec![
            FileFingerprint::DevInode(1, 2),
            FileFingerprint::BytesChecksum(3456),
            FileFingerprint::FirstLinesChecksum(1234),
            FileFingerprint::FirstLinesChecksum(78910),
            FileFingerprint::Unknown(1337),
        ];

        let data_dir = tempdir().unwrap();

        let mut chkptr = Checkpointer::new(data_dir.path());

        // Place the fixture at the stable checkpoint file location.
        std::fs::write(
            data_dir.path().join(CHECKPOINT_FILE_NAME),
            serialized_checkpoints,
        )
        .unwrap();

        chkptr.read_checkpoints(None);

        for fingerprint in fingerprints {
            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(1234))
        }
    }
890}