vector_buffers/variants/disk_v2/ledger.rs
1use std::{
2 fmt, io, mem,
3 path::PathBuf,
4 sync::atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering},
5 sync::Arc,
6 time::Instant,
7};
8
9use bytecheck::CheckBytes;
10use bytes::BytesMut;
11use crossbeam_utils::atomic::AtomicCell;
12use fslock::LockFile;
13use futures::StreamExt;
14use rkyv::{with::Atomic, Archive, Serialize};
15use snafu::{ResultExt, Snafu};
16use tokio::{fs, io::AsyncWriteExt, sync::Notify};
17use vector_common::finalizer::OrderedFinalizer;
18
19use super::{
20 backed_archive::BackedArchive,
21 common::{align16, DiskBufferConfig, MAX_FILE_ID},
22 io::{AsyncFile, WritableMemoryMap},
23 ser::SerializeError,
24 Filesystem,
25};
26use crate::buffer_usage_data::BufferUsageHandle;
27
/// Size, in bytes, of [`ArchivedLedgerState`] after being passed through `align16`.
// NOTE(review): `align16` presumably rounds up to the next multiple of 16 bytes -- confirm
// against its definition in `common`.
pub const LEDGER_LEN: usize = align16(mem::size_of::<ArchivedLedgerState>());
29
/// Error that occurred while loading or creating a [`Ledger`].
#[derive(Debug, Snafu)]
pub enum LedgerLoadCreateError {
    /// A general I/O error occurred.
    ///
    /// Generally, I/O errors should only occur when flushing the ledger state and the underlying
    /// ledger file has been corrupted or altered in some way outside of this process. As the
    /// ledger is fixed in size, and does not grow during the life of the process, common errors
    /// such as running out of disk space will not typically be relevant (or possible) here.
    #[snafu(display("ledger I/O error: {}", source))]
    Io { source: io::Error },

    /// The ledger is already opened by another Vector process.
    ///
    /// Advisory locking is used to prevent other Vector processes from concurrently opening the
    /// same buffer, but bear in mind that this does not prevent other processes or users from
    /// modifying the ledger file in a way that could cause undefined behavior during buffer
    /// operation.
    #[snafu(display(
        "failed to lock buffer.lock; is another Vector process running and using this buffer?"
    ))]
    LedgerLockAlreadyHeld,

    /// The ledger state was unable to be deserialized.
    ///
    /// This should only occur if the ledger file was modified or truncated outside of the Vector
    /// process. In rare situations, if the ledger state type (`LedgerState`, here in ledger.rs)
    /// was modified, then the layout may now be out-of-line with the structure as it exists on
    /// disk.
    ///
    /// We have many strongly-worded warnings to not do this unless a developer absolutely knows
    /// what they're doing, but it is still technically a possibility. :)
    #[snafu(display("failed to deserialize ledger from buffer: {}", reason))]
    FailedToDeserialize { reason: String },

    /// The ledger state was unable to be serialized.
    ///
    /// This only occurs when initially creating a new buffer where the ledger state has not yet
    /// been written to disk. During normal operation, the ledger is memory-mapped directly and so
    /// serialization does not occur.
    ///
    /// This error is likely only to occur if the process is unable to allocate memory for the
    /// buffers required for the serialization step.
    #[snafu(display("failed to serialize ledger to buffer: {}", reason))]
    FailedToSerialize { reason: String },
}
74
/// Ledger state.
///
/// Stores the relevant information related to both the reader and writer. Gets serialized and
/// stored on disk, and is managed via a memory-mapped file.
///
/// # Warning
///
/// - Do not add fields to this struct.
/// - Do not remove fields from this struct.
/// - Do not change the type of fields in this struct.
/// - Do not change the order of fields in this struct.
///
/// Doing so will change the serialized representation. This will break things.
///
/// Do not do any of the listed things unless you _absolutely_ know what you're doing. :)
#[derive(Archive, Serialize, Debug)]
#[archive_attr(derive(CheckBytes, Debug))]
pub struct LedgerState {
    /// Next record ID to use when writing a record.
    #[with(Atomic)]
    writer_next_record: AtomicU64,
    /// The current data file ID being written to.
    #[with(Atomic)]
    writer_current_data_file: AtomicU16,
    /// The current data file ID being read from.
    #[with(Atomic)]
    reader_current_data_file: AtomicU16,
    /// The last record ID read by the reader.
    #[with(Atomic)]
    reader_last_record: AtomicU64,
}
106
107impl Default for LedgerState {
108 fn default() -> Self {
109 Self {
110 // First record written is always 1, so that our default of 0 for
111 // `reader_last_record_id` ensures we start up in a state of "alright, waiting to read
112 // record #1 next".
113 writer_next_record: AtomicU64::new(1),
114 writer_current_data_file: AtomicU16::new(0),
115 reader_current_data_file: AtomicU16::new(0),
116 reader_last_record: AtomicU64::new(0),
117 }
118 }
119}
120
121impl ArchivedLedgerState {
122 fn get_current_writer_file_id(&self) -> u16 {
123 self.writer_current_data_file.load(Ordering::Acquire)
124 }
125
126 fn get_next_writer_file_id(&self) -> u16 {
127 (self.get_current_writer_file_id() + 1) % MAX_FILE_ID
128 }
129
130 pub(super) fn increment_writer_file_id(&self) {
131 self.writer_current_data_file
132 .store(self.get_next_writer_file_id(), Ordering::Release);
133 }
134
135 pub(super) fn get_next_writer_record_id(&self) -> u64 {
136 self.writer_next_record.load(Ordering::Acquire)
137 }
138
139 pub(super) fn increment_next_writer_record_id(&self, amount: u64) -> u64 {
140 let previous = self.writer_next_record.fetch_add(amount, Ordering::AcqRel);
141 previous.wrapping_add(amount)
142 }
143
144 fn get_current_reader_file_id(&self) -> u16 {
145 self.reader_current_data_file.load(Ordering::Acquire)
146 }
147
148 fn get_next_reader_file_id(&self) -> u16 {
149 (self.get_current_reader_file_id() + 1) % MAX_FILE_ID
150 }
151
152 fn get_offset_reader_file_id(&self, offset: u16) -> u16 {
153 self.get_current_reader_file_id().wrapping_add(offset) % MAX_FILE_ID
154 }
155
156 fn increment_reader_file_id(&self) -> u16 {
157 let value = self.get_next_reader_file_id();
158 self.reader_current_data_file
159 .store(value, Ordering::Release);
160 value
161 }
162
163 pub(super) fn get_last_reader_record_id(&self) -> u64 {
164 self.reader_last_record.load(Ordering::Acquire)
165 }
166
167 pub(super) fn increment_last_reader_record_id(&self, amount: u64) {
168 self.reader_last_record.fetch_add(amount, Ordering::AcqRel);
169 }
170
171 #[cfg(test)]
172 pub unsafe fn unsafe_set_writer_next_record_id(&self, id: u64) {
173 // UNSAFETY:
174 // The atomic operation itself is inherently safe, but adjusting the record IDs manually is
175 // _unsafe_ because it messes with the continuity of record IDs from the perspective of the
176 // reader.
177 //
178 // This is exclusively used under test to make it possible to check certain edge cases, as
179 // writing enough records to actually increment it to the maximum value would take longer
180 // than any of us will be alive.
181 //
182 // Despite it being test-only, we're really amping up the "this is only for testing!" factor
183 // by making it an actual `unsafe` function, and putting "unsafe" in the name. :)
184 self.writer_next_record.store(id, Ordering::Release);
185 }
186
187 #[cfg(test)]
188 pub unsafe fn unsafe_set_reader_last_record_id(&self, id: u64) {
189 // UNSAFETY:
190 // The atomic operation itself is inherently safe, but adjusting the record IDs manually is
191 // _unsafe_ because it messes with the continuity of record IDs from the perspective of the
192 // reader.
193 //
194 // This is exclusively used under test to make it possible to check certain edge cases, as
195 // writing enough records to actually increment it to the maximum value would take longer
196 // than any of us will be alive.
197 //
198 // Despite it being test-only, we're really amping up the "this is only for testing!" factor
199 // by making it an actual `unsafe` function, and putting "unsafe" in the name. :)
200 self.reader_last_record.store(id, Ordering::Release);
201 }
202}
203
/// Tracks the internal state of the buffer.
pub(crate) struct Ledger<FS>
where
    FS: Filesystem,
{
    // Buffer configuration.
    config: DiskBufferConfig<FS>,
    // Advisory lock for this buffer directory.
    #[allow(dead_code)]
    lock: LockFile,
    // Ledger state.
    state: BackedArchive<FS::MutableMemoryMap, LedgerState>,
    // The total size, in bytes, of all unread records in the buffer.
    total_buffer_size: AtomicU64,
    // Notifier for reader-related progress.
    reader_notify: Notify,
    // Notifier for writer-related progress.
    writer_notify: Notify,
    // Tracks when writer has fully shutdown.
    writer_done: AtomicBool,
    // Number of pending record acknowledgements that have yet to be consumed by the reader.
    pending_acks: AtomicU64,
    // The file ID offset of the reader past the acknowledged reader file ID.
    unacked_reader_file_id_offset: AtomicU16,
    // Last flush of all unflushed files: ledger, data file, etc.
    last_flush: AtomicCell<Instant>,
    // Tracks usage data about the buffer.
    usage_handle: BufferUsageHandle,
}
233
234impl<FS> Ledger<FS>
235where
236 FS: Filesystem,
237{
238 /// Gets the configuration for the buffer that this ledger represents.
239 pub fn config(&self) -> &DiskBufferConfig<FS> {
240 &self.config
241 }
242
243 /// Gets the filesystem configured for this buffer.
244 pub fn filesystem(&self) -> &FS {
245 &self.config.filesystem
246 }
247
248 /// Gets the internal ledger state.
249 ///
250 /// This is the information persisted to disk.
251 pub fn state(&self) -> &ArchivedLedgerState {
252 self.state.get_archive_ref()
253 }
254
255 /// Gets the total number of unread records in the buffer.
256 ///
257 /// This number is based on acknowledged reads only, which is to say that if 10 records are
258 /// written, and 8 of them have been read, but only 3 have been acked, then `get_total_records`
259 /// would return `7`.
260 pub fn get_total_records(&self) -> u64 {
261 let next_writer_id = self.state().get_next_writer_record_id();
262 let last_reader_id = self.state().get_last_reader_record_id();
263
264 next_writer_id.wrapping_sub(last_reader_id) - 1
265 }
266
267 /// Gets the total number of bytes for all unread records in the buffer.
268 ///
269 /// This number will often disagree with the size of files on disk, as data files are deleted
270 /// only after being read entirely, and are simply appended to when they are not yet full. This
271 /// leads to behavior where writes and reads will change this value only by the size of the
272 /// records being written and read, while data files on disk will grow incrementally, and be
273 /// deleted in full.
274 pub fn get_total_buffer_size(&self) -> u64 {
275 self.total_buffer_size.load(Ordering::Acquire)
276 }
277
278 /// Increments the total number of bytes for all unread records in the buffer.
279 pub fn increment_total_buffer_size(&self, amount: u64) {
280 let last_total_buffer_size = self.total_buffer_size.fetch_add(amount, Ordering::AcqRel);
281 trace!(
282 previous_buffer_size = last_total_buffer_size,
283 new_buffer_size = last_total_buffer_size + amount,
284 "Updated buffer size.",
285 );
286 }
287
288 /// Decrements the total number of bytes for all unread records in the buffer.
289 pub fn decrement_total_buffer_size(&self, amount: u64) {
290 let last_total_buffer_size = self.total_buffer_size.fetch_sub(amount, Ordering::AcqRel);
291 trace!(
292 previous_buffer_size = last_total_buffer_size,
293 new_buffer_size = last_total_buffer_size - amount,
294 "Updated buffer size.",
295 );
296 }
297
298 /// Gets the current reader file ID.
299 ///
300 /// This is internally adjusted to compensate for the fact that the reader can read far past
301 /// the latest acknowledge record/data file, and so is not representative of where the reader
302 /// would start reading from if the process crashed or was abruptly stopped.
303 pub fn get_current_reader_file_id(&self) -> u16 {
304 let unacked_offset = self.unacked_reader_file_id_offset.load(Ordering::Acquire);
305 self.state().get_offset_reader_file_id(unacked_offset)
306 }
307
308 /// Gets the current writer file ID.
309 pub fn get_current_writer_file_id(&self) -> u16 {
310 self.state().get_current_writer_file_id()
311 }
312
313 /// Gets the next writer file ID.
314 ///
315 /// This is purely a future-looking operation i.e. what would the file ID be if it was
316 /// incremented from its current value. It does not alter the current writer file ID.
317 #[cfg(test)]
318 pub fn get_next_writer_file_id(&self) -> u16 {
319 self.state().get_next_writer_file_id()
320 }
321
322 /// Gets the current reader and writer file IDs.
323 ///
324 /// Similar to [`get_current_reader_file_id`], the file ID returned for the reader compensates
325 /// for the acknowledgement state of the reader.
326 pub fn get_current_reader_writer_file_id(&self) -> (u16, u16) {
327 let reader = self.get_current_reader_file_id();
328 let writer = self.get_current_writer_file_id();
329
330 (reader, writer)
331 }
332
333 /// Gets the current reader data file path, accounting for the unacknowledged offset.
334 pub fn get_current_reader_data_file_path(&self) -> PathBuf {
335 self.get_data_file_path(self.get_current_reader_file_id())
336 }
337
338 /// Gets the current writer data file path.
339 pub fn get_current_writer_data_file_path(&self) -> PathBuf {
340 self.get_data_file_path(self.get_current_writer_file_id())
341 }
342
343 /// Gets the next writer data file path.
344 pub fn get_next_writer_data_file_path(&self) -> PathBuf {
345 self.get_data_file_path(self.state().get_next_writer_file_id())
346 }
347
348 /// Gets the data file path for an arbitrary file ID.
349 pub fn get_data_file_path(&self, file_id: u16) -> PathBuf {
350 self.config
351 .data_dir
352 .join(format!("buffer-data-{file_id}.dat"))
353 }
354
355 /// Waits for a signal from the reader that progress has been made.
356 ///
357 /// This will only occur when a record is read, which may allow enough space (below the maximum
358 /// configured buffer size) for a write to occur, or similarly, when a data file is deleted.
359 #[cfg_attr(test, instrument(skip(self), level = "trace"))]
360 pub async fn wait_for_reader(&self) {
361 self.reader_notify.notified().await;
362 }
363
364 /// Waits for a signal from the writer that progress has been made.
365 ///
366 /// This will occur when a record is written, or when a new data file is created.
367 #[cfg_attr(test, instrument(skip(self), level = "trace"))]
368 pub async fn wait_for_writer(&self) {
369 self.writer_notify.notified().await;
370 }
371
372 /// Notifies all tasks waiting on progress by the reader.
373 #[cfg_attr(test, instrument(skip(self), level = "trace"))]
374 pub fn notify_reader_waiters(&self) {
375 self.reader_notify.notify_one();
376 }
377
378 /// Notifies all tasks waiting on progress by the writer.
379 #[cfg_attr(test, instrument(skip(self), level = "trace"))]
380 pub fn notify_writer_waiters(&self) {
381 self.writer_notify.notify_one();
382 }
383
384 /// Tracks the statistics of a successful write.
385 pub fn track_write(&self, event_count: u64, record_size: u64) {
386 self.increment_total_buffer_size(record_size);
387 self.usage_handle
388 .increment_received_event_count_and_byte_size(event_count, record_size);
389 }
390
391 /// Tracks the statistics of multiple successful reads.
392 pub fn track_reads(&self, event_count: u64, total_record_size: u64) {
393 self.decrement_total_buffer_size(total_record_size);
394 self.usage_handle
395 .increment_sent_event_count_and_byte_size(event_count, total_record_size);
396 }
397
398 /// Marks the writer as finished.
399 ///
400 /// If the writer was not yet marked done, `false` is returned. Otherwise, `true` is returned,
401 /// and the caller should handle any necessary logic for closing the writer.
402 pub fn mark_writer_done(&self) -> bool {
403 self.writer_done
404 .compare_exchange_weak(false, true, Ordering::SeqCst, Ordering::SeqCst)
405 .is_ok()
406 }
407
408 /// Returns `true` if the writer was marked as done.
409 pub fn is_writer_done(&self) -> bool {
410 self.writer_done.load(Ordering::Acquire)
411 }
412
413 /// Increments the pending acknowledgement counter by the given amount.
414 pub fn increment_pending_acks(&self, amount: u64) {
415 self.pending_acks.fetch_add(amount, Ordering::AcqRel);
416 }
417
418 /// Consumes the full amount of pending acknowledgements, and resets the counter to zero.
419 pub fn consume_pending_acks(&self) -> u64 {
420 self.pending_acks.swap(0, Ordering::AcqRel)
421 }
422
423 /// Increments the unacknowledged reader file ID.
424 ///
425 /// As further described in `increment_acked_reader_file_id`, the underlying value here allows
426 /// the reader to read ahead of a data file, even if it hasn't been durably processed yet.
427 pub fn increment_unacked_reader_file_id(&self) {
428 let last_unacked_reader_file_id_offset = self
429 .unacked_reader_file_id_offset
430 .fetch_add(1, Ordering::AcqRel);
431 trace!(
432 unacked_reader_file_id_offset = last_unacked_reader_file_id_offset + 1,
433 "Incremented unacknowledged reader file ID."
434 );
435 }
436
437 /// Increments the acknowledged reader file ID.
438 ///
439 /// As records may be read and stored for a small period of time (batching in a sink, etc), we
440 /// cannot truly say that we have durably processed a record until the caller acknowledges the
441 /// record. However, if we always waited for an acknowledgement, then each read could be forced
442 /// to wait for multiple seconds. Such a design would clearly be unusable.
443 ///
444 /// Instead, we allow the reader to move ahead of the latest acknowledged record by tracking
445 /// their current file ID and acknowledged file ID separately. Once all records in a file have
446 /// been acknowledged, the data file can be deleted and the reader file ID can be durably
447 /// stored in the ledger.
448 ///
449 /// Callers use [`increment_unacked_reader_file_id`] to move to the next data file without
450 /// tracking that the previous data file has been durably processed and can be deleted, and
451 /// [`increment_acked_reader_file_id`] is the reciprocal function which tracks the highest data
452 /// file that _has_ been durably processed.
453 ///
454 /// Since the unacked file ID is simply a relative offset to the acked file ID, we decrement it
455 /// here to keep the "current" file ID stable.
456 pub fn increment_acked_reader_file_id(&self) {
457 let new_reader_file_id = self.state().increment_reader_file_id();
458
459 // We ignore the return value because when the value is already zero, we don't want to do an
460 // update, so we return `None`, which causes `fetch_update` to return `Err`. It's not
461 // really an error, we just wanted to avoid the extra atomic compare/exchange.
462 //
463 // Basically, this call is actually infallible for our purposes.
464 let result = self.unacked_reader_file_id_offset.fetch_update(
465 Ordering::Release,
466 Ordering::Relaxed,
467 |n| {
468 if n == 0 {
469 None
470 } else {
471 Some(n - 1)
472 }
473 },
474 );
475
476 trace!(
477 unacked_reader_file_id_offset = result.map(|n| n - 1).unwrap_or(0),
478 acked_reader_file_id_offset = new_reader_file_id,
479 "Incremented acknowledged reader file ID offset with corresponding unacknowledged decrement."
480 );
481 }
482
483 /// Determines whether or not all files should be flushed/fsync'd to disk.
484 ///
485 /// In the case of concurrent callers when the flush deadline has been exceeded, only one caller
486 /// will get a return value of `true`, and the others will receive `false`. The caller that
487 /// receives `true` is responsible for flushing the necessary files.
488 pub fn should_flush(&self) -> bool {
489 let last_flush = self.last_flush.load();
490 if last_flush.elapsed() > self.config.flush_interval
491 && self
492 .last_flush
493 .compare_exchange(last_flush, Instant::now())
494 .is_ok()
495 {
496 return true;
497 }
498
499 false
500 }
501
502 /// Flushes the memory-mapped file backing the ledger to disk.
503 ///
504 /// This operation is synchronous.
505 ///
506 /// # Errors
507 ///
508 /// If there is an error while flushing the ledger to disk, an error variant will be returned
509 /// describing the error.
510 pub(super) fn flush(&self) -> io::Result<()> {
511 self.state.get_backing_ref().flush()
512 }
513
514 /// Synchronizes the record count and total size of the buffer with buffer usage data.
515 ///
516 /// This should not be called until both the reader and writer have been initialized via
517 /// [`Reader::seek_to_last_record`] and [`Writer::validate_last_write`], otherwise the values
518 /// will not be accurate.
519 pub fn synchronize_buffer_usage(&self) {
520 let initial_buffer_events = self.get_total_records();
521 let initial_buffer_size = self.get_total_buffer_size();
522 self.usage_handle
523 .increment_received_event_count_and_byte_size(
524 initial_buffer_events,
525 initial_buffer_size,
526 );
527 }
528
529 pub fn track_dropped_events(&self, count: u64) {
530 // We don't know how many bytes are represented by dropped events because we never actually had a chance to read
531 // them, so we have to use a byte size of 0 here.
532 //
533 // On the flipside, this would only matter if we incremented the buffer size and simultaneously skipped/lost the
534 // events within the same process lifecycle, since otherwise we'd start from the correct buffer size when
535 // loading the buffer initially.
536 //
537 // TODO: Can we do better here?
538 self.usage_handle
539 .increment_dropped_event_count_and_byte_size(count, 0, false);
540 }
541}
542
543impl<FS> Ledger<FS>
544where
545 FS: Filesystem + 'static,
546 FS::File: Unpin,
547{
548 /// Loads or creates a ledger for the given [`DiskBufferConfig`].
549 ///
550 /// If the ledger file does not yet exist, a default ledger state will be created and persisted
551 /// to disk. Otherwise, the ledger file on disk will be loaded and verified.
552 ///
553 /// # Errors
554 ///
555 /// If there is an error during either serialization of the new, default ledger state, or
556 /// deserializing existing data in the ledger file, or generally during the underlying I/O
557 /// operations, an error variant will be returned describing the error.
558 #[cfg_attr(test, instrument(skip_all, level = "trace"))]
559 pub(super) async fn load_or_create(
560 config: DiskBufferConfig<FS>,
561 usage_handle: BufferUsageHandle,
562 ) -> Result<Ledger<FS>, LedgerLoadCreateError> {
563 // Create our containing directory if it doesn't already exist.
564 fs::create_dir_all(&config.data_dir)
565 .await
566 .context(IoSnafu)?;
567
568 // Acquire an exclusive lock on our lock file, which prevents another Vector process from
569 // loading this buffer and clashing with us. Specifically, though: this does _not_ prevent
570 // another process from messing with our ledger files, or any of the data files, etc.
571 //
572 // TODO: It'd be nice to incorporate this within `Filesystem` to fully encapsulate _all_
573 // file I/O, but the code is so specific, including the drop guard for the lock file, that I
574 // don't know if it's worth it.
575 let ledger_lock_path = config.data_dir.join("buffer.lock");
576 let mut lock = LockFile::open(&ledger_lock_path).context(IoSnafu)?;
577 if !lock.try_lock().context(IoSnafu)? {
578 return Err(LedgerLoadCreateError::LedgerLockAlreadyHeld);
579 }
580
581 // Open the ledger file, which may involve creating it if it doesn't yet exist.
582 let ledger_path = config.data_dir.join("buffer.db");
583 let mut ledger_handle = config
584 .filesystem
585 .open_file_writable(&ledger_path)
586 .await
587 .context(IoSnafu)?;
588
589 // If we just created the ledger file, then we need to create the default ledger state, and
590 // then serialize and write to the file, before trying to load it as a memory-mapped file.
591 let ledger_metadata = ledger_handle.metadata().await.context(IoSnafu)?;
592 let ledger_len = ledger_metadata.len();
593 if ledger_len == 0 {
594 debug!("Ledger file empty. Initializing with default ledger state.");
595 let mut buf = BytesMut::new();
596 loop {
597 match BackedArchive::from_value(&mut buf, LedgerState::default()) {
598 Ok(archive) => {
599 ledger_handle
600 .write_all(archive.get_backing_ref())
601 .await
602 .context(IoSnafu)?;
603 break;
604 }
605 Err(SerializeError::FailedToSerialize(reason)) => {
606 return Err(LedgerLoadCreateError::FailedToSerialize { reason })
607 }
608 // Our buffer wasn't big enough, but that's OK! Resize it and try again.
609 Err(SerializeError::BackingStoreTooSmall(_, min_len)) => buf.resize(min_len, 0),
610 }
611 }
612
613 // Now sync the file to ensure everything is on disk before proceeding.
614 ledger_handle.sync_all().await.context(IoSnafu)?;
615 }
616
617 // Load the ledger state by memory-mapping the ledger file, and zero-copy deserializing our
618 // ledger state back out of it.
619 let ledger_mmap = config
620 .filesystem
621 .open_mmap_writable(&ledger_path)
622 .await
623 .context(IoSnafu)?;
624 let ledger_state = match BackedArchive::from_backing(ledger_mmap) {
625 // Deserialized the ledger state without issue from an existing file.
626 Ok(backed) => backed,
627 // Either invalid data, or the buffer doesn't represent a valid ledger structure.
628 Err(e) => {
629 return Err(LedgerLoadCreateError::FailedToDeserialize {
630 reason: e.into_inner(),
631 })
632 }
633 };
634
635 // Create the ledger object, and synchronize the buffer statistics with the buffer usage
636 // handle. This handles making sure we account for the starting size of the buffer, and
637 // what not.
638 let mut ledger = Ledger {
639 config,
640 lock,
641 state: ledger_state,
642 total_buffer_size: AtomicU64::new(0),
643 reader_notify: Notify::new(),
644 writer_notify: Notify::new(),
645 writer_done: AtomicBool::new(false),
646 pending_acks: AtomicU64::new(0),
647 unacked_reader_file_id_offset: AtomicU16::new(0),
648 last_flush: AtomicCell::new(Instant::now()),
649 usage_handle,
650 };
651 ledger.update_buffer_size().await?;
652
653 Ok(ledger)
654 }
655
656 async fn update_buffer_size(&mut self) -> Result<(), LedgerLoadCreateError> {
657 // Under normal operation, the reader and writer maintain a consistent state within the
658 // ledger. However, due to the nature of how we update the ledger, process crashes could
659 // lead to missed updates as we execute reads and writes as non-atomic units of execution:
660 // update a field, do the read/write, update some more fields depending on success or
661 // failure, etc.
662 //
663 // This is an issue because we depend on knowing the total buffer size (the total size of
664 // unread records, specifically) so that we can correctly limit writes when we've reached
665 // the configured maximum buffer size.
666 //
667 // While it's not terribly efficient, and I'd like to eventually formulate a better design,
668 // this approach is absolutely correct: get the file size of every data file on disk,
669 // and set the "total buffer size" to the sum of all of those file sizes.
670 //
671 // When the reader does any necessary seeking to get to the record it left off on, it will
672 // adjust the "total buffer size" downwards for each record it runs through, leaving "total
673 // buffer size" at the correct value.
674 let mut dat_reader = fs::read_dir(&self.config.data_dir).await.context(IoSnafu)?;
675
676 let mut total_buffer_size = 0;
677 while let Some(dir_entry) = dat_reader.next_entry().await.context(IoSnafu)? {
678 if let Some(file_name) = dir_entry.file_name().to_str() {
679 // I really _do_ want to only find files with a .dat extension, as that's what the
680 // code generates, and having them be .dAt or .Dat or whatever would indicate that
681 // the file is not related to our buffer. If we had to cope with case-sensitivity
682 // of filenames from another program/OS, then it would be a different story.
683 #[allow(clippy::case_sensitive_file_extension_comparisons)]
684 if file_name.ends_with(".dat") {
685 let metadata = dir_entry.metadata().await.context(IoSnafu)?;
686 total_buffer_size += metadata.len();
687
688 debug!(
689 data_file = file_name,
690 file_size = metadata.len(),
691 total_buffer_size,
692 "Found existing data file."
693 );
694 }
695 }
696 }
697
698 self.increment_total_buffer_size(total_buffer_size);
699
700 Ok(())
701 }
702
703 #[must_use]
704 pub(super) fn spawn_finalizer(self: Arc<Self>) -> OrderedFinalizer<u64> {
705 let (finalizer, mut stream) = OrderedFinalizer::new(None);
706 tokio::spawn(async move {
707 while let Some((_status, amount)) = stream.next().await {
708 self.increment_pending_acks(amount);
709 self.notify_writer_waiters();
710 }
711 });
712 finalizer
713 }
714}
715
716impl<FS> fmt::Debug for Ledger<FS>
717where
718 FS: Filesystem + fmt::Debug,
719{
720 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
721 f.debug_struct("Ledger")
722 .field("config", &self.config)
723 .field("state", &self.state.get_archive_ref())
724 .field(
725 "total_buffer_size",
726 &self.total_buffer_size.load(Ordering::Acquire),
727 )
728 .field("pending_acks", &self.pending_acks.load(Ordering::Acquire))
729 .field(
730 "unacked_reader_file_id_offset",
731 &self.unacked_reader_file_id_offset.load(Ordering::Acquire),
732 )
733 .field("writer_done", &self.writer_done.load(Ordering::Acquire))
734 .field("last_flush", &self.last_flush.load())
735 .finish_non_exhaustive()
736 }
737}