vector_buffers/variants/disk_v2/reader.rs
use std::{
    cmp, fmt,
    io::{self, ErrorKind},
    marker::PhantomData,
    num::NonZeroU64,
    path::PathBuf,
    sync::Arc,
};

use crc32fast::Hasher;
use rkyv::{archived_root, AlignedVec};
use snafu::{ResultExt, Snafu};
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
use vector_common::{finalization::BatchNotifier, finalizer::OrderedFinalizer};

use super::{
    common::create_crc32c_hasher,
    ledger::Ledger,
    record::{validate_record_archive, ArchivedRecord, Record, RecordStatus},
    Filesystem,
};
use crate::{
    encoding::{AsMetadata, Encodable},
    internal_events::BufferReadError,
    topology::acks::{EligibleMarker, EligibleMarkerLength, MarkerError, OrderedAcknowledgements},
    variants::disk_v2::{io::AsyncFile, record::try_as_record_archive},
    Bufferable,
};

pub(super) struct ReadToken {
    record_id: u64,
    record_bytes: usize,
}

impl ReadToken {
    pub fn new(record_id: u64, record_bytes: usize) -> Self {
        Self {
            record_id,
            record_bytes,
        }
    }

    pub fn record_id(&self) -> u64 {
        self.record_id
    }

    pub fn record_bytes(&self) -> usize {
        self.record_bytes
    }

    fn into_record_id(self) -> u64 {
        self.record_id
    }
}

/// Error that occurred during calls to [`BufferReader`].
#[derive(Debug, Snafu)]
pub enum ReaderError<T>
where
    T: Bufferable,
{
    /// A general I/O error occurred.
    ///
    /// Different methods will capture specific I/O errors depending on the situation, as some
    /// errors may be expected and considered normal by design. All I/O errors that are
    /// considered atypical will be returned as this variant.
    #[snafu(display("read I/O error: {}", source))]
    Io { source: io::Error },

    /// The reader failed to deserialize the record.
    ///
    /// In most cases, this indicates that the data file being read was corrupted or truncated in
    /// some fashion. Callers of [`BufferReader::next`] will be given this error, but the reader
    /// will have already moved itself to the next data file internally, as corruption may have
    /// affected other records in a way that is not easily detectable and could lead to records
    /// which deserialize/decode but contain invalid data.
    #[snafu(display("failed to deserialize encoded record from buffer: {}", reason))]
    Deserialization { reason: String },

    /// The record's checksum did not match.
    ///
    /// In most cases, this indicates that the data file being read was corrupted or truncated in
    /// some fashion. Callers of [`BufferReader::next`] will be given this error, but the reader
    /// will have already moved itself to the next data file internally, as corruption may have
    /// affected other records in a way that is not easily detectable and could lead to records
    /// which deserialize/decode but contain invalid data.
    #[snafu(display(
        "calculated checksum did not match the actual checksum: ({} vs {})",
        calculated,
        actual
    ))]
    Checksum { calculated: u32, actual: u32 },

    /// The decoder encountered an issue during decoding.
    ///
    /// At this stage, the record can be assumed to have been written correctly, and read correctly
    /// from disk, as the checksum was also validated.
    #[snafu(display("failed to decode record: {:?}", source))]
    Decode {
        source: <T as Encodable>::DecodeError,
    },

    /// The record is not compatible with this version of Vector.
    ///
    /// This can occur when records written to a buffer by a previous version of Vector are read
    /// by a newer version of Vector in which the encoding scheme, or record schema, used by the
    /// previous version can no longer be decoded.
    #[snafu(display("record version not compatible: {}", reason))]
    Incompatible { reason: String },

    /// The reader detected that a data file contains a partially-written record.
    ///
    /// Records should never be partially written to a data file (we don't split records across
    /// data files) so this would be indicative of a write that was never properly completed and
    /// flushed, or of some issue where the write was acknowledged but the data/file was corrupted
    /// in some way.
    ///
    /// This is effectively the same class of error as an invalid checksum/failed deserialization.
    PartialWrite,

    /// The record reported an event count of zero.
    ///
    /// Empty records should not be allowed to be written, so this represents either a bug in the
    /// writing logic of the buffer, or a record that does not use a symmetrical encoding scheme,
    /// which is also not supported.
    EmptyRecord,
}

impl<T> ReaderError<T>
where
    T: Bufferable,
{
    fn is_bad_read(&self) -> bool {
        matches!(
            self,
            ReaderError::Checksum { .. }
                | ReaderError::Deserialization { .. }
                | ReaderError::PartialWrite
        )
    }

    fn as_error_code(&self) -> &'static str {
        match self {
            ReaderError::Io { .. } => "io_error",
            ReaderError::Deserialization { .. } => "deser_failed",
            ReaderError::Checksum { .. } => "checksum_mismatch",
            ReaderError::Decode { .. } => "decode_failed",
            ReaderError::Incompatible { .. } => "incompatible_record_version",
            ReaderError::PartialWrite => "partial_write",
            ReaderError::EmptyRecord => "empty_record",
        }
    }

    pub fn as_recoverable_error(&self) -> Option<BufferReadError> {
        let error = self.to_string();
        let error_code = self.as_error_code();

        match self {
            ReaderError::Io { .. } | ReaderError::EmptyRecord => None,
            ReaderError::Deserialization { .. }
            | ReaderError::Checksum { .. }
            | ReaderError::Decode { .. }
            | ReaderError::Incompatible { .. }
            | ReaderError::PartialWrite => Some(BufferReadError { error_code, error }),
        }
    }
}

impl<T: Bufferable> PartialEq for ReaderError<T> {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::Io { source: l_source }, Self::Io { source: r_source }) => {
                l_source.kind() == r_source.kind()
            }
            (
                Self::Deserialization { reason: l_reason },
                Self::Deserialization { reason: r_reason },
            ) => l_reason == r_reason,
            (
                Self::Checksum {
                    calculated: l_calculated,
                    actual: l_actual,
                },
                Self::Checksum {
                    calculated: r_calculated,
                    actual: r_actual,
                },
            ) => l_calculated == r_calculated && l_actual == r_actual,
            (Self::Decode { .. }, Self::Decode { .. }) => true,
            (Self::Incompatible { reason: l_reason }, Self::Incompatible { reason: r_reason }) => {
                l_reason == r_reason
            }
            _ => core::mem::discriminant(self) == core::mem::discriminant(other),
        }
    }
}

/// Buffered reader that handles deserialization, checksumming, and decoding of records.
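///
/// Reading is a two-step process: [`Self::try_next_record`] validates the next record in place
/// and hands back a [`ReadToken`], which is then exchanged via [`Self::read_record`] for an
/// owned `T`. A minimal sketch of the intended call pattern (illustrative only; error handling
/// and the surrounding read loop are elided):
///
/// ```rust,ignore
/// // Assumes `reader: RecordReader<R, T>` over a data file that is still being
/// // written to (`is_finalized == false`).
/// if let Some(token) = reader.try_next_record(false).await? {
///     let record: T = reader.read_record(token)?;
///     // ... hand `record` off to the caller ...
/// }
/// ```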
pub(super) struct RecordReader<R, T> {
    reader: BufReader<R>,
    aligned_buf: AlignedVec,
    checksummer: Hasher,
    current_record_id: u64,
    _t: PhantomData<T>,
}

impl<R, T> RecordReader<R, T>
where
    R: AsyncRead + Unpin,
    T: Bufferable,
{
    /// Creates a new [`RecordReader`] around the provided reader.
    ///
    /// Internally, the reader is wrapped in a [`BufReader`], so callers should not pass in an
    /// already buffered reader.
    pub fn new(reader: R) -> Self {
        Self {
            reader: BufReader::with_capacity(256 * 1024, reader),
            aligned_buf: AlignedVec::new(),
            checksummer: create_crc32c_hasher(),
            current_record_id: 0,
            _t: PhantomData,
        }
    }

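    /// Reads the length delimiter that precedes every record.
    ///
    /// Each record is framed with a fixed-size, 8-byte, big-endian length delimiter. As a worked
    /// example of the framing (illustrative only), a record whose archived form is 125 bytes long
    /// would be laid out on disk as:
    ///
    /// ```text
    /// [ 00 00 00 00 00 00 00 7D ][ ...125 bytes of record archive... ]
    ///   ^-- u64::to_be_bytes(125)
    /// ```
    ///
    /// Returns `Ok(Some(len))` once a full delimiter is available, `Ok(None)` if there is no more
    /// data to read, or a partial-write error if a truncated delimiter is found in a finalized
    /// data file.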
    #[cfg_attr(test, instrument(skip(self), level = "trace"))]
    async fn read_length_delimiter(
        &mut self,
        is_finalized: bool,
    ) -> Result<Option<usize>, ReaderError<T>> {
        loop {
            let available = self.reader.buffer().len();
            if available >= 8 {
                let length_buf = &self.reader.buffer()[..8];
                let length = length_buf
                    .try_into()
                    .expect("the slice is the length of a u64");
                self.reader.consume(8);

                // By default, records cannot exceed 8MB in length, so whether our `usize` is a u32
                // or u64, we're not going to overflow it. While the maximum record size _can_ be
                // changed, it's not currently exposed to users. Even further, if it was exposed to
                // users, it's currently a `usize`, so again, we know that we're not going to exceed
                // 64-bit. And even further still, the writer fallibly attempts to get a `u64` of the
                // record size based on the encoding buffer, which gives its length in `usize`, and
                // so would fail if `usize` was larger than `u64`, meaning we at least will panic if
                // Vector is running on a 128-bit CPU in the future, storing records that are larger
                // than 2^64+1. :)
                let record_len = u64::from_be_bytes(length)
                    .try_into()
                    .expect("record length should never exceed usize");
                return Ok(Some(record_len));
            }

            // We don't have enough bytes, so we need to fill our buffer again.
            let buf = self.reader.fill_buf().await.context(IoSnafu)?;
            if buf.is_empty() {
                return Ok(None);
            }

            // If we tried to read more bytes, and we still don't have enough for the record
            // delimiter, and the data file has been finalized already: we've got a partial
            // write situation on our hands.
            if buf.len() < 8 && is_finalized {
                return Err(ReaderError::PartialWrite);
            }
        }
    }

    /// Attempts to read a record.
    ///
    /// Records are preceded by a length delimiter, a fixed-size integer (currently 8 bytes) that
    /// tells the reader how many more bytes to read in order to completely read the next record.
    ///
    /// If there are no more bytes to read, we return early in order to allow the caller to wait
    /// until such a time where there should be more data, as no wake-ups can be generated when
    /// reading a file after reaching EOF.
    ///
    /// If there is any data available, we attempt to continue reading until both a length
    /// delimiter, and the accompanying record, can be read in their entirety.
    ///
    /// If a record is able to be read in its entirety, a token is returned to the caller that can
    /// be used with [`read_record`] in order to get an owned `T`. This two-step approach works
    /// around a quirk in the compiler's ability to track stacked mutable references through
    /// conditional control flow, which we handle by splitting the "do we have a valid record in
    /// our buffer?" logic from the "read that record and decode it" logic.
    ///
    /// # Finalized reads
    ///
    /// All of the above logic applies when `is_finalized` is `false`, which signals that a data
    /// file is still currently being written to. If `is_finalized` is `true`, most of the above
    /// logic applies, but in cases where we detect a partial write, we explicitly return an error
    /// for a partial read.
    ///
    /// In practice, what this means is that when we believe a file should be "finalized" -- the
    /// writer flushed the file to disk, the ledger has been flushed, etc -- then we also expect to
    /// be able to read all bytes with no leftover. A partially-written length delimiter, or
    /// record, would be indicative of a bug with the writer or OS/disks, essentially telling us
    /// that the current data file is not valid for reads anymore. We don't know _why_ it's in this
    /// state, only that something is not right and that we must skip the file.
    ///
    /// # Errors
    ///
    /// Errors can occur during the I/O or deserialization stage. If an error occurs during any of
    /// these stages, an appropriate error variant will be returned describing the error.
    #[cfg_attr(test, instrument(skip(self), level = "trace"))]
    pub async fn try_next_record(
        &mut self,
        is_finalized: bool,
    ) -> Result<Option<ReadToken>, ReaderError<T>> {
        let Some(record_len) = self.read_length_delimiter(is_finalized).await? else {
            return Ok(None);
        };

        if record_len == 0 {
            return Err(ReaderError::Deserialization {
                reason: "record length was zero".to_string(),
            });
        }

        // Read in all of the bytes we need first.
        self.aligned_buf.clear();
        while self.aligned_buf.len() < record_len {
            let needed = record_len - self.aligned_buf.len();
            let buf = self.reader.fill_buf().await.context(IoSnafu)?;
            if buf.is_empty() && is_finalized {
                // If we needed more data, but there was none available, and we're finalized: we've
                // got ourselves a partial write situation.
                return Err(ReaderError::PartialWrite);
            }

            let available = cmp::min(buf.len(), needed);
            self.aligned_buf.extend_from_slice(&buf[..available]);
            self.reader.consume(available);
        }

        // Now see if we can deserialize our archived record from this.
        let buf = self.aligned_buf.as_slice();
        match validate_record_archive(buf, &self.checksummer) {
            RecordStatus::FailedDeserialization(de) => Err(ReaderError::Deserialization {
                reason: de.into_inner(),
            }),
            RecordStatus::Corrupted { calculated, actual } => {
                Err(ReaderError::Checksum { calculated, actual })
            }
            RecordStatus::Valid { id, .. } => {
                self.current_record_id = id;
                // TODO: Another spot where our hardcoding of the length delimiter size in bytes is fragile.
                Ok(Some(ReadToken::new(id, 8 + buf.len())))
            }
        }
    }

    /// Reads the record associated with the given [`ReadToken`].
    ///
    /// # Errors
    ///
    /// If an error occurs during decoding, an error variant will be returned describing the error.
    ///
    /// # Panics
    ///
    /// If a `ReadToken` is not used in a call to `read_record` before `try_next_record` is called
    /// again, and the `ReadToken` from the earlier call is then used, this method will panic due
    /// to an out-of-order read.
    pub fn read_record(&mut self, token: ReadToken) -> Result<T, ReaderError<T>> {
        let record_id = token.into_record_id();
        assert_eq!(
            self.current_record_id, record_id,
            "using expired read token; this is a serious bug"
        );

        // SAFETY:
        // - `try_next_record` is the only method that can hand back a `ReadToken`
        // - we only get a `ReadToken` if there's a valid record in `self.aligned_buf`
        // - `try_next_record` does all the archive checks, checksum validation, etc
        let record = unsafe { archived_root::<Record<'_>>(&self.aligned_buf) };

        decode_record_payload(record)
    }
}

impl<R, T> fmt::Debug for RecordReader<R, T>
where
    R: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RecordReader")
            .field("reader", &self.reader)
            .field("aligned_buf", &self.aligned_buf)
            .field("checksummer", &self.checksummer)
            .field("current_record_id", &self.current_record_id)
            .finish()
    }
}

/// Reads records from the buffer.
#[derive(Debug)]
pub struct BufferReader<T, FS>
where
    FS: Filesystem,
{
    ledger: Arc<Ledger<FS>>,
    reader: Option<RecordReader<FS::File, T>>,
    bytes_read: u64,
    last_reader_record_id: u64,
    data_file_start_record_id: Option<u64>,
    data_file_record_count: u64,
    data_file_marked_record_count: u64,
    ready_to_read: bool,
    record_acks: OrderedAcknowledgements<u64, u64>,
    data_file_acks: OrderedAcknowledgements<u64, (PathBuf, u64)>,
    finalizer: OrderedFinalizer<u64>,
    _t: PhantomData<T>,
}

impl<T, FS> BufferReader<T, FS>
where
    T: Bufferable,
    FS: Filesystem,
    FS::File: Unpin,
{
    /// Creates a new [`BufferReader`] attached to the given [`Ledger`].
    pub(crate) fn new(ledger: Arc<Ledger<FS>>, finalizer: OrderedFinalizer<u64>) -> Self {
        let ledger_last_reader_record_id = ledger.state().get_last_reader_record_id();
        let next_expected_record_id = ledger_last_reader_record_id.wrapping_add(1);

        Self {
            ledger,
            reader: None,
            bytes_read: 0,
            last_reader_record_id: 0,
            data_file_start_record_id: None,
            data_file_record_count: 0,
            data_file_marked_record_count: 0,
            ready_to_read: false,
            record_acks: OrderedAcknowledgements::from_acked(next_expected_record_id),
            data_file_acks: OrderedAcknowledgements::from_acked(0),
            finalizer,
            _t: PhantomData,
        }
    }

    fn reset(&mut self) {
        self.reader = None;
        self.bytes_read = 0;
        self.data_file_start_record_id = None;
    }

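    /// Tracks a completed read for buffer accounting and acknowledgement handling.
    ///
    /// As a worked restatement of the diagram below: a record with `record_id = 3` containing
    /// three events occupies IDs 3, 4, and 5, so its "last" reader record ID is `3 + 3 - 1 = 5`,
    /// and the next record is expected to begin at ID 6.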
    fn track_read(&mut self, record_id: u64, record_bytes: u64, event_count: NonZeroU64) {
        // We explicitly reduce the event count by one here in order to correctly calculate the
        // "last" record ID, which you can visualize as follows...
        //
        // [ record 1 ] [ record 2 ] [ record 3 ] [....]
        // [0]          [1] [2]      [3] [4] [5]  [....]
        //
        // For each of these records, their "last ID" is simply the ID of the first event within the
        // record, plus the event count, minus one. Another way to look at it is that the "last"
        // reader record ID is always one behind the next expected record ID. In the above example,
        // the next record ID we would expect would be 6, regardless of how many events the record has.
        self.last_reader_record_id = record_id.wrapping_add(event_count.get() - 1);
        if self.data_file_start_record_id.is_none() {
            self.data_file_start_record_id = Some(record_id);
        }

        // Track the amount of data we read. If we're still loading the buffer, then the only other
        // thing we need to do is update the total buffer size. Everything else below only matters
        // when we're doing real record reads.
        self.bytes_read += record_bytes;
        if !self.ready_to_read {
            self.ledger.decrement_total_buffer_size(record_bytes);
            return;
        }

        // We've done a "real" record read, so we need to track it for acknowledgement. Check our
        // acknowledgement state first to see if this is the next record ID we expected.
        self.data_file_record_count += 1;
        if let Err(me) =
            self.record_acks
                .add_marker(record_id, Some(event_count.get()), Some(record_bytes))
        {
            match me {
                MarkerError::MonotonicityViolation => {
                    panic!("record ID monotonicity violation detected; this is a serious bug")
                }
            }
        }
    }

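    /// Deletes a data file that the reader is finished with, adjusting buffer accounting.
    ///
    /// As a worked example of the adjustment below, using hypothetical numbers: for a 1,000-byte
    /// data file where only 900 bytes were successfully read before skipping ahead,
    /// `bytes_read = Some(900)` leads to a 100-byte decrement, while `bytes_read = None` (a
    /// fast-forwarded file with no reads at all) decrements the full 1,000 bytes.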
    #[cfg_attr(test, instrument(skip_all, level = "debug"))]
    async fn delete_completed_data_file(
        &mut self,
        data_file_path: PathBuf,
        bytes_read: Option<u64>,
    ) -> io::Result<()> {
        // TODO: Could we actually make this a background task to remove the tail latency from the
        // read path? Technically all that's needed is a handle to the ledger and the data file
        // path, so as long as the logic is still right, we can notify writers out-of-band.
        debug!(
            data_file_path = data_file_path.to_string_lossy().as_ref(),
            bytes_read, "Deleting completed data file."
        );

        // Grab the size of the data file before we delete it, which gives us a chance to fix up the
        // total buffer size for corrupted files or fast-forwarded files.
        //
        // Since we only decrement the buffer size after a successful read in normal cases, skipping
        // the rest of a corrupted file could lead to the total buffer size being unsynchronized.
        // We use the difference between the number of bytes read and the file size to figure out if
        // we need to make a manual adjustment.
        //
        // Likewise, when we skip over a file in "fast forward" mode during initialization, no reads
        // occur at all, so we're relying on this method to correct the buffer size for us. This is
        // why `bytes_read` is optional: when it's specified, we calculate a delta for handling
        // partial-read scenarios; otherwise, we just use the entire data file size as is.
        let data_file = self
            .ledger
            .filesystem()
            .open_file_readable(&data_file_path)
            .await?;
        let metadata = data_file.metadata().await?;

        let decrease_amount = bytes_read.map_or_else(
            || metadata.len(),
            |bytes_read| {
                let size_delta = metadata.len() - bytes_read;
                if size_delta > 0 {
                    debug!(
                        actual_file_size = metadata.len(),
                        bytes_read,
                        "Data file was only partially read. Adjusting buffer size to compensate.",
                    );
                }

                size_delta
            },
        );

        if decrease_amount > 0 {
            self.ledger.decrement_total_buffer_size(decrease_amount);
        }

        drop(data_file);

        // Delete the current data file, and increment our actual reader file ID.
        self.ledger
            .filesystem()
            .delete_file(&data_file_path)
            .await?;
        self.ledger.increment_acked_reader_file_id();
        self.ledger.flush()?;

        debug!("Flushed after deleting data file, notifying writers and continuing.");

        // Notify any waiting writers that we've deleted a data file, which they may be waiting on
        // because they're looking to reuse the file ID of the file we just finished reading.
        self.ledger.notify_reader_waiters();

        Ok(())
    }

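    /// Processes pending acknowledgements, driving record and data file lifecycles forward.
    ///
    /// The two layers of acknowledgement described below can be sketched roughly as follows
    /// (illustrative only):
    ///
    /// ```text
    /// event acks ──> record markers        (record done once all of its events are acked)
    ///                 └─> data file markers (file deletable once all of its records are done)
    /// ```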
    #[cfg_attr(test, instrument(skip(self), level = "debug"))]
    async fn handle_pending_acknowledgements(
        &mut self,
        force_check_pending_data_files: bool,
    ) -> io::Result<()> {
        // Acknowledgements effectively happen in two layers: record acknowledgement and data file
        // acknowledgement. Since records can contain multiple events, we need to track when a
        // record itself has been fully acknowledged. Likewise, data files contain multiple records,
        // so we need to track when all records we've read from a data file have been acknowledged.

        // Drive record acknowledgement first.
        //
        // We only do this if we actually consumed any acknowledgements, and we immediately update
        // the buffer and ledger to more quickly get those metrics into good shape. We defer
        // notifying writers until after, though, in case we also have data files to delete, so that
        // we can coalesce the notifications together at the very end of the method.
        let mut had_eligible_records = false;
        let mut records_acknowledged: u64 = 0;
        let mut events_acknowledged: u64 = 0;
        let mut events_skipped: u64 = 0;
        let mut bytes_acknowledged: u64 = 0;

        let consumed_acks = self.ledger.consume_pending_acks();
        if consumed_acks > 0 {
            self.record_acks.add_acknowledgements(consumed_acks);

            while let Some(EligibleMarker { len, data, .. }) =
                self.record_acks.get_next_eligible_marker()
            {
                had_eligible_records = true;

                match len {
                    // Any marker with an assumed length implies a gap marker, which gets added
                    // automatically and represents a portion of the record ID range that was
                    // expected but missing. This is a long way of saying: we're missing records.
                    //
                    // We tally this up so that we can emit a single log event/set of metrics, as
                    // there may be many gap markers and emitting for each of them could be very noisy.
                    EligibleMarkerLength::Assumed(count) => {
                        events_skipped = events_skipped
                            .checked_add(count)
                            .expect("skipping more than 2^64 events at a time is obviously a bug");
                    }
                    // We got a valid marker representing a known number of events.
                    EligibleMarkerLength::Known(len) => {
                        // We specifically pass the size of the record, in bytes, as the marker data.
                        let record_bytes = data.expect("record bytes should always be known");

                        records_acknowledged = records_acknowledged.checked_add(1).expect(
                            "acknowledging more than 2^64 records at a time is obviously a bug",
                        );
                        events_acknowledged = events_acknowledged.checked_add(len).expect(
                            "acknowledging more than 2^64 events at a time is obviously a bug",
                        );
                        bytes_acknowledged = bytes_acknowledged.checked_add(record_bytes).expect(
                            "acknowledging more than 2^64 bytes at a time is obviously a bug",
                        );
                    }
                }
            }

            // We successfully processed at least one record, so update our buffer and ledger accounting.
            if had_eligible_records {
                self.ledger
                    .track_reads(events_acknowledged, bytes_acknowledged);

                // We need to account for skipped events, too, so that our "last reader record ID"
                // value stays correct as we process these gap markers.
                let last_increment_amount = events_acknowledged + events_skipped;
                self.ledger
                    .state()
                    .increment_last_reader_record_id(last_increment_amount);

                self.data_file_acks
                    .add_acknowledgements(records_acknowledged);
            }

            // If any events were skipped, do our logging/metrics for that.
            if events_skipped > 0 {
                self.ledger.track_dropped_events(events_skipped);
            }
        }

        // If we processed any eligible records, we may now also have eligible data files.
        //
        // Alternatively, the core `next` logic may have just rolled over to a new data file, and
        // we're seeing if we can fast track any eligible data file deletions rather than waiting
        // for more acknowledgements to come in.
        let mut had_eligible_data_files = false;
        let mut data_files_deleted: u16 = 0;

        if had_eligible_records || force_check_pending_data_files {
            // Now handle data file deletion. We unconditionally check to see if any data files are
            // eligible for deletion, and process them immediately.
            while let Some(EligibleMarker { data, .. }) =
                self.data_file_acks.get_next_eligible_marker()
            {
                had_eligible_data_files = true;

                let (data_file_path, bytes_read) =
                    data.expect("data file deletion marker should never be empty");
                self.delete_completed_data_file(data_file_path, Some(bytes_read))
                    .await?;

                data_files_deleted = data_files_deleted
                    .checked_add(1)
                    .expect("deleting more than 2^16 data files at a time is obviously a bug");
            }
        }

        // If we managed to process any records _or_ any data file deletions, we've made
        // meaningful progress that writers may care about, so notify them.
        if had_eligible_data_files || had_eligible_records {
            self.ledger.notify_reader_waiters();

            if self.ready_to_read {
                trace!(
                    current_buffer_size = self.ledger.get_total_buffer_size(),
                    records_acknowledged,
                    events_acknowledged,
                    events_skipped,
                    bytes_acknowledged,
                    data_files_deleted,
                    "Finished handling acknowledgements."
                );
            }
        }

        Ok(())
    }

    /// Switches the reader over to the next data file to read.
    #[cfg_attr(test, instrument(skip(self), level = "debug"))]
    fn roll_to_next_data_file(&mut self) {
        // Add a marker for this data file so we know when it can be safely deleted. We also need
        // to track the necessary data to do our buffer accounting when it's eligible for deletion.
        //
        // In the rare case where the very first read in a new data file is corrupted/invalid and we
        // roll to the next data file, we simply use the last reader record ID we have, which yields
        // a marker with a length of 0.
        let data_file_start_record_id = self
            .data_file_start_record_id
            .take()
            .unwrap_or(self.last_reader_record_id);
        // Record IDs are inclusive, so if last is 1 and start is 0, that means we had two events,
        // potentially from one or two records.
        let data_file_event_count = self
            .last_reader_record_id
            .wrapping_sub(data_file_start_record_id)
            .saturating_add(1);
        let data_file_record_count = self.data_file_record_count;
        let data_file_path = self.ledger.get_current_reader_data_file_path();
        let bytes_read = self.bytes_read;

        debug!(
            data_file_path = data_file_path.to_string_lossy().as_ref(),
            first_record_id = data_file_start_record_id,
            last_record_id = self.last_reader_record_id,
            record_count = data_file_record_count,
            event_count = data_file_event_count,
            bytes_read,
            "Marking data file for deletion."
        );

        let data_file_marker_id = self.data_file_marked_record_count;
        self.data_file_marked_record_count += data_file_record_count;
        self.data_file_record_count = 0;

        self.data_file_acks
            .add_marker(
                data_file_marker_id,
                Some(data_file_record_count),
                Some((data_file_path, bytes_read)),
            )
            .expect("should not fail to add marker for data file deletion");

        // Now reset our internal state so we can go for the next data file.
        self.reset();
        self.ledger.increment_unacked_reader_file_id();

        debug!("Rolling to next data file.");
    }

    /// Ensures this reader is ready to attempt reading the next record.
    #[cfg_attr(test, instrument(skip(self), level = "debug"))]
    async fn ensure_ready_for_read(&mut self) -> io::Result<()> {
        // We have nothing to do if we already have a data file open.
        if self.reader.is_some() {
            return Ok(());
        }

        // Try to open the current reader data file. This might not _yet_ exist, in which case
        // we'll simply wait for the writer to signal to us that progress has been made, which
        // implies a data file existing.
        loop {
            let (reader_file_id, writer_file_id) = self.ledger.get_current_reader_writer_file_id();
            let data_file_path = self.ledger.get_current_reader_data_file_path();
            let data_file = match self
                .ledger
                .filesystem()
                .open_file_readable(&data_file_path)
                .await
            {
                Ok(data_file) => data_file,
                Err(e) => match e.kind() {
                    ErrorKind::NotFound => {
                        if reader_file_id == writer_file_id {
                            debug!(
                                data_file_path = data_file_path.to_string_lossy().as_ref(),
                                "Data file does not yet exist. Waiting for writer to create."
                            );
                            self.ledger.wait_for_writer().await;
                        } else {
                            self.ledger.increment_acked_reader_file_id();
                        }
                        continue;
                    }
                    // This is a valid I/O error, so bubble that back up.
                    _ => return Err(e),
                },
            };

            debug!(
                data_file_path = data_file_path.to_string_lossy().as_ref(),
                "Opened data file for reading."
            );

            self.reader = Some(RecordReader::new(data_file));
            return Ok(());
        }
    }

    /// Seeks to where this reader previously left off.
    ///
    /// In cases where Vector has restarted, but the reader hasn't yet finished a file, we would
    /// open the correct data file for reading, but our file cursor would be at the very
    /// beginning, essentially pointed at the wrong record. We read out records here until we
    /// reach a point where we've read up to the record referenced by `get_last_reader_record_id`.
    ///
    /// This ensures that a subsequent call to `next` is ready to read the correct record.
    ///
    /// # Errors
    ///
    /// If an error occurs during seeking to the next record, an error variant will be returned
    /// describing the error.
    #[cfg_attr(test, instrument(skip(self), level = "debug"))]
    pub(super) async fn seek_to_next_record(&mut self) -> Result<(), ReaderError<T>> {
        // We don't try seeking again once we're all caught up.
        if self.ready_to_read {
            warn!("Reader already initialized.");
            return Ok(());
        }

        // We rely on `next` to close out the data file if we've actually reached the end, and we
        // also rely on it to reset the data file before trying to read, and we _also_ rely on it to
        // update `self.last_reader_record_id`, so basically... just keep reading records until we
        // get to the one we left off with last time.
        let ledger_last = self.ledger.state().get_last_reader_record_id();
        debug!(
            last_acknowledged_record_id = ledger_last,
            "Seeking to last acknowledged record for reader."
        );

        // We may end up in a situation where a data file hasn't yet been deleted but we've moved on
        // to the next data file, including reading and acknowledging records within it. If Vector
        // is stopped at a point like this, and we restart it and load the buffer, we'll start on
        // the old data file, and reading it all over again would be wasteful.
        //
        // In our seek loop, we have a fast path where we check the last record of a data file while
        // the reader and writer file IDs don't match. If we see that the record is still below the
        // last reader record ID, we do the necessary clean up to delete that file and move to the
        // next file. This is safe because we know that if we managed to acknowledge records with
        // an ID higher than the highest record ID in the data file, it was meant to have been
        // deleted.
        //
        // Once the reader/writer file IDs are identical, we fall back to the slow path.
        while self.ledger.get_current_reader_file_id() != self.ledger.get_current_writer_file_id()
        {
            let data_file_path = self.ledger.get_current_reader_data_file_path();
            self.ensure_ready_for_read().await.context(IoSnafu)?;
            let data_file_mmap = self
                .ledger
                .filesystem()
                .open_mmap_readable(&data_file_path)
                .await
                .context(IoSnafu)?;

            match validate_record_archive(data_file_mmap.as_ref(), &Hasher::new()) {
                RecordStatus::Valid {
                    id: last_record_id, ..
                } => {
                    let record = try_as_record_archive(data_file_mmap.as_ref())
                        .expect("record was already validated");

                    let Ok(item) = decode_record_payload::<T>(record) else {
                        // If there's an error decoding the item, just fall back to the slow path,
                        // because this file might actually be where we left off, so we don't want
                        // to incorrectly skip ahead or anything.
                        break;
                    };

                    // We have to remove 1 from the event count here because otherwise the ID would
                    // be the _next_ record's ID we'd expect, not the last ID of the record we are
                    // acknowledged up to. (Record IDs start at N and consume up to N+M-1 where M is
                    // the number of events in the record, which is how we can determine the event
                    // count from the record IDs alone, without having to read every record in the
                    // buffer during startup.)
                    let record_events = u64::try_from(item.event_count())
                        .expect("event count should never exceed u64");
                    let last_record_id_in_data_file =
                        last_record_id.wrapping_add(record_events.saturating_sub(1));

                    // If we're past this data file, delete it and move on. We do this manually
                    // versus faking it via `roll_to_next_data_file` because that emits a deletion
                    // marker, but the internal state tracking first/last record ID, bytes read,
                    // etc, won't actually be usable.
                    if ledger_last > last_record_id_in_data_file {
                        // By passing `None` for the bytes read, `delete_completed_data_file` does
                        // the work of ensuring the buffer size is updated to reflect the data
                        // file being deleted in its entirety.
                        self.delete_completed_data_file(data_file_path, None)
                            .await
                            .context(IoSnafu)?;
                        self.reset();
                    } else {
                        // We've hit a point where the current data file we're on has records newer
                        // than where we left off, so we can catch up from here.
                        break;
                    }
                }
                // Similar to the comment above about when decoding fails, we fall back to the slow
                // path in case any error is encountered, lest we risk incorrectly skipping ahead to
                // the wrong data file.
                _ => break,
            }
        }

        // We rely on `next` to close out the data file if we've actually reached the end, and we
        // also rely on it to reset the data file before trying to read, and we _also_ rely on it to
        // update `self.last_reader_record_id`, so basically... just keep reading records until
        // we're past the last record we had acknowledged.
        while self.last_reader_record_id < ledger_last {
            match self.next().await {
                Ok(maybe_record) => {
                    if maybe_record.is_none() {
                        // We've hit the end of the current data file so we've gone as far as we can.
                        break;
                    }
                }
                Err(e) if e.is_bad_read() => {
                    // If we hit a bad read during initialization, we should only continue calling
                    // `next` if we have not advanced _past_ the writer in terms of file ID.
                    //
                    // If the writer saw the same error we just saw, it will have rolled itself to
                    // the next file, lazily: for example, it discovers a bad record at the end of
                    // file ID 3, so it marks itself to open file ID 4 next, but hasn't yet
                    // created it, and is still technically indicated as being on file ID 3.
                    //
                    // Meanwhile, if _we_ try to also roll to file ID 4 and read from it, we'll deadlock
                    // ourselves because it doesn't yet exist. However, `next` immediately updates our
                    // reader file ID as soon as it hits a bad read error, so in this scenario,
                    // we're now marked as being on file ID 4 while the writer is still on file ID
                    // 3.
                    //
                    // From that, we can determine that when we've hit a bad read error, that if our
                    // file ID is greater than the writer's file ID, we're now essentially
                    // synchronized.
                    let (reader_file_id, writer_file_id) =
                        self.ledger.get_current_reader_writer_file_id();
                    if reader_file_id > writer_file_id {
                        break;
                    }
                }
                Err(e) => return Err(e),
            }
        }

        debug!(
            last_record_id_read = self.last_reader_record_id,
            "Synchronized with ledger. Reader ready."
        );

        self.ready_to_read = true;

        Ok(())
    }

    /// Reads a record.
    ///
    /// If the writer is closed and there is no more data in the buffer, `None` is returned.
    /// Otherwise, reads the next record or waits until the next record is available.
    ///
    /// # Errors
    ///
    /// If an error occurred while reading a record, an error variant will be returned describing
    /// the error.
    #[cfg_attr(test, instrument(skip(self), level = "trace"))]
    pub async fn next(&mut self) -> Result<Option<T>, ReaderError<T>> {
        let mut force_check_pending_data_files = false;

        let token = loop {
            // Handle any pending acknowledgements first.
            self.handle_pending_acknowledgements(force_check_pending_data_files)
                .await
                .context(IoSnafu)?;
            force_check_pending_data_files = false;

            // If the writer has marked themselves as done, and the buffer has been emptied, then
            // we're done and can return. We have to look at something besides simply the writer
            // being marked as done to know if we're actually done or not, and "buffer size" is better
            // than "total records" because we update buffer size when handling acknowledgements,
            // whether it's an individual ack or an entire file being deleted.
            //
            // If we used "total records", we could end up stuck in cases where we skipped
            // corrupted records, but hadn't yet had a "good" record that we could read, since the
            // "we skipped records due to corruption" logic requires performing a valid read to
            // detect, and to calculate a valid delta from.
            if self.ledger.is_writer_done() {
                let total_buffer_size = self.ledger.get_total_buffer_size();
                if total_buffer_size == 0 {
                    return Ok(None);
                }
            }

            self.ensure_ready_for_read().await.context(IoSnafu)?;

            let reader = self
                .reader
                .as_mut()
                .expect("reader should exist after `ensure_ready_for_read`");

            let (reader_file_id, writer_file_id) = self.ledger.get_current_reader_writer_file_id();

            // Essentially: is the writer still writing to this data file or not, and are we
            // actually ready to read (aka initialized)?
            //
            // This is a necessary invariant to understand if the record reader should actually keep
            // waiting for data, or if a data file had a partial write/missing data and should be
            // skipped. In particular, not only does this matter for deadlocking during shutdown due
            // to improper writer behavior/flushing, but it also matters during initialization in
            // case the current data file had a partial write.
            let is_finalized = (reader_file_id != writer_file_id) || !self.ready_to_read;

            // Try reading a record, which if successful, gives us a token to actually read/get a
            // reference to the record. This is a slightly-tricky song-and-dance due to rustc not
            // yet fully understanding mutable borrows when conditional control flow is involved.
            match reader.try_next_record(is_finalized).await {
                // Not even enough data to read a length delimiter, so we need to wait for the
                // writer to signal us that there's some actual data to read.
                Ok(None) => {}
                // We got a valid record, so keep the token.
                Ok(Some(token)) => break token,
                // A length-delimited payload was read, but we failed to deserialize it as a valid
                // record, or we deserialized it and the checksum was invalid. Either way, we're not
                // sure the rest of the data file is even valid, so roll to the next file.
                //
                // TODO: Explore the concept of putting a data file into a "one more attempt to read
                // a valid record" state, almost like a semi-open circuit breaker. There's a
                // possibility that the length delimiter we got is valid, and all the data was
                // written for the record, but the data was invalid... and that if we just kept
                // reading, we might actually encounter a valid record.
                //
                // Theoretically, based on both the validation done by `rkyv` and the checksum, it
                // should be incredibly unlikely to read a valid record after getting a
                // corrupted record if there was missing data or more invalid data. We use
                // checksumming to detect errors within a given chunk of the payload, so one payload
                // being corrupted doesn't always, in fact, mean that other records after it are
                // corrupted too.
                Err(e) => {
                    // Invalid checksums and deserialization failures can't really be acted upon by
                    // the caller, but they might be expecting a read-after-write behavior, so we
                    // return the error to them after ensuring that we roll to the next file first.
                    if e.is_bad_read() {
                        self.roll_to_next_data_file();
                    }

                    return Err(e);
                }
            }

            // Fundamentally, when `try_next_record` returns `None`, there are three possible
            // scenarios:
            //
            // 1. we are entirely caught up to the writer
            // 2. we've hit the end of the data file and need to go to the next one
            // 3. the writer has closed/dropped/finished/etc
            //
            // When we're at this point, we check the reader/writer file IDs. If the file IDs are
            // not identical, we now know the writer has moved on. Crucially, since we always flush
            // our writes before waking up, including before moving to a new file, we know that if
            // the reader/writer file IDs were not identical at the start of the loop, and
            // `try_next_record` returned `None`, then we have hit the actual end of the reader's
            // current data file, and need to move on.
            //
            // If the file IDs were identical, it would imply that the reader is still on the
            // writer's current data file. We then "wait" for the writer to wake us up. It may lead
            // to the same thing -- `try_next_record` returning `None` with an identical
            // reader/writer file ID -- but that's OK, because it would mean we were actually
            // waiting for the writer to make progress now. If the wake-up was valid, due to writer
            // progress, then, well... we'd actually be able to read data.
            //
            // The case of "the writer has closed/dropped/finished/etc" is handled at the top of the
            // loop, because otherwise we could get stuck waiting for the writer after an empty
            // `try_next_record` attempt when the writer is done and we're at the end of the file,
            // etc.
            if self.ready_to_read {
                if reader_file_id != writer_file_id {
                    debug!(
                        reader_file_id,
                        writer_file_id, "Reached the end of current data file."
                    );

                    self.roll_to_next_data_file();
                    force_check_pending_data_files = true;
                    continue;
                }

                self.ledger.wait_for_writer().await;
            } else {
                debug!(
                    bytes_read = self.bytes_read,
                    "Current data file has no more data."
                );

                if reader_file_id == writer_file_id {
                    // We're currently just seeking to where we left off the last time this buffer was
                    // running, which might mean there's no records for us to read at all because we
                    // were already caught up. All we can do is signal to `seek_to_next_record` that
                    // we're caught up.
                    return Ok(None);
                }
            }
        };

        // We got a read token, so our record is present in the reader, and now we can actually read
        // it out and return it.
        let record_id = token.record_id();
        let record_bytes = token.record_bytes() as u64;

        let reader = self
            .reader
            .as_mut()
            .expect("reader should exist after `ensure_ready_for_read`");
        let mut record = reader.read_record(token)?;

        let record_events: u64 = record
            .event_count()
            .try_into()
            .expect("Event count for a record cannot exceed 2^64 events.");
        let record_events: NonZeroU64 = record_events
            .try_into()
            .map_err(|_| ReaderError::EmptyRecord)?;
        self.track_read(record_id, record_bytes, record_events);

        let (batch, receiver) = BatchNotifier::new_with_receiver();
        record.add_batch_notifier(batch);
        self.finalizer.add(record_events.get(), receiver);

        if self.ready_to_read {
            trace!(
                record_id,
                record_events,
                record_bytes,
                data_file_id = self.ledger.get_current_reader_file_id(),
                "Read record."
            );
        }

        Ok(Some(record))
    }
}

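/// Decodes a validated record archive into an owned `T`.
///
/// A minimal sketch of the checks performed below, in order (illustrative pseudocode, not the
/// exact control flow):
///
/// ```rust,ignore
/// let metadata = T::Metadata::from_u32(record.metadata())?; // is the metadata well-formed?
/// if !T::can_decode(metadata) { /* incompatible */ }        // can this `T` decode it?
/// let item: T = T::decode(metadata, record.payload())?;     // actually decode the payload
/// ```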
pub(crate) fn decode_record_payload<T: Bufferable>(
    record: &ArchivedRecord<'_>,
) -> Result<T, ReaderError<T>> {
    // Try and convert the raw record metadata into the true metadata type used by `T`, and then
    // also verify that `T` is able to decode records with the metadata used for this record in particular.
    let metadata = T::Metadata::from_u32(record.metadata()).ok_or(ReaderError::Incompatible {
        reason: format!("invalid metadata for {}", std::any::type_name::<T>()),
    })?;

    if !T::can_decode(metadata) {
        return Err(ReaderError::Incompatible {
            reason: format!(
                "record metadata not supported (metadata: {:#036b})",
                record.metadata()
            ),
        });
    }

    // Now we can finally try decoding.
    T::decode(metadata, record.payload()).context(DecodeSnafu)
}