vector_buffers/variants/disk_v2/

1use std::{
2    cmp, fmt,
3    io::{self, ErrorKind},
4    marker::PhantomData,
5    num::NonZeroU64,
6    path::PathBuf,
7    sync::Arc,
8};
9
10use crc32fast::Hasher;
11use rkyv::{archived_root, AlignedVec};
12use snafu::{ResultExt, Snafu};
13use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
14use vector_common::{finalization::BatchNotifier, finalizer::OrderedFinalizer};
15
16use super::{
17    common::create_crc32c_hasher,
18    ledger::Ledger,
19    record::{validate_record_archive, ArchivedRecord, Record, RecordStatus},
20    Filesystem,
21};
22use crate::{
23    encoding::{AsMetadata, Encodable},
24    internal_events::BufferReadError,
25    topology::acks::{EligibleMarker, EligibleMarkerLength, MarkerError, OrderedAcknowledgements},
26    variants::disk_v2::{io::AsyncFile, record::try_as_record_archive},
27    Bufferable,
28};
29
30pub(super) struct ReadToken {
31    record_id: u64,
32    record_bytes: usize,
33}
34
35impl ReadToken {
36    pub fn new(record_id: u64, record_bytes: usize) -> Self {
37        Self {
38            record_id,
39            record_bytes,
40        }
41    }
42
43    pub fn record_id(&self) -> u64 {
44        self.record_id
45    }
46
47    pub fn record_bytes(&self) -> usize {
48        self.record_bytes
49    }
50
51    fn into_record_id(self) -> u64 {
52        self.record_id
53    }
54}
55
56/// Error that occurred during calls to [`BufferReader`].
57#[derive(Debug, Snafu)]
58pub enum ReaderError<T>
59where
60    T: Bufferable,
61{
62    /// A general I/O error occurred.
63    ///
64    /// Different methods will capture specific I/O errors depending on the situation, as some
65    /// errors may be expected and considered normal by design.  Any I/O error considered
66    /// atypical is returned as this variant.
67    #[snafu(display("read I/O error: {}", source))]
68    Io { source: io::Error },
69
70    /// The reader failed to deserialize the record.
71    ///
72    /// In most cases, this indicates that the data file being read was corrupted or truncated in
73    /// some fashion.  When [`BufferReader::next`] hits this error, it rolls to the next data file
74    /// before returning it, as corruption may have affected other records in a way that is not
75    /// easily detectable and could lead to records which deserialize/decode but contain invalid
76    /// data.
77    #[snafu(display("failed to deserialize encoded record from buffer: {}", reason))]
78    Deserialization { reason: String },
79
80    /// The record's checksum did not match.
81    ///
82    /// In most cases, this indicates that the data file being read was corrupted or truncated in
83    /// some fashion.  When [`BufferReader::next`] hits this error, it rolls to the next data file
84    /// before returning it, as corruption may have affected other records in a way that is not
85    /// easily detectable and could lead to records which deserialize/decode but contain invalid
86    /// data.
87    #[snafu(display(
88        "calculated checksum did not match the actual checksum: ({} vs {})",
89        calculated,
90        actual
91    ))]
92    Checksum { calculated: u32, actual: u32 },
93
94    /// The decoder encountered an issue during decoding.
95    ///
96    /// At this stage, the record can be assumed to have been written correctly, and read correctly
97    /// from disk, as the checksum was also validated.
98    #[snafu(display("failed to decode record: {:?}", source))]
99    Decode {
100        source: <T as Encodable>::DecodeError,
101    },
102
103    /// The record is not compatible with this version of Vector.
104    ///
105    /// This can occur when records written to a buffer in previous versions of Vector are read by
106    /// newer versions of Vector where the encoding scheme, or record schema, used in the previous
107    /// version of Vector are no longer able to be decoded in this version of Vector.
108    #[snafu(display("record version not compatible: {}", reason))]
109    Incompatible { reason: String },
110
111    /// The reader detected that a data file contains a partially-written record.
112    ///
113    /// Records should never be partially written to a data file (we don't split records across data
114    /// files), so this would be indicative of a write that was never properly completed/flushed, or
115    /// some issue with the write where it was acknowledged but the data/file was corrupted in some way.
116    ///
117    /// This is effectively the same class of error as an invalid checksum/failed deserialization.
118    PartialWrite,
119
120    /// The record reported an event count of zero.
121    ///
122    /// Empty records should not be allowed to be written, so this represents either a bug with the
123    /// writing logic of the buffer, or a record that does not use a symmetrical encoding scheme,
124    /// which is also not supported.
125    EmptyRecord,
126}
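
// A minimal, hedged sketch of the comparison behind `ReaderError::Checksum`: the checksum
// stored alongside a record is compared against one recomputed over the payload bytes.  This
// assumes the CRC32C hasher is cloned per record, as `validate_record_archive` takes it by
// reference; the function name is illustrative only and not part of the real API.
#[allow(dead_code)]
fn demo_checksum_matches(checksummer: &Hasher, payload: &[u8], stored_checksum: u32) -> bool {
    let mut hasher = checksummer.clone();
    hasher.update(payload);
    hasher.finalize() == stored_checksum
}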
127
128impl<T> ReaderError<T>
129where
130    T: Bufferable,
131{
132    fn is_bad_read(&self) -> bool {
133        matches!(
134            self,
135            ReaderError::Checksum { .. }
136                | ReaderError::Deserialization { .. }
137                | ReaderError::PartialWrite
138        )
139    }
140
141    fn as_error_code(&self) -> &'static str {
142        match self {
143            ReaderError::Io { .. } => "io_error",
144            ReaderError::Deserialization { .. } => "deser_failed",
145            ReaderError::Checksum { .. } => "checksum_mismatch",
146            ReaderError::Decode { .. } => "decode_failed",
147            ReaderError::Incompatible { .. } => "incompatible_record_version",
148            ReaderError::PartialWrite => "partial_write",
149            ReaderError::EmptyRecord => "empty_record",
150        }
151    }
152
153    pub fn as_recoverable_error(&self) -> Option<BufferReadError> {
154        let error = self.to_string();
155        let error_code = self.as_error_code();
156
157        match self {
158            ReaderError::Io { .. } | ReaderError::EmptyRecord => None,
159            ReaderError::Deserialization { .. }
160            | ReaderError::Checksum { .. }
161            | ReaderError::Decode { .. }
162            | ReaderError::Incompatible { .. }
163            | ReaderError::PartialWrite => Some(BufferReadError { error_code, error }),
164        }
165    }
166}
167
168impl<T: Bufferable> PartialEq for ReaderError<T> {
169    fn eq(&self, other: &Self) -> bool {
170        match (self, other) {
171            (Self::Io { source: l_source }, Self::Io { source: r_source }) => {
172                l_source.kind() == r_source.kind()
173            }
174            (
175                Self::Deserialization { reason: l_reason },
176                Self::Deserialization { reason: r_reason },
177            ) => l_reason == r_reason,
178            (
179                Self::Checksum {
180                    calculated: l_calculated,
181                    actual: l_actual,
182                },
183                Self::Checksum {
184                    calculated: r_calculated,
185                    actual: r_actual,
186                },
187            ) => l_calculated == r_calculated && l_actual == r_actual,
188            (Self::Decode { .. }, Self::Decode { .. }) => true,
189            (Self::Incompatible { reason: l_reason }, Self::Incompatible { reason: r_reason }) => {
190                l_reason == r_reason
191            }
192            _ => core::mem::discriminant(self) == core::mem::discriminant(other),
193        }
194    }
195}
196
197/// Buffered reader that handles deserialization, checksumming, and decoding of records.
198pub(super) struct RecordReader<R, T> {
199    reader: BufReader<R>,
200    aligned_buf: AlignedVec,
201    checksummer: Hasher,
202    current_record_id: u64,
203    _t: PhantomData<T>,
204}
205
206impl<R, T> RecordReader<R, T>
207where
208    R: AsyncRead + Unpin,
209    T: Bufferable,
210{
211    /// Creates a new [`RecordReader`] around the provided reader.
212    ///
213    /// Internally, the reader is wrapped in a [`BufReader`], so callers should not pass in an
214    /// already buffered reader.
215    pub fn new(reader: R) -> Self {
216        Self {
217            reader: BufReader::with_capacity(256 * 1024, reader),
218            aligned_buf: AlignedVec::new(),
219            checksummer: create_crc32c_hasher(),
220            current_record_id: 0,
221            _t: PhantomData,
222        }
223    }
224
225    #[cfg_attr(test, instrument(skip(self), level = "trace"))]
226    async fn read_length_delimiter(
227        &mut self,
228        is_finalized: bool,
229    ) -> Result<Option<usize>, ReaderError<T>> {
230        loop {
231            let available = self.reader.buffer().len();
232            if available >= 8 {
233                let length_buf = &self.reader.buffer()[..8];
234                let length = length_buf
235                    .try_into()
236                    .expect("the slice is the length of a u64");
237                self.reader.consume(8);
238
239                // By default, records cannot exceed 8MB in length, so whether our `usize` is 32-bit
240                // or 64-bit, we're not going to overflow it.  While the maximum record size _can_ be
241                // changed, it's not currently exposed to users.  Even further, if it were exposed to
242                // users, it's currently a `usize`, so again, we know that we're not going to exceed
243                // 64 bits. And even further still, the writer fallibly attempts to get a `u64` of the
244                // record size based on the encoding buffer, which gives its length in `usize`, and so
245                // would fail if `usize` were larger than `u64`, meaning we will at least panic if
246                // Vector is ever running on a 128-bit CPU and storing records larger than 2^64 - 1
247                // bytes. :)
248                let record_len = u64::from_be_bytes(length)
249                    .try_into()
250                    .expect("record length should never exceed usize");
251                return Ok(Some(record_len));
252            }
253
254            // We don't have enough bytes, so we need to fill our buffer again.
255            let buf = self.reader.fill_buf().await.context(IoSnafu)?;
256            if buf.is_empty() {
257                return Ok(None);
258            }
259
260            // If we tried to read more bytes, and we still don't have enough for the record
261            // delimiter, and the data file has been finalized already: we've got a partial
262            // write situation on our hands.
263            if buf.len() < 8 && is_finalized {
264                return Err(ReaderError::PartialWrite);
265            }
266        }
267    }
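
    // A minimal, self-contained sketch of the framing described above: an 8-byte, big-endian
    // length delimiter followed by `length` bytes of archived record data.  The `demo_` name is
    // illustrative only and not part of the real reader API.
    #[allow(dead_code)]
    fn demo_parse_length_delimiter(frame: &[u8]) -> Option<(usize, &[u8])> {
        // Not enough bytes buffered yet for the fixed-size delimiter.
        let prefix = frame.get(..8)?;
        let length_bytes: [u8; 8] = prefix.try_into().expect("slice is exactly 8 bytes");
        let record_len = usize::try_from(u64::from_be_bytes(length_bytes)).ok()?;

        // The archived record payload immediately follows the delimiter.
        let end = 8usize.checked_add(record_len)?;
        frame.get(8..end).map(|record| (record_len, record))
    }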
268
269    /// Attempts to read a record.
270    ///
271    /// Records are preceded by a length delimiter, a fixed-size integer (currently 8 bytes) that
272    /// tells the reader how many more bytes to read in order to completely read the next record.
273    ///
274    /// If there are no more bytes to read, we return early in order to allow the caller to wait
275    /// until such a time where there should be more data, as no wake-ups can be generated when
276    /// reading a file after reaching EOF.
277    ///
278    /// If there is any data available, we attempt to continue reading until both a length
279    /// delimiter, and the accompanying record, can be read in their entirety.
280    ///
281    /// If a record is able to be read in its entirety, a token is returned to caller that can be
282    /// used with [`read_record`] in order to get an owned `T`.  This is due to a quirk with the
283    /// compiler's ability to track stacked mutable references through conditional control flows, of
284    /// which is handled by splitting the "do we have a valid record in our buffer?" logic from the
285    /// "read that record and decode it" logic.
286    ///
287    /// # Finalized reads
288    ///
289    /// All of the above logic applies when `is_finalized` is `false`, which signals that a data
290    /// file is still currently being written to.  If `is_finalized` is `true`, most of the above
291    /// logic applies but in cases where we detect a partial write, we explicitly return an error
292    /// for a partial read.
293    ///
294    /// In practice, what this means is that when we believe a file should be "finalized" -- the
295    /// writer flushed the file to disk, the ledger has been flushed, etc -- then we also expect to
296    /// be able to read all bytes with no leftover.  A partially-written length delimiter, or
297    /// record, would be indicative of a bug with the writer or OS/disks, essentially telling us
298    /// that the current data file is not valid for reads anymore.  We don't know _why_ it's in this
299    /// state, only that something is not right and that we must skip the file.
300    ///
301    /// # Errors
302    ///
303    /// Errors can occur during the I/O or deserialization stage.  If an error occurs during any of
304    /// these stages, an appropriate error variant will be returned describing the error.
305    #[cfg_attr(test, instrument(skip(self), level = "trace"))]
306    pub async fn try_next_record(
307        &mut self,
308        is_finalized: bool,
309    ) -> Result<Option<ReadToken>, ReaderError<T>> {
310        let Some(record_len) = self.read_length_delimiter(is_finalized).await? else {
311            return Ok(None);
312        };
313
314        if record_len == 0 {
315            return Err(ReaderError::Deserialization {
316                reason: "record length was zero".to_string(),
317            });
318        }
319
320        // Read in all of the bytes we need first.
321        self.aligned_buf.clear();
322        while self.aligned_buf.len() < record_len {
323            let needed = record_len - self.aligned_buf.len();
324            let buf = self.reader.fill_buf().await.context(IoSnafu)?;
325            if buf.is_empty() && is_finalized {
326                // If we needed more data, but there was none available, and we're finalized: we've
327                // got ourselves a partial write situation.
328                return Err(ReaderError::PartialWrite);
329            }
330
331            let available = cmp::min(buf.len(), needed);
332            self.aligned_buf.extend_from_slice(&buf[..available]);
333            self.reader.consume(available);
334        }
335
336        // Now see if we can deserialize our archived record from this.
337        let buf = self.aligned_buf.as_slice();
338        match validate_record_archive(buf, &self.checksummer) {
339            RecordStatus::FailedDeserialization(de) => Err(ReaderError::Deserialization {
340                reason: de.into_inner(),
341            }),
342            RecordStatus::Corrupted { calculated, actual } => {
343                Err(ReaderError::Checksum { calculated, actual })
344            }
345            RecordStatus::Valid { id, .. } => {
346                self.current_record_id = id;
347                // TODO: Another spot where our hardcoding of the length delimiter size in bytes is fragile.
348                Ok(Some(ReadToken::new(id, 8 + buf.len())))
349            }
350        }
351    }
352
353    /// Reads the record associated with the given [`ReadToken`].
354    ///
355    /// # Errors
356    ///
357    /// If an error occurs during decoding, an error variant will be returned describing the error.
358    ///
359    /// # Panics
360    ///
361    /// If a `ReadToken` is not used in a call to `read_record` before `try_next_record` is called
362    /// again, and the now-stale `ReadToken` from the earlier call is then used, this method will
363    /// panic due to an out-of-order read.
364    pub fn read_record(&mut self, token: ReadToken) -> Result<T, ReaderError<T>> {
365        let record_id = token.into_record_id();
366        assert_eq!(
367            self.current_record_id, record_id,
368            "using expired read token; this is a serious bug"
369        );
370
371        // SAFETY:
372        // - `try_next_record` is the only method that can hand back a `ReadToken`
373        // - we only get a `ReadToken` if there's a valid record in `self.aligned_buf`
374        // - `try_next_record` does all the archive checks, checksum validation, etc
375        let record = unsafe { archived_root::<Record<'_>>(&self.aligned_buf) };
376
377        decode_record_payload(record)
378    }
379}
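
// A hedged usage sketch of the two-step `try_next_record`/`read_record` flow documented above:
// first obtain a `ReadToken` proving a complete, checksum-valid record is buffered, then
// exchange it for the decoded record.  `demo_read_one` is illustrative only and not part of
// the real reader API.
#[allow(dead_code)]
async fn demo_read_one<R, T>(
    reader: &mut RecordReader<R, T>,
    is_finalized: bool,
) -> Result<Option<T>, ReaderError<T>>
where
    R: AsyncRead + Unpin,
    T: Bufferable,
{
    match reader.try_next_record(is_finalized).await? {
        // The token must be redeemed before the next call to `try_next_record`, otherwise
        // `read_record` will panic on the expired token.
        Some(token) => reader.read_record(token).map(Some),
        // Not enough data buffered yet; the caller decides whether to wait for the writer.
        None => Ok(None),
    }
}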
380
381impl<R, T> fmt::Debug for RecordReader<R, T>
382where
383    R: fmt::Debug,
384{
385    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
386        f.debug_struct("RecordReader")
387            .field("reader", &self.reader)
388            .field("aligned_buf", &self.aligned_buf)
389            .field("checksummer", &self.checksummer)
390            .field("current_record_id", &self.current_record_id)
391            .finish()
392    }
393}
394
395/// Reads records from the buffer.
396#[derive(Debug)]
397pub struct BufferReader<T, FS>
398where
399    FS: Filesystem,
400{
401    ledger: Arc<Ledger<FS>>,
402    reader: Option<RecordReader<FS::File, T>>,
403    bytes_read: u64,
404    last_reader_record_id: u64,
405    data_file_start_record_id: Option<u64>,
406    data_file_record_count: u64,
407    data_file_marked_record_count: u64,
408    ready_to_read: bool,
409    record_acks: OrderedAcknowledgements<u64, u64>,
410    data_file_acks: OrderedAcknowledgements<u64, (PathBuf, u64)>,
411    finalizer: OrderedFinalizer<u64>,
412    _t: PhantomData<T>,
413}
414
415impl<T, FS> BufferReader<T, FS>
416where
417    T: Bufferable,
418    FS: Filesystem,
419    FS::File: Unpin,
420{
421    /// Creates a new [`BufferReader`] attached to the given [`Ledger`].
422    pub(crate) fn new(ledger: Arc<Ledger<FS>>, finalizer: OrderedFinalizer<u64>) -> Self {
423        let ledger_last_reader_record_id = ledger.state().get_last_reader_record_id();
424        let next_expected_record_id = ledger_last_reader_record_id.wrapping_add(1);
425
426        Self {
427            ledger,
428            reader: None,
429            bytes_read: 0,
430            last_reader_record_id: 0,
431            data_file_start_record_id: None,
432            data_file_record_count: 0,
433            data_file_marked_record_count: 0,
434            ready_to_read: false,
435            record_acks: OrderedAcknowledgements::from_acked(next_expected_record_id),
436            data_file_acks: OrderedAcknowledgements::from_acked(0),
437            finalizer,
438            _t: PhantomData,
439        }
440    }
441
442    fn reset(&mut self) {
443        self.reader = None;
444        self.bytes_read = 0;
445        self.data_file_start_record_id = None;
446    }
447
448    fn track_read(&mut self, record_id: u64, record_bytes: u64, event_count: NonZeroU64) {
449        // We explicitly reduce the event count by one here in order to correctly calculate the
450        // "last" record ID, which you can visualize as follows...
451        //
452        // [ record 1 ] [ record 2 ] [ record 3 ] [....]
453        // [0]          [1] [2] [3]  [4] [5]      [....]
454        //
455        // For each of these records, their "last ID" is simply the ID of the first event within the
456        // record, plus the event count, minus one.  Another way to look at it is that the "last"
457        // reader record ID is always one behind the next expected record ID.  In the above example,
458        // the next record ID we would expect would be 6, regardless of how many events the record has.
459        self.last_reader_record_id = record_id.wrapping_add(event_count.get() - 1);
460        if self.data_file_start_record_id.is_none() {
461            self.data_file_start_record_id = Some(record_id);
462        }
463
464        // Track the amount of data we read.  If we're still loading the buffer, then the only other
465        // thing we need to do is update the total buffer size.  Everything else below only matters
466        // when we're doing real record reads.
467        self.bytes_read += record_bytes;
468        if !self.ready_to_read {
469            self.ledger.decrement_total_buffer_size(record_bytes);
470            return;
471        }
472
473        // We've done a "real" record read, so we need to track it for acknowledgement.  Check our
474        // acknowledge state first to see if this is the next record ID we expected.
475        self.data_file_record_count += 1;
476        if let Err(me) =
477            self.record_acks
478                .add_marker(record_id, Some(event_count.get()), Some(record_bytes))
479        {
480            match me {
481                MarkerError::MonotonicityViolation => {
482                    panic!("record ID monotonicity violation detected; this is a serious bug")
483                }
484            }
485        }
486    }
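
    // A small, self-contained illustration of the "last reader record ID" arithmetic used in
    // `track_read`: a record whose first event has ID `record_id` and which carries
    // `event_count` events covers IDs `record_id..=record_id + event_count - 1`.  The name is
    // illustrative only.
    #[allow(dead_code)]
    fn demo_last_record_id(record_id: u64, event_count: NonZeroU64) -> u64 {
        // e.g. record_id = 4, event_count = 2 => events 4 and 5, so the last ID is 5 and the
        // next record is expected to start at ID 6, matching the diagram above.
        record_id.wrapping_add(event_count.get() - 1)
    }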
487
488    #[cfg_attr(test, instrument(skip_all, level = "debug"))]
489    async fn delete_completed_data_file(
490        &mut self,
491        data_file_path: PathBuf,
492        bytes_read: Option<u64>,
493    ) -> io::Result<()> {
494        // TODO: Could we actually make this a background task to remove the tail latency from the
495        // read path?  Technically all that's needed is a handle to the ledger and the data file
496        // path, so as long as the logic is still right, we can notify writers out-of-band.
497        debug!(
498            data_file_path = data_file_path.to_string_lossy().as_ref(),
499            bytes_read, "Deleting completed data file."
500        );
501
502        // Grab the size of the data file before we delete it, which gives us a chance to fix up the
503        // total buffer size for corrupted files or fast-forwarded files.
504        //
505        // Since we only decrement the buffer size after a successful read in normal cases, skipping
506        // the rest of a corrupted file could lead to the total buffer size being unsynchronized.
507        // We use the difference between the number of bytes read and the file size to figure out if
508        // we need to make a manual adjustment.
509        //
510        // Likewise, when we skip over a file in "fast forward" mode during initialization, no reads
511        // occur at all, so we're relying on this method to correct the buffer size for us.  This is
512        // why `bytes_read` is optional: when it's specified, we calculate a delta for handling
513        // partial-read scenarios, otherwise, we just use the entire data file size as is.
514        let data_file = self
515            .ledger
516            .filesystem()
517            .open_file_readable(&data_file_path)
518            .await?;
519        let metadata = data_file.metadata().await?;
520
521        let decrease_amount = bytes_read.map_or_else(
522            || metadata.len(),
523            |bytes_read| {
524                let size_delta = metadata.len() - bytes_read;
525                if size_delta > 0 {
526                    debug!(
527                        actual_file_size = metadata.len(),
528                        bytes_read,
529                        "Data file was only partially read. Adjusting buffer size to compensate.",
530                    );
531                }
532
533                size_delta
534            },
535        );
536
537        if decrease_amount > 0 {
538            self.ledger.decrement_total_buffer_size(decrease_amount);
539        }
540
541        drop(data_file);
542
543        // Delete the current data file, and increment our actual reader file ID.
544        self.ledger
545            .filesystem()
546            .delete_file(&data_file_path)
547            .await?;
548        self.ledger.increment_acked_reader_file_id();
549        self.ledger.flush()?;
550
551        debug!("Flushed after deleting data file, notifying writers and continuing.");
552
553        // Notify any waiting writers that we've deleted a data file, which they may be waiting on
554        // because they're looking to reuse the file ID of the file we just finished reading.
555        self.ledger.notify_reader_waiters();
556
557        Ok(())
558    }
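
    // A hedged sketch of the buffer-size adjustment described in `delete_completed_data_file`:
    // when only part of a data file was read (e.g. the tail was corrupted and skipped), the
    // unread remainder still has to be subtracted from the total buffer size at deletion time,
    // while a file with no known read progress is subtracted in its entirety.  The name is
    // illustrative only.
    #[allow(dead_code)]
    fn demo_deletion_size_adjustment(file_size: u64, bytes_read: Option<u64>) -> u64 {
        bytes_read.map_or(file_size, |read| file_size.saturating_sub(read))
    }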
559
560    #[cfg_attr(test, instrument(skip(self), level = "debug"))]
561    async fn handle_pending_acknowledgements(
562        &mut self,
563        force_check_pending_data_files: bool,
564    ) -> io::Result<()> {
565        // Acknowledgements effectively happen in two layers: record acknowledgement and data file
566        // acknowledgement.  Since records can contain multiple events, we need to track when a
567        // record itself has been fully acknowledged.  Likewise, data files contain multiple records,
568        // so we need to track when all records we've read from a data file have been acknowledged.
569
570        // Drive record acknowledgement first.
571        //
572        // We only do this if we actually consume any acknowledgements, and immediately update the
573        // buffer and ledger to more quickly get those metrics into good shape.  We defer notifying
574        // writers until after, though, in case we also have data files to delete, so that we can
575        // coalesce the notifications together at the very end of the method.
576        let mut had_eligible_records = false;
577        let mut records_acknowledged: u64 = 0;
578        let mut events_acknowledged: u64 = 0;
579        let mut events_skipped: u64 = 0;
580        let mut bytes_acknowledged: u64 = 0;
581
582        let consumed_acks = self.ledger.consume_pending_acks();
583        if consumed_acks > 0 {
584            self.record_acks.add_acknowledgements(consumed_acks);
585
586            while let Some(EligibleMarker { len, data, .. }) =
587                self.record_acks.get_next_eligible_marker()
588            {
589                had_eligible_records = true;
590
591                match len {
592                    // Any marker with an assumed length implies a gap marker, which gets added
593                    // automatically and represents a portion of the record ID range that was
594                    // expected but missing. This is a long way of saying: we're missing records.
595                    //
596                    // We tally this up so that we can emit a single log event/set of metrics, as
597                    // there may be many gap markers and emitting for each of them could be very noisy.
598                    EligibleMarkerLength::Assumed(count) => {
599                        events_skipped = events_skipped
600                            .checked_add(count)
601                            .expect("skipping more than 2^64 events at a time is obviously a bug");
602                    }
603                    // We got a valid marker representing a known number of events.
604                    EligibleMarkerLength::Known(len) => {
605                        // We specifically pass the size of the record, in bytes, as the marker data.
606                        let record_bytes = data.expect("record bytes should always be known");
607
608                        records_acknowledged = records_acknowledged.checked_add(1).expect(
609                            "acknowledging more than 2^64 records at a time is obviously a bug",
610                        );
611                        events_acknowledged = events_acknowledged.checked_add(len).expect(
612                            "acknowledging more than 2^64 events at a time is obviously a bug",
613                        );
614                        bytes_acknowledged = bytes_acknowledged.checked_add(record_bytes).expect(
615                            "acknowledging more than 2^64 bytes at a time is obviously a bug",
616                        );
617                    }
618                }
619            }
620
621            // We processed at least one eligible marker, so update our buffer and ledger accounting.
622            if had_eligible_records {
623                self.ledger
624                    .track_reads(events_acknowledged, bytes_acknowledged);
625
626                // We need to account for skipped events, too, so that our "last reader record ID"
627                // value stays correct as we process these gap markers.
628                let last_increment_amount = events_acknowledged + events_skipped;
629                self.ledger
630                    .state()
631                    .increment_last_reader_record_id(last_increment_amount);
632
633                self.data_file_acks
634                    .add_acknowledgements(records_acknowledged);
635            }
636
637            // If any events were skipped, do our logging/metrics for that.
638            if events_skipped > 0 {
639                self.ledger.track_dropped_events(events_skipped);
640            }
641        }
642
643        // If we processed any eligible records, we may now also have eligible data files.
644        //
645        // Alternatively, the core `next` logic may have just rolled over to a new data file, and
646        // we're seeing if we can fast track any eligible data file deletions rather than waiting
647        // for more acknowledgements to come in.
648        let mut had_eligible_data_files = false;
649        let mut data_files_deleted: u16 = 0;
650
651        if had_eligible_records || force_check_pending_data_files {
652            // Now handle data file deletion.  We unconditionally check to see if any data files are
653            // eligible for deletion, and process them immediately.
654
655            while let Some(EligibleMarker { data, .. }) =
656                self.data_file_acks.get_next_eligible_marker()
657            {
658                had_eligible_data_files = true;
659
660                let (data_file_path, bytes_read) =
661                    data.expect("data file deletion marker should never be empty");
662                self.delete_completed_data_file(data_file_path, Some(bytes_read))
663                    .await?;
664
665                data_files_deleted = data_files_deleted
666                    .checked_add(1)
667                    .expect("deleting more than 2^16 data files at a time is obviously a bug");
668            }
669        }
670
671        // If we managed to process any records _or_ any data file deletions, we've made
672        // meaningful progress that writers may care about, so notify them.
673        if had_eligible_data_files || had_eligible_records {
674            self.ledger.notify_reader_waiters();
675
676            if self.ready_to_read {
677                trace!(
678                    current_buffer_size = self.ledger.get_total_buffer_size(),
679                    records_acknowledged,
680                    events_acknowledged,
681                    events_skipped,
682                    bytes_acknowledged,
683                    data_files_deleted,
684                    "Finished handling acknowledgements."
685                );
686            }
687        }
688
689        Ok(())
690    }
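
    // A hedged, simplified sketch of the two acknowledgement layers described above, using only
    // calls that already appear in this method: event-level acknowledgements complete record
    // markers, and completed records in turn feed the data-file marker layer.  Gap markers are
    // ignored here for brevity; the real logic above tallies them as skipped events.  This is
    // illustrative only, not part of the real read path.
    #[allow(dead_code)]
    fn demo_two_layer_ack_flow(&mut self, acks: u64) {
        // Layer 1: apply event acknowledgements and drain any record markers they complete.
        self.record_acks.add_acknowledgements(acks);
        let mut completed_records: u64 = 0;
        while let Some(EligibleMarker { len, .. }) = self.record_acks.get_next_eligible_marker() {
            if let EligibleMarkerLength::Known(_) = len {
                completed_records += 1;
            }
        }

        // Layer 2: completed records are what eventually make whole data files eligible for
        // deletion.
        self.data_file_acks.add_acknowledgements(completed_records);
    }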
691
692    /// Switches the reader over to the next data file to read.
693    #[cfg_attr(test, instrument(skip(self), level = "debug"))]
694    fn roll_to_next_data_file(&mut self) {
695        // Add a marker for this data file so we know when it can be safely deleted.  We also need
696        // to track the necessary data to do our buffer accounting when it's eligible for deletion.
697        //
698        // In the rare case where the very first read in a new data file is corrupted/invalid and we
699        // roll to the next data file, we simply use the last reader record ID we have, which yields
700        // a marker with a length of 0.
701        let data_file_start_record_id = self
702            .data_file_start_record_id
703            .take()
704            .unwrap_or(self.last_reader_record_id);
705        // Record IDs are inclusive, so if last is 1 and start is 0, that means we had two events,
706        // potentially from one or two records.
707        let data_file_event_count = self
708            .last_reader_record_id
709            .wrapping_sub(data_file_start_record_id)
710            .saturating_add(1);
711        let data_file_record_count = self.data_file_record_count;
712        let data_file_path = self.ledger.get_current_reader_data_file_path();
713        let bytes_read = self.bytes_read;
714
715        debug!(
716            data_file_path = data_file_path.to_string_lossy().as_ref(),
717            first_record_id = data_file_start_record_id,
718            last_record_id = self.last_reader_record_id,
719            record_count = data_file_record_count,
720            event_count = data_file_event_count,
721            bytes_read,
722            "Marking data file for deletion."
723        );
724
725        let data_file_marker_id = self.data_file_marked_record_count;
726        self.data_file_marked_record_count += data_file_record_count;
727        self.data_file_record_count = 0;
728
729        self.data_file_acks
730            .add_marker(
731                data_file_marker_id,
732                Some(data_file_record_count),
733                Some((data_file_path, bytes_read)),
734            )
735            .expect("should not fail to add marker for data file deletion");
736
737        // Now reset our internal state so we can go for the next data file.
738        self.reset();
739        self.ledger.increment_unacked_reader_file_id();
740
741        debug!("Rolling to next data file.");
742    }
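
    // A small illustration of the inclusive record ID arithmetic used above when marking a data
    // file for deletion: if the first record ID in the file is 0 and the last reader record ID
    // is 1, the file covered two events.  The name is illustrative only.
    #[allow(dead_code)]
    fn demo_data_file_event_count(start_record_id: u64, last_record_id: u64) -> u64 {
        last_record_id
            .wrapping_sub(start_record_id)
            .saturating_add(1)
    }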
743
744    /// Ensures this reader is ready to attempt reading the next record.
745    #[cfg_attr(test, instrument(skip(self), level = "debug"))]
746    async fn ensure_ready_for_read(&mut self) -> io::Result<()> {
747        // We have nothing to do if we already have a data file open.
748        if self.reader.is_some() {
749            return Ok(());
750        }
751
752        // Try to open the current reader data file.  This might not _yet_ exist, in which case
753        // we'll simply wait for the writer to signal to us that progress has been made, which
754        // implies a data file existing.
755        loop {
756            let (reader_file_id, writer_file_id) = self.ledger.get_current_reader_writer_file_id();
757            let data_file_path = self.ledger.get_current_reader_data_file_path();
758            let data_file = match self
759                .ledger
760                .filesystem()
761                .open_file_readable(&data_file_path)
762                .await
763            {
764                Ok(data_file) => data_file,
765                Err(e) => match e.kind() {
766                    ErrorKind::NotFound => {
767                        if reader_file_id == writer_file_id {
768                            debug!(
769                                data_file_path = data_file_path.to_string_lossy().as_ref(),
770                                "Data file does not yet exist. Waiting for writer to create."
771                            );
772                            self.ledger.wait_for_writer().await;
773                        } else {
774                            self.ledger.increment_acked_reader_file_id();
775                        }
776                        continue;
777                    }
778                    // This is a valid I/O error, so bubble that back up.
779                    _ => return Err(e),
780                },
781            };
782
783            debug!(
784                data_file_path = data_file_path.to_string_lossy().as_ref(),
785                "Opened data file for reading."
786            );
787
788            self.reader = Some(RecordReader::new(data_file));
789            return Ok(());
790        }
791    }
792
793    /// Seeks to where this reader previously left off.
794    ///
795    /// In cases where Vector has restarted, but the reader hasn't yet finished a file, we would
796    /// open the correct data file for reading, but our file cursor would be at the very
797    /// beginning, essentially pointed at the wrong record.  We read out records here until we
798    /// reach a point where we've read up to the record referenced by `get_last_reader_record_id`.
799    ///
800    /// This ensures that a subsequent call to `next` is ready to read the correct record.
801    ///
802    /// # Errors
803    ///
804    /// If an error occurs during seeking to the next record, an error variant will be returned
805    /// describing the error.
806    #[cfg_attr(test, instrument(skip(self), level = "debug"))]
807    pub(super) async fn seek_to_next_record(&mut self) -> Result<(), ReaderError<T>> {
808        // We don't try seeking again once we're all caught up.
809        if self.ready_to_read {
810            warn!("Reader already initialized.");
811            return Ok(());
812        }
813
814        // We rely on `next` to close out the data file if we've actually reached the end, and we
815        // also rely on it to reset the data file before trying to read, and we _also_ rely on it to
816        // update `self.last_reader_record_id`, so basically... just keep reading records until we
817        // get to the one we left off with last time.
818        let ledger_last = self.ledger.state().get_last_reader_record_id();
819        debug!(
820            last_acknowledged_record_id = ledger_last,
821            "Seeking to last acknowledged record for reader."
822        );
823
824        // We may end up in a situation where a data file hasn't yet been deleted but we've moved on
825        // to the next data file, including reading and acknowledging records within it.  If Vector
826        // is stopped at a point like this, and we restart it and load the buffer, we'll start on
827        // the old data file, which would be wasteful to read all over again.
828        //
829        // In our seek loop, we have a fast path where we check the last record of a data file while
830        // the reader and writer file IDs don't match.  If we see that the record is still below the
831        // last reader record ID, we do the necessary clean up to delete that file and move to the
832        // next file.  This is safe because we know that if we managed to acknowledge records with
833        // an ID higher than the highest record ID in the data file, it was meant to have been
834        // deleted.
835        //
836        // Once the reader/writer file IDs are identical, we fall back to the slow path.
837        while self.ledger.get_current_reader_file_id() != self.ledger.get_current_writer_file_id() {
838            let data_file_path = self.ledger.get_current_reader_data_file_path();
839            self.ensure_ready_for_read().await.context(IoSnafu)?;
840            let data_file_mmap = self
841                .ledger
842                .filesystem()
843                .open_mmap_readable(&data_file_path)
844                .await
845                .context(IoSnafu)?;
846
847            match validate_record_archive(data_file_mmap.as_ref(), &Hasher::new()) {
848                RecordStatus::Valid {
849                    id: last_record_id, ..
850                } => {
851                    let record = try_as_record_archive(data_file_mmap.as_ref())
852                        .expect("record was already validated");
853
854                    let Ok(item) = decode_record_payload::<T>(record) else {
855                        // If there's an error decoding the item, just fall back to the slow path,
856                        // because this file might actually be where we left off, so we don't want
857                        // to incorrectly skip ahead or anything.
858                        break;
859                    };
860
861                    // We have to remove 1 from the event count here because otherwise the ID would
862                    // be the _next_ record's ID we'd expect, not the last ID of the record we are
863                    // acknowledged up to. (Record IDs start at N and consume up to N+M-1 where M is
864                    // the number of events in the record, which is how we can determine the event
865                    // count from the record IDs alone, without having to read every record in the
866                    // buffer during startup.)
867                    let record_events = u64::try_from(item.event_count())
868                        .expect("event count should never exceed u64");
869                    let last_record_id_in_data_file =
870                        last_record_id.wrapping_add(record_events.saturating_sub(1));
871
872                    // If we're past this data file, delete it and move on. We do this manually
873                    // versus faking it via `roll_to_next_data_file` because that emits a deletion
874                    // marker, but the internal state tracking first/last record ID, bytes read,
875                    // etc, won't actually be usable.
876                    if ledger_last > last_record_id_in_data_file {
877                        // By passing `None` for the bytes read, `delete_completed_data_file` does
878                        // the work of ensuring the buffer size is updated to reflect the data file
879                        // being deleted in its entirety.
880                        self.delete_completed_data_file(data_file_path, None)
881                            .await
882                            .context(IoSnafu)?;
883                        self.reset();
884                    } else {
885                        // We've hit a point where the current data file we're on has records newer
886                        // than where we left off, so we can catch up from here.
887                        break;
888                    }
889                }
890                // Similar to the comment above about when decoding fails, we fallback to the slow
891                // path in case any error is encountered, lest we risk incorrectly skipping ahead to
892                // the wrong data file.
893                _ => break,
894            }
895        }
896
897        // We rely on `next` to close out the data file if we've actually reached the end, and we
898        // also rely on it to reset the data file before trying to read, and we _also_ rely on it to
899        // update `self.last_reader_record_id`, so basically... just keep reading records until
900        // we're past the last record we had acknowledged.
901        while self.last_reader_record_id < ledger_last {
902            match self.next().await {
903                Ok(maybe_record) => {
904                    if maybe_record.is_none() {
905                        // We've hit the end of the current data file so we've gone as far as we can.
906                        break;
907                    }
908                }
909                Err(e) if e.is_bad_read() => {
910                    // If we hit a bad read during initialization, we should only continue calling
911                    // `next` if we have not advanced _past_ the writer in terms of file ID.
912                    //
913                    // If the writer saw the same error we just saw, it will have rolled itself to
914                    // the next file, lazily: for example, it discovers a bad record at the end of
915                    // file ID 3, so it marks itself to open file ID 4 next, but hasn't yet
916                    // created it, and is still technically indicated as being on file ID 3.
917                    //
918                    // Meanwhile, if _we_ try to also roll to file ID 4 and read from it, we'll deadlock
919                    // ourselves because it doesn't yet exist. However, `next` immediately updates our
920                    // reader file ID as soon as it hits a bad read error, so in this scenario,
921                    // we're now marked as being on file ID 4 while the writer is still on file ID
922                    // 3.
923                    //
924                    // From that, we can determine that when we've hit a bad read error, that if our
925                    // file ID is greater than the writer's file ID, we're now essentially
926                    // synchronized.
927                    let (reader_file_id, writer_file_id) =
928                        self.ledger.get_current_reader_writer_file_id();
929                    if reader_file_id > writer_file_id {
930                        break;
931                    }
932                }
933                Err(e) => return Err(e),
934            }
935        }
936
937        debug!(
938            last_record_id_read = self.last_reader_record_id,
939            "Synchronized with ledger. Reader ready."
940        );
941
942        self.ready_to_read = true;
943
944        Ok(())
945    }
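
    // A hedged sketch of the fast-path check performed in `seek_to_next_record`: given the ID of
    // the first event of the last record in a data file and that record's event count, the file
    // can be skipped outright when the ledger's last acknowledged reader record ID is beyond the
    // last event ID the file covers.  The name and parameters are illustrative only.
    #[allow(dead_code)]
    fn demo_can_skip_data_file(
        ledger_last_record_id: u64,
        last_record_first_event_id: u64,
        last_record_event_count: u64,
    ) -> bool {
        let last_event_id_in_file =
            last_record_first_event_id.wrapping_add(last_record_event_count.saturating_sub(1));
        ledger_last_record_id > last_event_id_in_file
    }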
946
947    /// Reads a record.
948    ///
949    /// If the writer is closed and there is no more data in the buffer, `None` is returned.
950    /// Otherwise, reads the next record or waits until the next record is available.
951    ///
952    /// # Errors
953    ///
954    /// If an error occurred while reading a record, an error variant will be returned describing
955    /// the error.
956    #[cfg_attr(test, instrument(skip(self), level = "trace"))]
957    pub async fn next(&mut self) -> Result<Option<T>, ReaderError<T>> {
958        let mut force_check_pending_data_files = false;
959
960        let token = loop {
961            // Handle any pending acknowledgements first.
962            self.handle_pending_acknowledgements(force_check_pending_data_files)
963                .await
964                .context(IoSnafu)?;
965            force_check_pending_data_files = false;
966
967            // If the writer has marked themselves as done, and the buffer has been emptied, then
968            // we're done and can return.  We have to look at something besides simply the writer
969            // being marked as done to know if we're actually done or not, and "buffer size" is better
970            // than "total records" because we update buffer size when handling acknowledgements,
971            // whether it's an individual ack or an entire file being deleted.
972            //
973            // If we used "total records", we could end up stuck in cases where we skipped
974            // corrupted records, but hadn't yet had a "good" record that we could read, since the
975            // "we skipped records due to corruption" logic requires performing a valid read to
976            // detect, and calculate a valid delta from.
977            if self.ledger.is_writer_done() {
978                let total_buffer_size = self.ledger.get_total_buffer_size();
979                if total_buffer_size == 0 {
980                    return Ok(None);
981                }
982            }
983
984            self.ensure_ready_for_read().await.context(IoSnafu)?;
985
986            let reader = self
987                .reader
988                .as_mut()
989                .expect("reader should exist after `ensure_ready_for_read`");
990
991            let (reader_file_id, writer_file_id) = self.ledger.get_current_reader_writer_file_id();
992
993            // Essentially: is the writer still writing to this data file or not, and are we
994            // actually ready to read (aka initialized)?
995            //
996            // This is a necessary invariant to understand if the record reader should actually keep
997            // waiting for data, or if a data file had a partial write/missing data and should be
998            // skipped. In particular, not only does this matter for deadlocking during shutdown due
999            // to improper writer behavior/flushing, but it also matters during initialization in
1000            // cases where the current data file had a partial write.
1001            let is_finalized = (reader_file_id != writer_file_id) || !self.ready_to_read;
1002
1003            // Try reading a record, which if successful, gives us a token to actually read/get a
1004            // reference to the record.  This is a slightly-tricky song-and-dance due to rustc not
1005            // yet fully understanding mutable borrows when conditional control flow is involved.
1006            match reader.try_next_record(is_finalized).await {
1007                // Not even enough data to read a length delimiter, so we need to wait for the
1008                // writer to signal us that there's some actual data to read.
1009                Ok(None) => {}
1010                // We got a valid record, so keep the token.
1011                Ok(Some(token)) => break token,
1012                // A length-delimited payload was read, but we failed to deserialize it as a valid
1013                // record, or we deserialized it and the checksum was invalid.  Either way, we're not
1014                // sure the rest of the data file is even valid, so roll to the next file.
1015                //
1016                // TODO: Explore the concept of putting a data file into a "one more attempt to read
1017                // a valid record" state, almost like a semi-open circuit breaker.  There's a
1018                // possibility that the length delimiter we got is valid, and all the data was
1019                // written for the record, but the data was invalid... and that if we just kept
1020                // reading, we might actually encounter a valid record.
1021                //
1022                // Theoretically, based on both the validation done by `rkyv` and the checksum, it
1023                // should be incredibly unlikely to read a valid record after getting a
1024                // corrupted record if there was missing data or more invalid data.  We use
1025                // checksumming to detect errors within a given chunk of the payload, so one payload
1026                // being corrupted doesn't always, in fact, mean that other records after it are
1027                // corrupted too.
1028                Err(e) => {
1029                    // Invalid checksums and deserialization failures can't really be acted upon by
1030                    // the caller, but they might be expecting a read-after-write behavior, so we
1031                    // return the error to them after ensuring that we roll to the next file first.
1032                    if e.is_bad_read() {
1033                        self.roll_to_next_data_file();
1034                    }
1035
1036                    return Err(e);
1037                }
1038            }
1039
1040            // Fundamentally, when `try_next_record` returns `None`, there are three possible
1041            // scenarios:
1042            //
1043            // 1. we are entirely caught up to the writer
1044            // 2. we've hit the end of the data file and need to go to the next one
1045            // 3. the writer has closed/dropped/finished/etc
1046            //
1047            // When we're at this point, we check the reader/writer file IDs.  If the file IDs are
1048            // not identical, we now know the writer has moved on.  Crucially, since we always flush
1049            // our writes before waking up, including before moving to a new file, then we know that
1050            // if the reader/writer were not identical at the start the loop, and `try_read_record`
1051            // if the reader/writer were not identical at the start of the loop, and `try_next_record`
1052            // returned `None`, we have hit the actual end of the reader's current data file,
1053            //
1054            // If the file IDs were identical, it would imply that reader is still on the writer's
1055            // current data file. We then "wait" for the writer to wake us up. It may lead to the
1056            // same thing -- `try_next_record` returning `None` with an identical reader/writer file
1057            // ID -- but that's OK, because it would mean we were actually waiting for the writer to
1058            // make progress now.  If the wake-up was valid, due to writer progress, then, well...
1059            // we'd actually be able to read data.
1060            //
1061            // The case of "the writer has closed/dropped/finished/etc" is handled at the top of the
1062            // loop, because otherwise we could get stuck waiting for the writer after an empty
1063            // `try_next_record` attempt when the writer is done and we're at the end of the file,
1064            // etc.
1065            if self.ready_to_read {
1066                if reader_file_id != writer_file_id {
1067                    debug!(
1068                        reader_file_id,
1069                        writer_file_id, "Reached the end of current data file."
1070                    );
1071
1072                    self.roll_to_next_data_file();
1073                    force_check_pending_data_files = true;
1074                    continue;
1075                }
1076
1077                self.ledger.wait_for_writer().await;
1078            } else {
1079                debug!(
1080                    bytes_read = self.bytes_read,
1081                    "Current data file has no more data."
1082                );
1083
1084                if reader_file_id == writer_file_id {
1085                    // We're currently just seeking to where we left off the last time this buffer was
1086                    // running, which might mean there's no records for us to read at all because we
1087                    // were already caught up.  All we can do is signal to `seek_to_next_record` that
1088                    // we're caught up.
1089                    return Ok(None);
1090                }
1091            }
1092        };
1093
1094        // We got a read token, so our record is present in the reader, and now we can actually read
1095        // it out and return it.
1096        let record_id = token.record_id();
1097        let record_bytes = token.record_bytes() as u64;
1098
1099        let reader = self
1100            .reader
1101            .as_mut()
1102            .expect("reader should exist after `ensure_ready_for_read`");
1103        let mut record = reader.read_record(token)?;
1104
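        // Convert the record's event count into a `NonZeroU64` in two steps: widening to `u64`
        // should always succeed, while a zero count means the record is empty, which is never
        // valid and is surfaced as `EmptyRecord`.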
1105        let record_events: u64 = record
1106            .event_count()
1107            .try_into()
1108            .expect("Event count for a record cannot exceed 2^64 events.");
1109        let record_events = record_events
1110            .try_into()
1111            .map_err(|_| ReaderError::EmptyRecord)?;
1112        self.track_read(record_id, record_bytes, record_events);
1113
1114        let (batch, receiver) = BatchNotifier::new_with_receiver();
1115        record.add_batch_notifier(batch);
1116        self.finalizer.add(record_events.get(), receiver);
1117
1118        if self.ready_to_read {
1119            trace!(
1120                record_id,
1121                record_events,
1122                record_bytes,
1123                data_file_id = self.ledger.get_current_reader_file_id(),
1124                "Read record."
1125            );
1126        }
1127
1128        Ok(Some(record))
1129    }
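
    // A small illustration of the "is this data file finalized?" rule used in `next`: a data
    // file is treated as finalized when the writer has already moved past it, or while the
    // reader is still in its initial catch-up phase (before `ready_to_read` is set).  The
    // generic ID type avoids assuming the concrete file ID type; the name is illustrative only.
    #[allow(dead_code)]
    fn demo_is_finalized<Id: PartialEq>(
        reader_file_id: Id,
        writer_file_id: Id,
        ready_to_read: bool,
    ) -> bool {
        reader_file_id != writer_file_id || !ready_to_read
    }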
1130}
1131
1132pub(crate) fn decode_record_payload<T: Bufferable>(
1133    record: &ArchivedRecord<'_>,
1134) -> Result<T, ReaderError<T>> {
1135    // Try and convert the raw record metadata into the true metadata type used by `T`, and then
1136    // also verify that `T` is able to decode records with the metadata used for this record in particular.
1137    let metadata = T::Metadata::from_u32(record.metadata()).ok_or(ReaderError::Incompatible {
1138        reason: format!("invalid metadata for {}", std::any::type_name::<T>()),
1139    })?;
1140
1141    if !T::can_decode(metadata) {
1142        return Err(ReaderError::Incompatible {
1143            reason: format!(
1144                "record metadata not supported (metadata: {:#036b})",
1145                record.metadata()
1146            ),
1147        });
1148    }
1149
1150    // Now we can finally try decoding.
1151    T::decode(metadata, record.payload()).context(DecodeSnafu)
1152}
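
// A hedged usage sketch of `decode_record_payload`: given an archived record that has already
// passed archive validation and checksumming, decoding either yields the owned `T` or one of
// the `Incompatible`/`Decode` errors described above.  The helper name and its "skip on error"
// behavior are illustrative only.
#[allow(dead_code)]
fn demo_decode_or_skip<T: Bufferable>(record: &ArchivedRecord<'_>) -> Option<T> {
    match decode_record_payload::<T>(record) {
        Ok(item) => Some(item),
        // An incompatible or undecodable record is treated as unreadable by this reader.
        Err(_) => None,
    }
}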