vector_buffers/variants/disk_v2/ledger.rs
1use std::{
2 fmt, io, mem,
3 path::PathBuf,
4 sync::{
5 Arc,
6 atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering},
7 },
8 time::Instant,
9};
10
11use bytecheck::CheckBytes;
12use bytes::BytesMut;
13use crossbeam_utils::atomic::AtomicCell;
14use fslock::LockFile;
15use futures::StreamExt;
16use rkyv::{Archive, Serialize, with::Atomic};
17use snafu::{ResultExt, Snafu};
18use tokio::{fs, io::AsyncWriteExt, sync::Notify};
19use vector_common::finalizer::OrderedFinalizer;
20
21use super::{
22 Filesystem,
23 backed_archive::BackedArchive,
24 common::{DiskBufferConfig, MAX_FILE_ID, align16},
25 io::{AsyncFile, WritableMemoryMap},
26 ser::SerializeError,
27};
28use crate::buffer_usage_data::BufferUsageHandle;
29
/// Length, in bytes, reserved for the archived ledger state.
///
/// Computed as the in-memory size of [`ArchivedLedgerState`] passed through `align16`
/// (presumably rounding up to a 16-byte boundary -- confirm against `common::align16`).
pub const LEDGER_LEN: usize = align16(mem::size_of::<ArchivedLedgerState>());
31
/// Error that occurred while loading or creating a [`Ledger`].
#[derive(Debug, Snafu)]
pub enum LedgerLoadCreateError {
    /// A general I/O error occurred.
    ///
    /// Generally, I/O errors should only occur when flushing the ledger state and the underlying
    /// ledger file has been corrupted or altered in some way outside of this process. As the
    /// ledger is fixed in size, and does not grow during the life of the process, common errors
    /// such as running out of disk space will not typically be relevant (or possible) here.
    #[snafu(display("ledger I/O error: {}", source))]
    Io { source: io::Error },

    /// The ledger is already opened by another Vector process.
    ///
    /// Advisory locking is used to prevent other Vector processes from concurrently opening the
    /// same buffer, but bear in mind that this does not prevent other processes or users from
    /// modifying the ledger file in a way that could cause undefined behavior during buffer
    /// operation.
    #[snafu(display(
        "failed to lock buffer.lock; is another Vector process running and using this buffer?"
    ))]
    LedgerLockAlreadyHeld,

    /// The ledger state was unable to be deserialized.
    ///
    /// This should only occur if the ledger file was modified or truncated outside of the Vector
    /// process. In rare situations, if the ledger state type (`LedgerState`, here in ledger.rs)
    /// was modified, then the layout may no longer match the structure as it exists on disk.
    ///
    /// We have many strongly-worded warnings to not do this unless a developer absolutely knows
    /// what they're doing, but it is still technically a possibility. :)
    #[snafu(display("failed to deserialize ledger from buffer: {}", reason))]
    FailedToDeserialize { reason: String },

    /// The ledger state was unable to be serialized.
    ///
    /// This only occurs when initially creating a new buffer where the ledger state has not yet
    /// been written to disk. During normal operation, the ledger is memory-mapped directly and so
    /// serialization does not occur.
    ///
    /// This error is likely only to occur if the process is unable to allocate memory for the
    /// buffers required for the serialization step.
    #[snafu(display("failed to serialize ledger to buffer: {}", reason))]
    FailedToSerialize { reason: String },
}
76
/// Ledger state.
///
/// Stores the relevant information related to both the reader and writer. Gets serialized and
/// stored on disk, and is managed via a memory-mapped file.
///
/// # Warning
///
/// - Do not add fields to this struct.
/// - Do not remove fields from this struct.
/// - Do not change the type of fields in this struct.
/// - Do not change the order of fields in this struct.
///
/// Doing so will change the serialized representation. This will break things.
///
/// Do not do any of the listed things unless you _absolutely_ know what you're doing. :)
#[derive(Archive, Serialize, Debug)]
#[archive_attr(derive(CheckBytes, Debug))]
pub struct LedgerState {
    /// Next record ID to use when writing a record.
    #[with(Atomic)]
    writer_next_record: AtomicU64,
    /// The current data file ID being written to.
    #[with(Atomic)]
    writer_current_data_file: AtomicU16,
    /// The current data file ID being read from.
    #[with(Atomic)]
    reader_current_data_file: AtomicU16,
    /// The last record ID read by the reader.
    #[with(Atomic)]
    reader_last_record: AtomicU64,
}
108
109impl Default for LedgerState {
110 fn default() -> Self {
111 Self {
112 // First record written is always 1, so that our default of 0 for
113 // `reader_last_record_id` ensures we start up in a state of "alright, waiting to read
114 // record #1 next".
115 writer_next_record: AtomicU64::new(1),
116 writer_current_data_file: AtomicU16::new(0),
117 reader_current_data_file: AtomicU16::new(0),
118 reader_last_record: AtomicU64::new(0),
119 }
120 }
121}
122
123impl ArchivedLedgerState {
124 fn get_current_writer_file_id(&self) -> u16 {
125 self.writer_current_data_file.load(Ordering::Acquire)
126 }
127
128 fn get_next_writer_file_id(&self) -> u16 {
129 (self.get_current_writer_file_id() + 1) % MAX_FILE_ID
130 }
131
132 pub(super) fn increment_writer_file_id(&self) {
133 self.writer_current_data_file
134 .store(self.get_next_writer_file_id(), Ordering::Release);
135 }
136
137 pub(super) fn get_next_writer_record_id(&self) -> u64 {
138 self.writer_next_record.load(Ordering::Acquire)
139 }
140
141 pub(super) fn increment_next_writer_record_id(&self, amount: u64) -> u64 {
142 let previous = self.writer_next_record.fetch_add(amount, Ordering::AcqRel);
143 previous.wrapping_add(amount)
144 }
145
146 fn get_current_reader_file_id(&self) -> u16 {
147 self.reader_current_data_file.load(Ordering::Acquire)
148 }
149
150 fn get_next_reader_file_id(&self) -> u16 {
151 (self.get_current_reader_file_id() + 1) % MAX_FILE_ID
152 }
153
154 fn get_offset_reader_file_id(&self, offset: u16) -> u16 {
155 self.get_current_reader_file_id().wrapping_add(offset) % MAX_FILE_ID
156 }
157
158 fn increment_reader_file_id(&self) -> u16 {
159 let value = self.get_next_reader_file_id();
160 self.reader_current_data_file
161 .store(value, Ordering::Release);
162 value
163 }
164
165 pub(super) fn get_last_reader_record_id(&self) -> u64 {
166 self.reader_last_record.load(Ordering::Acquire)
167 }
168
169 pub(super) fn increment_last_reader_record_id(&self, amount: u64) {
170 self.reader_last_record.fetch_add(amount, Ordering::AcqRel);
171 }
172
173 #[cfg(test)]
174 pub unsafe fn unsafe_set_writer_next_record_id(&self, id: u64) {
175 // UNSAFETY:
176 // The atomic operation itself is inherently safe, but adjusting the record IDs manually is
177 // _unsafe_ because it messes with the continuity of record IDs from the perspective of the
178 // reader.
179 //
180 // This is exclusively used under test to make it possible to check certain edge cases, as
181 // writing enough records to actually increment it to the maximum value would take longer
182 // than any of us will be alive.
183 //
184 // Despite it being test-only, we're really amping up the "this is only for testing!" factor
185 // by making it an actual `unsafe` function, and putting "unsafe" in the name. :)
186 self.writer_next_record.store(id, Ordering::Release);
187 }
188
189 #[cfg(test)]
190 pub unsafe fn unsafe_set_reader_last_record_id(&self, id: u64) {
191 // UNSAFETY:
192 // The atomic operation itself is inherently safe, but adjusting the record IDs manually is
193 // _unsafe_ because it messes with the continuity of record IDs from the perspective of the
194 // reader.
195 //
196 // This is exclusively used under test to make it possible to check certain edge cases, as
197 // writing enough records to actually increment it to the maximum value would take longer
198 // than any of us will be alive.
199 //
200 // Despite it being test-only, we're really amping up the "this is only for testing!" factor
201 // by making it an actual `unsafe` function, and putting "unsafe" in the name. :)
202 self.reader_last_record.store(id, Ordering::Release);
203 }
204}
205
/// Tracks the internal state of the buffer.
///
/// Combines the on-disk, memory-mapped [`LedgerState`] with the in-memory bookkeeping (sizes,
/// notifiers, acknowledgement counters) that is not persisted.
pub(crate) struct Ledger<FS>
where
    FS: Filesystem,
{
    // Buffer configuration.
    config: DiskBufferConfig<FS>,
    // Advisory lock for this buffer directory.
    //
    // Never read after construction, hence the `dead_code` allowance; it is stored so that it
    // lives as long as the ledger does (presumably the lock is released when this is dropped --
    // confirm against `fslock::LockFile`).
    #[allow(dead_code)]
    lock: LockFile,
    // Ledger state.
    state: BackedArchive<FS::MutableMemoryMap, LedgerState>,
    // The total size, in bytes, of all unread records in the buffer.
    total_buffer_size: AtomicU64,
    // Notifier for reader-related progress.
    reader_notify: Notify,
    // Notifier for writer-related progress.
    writer_notify: Notify,
    // Tracks when writer has fully shutdown.
    writer_done: AtomicBool,
    // Number of pending record acknowledgements that have yet to be consumed by the reader.
    pending_acks: AtomicU64,
    // The file ID offset of the reader past the acknowledged reader file ID.
    unacked_reader_file_id_offset: AtomicU16,
    // Last flush of all unflushed files: ledger, data file, etc.
    last_flush: AtomicCell<Instant>,
    // Tracks usage data about the buffer.
    usage_handle: BufferUsageHandle,
}
235
236impl<FS> Ledger<FS>
237where
238 FS: Filesystem,
239{
240 /// Gets the configuration for the buffer that this ledger represents.
241 pub fn config(&self) -> &DiskBufferConfig<FS> {
242 &self.config
243 }
244
245 /// Gets the filesystem configured for this buffer.
246 pub fn filesystem(&self) -> &FS {
247 &self.config.filesystem
248 }
249
250 /// Gets the internal ledger state.
251 ///
252 /// This is the information persisted to disk.
253 pub fn state(&self) -> &ArchivedLedgerState {
254 self.state.get_archive_ref()
255 }
256
257 /// Gets the total number of unread records in the buffer.
258 ///
259 /// This number is based on acknowledged reads only, which is to say that if 10 records are
260 /// written, and 8 of them have been read, but only 3 have been acked, then `get_total_records`
261 /// would return `7`.
262 pub fn get_total_records(&self) -> u64 {
263 let next_writer_id = self.state().get_next_writer_record_id();
264 let last_reader_id = self.state().get_last_reader_record_id();
265
266 next_writer_id.wrapping_sub(last_reader_id) - 1
267 }
268
269 /// Gets the total number of bytes for all unread records in the buffer.
270 ///
271 /// This number will often disagree with the size of files on disk, as data files are deleted
272 /// only after being read entirely, and are simply appended to when they are not yet full. This
273 /// leads to behavior where writes and reads will change this value only by the size of the
274 /// records being written and read, while data files on disk will grow incrementally, and be
275 /// deleted in full.
276 pub fn get_total_buffer_size(&self) -> u64 {
277 self.total_buffer_size.load(Ordering::Acquire)
278 }
279
280 /// Increments the total number of bytes for all unread records in the buffer.
281 pub fn increment_total_buffer_size(&self, amount: u64) {
282 let last_total_buffer_size = self.total_buffer_size.fetch_add(amount, Ordering::AcqRel);
283 trace!(
284 previous_buffer_size = last_total_buffer_size,
285 new_buffer_size = last_total_buffer_size + amount,
286 "Updated buffer size.",
287 );
288 }
289
290 /// Decrements the total number of bytes for all unread records in the buffer.
291 pub fn decrement_total_buffer_size(&self, amount: u64) {
292 let last_total_buffer_size = self.total_buffer_size.fetch_sub(amount, Ordering::AcqRel);
293 trace!(
294 previous_buffer_size = last_total_buffer_size,
295 new_buffer_size = last_total_buffer_size - amount,
296 "Updated buffer size.",
297 );
298 }
299
300 /// Gets the current reader file ID.
301 ///
302 /// This is internally adjusted to compensate for the fact that the reader can read far past
303 /// the latest acknowledge record/data file, and so is not representative of where the reader
304 /// would start reading from if the process crashed or was abruptly stopped.
305 pub fn get_current_reader_file_id(&self) -> u16 {
306 let unacked_offset = self.unacked_reader_file_id_offset.load(Ordering::Acquire);
307 self.state().get_offset_reader_file_id(unacked_offset)
308 }
309
310 /// Gets the current writer file ID.
311 pub fn get_current_writer_file_id(&self) -> u16 {
312 self.state().get_current_writer_file_id()
313 }
314
315 /// Gets the next writer file ID.
316 ///
317 /// This is purely a future-looking operation i.e. what would the file ID be if it was
318 /// incremented from its current value. It does not alter the current writer file ID.
319 pub fn get_next_writer_file_id(&self) -> u16 {
320 self.state().get_next_writer_file_id()
321 }
322
323 /// Gets the current reader and writer file IDs.
324 ///
325 /// Similar to [`get_current_reader_file_id`], the file ID returned for the reader compensates
326 /// for the acknowledgement state of the reader.
327 pub fn get_current_reader_writer_file_id(&self) -> (u16, u16) {
328 let reader = self.get_current_reader_file_id();
329 let writer = self.get_current_writer_file_id();
330
331 (reader, writer)
332 }
333
334 /// Gets the current reader data file path, accounting for the unacknowledged offset.
335 pub fn get_current_reader_data_file_path(&self) -> PathBuf {
336 self.get_data_file_path(self.get_current_reader_file_id())
337 }
338
339 /// Gets the current writer data file path.
340 pub fn get_current_writer_data_file_path(&self) -> PathBuf {
341 self.get_data_file_path(self.get_current_writer_file_id())
342 }
343
344 /// Gets the next writer data file path.
345 pub fn get_next_writer_data_file_path(&self) -> PathBuf {
346 self.get_data_file_path(self.state().get_next_writer_file_id())
347 }
348
349 /// Gets the data file path for an arbitrary file ID.
350 pub fn get_data_file_path(&self, file_id: u16) -> PathBuf {
351 self.config
352 .data_dir
353 .join(format!("buffer-data-{file_id}.dat"))
354 }
355
356 /// Waits for a signal from the reader that progress has been made.
357 ///
358 /// This will only occur when a record is read, which may allow enough space (below the maximum
359 /// configured buffer size) for a write to occur, or similarly, when a data file is deleted.
360 #[cfg_attr(test, instrument(skip(self), level = "trace"))]
361 pub async fn wait_for_reader(&self) {
362 self.reader_notify.notified().await;
363 }
364
365 /// Waits for a signal from the writer that progress has been made.
366 ///
367 /// This will occur when a record is written, or when a new data file is created.
368 #[cfg_attr(test, instrument(skip(self), level = "trace"))]
369 pub async fn wait_for_writer(&self) {
370 self.writer_notify.notified().await;
371 }
372
373 /// Notifies all tasks waiting on progress by the reader.
374 #[cfg_attr(test, instrument(skip(self), level = "trace"))]
375 pub fn notify_reader_waiters(&self) {
376 self.reader_notify.notify_one();
377 }
378
379 /// Notifies all tasks waiting on progress by the writer.
380 #[cfg_attr(test, instrument(skip(self), level = "trace"))]
381 pub fn notify_writer_waiters(&self) {
382 self.writer_notify.notify_one();
383 }
384
385 /// Tracks the statistics of a successful write.
386 pub fn track_write(&self, event_count: u64, record_size: u64) {
387 self.increment_total_buffer_size(record_size);
388 self.usage_handle
389 .increment_received_event_count_and_byte_size(event_count, record_size);
390 }
391
392 /// Tracks the statistics of multiple successful reads.
393 pub fn track_reads(&self, event_count: u64, total_record_size: u64) {
394 self.decrement_total_buffer_size(total_record_size);
395 self.usage_handle
396 .increment_sent_event_count_and_byte_size(event_count, total_record_size);
397 }
398
399 /// Marks the writer as finished.
400 ///
401 /// If the writer was not yet marked done, `false` is returned. Otherwise, `true` is returned,
402 /// and the caller should handle any necessary logic for closing the writer.
403 pub fn mark_writer_done(&self) -> bool {
404 self.writer_done
405 .compare_exchange_weak(false, true, Ordering::SeqCst, Ordering::SeqCst)
406 .is_ok()
407 }
408
409 /// Returns `true` if the writer was marked as done.
410 pub fn is_writer_done(&self) -> bool {
411 self.writer_done.load(Ordering::Acquire)
412 }
413
414 /// Increments the pending acknowledgement counter by the given amount.
415 pub fn increment_pending_acks(&self, amount: u64) {
416 self.pending_acks.fetch_add(amount, Ordering::AcqRel);
417 }
418
419 /// Consumes the full amount of pending acknowledgements, and resets the counter to zero.
420 pub fn consume_pending_acks(&self) -> u64 {
421 self.pending_acks.swap(0, Ordering::AcqRel)
422 }
423
424 /// Increments the unacknowledged reader file ID.
425 ///
426 /// As further described in `increment_acked_reader_file_id`, the underlying value here allows
427 /// the reader to read ahead of a data file, even if it hasn't been durably processed yet.
428 pub fn increment_unacked_reader_file_id(&self) {
429 let last_unacked_reader_file_id_offset = self
430 .unacked_reader_file_id_offset
431 .fetch_add(1, Ordering::AcqRel);
432 trace!(
433 unacked_reader_file_id_offset = last_unacked_reader_file_id_offset + 1,
434 "Incremented unacknowledged reader file ID."
435 );
436 }
437
438 /// Increments the acknowledged reader file ID.
439 ///
440 /// As records may be read and stored for a small period of time (batching in a sink, etc), we
441 /// cannot truly say that we have durably processed a record until the caller acknowledges the
442 /// record. However, if we always waited for an acknowledgement, then each read could be forced
443 /// to wait for multiple seconds. Such a design would clearly be unusable.
444 ///
445 /// Instead, we allow the reader to move ahead of the latest acknowledged record by tracking
446 /// their current file ID and acknowledged file ID separately. Once all records in a file have
447 /// been acknowledged, the data file can be deleted and the reader file ID can be durably
448 /// stored in the ledger.
449 ///
450 /// Callers use [`increment_unacked_reader_file_id`] to move to the next data file without
451 /// tracking that the previous data file has been durably processed and can be deleted, and
452 /// [`increment_acked_reader_file_id`] is the reciprocal function which tracks the highest data
453 /// file that _has_ been durably processed.
454 ///
455 /// Since the unacked file ID is simply a relative offset to the acked file ID, we decrement it
456 /// here to keep the "current" file ID stable.
457 pub fn increment_acked_reader_file_id(&self) {
458 let new_reader_file_id = self.state().increment_reader_file_id();
459
460 // We ignore the return value because when the value is already zero, we don't want to do an
461 // update, so we return `None`, which causes `fetch_update` to return `Err`. It's not
462 // really an error, we just wanted to avoid the extra atomic compare/exchange.
463 //
464 // Basically, this call is actually infallible for our purposes.
465 let result = self.unacked_reader_file_id_offset.fetch_update(
466 Ordering::Release,
467 Ordering::Relaxed,
468 |n| {
469 if n == 0 { None } else { Some(n - 1) }
470 },
471 );
472
473 trace!(
474 unacked_reader_file_id_offset = result.map(|n| n - 1).unwrap_or(0),
475 acked_reader_file_id_offset = new_reader_file_id,
476 "Incremented acknowledged reader file ID offset with corresponding unacknowledged decrement."
477 );
478 }
479
480 /// Determines whether or not all files should be flushed/fsync'd to disk.
481 ///
482 /// In the case of concurrent callers when the flush deadline has been exceeded, only one caller
483 /// will get a return value of `true`, and the others will receive `false`. The caller that
484 /// receives `true` is responsible for flushing the necessary files.
485 pub fn should_flush(&self) -> bool {
486 let last_flush = self.last_flush.load();
487 if last_flush.elapsed() > self.config.flush_interval
488 && self
489 .last_flush
490 .compare_exchange(last_flush, Instant::now())
491 .is_ok()
492 {
493 return true;
494 }
495
496 false
497 }
498
499 /// Flushes the memory-mapped file backing the ledger to disk.
500 ///
501 /// This operation is synchronous.
502 ///
503 /// # Errors
504 ///
505 /// If there is an error while flushing the ledger to disk, an error variant will be returned
506 /// describing the error.
507 pub(super) fn flush(&self) -> io::Result<()> {
508 self.state.get_backing_ref().flush()
509 }
510
511 /// Synchronizes the record count and total size of the buffer with buffer usage data.
512 ///
513 /// This should not be called until both the reader and writer have been initialized via
514 /// [`Reader::seek_to_last_record`] and [`Writer::validate_last_write`], otherwise the values
515 /// will not be accurate.
516 pub fn synchronize_buffer_usage(&self) {
517 let initial_buffer_events = self.get_total_records();
518 let initial_buffer_size = self.get_total_buffer_size();
519 self.usage_handle
520 .increment_received_event_count_and_byte_size(
521 initial_buffer_events,
522 initial_buffer_size,
523 );
524 }
525
526 pub fn track_dropped_events(&self, count: u64) {
527 // We don't know how many bytes are represented by dropped events because we never actually had a chance to read
528 // them, so we have to use a byte size of 0 here.
529 //
530 // On the flipside, this would only matter if we incremented the buffer size and simultaneously skipped/lost the
531 // events within the same process lifecycle, since otherwise we'd start from the correct buffer size when
532 // loading the buffer initially.
533 //
534 // TODO: Can we do better here?
535 self.usage_handle
536 .increment_dropped_event_count_and_byte_size(count, 0, false);
537 }
538}
539
540impl<FS> Ledger<FS>
541where
542 FS: Filesystem + 'static,
543 FS::File: Unpin,
544{
545 /// Loads or creates a ledger for the given [`DiskBufferConfig`].
546 ///
547 /// If the ledger file does not yet exist, a default ledger state will be created and persisted
548 /// to disk. Otherwise, the ledger file on disk will be loaded and verified.
549 ///
550 /// # Errors
551 ///
552 /// If there is an error during either serialization of the new, default ledger state, or
553 /// deserializing existing data in the ledger file, or generally during the underlying I/O
554 /// operations, an error variant will be returned describing the error.
555 #[cfg_attr(test, instrument(skip_all, level = "trace"))]
556 pub(super) async fn load_or_create(
557 config: DiskBufferConfig<FS>,
558 usage_handle: BufferUsageHandle,
559 ) -> Result<Ledger<FS>, LedgerLoadCreateError> {
560 // Create our containing directory if it doesn't already exist.
561 fs::create_dir_all(&config.data_dir)
562 .await
563 .context(IoSnafu)?;
564
565 // Acquire an exclusive lock on our lock file, which prevents another Vector process from
566 // loading this buffer and clashing with us. Specifically, though: this does _not_ prevent
567 // another process from messing with our ledger files, or any of the data files, etc.
568 //
569 // TODO: It'd be nice to incorporate this within `Filesystem` to fully encapsulate _all_
570 // file I/O, but the code is so specific, including the drop guard for the lock file, that I
571 // don't know if it's worth it.
572 let ledger_lock_path = config.data_dir.join("buffer.lock");
573 let mut lock = LockFile::open(&ledger_lock_path).context(IoSnafu)?;
574 if !lock.try_lock().context(IoSnafu)? {
575 return Err(LedgerLoadCreateError::LedgerLockAlreadyHeld);
576 }
577
578 // Open the ledger file, which may involve creating it if it doesn't yet exist.
579 let ledger_path = config.data_dir.join("buffer.db");
580 let mut ledger_handle = config
581 .filesystem
582 .open_file_writable(&ledger_path)
583 .await
584 .context(IoSnafu)?;
585
586 // If we just created the ledger file, then we need to create the default ledger state, and
587 // then serialize and write to the file, before trying to load it as a memory-mapped file.
588 let ledger_metadata = ledger_handle.metadata().await.context(IoSnafu)?;
589 let ledger_len = ledger_metadata.len();
590 if ledger_len == 0 {
591 debug!("Ledger file empty. Initializing with default ledger state.");
592 let mut buf = BytesMut::new();
593 loop {
594 match BackedArchive::from_value(&mut buf, LedgerState::default()) {
595 Ok(archive) => {
596 ledger_handle
597 .write_all(archive.get_backing_ref())
598 .await
599 .context(IoSnafu)?;
600 break;
601 }
602 Err(SerializeError::FailedToSerialize(reason)) => {
603 return Err(LedgerLoadCreateError::FailedToSerialize { reason });
604 }
605 // Our buffer wasn't big enough, but that's OK! Resize it and try again.
606 Err(SerializeError::BackingStoreTooSmall(_, min_len)) => buf.resize(min_len, 0),
607 }
608 }
609
610 // Now sync the file to ensure everything is on disk before proceeding.
611 ledger_handle.sync_all().await.context(IoSnafu)?;
612 }
613
614 // Load the ledger state by memory-mapping the ledger file, and zero-copy deserializing our
615 // ledger state back out of it.
616 let ledger_mmap = config
617 .filesystem
618 .open_mmap_writable(&ledger_path)
619 .await
620 .context(IoSnafu)?;
621 let ledger_state = match BackedArchive::from_backing(ledger_mmap) {
622 // Deserialized the ledger state without issue from an existing file.
623 Ok(backed) => backed,
624 // Either invalid data, or the buffer doesn't represent a valid ledger structure.
625 Err(e) => {
626 return Err(LedgerLoadCreateError::FailedToDeserialize {
627 reason: e.into_inner(),
628 });
629 }
630 };
631
632 // Create the ledger object, and synchronize the buffer statistics with the buffer usage
633 // handle. This handles making sure we account for the starting size of the buffer, and
634 // what not.
635 let mut ledger = Ledger {
636 config,
637 lock,
638 state: ledger_state,
639 total_buffer_size: AtomicU64::new(0),
640 reader_notify: Notify::new(),
641 writer_notify: Notify::new(),
642 writer_done: AtomicBool::new(false),
643 pending_acks: AtomicU64::new(0),
644 unacked_reader_file_id_offset: AtomicU16::new(0),
645 last_flush: AtomicCell::new(Instant::now()),
646 usage_handle,
647 };
648 ledger.update_buffer_size().await?;
649
650 Ok(ledger)
651 }
652
653 async fn update_buffer_size(&mut self) -> Result<(), LedgerLoadCreateError> {
654 // Under normal operation, the reader and writer maintain a consistent state within the
655 // ledger. However, due to the nature of how we update the ledger, process crashes could
656 // lead to missed updates as we execute reads and writes as non-atomic units of execution:
657 // update a field, do the read/write, update some more fields depending on success or
658 // failure, etc.
659 //
660 // This is an issue because we depend on knowing the total buffer size (the total size of
661 // unread records, specifically) so that we can correctly limit writes when we've reached
662 // the configured maximum buffer size.
663 //
664 // While it's not terribly efficient, and I'd like to eventually formulate a better design,
665 // this approach is absolutely correct: get the file size of every data file on disk,
666 // and set the "total buffer size" to the sum of all of those file sizes.
667 //
668 // When the reader does any necessary seeking to get to the record it left off on, it will
669 // adjust the "total buffer size" downwards for each record it runs through, leaving "total
670 // buffer size" at the correct value.
671 let mut dat_reader = fs::read_dir(&self.config.data_dir).await.context(IoSnafu)?;
672
673 let mut total_buffer_size = 0;
674 while let Some(dir_entry) = dat_reader.next_entry().await.context(IoSnafu)? {
675 if let Some(file_name) = dir_entry.file_name().to_str() {
676 // I really _do_ want to only find files with a .dat extension, as that's what the
677 // code generates, and having them be .dAt or .Dat or whatever would indicate that
678 // the file is not related to our buffer. If we had to cope with case-sensitivity
679 // of filenames from another program/OS, then it would be a different story.
680 #[allow(clippy::case_sensitive_file_extension_comparisons)]
681 if file_name.ends_with(".dat") {
682 let metadata = dir_entry.metadata().await.context(IoSnafu)?;
683 total_buffer_size += metadata.len();
684
685 debug!(
686 data_file = file_name,
687 file_size = metadata.len(),
688 total_buffer_size,
689 "Found existing data file."
690 );
691 }
692 }
693 }
694
695 self.increment_total_buffer_size(total_buffer_size);
696
697 Ok(())
698 }
699
700 #[must_use]
701 pub(super) fn spawn_finalizer(self: Arc<Self>) -> OrderedFinalizer<u64> {
702 let (finalizer, mut stream) = OrderedFinalizer::new(None);
703 tokio::spawn(async move {
704 while let Some((_status, amount)) = stream.next().await {
705 self.increment_pending_acks(amount);
706 self.notify_writer_waiters();
707 }
708 });
709 finalizer
710 }
711}
712
713impl<FS> fmt::Debug for Ledger<FS>
714where
715 FS: Filesystem + fmt::Debug,
716{
717 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
718 f.debug_struct("Ledger")
719 .field("config", &self.config)
720 .field("state", &self.state.get_archive_ref())
721 .field(
722 "total_buffer_size",
723 &self.total_buffer_size.load(Ordering::Acquire),
724 )
725 .field("pending_acks", &self.pending_acks.load(Ordering::Acquire))
726 .field(
727 "unacked_reader_file_id_offset",
728 &self.unacked_reader_file_id_offset.load(Ordering::Acquire),
729 )
730 .field("writer_done", &self.writer_done.load(Ordering::Acquire))
731 .field("last_flush", &self.last_flush.load())
732 .finish_non_exhaustive()
733 }
734}