vector_buffers/variants/disk_v2/
ledger.rs

1use std::{
2    fmt, io, mem,
3    path::PathBuf,
4    sync::{
5        Arc,
6        atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering},
7    },
8    time::Instant,
9};
10
11use bytecheck::CheckBytes;
12use bytes::BytesMut;
13use crossbeam_utils::atomic::AtomicCell;
14use fslock::LockFile;
15use futures::StreamExt;
16use rkyv::{Archive, Serialize, with::Atomic};
17use snafu::{ResultExt, Snafu};
18use tokio::{fs, io::AsyncWriteExt, sync::Notify};
19use vector_common::finalizer::OrderedFinalizer;
20
21use super::{
22    Filesystem,
23    backed_archive::BackedArchive,
24    common::{DiskBufferConfig, MAX_FILE_ID, align16},
25    io::{AsyncFile, WritableMemoryMap},
26    ser::SerializeError,
27};
28use crate::buffer_usage_data::BufferUsageHandle;
29
/// Length, in bytes, reserved for the serialized ledger state.
///
/// Computed as the in-memory size of [`ArchivedLedgerState`] passed through `align16`
/// (presumably rounding up to a 16-byte boundary -- confirm against `common::align16`).
pub const LEDGER_LEN: usize = align16(mem::size_of::<ArchivedLedgerState>());
31
/// Errors that can occur while loading or creating a [`Ledger`].
#[derive(Debug, Snafu)]
pub enum LedgerLoadCreateError {
    /// A general I/O error occurred.
    ///
    /// Generally, I/O errors should only occur when flushing the ledger state and the underlying
    /// ledger file has been corrupted or altered in some way outside of this process.  As the
    /// ledger is fixed in size, and does not grow during the life of the process, common errors
    /// such as running out of disk space will not typically be relevant (or possible) here.
    ///
    /// This variant is also used for failures of the filesystem operations performed while loading
    /// or creating the ledger: creating the data directory, opening and locking the lock file,
    /// opening, writing, syncing and memory-mapping the ledger file, and scanning data files.
    #[snafu(display("ledger I/O error: {}", source))]
    Io { source: io::Error },

    /// The ledger is already opened by another Vector process.
    ///
    /// Advisory locking is used to prevent other Vector processes from concurrently opening the
    /// same buffer, but bear in mind that this does not prevent other processes or users from
    /// modifying the ledger file in a way that could cause undefined behavior during buffer
    /// operation.
    #[snafu(display(
        "failed to lock buffer.lock; is another Vector process running and using this buffer?"
    ))]
    LedgerLockAlreadyHeld,

    /// The ledger state was unable to be deserialized.
    ///
    /// This should only occur if the ledger file was modified or truncated outside of the Vector
    /// process.  In rare situations, if the ledger state type (`LedgerState`, here in ledger.rs)
    /// was modified, then the layout may no longer match the structure as it exists on disk.
    ///
    /// We have many strongly-worded warnings to not do this unless a developer absolutely knows
    /// what they're doing, but it is still technically a possibility. :)
    #[snafu(display("failed to deserialize ledger from buffer: {}", reason))]
    FailedToDeserialize { reason: String },

    /// The ledger state was unable to be serialized.
    ///
    /// This only occurs when initially creating a new buffer where the ledger state has not yet
    /// been written to disk.  During normal operation, the ledger is memory-mapped directly and so
    /// serialization does not occur.
    ///
    /// This error is likely only to occur if the process is unable to allocate memory for the
    /// buffers required for the serialization step.
    #[snafu(display("failed to serialize ledger to buffer: {}", reason))]
    FailedToSerialize { reason: String },
}
76
/// Ledger state.
///
/// Stores the relevant information related to both the reader and writer.  Gets serialized and
/// stored on disk, and is managed via a memory-mapped file.
///
/// # Warning
///
/// - Do not add fields to this struct.
/// - Do not remove fields from this struct.
/// - Do not change the type of fields in this struct.
/// - Do not change the order of fields in this struct.
///
/// Doing so will change the serialized representation.  This will break things.
///
/// Do not do any of the listed things unless you _absolutely_ know what you're doing. :)
#[derive(Archive, Serialize, Debug)]
#[archive_attr(derive(CheckBytes, Debug))]
pub struct LedgerState {
    /// Next record ID to use when writing a record.
    #[with(Atomic)]
    writer_next_record: AtomicU64,
    /// The current data file ID being written to.
    #[with(Atomic)]
    writer_current_data_file: AtomicU16,
    /// The current data file ID being read from.
    #[with(Atomic)]
    reader_current_data_file: AtomicU16,
    /// The last record ID read by the reader.
    #[with(Atomic)]
    reader_last_record: AtomicU64,
}
108
109impl Default for LedgerState {
110    fn default() -> Self {
111        Self {
112            // First record written is always 1, so that our default of 0 for
113            // `reader_last_record_id` ensures we start up in a state of "alright, waiting to read
114            // record #1 next".
115            writer_next_record: AtomicU64::new(1),
116            writer_current_data_file: AtomicU16::new(0),
117            reader_current_data_file: AtomicU16::new(0),
118            reader_last_record: AtomicU64::new(0),
119        }
120    }
121}
122
123impl ArchivedLedgerState {
124    fn get_current_writer_file_id(&self) -> u16 {
125        self.writer_current_data_file.load(Ordering::Acquire)
126    }
127
128    fn get_next_writer_file_id(&self) -> u16 {
129        (self.get_current_writer_file_id() + 1) % MAX_FILE_ID
130    }
131
132    pub(super) fn increment_writer_file_id(&self) {
133        self.writer_current_data_file
134            .store(self.get_next_writer_file_id(), Ordering::Release);
135    }
136
137    pub(super) fn get_next_writer_record_id(&self) -> u64 {
138        self.writer_next_record.load(Ordering::Acquire)
139    }
140
141    pub(super) fn increment_next_writer_record_id(&self, amount: u64) -> u64 {
142        let previous = self.writer_next_record.fetch_add(amount, Ordering::AcqRel);
143        previous.wrapping_add(amount)
144    }
145
146    fn get_current_reader_file_id(&self) -> u16 {
147        self.reader_current_data_file.load(Ordering::Acquire)
148    }
149
150    fn get_next_reader_file_id(&self) -> u16 {
151        (self.get_current_reader_file_id() + 1) % MAX_FILE_ID
152    }
153
154    fn get_offset_reader_file_id(&self, offset: u16) -> u16 {
155        self.get_current_reader_file_id().wrapping_add(offset) % MAX_FILE_ID
156    }
157
158    fn increment_reader_file_id(&self) -> u16 {
159        let value = self.get_next_reader_file_id();
160        self.reader_current_data_file
161            .store(value, Ordering::Release);
162        value
163    }
164
165    pub(super) fn get_last_reader_record_id(&self) -> u64 {
166        self.reader_last_record.load(Ordering::Acquire)
167    }
168
169    pub(super) fn increment_last_reader_record_id(&self, amount: u64) {
170        self.reader_last_record.fetch_add(amount, Ordering::AcqRel);
171    }
172
173    #[cfg(test)]
174    pub unsafe fn unsafe_set_writer_next_record_id(&self, id: u64) {
175        // UNSAFETY:
176        // The atomic operation itself is inherently safe, but adjusting the record IDs manually is
177        // _unsafe_ because it messes with the continuity of record IDs from the perspective of the
178        // reader.
179        //
180        // This is exclusively used under test to make it possible to check certain edge cases, as
181        // writing enough records to actually increment it to the maximum value would take longer
182        // than any of us will be alive.
183        //
184        // Despite it being test-only, we're really amping up the "this is only for testing!" factor
185        // by making it an actual `unsafe` function, and putting "unsafe" in the name. :)
186        self.writer_next_record.store(id, Ordering::Release);
187    }
188
189    #[cfg(test)]
190    pub unsafe fn unsafe_set_reader_last_record_id(&self, id: u64) {
191        // UNSAFETY:
192        // The atomic operation itself is inherently safe, but adjusting the record IDs manually is
193        // _unsafe_ because it messes with the continuity of record IDs from the perspective of the
194        // reader.
195        //
196        // This is exclusively used under test to make it possible to check certain edge cases, as
197        // writing enough records to actually increment it to the maximum value would take longer
198        // than any of us will be alive.
199        //
200        // Despite it being test-only, we're really amping up the "this is only for testing!" factor
201        // by making it an actual `unsafe` function, and putting "unsafe" in the name. :)
202        self.reader_last_record.store(id, Ordering::Release);
203    }
204}
205
/// Tracks the internal state of the buffer.
///
/// Combines the persisted, memory-mapped ledger state with in-memory bookkeeping (buffer size,
/// notifiers, pending acknowledgements, flush timing) shared by the reader and writer.
pub(crate) struct Ledger<FS>
where
    FS: Filesystem,
{
    // Buffer configuration.
    config: DiskBufferConfig<FS>,
    // Advisory lock for this buffer directory.
    //
    // Never read after construction; presumably held only so that the lock stays acquired for as
    // long as the ledger is alive -- confirm against `fslock::LockFile` drop behavior.
    #[allow(dead_code)]
    lock: LockFile,
    // Ledger state.
    state: BackedArchive<FS::MutableMemoryMap, LedgerState>,
    // The total size, in bytes, of all unread records in the buffer.
    total_buffer_size: AtomicU64,
    // Notifier for reader-related progress.
    reader_notify: Notify,
    // Notifier for writer-related progress.
    writer_notify: Notify,
    // Tracks when the writer has fully shut down.
    writer_done: AtomicBool,
    // Number of pending record acknowledgements that have yet to be consumed by the reader.
    pending_acks: AtomicU64,
    // The file ID offset of the reader past the acknowledged reader file ID.
    unacked_reader_file_id_offset: AtomicU16,
    // Last flush of all unflushed files: ledger, data file, etc.
    last_flush: AtomicCell<Instant>,
    // Tracks usage data about the buffer.
    usage_handle: BufferUsageHandle,
}
235
236impl<FS> Ledger<FS>
237where
238    FS: Filesystem,
239{
240    /// Gets the configuration for the buffer that this ledger represents.
241    pub fn config(&self) -> &DiskBufferConfig<FS> {
242        &self.config
243    }
244
245    /// Gets the filesystem configured for this buffer.
246    pub fn filesystem(&self) -> &FS {
247        &self.config.filesystem
248    }
249
250    /// Gets the internal ledger state.
251    ///
252    /// This is the information persisted to disk.
253    pub fn state(&self) -> &ArchivedLedgerState {
254        self.state.get_archive_ref()
255    }
256
257    /// Gets the total number of unread records in the buffer.
258    ///
259    /// This number is based on acknowledged reads only, which is to say that if 10 records are
260    /// written, and 8 of them have been read, but only 3 have been acked, then `get_total_records`
261    /// would return `7`.
262    pub fn get_total_records(&self) -> u64 {
263        let next_writer_id = self.state().get_next_writer_record_id();
264        let last_reader_id = self.state().get_last_reader_record_id();
265
266        next_writer_id.wrapping_sub(last_reader_id) - 1
267    }
268
269    /// Gets the total number of bytes for all unread records in the buffer.
270    ///
271    /// This number will often disagree with the size of files on disk, as data files are deleted
272    /// only after being read entirely, and are simply appended to when they are not yet full.  This
273    /// leads to behavior where writes and reads will change this value only by the size of the
274    /// records being written and read, while data files on disk will grow incrementally, and be
275    /// deleted in full.
276    pub fn get_total_buffer_size(&self) -> u64 {
277        self.total_buffer_size.load(Ordering::Acquire)
278    }
279
280    /// Increments the total number of bytes for all unread records in the buffer.
281    pub fn increment_total_buffer_size(&self, amount: u64) {
282        let last_total_buffer_size = self.total_buffer_size.fetch_add(amount, Ordering::AcqRel);
283        trace!(
284            previous_buffer_size = last_total_buffer_size,
285            new_buffer_size = last_total_buffer_size + amount,
286            "Updated buffer size.",
287        );
288    }
289
290    /// Decrements the total number of bytes for all unread records in the buffer.
291    pub fn decrement_total_buffer_size(&self, amount: u64) {
292        let last_total_buffer_size = self.total_buffer_size.fetch_sub(amount, Ordering::AcqRel);
293        trace!(
294            previous_buffer_size = last_total_buffer_size,
295            new_buffer_size = last_total_buffer_size - amount,
296            "Updated buffer size.",
297        );
298    }
299
300    /// Gets the current reader file ID.
301    ///
302    /// This is internally adjusted to compensate for the fact that the reader can read far past
303    /// the latest acknowledge record/data file, and so is not representative of where the reader
304    /// would start reading from if the process crashed or was abruptly stopped.
305    pub fn get_current_reader_file_id(&self) -> u16 {
306        let unacked_offset = self.unacked_reader_file_id_offset.load(Ordering::Acquire);
307        self.state().get_offset_reader_file_id(unacked_offset)
308    }
309
310    /// Gets the current writer file ID.
311    pub fn get_current_writer_file_id(&self) -> u16 {
312        self.state().get_current_writer_file_id()
313    }
314
315    /// Gets the next writer file ID.
316    ///
317    /// This is purely a future-looking operation i.e. what would the file ID be if it was
318    /// incremented from its current value.  It does not alter the current writer file ID.
319    pub fn get_next_writer_file_id(&self) -> u16 {
320        self.state().get_next_writer_file_id()
321    }
322
323    /// Gets the current reader and writer file IDs.
324    ///
325    /// Similar to [`get_current_reader_file_id`], the file ID returned for the reader compensates
326    /// for the acknowledgement state of the reader.
327    pub fn get_current_reader_writer_file_id(&self) -> (u16, u16) {
328        let reader = self.get_current_reader_file_id();
329        let writer = self.get_current_writer_file_id();
330
331        (reader, writer)
332    }
333
334    /// Gets the current reader data file path, accounting for the unacknowledged offset.
335    pub fn get_current_reader_data_file_path(&self) -> PathBuf {
336        self.get_data_file_path(self.get_current_reader_file_id())
337    }
338
339    /// Gets the current writer data file path.
340    pub fn get_current_writer_data_file_path(&self) -> PathBuf {
341        self.get_data_file_path(self.get_current_writer_file_id())
342    }
343
344    /// Gets the next writer data file path.
345    pub fn get_next_writer_data_file_path(&self) -> PathBuf {
346        self.get_data_file_path(self.state().get_next_writer_file_id())
347    }
348
349    /// Gets the data file path for an arbitrary file ID.
350    pub fn get_data_file_path(&self, file_id: u16) -> PathBuf {
351        self.config
352            .data_dir
353            .join(format!("buffer-data-{file_id}.dat"))
354    }
355
356    /// Waits for a signal from the reader that progress has been made.
357    ///
358    /// This will only occur when a record is read, which may allow enough space (below the maximum
359    /// configured buffer size) for a write to occur, or similarly, when a data file is deleted.
360    #[cfg_attr(test, instrument(skip(self), level = "trace"))]
361    pub async fn wait_for_reader(&self) {
362        self.reader_notify.notified().await;
363    }
364
365    /// Waits for a signal from the writer that progress has been made.
366    ///
367    /// This will occur when a record is written, or when a new data file is created.
368    #[cfg_attr(test, instrument(skip(self), level = "trace"))]
369    pub async fn wait_for_writer(&self) {
370        self.writer_notify.notified().await;
371    }
372
373    /// Notifies all tasks waiting on progress by the reader.
374    #[cfg_attr(test, instrument(skip(self), level = "trace"))]
375    pub fn notify_reader_waiters(&self) {
376        self.reader_notify.notify_one();
377    }
378
379    /// Notifies all tasks waiting on progress by the writer.
380    #[cfg_attr(test, instrument(skip(self), level = "trace"))]
381    pub fn notify_writer_waiters(&self) {
382        self.writer_notify.notify_one();
383    }
384
385    /// Tracks the statistics of a successful write.
386    pub fn track_write(&self, event_count: u64, record_size: u64) {
387        self.increment_total_buffer_size(record_size);
388        self.usage_handle
389            .increment_received_event_count_and_byte_size(event_count, record_size);
390    }
391
392    /// Tracks the statistics of multiple successful reads.
393    pub fn track_reads(&self, event_count: u64, total_record_size: u64) {
394        self.decrement_total_buffer_size(total_record_size);
395        self.usage_handle
396            .increment_sent_event_count_and_byte_size(event_count, total_record_size);
397    }
398
399    /// Marks the writer as finished.
400    ///
401    /// If the writer was not yet marked done, `false` is returned.  Otherwise, `true` is returned,
402    /// and the caller should handle any necessary logic for closing the writer.
403    pub fn mark_writer_done(&self) -> bool {
404        self.writer_done
405            .compare_exchange_weak(false, true, Ordering::SeqCst, Ordering::SeqCst)
406            .is_ok()
407    }
408
409    /// Returns `true` if the writer was marked as done.
410    pub fn is_writer_done(&self) -> bool {
411        self.writer_done.load(Ordering::Acquire)
412    }
413
414    /// Increments the pending acknowledgement counter by the given amount.
415    pub fn increment_pending_acks(&self, amount: u64) {
416        self.pending_acks.fetch_add(amount, Ordering::AcqRel);
417    }
418
419    /// Consumes the full amount of pending acknowledgements, and resets the counter to zero.
420    pub fn consume_pending_acks(&self) -> u64 {
421        self.pending_acks.swap(0, Ordering::AcqRel)
422    }
423
424    /// Increments the unacknowledged reader file ID.
425    ///
426    /// As further described in `increment_acked_reader_file_id`, the underlying value here allows
427    /// the reader to read ahead of a data file, even if it hasn't been durably processed yet.
428    pub fn increment_unacked_reader_file_id(&self) {
429        let last_unacked_reader_file_id_offset = self
430            .unacked_reader_file_id_offset
431            .fetch_add(1, Ordering::AcqRel);
432        trace!(
433            unacked_reader_file_id_offset = last_unacked_reader_file_id_offset + 1,
434            "Incremented unacknowledged reader file ID."
435        );
436    }
437
438    /// Increments the acknowledged reader file ID.
439    ///
440    /// As records may be read and stored for a small period of time (batching in a sink, etc), we
441    /// cannot truly say that we have durably processed a record until the caller acknowledges the
442    /// record.  However, if we always waited for an acknowledgement, then each read could be forced
443    /// to wait for multiple seconds.  Such a design would clearly be unusable.
444    ///
445    /// Instead, we allow the reader to move ahead of the latest acknowledged record by tracking
446    /// their current file ID and acknowledged file ID separately.  Once all records in a file have
447    /// been acknowledged, the data file can be deleted and the reader file ID can be durably
448    /// stored in the ledger.
449    ///
450    /// Callers use [`increment_unacked_reader_file_id`] to move to the next data file without
451    /// tracking that the previous data file has been durably processed and can be deleted, and
452    /// [`increment_acked_reader_file_id`] is the reciprocal function which tracks the highest data
453    /// file that _has_ been durably processed.
454    ///
455    /// Since the unacked file ID is simply a relative offset to the acked file ID, we decrement it
456    /// here to keep the "current" file ID stable.
457    pub fn increment_acked_reader_file_id(&self) {
458        let new_reader_file_id = self.state().increment_reader_file_id();
459
460        // We ignore the return value because when the value is already zero, we don't want to do an
461        // update, so we return `None`, which causes `fetch_update` to return `Err`.  It's not
462        // really an error, we just wanted to avoid the extra atomic compare/exchange.
463        //
464        // Basically, this call is actually infallible for our purposes.
465        let result = self.unacked_reader_file_id_offset.fetch_update(
466            Ordering::Release,
467            Ordering::Relaxed,
468            |n| {
469                if n == 0 { None } else { Some(n - 1) }
470            },
471        );
472
473        trace!(
474            unacked_reader_file_id_offset = result.map(|n| n - 1).unwrap_or(0),
475            acked_reader_file_id_offset = new_reader_file_id,
476            "Incremented acknowledged reader file ID offset with corresponding unacknowledged decrement."
477        );
478    }
479
480    /// Determines whether or not all files should be flushed/fsync'd to disk.
481    ///
482    /// In the case of concurrent callers when the flush deadline has been exceeded, only one caller
483    /// will get a return value of `true`, and the others will receive `false`.  The caller that
484    /// receives `true` is responsible for flushing the necessary files.
485    pub fn should_flush(&self) -> bool {
486        let last_flush = self.last_flush.load();
487        if last_flush.elapsed() > self.config.flush_interval
488            && self
489                .last_flush
490                .compare_exchange(last_flush, Instant::now())
491                .is_ok()
492        {
493            return true;
494        }
495
496        false
497    }
498
499    /// Flushes the memory-mapped file backing the ledger to disk.
500    ///
501    /// This operation is synchronous.
502    ///
503    /// # Errors
504    ///
505    /// If there is an error while flushing the ledger to disk, an error variant will be returned
506    /// describing the error.
507    pub(super) fn flush(&self) -> io::Result<()> {
508        self.state.get_backing_ref().flush()
509    }
510
511    /// Synchronizes the record count and total size of the buffer with buffer usage data.
512    ///
513    /// This should not be called until both the reader and writer have been initialized via
514    /// [`Reader::seek_to_last_record`] and [`Writer::validate_last_write`], otherwise the values
515    /// will not be accurate.
516    pub fn synchronize_buffer_usage(&self) {
517        let initial_buffer_events = self.get_total_records();
518        let initial_buffer_size = self.get_total_buffer_size();
519        self.usage_handle
520            .increment_received_event_count_and_byte_size(
521                initial_buffer_events,
522                initial_buffer_size,
523            );
524    }
525
526    pub fn track_dropped_events(&self, count: u64) {
527        // We don't know how many bytes are represented by dropped events because we never actually had a chance to read
528        // them, so we have to use a byte size of 0 here.
529        //
530        // On the flipside, this would only matter if we incremented the buffer size and simultaneously skipped/lost the
531        // events within the same process lifecycle, since otherwise we'd start from the correct buffer size when
532        // loading the buffer initially.
533        //
534        // TODO: Can we do better here?
535        self.usage_handle
536            .increment_dropped_event_count_and_byte_size(count, 0, false);
537    }
538}
539
540impl<FS> Ledger<FS>
541where
542    FS: Filesystem + 'static,
543    FS::File: Unpin,
544{
545    /// Loads or creates a ledger for the given [`DiskBufferConfig`].
546    ///
547    /// If the ledger file does not yet exist, a default ledger state will be created and persisted
548    /// to disk.  Otherwise, the ledger file on disk will be loaded and verified.
549    ///
550    /// # Errors
551    ///
552    /// If there is an error during either serialization of the new, default ledger state, or
553    /// deserializing existing data in the ledger file, or generally during the underlying I/O
554    /// operations, an error variant will be returned describing the error.
555    #[cfg_attr(test, instrument(skip_all, level = "trace"))]
556    pub(super) async fn load_or_create(
557        config: DiskBufferConfig<FS>,
558        usage_handle: BufferUsageHandle,
559    ) -> Result<Ledger<FS>, LedgerLoadCreateError> {
560        // Create our containing directory if it doesn't already exist.
561        fs::create_dir_all(&config.data_dir)
562            .await
563            .context(IoSnafu)?;
564
565        // Acquire an exclusive lock on our lock file, which prevents another Vector process from
566        // loading this buffer and clashing with us.  Specifically, though: this does _not_ prevent
567        // another process from messing with our ledger files, or any of the data files, etc.
568        //
569        // TODO: It'd be nice to incorporate this within `Filesystem` to fully encapsulate _all_
570        // file I/O, but the code is so specific, including the drop guard for the lock file, that I
571        // don't know if it's worth it.
572        let ledger_lock_path = config.data_dir.join("buffer.lock");
573        let mut lock = LockFile::open(&ledger_lock_path).context(IoSnafu)?;
574        if !lock.try_lock().context(IoSnafu)? {
575            return Err(LedgerLoadCreateError::LedgerLockAlreadyHeld);
576        }
577
578        // Open the ledger file, which may involve creating it if it doesn't yet exist.
579        let ledger_path = config.data_dir.join("buffer.db");
580        let mut ledger_handle = config
581            .filesystem
582            .open_file_writable(&ledger_path)
583            .await
584            .context(IoSnafu)?;
585
586        // If we just created the ledger file, then we need to create the default ledger state, and
587        // then serialize and write to the file, before trying to load it as a memory-mapped file.
588        let ledger_metadata = ledger_handle.metadata().await.context(IoSnafu)?;
589        let ledger_len = ledger_metadata.len();
590        if ledger_len == 0 {
591            debug!("Ledger file empty.  Initializing with default ledger state.");
592            let mut buf = BytesMut::new();
593            loop {
594                match BackedArchive::from_value(&mut buf, LedgerState::default()) {
595                    Ok(archive) => {
596                        ledger_handle
597                            .write_all(archive.get_backing_ref())
598                            .await
599                            .context(IoSnafu)?;
600                        break;
601                    }
602                    Err(SerializeError::FailedToSerialize(reason)) => {
603                        return Err(LedgerLoadCreateError::FailedToSerialize { reason });
604                    }
605                    // Our buffer wasn't big enough, but that's OK!  Resize it and try again.
606                    Err(SerializeError::BackingStoreTooSmall(_, min_len)) => buf.resize(min_len, 0),
607                }
608            }
609
610            // Now sync the file to ensure everything is on disk before proceeding.
611            ledger_handle.sync_all().await.context(IoSnafu)?;
612        }
613
614        // Load the ledger state by memory-mapping the ledger file, and zero-copy deserializing our
615        // ledger state back out of it.
616        let ledger_mmap = config
617            .filesystem
618            .open_mmap_writable(&ledger_path)
619            .await
620            .context(IoSnafu)?;
621        let ledger_state = match BackedArchive::from_backing(ledger_mmap) {
622            // Deserialized the ledger state without issue from an existing file.
623            Ok(backed) => backed,
624            // Either invalid data, or the buffer doesn't represent a valid ledger structure.
625            Err(e) => {
626                return Err(LedgerLoadCreateError::FailedToDeserialize {
627                    reason: e.into_inner(),
628                });
629            }
630        };
631
632        // Create the ledger object, and synchronize the buffer statistics with the buffer usage
633        // handle.  This handles making sure we account for the starting size of the buffer, and
634        // what not.
635        let mut ledger = Ledger {
636            config,
637            lock,
638            state: ledger_state,
639            total_buffer_size: AtomicU64::new(0),
640            reader_notify: Notify::new(),
641            writer_notify: Notify::new(),
642            writer_done: AtomicBool::new(false),
643            pending_acks: AtomicU64::new(0),
644            unacked_reader_file_id_offset: AtomicU16::new(0),
645            last_flush: AtomicCell::new(Instant::now()),
646            usage_handle,
647        };
648        ledger.update_buffer_size().await?;
649
650        Ok(ledger)
651    }
652
653    async fn update_buffer_size(&mut self) -> Result<(), LedgerLoadCreateError> {
654        // Under normal operation, the reader and writer maintain a consistent state within the
655        // ledger.  However, due to the nature of how we update the ledger, process crashes could
656        // lead to missed updates as we execute reads and writes as non-atomic units of execution:
657        // update a field, do the read/write, update some more fields depending on success or
658        // failure, etc.
659        //
660        // This is an issue because we depend on knowing the total buffer size (the total size of
661        // unread records, specifically) so that we can correctly limit writes when we've reached
662        // the configured maximum buffer size.
663        //
664        // While it's not terribly efficient, and I'd like to eventually formulate a better design,
665        // this approach is absolutely correct: get the file size of every data file on disk,
666        // and set the "total buffer size" to the sum of all of those file sizes.
667        //
668        // When the reader does any necessary seeking to get to the record it left off on, it will
669        // adjust the "total buffer size" downwards for each record it runs through, leaving "total
670        // buffer size" at the correct value.
671        let mut dat_reader = fs::read_dir(&self.config.data_dir).await.context(IoSnafu)?;
672
673        let mut total_buffer_size = 0;
674        while let Some(dir_entry) = dat_reader.next_entry().await.context(IoSnafu)? {
675            if let Some(file_name) = dir_entry.file_name().to_str() {
676                // I really _do_ want to only find files with a .dat extension, as that's what the
677                // code generates, and having them be .dAt or .Dat or whatever would indicate that
678                // the file is not related to our buffer.  If we had to cope with case-sensitivity
679                // of filenames from another program/OS, then it would be a different story.
680                #[allow(clippy::case_sensitive_file_extension_comparisons)]
681                if file_name.ends_with(".dat") {
682                    let metadata = dir_entry.metadata().await.context(IoSnafu)?;
683                    total_buffer_size += metadata.len();
684
685                    debug!(
686                        data_file = file_name,
687                        file_size = metadata.len(),
688                        total_buffer_size,
689                        "Found existing data file."
690                    );
691                }
692            }
693        }
694
695        self.increment_total_buffer_size(total_buffer_size);
696
697        Ok(())
698    }
699
700    #[must_use]
701    pub(super) fn spawn_finalizer(self: Arc<Self>) -> OrderedFinalizer<u64> {
702        let (finalizer, mut stream) = OrderedFinalizer::new(None);
703        tokio::spawn(async move {
704            while let Some((_status, amount)) = stream.next().await {
705                self.increment_pending_acks(amount);
706                self.notify_writer_waiters();
707            }
708        });
709        finalizer
710    }
711}
712
713impl<FS> fmt::Debug for Ledger<FS>
714where
715    FS: Filesystem + fmt::Debug,
716{
717    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
718        f.debug_struct("Ledger")
719            .field("config", &self.config)
720            .field("state", &self.state.get_archive_ref())
721            .field(
722                "total_buffer_size",
723                &self.total_buffer_size.load(Ordering::Acquire),
724            )
725            .field("pending_acks", &self.pending_acks.load(Ordering::Acquire))
726            .field(
727                "unacked_reader_file_id_offset",
728                &self.unacked_reader_file_id_offset.load(Ordering::Acquire),
729            )
730            .field("writer_done", &self.writer_done.load(Ordering::Acquire))
731            .field("last_flush", &self.last_flush.load())
732            .finish_non_exhaustive()
733    }
734}