file_source_common/
checkpointer.rs

1use std::{
2    collections::BTreeSet,
3    io,
4    path::{Path, PathBuf},
5    sync::Arc,
6};
7
8use chrono::{DateTime, Utc};
9use dashmap::DashMap;
10use glob::glob;
11use serde::{Deserialize, Serialize};
12use tokio::{
13    fs::{self, File},
14    io::{AsyncReadExt, BufReader},
15    sync::Mutex,
16};
17use tracing::{error, info, warn};
18
19use super::{
20    FilePosition,
21    fingerprinter::{FileFingerprint, Fingerprinter},
22};
23
/// File name, joined onto the data directory, of the scratch file that a new
/// checkpoint snapshot is written to before being renamed over the stable file.
const TMP_FILE_NAME: &str = "checkpoints.new.json";
/// File name, joined onto the data directory, of the stable checkpoint file.
pub const CHECKPOINT_FILE_NAME: &str = "checkpoints.json";
26
/// This enum represents the file format of checkpoints persisted to disk. Right
/// now there is only one variant, but any incompatible change will require an
/// additional variant to be added here and handled everywhere this format is
/// read or written.
///
/// The variant is selected by an internal `version` tag, so `V1` serializes as
/// an object of the form `{"version":"1","checkpoints":[...]}`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "version", rename_all = "snake_case")]
enum State {
    /// Version 1: an ordered set of checkpoints.
    #[serde(rename = "1")]
    V1 { checkpoints: BTreeSet<Checkpoint> },
}
37
/// A simple JSON-friendly struct of the fingerprint/position pair, since
/// fingerprints as objects cannot be keys in a plain JSON map.
///
/// The derived `Ord` compares fields in declaration order (fingerprint, then
/// position, then modified), which determines the ordering inside the
/// `BTreeSet` held by `State::V1`.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
#[serde(rename_all = "snake_case")]
struct Checkpoint {
    /// Identity of the file this checkpoint belongs to.
    fingerprint: FileFingerprint,
    /// Position recorded for that file.
    position: FilePosition,
    /// Timestamp associated with the checkpoint; compared against
    /// `ignore_before` when persisted state is loaded.
    modified: DateTime<Utc>,
}
47
/// Owns the on-disk locations of checkpoint data and the shared in-memory
/// view of the checkpoints themselves.
pub struct Checkpointer {
    /// Directory holding checkpoints in the legacy one-file-per-checkpoint
    /// layout (`<data_dir>/checkpoints`).
    directory: PathBuf,
    /// Scratch file that a snapshot is written to before it is renamed over
    /// the stable file.
    tmp_file_path: PathBuf,
    /// Stable JSON checkpoint file.
    stable_file_path: PathBuf,
    /// Glob pattern matching every entry directly inside `directory`.
    glob_string: String,
    /// Shared in-memory checkpoints; clones of this handle are given out by
    /// `view`.
    checkpoints: Arc<CheckpointsView>,
    /// The state most recently persisted by `write_checkpoints`, used to skip
    /// writes when nothing has changed.
    last: Mutex<Option<State>>,
}
56
/// A thread-safe handle for reading and writing checkpoints in-memory across
/// multiple threads.
#[derive(Debug, Default)]
pub struct CheckpointsView {
    /// Current position for each known fingerprint.
    checkpoints: DashMap<FileFingerprint, FilePosition>,
    /// Timestamp per fingerprint: set to the current time by `update`, or to
    /// the stored/file modification time when checkpoints are loaded.
    modified_times: DashMap<FileFingerprint, DateTime<Utc>>,
    /// Time at which `set_dead` was called for a fingerprint. Cleared again by
    /// `update`; entries at least 60 seconds old cause the fingerprint to be
    /// dropped by `remove_expired`.
    removed_times: DashMap<FileFingerprint, DateTime<Utc>>,
}
65
66impl CheckpointsView {
67    pub fn update(&self, fng: FileFingerprint, pos: FilePosition) {
68        self.checkpoints.insert(fng, pos);
69        self.modified_times.insert(fng, Utc::now());
70        self.removed_times.remove(&fng);
71    }
72
73    pub fn get(&self, fng: FileFingerprint) -> Option<FilePosition> {
74        self.checkpoints.get(&fng).map(|r| *r.value())
75    }
76
77    pub fn set_dead(&self, fng: FileFingerprint) {
78        self.removed_times.insert(fng, Utc::now());
79    }
80
81    pub fn update_key(&self, old: FileFingerprint, new: FileFingerprint) {
82        if let Some((_, value)) = self.checkpoints.remove(&old) {
83            self.checkpoints.insert(new, value);
84        }
85
86        if let Some((_, value)) = self.modified_times.remove(&old) {
87            self.modified_times.insert(new, value);
88        }
89
90        if let Some((_, value)) = self.removed_times.remove(&old) {
91            self.removed_times.insert(new, value);
92        }
93    }
94
95    pub fn contains_bytes_checksums(&self) -> bool {
96        self.checkpoints
97            .iter()
98            .any(|entry| matches!(entry.key(), FileFingerprint::BytesChecksum(_)))
99    }
100
101    pub fn remove_expired(&self) {
102        let now = Utc::now();
103
104        // Collect all of the expired keys. Removing them while iterating can
105        // lead to deadlocks, the set should be small, and this is not a
106        // performance-sensitive path.
107        let to_remove = self
108            .removed_times
109            .iter()
110            .filter(|entry| {
111                let ts = entry.value();
112                let duration = now - *ts;
113                duration >= chrono::Duration::seconds(60)
114            })
115            .map(|entry| *entry.key())
116            .collect::<Vec<FileFingerprint>>();
117
118        for fng in to_remove {
119            self.checkpoints.remove(&fng);
120            self.modified_times.remove(&fng);
121            self.removed_times.remove(&fng);
122        }
123    }
124
125    fn load(&self, checkpoint: Checkpoint) {
126        self.checkpoints
127            .insert(checkpoint.fingerprint, checkpoint.position);
128        self.modified_times
129            .insert(checkpoint.fingerprint, checkpoint.modified);
130    }
131
132    fn set_state(&self, state: State, ignore_before: Option<DateTime<Utc>>) {
133        match state {
134            State::V1 { checkpoints } => {
135                for checkpoint in checkpoints {
136                    if let Some(ignore_before) = ignore_before
137                        && checkpoint.modified < ignore_before
138                    {
139                        continue;
140                    }
141                    self.load(checkpoint);
142                }
143            }
144        }
145    }
146
147    fn get_state(&self) -> State {
148        State::V1 {
149            checkpoints: self
150                .checkpoints
151                .iter()
152                .map(|entry| {
153                    let fingerprint = entry.key();
154                    let position = entry.value();
155                    Checkpoint {
156                        fingerprint: *fingerprint,
157                        position: *position,
158                        modified: self
159                            .modified_times
160                            .get(fingerprint)
161                            .map(|r| *r.value())
162                            .unwrap_or_else(Utc::now),
163                    }
164                })
165                .collect(),
166        }
167    }
168
169    async fn maybe_upgrade(
170        &self,
171        path: &Path,
172        fng: FileFingerprint,
173        fingerprinter: &Fingerprinter,
174        fingerprint_buffer: &mut Vec<u8>,
175    ) {
176        if let Ok(Some(old_checksum)) = fingerprinter
177            .get_bytes_checksum(path, fingerprint_buffer)
178            .await
179        {
180            self.update_key(old_checksum, fng)
181        }
182
183        if let Some((_, pos)) = self
184            .checkpoints
185            .remove(&FileFingerprint::Unknown(fng.as_legacy().await))
186        {
187            self.update(fng, pos);
188        }
189
190        if self.checkpoints.get(&fng).is_none() {
191            if let Ok(Some(fingerprint)) = fingerprinter
192                .get_legacy_checksum(path, fingerprint_buffer)
193                .await
194                && let Some((_, pos)) = self.checkpoints.remove(&fingerprint)
195            {
196                self.update(fng, pos);
197            }
198            if let Ok(Some(fingerprint)) = fingerprinter
199                .get_legacy_first_lines_checksum(path, fingerprint_buffer)
200                .await
201                && let Some((_, pos)) = self.checkpoints.remove(&fingerprint)
202            {
203                self.update(fng, pos);
204            }
205        }
206    }
207}
208
209impl Checkpointer {
210    pub fn new(data_dir: &Path) -> Checkpointer {
211        let directory = data_dir.join("checkpoints");
212        let glob_string = directory.join("*").to_string_lossy().into_owned();
213        let tmp_file_path = data_dir.join(TMP_FILE_NAME);
214        let stable_file_path = data_dir.join(CHECKPOINT_FILE_NAME);
215
216        Checkpointer {
217            directory,
218            glob_string,
219            tmp_file_path,
220            stable_file_path,
221            checkpoints: Arc::new(CheckpointsView::default()),
222            last: Mutex::new(None),
223        }
224    }
225
    /// Return another handle to the shared in-memory checkpoints. The handle
    /// refers to the same underlying data as this checkpointer.
    pub fn view(&self) -> Arc<CheckpointsView> {
        Arc::clone(&self.checkpoints)
    }
229
230    /// Encode a fingerprint to a file name, including legacy Unknown values
231    ///
232    /// For each of the non-legacy variants, prepend an identifier byte that
233    /// falls outside of the hex range used by the legacy implementation. This
234    /// allows them to be differentiated by simply peeking at the first byte.
235    #[cfg(test)]
236    fn encode(&self, fng: FileFingerprint, pos: FilePosition) -> PathBuf {
237        use FileFingerprint::*;
238
239        let path = match fng {
240            BytesChecksum(c) => format!("g{c:x}.{pos}"),
241            FirstLinesChecksum(c) => format!("h{c:x}.{pos}"),
242            DevInode(dev, ino) => format!("i{dev:x}.{ino:x}.{pos}"),
243            Unknown(x) => format!("{x:x}.{pos}"),
244        };
245        self.directory.join(path)
246    }
247
    /// Decode a fingerprint from a file name, accounting for unknowns due to the legacy
    /// implementation.
    ///
    /// The trick here is to rely on the hex encoding of the legacy
    /// format. Because hex encoding only allows [0-9a-f], we can use any
    /// character outside of that range as a magic byte identifier for the newer
    /// formats.
    ///
    /// Only the final component of `path` is inspected.
    ///
    /// # Panics
    ///
    /// Panics if `path` has no final component, if the file name is empty, or
    /// if the name does not parse according to the pattern selected by its
    /// first character.
    fn decode(&self, path: &Path) -> (FileFingerprint, FilePosition) {
        use FileFingerprint::*;

        let file_name = &path.file_name().unwrap().to_string_lossy();
        // Dispatch on the leading character: `g`, `h` and `i` mark the newer
        // variants, anything else is parsed as a legacy hex value.
        match file_name.chars().next().expect("empty file name") {
            'g' => {
                let (c, pos) = scan_fmt!(file_name, "g{x}.{}", [hex u64], FilePosition).unwrap();
                (BytesChecksum(c), pos)
            }
            'h' => {
                let (c, pos) = scan_fmt!(file_name, "h{x}.{}", [hex u64], FilePosition).unwrap();
                (FirstLinesChecksum(c), pos)
            }
            'i' => {
                let (dev, ino, pos) =
                    scan_fmt!(file_name, "i{x}.{x}.{}", [hex u64], [hex u64], FilePosition)
                        .unwrap();
                (DevInode(dev, ino), pos)
            }
            _ => {
                let (c, pos) = scan_fmt!(file_name, "{x}.{}", [hex u64], FilePosition).unwrap();
                (Unknown(c), pos)
            }
        }
    }
280
    /// Test-only helper: record `pos` for `fng` in the in-memory view.
    #[cfg(test)]
    pub fn update_checkpoint(&mut self, fng: FileFingerprint, pos: FilePosition) {
        self.checkpoints.update(fng, pos);
    }
285
    /// Test-only helper: look up the position stored for `fng` in the
    /// in-memory view.
    #[cfg(test)]
    pub fn get_checkpoint(&self, fng: FileFingerprint) -> Option<FilePosition> {
        self.checkpoints.get(fng)
    }
290
    /// Check whether the file at `path`, whose current fingerprint is `fresh`,
    /// has a checkpoint stored under an older style of fingerprint. If so,
    /// move that checkpoint over to `fresh`.
    ///
    /// This forwards to the method of the same name on the in-memory
    /// checkpoints view; `fingerprint_buffer` is passed through unchanged.
    pub async fn maybe_upgrade(
        &mut self,
        path: &Path,
        fresh: FileFingerprint,
        fingerprinter: &Fingerprinter,
        fingerprint_buffer: &mut Vec<u8>,
    ) {
        self.checkpoints
            .maybe_upgrade(path, fresh, fingerprinter, fingerprint_buffer)
            .await
    }
304
305    /// Persist the current checkpoints state to disk, making our best effort to
306    /// do so in an atomic way that allow for recovering the previous state in
307    /// the event of a crash.
308    pub async fn write_checkpoints(&self) -> Result<usize, io::Error> {
309        // First drop any checkpoints for files that were removed more than 60
310        // seconds ago. This keeps our working set as small as possible and
311        // makes sure we don't spend time and IO writing checkpoints that don't
312        // matter anymore.
313        self.checkpoints.remove_expired();
314
315        let current = self.checkpoints.get_state();
316
317        // Fetch last written state.
318        let mut last = self.last.lock().await;
319        if last.as_ref() != Some(&current) {
320            // Write the new checkpoints to a tmp file and flush it fully to
321            // disk. If vector dies anywhere during this section, the existing
322            // stable file will still be in its current valid state and we'll be
323            // able to recover.
324            let tmp_file_path = self.tmp_file_path.clone();
325
326            // spawn_blocking shouldn't be needed: https://github.com/vectordotdev/vector/issues/23743
327            let current = tokio::task::spawn_blocking(move || -> Result<State, io::Error> {
328                let mut f = std::io::BufWriter::new(std::fs::File::create(tmp_file_path)?);
329                serde_json::to_writer(&mut f, &current)?;
330                f.into_inner()?.sync_all()?;
331                Ok(current)
332            })
333            .await
334            .map_err(io::Error::other)??;
335
336            // Once the temp file is fully flushed, rename the tmp file to replace
337            // the previous stable file. This is an atomic operation on POSIX
338            // systems (and the stdlib claims to provide equivalent behavior on
339            // Windows), which should prevent scenarios where we don't have at least
340            // one full valid file to recover from.
341            fs::rename(&self.tmp_file_path, &self.stable_file_path).await?;
342
343            *last = Some(current);
344        }
345
346        Ok(self.checkpoints.checkpoints.len())
347    }
348
349    /// Write checkpoints to disk in the legacy format. Used for compatibility
350    /// testing only.
351    #[cfg(test)]
352    pub async fn write_legacy_checkpoints(&mut self) -> Result<usize, io::Error> {
353        use tokio::fs::File;
354
355        fs::remove_dir_all(&self.directory).await.ok();
356        fs::create_dir_all(&self.directory).await?;
357        for c in self.checkpoints.checkpoints.iter() {
358            File::create(self.encode(*c.key(), *c.value())).await?;
359        }
360        Ok(self.checkpoints.checkpoints.len())
361    }
362
363    /// Read persisted checkpoints from disk, preferring the new JSON file
364    /// format but falling back to the legacy system when those files are found
365    /// instead.
366    pub async fn read_checkpoints(&mut self, ignore_before: Option<DateTime<Utc>>) {
367        // First try reading from the tmp file location. If this works, it means
368        // that the previous process was interrupted in the process of
369        // checkpointing and the tmp file should contain more recent data that
370        // should be preferred.
371        match self.read_checkpoints_file(&self.tmp_file_path).await {
372            Ok(state) => {
373                warn!(message = "Recovered checkpoint data from interrupted process.");
374                self.checkpoints.set_state(state, ignore_before);
375
376                // Try to move this tmp file to the stable location so we don't
377                // immediately overwrite it when we next persist checkpoints.
378                if let Err(error) = fs::rename(&self.tmp_file_path, &self.stable_file_path).await {
379                    warn!(message = "Error persisting recovered checkpoint file.", %error);
380                }
381                return;
382            }
383            Err(error) if error.kind() == io::ErrorKind::NotFound => {
384                // This is expected, so no warning needed
385            }
386            Err(error) => {
387                error!(message = "Unable to recover checkpoint data from interrupted process.", %error);
388            }
389        }
390
391        // Next, attempt to read checkpoints from the stable file location. This
392        // is the expected location, so warn more aggressively if something goes
393        // wrong.
394        match self.read_checkpoints_file(&self.stable_file_path).await {
395            Ok(state) => {
396                info!(message = "Loaded checkpoint data.");
397                self.checkpoints.set_state(state, ignore_before);
398                return;
399            }
400            Err(error) if error.kind() == io::ErrorKind::NotFound => {
401                // This is expected, so no warning needed
402            }
403            Err(error) => {
404                warn!(message = "Unable to load checkpoint data.", %error);
405                return;
406            }
407        }
408
409        // If we haven't returned yet, go ahead and look for the legacy files
410        // and try to read them.
411        info!("Attempting to read legacy checkpoint files.");
412        self.read_legacy_checkpoints(ignore_before).await;
413
414        if self.write_checkpoints().await.is_ok() {
415            fs::remove_dir_all(&self.directory).await.ok();
416        }
417    }
418
419    async fn read_checkpoints_file(&self, path: &Path) -> Result<State, io::Error> {
420        // Possible optimization: mmap the file into a slice and pass it into serde_json instead of
421        // calling read_to_end. Need to investigate if this would work with tokio::fs::File
422
423        let mut reader = BufReader::new(File::open(path).await?);
424        let mut output = Vec::new();
425        reader.read_to_end(&mut output).await?;
426
427        serde_json::from_slice(&output[..])
428            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
429    }
430
431    async fn read_legacy_checkpoints(&mut self, ignore_before: Option<DateTime<Utc>>) {
432        for path in glob(&self.glob_string).unwrap().flatten() {
433            let mut mtime = None;
434            if let Some(ignore_before) = ignore_before
435                && let Ok(Ok(modified)) = fs::metadata(&path)
436                    .await
437                    .map(|metadata| metadata.modified())
438            {
439                let modified = DateTime::<Utc>::from(modified);
440                if modified < ignore_before {
441                    fs::remove_file(path).await.ok();
442                    continue;
443                }
444                mtime = Some(modified);
445            }
446            let (fng, pos) = self.decode(&path);
447            self.checkpoints.checkpoints.insert(fng, pos);
448            if let Some(mtime) = mtime {
449                self.checkpoints.modified_times.insert(fng, mtime);
450            }
451        }
452    }
453}
454
455#[cfg(test)]
456mod test {
457    use chrono::{Duration, Utc};
458    use similar_asserts::assert_eq;
459    use tempfile::tempdir;
460    use tokio::fs;
461
462    use super::{
463        super::{FingerprintStrategy, Fingerprinter},
464        CHECKPOINT_FILE_NAME, Checkpoint, Checkpointer, FileFingerprint, FilePosition,
465        TMP_FILE_NAME,
466    };
467
468    #[test]
469    fn test_checkpointer_basics() {
470        let fingerprints = vec![
471            FileFingerprint::DevInode(1, 2),
472            FileFingerprint::BytesChecksum(3456),
473            FileFingerprint::FirstLinesChecksum(78910),
474            FileFingerprint::Unknown(1337),
475        ];
476        for fingerprint in fingerprints {
477            let position: FilePosition = 1234;
478            let data_dir = tempdir().unwrap();
479            let mut chkptr = Checkpointer::new(data_dir.path());
480            assert_eq!(
481                chkptr.decode(&chkptr.encode(fingerprint, position)),
482                (fingerprint, position)
483            );
484            chkptr.update_checkpoint(fingerprint, position);
485            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
486        }
487    }
488
489    #[tokio::test]
490    async fn test_checkpointer_ignore_before() {
491        let now = Utc::now();
492        let newer = (FileFingerprint::DevInode(1, 2), now - Duration::seconds(5));
493        let newish = (
494            FileFingerprint::BytesChecksum(3456),
495            now - Duration::seconds(10),
496        );
497        let oldish = (
498            FileFingerprint::FirstLinesChecksum(78910),
499            now - Duration::seconds(15),
500        );
501        let older = (FileFingerprint::Unknown(1337), now - Duration::seconds(20));
502        let ignore_before = Some(now - Duration::seconds(12));
503
504        let position: FilePosition = 1234;
505        let data_dir = tempdir().unwrap();
506
507        // load and persist the checkpoints
508        {
509            let chkptr = Checkpointer::new(data_dir.path());
510
511            for (fingerprint, modified) in &[&newer, &newish, &oldish, &older] {
512                chkptr.checkpoints.load(Checkpoint {
513                    fingerprint: *fingerprint,
514                    position,
515                    modified: *modified,
516                });
517                assert_eq!(chkptr.get_checkpoint(*fingerprint), Some(position));
518                chkptr.write_checkpoints().await.unwrap();
519            }
520        }
521
522        // read them back and assert old are removed
523        {
524            let mut chkptr = Checkpointer::new(data_dir.path());
525            chkptr.read_checkpoints(ignore_before).await;
526
527            assert_eq!(chkptr.get_checkpoint(newish.0), Some(position));
528            assert_eq!(chkptr.get_checkpoint(newer.0), Some(position));
529            assert_eq!(chkptr.get_checkpoint(oldish.0), None);
530            assert_eq!(chkptr.get_checkpoint(older.0), None);
531        }
532    }
533
534    #[tokio::test]
535    async fn test_checkpointer_restart() {
536        let fingerprints = vec![
537            FileFingerprint::DevInode(1, 2),
538            FileFingerprint::BytesChecksum(3456),
539            FileFingerprint::FirstLinesChecksum(78910),
540            FileFingerprint::Unknown(1337),
541        ];
542        for fingerprint in fingerprints {
543            let position: FilePosition = 1234;
544            let data_dir = tempdir().unwrap();
545            {
546                let mut chkptr = Checkpointer::new(data_dir.path());
547                chkptr.update_checkpoint(fingerprint, position);
548                assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
549                chkptr.write_checkpoints().await.unwrap();
550            }
551            {
552                let mut chkptr = Checkpointer::new(data_dir.path());
553                assert_eq!(chkptr.get_checkpoint(fingerprint), None);
554                chkptr.read_checkpoints(None).await;
555                assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
556            }
557        }
558    }
559
560    #[tokio::test]
561    async fn test_checkpointer_fingerprint_upgrades_unknown() {
562        let log_dir = tempdir().unwrap();
563        let path = log_dir.path().join("test.log");
564        let data = "hello\n";
565        fs::write(&path, data).await.unwrap();
566
567        let new_fingerprint = FileFingerprint::DevInode(1, 2);
568        let old_fingerprint = FileFingerprint::Unknown(new_fingerprint.as_legacy().await);
569        let position: FilePosition = 1234;
570        let fingerprinter = Fingerprinter {
571            strategy: FingerprintStrategy::DevInode,
572            max_line_length: 1000,
573            ignore_not_found: false,
574        };
575
576        let mut buf = Vec::new();
577
578        let data_dir = tempdir().unwrap();
579        {
580            let mut chkptr = Checkpointer::new(data_dir.path());
581            chkptr.update_checkpoint(old_fingerprint, position);
582            assert_eq!(chkptr.get_checkpoint(old_fingerprint), Some(position));
583            chkptr.write_checkpoints().await.unwrap();
584        }
585        {
586            let mut chkptr = Checkpointer::new(data_dir.path());
587            chkptr.read_checkpoints(None).await;
588            assert_eq!(chkptr.get_checkpoint(new_fingerprint), None);
589
590            chkptr
591                .maybe_upgrade(&path, new_fingerprint, &fingerprinter, &mut buf)
592                .await;
593
594            assert_eq!(chkptr.get_checkpoint(new_fingerprint), Some(position));
595            assert_eq!(chkptr.get_checkpoint(old_fingerprint), None);
596        }
597    }
598
599    #[tokio::test]
600    async fn test_checkpointer_fingerprint_upgrades_legacy_checksum() {
601        let log_dir = tempdir().unwrap();
602        let path = log_dir.path().join("test.log");
603        let data = "hello\n";
604        fs::write(&path, data).await.unwrap();
605
606        let old_fingerprint = FileFingerprint::FirstLinesChecksum(18057733963141331840);
607        let new_fingerprint = FileFingerprint::FirstLinesChecksum(17791311590754645022);
608        let position: FilePosition = 6;
609
610        let fingerprinter = Fingerprinter {
611            strategy: FingerprintStrategy::FirstLinesChecksum {
612                ignored_header_bytes: 0,
613                lines: 1,
614            },
615            max_line_length: 102400,
616            ignore_not_found: false,
617        };
618
619        let mut buf = Vec::new();
620
621        let data_dir = tempdir().unwrap();
622        {
623            let mut chkptr = Checkpointer::new(data_dir.path());
624            chkptr.update_checkpoint(old_fingerprint, position);
625            assert_eq!(chkptr.get_checkpoint(old_fingerprint), Some(position));
626            chkptr.write_checkpoints().await.unwrap();
627        }
628        {
629            let mut chkptr = Checkpointer::new(data_dir.path());
630            chkptr.read_checkpoints(None).await;
631            assert_eq!(chkptr.get_checkpoint(new_fingerprint), None);
632
633            chkptr
634                .maybe_upgrade(&path, new_fingerprint, &fingerprinter, &mut buf)
635                .await;
636
637            assert_eq!(chkptr.get_checkpoint(new_fingerprint), Some(position));
638            assert_eq!(chkptr.get_checkpoint(old_fingerprint), None);
639        }
640    }
641
642    #[tokio::test]
643    async fn test_checkpointer_fingerprint_upgrades_legacy_first_lines_checksum() {
644        let log_dir = tempdir().unwrap();
645        let path = log_dir.path().join("test.log");
646        let data = "hello\n";
647        fs::write(&path, data).await.unwrap();
648
649        let old_fingerprint = FileFingerprint::FirstLinesChecksum(17791311590754645022);
650        let new_fingerprint = FileFingerprint::FirstLinesChecksum(11081174131906673079);
651        let position: FilePosition = 6;
652
653        let fingerprinter = Fingerprinter {
654            strategy: FingerprintStrategy::FirstLinesChecksum {
655                ignored_header_bytes: 0,
656                lines: 1,
657            },
658            max_line_length: 102400,
659            ignore_not_found: false,
660        };
661
662        let mut buf = Vec::new();
663
664        let data_dir = tempdir().unwrap();
665        {
666            let mut chkptr = Checkpointer::new(data_dir.path());
667            chkptr.update_checkpoint(old_fingerprint, position);
668            assert_eq!(chkptr.get_checkpoint(old_fingerprint), Some(position));
669            chkptr.write_checkpoints().await.unwrap();
670        }
671        {
672            let mut chkptr = Checkpointer::new(data_dir.path());
673            chkptr.read_checkpoints(None).await;
674            assert_eq!(chkptr.get_checkpoint(new_fingerprint), None);
675
676            chkptr
677                .maybe_upgrade(&path, new_fingerprint, &fingerprinter, &mut buf)
678                .await;
679
680            assert_eq!(chkptr.get_checkpoint(new_fingerprint), Some(position));
681            assert_eq!(chkptr.get_checkpoint(old_fingerprint), None);
682        }
683    }
684
685    #[tokio::test]
686    async fn test_checkpointer_file_upgrades() {
687        let fingerprint = FileFingerprint::DevInode(1, 2);
688        let position: FilePosition = 1234;
689
690        let data_dir = tempdir().unwrap();
691
692        // Write out checkpoints in the legacy file format
693        {
694            let mut chkptr = Checkpointer::new(data_dir.path());
695            chkptr.update_checkpoint(fingerprint, position);
696            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
697            chkptr.write_legacy_checkpoints().await.unwrap();
698        }
699
700        // Ensure that the new files were not written but the old style of files were
701        assert!(!data_dir.path().join(TMP_FILE_NAME).exists());
702        assert!(!data_dir.path().join(CHECKPOINT_FILE_NAME).exists());
703        assert!(data_dir.path().join("checkpoints").is_dir());
704
705        // Read from those old files, ensure the checkpoints were loaded properly, and then write
706        // them normally (i.e. in the new format)
707        {
708            let mut chkptr = Checkpointer::new(data_dir.path());
709            chkptr.read_checkpoints(None).await;
710            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
711            chkptr.write_checkpoints().await.unwrap();
712        }
713
714        // Ensure that the stable file is present, the tmp file is not, and the legacy files have
715        // been cleaned up
716        assert!(!data_dir.path().join(TMP_FILE_NAME).exists());
717        assert!(data_dir.path().join(CHECKPOINT_FILE_NAME).exists());
718        assert!(!data_dir.path().join("checkpoints").is_dir());
719
720        // Ensure one last time that we can reread from the new files and get the same result
721        {
722            let mut chkptr = Checkpointer::new(data_dir.path());
723            chkptr.read_checkpoints(None).await;
724            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
725        }
726    }
727
728    #[tokio::test]
729    async fn test_checkpointer_expiration() {
730        let cases = vec![
731            // (checkpoint, position, seconds since removed)
732            (FileFingerprint::BytesChecksum(123), 0, 30),
733            (FileFingerprint::BytesChecksum(456), 1, 60),
734            (FileFingerprint::BytesChecksum(789), 2, 90),
735            (FileFingerprint::BytesChecksum(101112), 3, 120),
736        ];
737
738        let data_dir = tempdir().unwrap();
739        let mut chkptr = Checkpointer::new(data_dir.path());
740
741        for (fingerprint, position, removed) in cases.clone() {
742            chkptr.update_checkpoint(fingerprint, position);
743
744            // slide these in manually so we don't have to sleep for a long time
745            chkptr
746                .checkpoints
747                .removed_times
748                .insert(fingerprint, Utc::now() - chrono::Duration::seconds(removed));
749
750            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
751        }
752
753        // Update one that would otherwise be expired to ensure it sticks around
754        chkptr.update_checkpoint(cases[2].0, 42);
755
756        // Expiration is piggybacked on the persistence interval, so do a write to trigger it
757        chkptr.write_checkpoints().await.unwrap();
758
759        assert_eq!(chkptr.get_checkpoint(cases[0].0), Some(0));
760        assert_eq!(chkptr.get_checkpoint(cases[1].0), None);
761        assert_eq!(chkptr.get_checkpoint(cases[2].0), Some(42));
762        assert_eq!(chkptr.get_checkpoint(cases[3].0), None);
763    }
764
765    #[tokio::test]
766    async fn test_checkpointer_checksum_updates() {
767        let data_dir = tempdir().unwrap();
768
769        let fingerprinter = crate::Fingerprinter {
770            strategy: crate::FingerprintStrategy::Checksum {
771                bytes: 16,
772                ignored_header_bytes: 0,
773                lines: 1,
774            },
775            max_line_length: 1024,
776            ignore_not_found: false,
777        };
778
779        let log_path = data_dir.path().join("test.log");
780        let contents = "hello i am a test log line that is just long enough but not super long\n";
781        fs::write(&log_path, contents)
782            .await
783            .expect("writing test data");
784
785        let mut buf = vec![0; 1024];
786        let old = fingerprinter
787            .get_bytes_checksum(&log_path, &mut buf)
788            .await
789            .expect("getting old checksum")
790            .expect("still getting old checksum");
791
792        let new = fingerprinter
793            .get_fingerprint_of_file(&log_path, &mut buf)
794            .await
795            .expect("getting new checksum");
796
797        // make sure each is of the expected type and that the inner values are not the same
798        match (old, new) {
799            (FileFingerprint::BytesChecksum(old), FileFingerprint::FirstLinesChecksum(new)) => {
800                assert_ne!(old, new)
801            }
802            _ => panic!("unexpected checksum types"),
803        }
804
805        let mut chkptr = Checkpointer::new(data_dir.path());
806
807        // pretend that we had loaded this old style checksum from disk after an upgrade
808        chkptr.update_checkpoint(old, 1234);
809
810        assert!(chkptr.checkpoints.contains_bytes_checksums());
811
812        chkptr
813            .maybe_upgrade(&log_path, new, &fingerprinter, &mut buf)
814            .await;
815
816        assert!(!chkptr.checkpoints.contains_bytes_checksums());
817        assert_eq!(Some(1234), chkptr.get_checkpoint(new));
818        assert_eq!(None, chkptr.get_checkpoint(old));
819    }
820
821    // guards against accidental changes to the checkpoint serialization
822    #[tokio::test]
823    async fn test_checkpointer_serialization() {
824        let fingerprints = vec![
825            (
826                FileFingerprint::DevInode(1, 2),
827                r#"{"version":"1","checkpoints":[{"fingerprint":{"dev_inode":[1,2]},"position":1234}]}"#,
828            ),
829            (
830                FileFingerprint::BytesChecksum(3456),
831                r#"{"version":"1","checkpoints":[{"fingerprint":{"checksum":3456},"position":1234}]}"#,
832            ),
833            (
834                FileFingerprint::FirstLinesChecksum(78910),
835                r#"{"version":"1","checkpoints":[{"fingerprint":{"first_lines_checksum":78910},"position":1234}]}"#,
836            ),
837            (
838                FileFingerprint::Unknown(1337),
839                r#"{"version":"1","checkpoints":[{"fingerprint":{"unknown":1337},"position":1234}]}"#,
840            ),
841        ];
842        for (fingerprint, expected) in fingerprints {
843            let expected: serde_json::Value = serde_json::from_str(expected).unwrap();
844
845            let position: FilePosition = 1234;
846            let data_dir = tempdir().unwrap();
847            let mut chkptr = Checkpointer::new(data_dir.path());
848
849            chkptr.update_checkpoint(fingerprint, position);
850            chkptr.write_checkpoints().await.unwrap();
851
852            let got: serde_json::Value = {
853                let s = fs::read_to_string(data_dir.path().join(CHECKPOINT_FILE_NAME))
854                    .await
855                    .unwrap();
856                let mut checkpoints: serde_json::Value = serde_json::from_str(&s).unwrap();
857                for checkpoint in checkpoints["checkpoints"].as_array_mut().unwrap() {
858                    checkpoint.as_object_mut().unwrap().remove("modified");
859                }
860                checkpoints
861            };
862
863            assert_eq!(expected, got);
864        }
865    }
866
867    // guards against accidental changes to the checkpoint deserialization and tests deserializing
868    // old checkpoint versions
869    #[tokio::test]
870    async fn test_checkpointer_deserialization() {
871        let serialized_checkpoints = r#"
872{
873  "version": "1",
874  "checkpoints": [
875    {
876      "fingerprint": { "dev_inode": [ 1, 2 ] },
877      "position": 1234,
878      "modified": "2021-07-12T18:19:11.769003Z"
879    },
880    {
881      "fingerprint": { "checksum": 3456 },
882      "position": 1234,
883      "modified": "2021-07-12T18:19:11.769003Z"
884    },
885    {
886      "fingerprint": { "first_line_checksum": 1234 },
887      "position": 1234,
888      "modified": "2021-07-12T18:19:11.769003Z"
889    },
890    {
891      "fingerprint": { "first_lines_checksum": 78910 },
892      "position": 1234,
893      "modified": "2021-07-12T18:19:11.769003Z"
894    },
895    {
896      "fingerprint": { "unknown": 1337 },
897      "position": 1234,
898      "modified": "2021-07-12T18:19:11.769003Z"
899    }
900  ]
901}
902        "#;
903        let fingerprints = vec![
904            FileFingerprint::DevInode(1, 2),
905            FileFingerprint::BytesChecksum(3456),
906            FileFingerprint::FirstLinesChecksum(1234),
907            FileFingerprint::FirstLinesChecksum(78910),
908            FileFingerprint::Unknown(1337),
909        ];
910
911        let data_dir = tempdir().unwrap();
912
913        let mut chkptr = Checkpointer::new(data_dir.path());
914
915        fs::write(
916            data_dir.path().join(CHECKPOINT_FILE_NAME),
917            serialized_checkpoints,
918        )
919        .await
920        .unwrap();
921
922        chkptr.read_checkpoints(None).await;
923
924        for fingerprint in fingerprints {
925            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(1234))
926        }
927    }
928}