1use std::{
2 collections::BTreeSet,
3 io,
4 path::{Path, PathBuf},
5 sync::Arc,
6};
7
8use chrono::{DateTime, Utc};
9use dashmap::DashMap;
10use serde::{Deserialize, Serialize};
11use tokio::{
12 fs::{self, File},
13 io::{AsyncReadExt, BufReader},
14 sync::Mutex,
15};
16use tracing::{error, info, warn};
17
18use super::{FilePosition, fingerprinter::FileFingerprint};
19
/// File name (inside the data directory) that a new checkpoint state is fully
/// written and synced to before being renamed over the stable checkpoint file.
const TMP_FILE_NAME: &str = "checkpoints.new.json";
/// File name (inside the data directory) of the stable checkpoint file.
pub const CHECKPOINT_FILE_NAME: &str = "checkpoints.json";
22
/// Versioned on-disk representation of the checkpoint file.
///
/// Internally tagged on `"version"`, so a serialized value looks like
/// `{"version":"1","checkpoints":[...]}`. `PartialEq` lets the writer skip
/// rewriting the file when the state has not changed since the last write.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "version", rename_all = "snake_case")]
enum State {
    /// Format version 1: the full set of per-file checkpoints.
    #[serde(rename = "1")]
    V1 { checkpoints: BTreeSet<Checkpoint> },
}
33
/// A single persisted checkpoint for one file.
///
/// The derived `Ord` compares fields in declaration order, which determines the
/// iteration (and therefore serialization) order inside `State::V1`'s set, so
/// the field order here should not be changed casually.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
#[serde(rename_all = "snake_case")]
struct Checkpoint {
    /// Identity of the file this checkpoint belongs to.
    fingerprint: FileFingerprint,
    /// Recorded read position (presumably a byte offset; `FilePosition` is
    /// defined elsewhere — confirm).
    position: FilePosition,
    /// When this position was last recorded; compared against `ignore_before`
    /// when loading.
    modified: DateTime<Utc>,
}
43
/// Persists the contents of a `CheckpointsView` to a JSON file in a data
/// directory. Each write goes to a temporary file that is then renamed into
/// place.
pub struct Checkpointer {
    /// Scratch file that is written and synced before being renamed over
    /// `stable_file_path`.
    tmp_file_path: PathBuf,
    /// The checkpoint file read by `read_checkpoints` and replaced on each write.
    stable_file_path: PathBuf,
    /// Shared in-memory checkpoint state (also handed out through `view()`).
    checkpoints: Arc<CheckpointsView>,
    /// The state most recently written to disk, used to skip redundant writes.
    last: Mutex<Option<State>>,
}
50
/// Concurrent in-memory table of file fingerprints to read positions, plus
/// per-fingerprint bookkeeping timestamps.
#[derive(Debug, Default)]
pub struct CheckpointsView {
    /// Latest recorded position for each fingerprint.
    checkpoints: DashMap<FileFingerprint, FilePosition>,
    /// When each fingerprint's position was last updated or loaded.
    modified_times: DashMap<FileFingerprint, DateTime<Utc>>,
    /// When each fingerprint was marked dead via `set_dead`. Entries at least
    /// 60 seconds old are purged, along with their checkpoint, by
    /// `remove_expired`.
    removed_times: DashMap<FileFingerprint, DateTime<Utc>>,
}
59
60impl CheckpointsView {
61 pub fn update(&self, fng: FileFingerprint, pos: FilePosition) {
62 self.checkpoints.insert(fng, pos);
63 self.modified_times.insert(fng, Utc::now());
64 self.removed_times.remove(&fng);
65 }
66
67 pub fn get(&self, fng: FileFingerprint) -> Option<FilePosition> {
68 self.checkpoints.get(&fng).map(|r| *r.value())
69 }
70
71 pub fn set_dead(&self, fng: FileFingerprint) {
72 self.removed_times.insert(fng, Utc::now());
73 }
74
75 pub fn update_key(&self, old: FileFingerprint, new: FileFingerprint) {
76 if let Some((_, value)) = self.checkpoints.remove(&old) {
77 self.checkpoints.insert(new, value);
78 }
79
80 if let Some((_, value)) = self.modified_times.remove(&old) {
81 self.modified_times.insert(new, value);
82 }
83
84 if let Some((_, value)) = self.removed_times.remove(&old) {
85 self.removed_times.insert(new, value);
86 }
87 }
88
89 pub fn remove_expired(&self) {
90 let now = Utc::now();
91
92 let to_remove = self
96 .removed_times
97 .iter()
98 .filter(|entry| {
99 let ts = entry.value();
100 let duration = now - *ts;
101 duration >= chrono::Duration::seconds(60)
102 })
103 .map(|entry| *entry.key())
104 .collect::<Vec<FileFingerprint>>();
105
106 for fng in to_remove {
107 self.checkpoints.remove(&fng);
108 self.modified_times.remove(&fng);
109 self.removed_times.remove(&fng);
110 }
111 }
112
113 fn load(&self, checkpoint: Checkpoint) {
114 self.checkpoints
115 .insert(checkpoint.fingerprint, checkpoint.position);
116 self.modified_times
117 .insert(checkpoint.fingerprint, checkpoint.modified);
118 }
119
120 fn set_state(&self, state: State, ignore_before: Option<DateTime<Utc>>) {
121 match state {
122 State::V1 { checkpoints } => {
123 for checkpoint in checkpoints {
124 if let Some(ignore_before) = ignore_before
125 && checkpoint.modified < ignore_before
126 {
127 continue;
128 }
129 self.load(checkpoint);
130 }
131 }
132 }
133 }
134
135 fn get_state(&self) -> State {
136 State::V1 {
137 checkpoints: self
138 .checkpoints
139 .iter()
140 .map(|entry| {
141 let fingerprint = entry.key();
142 let position = entry.value();
143 Checkpoint {
144 fingerprint: *fingerprint,
145 position: *position,
146 modified: self
147 .modified_times
148 .get(fingerprint)
149 .map(|r| *r.value())
150 .unwrap_or_else(Utc::now),
151 }
152 })
153 .collect(),
154 }
155 }
156}
157
158impl Checkpointer {
159 pub fn new(data_dir: &Path) -> Checkpointer {
160 let tmp_file_path = data_dir.join(TMP_FILE_NAME);
161 let stable_file_path = data_dir.join(CHECKPOINT_FILE_NAME);
162
163 Checkpointer {
164 tmp_file_path,
165 stable_file_path,
166 checkpoints: Arc::new(CheckpointsView::default()),
167 last: Mutex::new(None),
168 }
169 }
170
171 pub fn view(&self) -> Arc<CheckpointsView> {
172 Arc::clone(&self.checkpoints)
173 }
174
175 #[cfg(test)]
176 pub fn update_checkpoint(&mut self, fng: FileFingerprint, pos: FilePosition) {
177 self.checkpoints.update(fng, pos);
178 }
179
180 #[cfg(test)]
181 pub fn get_checkpoint(&self, fng: FileFingerprint) -> Option<FilePosition> {
182 self.checkpoints.get(fng)
183 }
184
185 pub async fn write_checkpoints(&self) -> Result<usize, io::Error> {
189 self.checkpoints.remove_expired();
194
195 let current = self.checkpoints.get_state();
196
197 let mut last = self.last.lock().await;
199 if last.as_ref() != Some(¤t) {
200 let tmp_file_path = self.tmp_file_path.clone();
205
206 let current = tokio::task::spawn_blocking(move || -> Result<State, io::Error> {
208 let mut f = std::io::BufWriter::new(std::fs::File::create(tmp_file_path)?);
209 serde_json::to_writer(&mut f, ¤t)?;
210 f.into_inner()?.sync_all()?;
211 Ok(current)
212 })
213 .await
214 .map_err(io::Error::other)??;
215
216 fs::rename(&self.tmp_file_path, &self.stable_file_path).await?;
222
223 *last = Some(current);
224 }
225
226 Ok(self.checkpoints.checkpoints.len())
227 }
228
229 pub async fn read_checkpoints(&mut self, ignore_before: Option<DateTime<Utc>>) {
233 match self.read_checkpoints_file(&self.tmp_file_path).await {
238 Ok(state) => {
239 warn!(message = "Recovered checkpoint data from interrupted process.");
240 self.checkpoints.set_state(state, ignore_before);
241
242 if let Err(error) = fs::rename(&self.tmp_file_path, &self.stable_file_path).await {
245 warn!(message = "Error persisting recovered checkpoint file.", %error);
246 }
247 return;
248 }
249 Err(error) if error.kind() == io::ErrorKind::NotFound => {
250 }
252 Err(error) => {
253 error!(message = "Unable to recover checkpoint data from interrupted process.", %error);
254 }
255 }
256
257 match self.read_checkpoints_file(&self.stable_file_path).await {
261 Ok(state) => {
262 info!(message = "Loaded checkpoint data.");
263 self.checkpoints.set_state(state, ignore_before);
264 }
265 Err(error) if error.kind() == io::ErrorKind::NotFound => {
266 }
268 Err(error) => {
269 warn!(message = "Unable to load checkpoint data.", %error);
270 }
271 }
272 }
273
274 async fn read_checkpoints_file(&self, path: &Path) -> Result<State, io::Error> {
275 let mut reader = BufReader::new(File::open(path).await?);
279 let mut output = Vec::new();
280 reader.read_to_end(&mut output).await?;
281
282 serde_json::from_slice(&output[..])
283 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
284 }
285}
286
#[cfg(test)]
mod test {
    use chrono::{Duration, Utc};
    use similar_asserts::assert_eq;
    use tempfile::tempdir;
    use tokio::fs;

    use super::{
        CHECKPOINT_FILE_NAME, Checkpoint, Checkpointer, FileFingerprint, FilePosition,
        TMP_FILE_NAME,
    };

    /// An updated checkpoint can be read straight back, for each fingerprint kind.
    #[test]
    fn test_checkpointer_basics() {
        let fingerprints = vec![
            FileFingerprint::DevInode(1, 2),
            FileFingerprint::FirstLinesChecksum(78910),
        ];
        for fingerprint in fingerprints {
            let position: FilePosition = 1234;
            let data_dir = tempdir().unwrap();
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.update_checkpoint(fingerprint, position);
            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
        }
    }

    /// Checkpoints whose `modified` time predates `ignore_before` are dropped on load.
    #[tokio::test]
    async fn test_checkpointer_ignore_before() {
        let now = Utc::now();
        let newer = (FileFingerprint::DevInode(1, 2), now - Duration::seconds(5));
        let oldish = (
            FileFingerprint::FirstLinesChecksum(78910),
            now - Duration::seconds(15),
        );
        let older = (FileFingerprint::DevInode(3, 4), now - Duration::seconds(20));
        // The cutoff sits between `newer` (5s ago) and `oldish` (15s ago).
        let ignore_before = Some(now - Duration::seconds(12));

        let position: FilePosition = 1234;
        let data_dir = tempdir().unwrap();

        {
            // Seed checkpoints with explicit `modified` times and persist them.
            let chkptr = Checkpointer::new(data_dir.path());

            for (fingerprint, modified) in &[&newer, &oldish, &older] {
                chkptr.checkpoints.load(Checkpoint {
                    fingerprint: *fingerprint,
                    position,
                    modified: *modified,
                });
                assert_eq!(chkptr.get_checkpoint(*fingerprint), Some(position));
                chkptr.write_checkpoints().await.unwrap();
            }
        }

        {
            // A fresh checkpointer keeps only the checkpoint newer than the cutoff.
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.read_checkpoints(ignore_before).await;

            assert_eq!(chkptr.get_checkpoint(newer.0), Some(position));
            assert_eq!(chkptr.get_checkpoint(oldish.0), None);
            assert_eq!(chkptr.get_checkpoint(older.0), None);
        }
    }

    /// A checkpoint written by one instance is visible to a new instance only
    /// after `read_checkpoints`.
    #[tokio::test]
    async fn test_checkpointer_restart() {
        let fingerprints = vec![
            FileFingerprint::DevInode(1, 2),
            FileFingerprint::FirstLinesChecksum(78910),
        ];
        for fingerprint in fingerprints {
            let position: FilePosition = 1234;
            let data_dir = tempdir().unwrap();
            {
                let mut chkptr = Checkpointer::new(data_dir.path());
                chkptr.update_checkpoint(fingerprint, position);
                assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
                chkptr.write_checkpoints().await.unwrap();
            }
            {
                let mut chkptr = Checkpointer::new(data_dir.path());
                assert_eq!(chkptr.get_checkpoint(fingerprint), None);
                chkptr.read_checkpoints(None).await;
                assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
            }
        }
    }

    /// Checks which files exist on disk across write/read/write cycles: only the
    /// stable file should remain after each write.
    #[tokio::test]
    async fn test_checkpointer_file_upgrades() {
        let fingerprint = FileFingerprint::DevInode(1, 2);
        let position: FilePosition = 1234;

        let data_dir = tempdir().unwrap();

        {
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.update_checkpoint(fingerprint, position);
            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));

            // Nothing is on disk before the first write. (The `checkpoints`
            // directory is presumably a legacy layout — confirm.)
            assert!(!data_dir.path().join(TMP_FILE_NAME).exists());
            assert!(!data_dir.path().join(CHECKPOINT_FILE_NAME).exists());
            assert!(!data_dir.path().join("checkpoints").is_dir());

            chkptr.write_checkpoints().await.unwrap();

            // The tmp file was renamed away, leaving only the stable file.
            assert!(!data_dir.path().join(TMP_FILE_NAME).exists());
            assert!(data_dir.path().join(CHECKPOINT_FILE_NAME).exists());
            assert!(!data_dir.path().join("checkpoints").is_dir());
        }

        {
            // Reload from the stable file and write it back out.
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.read_checkpoints(None).await;
            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
            chkptr.write_checkpoints().await.unwrap();
        }

        assert!(!data_dir.path().join(TMP_FILE_NAME).exists());
        assert!(data_dir.path().join(CHECKPOINT_FILE_NAME).exists());
        assert!(!data_dir.path().join("checkpoints").is_dir());

        {
            // The rewritten file still round-trips.
            let mut chkptr = Checkpointer::new(data_dir.path());
            chkptr.read_checkpoints(None).await;
            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
        }
    }

    /// Writing purges checkpoints marked dead at least 60 seconds ago, except
    /// those revived by a later `update`.
    #[tokio::test]
    async fn test_checkpointer_expiration() {
        // (fingerprint, position, seconds since the file was marked removed)
        let cases = vec![
            (FileFingerprint::FirstLinesChecksum(123), 0, 30),
            (FileFingerprint::FirstLinesChecksum(456), 1, 60),
            (FileFingerprint::FirstLinesChecksum(789), 2, 90),
            (FileFingerprint::FirstLinesChecksum(101112), 3, 120),
        ];

        let data_dir = tempdir().unwrap();
        let mut chkptr = Checkpointer::new(data_dir.path());

        for (fingerprint, position, removed) in cases.clone() {
            chkptr.update_checkpoint(fingerprint, position);

            // Backdate the removal mark directly to simulate elapsed time.
            chkptr
                .checkpoints
                .removed_times
                .insert(fingerprint, Utc::now() - chrono::Duration::seconds(removed));

            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(position));
        }

        // Updating clears the removal mark, so this entry survives expiry.
        chkptr.update_checkpoint(cases[2].0, 42);

        chkptr.write_checkpoints().await.unwrap();

        // 30s: kept; 60s: purged (boundary is inclusive); revived: kept; 120s: purged.
        assert_eq!(chkptr.get_checkpoint(cases[0].0), Some(0));
        assert_eq!(chkptr.get_checkpoint(cases[1].0), None);
        assert_eq!(chkptr.get_checkpoint(cases[2].0), Some(42));
        assert_eq!(chkptr.get_checkpoint(cases[3].0), None);
    }

    /// A fingerprint produced by the first-lines-checksum strategy works as a
    /// checkpoint key.
    #[tokio::test]
    async fn test_checkpointer_strategy_checksum_happy_path() {
        let data_dir = tempdir().unwrap();

        let mut fingerprinter = crate::Fingerprinter::new(
            crate::FingerprintStrategy::FirstLinesChecksum {
                ignored_header_bytes: 0,
                lines: 1,
            },
            1024,
            false,
        );

        let log_path = data_dir.path().join("test.log");
        let contents = "hello i am a test log line that is just long enough but not super long\n";
        fs::write(&log_path, contents)
            .await
            .expect("writing test data");

        let new = fingerprinter
            .fingerprint(&log_path)
            .await
            .expect("getting new checksum");

        assert!(matches!(new, FileFingerprint::FirstLinesChecksum(_)));

        let mut chkptr = Checkpointer::new(data_dir.path());
        chkptr.update_checkpoint(new, 1234);
        assert_eq!(Some(1234), chkptr.get_checkpoint(new));
    }

    /// Pins the on-disk JSON format. The time-dependent `modified` field is
    /// stripped before comparing.
    #[tokio::test]
    async fn test_checkpointer_serialization() {
        let fingerprints = vec![
            (
                FileFingerprint::DevInode(1, 2),
                r#"{"version":"1","checkpoints":[{"fingerprint":{"dev_inode":[1,2]},"position":1234}]}"#,
            ),
            (
                FileFingerprint::FirstLinesChecksum(78910),
                r#"{"version":"1","checkpoints":[{"fingerprint":{"first_lines_checksum":78910},"position":1234}]}"#,
            ),
        ];
        for (fingerprint, expected) in fingerprints {
            let expected: serde_json::Value = serde_json::from_str(expected).unwrap();

            let position: FilePosition = 1234;
            let data_dir = tempdir().unwrap();
            let mut chkptr = Checkpointer::new(data_dir.path());

            chkptr.update_checkpoint(fingerprint, position);
            chkptr.write_checkpoints().await.unwrap();

            let got: serde_json::Value = {
                let s = fs::read_to_string(data_dir.path().join(CHECKPOINT_FILE_NAME))
                    .await
                    .unwrap();
                let mut checkpoints: serde_json::Value = serde_json::from_str(&s).unwrap();
                for checkpoint in checkpoints["checkpoints"].as_array_mut().unwrap() {
                    checkpoint.as_object_mut().unwrap().remove("modified");
                }
                checkpoints
            };

            assert_eq!(expected, got);
        }
    }

    /// Loads a hand-written checkpoint file. It includes the `first_line_checksum`
    /// key, which is expected to load as `FirstLinesChecksum`.
    #[tokio::test]
    async fn test_checkpointer_deserialization() {
        let serialized_checkpoints = r#"
{
  "version": "1",
  "checkpoints": [
    {
      "fingerprint": { "dev_inode": [ 1, 2 ] },
      "position": 1234,
      "modified": "2021-07-12T18:19:11.769003Z"
    },
    {
      "fingerprint": { "first_line_checksum": 1234 },
      "position": 1234,
      "modified": "2021-07-12T18:19:11.769003Z"
    },
    {
      "fingerprint": { "first_lines_checksum": 78910 },
      "position": 1234,
      "modified": "2021-07-12T18:19:11.769003Z"
    }
  ]
}
        "#;
        let fingerprints = vec![
            FileFingerprint::DevInode(1, 2),
            FileFingerprint::FirstLinesChecksum(1234),
            FileFingerprint::FirstLinesChecksum(78910),
        ];

        let data_dir = tempdir().unwrap();

        let mut chkptr = Checkpointer::new(data_dir.path());

        fs::write(
            data_dir.path().join(CHECKPOINT_FILE_NAME),
            serialized_checkpoints,
        )
        .await
        .unwrap();

        chkptr.read_checkpoints(None).await;

        for fingerprint in fingerprints {
            assert_eq!(chkptr.get_checkpoint(fingerprint), Some(1234))
        }
    }
}