vector_buffers/variants/disk_v2/
ledger.rs

1use std::{
2    fmt, io, mem,
3    path::PathBuf,
4    sync::atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering},
5    sync::Arc,
6    time::Instant,
7};
8
9use bytecheck::CheckBytes;
10use bytes::BytesMut;
11use crossbeam_utils::atomic::AtomicCell;
12use fslock::LockFile;
13use futures::StreamExt;
14use rkyv::{with::Atomic, Archive, Serialize};
15use snafu::{ResultExt, Snafu};
16use tokio::{fs, io::AsyncWriteExt, sync::Notify};
17use vector_common::finalizer::OrderedFinalizer;
18
19use super::{
20    backed_archive::BackedArchive,
21    common::{align16, DiskBufferConfig, MAX_FILE_ID},
22    io::{AsyncFile, WritableMemoryMap},
23    ser::SerializeError,
24    Filesystem,
25};
26use crate::buffer_usage_data::BufferUsageHandle;
27
28pub const LEDGER_LEN: usize = align16(mem::size_of::<ArchivedLedgerState>());
29
/// Error that can occur while loading or creating a [`Ledger`] (see `Ledger::load_or_create`).
#[derive(Debug, Snafu)]
pub enum LedgerLoadCreateError {
    /// A general I/O error occurred.
    ///
    /// This wraps any failing file system operation performed while loading or creating the
    /// ledger: creating the data directory, opening or locking `buffer.lock`, opening, writing,
    /// syncing, or memory-mapping `buffer.db`, and scanning the data directory for existing data
    /// files.  (Flushing the ledger during normal operation reports a plain `io::Error` instead.)
    #[snafu(display("ledger I/O error: {}", source))]
    Io { source: io::Error },

    /// The ledger is already opened by another Vector process.
    ///
    /// Advisory locking (via `buffer.lock`) is used to prevent other Vector processes from
    /// concurrently opening the same buffer, but bear in mind that this does not prevent other
    /// processes or users from modifying the ledger file in a way that could cause undefined
    /// behavior during buffer operation.
    #[snafu(display(
        "failed to lock buffer.lock; is another Vector process running and using this buffer?"
    ))]
    LedgerLockAlreadyHeld,

    /// The ledger state could not be deserialized from the ledger file.
    ///
    /// This should only occur if the ledger file was modified or truncated outside of the Vector
    /// process.  In rare situations, if the ledger state type (`LedgerState`, here in ledger.rs)
    /// was modified, then its layout may no longer match the structure as it exists on disk.
    ///
    /// We have many strongly-worded warnings to not do this unless a developer absolutely knows
    /// what they're doing, but it is still technically a possibility. :)
    #[snafu(display("failed to deserialize ledger from buffer: {}", reason))]
    FailedToDeserialize { reason: String },

    /// The ledger state could not be serialized.
    ///
    /// This only occurs when initially creating a new buffer, when the default ledger state is
    /// serialized and written into the empty ledger file.  During normal operation, the ledger is
    /// memory-mapped directly and so serialization does not occur.
    ///
    /// This error is likely only to occur if the process is unable to allocate memory for the
    /// buffers required for the serialization step.
    #[snafu(display("failed to serialize ledger to buffer: {}", reason))]
    FailedToSerialize { reason: String },
}
74
/// Ledger state.
///
/// Stores the relevant information related to both the reader and writer.  Gets serialized and
/// stored on disk, and is managed via a memory-mapped file.
///
/// The `Archive` derive produces `ArchivedLedgerState`, which is the form actually accessed
/// through the memory map.  Every field is archived `#[with(Atomic)]` so that it can be read and
/// updated in place with atomic operations.
///
/// # Warning
///
/// - Do not add fields to this struct.
/// - Do not remove fields from this struct.
/// - Do not change the type of fields in this struct.
/// - Do not change the order of fields this struct.
///
/// Doing so will change the serialized representation.  This will break things.
///
/// Do not do any of the listed things unless you _absolutely_ know what you're doing. :)
#[derive(Archive, Serialize, Debug)]
#[archive_attr(derive(CheckBytes, Debug))]
pub struct LedgerState {
    /// Next record ID to use when writing a record (starts at 1 in a fresh ledger).
    #[with(Atomic)]
    writer_next_record: AtomicU64,
    /// The current data file ID being written to.
    #[with(Atomic)]
    writer_current_data_file: AtomicU16,
    /// The current data file ID being read from.
    ///
    /// This is the acknowledged (durable) reader position: within this module it is only advanced
    /// by `Ledger::increment_acked_reader_file_id`.  Read-ahead past unacknowledged files is
    /// tracked separately, in memory, by the `Ledger`.
    #[with(Atomic)]
    reader_current_data_file: AtomicU16,
    /// The last record ID read by the reader (0 in a fresh ledger, i.e. nothing read yet).
    #[with(Atomic)]
    reader_last_record: AtomicU64,
}
106
107impl Default for LedgerState {
108    fn default() -> Self {
109        Self {
110            // First record written is always 1, so that our default of 0 for
111            // `reader_last_record_id` ensures we start up in a state of "alright, waiting to read
112            // record #1 next".
113            writer_next_record: AtomicU64::new(1),
114            writer_current_data_file: AtomicU16::new(0),
115            reader_current_data_file: AtomicU16::new(0),
116            reader_last_record: AtomicU64::new(0),
117        }
118    }
119}
120
121impl ArchivedLedgerState {
122    fn get_current_writer_file_id(&self) -> u16 {
123        self.writer_current_data_file.load(Ordering::Acquire)
124    }
125
126    fn get_next_writer_file_id(&self) -> u16 {
127        (self.get_current_writer_file_id() + 1) % MAX_FILE_ID
128    }
129
130    pub(super) fn increment_writer_file_id(&self) {
131        self.writer_current_data_file
132            .store(self.get_next_writer_file_id(), Ordering::Release);
133    }
134
135    pub(super) fn get_next_writer_record_id(&self) -> u64 {
136        self.writer_next_record.load(Ordering::Acquire)
137    }
138
139    pub(super) fn increment_next_writer_record_id(&self, amount: u64) -> u64 {
140        let previous = self.writer_next_record.fetch_add(amount, Ordering::AcqRel);
141        previous.wrapping_add(amount)
142    }
143
144    fn get_current_reader_file_id(&self) -> u16 {
145        self.reader_current_data_file.load(Ordering::Acquire)
146    }
147
148    fn get_next_reader_file_id(&self) -> u16 {
149        (self.get_current_reader_file_id() + 1) % MAX_FILE_ID
150    }
151
152    fn get_offset_reader_file_id(&self, offset: u16) -> u16 {
153        self.get_current_reader_file_id().wrapping_add(offset) % MAX_FILE_ID
154    }
155
156    fn increment_reader_file_id(&self) -> u16 {
157        let value = self.get_next_reader_file_id();
158        self.reader_current_data_file
159            .store(value, Ordering::Release);
160        value
161    }
162
163    pub(super) fn get_last_reader_record_id(&self) -> u64 {
164        self.reader_last_record.load(Ordering::Acquire)
165    }
166
167    pub(super) fn increment_last_reader_record_id(&self, amount: u64) {
168        self.reader_last_record.fetch_add(amount, Ordering::AcqRel);
169    }
170
171    #[cfg(test)]
172    pub unsafe fn unsafe_set_writer_next_record_id(&self, id: u64) {
173        // UNSAFETY:
174        // The atomic operation itself is inherently safe, but adjusting the record IDs manually is
175        // _unsafe_ because it messes with the continuity of record IDs from the perspective of the
176        // reader.
177        //
178        // This is exclusively used under test to make it possible to check certain edge cases, as
179        // writing enough records to actually increment it to the maximum value would take longer
180        // than any of us will be alive.
181        //
182        // Despite it being test-only, we're really amping up the "this is only for testing!" factor
183        // by making it an actual `unsafe` function, and putting "unsafe" in the name. :)
184        self.writer_next_record.store(id, Ordering::Release);
185    }
186
187    #[cfg(test)]
188    pub unsafe fn unsafe_set_reader_last_record_id(&self, id: u64) {
189        // UNSAFETY:
190        // The atomic operation itself is inherently safe, but adjusting the record IDs manually is
191        // _unsafe_ because it messes with the continuity of record IDs from the perspective of the
192        // reader.
193        //
194        // This is exclusively used under test to make it possible to check certain edge cases, as
195        // writing enough records to actually increment it to the maximum value would take longer
196        // than any of us will be alive.
197        //
198        // Despite it being test-only, we're really amping up the "this is only for testing!" factor
199        // by making it an actual `unsafe` function, and putting "unsafe" in the name. :)
200        self.reader_last_record.store(id, Ordering::Release);
201    }
202}
203
/// Tracks the internal state of the buffer.
///
/// Combines the persisted, memory-mapped ledger state with in-memory-only bookkeeping (total
/// buffer size, pending acknowledgements, notifiers, flush timing) used to coordinate the reader
/// and writer.
pub(crate) struct Ledger<FS>
where
    FS: Filesystem,
{
    // Buffer configuration.
    config: DiskBufferConfig<FS>,
    // Advisory lock for this buffer directory (`buffer.lock`).
    //
    // Never read after construction, hence `dead_code`; it is held so that the lock file (and its
    // drop guard) lives exactly as long as the ledger does.
    #[allow(dead_code)]
    lock: LockFile,
    // Ledger state, backed by the memory-mapped ledger file.
    state: BackedArchive<FS::MutableMemoryMap, LedgerState>,
    // The total size, in bytes, of all unread records in the buffer.
    total_buffer_size: AtomicU64,
    // Notifier for reader-related progress.
    reader_notify: Notify,
    // Notifier for writer-related progress.
    writer_notify: Notify,
    // Tracks when writer has fully shutdown.
    writer_done: AtomicBool,
    // Number of pending record acknowledgements that have yet to be consumed by the reader.
    pending_acks: AtomicU64,
    // The file ID offset of the reader past the acknowledged reader file ID.
    unacked_reader_file_id_offset: AtomicU16,
    // Last flush of all unflushed files: ledger, data file, etc.
    last_flush: AtomicCell<Instant>,
    // Tracks usage data about the buffer.
    usage_handle: BufferUsageHandle,
}
233
234impl<FS> Ledger<FS>
235where
236    FS: Filesystem,
237{
238    /// Gets the configuration for the buffer that this ledger represents.
239    pub fn config(&self) -> &DiskBufferConfig<FS> {
240        &self.config
241    }
242
243    /// Gets the filesystem configured for this buffer.
244    pub fn filesystem(&self) -> &FS {
245        &self.config.filesystem
246    }
247
248    /// Gets the internal ledger state.
249    ///
250    /// This is the information persisted to disk.
251    pub fn state(&self) -> &ArchivedLedgerState {
252        self.state.get_archive_ref()
253    }
254
255    /// Gets the total number of unread records in the buffer.
256    ///
257    /// This number is based on acknowledged reads only, which is to say that if 10 records are
258    /// written, and 8 of them have been read, but only 3 have been acked, then `get_total_records`
259    /// would return `7`.
260    pub fn get_total_records(&self) -> u64 {
261        let next_writer_id = self.state().get_next_writer_record_id();
262        let last_reader_id = self.state().get_last_reader_record_id();
263
264        next_writer_id.wrapping_sub(last_reader_id) - 1
265    }
266
267    /// Gets the total number of bytes for all unread records in the buffer.
268    ///
269    /// This number will often disagree with the size of files on disk, as data files are deleted
270    /// only after being read entirely, and are simply appended to when they are not yet full.  This
271    /// leads to behavior where writes and reads will change this value only by the size of the
272    /// records being written and read, while data files on disk will grow incrementally, and be
273    /// deleted in full.
274    pub fn get_total_buffer_size(&self) -> u64 {
275        self.total_buffer_size.load(Ordering::Acquire)
276    }
277
278    /// Increments the total number of bytes for all unread records in the buffer.
279    pub fn increment_total_buffer_size(&self, amount: u64) {
280        let last_total_buffer_size = self.total_buffer_size.fetch_add(amount, Ordering::AcqRel);
281        trace!(
282            previous_buffer_size = last_total_buffer_size,
283            new_buffer_size = last_total_buffer_size + amount,
284            "Updated buffer size.",
285        );
286    }
287
288    /// Decrements the total number of bytes for all unread records in the buffer.
289    pub fn decrement_total_buffer_size(&self, amount: u64) {
290        let last_total_buffer_size = self.total_buffer_size.fetch_sub(amount, Ordering::AcqRel);
291        trace!(
292            previous_buffer_size = last_total_buffer_size,
293            new_buffer_size = last_total_buffer_size - amount,
294            "Updated buffer size.",
295        );
296    }
297
298    /// Gets the current reader file ID.
299    ///
300    /// This is internally adjusted to compensate for the fact that the reader can read far past
301    /// the latest acknowledge record/data file, and so is not representative of where the reader
302    /// would start reading from if the process crashed or was abruptly stopped.
303    pub fn get_current_reader_file_id(&self) -> u16 {
304        let unacked_offset = self.unacked_reader_file_id_offset.load(Ordering::Acquire);
305        self.state().get_offset_reader_file_id(unacked_offset)
306    }
307
308    /// Gets the current writer file ID.
309    pub fn get_current_writer_file_id(&self) -> u16 {
310        self.state().get_current_writer_file_id()
311    }
312
313    /// Gets the next writer file ID.
314    ///
315    /// This is purely a future-looking operation i.e. what would the file ID be if it was
316    /// incremented from its current value.  It does not alter the current writer file ID.
317    #[cfg(test)]
318    pub fn get_next_writer_file_id(&self) -> u16 {
319        self.state().get_next_writer_file_id()
320    }
321
322    /// Gets the current reader and writer file IDs.
323    ///
324    /// Similar to [`get_current_reader_file_id`], the file ID returned for the reader compensates
325    /// for the acknowledgement state of the reader.
326    pub fn get_current_reader_writer_file_id(&self) -> (u16, u16) {
327        let reader = self.get_current_reader_file_id();
328        let writer = self.get_current_writer_file_id();
329
330        (reader, writer)
331    }
332
333    /// Gets the current reader data file path, accounting for the unacknowledged offset.
334    pub fn get_current_reader_data_file_path(&self) -> PathBuf {
335        self.get_data_file_path(self.get_current_reader_file_id())
336    }
337
338    /// Gets the current writer data file path.
339    pub fn get_current_writer_data_file_path(&self) -> PathBuf {
340        self.get_data_file_path(self.get_current_writer_file_id())
341    }
342
343    /// Gets the next writer data file path.
344    pub fn get_next_writer_data_file_path(&self) -> PathBuf {
345        self.get_data_file_path(self.state().get_next_writer_file_id())
346    }
347
348    /// Gets the data file path for an arbitrary file ID.
349    pub fn get_data_file_path(&self, file_id: u16) -> PathBuf {
350        self.config
351            .data_dir
352            .join(format!("buffer-data-{file_id}.dat"))
353    }
354
355    /// Waits for a signal from the reader that progress has been made.
356    ///
357    /// This will only occur when a record is read, which may allow enough space (below the maximum
358    /// configured buffer size) for a write to occur, or similarly, when a data file is deleted.
359    #[cfg_attr(test, instrument(skip(self), level = "trace"))]
360    pub async fn wait_for_reader(&self) {
361        self.reader_notify.notified().await;
362    }
363
364    /// Waits for a signal from the writer that progress has been made.
365    ///
366    /// This will occur when a record is written, or when a new data file is created.
367    #[cfg_attr(test, instrument(skip(self), level = "trace"))]
368    pub async fn wait_for_writer(&self) {
369        self.writer_notify.notified().await;
370    }
371
372    /// Notifies all tasks waiting on progress by the reader.
373    #[cfg_attr(test, instrument(skip(self), level = "trace"))]
374    pub fn notify_reader_waiters(&self) {
375        self.reader_notify.notify_one();
376    }
377
378    /// Notifies all tasks waiting on progress by the writer.
379    #[cfg_attr(test, instrument(skip(self), level = "trace"))]
380    pub fn notify_writer_waiters(&self) {
381        self.writer_notify.notify_one();
382    }
383
384    /// Tracks the statistics of a successful write.
385    pub fn track_write(&self, event_count: u64, record_size: u64) {
386        self.increment_total_buffer_size(record_size);
387        self.usage_handle
388            .increment_received_event_count_and_byte_size(event_count, record_size);
389    }
390
391    /// Tracks the statistics of multiple successful reads.
392    pub fn track_reads(&self, event_count: u64, total_record_size: u64) {
393        self.decrement_total_buffer_size(total_record_size);
394        self.usage_handle
395            .increment_sent_event_count_and_byte_size(event_count, total_record_size);
396    }
397
398    /// Marks the writer as finished.
399    ///
400    /// If the writer was not yet marked done, `false` is returned.  Otherwise, `true` is returned,
401    /// and the caller should handle any necessary logic for closing the writer.
402    pub fn mark_writer_done(&self) -> bool {
403        self.writer_done
404            .compare_exchange_weak(false, true, Ordering::SeqCst, Ordering::SeqCst)
405            .is_ok()
406    }
407
408    /// Returns `true` if the writer was marked as done.
409    pub fn is_writer_done(&self) -> bool {
410        self.writer_done.load(Ordering::Acquire)
411    }
412
413    /// Increments the pending acknowledgement counter by the given amount.
414    pub fn increment_pending_acks(&self, amount: u64) {
415        self.pending_acks.fetch_add(amount, Ordering::AcqRel);
416    }
417
418    /// Consumes the full amount of pending acknowledgements, and resets the counter to zero.
419    pub fn consume_pending_acks(&self) -> u64 {
420        self.pending_acks.swap(0, Ordering::AcqRel)
421    }
422
423    /// Increments the unacknowledged reader file ID.
424    ///
425    /// As further described in `increment_acked_reader_file_id`, the underlying value here allows
426    /// the reader to read ahead of a data file, even if it hasn't been durably processed yet.
427    pub fn increment_unacked_reader_file_id(&self) {
428        let last_unacked_reader_file_id_offset = self
429            .unacked_reader_file_id_offset
430            .fetch_add(1, Ordering::AcqRel);
431        trace!(
432            unacked_reader_file_id_offset = last_unacked_reader_file_id_offset + 1,
433            "Incremented unacknowledged reader file ID."
434        );
435    }
436
437    /// Increments the acknowledged reader file ID.
438    ///
439    /// As records may be read and stored for a small period of time (batching in a sink, etc), we
440    /// cannot truly say that we have durably processed a record until the caller acknowledges the
441    /// record.  However, if we always waited for an acknowledgement, then each read could be forced
442    /// to wait for multiple seconds.  Such a design would clearly be unusable.
443    ///
444    /// Instead, we allow the reader to move ahead of the latest acknowledged record by tracking
445    /// their current file ID and acknowledged file ID separately.  Once all records in a file have
446    /// been acknowledged, the data file can be deleted and the reader file ID can be durably
447    /// stored in the ledger.
448    ///
449    /// Callers use [`increment_unacked_reader_file_id`] to move to the next data file without
450    /// tracking that the previous data file has been durably processed and can be deleted, and
451    /// [`increment_acked_reader_file_id`] is the reciprocal function which tracks the highest data
452    /// file that _has_ been durably processed.
453    ///
454    /// Since the unacked file ID is simply a relative offset to the acked file ID, we decrement it
455    /// here to keep the "current" file ID stable.
456    pub fn increment_acked_reader_file_id(&self) {
457        let new_reader_file_id = self.state().increment_reader_file_id();
458
459        // We ignore the return value because when the value is already zero, we don't want to do an
460        // update, so we return `None`, which causes `fetch_update` to return `Err`.  It's not
461        // really an error, we just wanted to avoid the extra atomic compare/exchange.
462        //
463        // Basically, this call is actually infallible for our purposes.
464        let result = self.unacked_reader_file_id_offset.fetch_update(
465            Ordering::Release,
466            Ordering::Relaxed,
467            |n| {
468                if n == 0 {
469                    None
470                } else {
471                    Some(n - 1)
472                }
473            },
474        );
475
476        trace!(
477            unacked_reader_file_id_offset = result.map(|n| n - 1).unwrap_or(0),
478            acked_reader_file_id_offset = new_reader_file_id,
479            "Incremented acknowledged reader file ID offset with corresponding unacknowledged decrement."
480        );
481    }
482
483    /// Determines whether or not all files should be flushed/fsync'd to disk.
484    ///
485    /// In the case of concurrent callers when the flush deadline has been exceeded, only one caller
486    /// will get a return value of `true`, and the others will receive `false`.  The caller that
487    /// receives `true` is responsible for flushing the necessary files.
488    pub fn should_flush(&self) -> bool {
489        let last_flush = self.last_flush.load();
490        if last_flush.elapsed() > self.config.flush_interval
491            && self
492                .last_flush
493                .compare_exchange(last_flush, Instant::now())
494                .is_ok()
495        {
496            return true;
497        }
498
499        false
500    }
501
502    /// Flushes the memory-mapped file backing the ledger to disk.
503    ///
504    /// This operation is synchronous.
505    ///
506    /// # Errors
507    ///
508    /// If there is an error while flushing the ledger to disk, an error variant will be returned
509    /// describing the error.
510    pub(super) fn flush(&self) -> io::Result<()> {
511        self.state.get_backing_ref().flush()
512    }
513
514    /// Synchronizes the record count and total size of the buffer with buffer usage data.
515    ///
516    /// This should not be called until both the reader and writer have been initialized via
517    /// [`Reader::seek_to_last_record`] and [`Writer::validate_last_write`], otherwise the values
518    /// will not be accurate.
519    pub fn synchronize_buffer_usage(&self) {
520        let initial_buffer_events = self.get_total_records();
521        let initial_buffer_size = self.get_total_buffer_size();
522        self.usage_handle
523            .increment_received_event_count_and_byte_size(
524                initial_buffer_events,
525                initial_buffer_size,
526            );
527    }
528
529    pub fn track_dropped_events(&self, count: u64) {
530        // We don't know how many bytes are represented by dropped events because we never actually had a chance to read
531        // them, so we have to use a byte size of 0 here.
532        //
533        // On the flipside, this would only matter if we incremented the buffer size and simultaneously skipped/lost the
534        // events within the same process lifecycle, since otherwise we'd start from the correct buffer size when
535        // loading the buffer initially.
536        //
537        // TODO: Can we do better here?
538        self.usage_handle
539            .increment_dropped_event_count_and_byte_size(count, 0, false);
540    }
541}
542
543impl<FS> Ledger<FS>
544where
545    FS: Filesystem + 'static,
546    FS::File: Unpin,
547{
548    /// Loads or creates a ledger for the given [`DiskBufferConfig`].
549    ///
550    /// If the ledger file does not yet exist, a default ledger state will be created and persisted
551    /// to disk.  Otherwise, the ledger file on disk will be loaded and verified.
552    ///
553    /// # Errors
554    ///
555    /// If there is an error during either serialization of the new, default ledger state, or
556    /// deserializing existing data in the ledger file, or generally during the underlying I/O
557    /// operations, an error variant will be returned describing the error.
558    #[cfg_attr(test, instrument(skip_all, level = "trace"))]
559    pub(super) async fn load_or_create(
560        config: DiskBufferConfig<FS>,
561        usage_handle: BufferUsageHandle,
562    ) -> Result<Ledger<FS>, LedgerLoadCreateError> {
563        // Create our containing directory if it doesn't already exist.
564        fs::create_dir_all(&config.data_dir)
565            .await
566            .context(IoSnafu)?;
567
568        // Acquire an exclusive lock on our lock file, which prevents another Vector process from
569        // loading this buffer and clashing with us.  Specifically, though: this does _not_ prevent
570        // another process from messing with our ledger files, or any of the data files, etc.
571        //
572        // TODO: It'd be nice to incorporate this within `Filesystem` to fully encapsulate _all_
573        // file I/O, but the code is so specific, including the drop guard for the lock file, that I
574        // don't know if it's worth it.
575        let ledger_lock_path = config.data_dir.join("buffer.lock");
576        let mut lock = LockFile::open(&ledger_lock_path).context(IoSnafu)?;
577        if !lock.try_lock().context(IoSnafu)? {
578            return Err(LedgerLoadCreateError::LedgerLockAlreadyHeld);
579        }
580
581        // Open the ledger file, which may involve creating it if it doesn't yet exist.
582        let ledger_path = config.data_dir.join("buffer.db");
583        let mut ledger_handle = config
584            .filesystem
585            .open_file_writable(&ledger_path)
586            .await
587            .context(IoSnafu)?;
588
589        // If we just created the ledger file, then we need to create the default ledger state, and
590        // then serialize and write to the file, before trying to load it as a memory-mapped file.
591        let ledger_metadata = ledger_handle.metadata().await.context(IoSnafu)?;
592        let ledger_len = ledger_metadata.len();
593        if ledger_len == 0 {
594            debug!("Ledger file empty.  Initializing with default ledger state.");
595            let mut buf = BytesMut::new();
596            loop {
597                match BackedArchive::from_value(&mut buf, LedgerState::default()) {
598                    Ok(archive) => {
599                        ledger_handle
600                            .write_all(archive.get_backing_ref())
601                            .await
602                            .context(IoSnafu)?;
603                        break;
604                    }
605                    Err(SerializeError::FailedToSerialize(reason)) => {
606                        return Err(LedgerLoadCreateError::FailedToSerialize { reason })
607                    }
608                    // Our buffer wasn't big enough, but that's OK!  Resize it and try again.
609                    Err(SerializeError::BackingStoreTooSmall(_, min_len)) => buf.resize(min_len, 0),
610                }
611            }
612
613            // Now sync the file to ensure everything is on disk before proceeding.
614            ledger_handle.sync_all().await.context(IoSnafu)?;
615        }
616
617        // Load the ledger state by memory-mapping the ledger file, and zero-copy deserializing our
618        // ledger state back out of it.
619        let ledger_mmap = config
620            .filesystem
621            .open_mmap_writable(&ledger_path)
622            .await
623            .context(IoSnafu)?;
624        let ledger_state = match BackedArchive::from_backing(ledger_mmap) {
625            // Deserialized the ledger state without issue from an existing file.
626            Ok(backed) => backed,
627            // Either invalid data, or the buffer doesn't represent a valid ledger structure.
628            Err(e) => {
629                return Err(LedgerLoadCreateError::FailedToDeserialize {
630                    reason: e.into_inner(),
631                })
632            }
633        };
634
635        // Create the ledger object, and synchronize the buffer statistics with the buffer usage
636        // handle.  This handles making sure we account for the starting size of the buffer, and
637        // what not.
638        let mut ledger = Ledger {
639            config,
640            lock,
641            state: ledger_state,
642            total_buffer_size: AtomicU64::new(0),
643            reader_notify: Notify::new(),
644            writer_notify: Notify::new(),
645            writer_done: AtomicBool::new(false),
646            pending_acks: AtomicU64::new(0),
647            unacked_reader_file_id_offset: AtomicU16::new(0),
648            last_flush: AtomicCell::new(Instant::now()),
649            usage_handle,
650        };
651        ledger.update_buffer_size().await?;
652
653        Ok(ledger)
654    }
655
656    async fn update_buffer_size(&mut self) -> Result<(), LedgerLoadCreateError> {
657        // Under normal operation, the reader and writer maintain a consistent state within the
658        // ledger.  However, due to the nature of how we update the ledger, process crashes could
659        // lead to missed updates as we execute reads and writes as non-atomic units of execution:
660        // update a field, do the read/write, update some more fields depending on success or
661        // failure, etc.
662        //
663        // This is an issue because we depend on knowing the total buffer size (the total size of
664        // unread records, specifically) so that we can correctly limit writes when we've reached
665        // the configured maximum buffer size.
666        //
667        // While it's not terribly efficient, and I'd like to eventually formulate a better design,
668        // this approach is absolutely correct: get the file size of every data file on disk,
669        // and set the "total buffer size" to the sum of all of those file sizes.
670        //
671        // When the reader does any necessary seeking to get to the record it left off on, it will
672        // adjust the "total buffer size" downwards for each record it runs through, leaving "total
673        // buffer size" at the correct value.
674        let mut dat_reader = fs::read_dir(&self.config.data_dir).await.context(IoSnafu)?;
675
676        let mut total_buffer_size = 0;
677        while let Some(dir_entry) = dat_reader.next_entry().await.context(IoSnafu)? {
678            if let Some(file_name) = dir_entry.file_name().to_str() {
679                // I really _do_ want to only find files with a .dat extension, as that's what the
680                // code generates, and having them be .dAt or .Dat or whatever would indicate that
681                // the file is not related to our buffer.  If we had to cope with case-sensitivity
682                // of filenames from another program/OS, then it would be a different story.
683                #[allow(clippy::case_sensitive_file_extension_comparisons)]
684                if file_name.ends_with(".dat") {
685                    let metadata = dir_entry.metadata().await.context(IoSnafu)?;
686                    total_buffer_size += metadata.len();
687
688                    debug!(
689                        data_file = file_name,
690                        file_size = metadata.len(),
691                        total_buffer_size,
692                        "Found existing data file."
693                    );
694                }
695            }
696        }
697
698        self.increment_total_buffer_size(total_buffer_size);
699
700        Ok(())
701    }
702
703    #[must_use]
704    pub(super) fn spawn_finalizer(self: Arc<Self>) -> OrderedFinalizer<u64> {
705        let (finalizer, mut stream) = OrderedFinalizer::new(None);
706        tokio::spawn(async move {
707            while let Some((_status, amount)) = stream.next().await {
708                self.increment_pending_acks(amount);
709                self.notify_writer_waiters();
710            }
711        });
712        finalizer
713    }
714}
715
716impl<FS> fmt::Debug for Ledger<FS>
717where
718    FS: Filesystem + fmt::Debug,
719{
720    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
721        f.debug_struct("Ledger")
722            .field("config", &self.config)
723            .field("state", &self.state.get_archive_ref())
724            .field(
725                "total_buffer_size",
726                &self.total_buffer_size.load(Ordering::Acquire),
727            )
728            .field("pending_acks", &self.pending_acks.load(Ordering::Acquire))
729            .field(
730                "unacked_reader_file_id_offset",
731                &self.unacked_reader_file_id_offset.load(Ordering::Acquire),
732            )
733            .field("writer_done", &self.writer_done.load(Ordering::Acquire))
734            .field("last_flush", &self.last_flush.load())
735            .finish_non_exhaustive()
736    }
737}