file_source_common/
checkpointer.rs

1use std::{
2    collections::BTreeSet,
3    io,
4    path::{Path, PathBuf},
5    sync::Arc,
6};
7
8use chrono::{DateTime, Utc};
9use dashmap::DashMap;
10use serde::{Deserialize, Serialize};
11use tokio::{
12    fs::{self, File},
13    io::{AsyncReadExt, BufReader},
14    sync::Mutex,
15};
16use tracing::{error, info, warn};
17
18use super::{FilePosition, fingerprinter::FileFingerprint};
19
/// Scratch file a new checkpoint state is written and synced to before it is
/// renamed over [`CHECKPOINT_FILE_NAME`].
const TMP_FILE_NAME: &str = "checkpoints.new.json";
/// Name of the stable checkpoint file inside the data directory.
pub const CHECKPOINT_FILE_NAME: &str = "checkpoints.json";
22
/// This enum represents the file format of checkpoints persisted to disk. Right
/// now there is only one variant, but any incompatible change will require an
/// additional variant to be added here and handled anywhere that this format is
/// read or written.
///
/// The variant is stored in a `"version"` field (serde internal tagging), so a
/// V1 file looks like `{"version":"1","checkpoints":[...]}`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "version", rename_all = "snake_case")]
enum State {
    /// Initial format: a sorted set of checkpoints.
    #[serde(rename = "1")]
    V1 { checkpoints: BTreeSet<Checkpoint> },
}
33
/// A simple JSON-friendly struct of the fingerprint/position pair, since
/// fingerprints as objects cannot be keys in a plain JSON map.
///
/// Field order matters: the derived `Ord` compares `fingerprint`, then
/// `position`, then `modified`. That order decides how checkpoints are sorted
/// inside the `BTreeSet` held by [`State::V1`], and therefore the order in
/// which they are written to disk.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
#[serde(rename_all = "snake_case")]
struct Checkpoint {
    /// Identity of the file this checkpoint belongs to.
    fingerprint: FileFingerprint,
    /// Position recorded for that file (presumably a byte offset — confirm
    /// against the `FilePosition` definition).
    position: FilePosition,
    /// When the checkpoint was last updated. Compared against `ignore_before`
    /// when loading so that stale entries can be skipped.
    modified: DateTime<Utc>,
}
43
/// Persists a [`CheckpointsView`] to a JSON file in the data directory and
/// restores it on startup.
pub struct Checkpointer {
    /// `<data_dir>/checkpoints.new.json`: each new state is written and synced
    /// here first, then renamed over `stable_file_path`.
    tmp_file_path: PathBuf,
    /// `<data_dir>/checkpoints.json`: the last completely written state.
    stable_file_path: PathBuf,
    /// In-memory checkpoints, shared with other components via
    /// [`Checkpointer::view`].
    checkpoints: Arc<CheckpointsView>,
    /// The state most recently written to disk, used to skip writes when
    /// nothing has changed. `None` until the first successful write.
    last: Mutex<Option<State>>,
}
50
/// A thread-safe handle for reading and writing checkpoints in-memory across
/// multiple threads.
///
/// The three maps are keyed by fingerprint and each is updated with its own
/// call, so a concurrent reader may briefly see one map updated before another.
#[derive(Debug, Default)]
pub struct CheckpointsView {
    /// Latest recorded position for each file.
    checkpoints: DashMap<FileFingerprint, FilePosition>,
    /// When each checkpoint was last updated (or the timestamp it was loaded
    /// with from disk).
    modified_times: DashMap<FileFingerprint, DateTime<Utc>>,
    /// When each file was marked dead via `set_dead`. Entries at least 60
    /// seconds old are purged, along with their checkpoint, by `remove_expired`.
    removed_times: DashMap<FileFingerprint, DateTime<Utc>>,
}
59
60impl CheckpointsView {
61    pub fn update(&self, fng: FileFingerprint, pos: FilePosition) {
62        self.checkpoints.insert(fng, pos);
63        self.modified_times.insert(fng, Utc::now());
64        self.removed_times.remove(&fng);
65    }
66
67    pub fn get(&self, fng: FileFingerprint) -> Option<FilePosition> {
68        self.checkpoints.get(&fng).map(|r| *r.value())
69    }
70
71    pub fn set_dead(&self, fng: FileFingerprint) {
72        self.removed_times.insert(fng, Utc::now());
73    }
74
75    pub fn update_key(&self, old: FileFingerprint, new: FileFingerprint) {
76        if let Some((_, value)) = self.checkpoints.remove(&old) {
77            self.checkpoints.insert(new, value);
78        }
79
80        if let Some((_, value)) = self.modified_times.remove(&old) {
81            self.modified_times.insert(new, value);
82        }
83
84        if let Some((_, value)) = self.removed_times.remove(&old) {
85            self.removed_times.insert(new, value);
86        }
87    }
88
89    pub fn remove_expired(&self) {
90        let now = Utc::now();
91
92        // Collect all of the expired keys. Removing them while iterating can
93        // lead to deadlocks, the set should be small, and this is not a
94        // performance-sensitive path.
95        let to_remove = self
96            .removed_times
97            .iter()
98            .filter(|entry| {
99                let ts = entry.value();
100                let duration = now - *ts;
101                duration >= chrono::Duration::seconds(60)
102            })
103            .map(|entry| *entry.key())
104            .collect::<Vec<FileFingerprint>>();
105
106        for fng in to_remove {
107            self.checkpoints.remove(&fng);
108            self.modified_times.remove(&fng);
109            self.removed_times.remove(&fng);
110        }
111    }
112
113    fn load(&self, checkpoint: Checkpoint) {
114        self.checkpoints
115            .insert(checkpoint.fingerprint, checkpoint.position);
116        self.modified_times
117            .insert(checkpoint.fingerprint, checkpoint.modified);
118    }
119
120    fn set_state(&self, state: State, ignore_before: Option<DateTime<Utc>>) {
121        match state {
122            State::V1 { checkpoints } => {
123                for checkpoint in checkpoints {
124                    if let Some(ignore_before) = ignore_before
125                        && checkpoint.modified < ignore_before
126                    {
127                        continue;
128                    }
129                    self.load(checkpoint);
130                }
131            }
132        }
133    }
134
135    fn get_state(&self) -> State {
136        State::V1 {
137            checkpoints: self
138                .checkpoints
139                .iter()
140                .map(|entry| {
141                    let fingerprint = entry.key();
142                    let position = entry.value();
143                    Checkpoint {
144                        fingerprint: *fingerprint,
145                        position: *position,
146                        modified: self
147                            .modified_times
148                            .get(fingerprint)
149                            .map(|r| *r.value())
150                            .unwrap_or_else(Utc::now),
151                    }
152                })
153                .collect(),
154        }
155    }
156}
157
158impl Checkpointer {
159    pub fn new(data_dir: &Path) -> Checkpointer {
160        let tmp_file_path = data_dir.join(TMP_FILE_NAME);
161        let stable_file_path = data_dir.join(CHECKPOINT_FILE_NAME);
162
163        Checkpointer {
164            tmp_file_path,
165            stable_file_path,
166            checkpoints: Arc::new(CheckpointsView::default()),
167            last: Mutex::new(None),
168        }
169    }
170
171    pub fn view(&self) -> Arc<CheckpointsView> {
172        Arc::clone(&self.checkpoints)
173    }
174
175    #[cfg(test)]
176    pub fn update_checkpoint(&mut self, fng: FileFingerprint, pos: FilePosition) {
177        self.checkpoints.update(fng, pos);
178    }
179
180    #[cfg(test)]
181    pub fn get_checkpoint(&self, fng: FileFingerprint) -> Option<FilePosition> {
182        self.checkpoints.get(fng)
183    }
184
185    /// Persist the current checkpoints state to disk, making our best effort to
186    /// do so in an atomic way that allow for recovering the previous state in
187    /// the event of a crash.
188    pub async fn write_checkpoints(&self) -> Result<usize, io::Error> {
189        // First drop any checkpoints for files that were removed more than 60
190        // seconds ago. This keeps our working set as small as possible and
191        // makes sure we don't spend time and IO writing checkpoints that don't
192        // matter anymore.
193        self.checkpoints.remove_expired();
194
195        let current = self.checkpoints.get_state();
196
197        // Fetch last written state.
198        let mut last = self.last.lock().await;
199        if last.as_ref() != Some(&current) {
200            // Write the new checkpoints to a tmp file and flush it fully to
201            // disk. If vector dies anywhere during this section, the existing
202            // stable file will still be in its current valid state and we'll be
203            // able to recover.
204            let tmp_file_path = self.tmp_file_path.clone();
205
206            // spawn_blocking shouldn't be needed: https://github.com/vectordotdev/vector/issues/23743
207            let current = tokio::task::spawn_blocking(move || -> Result<State, io::Error> {
208                let mut f = std::io::BufWriter::new(std::fs::File::create(tmp_file_path)?);
209                serde_json::to_writer(&mut f, &current)?;
210                f.into_inner()?.sync_all()?;
211                Ok(current)
212            })
213            .await
214            .map_err(io::Error::other)??;
215
216            // Once the temp file is fully flushed, rename the tmp file to replace
217            // the previous stable file. This is an atomic operation on POSIX
218            // systems (and the stdlib claims to provide equivalent behavior on
219            // Windows), which should prevent scenarios where we don't have at least
220            // one full valid file to recover from.
221            fs::rename(&self.tmp_file_path, &self.stable_file_path).await?;
222
223            *last = Some(current);
224        }
225
226        Ok(self.checkpoints.checkpoints.len())
227    }
228
229    /// Read persisted checkpoints from disk, preferring the new JSON file
230    /// format but falling back to the legacy system when those files are found
231    /// instead.
232    pub async fn read_checkpoints(&mut self, ignore_before: Option<DateTime<Utc>>) {
233        // First try reading from the tmp file location. If this works, it means
234        // that the previous process was interrupted in the process of
235        // checkpointing and the tmp file should contain more recent data that
236        // should be preferred.
237        match self.read_checkpoints_file(&self.tmp_file_path).await {
238            Ok(state) => {
239                warn!(message = "Recovered checkpoint data from interrupted process.");
240                self.checkpoints.set_state(state, ignore_before);
241
242                // Try to move this tmp file to the stable location so we don't
243                // immediately overwrite it when we next persist checkpoints.
244                if let Err(error) = fs::rename(&self.tmp_file_path, &self.stable_file_path).await {
245                    warn!(message = "Error persisting recovered checkpoint file.", %error);
246                }
247                return;
248            }
249            Err(error) if error.kind() == io::ErrorKind::NotFound => {
250                // This is expected, so no warning needed
251            }
252            Err(error) => {
253                error!(message = "Unable to recover checkpoint data from interrupted process.", %error);
254            }
255        }
256
257        // Next, attempt to read checkpoints from the stable file location. This
258        // is the expected location, so warn more aggressively if something goes
259        // wrong.
260        match self.read_checkpoints_file(&self.stable_file_path).await {
261            Ok(state) => {
262                info!(message = "Loaded checkpoint data.");
263                self.checkpoints.set_state(state, ignore_before);
264            }
265            Err(error) if error.kind() == io::ErrorKind::NotFound => {
266                // This is expected, so no warning needed
267            }
268            Err(error) => {
269                warn!(message = "Unable to load checkpoint data.", %error);
270            }
271        }
272    }
273
274    async fn read_checkpoints_file(&self, path: &Path) -> Result<State, io::Error> {
275        // Possible optimization: mmap the file into a slice and pass it into serde_json instead of
276        // calling read_to_end. Need to investigate if this would work with tokio::fs::File
277
278        let mut reader = BufReader::new(File::open(path).await?);
279        let mut output = Vec::new();
280        reader.read_to_end(&mut output).await?;
281
282        serde_json::from_slice(&output[..])
283            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
284    }
285}
286
// Unit tests for the checkpointer: in-memory updates, persistence round trips,
// expiry of dead checkpoints, and the exact on-disk JSON format.
#[cfg(test)]
mod test {
    use chrono::{Duration, Utc};
    use similar_asserts::assert_eq;
    use tempfile::tempdir;
    use tokio::fs;

    use super::{
        CHECKPOINT_FILE_NAME, Checkpoint, Checkpointer, FileFingerprint, FilePosition,
        TMP_FILE_NAME,
    };

    // An update is immediately visible through `get_checkpoint`, for each
    // fingerprint kind.
    #[test]
    fn test_checkpointer_basics() {
        let fingerprints = vec![
            FileFingerprint::DevInode(1, 2),
            FileFingerprint::FirstLinesChecksum(78910),
        ];
        for fingerprint in fingerprints {
            let position: FilePosition = 1234;
            let data_dir = tempdir().unwrap();
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.update_checkpoint(fingerprint, position);
            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
        }
    }

    // Checkpoints whose `modified` time is before `ignore_before` are skipped
    // when reading back from disk.
    #[tokio::test]
    async fn test_checkpointer_ignore_before() {
        let now = Utc::now();
        let newer = (FileFingerprint::DevInode(1, 2), now - Duration::seconds(5));
        let oldish = (
            FileFingerprint::FirstLinesChecksum(78910),
            now - Duration::seconds(15),
        );
        let older = (FileFingerprint::DevInode(3, 4), now - Duration::seconds(20));
        // The cutoff falls between `newer` (5s old) and `oldish` (15s old).
        let ignore_before = Some(now - Duration::seconds(12));

        let position: FilePosition = 1234;
        let data_dir = tempdir().unwrap();

        // load and persist the checkpoints
        {
            let chkptr = Checkpointer::new(data_dir.path());

            for (fingerprint, modified) in &[&newer, &oldish, &older] {
                chkptr.checkpoints.load(Checkpoint {
                    fingerprint: *fingerprint,
                    position,
                    modified: *modified,
                });
                assert_eq!(chkptr.get_checkpoint(*fingerprint), Some(position));
                chkptr.write_checkpoints().await.unwrap();
            }
        }

        // read them back with the cutoff and assert the two older ones were skipped
        {
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.read_checkpoints(ignore_before).await;

            assert_eq!(chkptr.get_checkpoint(newer.0), Some(position));
            assert_eq!(chkptr.get_checkpoint(oldish.0), None);
            assert_eq!(chkptr.get_checkpoint(older.0), None);
        }
    }

    // A fresh checkpointer starts empty and restores what a previous instance
    // wrote to the same data directory.
    #[tokio::test]
    async fn test_checkpointer_restart() {
        let fingerprints = vec![
            FileFingerprint::DevInode(1, 2),
            FileFingerprint::FirstLinesChecksum(78910),
        ];
        for fingerprint in fingerprints {
            let position: FilePosition = 1234;
            let data_dir = tempdir().unwrap();
            {
                let mut chkptr = Checkpointer::new(data_dir.path());
                chkptr.update_checkpoint(fingerprint, position);
                assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
                chkptr.write_checkpoints().await.unwrap();
            }
            {
                let mut chkptr = Checkpointer::new(data_dir.path());
                assert_eq!(chkptr.get_checkpoint(fingerprint), None);
                chkptr.read_checkpoints(None).await;
                assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
            }
        }
    }

    // Round-trips a checkpoint through the stable file twice. After each write,
    // only the stable file is left on disk: no tmp file and no `checkpoints`
    // directory.
    // NOTE(review): the test name and the `checkpoints` directory checks look
    // like leftovers from a legacy-format migration; nothing in this file
    // creates that directory.
    #[tokio::test]
    async fn test_checkpointer_file_upgrades() {
        let fingerprint = FileFingerprint::DevInode(1, 2);
        let position: FilePosition = 1234;

        let data_dir = tempdir().unwrap();

        {
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.update_checkpoint(fingerprint, position);
            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));

            // Nothing has been persisted yet: no tmp file, no stable file, and
            // no `checkpoints` directory should exist.
            assert!(!data_dir.path().join(TMP_FILE_NAME).exists());
            assert!(!data_dir.path().join(CHECKPOINT_FILE_NAME).exists());
            assert!(!data_dir.path().join("checkpoints").is_dir());

            chkptr.write_checkpoints().await.unwrap();

            // The write leaves only the stable file behind; the tmp file has
            // been renamed over it.
            assert!(!data_dir.path().join(TMP_FILE_NAME).exists());
            assert!(data_dir.path().join(CHECKPOINT_FILE_NAME).exists());
            assert!(!data_dir.path().join("checkpoints").is_dir());
        }

        // Reload from the stable file, ensure the checkpoint was restored, and
        // write it back out again
        {
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.read_checkpoints(None).await;
            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
            chkptr.write_checkpoints().await.unwrap();
        }

        // Ensure that the stable file is present, the tmp file is not, and no
        // `checkpoints` directory has appeared
        assert!(!data_dir.path().join(TMP_FILE_NAME).exists());
        assert!(data_dir.path().join(CHECKPOINT_FILE_NAME).exists());
        assert!(!data_dir.path().join("checkpoints").is_dir());

        // Ensure one last time that we can reread from the stable file and get the same result
        {
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.read_checkpoints(None).await;
            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
        }
    }

    // Checkpoints marked dead at least 60 seconds ago are dropped on the next
    // write. A checkpoint updated after being marked dead survives.
    #[tokio::test]
    async fn test_checkpointer_expiration() {
        let cases = vec![
            // (checkpoint, position, seconds since removed)
            (FileFingerprint::FirstLinesChecksum(123), 0, 30),
            (FileFingerprint::FirstLinesChecksum(456), 1, 60),
            (FileFingerprint::FirstLinesChecksum(789), 2, 90),
            (FileFingerprint::FirstLinesChecksum(101112), 3, 120),
        ];

        let data_dir = tempdir().unwrap();
        let mut chkptr = Checkpointer::new(data_dir.path());

        for (fingerprint, position, removed) in cases.clone() {
            chkptr.update_checkpoint(fingerprint, position);

            // slide these in manually so we don't have to sleep for a long time
            chkptr
                .checkpoints
                .removed_times
                .insert(fingerprint, Utc::now() - chrono::Duration::seconds(removed));

            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
        }

        // Update one that would otherwise be expired to ensure it sticks around
        chkptr.update_checkpoint(cases[2].0, 42);

        // Expiration is piggybacked on the persistence interval, so do a write to trigger it
        chkptr.write_checkpoints().await.unwrap();

        assert_eq!(chkptr.get_checkpoint(cases[0].0), Some(0));
        assert_eq!(chkptr.get_checkpoint(cases[1].0), None);
        assert_eq!(chkptr.get_checkpoint(cases[2].0), Some(42));
        assert_eq!(chkptr.get_checkpoint(cases[3].0), None);
    }

    // A first-lines checksum fingerprint computed from a real file can be used
    // as a checkpoint key.
    #[tokio::test]
    async fn test_checkpointer_strategy_checksum_happy_path() {
        let data_dir = tempdir().unwrap();

        let mut fingerprinter = crate::Fingerprinter::new(
            crate::FingerprintStrategy::FirstLinesChecksum {
                ignored_header_bytes: 0,
                lines: 1,
            },
            1024,
            false,
        );

        let log_path = data_dir.path().join("test.log");
        let contents = "hello i am a test log line that is just long enough but not super long\n";
        fs::write(&log_path, contents)
            .await
            .expect("writing test data");

        let new = fingerprinter
            .fingerprint(&log_path)
            .await
            .expect("getting new checksum");

        assert!(matches!(new, FileFingerprint::FirstLinesChecksum(_)));

        let mut chkptr = Checkpointer::new(data_dir.path());
        chkptr.update_checkpoint(new, 1234);
        assert_eq!(Some(1234), chkptr.get_checkpoint(new));
    }

    // guards against accidental changes to the checkpoint serialization. The
    // `modified` field holds a wall-clock timestamp, so it is stripped before
    // comparing.
    #[tokio::test]
    async fn test_checkpointer_serialization() {
        let fingerprints = vec![
            (
                FileFingerprint::DevInode(1, 2),
                r#"{"version":"1","checkpoints":[{"fingerprint":{"dev_inode":[1,2]},"position":1234}]}"#,
            ),
            (
                FileFingerprint::FirstLinesChecksum(78910),
                r#"{"version":"1","checkpoints":[{"fingerprint":{"first_lines_checksum":78910},"position":1234}]}"#,
            ),
        ];
        for (fingerprint, expected) in fingerprints {
            let expected: serde_json::Value = serde_json::from_str(expected).unwrap();

            let position: FilePosition = 1234;
            let data_dir = tempdir().unwrap();
            let mut chkptr = Checkpointer::new(data_dir.path());

            chkptr.update_checkpoint(fingerprint, position);
            chkptr.write_checkpoints().await.unwrap();

            let got: serde_json::Value = {
                let s = fs::read_to_string(data_dir.path().join(CHECKPOINT_FILE_NAME))
                    .await
                    .unwrap();
                let mut checkpoints: serde_json::Value = serde_json::from_str(&s).unwrap();
                for checkpoint in checkpoints["checkpoints"].as_array_mut().unwrap() {
                    checkpoint.as_object_mut().unwrap().remove("modified");
                }
                checkpoints
            };

            assert_eq!(expected, got);
        }
    }

    // guards against accidental changes to the checkpoint deserialization,
    // including the older `first_line_checksum` fingerprint key, which must
    // load as `FirstLinesChecksum`
    #[tokio::test]
    async fn test_checkpointer_deserialization() {
        let serialized_checkpoints = r#"
{
  "version": "1",
  "checkpoints": [
    {
      "fingerprint": { "dev_inode": [ 1, 2 ] },
      "position": 1234,
      "modified": "2021-07-12T18:19:11.769003Z"
    },
    {
      "fingerprint": { "first_line_checksum": 1234 },
      "position": 1234,
      "modified": "2021-07-12T18:19:11.769003Z"
    },
    {
      "fingerprint": { "first_lines_checksum": 78910 },
      "position": 1234,
      "modified": "2021-07-12T18:19:11.769003Z"
    }
  ]
}
        "#;
        let fingerprints = vec![
            FileFingerprint::DevInode(1, 2),
            FileFingerprint::FirstLinesChecksum(1234),
            FileFingerprint::FirstLinesChecksum(78910),
        ];

        let data_dir = tempdir().unwrap();

        let mut chkptr = Checkpointer::new(data_dir.path());

        fs::write(
            data_dir.path().join(CHECKPOINT_FILE_NAME),
            serialized_checkpoints,
        )
        .await
        .unwrap();

        chkptr.read_checkpoints(None).await;

        for fingerprint in fingerprints {
            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(1234))
        }
    }
}