// vector_buffers/variants/disk_v2/reader.rs

1use std::{
2    cmp, fmt,
3    io::{self, ErrorKind},
4    marker::PhantomData,
5    num::NonZeroU64,
6    path::PathBuf,
7    sync::Arc,
8};
9
10use crc32fast::Hasher;
11use rkyv::{AlignedVec, archived_root};
12use snafu::{ResultExt, Snafu};
13use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
14use vector_common::{finalization::BatchNotifier, finalizer::OrderedFinalizer};
15
16use super::{
17    Filesystem,
18    common::create_crc32c_hasher,
19    ledger::Ledger,
20    record::{ArchivedRecord, Record, RecordStatus, validate_record_archive},
21};
22use crate::{
23    Bufferable,
24    encoding::{AsMetadata, Encodable},
25    internal_events::BufferReadError,
26    topology::acks::{EligibleMarker, EligibleMarkerLength, MarkerError, OrderedAcknowledgements},
27    variants::disk_v2::{io::AsyncFile, record::try_as_record_archive},
28};
29
30pub(super) struct ReadToken {
31    record_id: u64,
32    record_bytes: usize,
33}
34
35impl ReadToken {
36    pub fn new(record_id: u64, record_bytes: usize) -> Self {
37        Self {
38            record_id,
39            record_bytes,
40        }
41    }
42
43    pub fn record_id(&self) -> u64 {
44        self.record_id
45    }
46
47    pub fn record_bytes(&self) -> usize {
48        self.record_bytes
49    }
50
51    fn into_record_id(self) -> u64 {
52        self.record_id
53    }
54}
55
56/// Error that occurred during calls to [`BufferReader`].
57#[derive(Debug, Snafu)]
58pub enum ReaderError<T>
59where
60    T: Bufferable,
61{
62    /// A general I/O error occurred.
63    ///
64    /// Different methods will capture specific I/O errors depending on the situation, as some
65    /// errors may be expected and considered normal by design.  For all I/O errors that are
66    /// considered atypical, they will be returned as this variant.
67    #[snafu(display("read I/O error: {}", source))]
68    Io { source: io::Error },
69
70    /// The reader failed to deserialize the record.
71    ///
72    /// In most cases, this indicates that the data file being read was corrupted or truncated in
73    /// some fashion.  Callers of [`BufferReader::next`] will not actually receive this error, as it is
74    /// handled internally by moving to the next data file, as corruption may have affected other
75    /// records in a way that is not easily detectable and could lead to records which
76    /// deserialize/decode but contain invalid data.
77    #[snafu(display("failed to deserialize encoded record from buffer: {}", reason))]
78    Deserialization { reason: String },
79
80    /// The record's checksum did not match.
81    ///
82    /// In most cases, this indicates that the data file being read was corrupted or truncated in
83    /// some fashion.  Callers of [`BufferReader::next`] will not actually receive this error, as it is
84    /// handled internally by moving to the next data file, as corruption may have affected other
85    /// records in a way that is not easily detectable and could lead to records which
86    /// deserialize/decode but contain invalid data.
87    #[snafu(display(
88        "calculated checksum did not match the actual checksum: ({} vs {})",
89        calculated,
90        actual
91    ))]
92    Checksum { calculated: u32, actual: u32 },
93
94    /// The decoder encountered an issue during decoding.
95    ///
96    /// At this stage, the record can be assumed to have been written correctly, and read correctly
97    /// from disk, as the checksum was also validated.
98    #[snafu(display("failed to decoded record: {:?}", source))]
99    Decode {
100        source: <T as Encodable>::DecodeError,
101    },
102
103    /// The record is not compatible with this version of Vector.
104    ///
105    /// This can occur when records written to a buffer in previous versions of Vector are read by
106    /// newer versions of Vector where the encoding scheme, or record schema, used in the previous
107    /// version of Vector are no longer able to be decoded in this version of Vector.
108    #[snafu(display("record version not compatible: {}", reason))]
109    Incompatible { reason: String },
110
111    /// The reader detected that a data file contains a partially-written record.
112    ///
113    /// Records should never be partially written to a data file (we don't split records across data
114    /// files) so this would be indicative of a write that was never properly written/flushed, or
115    /// some issue with the write where it was acknowledged but the data/file was corrupted in same way.
116    ///
117    /// This is effectively the same class of error as an invalid checksum/failed deserialization.
118    PartialWrite,
119
120    /// The record reported an event count of zero.
121    ///
122    /// Empty records should not be allowed to be written, so this represents either a bug with the
123    /// writing logic of the buffer, or a record that does not use a symmetrical encoding scheme,
124    /// which is also not supported.
125    EmptyRecord,
126}
127
128impl<T> ReaderError<T>
129where
130    T: Bufferable,
131{
132    fn is_bad_read(&self) -> bool {
133        matches!(
134            self,
135            ReaderError::Checksum { .. }
136                | ReaderError::Deserialization { .. }
137                | ReaderError::PartialWrite
138        )
139    }
140
141    fn as_error_code(&self) -> &'static str {
142        match self {
143            ReaderError::Io { .. } => "io_error",
144            ReaderError::Deserialization { .. } => "deser_failed",
145            ReaderError::Checksum { .. } => "checksum_mismatch",
146            ReaderError::Decode { .. } => "decode_failed",
147            ReaderError::Incompatible { .. } => "incompatible_record_version",
148            ReaderError::PartialWrite => "partial_write",
149            ReaderError::EmptyRecord => "empty_record",
150        }
151    }
152
153    pub fn as_recoverable_error(&self) -> Option<BufferReadError> {
154        let error = self.to_string();
155        let error_code = self.as_error_code();
156
157        match self {
158            ReaderError::Io { .. } | ReaderError::EmptyRecord => None,
159            ReaderError::Deserialization { .. }
160            | ReaderError::Checksum { .. }
161            | ReaderError::Decode { .. }
162            | ReaderError::Incompatible { .. }
163            | ReaderError::PartialWrite => Some(BufferReadError { error_code, error }),
164        }
165    }
166}
167
168impl<T: Bufferable> PartialEq for ReaderError<T> {
169    fn eq(&self, other: &Self) -> bool {
170        match (self, other) {
171            (Self::Io { source: l_source }, Self::Io { source: r_source }) => {
172                l_source.kind() == r_source.kind()
173            }
174            (
175                Self::Deserialization { reason: l_reason },
176                Self::Deserialization { reason: r_reason },
177            ) => l_reason == r_reason,
178            (
179                Self::Checksum {
180                    calculated: l_calculated,
181                    actual: l_actual,
182                },
183                Self::Checksum {
184                    calculated: r_calculated,
185                    actual: r_actual,
186                },
187            ) => l_calculated == r_calculated && l_actual == r_actual,
188            (Self::Decode { .. }, Self::Decode { .. }) => true,
189            (Self::Incompatible { reason: l_reason }, Self::Incompatible { reason: r_reason }) => {
190                l_reason == r_reason
191            }
192            _ => core::mem::discriminant(self) == core::mem::discriminant(other),
193        }
194    }
195}
196
197/// Buffered reader that handles deserialization, checksumming, and decoding of records.
198pub(super) struct RecordReader<R, T> {
199    reader: BufReader<R>,
200    aligned_buf: AlignedVec,
201    checksummer: Hasher,
202    current_record_id: u64,
203    _t: PhantomData<T>,
204}
205
206impl<R, T> RecordReader<R, T>
207where
208    R: AsyncRead + Unpin,
209    T: Bufferable,
210{
211    /// Creates a new [`RecordReader`] around the provided reader.
212    ///
213    /// Internally, the reader is wrapped in a [`BufReader`], so callers should not pass in an
214    /// already buffered reader.
215    pub fn new(reader: R) -> Self {
216        Self {
217            reader: BufReader::with_capacity(256 * 1024, reader),
218            aligned_buf: AlignedVec::new(),
219            checksummer: create_crc32c_hasher(),
220            current_record_id: 0,
221            _t: PhantomData,
222        }
223    }
224
225    #[cfg_attr(test, instrument(skip(self), level = "trace"))]
226    async fn read_length_delimiter(
227        &mut self,
228        is_finalized: bool,
229    ) -> Result<Option<usize>, ReaderError<T>> {
230        loop {
231            let available = self.reader.buffer().len();
232            if available >= 8 {
233                let length_buf = &self.reader.buffer()[..8];
234                let length = length_buf
235                    .try_into()
236                    .expect("the slice is the length of a u64");
237                self.reader.consume(8);
238
239                // By default, records cannot exceed 8MB in length, so whether our `usize` is a u32
240                // or u64, we're not going to overflow it.  While the maximum record size _can_ be
241                // changed, it's not currently exposed to users.  Even further, if it was exposed to
242                // users, it's currently a `usize`, so again, we know that we're not going to exceed
243                // 64-bit. And even further still, the writer fallibly attempts to get a `u64` of the
244                // record size based on the encoding buffer, which gives its length in `usize`, and
245                // so would fail if `usize` was larger than `u64`, meaning we at least will panic if
246                // Vector is running on a 128-bit CPU in the future, storing records that are larger
247                // than 2^64+1. :)
248                let record_len = u64::from_be_bytes(length)
249                    .try_into()
250                    .expect("record length should never exceed usize");
251                return Ok(Some(record_len));
252            }
253
254            // We don't have enough bytes, so we need to fill our buffer again.
255            let buf = self.reader.fill_buf().await.context(IoSnafu)?;
256            if buf.is_empty() {
257                return Ok(None);
258            }
259
260            // If we tried to read more bytes, and we still don't have enough for the record
261            // delimiter, and the data file has been finalized already: we've got a partial
262            // write situation on our hands.
263            if buf.len() < 8 && is_finalized {
264                return Err(ReaderError::PartialWrite);
265            }
266        }
267    }
268
269    /// Attempts to read a record.
270    ///
271    /// Records are preceded by a length delimiter, a fixed-size integer (currently 8 bytes) that
272    /// tells the reader how many more bytes to read in order to completely read the next record.
273    ///
274    /// If there are no more bytes to read, we return early in order to allow the caller to wait
275    /// until such a time where there should be more data, as no wake-ups can be generated when
276    /// reading a file after reaching EOF.
277    ///
278    /// If there is any data available, we attempt to continue reading until both a length
279    /// delimiter, and the accompanying record, can be read in their entirety.
280    ///
281    /// If a record is able to be read in its entirety, a token is returned to caller that can be
282    /// used with [`read_record`] in order to get an owned `T`.  This is due to a quirk with the
283    /// compiler's ability to track stacked mutable references through conditional control flows, of
284    /// which is handled by splitting the "do we have a valid record in our buffer?" logic from the
285    /// "read that record and decode it" logic.
286    ///
287    /// # Finalized reads
288    ///
289    /// All of the above logic applies when `is_finalized` is `false`, which signals that a data
290    /// file is still currently being written to.  If `is_finalized` is `true`, most of the above
291    /// logic applies but in cases where we detect a partial write, we explicitly return an error
292    /// for a partial read.
293    ///
294    /// In practice, what this means is that when we believe a file should be "finalized" -- the
295    /// writer flushed the file to disk, the ledger has been flushed, etc -- then we also expect to
296    /// be able to read all bytes with no leftover.  A partially-written length delimiter, or
297    /// record, would be indicative of a bug with the writer or OS/disks, essentially telling us
298    /// that the current data file is not valid for reads anymore.  We don't know _why_ it's in this
299    /// state, only that something is not right and that we must skip the file.
300    ///
301    /// # Errors
302    ///
303    /// Errors can occur during the I/O or deserialization stage.  If an error occurs during any of
304    /// these stages, an appropriate error variant will be returned describing the error.
305    #[cfg_attr(test, instrument(skip(self), level = "trace"))]
306    pub async fn try_next_record(
307        &mut self,
308        is_finalized: bool,
309    ) -> Result<Option<ReadToken>, ReaderError<T>> {
310        let Some(record_len) = self.read_length_delimiter(is_finalized).await? else {
311            return Ok(None);
312        };
313
314        if record_len == 0 {
315            return Err(ReaderError::Deserialization {
316                reason: "record length was zero".to_string(),
317            });
318        }
319
320        // Read in all of the bytes we need first.
321        self.aligned_buf.clear();
322        while self.aligned_buf.len() < record_len {
323            let needed = record_len - self.aligned_buf.len();
324            let buf = self.reader.fill_buf().await.context(IoSnafu)?;
325            if buf.is_empty() && is_finalized {
326                // If we needed more data, but there was none available, and we're finalized: we've
327                // got ourselves a partial write situation.
328                return Err(ReaderError::PartialWrite);
329            }
330
331            let available = cmp::min(buf.len(), needed);
332            self.aligned_buf.extend_from_slice(&buf[..available]);
333            self.reader.consume(available);
334        }
335
336        // Now see if we can deserialize our archived record from this.
337        let buf = self.aligned_buf.as_slice();
338        match validate_record_archive(buf, &self.checksummer) {
339            RecordStatus::FailedDeserialization(de) => Err(ReaderError::Deserialization {
340                reason: de.into_inner(),
341            }),
342            RecordStatus::Corrupted { calculated, actual } => {
343                Err(ReaderError::Checksum { calculated, actual })
344            }
345            RecordStatus::Valid { id, .. } => {
346                self.current_record_id = id;
347                // TODO: Another spot where our hardcoding of the length delimiter size in bytes is fragile.
348                Ok(Some(ReadToken::new(id, 8 + buf.len())))
349            }
350        }
351    }
352
353    /// Reads the record associated with the given [`ReadToken`].
354    ///
355    /// # Errors
356    ///
357    /// If an error occurs during decoding, an error variant will be returned describing the error.
358    ///
359    /// # Panics
360    ///
361    /// If a `ReadToken` is not used in a call to `read_record` before again calling
362    /// `try_next_record`, and the `ReadToken` from _that_ call is used, this method will panic due
363    /// to an out-of-order read.
364    pub fn read_record(&mut self, token: ReadToken) -> Result<T, ReaderError<T>> {
365        let record_id = token.into_record_id();
366        assert_eq!(
367            self.current_record_id, record_id,
368            "using expired read token; this is a serious bug"
369        );
370
371        // SAFETY:
372        // - `try_next_record` is the only method that can hand back a `ReadToken`
373        // - we only get a `ReadToken` if there's a valid record in `self.aligned_buf`
374        // - `try_next_record` does all the archive checks, checksum validation, etc
375        let record = unsafe { archived_root::<Record<'_>>(&self.aligned_buf) };
376
377        decode_record_payload(record)
378    }
379}
380
381impl<R, T> fmt::Debug for RecordReader<R, T>
382where
383    R: fmt::Debug,
384{
385    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
386        f.debug_struct("RecordReader")
387            .field("reader", &self.reader)
388            .field("aligned_buf", &self.aligned_buf)
389            .field("checksummer", &self.checksummer)
390            .field("current_record_id", &self.current_record_id)
391            .finish()
392    }
393}
394
395/// Reads records from the buffer.
396#[derive(Debug)]
397pub struct BufferReader<T, FS>
398where
399    FS: Filesystem,
400{
401    ledger: Arc<Ledger<FS>>,
402    reader: Option<RecordReader<FS::File, T>>,
403    bytes_read: u64,
404    last_reader_record_id: u64,
405    data_file_start_record_id: Option<u64>,
406    data_file_record_count: u64,
407    data_file_marked_record_count: u64,
408    ready_to_read: bool,
409    record_acks: OrderedAcknowledgements<u64, u64>,
410    data_file_acks: OrderedAcknowledgements<u64, (PathBuf, u64)>,
411    finalizer: OrderedFinalizer<u64>,
412    _t: PhantomData<T>,
413}
414
415impl<T, FS> BufferReader<T, FS>
416where
417    T: Bufferable,
418    FS: Filesystem,
419    FS::File: Unpin,
420{
421    /// Creates a new [`BufferReader`] attached to the given [`Ledger`].
422    pub(crate) fn new(ledger: Arc<Ledger<FS>>, finalizer: OrderedFinalizer<u64>) -> Self {
423        let ledger_last_reader_record_id = ledger.state().get_last_reader_record_id();
424        let next_expected_record_id = ledger_last_reader_record_id.wrapping_add(1);
425
426        Self {
427            ledger,
428            reader: None,
429            bytes_read: 0,
430            last_reader_record_id: 0,
431            data_file_start_record_id: None,
432            data_file_record_count: 0,
433            data_file_marked_record_count: 0,
434            ready_to_read: false,
435            record_acks: OrderedAcknowledgements::from_acked(next_expected_record_id),
436            data_file_acks: OrderedAcknowledgements::from_acked(0),
437            finalizer,
438            _t: PhantomData,
439        }
440    }
441
442    fn reset(&mut self) {
443        self.reader = None;
444        self.bytes_read = 0;
445        self.data_file_start_record_id = None;
446    }
447
448    fn track_read(&mut self, record_id: u64, record_bytes: u64, event_count: NonZeroU64) {
449        // We explicitly reduce the event count by one here in order to correctly calculate the
450        // "last" record ID, which you can visualize as follows...
451        //
452        // [ record 1 ] [ record 2 ] [ record 3 ] [....]
453        // [0]          [1] [2] [3]  [4] [5]      [....]
454        //
455        // For each of these records, their "last ID" is simply the ID of the first event within the
456        // record, plus the event count, minus one.  Another way to look at it is that the "last"
457        // reader record ID is always one behind the next expected record ID.  In the above example,
458        // the next record ID we would expect would be 6, regardless of how many events the record has.
459        self.last_reader_record_id = record_id.wrapping_add(event_count.get() - 1);
460        if self.data_file_start_record_id.is_none() {
461            self.data_file_start_record_id = Some(record_id);
462        }
463
464        // Track the amount of data we read.  If we're still loading the buffer, then the only thing
465        // other we need to do is update the total buffer size.  Everything else below only matters
466        // when we're doing real record reads.
467        self.bytes_read += record_bytes;
468        if !self.ready_to_read {
469            self.ledger.decrement_total_buffer_size(record_bytes);
470            return;
471        }
472
473        // We've done a "real" record read, so we need to track it for acknowledgement.  Check our
474        // acknowledge state first to see if this is the next record ID we expected.
475        self.data_file_record_count += 1;
476        if let Err(me) =
477            self.record_acks
478                .add_marker(record_id, Some(event_count.get()), Some(record_bytes))
479        {
480            match me {
481                MarkerError::MonotonicityViolation => {
482                    panic!("record ID monotonicity violation detected; this is a serious bug")
483                }
484            }
485        }
486    }
487
488    #[cfg_attr(test, instrument(skip_all, level = "debug"))]
489    async fn delete_completed_data_file(
490        &mut self,
491        data_file_path: PathBuf,
492        bytes_read: Option<u64>,
493    ) -> io::Result<()> {
494        // TODO: Could we actually make this a background task to remove the tail latency from the
495        // read path?  Technically all that's needed is a handle to the ledger and the data file
496        // path, so as long as the logic is still right, we can notify writers out-of-band.
497        debug!(
498            data_file_path = data_file_path.to_string_lossy().as_ref(),
499            bytes_read, "Deleting completed data file."
500        );
501
502        // Grab the size of the data file before we delete it, which gives us a chance to fix up the
503        // total buffer size for corrupted files or fast-forwarded files.
504        //
505        // Since we only decrement the buffer size after a successful read in normal cases, skipping
506        // the rest of a corrupted file could lead to the total buffer size being unsynchronized.
507        // We use the difference between the number of bytes read and the file size to figure out if
508        // we need to make a manual adjustment.
509        //
510        // Likewise, when we skip over a file in "fast forward" mode during initialization, no reads
511        // occur at all, so we're relying on this method to correct the buffer size for us.  This is
512        // why `bytes_read` is optional: when it's specified, we calculate a delta for handling
513        // partial-read scenarios, otherwise, we just use the entire data file size as is.
514        let data_file = self
515            .ledger
516            .filesystem()
517            .open_file_readable(&data_file_path)
518            .await?;
519        let metadata = data_file.metadata().await?;
520
521        let decrease_amount = bytes_read.map_or_else(
522            || metadata.len(),
523            |bytes_read| {
524                let size_delta = metadata.len() - bytes_read;
525                if size_delta > 0 {
526                    debug!(
527                        actual_file_size = metadata.len(),
528                        bytes_read,
529                        "Data file was only partially read. Adjusting buffer size to compensate.",
530                    );
531                }
532
533                size_delta
534            },
535        );
536
537        if decrease_amount > 0 {
538            self.ledger.decrement_total_buffer_size(decrease_amount);
539        }
540
541        drop(data_file);
542
543        // Delete the current data file, and increment our actual reader file ID.
544        self.ledger
545            .filesystem()
546            .delete_file(&data_file_path)
547            .await?;
548        self.ledger.increment_acked_reader_file_id();
549        self.ledger.flush()?;
550
551        debug!("Flushed after deleting data file, notifying writers and continuing.");
552
553        // Notify any waiting writers that we've deleted a data file, which they may be waiting on
554        // because they're looking to reuse the file ID of the file we just finished reading.
555        self.ledger.notify_reader_waiters();
556
557        Ok(())
558    }
559
560    #[cfg_attr(test, instrument(skip(self), level = "debug"))]
561    async fn handle_pending_acknowledgements(
562        &mut self,
563        force_check_pending_data_files: bool,
564    ) -> io::Result<()> {
565        // Acknowledgements effectively happen in two layers: record acknowledgement and data file
566        // acknowledgement.  Since records can contain multiple events, we need to track when a
567        // record itself has been fully acknowledged.  Likewise, data files contain multiple records,
568        // so we need to track when all records we've read from a data file have been acknowledged.
569
570        // Drive record acknowledgement first.
571        //
572        // We only do this if we actually consume any acknowledgements, and immediately update the
573        // buffer and ledger to more quickly get those metrics into good shape.  We defer notifying
574        // writers until after, though, in case we also have data files to delete, so that we can
575        // coalesce the notifications together at the very end of the method.
576        let mut had_eligible_records = false;
577        let mut records_acknowledged: u64 = 0;
578        let mut events_acknowledged: u64 = 0;
579        let mut events_skipped: u64 = 0;
580        let mut bytes_acknowledged: u64 = 0;
581
582        let consumed_acks = self.ledger.consume_pending_acks();
583        if consumed_acks > 0 {
584            self.record_acks.add_acknowledgements(consumed_acks);
585
586            while let Some(EligibleMarker { len, data, .. }) =
587                self.record_acks.get_next_eligible_marker()
588            {
589                had_eligible_records = true;
590
591                match len {
592                    // Any marker with an assumed length implies a gap marker, which gets added
593                    // automatically and represents a portion of the record ID range that was
594                    // expected but missing. This is a long way of saying: we're missing records.
595                    //
596                    // We tally this up so that we can emit a single log event/set of metrics, as
597                    // there may be many gap markers and emitting for each of them could be very noisy.
598                    EligibleMarkerLength::Assumed(count) => {
599                        events_skipped = events_skipped
600                            .checked_add(count)
601                            .expect("skipping more than 2^64 events at a time is obviously a bug");
602                    }
603                    // We got a valid marker representing a known number of events.
604                    EligibleMarkerLength::Known(len) => {
605                        // We specifically pass the size of the record, in bytes, as the marker data.
606                        let record_bytes = data.expect("record bytes should always be known");
607
608                        records_acknowledged = records_acknowledged.checked_add(1).expect(
609                            "acknowledging more than 2^64 records at a time is obviously a bug",
610                        );
611                        events_acknowledged = events_acknowledged.checked_add(len).expect(
612                            "acknowledging more than 2^64 events at a time is obviously a bug",
613                        );
614                        bytes_acknowledged = bytes_acknowledged.checked_add(record_bytes).expect(
615                            "acknowledging more than 2^64 bytes at a time is obviously a bug",
616                        );
617                    }
618                }
619            }
620
621            // We successfully processed at least one record, so update our buffer and ledger accounting.
622            if had_eligible_records {
623                self.ledger
624                    .track_reads(events_acknowledged, bytes_acknowledged);
625
626                // We need to account for skipped events, too, so that our "last reader record ID"
627                // value stays correct as we process these gap markers.
628                let last_increment_amount = events_acknowledged + events_skipped;
629                self.ledger
630                    .state()
631                    .increment_last_reader_record_id(last_increment_amount);
632
633                self.data_file_acks
634                    .add_acknowledgements(records_acknowledged);
635            }
636
637            // If any events were skipped, do our logging/metrics for that.
638            if events_skipped > 0 {
639                self.ledger.track_dropped_events(events_skipped);
640            }
641        }
642
643        // If we processed any eligible records, we may now also have eligible data files.
644        //
645        // Alternatively, the core `next` logic may have just rolled over to a new data file, and
646        // we're seeing if we can fast track any eligible data file deletions rather than waiting
647        // for more acknowledgements to come in.
648        let mut had_eligible_data_files = false;
649        let mut data_files_deleted: u16 = 0;
650
651        if had_eligible_records || force_check_pending_data_files {
652            // Now handle data file deletion.  We unconditionally check to see if any data files are
653            // eligible for deletion, and process them immediately.
654
655            while let Some(EligibleMarker { data, .. }) =
656                self.data_file_acks.get_next_eligible_marker()
657            {
658                had_eligible_data_files = true;
659
660                let (data_file_path, bytes_read) =
661                    data.expect("data file deletion marker should never be empty");
662                self.delete_completed_data_file(data_file_path, Some(bytes_read))
663                    .await?;
664
665                data_files_deleted = data_files_deleted
666                    .checked_add(1)
667                    .expect("deleting more than 2^16 data files at a time is obviously a bug");
668            }
669        }
670
671        // If we managed to processed any records _or_ any data file deletions, we've made
672        // meaningful progress that writers may care about, so notify them.
673        if had_eligible_data_files || had_eligible_records {
674            self.ledger.notify_reader_waiters();
675
676            if self.ready_to_read {
677                trace!(
678                    current_buffer_size = self.ledger.get_total_buffer_size(),
679                    records_acknowledged,
680                    events_acknowledged,
681                    events_skipped,
682                    bytes_acknowledged,
683                    data_files_deleted,
684                    "Finished handling acknowledgements."
685                );
686            }
687        }
688
689        Ok(())
690    }
691
692    /// Switches the reader over to the next data file to read.
693    #[cfg_attr(test, instrument(skip(self), level = "debug"))]
694    fn roll_to_next_data_file(&mut self) {
695        // Add a marker for this data file so we know when it can be safely deleted.  We also need
696        // to track the necessary data to do our buffer accounting when it's eligible for deletion.
697        //
698        // In the rare case where the very first read in a new data file is corrupted/invalid and we
699        // roll to the next data file, we simply use the last reader record ID we have, which yields
700        // a marker with a length of 0.
701        let data_file_start_record_id = self
702            .data_file_start_record_id
703            .take()
704            .unwrap_or(self.last_reader_record_id);
705        // Record IDs are inclusive, so if last is 1 and start is 0, that means we had two events,
706        // potentially from one or two records.
707        let data_file_event_count = self
708            .last_reader_record_id
709            .wrapping_sub(data_file_start_record_id)
710            .saturating_add(1);
711        let data_file_record_count = self.data_file_record_count;
712        let data_file_path = self.ledger.get_current_reader_data_file_path();
713        let bytes_read = self.bytes_read;
714
715        debug!(
716            data_file_path = data_file_path.to_string_lossy().as_ref(),
717            first_record_id = data_file_start_record_id,
718            last_record_id = self.last_reader_record_id,
719            record_count = data_file_record_count,
720            event_count = data_file_event_count,
721            bytes_read,
722            "Marking data file for deletion."
723        );
724
725        let data_file_marker_id = self.data_file_marked_record_count;
726        self.data_file_marked_record_count += data_file_record_count;
727        self.data_file_record_count = 0;
728
729        self.data_file_acks
730            .add_marker(
731                data_file_marker_id,
732                Some(data_file_record_count),
733                Some((data_file_path, bytes_read)),
734            )
735            .expect("should not fail to add marker for data file deletion");
736
737        // Now reset our internal state so we can go for the next data file.
738        self.reset();
739        self.ledger.increment_unacked_reader_file_id();
740
741        debug!("Rolling to next data file.");
742    }
743
744    /// Ensures this reader is ready to attempt reading the next record.
745    #[cfg_attr(test, instrument(skip(self), level = "debug"))]
746    async fn ensure_ready_for_read(&mut self) -> io::Result<()> {
747        // We have nothing to do if we already have a data file open.
748        if self.reader.is_some() {
749            return Ok(());
750        }
751
752        // Try to open the current reader data file.  This might not _yet_ exist, in which case
753        // we'll simply wait for the writer to signal to us that progress has been made, which
754        // implies a data file existing.
755        loop {
756            let (reader_file_id, writer_file_id) = self.ledger.get_current_reader_writer_file_id();
757            let data_file_path = self.ledger.get_current_reader_data_file_path();
758            let data_file = match self
759                .ledger
760                .filesystem()
761                .open_file_readable(&data_file_path)
762                .await
763            {
764                Ok(data_file) => data_file,
765                Err(e) => match e.kind() {
766                    ErrorKind::NotFound => {
767                        // reader is either waiting for writer to create the file which can be current writer_file_id or next writer_file_id (if writer has marked for skip)
768                        if reader_file_id == writer_file_id
769                            || reader_file_id == self.ledger.get_next_writer_file_id()
770                        {
771                            debug!(
772                                data_file_path = data_file_path.to_string_lossy().as_ref(),
773                                "Data file does not yet exist. Waiting for writer to create."
774                            );
775                            self.ledger.wait_for_writer().await;
776                        } else {
777                            self.ledger.increment_acked_reader_file_id();
778                        }
779                        continue;
780                    }
781                    // This is a valid I/O error, so bubble that back up.
782                    _ => return Err(e),
783                },
784            };
785
786            debug!(
787                data_file_path = data_file_path.to_string_lossy().as_ref(),
788                "Opened data file for reading."
789            );
790
791            self.reader = Some(RecordReader::new(data_file));
792            return Ok(());
793        }
794    }
795
796    /// Seeks to where this reader previously left off.
797    ///
798    /// In cases where Vector has restarted, but the reader hasn't yet finished a file, we would
799    /// open the correct data file for reading, but our file cursor would be at the very
800    /// beginning, essentially pointed at the wrong record.  We read out records here until we
801    /// reach a point where we've read up to the record referenced by `get_last_reader_record_id`.
802    ///
803    /// This ensures that a subsequent call to `next` is ready to read the correct record.
804    ///
805    /// # Errors
806    ///
807    /// If an error occurs during seeking to the next record, an error variant will be returned
808    /// describing the error.
809    #[cfg_attr(test, instrument(skip(self), level = "debug"))]
810    pub(super) async fn seek_to_next_record(&mut self) -> Result<(), ReaderError<T>> {
811        // We don't try seeking again once we're all caught up.
812        if self.ready_to_read {
813            warn!("Reader already initialized.");
814            return Ok(());
815        }
816
817        // We rely on `next` to close out the data file if we've actually reached the end, and we
818        // also rely on it to reset the data file before trying to read, and we _also_ rely on it to
819        // update `self.last_reader_record_id`, so basically... just keep reading records until we
820        // get to the one we left off with last time.
821        let ledger_last = self.ledger.state().get_last_reader_record_id();
822        debug!(
823            last_acknowledged_record_id = ledger_last,
824            "Seeking to last acknowledged record for reader."
825        );
826
827        // We may end up in a situation where a data file hasn't yet been deleted but we've moved on
828        // to the next data file, including reading acknowledging records within it.  If Vector
829        // is stopped at a point like this, and we restart it and load the buffer, we'll start on
830        // the old data file.  That's wasteful to read all over again.
831        //
832        // In our seek loop, we have a fast path where we check the last record of a data file while
833        // the reader and writer file IDs don't match.  If we see that the record is still below the
834        // last reader record ID, we do the necessary clean up to delete that file and move to the
835        // next file.  This is safe because we know that if we managed to acknowledge records with
836        // an ID higher than the highest record ID in the data file, it was meant to have been
837        // deleted.
838        //
839        // Once the reader/writer file IDs are identical, we fall back to the slow path.
840        while self.ledger.get_current_reader_file_id() != self.ledger.get_current_writer_file_id() {
841            let data_file_path = self.ledger.get_current_reader_data_file_path();
842            self.ensure_ready_for_read().await.context(IoSnafu)?;
843            let data_file_mmap = self
844                .ledger
845                .filesystem()
846                .open_mmap_readable(&data_file_path)
847                .await
848                .context(IoSnafu)?;
849
850            match validate_record_archive(data_file_mmap.as_ref(), &Hasher::new()) {
851                RecordStatus::Valid {
852                    id: last_record_id, ..
853                } => {
854                    let record = try_as_record_archive(data_file_mmap.as_ref())
855                        .expect("record was already validated");
856
857                    let Ok(item) = decode_record_payload::<T>(record) else {
858                        // If there's an error decoding the item, just fall back to the slow path,
859                        // because this file might actually be where we left off, so we don't want
860                        // to incorrectly skip ahead or anything.
861                        break;
862                    };
863
864                    // We have to remove 1 from the event count here because otherwise the ID would
865                    // be the _next_ record's ID we'd expect, not the last ID of the record we are
866                    // acknowledged up to. (Record IDs start at N and consume up to N+M-1 where M is
867                    // the number of events in the record, which is how we can determine the event
868                    // count from the record IDs alone, without having to read every record in the
869                    // buffer during startup.)
870                    let record_events = u64::try_from(item.event_count())
871                        .expect("event count should never exceed u64");
872                    let last_record_id_in_data_file =
873                        last_record_id.wrapping_add(record_events.saturating_sub(1));
874
875                    // If we're past this data file, delete it and move on. We do this manually
876                    // versus faking it via `roll_to_next_data_file` because that emits a deletion
877                    // marker, but the internal state tracking first/last record ID, bytes read,
878                    // etc, won't actually be usable.
879                    if ledger_last > last_record_id_in_data_file {
880                        // By passing 0 bytes, `delete_completed_data_file` does the work of
881                        // ensuring the buffer size is updated to reflect the data file being
882                        // deleted in its entirety.
883                        self.delete_completed_data_file(data_file_path, None)
884                            .await
885                            .context(IoSnafu)?;
886                        self.reset();
887                    } else {
888                        // We've hit a point where the current data file we're on has records newer
889                        // than where we left off, so we can catch up from here.
890                        break;
891                    }
892                }
893                // Similar to the comment above about when decoding fails, we fallback to the slow
894                // path in case any error is encountered, lest we risk incorrectly skipping ahead to
895                // the wrong data file.
896                _ => break,
897            }
898        }
899
900        // We rely on `next` to close out the data file if we've actually reached the end, and we
901        // also rely on it to reset the data file before trying to read, and we _also_ rely on it to
902        // update `self.last_reader_record_id`, so basically... just keep reading records until
903        // we're past the last record we had acknowledged.
904        while self.last_reader_record_id < ledger_last {
905            match self.next().await {
906                Ok(maybe_record) => {
907                    if maybe_record.is_none() {
908                        // We've hit the end of the current data file so we've gone as far as we can.
909                        break;
910                    }
911                }
912                Err(e) if e.is_bad_read() => {
913                    // If we hit a bad read during initialization, we should only continue calling
914                    // `next` if we have not advanced _past_ the writer in terms of file ID.
915                    //
916                    // If the writer saw the same error we just saw, it will have rolled itself to
917                    // the next file, lazily: for example, it discovers a bad record at the end of
918                    // file ID 3, so it marks itself to open file ID 4 next, but hasn't yet
919                    // created it, and is still technically indicated as being on file ID 3.
920                    //
921                    // Meanwhile, if _we_ try to also roll to file ID 4 and read from it, we'll deadlock
922                    // ourselves because it doesn't yet exist. However, `next` immediately updates our
923                    // reader file ID as soon as it hits a bad read error, so in this scenario,
924                    // we're now marked as being on file ID 4 while the writer is still on file ID
925                    // 3.
926                    //
927                    // From that, we can determine that when we've hit a bad read error, that if our
928                    // file ID is greater than the writer's file ID, we're now essentially
929                    // synchronized.
930                    let (reader_file_id, writer_file_id) =
931                        self.ledger.get_current_reader_writer_file_id();
932                    if reader_file_id > writer_file_id {
933                        break;
934                    }
935                }
936                Err(e) => return Err(e),
937            }
938        }
939
940        debug!(
941            last_record_id_read = self.last_reader_record_id,
942            "Synchronized with ledger. Reader ready."
943        );
944
945        self.ready_to_read = true;
946
947        Ok(())
948    }
949
    /// Reads the next record from the buffer.
    ///
    /// Pending acknowledgements are handled before every read attempt.  If the writer is done and
    /// the total buffer size has dropped to zero, `None` is returned.  Otherwise, this reads the
    /// next record.  It rolls to the next data file when the current one has been fully read, or
    /// waits for the writer to signal progress when the reader is caught up on the writer's
    /// current data file.
    ///
    /// While the reader is not yet ready to read (i.e. `ready_to_read` is still `false`, as during
    /// `seek_to_next_record`), `None` is also returned as soon as the reader runs out of data on
    /// the writer's current data file, rather than waiting for the writer.
    ///
    /// Every returned record has a `BatchNotifier` attached, and the matching receiver is
    /// registered with the finalizer along with the record's event count.
    ///
    /// # Errors
    ///
    /// If an error occurred while reading a record, an error variant will be returned describing
    /// the error.  When the error is a bad read (per `is_bad_read`), the reader rolls to the next
    /// data file before returning it.  A record with zero events yields
    /// `ReaderError::EmptyRecord`.
    #[cfg_attr(test, instrument(skip(self), level = "trace"))]
    pub async fn next(&mut self) -> Result<Option<T>, ReaderError<T>> {
        // Passed to `handle_pending_acknowledgements` on the next loop iteration; set right after
        // rolling to a new data file so that pending data files are checked immediately.
        let mut force_check_pending_data_files = false;

        let token = loop {
            // Handle any pending acknowledgements first.
            self.handle_pending_acknowledgements(force_check_pending_data_files)
                .await
                .context(IoSnafu)?;
            force_check_pending_data_files = false;

            // If the writer has marked themselves as done, and the buffer has been emptied, then
            // we're done and can return.  We have to look at something besides simply the writer
            // being marked as done to know if we're actually done or not, and "buffer size" is better
            // than "total records" because we update buffer size when handling acknowledgements,
            // whether it's an individual ack or an entire file being deleted.
            //
            // If we used "total records", we could end up stuck in cases where we skipped
            // corrupted records, but hadn't yet had a "good" record that we could read, since the
            // "we skipped records due to corruption" logic requires performing valid read to
            // detect, and calculate a valid delta from.
            if self.ledger.is_writer_done() {
                let total_buffer_size = self.ledger.get_total_buffer_size();
                if total_buffer_size == 0 {
                    return Ok(None);
                }
            }

            self.ensure_ready_for_read().await.context(IoSnafu)?;

            let reader = self
                .reader
                .as_mut()
                .expect("reader should exist after `ensure_ready_for_read`");

            let (reader_file_id, writer_file_id) = self.ledger.get_current_reader_writer_file_id();

            // Essentially: is the writer still writing to this data file or not, and are we
            // actually ready to read (aka initialized)?
            //
            // This is a necessary invariant to understand if the record reader should actually keep
            // waiting for data, or if a data file had a partial write/missing data and should be
            // skipped. In particular, not only does this matter for deadlocking during shutdown due
            // to improper writer behavior/flushing, but it also matters during initialization in
            // case where the current data file had a partial write.
            let is_finalized = (reader_file_id != writer_file_id) || !self.ready_to_read;

            // Try reading a record, which if successful, gives us a token to actually read/get a
            // reference to the record.  This is a slightly-tricky song-and-dance due to rustc not
            // yet fully understanding mutable borrows when conditional control flow is involved.
            match reader.try_next_record(is_finalized).await {
                // Not even enough data to read a length delimiter, so we need to wait for the
                // writer to signal us that there's some actual data to read.
                Ok(None) => {}
                // We got a valid record, so keep the token.
                Ok(Some(token)) => break token,
                // A length-delimited payload was read, but we failed to deserialize it as a valid
                // record, or we deserialized it and the checksum was invalid.  Either way, we're not
                // sure the rest of the data file is even valid, so roll to the next file.
                //
                // TODO: Explore the concept of putting a data file into a "one more attempt to read
                // a valid record" state, almost like a semi-open circuit breaker.  There's a
                // possibility that the length delimiter we got is valid, and all the data was
                // written for the record, but the data was invalid... and that if we just kept
                // reading, we might actually encounter a valid record.
                //
                // Theoretically, based on both the validation done by `rkyv` and the checksum, it
                // should be incredibly unlikely to read a valid record after getting a
                // corrupted record if there was missing data or more invalid data.  We use
                // checksumming to assert errors within a given chunk of the payload, so one payload
                // being corrupted doesn't always, in fact, mean that other records after it are
                // corrupted too.
                Err(e) => {
                    // Invalid checksums and deserialization failures can't really be acted upon by
                    // the caller, but they might be expecting a read-after-write behavior, so we
                    // return the error to them after ensuring that we roll to the next file first.
                    if e.is_bad_read() {
                        self.roll_to_next_data_file();
                    }

                    return Err(e);
                }
            }

            // Fundamentally, when `try_next_record` returns `Ok(None)`, there are three possible
            // scenarios:
            //
            // 1. we are entirely caught up to the writer
            // 2. we've hit the end of the data file and need to go to the next one
            // 3. the writer has closed/dropped/finished/etc
            //
            // When we're at this point, we check the reader/writer file IDs.  If the file IDs are
            // not identical, we now know the writer has moved on.  Crucially, since we always flush
            // our writes before waking up, including before moving to a new file, then we know that
            // if the reader/writer were not identical at the start of the loop, and
            // `try_next_record` returned `Ok(None)`, that we have hit the actual end of the
            // reader's current data file, and need to move on.
            //
            // If the file IDs were identical, it would imply that reader is still on the writer's
            // current data file. We then "wait" for the writer to wake us up. It may lead to the
            // same thing -- `try_next_record` returning `Ok(None)` with an identical reader/writer
            // file ID -- but that's OK, because it would mean we were actually waiting for the
            // writer to make progress now.  If the wake-up was valid, due to writer progress,
            // then, well... we'd actually be able to read data.
            //
            // The case of "the writer has closed/dropped/finished/etc" is handled at the top of the
            // loop, because otherwise we could get stuck waiting for the writer after an empty
            // `try_next_record` attempt when the writer is done and we're at the end of the file,
            // etc.
            if self.ready_to_read {
                if reader_file_id != writer_file_id {
                    debug!(
                        reader_file_id,
                        writer_file_id, "Reached the end of current data file."
                    );

                    self.roll_to_next_data_file();
                    force_check_pending_data_files = true;
                    continue;
                }

                self.ledger.wait_for_writer().await;
            } else {
                debug!(
                    bytes_read = self.bytes_read,
                    "Current data file has no more data."
                );

                if reader_file_id == writer_file_id {
                    // We're currently just seeking to where we left off the last time this buffer was
                    // running, which might mean there's no records for us to read at all because we
                    // were already caught up.  All we can do is signal to `seek_to_next_record` that
                    // we're caught up.
                    return Ok(None);
                }
                // Otherwise (still initializing, but on an older data file than the writer), we
                // simply loop around and try reading again.
            }
        };

        // We got a read token, so our record is present in the reader, and now we can actually read
        // it out and return it.
        let record_id = token.record_id();
        let record_bytes = token.record_bytes() as u64;

        let reader = self
            .reader
            .as_mut()
            .expect("reader should exist after `ensure_ready_for_read`");
        let mut record = reader.read_record(token)?;

        // The event count must fit in a u64, and must be non-zero: records without any events are
        // rejected with `ReaderError::EmptyRecord`.
        let record_events: u64 = record
            .event_count()
            .try_into()
            .expect("Event count for a record cannot exceed 2^64 events.");
        let record_events = record_events
            .try_into()
            .map_err(|_| ReaderError::EmptyRecord)?;
        self.track_read(record_id, record_bytes, record_events);

        // Attach a batch notifier to the record, and register its receiver with the finalizer
        // along with the record's event count.
        let (batch, receiver) = BatchNotifier::new_with_receiver();
        record.add_batch_notifier(batch);
        self.finalizer.add(record_events.get(), receiver);

        if self.ready_to_read {
            trace!(
                record_id,
                record_events,
                record_bytes,
                data_file_id = self.ledger.get_current_reader_file_id(),
                "Read record."
            );
        }

        Ok(Some(record))
    }
1133}
1134
1135pub(crate) fn decode_record_payload<T: Bufferable>(
1136    record: &ArchivedRecord<'_>,
1137) -> Result<T, ReaderError<T>> {
1138    // Try and convert the raw record metadata into the true metadata type used by `T`, and then
1139    // also verify that `T` is able to decode records with the metadata used for this record in particular.
1140    let metadata = T::Metadata::from_u32(record.metadata()).ok_or(ReaderError::Incompatible {
1141        reason: format!("invalid metadata for {}", std::any::type_name::<T>()),
1142    })?;
1143
1144    if !T::can_decode(metadata) {
1145        return Err(ReaderError::Incompatible {
1146            reason: format!(
1147                "record metadata not supported (metadata: {:#036b})",
1148                record.metadata()
1149            ),
1150        });
1151    }
1152
1153    // Now we can finally try decoding.
1154    T::decode(metadata, record.payload()).context(DecodeSnafu)
1155}