vector_buffers/variants/disk_v2/writer.rs

use std::{
    cmp::Ordering,
    convert::Infallible as StdInfallible,
    fmt,
    io::{self, ErrorKind},
    marker::PhantomData,
    num::NonZeroUsize,
    sync::Arc,
};

use bytes::BufMut;
use crc32fast::Hasher;
use rkyv::{
    ser::{
        serializers::{
            AlignedSerializer, AllocScratch, AllocScratchError, BufferScratch, CompositeSerializer,
            CompositeSerializerError, FallbackScratch,
        },
        Serializer,
    },
    AlignedVec, Infallible,
};
use snafu::{ResultExt, Snafu};
use tokio::io::{AsyncWrite, AsyncWriteExt};

use super::{
    common::{create_crc32c_hasher, DiskBufferConfig},
    io::Filesystem,
    ledger::Ledger,
    record::{validate_record_archive, Record, RecordStatus},
};
use crate::{
    encoding::{AsMetadata, Encodable},
    variants::disk_v2::{
        io::AsyncFile,
        reader::decode_record_payload,
        record::{try_as_record_archive, RECORD_HEADER_LEN},
    },
    Bufferable,
};

/// Error that occurred during calls to [`BufferWriter`].
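///
/// # Example
///
/// A minimal sketch of handling a write error from [`BufferWriter::write_record`] (hypothetical
/// `writer` and `record` values):
///
/// ```ignore
/// match writer.write_record(record).await {
///     Ok(bytes_written) => { /* record accepted and buffered */ }
///     Err(WriterError::RecordTooLarge { limit }) => { /* drop or split the record */ }
///     Err(e) => return Err(e),
/// }
/// ```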
#[derive(Debug, Snafu)]
pub enum WriterError<T>
where
    T: Bufferable,
{
    /// A general I/O error occurred.
    ///
    /// Different methods will capture specific I/O errors depending on the situation, as some
    /// errors may be expected and considered normal by design.  Any I/O error that is considered
    /// atypical will be returned as this variant.
    #[snafu(display("write I/O error: {}", source))]
    Io { source: io::Error },

    /// The record attempting to be written was too large.
    ///
    /// In practice, most encoders will throw their own error if they cannot write all of the
    /// necessary bytes during encoding, and so this error will typically only be emitted when the
    /// encoder throws no error during the encoding step itself, but manages to fill up the encoding
    /// buffer to the limit.
    #[snafu(display("record too large: limit is {}", limit))]
    RecordTooLarge { limit: usize },

    /// The data file did not have enough remaining space to write the record.
    ///
    /// This could be because the data file is legitimately full, but is more commonly related to a
    /// record being big enough that it would exceed the max data file size.
    ///
    /// The record that was given to write is returned.
    #[snafu(display("data file full or record would exceed max data file size"))]
    DataFileFull { record: T, serialized_len: usize },

    /// A record reported that it contained more events than the number of bytes when encoded.
    ///
    /// This is nonsensical because we don't intend to ever support encoding zero-sized types
    /// through the buffer, and the logic we use to count the number of actual events in the buffer
    /// transitively depends on not being able to represent more than one event per encoded byte.
    #[snafu(display(
        "record reported event count ({}) higher than encoded length ({})",
        event_count,
        encoded_len
    ))]
    NonsensicalEventCount {
        encoded_len: usize,
        event_count: usize,
    },

    /// The encoder encountered an issue during encoding.
    ///
    /// For common encoders, failure to write all of the bytes of the input will be the most common
    /// error, and in fact, for some encoders, it's the only possible error that can occur.
    #[snafu(display("failed to encode record: {:?}", source))]
    FailedToEncode {
        source: <T as Encodable>::EncodeError,
    },

    /// The writer failed to serialize the record.
    ///
    /// As records are encoded and then wrapped in a container which carries metadata about the size
    /// of the encoded record, and so on, there is a chance that we could fail to serialize that
    /// container during the write step.
    ///
    /// In practice, this should generally only occur if the system is unable to allocate enough
    /// memory during the serialization step, i.e. the system itself is literally out of memory to
    /// give to processes.  Rare, indeed.
    #[snafu(display("failed to serialize encoded record to buffer: {}", reason))]
    FailedToSerialize { reason: String },

    /// The writer failed to validate the last written record.
    ///
    /// Specifically, for `BufferWriter`, this can only ever be returned when creating the buffer, during
    /// validation of the last written record.  While it's technically possible that it may be
    /// something else, this error is most likely to occur when the records in a buffer were written
    /// in a different version of Vector that cannot be decoded in this version of Vector.
    #[snafu(display("failed to validate the last written record: {}", reason))]
    FailedToValidate { reason: String },

    /// The writer entered an inconsistent state that represents an unrecoverable error.
    ///
    /// In some cases, like expecting to be able to decode an event we just encoded, we might hit an
    /// error.  This would be an entirely unexpected error -- how is it possible to not be able to
    /// decode an event we literally just encoded on the line above? -- and as such, the only
    /// reasonable thing to do would be to give up.
    ///
    /// This error is the writer, and thus the buffer, giving up.
    #[snafu(display("writer entered inconsistent state: {}", reason))]
    InconsistentState { reason: String },

    /// The record reported an event count of zero.
    ///
    /// Empty records are not supported.
    EmptyRecord,
}

impl<T: Bufferable + PartialEq> PartialEq for WriterError<T> {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::Io { source: l_source }, Self::Io { source: r_source }) => {
                l_source.kind() == r_source.kind()
            }
            (Self::RecordTooLarge { limit: l_limit }, Self::RecordTooLarge { limit: r_limit }) => {
                l_limit == r_limit
            }
            (
                Self::DataFileFull {
                    record: l_record,
                    serialized_len: l_serialized_len,
                },
                Self::DataFileFull {
                    record: r_record,
                    serialized_len: r_serialized_len,
                },
            ) => l_record == r_record && l_serialized_len == r_serialized_len,
            (
                Self::NonsensicalEventCount {
                    encoded_len: l_encoded_len,
                    event_count: l_event_count,
                },
                Self::NonsensicalEventCount {
                    encoded_len: r_encoded_len,
                    event_count: r_event_count,
                },
            ) => l_encoded_len == r_encoded_len && l_event_count == r_event_count,
            (
                Self::FailedToSerialize { reason: l_reason },
                Self::FailedToSerialize { reason: r_reason },
            )
            | (
                Self::FailedToValidate { reason: l_reason },
                Self::FailedToValidate { reason: r_reason },
            )
            | (
                Self::InconsistentState { reason: l_reason },
                Self::InconsistentState { reason: r_reason },
            ) => l_reason == r_reason,
            _ => core::mem::discriminant(self) == core::mem::discriminant(other),
        }
    }
}

impl<T> From<CompositeSerializerError<StdInfallible, AllocScratchError, StdInfallible>>
    for WriterError<T>
where
    T: Bufferable,
{
    fn from(e: CompositeSerializerError<StdInfallible, AllocScratchError, StdInfallible>) -> Self {
        match e {
            CompositeSerializerError::ScratchSpaceError(sse) => WriterError::FailedToSerialize {
                reason: format!("insufficient space to serialize encoded record: {sse}"),
            },
            // Only our scratch space strategy is fallible, so we should never get here.
            _ => unreachable!(),
        }
    }
}

impl<T> From<io::Error> for WriterError<T>
where
    T: Bufferable,
{
    fn from(source: io::Error) -> Self {
        WriterError::Io { source }
    }
}

#[derive(Debug)]
pub(super) struct WriteToken {
    event_count: usize,
    serialized_len: usize,
}

impl WriteToken {
    pub fn event_count(&self) -> usize {
        self.event_count
    }

    pub fn serialized_len(&self) -> usize {
        self.serialized_len
    }
}

#[derive(Debug, Default, PartialEq)]
pub(super) struct FlushResult {
    pub events_flushed: u64,
    pub bytes_flushed: u64,
}

/// Wraps an [`AsyncWrite`] value and buffers individual writes, while signalling implicit flushes.
///
/// As the [`BufferWriter`] must track when writes have theoretically made it to disk, we care about
/// situations where the internal write buffer for a data file has been flushed to make room.  In
/// order to provide this information, we track the number of events represented by a record when
/// writing its serialized form.
///
/// If an implicit buffer flush must be performed before a write can complete, or a manual flush is
/// requested, we return this information to the caller, letting them know how many events, and how
/// many bytes, were flushed.
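///
/// A minimal sketch of the intended flow (hypothetical `file` value):
///
/// ```ignore
/// let mut writer = TrackingBufWriter::with_capacity(4096, file);
/// // Small writes are buffered internally; no flush result is returned.
/// assert_eq!(writer.write(1, b"small record").await?, None);
/// // A manual flush reports how many buffered events (and bytes) hit the inner writer.
/// let flush_result = writer.flush().await?;
/// ```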
struct TrackingBufWriter<W> {
    inner: W,
    buf: Vec<u8>,
    unflushed_events: usize,
}

impl<W: AsyncWrite + Unpin> TrackingBufWriter<W> {
    /// Creates a new `TrackingBufWriter` with the specified buffer capacity.
    fn with_capacity(cap: usize, inner: W) -> Self {
        Self {
            inner,
            buf: Vec::with_capacity(cap),
            unflushed_events: 0,
        }
    }

    /// Writes the given buffer.
    ///
    /// If enough internal buffer capacity is available, then this write will be buffered internally
    /// until [`flush`] is called.  If there's not enough remaining internal buffer capacity, then
    /// the internal buffer will be flushed to the inner writer first.  If the given buffer is
    /// larger than the internal buffer capacity, then it will be written directly to the inner
    /// writer.
    ///
    /// Internally, a counter is kept of how many buffered events are waiting to be flushed. This
    /// count is incremented every time `write` can fully buffer the record without having to flush
    /// to the inner writer.
    ///
    /// If this call requires the internal buffer to be flushed out to the inner writer, then the
    /// write result will indicate how many buffered events were flushed, and their total size in
    /// bytes.  Additionally, if the given buffer is larger than the internal buffer itself, it will
    /// also be included in the write result as well.
    ///
    /// # Errors
    ///
    /// If a write to the inner writer occurs, and that write encounters an error, an error variant
    /// will be returned describing the error.
    async fn write(&mut self, event_count: usize, buf: &[u8]) -> io::Result<Option<FlushResult>> {
        let mut flush_result = None;

        // If this write would cause us to exceed our internal buffer capacity, flush whatever we
        // have buffered already.
        if self.buf.len() + buf.len() > self.buf.capacity() {
            flush_result = self.flush().await?;
        }

        // If the given buffer is too large to be buffered at all, then bypass the internal buffer.
        if buf.len() >= self.buf.capacity() {
            self.inner.write_all(buf).await?;

            let flush_result = flush_result.get_or_insert(FlushResult::default());
            flush_result.events_flushed += event_count as u64;
            flush_result.bytes_flushed += buf.len() as u64;
        } else {
            self.buf.extend_from_slice(buf);
            self.unflushed_events += event_count;
        }

        Ok(flush_result)
    }

    /// Flushes the internal buffer to the underlying writer.
    ///
    /// If any buffered records are present, then the write result will indicate how many
    /// individual events were flushed, including their total size in bytes.
    ///
    /// # Errors
    ///
    /// If a write to the underlying writer occurs, and that write encounters an error, an error variant
    /// will be returned describing the error.
    async fn flush(&mut self) -> io::Result<Option<FlushResult>> {
        if self.buf.is_empty() {
            return Ok(None);
        }

        let events_flushed = self.unflushed_events as u64;
        let bytes_flushed = self.buf.len() as u64;

        let result = self.inner.write_all(&self.buf[..]).await;
        self.unflushed_events = 0;
        self.buf.clear();

        result.map(|()| {
            Some(FlushResult {
                events_flushed,
                bytes_flushed,
            })
        })
    }

    /// Gets a reference to the underlying writer.
    #[cfg(test)]
    fn get_ref(&self) -> &W {
        &self.inner
    }

    /// Gets a mutable reference to the underlying writer.
    fn get_mut(&mut self) -> &mut W {
        &mut self.inner
    }
}

impl<W: fmt::Debug> fmt::Debug for TrackingBufWriter<W> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("TrackingBufWriter")
            .field("writer", &self.inner)
            .field(
                "buffer",
                &format_args!("{}/{}", self.buf.len(), self.buf.capacity()),
            )
            .field("unflushed_events", &self.unflushed_events)
            .finish()
    }
}

/// Buffered writer that handles encoding, checksumming, and serialization of records.
#[derive(Debug)]
pub(super) struct RecordWriter<W, T> {
    writer: TrackingBufWriter<W>,
    encode_buf: Vec<u8>,
    ser_buf: AlignedVec,
    ser_scratch: AlignedVec,
    checksummer: Hasher,
    max_record_size: usize,
    current_data_file_size: u64,
    max_data_file_size: u64,
    _t: PhantomData<T>,
}

impl<W, T> RecordWriter<W, T>
where
    W: AsyncFile + Unpin,
    T: Bufferable,
{
    /// Creates a new [`RecordWriter`] around the provided writer.
    ///
    /// Internally, the writer is wrapped in a [`TrackingBufWriter`], so callers should not pass in
    /// an already buffered writer.
    pub fn new(
        writer: W,
        current_data_file_size: u64,
        write_buffer_size: usize,
        max_data_file_size: u64,
        max_record_size: usize,
    ) -> Self {
        // These should also be getting checked at a higher level, but we're double-checking them here to be absolutely sure.
        let max_record_size_converted = u64::try_from(max_record_size)
            .expect("Maximum record size must be less than 2^64 bytes.");

        debug_assert!(
            max_record_size > RECORD_HEADER_LEN,
            "maximum record length must be larger than size of record header itself"
        );
        debug_assert!(
            max_data_file_size >= max_record_size_converted,
            "must always be able to fit at least one record into a data file"
        );

        // We subtract the length of the record header from our allowed maximum record size, because we have to make sure
        // that when we go to actually wrap and serialize the encoded record, we're limiting the actual bytes we write
        // to disk to within `max_record_size`.
        //
        // This could lead to us reducing the encode buffer size limit by slightly more than necessary, since
        // `RECORD_HEADER_LEN` might be overaligned compared to what would actually be necessary for the
        // encoded/serialized record... but that's OK, as it's only going to differ by 8 bytes at most.
        let max_record_size = max_record_size - RECORD_HEADER_LEN;

        Self {
            writer: TrackingBufWriter::with_capacity(write_buffer_size, writer),
            encode_buf: Vec::with_capacity(16_384),
            ser_buf: AlignedVec::with_capacity(16_384),
            ser_scratch: AlignedVec::with_capacity(16_384),
            checksummer: create_crc32c_hasher(),
            max_record_size,
            current_data_file_size,
            max_data_file_size,
            _t: PhantomData,
        }
    }

    /// Gets a reference to the underlying writer.
    #[cfg(test)]
    pub fn get_ref(&self) -> &W {
        self.writer.get_ref()
    }

    /// Whether or not `amount` bytes could be written while obeying the data file size limit.
    ///
    /// If no bytes have been written at all to a data file, then `amount` is allowed to exceed the
    /// limit, otherwise a record would never be able to be written.
    fn can_write(&self, amount: usize) -> bool {
        let amount = u64::try_from(amount).expect("`amount` should never exceed 2^64 bytes.");

        self.current_data_file_size + amount <= self.max_data_file_size
    }

    /// Archives a record.
    ///
    /// This encodes the record, as well as serializes it into its archival format that will be
    /// stored on disk.  The total size of the archived record, including the length delimiter
    /// inserted before the archived record, will be returned.
    ///
    /// # Errors
    ///
    /// Errors can occur during the encoding or serialization stage.  If an error occurs
    /// during any of these stages, an appropriate error variant will be returned describing the error.
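    ///
    /// # Example
    ///
    /// A minimal sketch of the two-step archive/flush flow (hypothetical `record_writer`, `id`,
    /// and `record` values):
    ///
    /// ```ignore
    /// let token = record_writer.archive_record(id, record)?;
    /// let (bytes_written, maybe_flush) = record_writer.flush_record(token).await?;
    /// ```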
    #[instrument(skip(self, record), level = "trace")]
    pub fn archive_record(&mut self, id: u64, record: T) -> Result<WriteToken, WriterError<T>> {
        let event_count = record.event_count();

        self.encode_buf.clear();
        self.ser_buf.clear();
        self.ser_scratch.clear();

        // We first encode the record, which puts it into the desired encoded form.  This is where
        // we assert the record is within size limits, etc.
        //
        // NOTE: Some encoders may not write to the buffer in a way that fills it up before
        // themselves returning an error because they know the buffer is too small.  This means we
        // may often return the "failed to encode" error variant when the true error is that the
        // payload size, when encoded, exceeds our limit.
        //
        // Unfortunately, there's not a whole lot for us to do here beyond allowing our buffer to
        // grow beyond the limit so that we can try to allow encoding to succeed so that we can grab
        // the actual encoded size and then check it against the limit.
        //
        // C'est la vie.
        let encode_result = {
            let mut encode_buf = (&mut self.encode_buf).limit(self.max_record_size);
            record.encode(&mut encode_buf)
        };
        let encoded_len = encode_result
            .map(|()| self.encode_buf.len())
            .context(FailedToEncodeSnafu)?;
        if encoded_len > self.max_record_size {
            return Err(WriterError::RecordTooLarge {
                limit: self.max_record_size,
            });
        }

        let metadata = T::get_metadata().into_u32();
        let wrapped_record =
            Record::with_checksum(id, metadata, &self.encode_buf, &self.checksummer);

        // Push 8 dummy bytes where our length delimiter will sit.  We'll fix this up after
        // serialization.  Notably, `AlignedSerializer` will report the serializer position as
        // the length of its backing store, which now includes our 8 bytes, so we _subtract_
        // those from the position when figuring out the actual value to write back after.
        //
        // We write it this way -- in the serializer buffer, and not as a separate write -- so that
        // we can do a single write but also so that we always have an aligned buffer.
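        //
        // As an illustration, the resulting on-disk layout of each record is an 8-byte,
        // big-endian length delimiter holding the archived record's length, followed by the
        // archived record itself:
        //
        //   [ length delimiter: u64 (big-endian) ][ archived `Record` ]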
        self.ser_buf
            .extend_from_slice(&[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]);

        // Now serialize the record, which puts it into its archived form.  This is what powers our
        // ability to do zero-copy deserialization from disk.
        let mut serializer = CompositeSerializer::new(
            AlignedSerializer::new(&mut self.ser_buf),
            FallbackScratch::new(
                BufferScratch::new(&mut self.ser_scratch),
                AllocScratch::new(),
            ),
            Infallible,
        );

        let serialized_len = serializer
            .serialize_value(&wrapped_record)
            .map(|_| serializer.pos())?;

        // Sanity check before we do our length math.
        if serialized_len <= 8 || self.ser_buf.len() != serialized_len {
            return Err(WriterError::FailedToSerialize {
                reason: format!(
                    "serializer position invalid for context: pos={} len={}",
                    serialized_len,
                    self.ser_buf.len(),
                ),
            });
        }

        // With the record archived and serialized, do our final check to ensure we can fit this
        // write.  We're doing this earlier than the actual call to flush it because it gives us
        // a chance to hand back the event so that the caller can roll to a new data file first
        // before attempting the write again.
        if !self.can_write(serialized_len) {
            debug!(
                current_data_file_size = self.current_data_file_size,
                max_data_file_size = self.max_data_file_size,
                archive_on_disk_len = serialized_len,
                "Archived record is too large to fit in remaining free space of current data file."
            );

            // We have to decode the record back out to actually be able to give it back.  If we
            // can't decode it for some reason, this is entirely an unrecoverable error, since an
            // encoded record should always be decodable within the same process that encoded it.
            let record = T::decode(T::get_metadata(), &self.encode_buf[..]).map_err(|_| {
                WriterError::InconsistentState {
                    reason: "failed to decode record immediately after encoding it".to_string(),
                }
            })?;

            return Err(WriterError::DataFileFull {
                record,
                serialized_len,
            });
        }

        // Fix up our length delimiter.
        let archive_len = serialized_len - 8;
        let wire_archive_len: u64 = archive_len
            .try_into()
            .expect("archive len should always fit into a u64");
        let archive_len_buf = wire_archive_len.to_be_bytes();

        let length_delimiter_dst = &mut self.ser_buf[0..8];
        length_delimiter_dst.copy_from_slice(&archive_len_buf[..]);

        Ok(WriteToken {
            event_count,
            serialized_len,
        })
    }

    /// Writes a record.
    ///
    /// If the write is successful, the number of bytes written to the buffer is returned.
    /// Additionally, if any internal buffers required an implicit flush, the result of that flush
    /// operation is returned as well.
    ///
    /// As we internally buffer writes to the underlying data file, to reduce the number of syscalls
    /// required to push serialized records to the data file, we will sometimes write a record
    /// which would overflow the internal buffer.  Doing so means we have to first flush the buffer
    /// before continuing with buffering the current write.  As some invariants are based on knowing
    /// when a record has actually been written to the data file, we return any information about
    /// implicit flushes so that the writer can be aware of when data has actually made it to the
    /// data file or not.
    ///
    /// # Errors
    ///
    /// Errors can occur during the encoding, serialization, or I/O stage.  If an error occurs
    /// during any of these stages, an appropriate error variant will be returned describing the error.
    #[instrument(skip(self, record), level = "trace")]
    #[cfg(test)]
    pub async fn write_record(
        &mut self,
        id: u64,
        record: T,
    ) -> Result<(usize, Option<FlushResult>), WriterError<T>> {
        let token = self.archive_record(id, record)?;
        self.flush_record(token).await
    }

    /// Flushes the previously-archived record.
    ///
    /// If the flush is successful, the number of bytes written to the buffer is returned.
    /// Additionally, if any internal buffers required an implicit flush, the result of that flush
    /// operation is returned as well.
    ///
    /// As we internally buffer writes to the underlying data file, to reduce the number of syscalls
    /// required to push serialized records to the data file, we will sometimes write a record
    /// which would overflow the internal buffer.  Doing so means we have to first flush the buffer
    /// before continuing with buffering the current write.  As some invariants are based on knowing
    /// when a record has actually been written to the data file, we return any information about
    /// implicit flushes so that the writer can be aware of when data has actually made it to the
    /// data file or not.
    #[instrument(skip(self), level = "trace")]
    pub async fn flush_record(
        &mut self,
        token: WriteToken,
    ) -> Result<(usize, Option<FlushResult>), WriterError<T>> {
        // Make sure the write token we've been given matches whatever the last call to `archive_record` generated.
        let event_count = token.event_count();
        let serialized_len = token.serialized_len();
        debug_assert_eq!(
            serialized_len,
            self.ser_buf.len(),
            "using write token from non-contiguous archival call"
        );

        let flush_result = self
            .writer
            .write(event_count, &self.ser_buf[..])
            .await
            .context(IoSnafu)?;

        // Update our current data file size.
        self.current_data_file_size += u64::try_from(serialized_len)
            .expect("Serialized length of record should never exceed 2^64 bytes.");

        Ok((serialized_len, flush_result))
    }

    /// Recovers an archived record that has not yet been flushed.
    ///
    /// In some cases, we must archive a record to see how large the resulting archived record is, and potentially
    /// recover the original record if it's too large, and so on.
    ///
    /// This method allows decoding an archived record that is still sitting in the internal buffers waiting to be
    /// flushed.  Technically, this decodes the original record back from its archived/encoded form, and so this isn't
    /// a clone, but it does mean incurring the cost of decoding directly.
    ///
    /// # Errors
    ///
    /// If the archived record cannot be deserialized from its archival form, or can't be decoded back to its original
    /// form `T`, an error variant will be returned describing the error. Notably, the only error we return is
    /// `InconsistentState`, as being unable to immediately deserialize and decode a record we just serialized and
    /// encoded implies a fatal, and unrecoverable, error with the buffer implementation as a whole.
    #[instrument(skip(self), level = "trace")]
    pub fn recover_archived_record(&mut self, token: &WriteToken) -> Result<T, WriterError<T>> {
        // Make sure the write token we've been given matches whatever the last call to `archive_record` generated.
        let serialized_len = token.serialized_len();
        debug_assert_eq!(
            serialized_len,
            self.ser_buf.len(),
            "using write token from non-contiguous archival call"
        );

        // First, decode the archival wrapper.  This means skipping the length delimiter.
        let wrapped_record = try_as_record_archive(&self.ser_buf[8..]).map_err(|_| {
            WriterError::InconsistentState {
                reason: "failed to decode archived record immediately after archiving it"
                    .to_string(),
            }
        })?;

        // Now we can actually decode it as `T`.
        let record_metadata = T::Metadata::from_u32(wrapped_record.metadata()).ok_or(
            WriterError::InconsistentState {
                reason: "failed to decode record metadata immediately after encoding it"
                    .to_string(),
            },
        )?;

        T::decode(record_metadata, wrapped_record.payload()).map_err(|_| {
            WriterError::InconsistentState {
                reason: "failed to decode record immediately after encoding it".to_string(),
            }
        })
    }

    /// Flushes the writer.
    ///
    /// This flushes both the internal buffered writer and the underlying writer object.
    ///
    /// # Errors
    ///
    /// If there is an I/O error while flushing either the buffered writer or the underlying writer,
    /// an error variant will be returned describing the error.
    #[instrument(skip(self), level = "debug")]
    pub async fn flush(&mut self) -> io::Result<Option<FlushResult>> {
        self.writer.flush().await
    }

    /// Synchronizes the underlying file to disk.
    ///
    /// This tries to synchronize both data and metadata.
    ///
    /// # Errors
    ///
    /// If there is an I/O error while syncing the file, an error variant will be returned
    /// describing the error.
    #[instrument(skip(self), level = "debug")]
    pub async fn sync_all(&mut self) -> io::Result<()> {
        self.writer.get_mut().sync_all().await
    }
}

/// Writes records to the buffer.
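///
/// # Example
///
/// A minimal usage sketch (hypothetical `ledger` and `record` values; note that construction and
/// validation are crate-internal):
///
/// ```ignore
/// let mut writer = BufferWriter::new(ledger);
/// writer.validate_last_write().await?;
/// let bytes_written = writer.write_record(record).await?;
/// writer.flush().await?;
/// ```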
#[derive(Debug)]
pub struct BufferWriter<T, FS>
where
    FS: Filesystem,
    FS::File: Unpin,
{
    ledger: Arc<Ledger<FS>>,
    config: DiskBufferConfig<FS>,
    writer: Option<RecordWriter<FS::File, T>>,
    next_record_id: u64,
    unflushed_events: u64,
    data_file_size: u64,
    unflushed_bytes: u64,
    data_file_full: bool,
    skip_to_next: bool,
    ready_to_write: bool,
    _t: PhantomData<T>,
}

impl<T, FS> BufferWriter<T, FS>
where
    T: Bufferable,
    FS: Filesystem + fmt::Debug + Clone,
    FS::File: Unpin,
{
    /// Creates a new [`BufferWriter`] attached to the given [`Ledger`].
    pub(crate) fn new(ledger: Arc<Ledger<FS>>) -> Self {
        let config = ledger.config().clone();
        let next_record_id = ledger.state().get_next_writer_record_id();
        BufferWriter {
            ledger,
            config,
            writer: None,
            data_file_size: 0,
            data_file_full: false,
            unflushed_bytes: 0,
            skip_to_next: false,
            ready_to_write: false,
            next_record_id,
            unflushed_events: 0,
            _t: PhantomData,
        }
    }

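    // Record IDs are handed out ahead of the authoritative ledger state: the ledger only advances
    // when events are flushed, so the next ID to assign is the ledger's next writer record ID plus
    // however many events are buffered but unflushed.  As an illustration, if the ledger's next
    // writer record ID is 100 and 5 events are unflushed, the next record written gets ID 105.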
    fn get_next_record_id(&mut self) -> u64 {
        self.next_record_id.wrapping_add(self.unflushed_events)
    }

    fn track_write(&mut self, event_count: usize, record_size: u64) {
        self.data_file_size += record_size;
        self.unflushed_events += event_count as u64;
        self.unflushed_bytes += record_size;
    }

    fn flush_write_state(&mut self) {
        self.flush_write_state_partial(self.unflushed_events, self.unflushed_bytes);
    }

    fn flush_write_state_partial(&mut self, flushed_events: u64, flushed_bytes: u64) {
        debug_assert!(
            flushed_events <= self.unflushed_events,
            "tried to flush more events than are currently unflushed"
        );
        debug_assert!(
            flushed_bytes <= self.unflushed_bytes,
            "tried to flush more bytes than are currently unflushed"
        );

        self.next_record_id = self
            .ledger
            .state()
            .increment_next_writer_record_id(flushed_events);
        self.unflushed_events -= flushed_events;
        self.unflushed_bytes -= flushed_bytes;

        self.ledger.track_write(flushed_events, flushed_bytes);
    }

    fn can_write(&self) -> bool {
        !self.data_file_full && self.data_file_size < self.config.max_data_file_size
    }

    fn can_write_record(&self, amount: usize) -> bool {
        let total_buffer_size = self.ledger.get_total_buffer_size() + self.unflushed_bytes;
        let potential_write_len =
            u64::try_from(amount).expect("Vector only supports 64-bit architectures.");

        self.can_write() && total_buffer_size + potential_write_len <= self.config.max_buffer_size
    }

    #[instrument(skip(self), level = "debug")]
    fn mark_data_file_full(&mut self) {
        self.data_file_full = true;
    }

    #[instrument(skip(self), level = "debug")]
    fn reset(&mut self) {
        self.writer = None;
        self.data_file_size = 0;
        self.data_file_full = false;
    }

    #[instrument(skip(self), level = "debug")]
    fn mark_for_skip(&mut self) {
        self.skip_to_next = true;
    }

    fn should_skip(&mut self) -> bool {
        let should_skip = self.skip_to_next;
        if should_skip {
            self.skip_to_next = false;
        }

        should_skip
    }

    /// Validates that the last write in the current writer data file matches the ledger.
    ///
    /// # Errors
    ///
    /// If the current data file is not empty, and there is an error reading it to perform
    /// validation, an error variant will be returned that describes the error.
    ///
    /// Practically speaking, however, this method will only return I/O-related errors, as all
    /// logical errors, such as the record being invalid, are captured in order to logically adjust
    /// the writer/ledger state to start a new file, etc.
    #[instrument(skip(self), level = "debug")]
    pub(super) async fn validate_last_write(&mut self) -> Result<(), WriterError<T>> {
        // We don't try validating again after doing so initially.
        if self.ready_to_write {
            warn!("Writer already initialized.");
            return Ok(());
        }

        debug!(
            current_writer_data_file = ?self.ledger.get_current_writer_data_file_path(),
            "Validating last written record in current data file."
        );
        self.ensure_ready_for_write().await.context(IoSnafu)?;

        // If our current file is empty, there's no sense doing this check.
        if self.data_file_size == 0 {
            self.ready_to_write = true;
            return Ok(());
        }

        // We do a neat little trick here where we open an immutable memory-mapped region against our
        // current writer data file, which lets us treat it as one big buffer... which is useful for
        // asking `rkyv` to deserialize just the last record from the file, without having to seek
        // directly to the start of the record where the length delimiter is.
        let data_file_path = self.ledger.get_current_writer_data_file_path();
        let data_file_mmap = self
            .ledger
            .filesystem()
            .open_mmap_readable(&data_file_path)
            .await
            .context(IoSnafu)?;

        // We have bytes, so we should have an archived record... hopefully!  Go through the motions
        // of verifying it.  If we hit any invalid states, then we should bump to the next data file
        // since the reader will have to stop once it hits the first error in a given file.
        let should_skip_to_next_file = match validate_record_archive(
            data_file_mmap.as_ref(),
            &Hasher::new(),
        ) {
            RecordStatus::Valid {
                id: last_record_id, ..
            } => {
                // We now know the record is valid from the perspective of being framed correctly,
                // and the checksum matching, etc.  We'll attempt to actually decode it now so we
                // can get the actual item that was written, which we need to understand where the
                // next writer record ID should be.
                let record = try_as_record_archive(data_file_mmap.as_ref())
                    .expect("record was already validated");
                let item = decode_record_payload::<T>(record).map_err(|e| {
                    WriterError::FailedToValidate {
                        reason: e.to_string(),
                    }
                })?;

                // Since we have a valid record, checksum and all, see if the writer record ID
                // in the ledger lines up with the record ID we have here.  Specifically, the record
                // ID plus the number of events in the record should be the next record ID that gets used.
                let ledger_next = self.ledger.state().get_next_writer_record_id();
                let record_events =
                    u64::try_from(item.event_count()).expect("event count should never exceed u64");
                let record_next = last_record_id.wrapping_add(record_events);
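                // As an illustration: if the last record on disk has ID 10 and holds 3 events,
                // it consumes IDs 10 through 12, so the ledger's "next writer record ID" should
                // be 13 if everything is in sync.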

                match ledger_next.cmp(&record_next) {
                    Ordering::Equal => {
                        // We're exactly where the ledger thinks we should be, so nothing to do.
                        debug!(
                            ledger_next,
                            last_record_id,
                            record_events,
                            "Synchronized with ledger. Writer ready."
                        );
                        false
                    }
                    Ordering::Greater => {
                        // Our last write is behind where the ledger thinks we should be, so we
                        // likely missed flushing some records, or partially flushed the data file.
                        // Better roll over to be safe.
                        error!(
                            ledger_next, last_record_id, record_events,
                            "Last record written to data file is behind expected position. Events have likely been lost.");
                        true
                    }
                    Ordering::Less => {
                        // We're actually _ahead_ of the ledger, which is to say we wrote a valid
                        // record to the data file, but never incremented our "writer next record
                        // ID" field.  Given that record IDs are monotonic, it's safe to forward
                        // ourselves to make the "writer next record ID" in the ledger match the
                        // reality of the data file.  If there were somehow gaps in the data file,
                        // the reader will detect it, and this way, we avoid duplicate record IDs.
                        debug!(
                            ledger_next,
                            last_record_id,
                            record_events,
                            new_ledger_next = record_next,
                            "Ledger desynchronized from data files. Fast forwarding ledger state."
                        );
                        let ledger_record_delta = record_next - ledger_next;
                        let next_record_id = self
                            .ledger
                            .state()
                            .increment_next_writer_record_id(ledger_record_delta);
                        self.next_record_id = next_record_id;
                        self.unflushed_events = 0;

                        false
                    }
                }
            }
            // The record payload was corrupted, somehow: we know the checksum failed to match on
            // both sides, but it could be cosmic radiation that flipped a bit or some process
            // trampled over the data file... who knows.
            //
            // We skip to the next data file to try and start from a clean slate.
            RecordStatus::Corrupted { .. } => {
                error!(
                    "Last written record did not match the expected checksum. Corruption likely."
                );
                true
            }
            // The record itself was corrupted, somehow: it was sufficiently different that `rkyv`
            // couldn't even validate it, which likely means missing bytes but could also be certain
            // bytes being invalid for the struct fields they represent.  Like invalid checksums, we
            // really don't know why it happened, only that it happened.
            //
            // We skip to the next data file to try and start from a clean slate.
            RecordStatus::FailedDeserialization(de) => {
                let reason = de.into_inner();
                error!(
                    ?reason,
                    "Last written record was unable to be deserialized. Corruption likely."
                );
                true
            }
        };

        // Reset our internal state, which closes the initial data file we opened, and mark
        // ourselves as needing to skip to the next data file.  This is a little convoluted, but we
        // need to ensure we follow the normal behavior of trying to open the next data file,
        // waiting for the reader to delete it if it already exists and hasn't been fully read yet,
        // etc.
        //
        // Essentially, we defer the actual skipping to avoid deadlocking here trying to open a
        // data file we might not be able to open yet.
        if should_skip_to_next_file {
            self.reset();
            self.mark_for_skip();
        }

        self.ready_to_write = true;

        Ok(())
    }

    fn is_buffer_full(&self) -> bool {
        let total_buffer_size = self.ledger.get_total_buffer_size() + self.unflushed_bytes;
        let max_buffer_size = self.config.max_buffer_size;
        total_buffer_size >= max_buffer_size
    }

    /// Ensures this writer is ready to attempt writing the next record.
    #[instrument(skip(self), level = "debug")]
    async fn ensure_ready_for_write(&mut self) -> io::Result<()> {
        // Check the overall size of the buffer and figure out if we can write.
        loop {
            // If we haven't yet exceeded the maximum buffer size, then we can proceed. Likewise, if
            // we're still validating our last write, then we know it doesn't matter if the buffer
            // is full or not because we're not doing any actual writing here.
            //
            // Otherwise, wait for the reader to signal that they've made some progress.
            if !self.is_buffer_full() || !self.ready_to_write {
                break;
            }

            trace!(
                total_buffer_size = self.ledger.get_total_buffer_size() + self.unflushed_bytes,
                max_buffer_size = self.config.max_buffer_size,
                "Buffer size limit reached. Waiting for reader progress."
            );

            self.ledger.wait_for_reader().await;
        }

        // If we already have an open writer, and we have no more space in the data file to write,
        // flush and close the file and mark ourselves as needing to open the _next_ data file.
        //
        // Likewise, if initialization detected an invalid record on the starting data file, and we
        // need to skip to the next file, we honor that here.
        let mut should_open_next = self.should_skip();
        if self.writer.is_some() {
            if self.can_write() {
                return Ok(());
            }

            // Our current data file is full, so we need to open a new one.  Signal to the loop
            // that we want to try and open the next file, and not the current file,
            // essentially to avoid marking the writer as already having moved on to the next
            // file before we're sure it isn't already an existing file on disk waiting to be
            // read.
            //
            // We still flush ourselves to disk, etc, to make sure all of the data is there.
            should_open_next = true;
            self.flush_inner(true).await?;
            self.flush_write_state();

            self.reset();
        }

        loop {
            // Normally, readers will keep up with the writers, and so there will only ever be a
            // single data file or two on disk.  If there was an issue with a sink reading from this
            // buffer, though, we could conceivably have a stalled reader while the writer
            // progresses and continues to create new data files.
            //
            // At some point, the file ID will wrap around and the writer will want to open a "new"
            // file for writing that already exists: a previously-written file that has not been
            // read yet.
            //
            // In order to handle this situation, we loop here, trying to create the file.  Readers
            // are responsible for deleting a file once they have read it entirely, so our first loop
            // iteration is the happy path, trying to create the new file.  If we can't create it,
            // this may be because it already exists and we're just picking up where we left off
            // from last time, but it could also be a data file that a reader hasn't completed yet.
            let data_file_path = if should_open_next {
                self.ledger.get_next_writer_data_file_path()
            } else {
                self.ledger.get_current_writer_data_file_path()
            };

            let maybe_data_file = self
                .ledger
                .filesystem()
                .open_file_writable_atomic(&data_file_path)
                .await;
            let file = match maybe_data_file {
                // We were able to create the file, so we're good to proceed.
                Ok(data_file) => Some((data_file, 0)),
                // We got back an error trying to open the file: might be that it already exists,
                // might be something else.
                Err(e) => match e.kind() {
                    ErrorKind::AlreadyExists => {
                        // We open the file again, without the atomic "create new" behavior.  If we
                        // can do that successfully, we check its length.  There are three main
                        // situations we encounter:
                        // - the reader may have deleted the data file between the atomic create
                        //   open and this one, and so we would expect the file length to be zero
                        // - the file still exists, and it's full: the reader may still be reading
                        //   it, or waiting for acknowledgements to be able to delete it
                        // - it may not be full, which could be because it's the data file the
                        //   writer left off on last time
                        let data_file = self
                            .ledger
                            .filesystem()
                            .open_file_writable(&data_file_path)
                            .await?;
                        let metadata = data_file.metadata().await?;
                        let file_len = metadata.len();
                        if file_len == 0 || !should_open_next {
                            // The file is either empty, which means we created it and "own it" now,
                            // or it's not empty but we're not skipping to the next file, which can
                            // only mean that we're still initializing, and so this would be the
                            // data file we left off writing to.
                            Some((data_file, file_len))
                        } else {
                            // The file isn't empty, and we're not in initialization anymore, which
                            // means this data file is one that the reader still hasn't finished
                            // reading through yet, and so we must wait for the reader to delete it
                            // before we can proceed.
                            None
                        }
                    }
                    // Legitimate I/O error with the operation, bubble this up.
                    _ => return Err(e),
                },
            };

            if let Some((data_file, data_file_size)) = file {
                // We successfully opened the file and it can be written to.
                debug!(
                    data_file_path = data_file_path.to_string_lossy().as_ref(),
                    existing_file_size = data_file_size,
                    "Opened data file for writing."
                );

                // Make sure the file is flushed to disk, especially if we just created it.
                data_file.sync_all().await?;

                self.writer = Some(RecordWriter::new(
                    data_file,
                    data_file_size,
                    self.config.write_buffer_size,
                    self.config.max_data_file_size,
                    self.config.max_record_size,
                ));
                self.data_file_size = data_file_size;

                // If we opened the "next" data file, we need to increment the current writer
                // file ID now to signal that the writer has moved on.
                if should_open_next {
                    self.ledger.state().increment_writer_file_id();
                    self.ledger.notify_writer_waiters();

                    debug!(
                        new_writer_file_id = self.ledger.get_current_writer_file_id(),
                        "Writer now on new data file."
                    );
                }

                return Ok(());
            }

            // The file is still present and waiting for a reader to finish reading it in order
            // to delete it.  Wait until the reader signals progress and try again.
            debug!("Target data file is still present and not yet processed. Waiting for reader.");
            self.ledger.wait_for_reader().await;
        }
    }

    /// Attempts to write a record.
    ///
    /// If the buffer is currently full, the original record will be immediately returned.
    /// Otherwise, a write will be executed, which will run to completion, and `None` will be returned.
    ///
    /// # Errors
    ///
    /// If an error occurred while writing the record, an error variant will be returned describing
    /// the error.
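    ///
    /// # Example
    ///
    /// A minimal sketch (hypothetical `writer` and `record` values):
    ///
    /// ```ignore
    /// if let Some(record) = writer.try_write_record(record).await? {
    ///     // The buffer was full: the original record is handed back for a later retry.
    /// }
    /// ```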
    pub async fn try_write_record(&mut self, record: T) -> Result<Option<T>, WriterError<T>> {
        self.try_write_record_inner(record).await.map(Result::err)
    }

    #[instrument(skip_all, level = "debug")]
    async fn try_write_record_inner(
        &mut self,
        mut record: T,
    ) -> Result<Result<usize, T>, WriterError<T>> {
        // If the buffer is already full, we definitely can't complete this write.
        if self.is_buffer_full() {
            return Ok(Err(record));
        }

        let record_events: NonZeroUsize = record
            .event_count()
            .try_into()
            .map_err(|_| WriterError::EmptyRecord)?;

        // Grab the next record ID and attempt to write the record.
        let record_id = self.get_next_record_id();

        let token = loop {
            // Make sure we have an open data file to write to, which might also be us opening the
            // next data file because our first attempt at writing had to finalize a data file that
            // was already full.
            self.ensure_ready_for_write().await.context(IoSnafu)?;

            let writer = self
                .writer
                .as_mut()
                .expect("writer should exist after `ensure_ready_for_write`");

            // Archive the record, which, if it succeeds in terms of encoding, etc., will give us a token that we can
            // use to eventually write it to storage. This may fail if the record writer detects it can't fit the
            // archived record in the current data file, so we handle that separately. All other errors must be
            // handled by the caller.
            match writer.archive_record(record_id, record) {
                Ok(token) => break token,
                Err(we) => match we {
                    WriterError::DataFileFull {
                        record: old_record,
                        serialized_len,
                    } => {
                        // The data file is full, so we need to roll to the next one before attempting
                        // the write again.  We also recapture the record for the next write attempt.
                        self.mark_data_file_full();
                        record = old_record;

                        debug!(
                            current_data_file_size = self.data_file_size,
                            max_data_file_size = self.config.max_data_file_size,
                            last_attempted_write_size = serialized_len,
                            "Current data file reached maximum size. Rolling to the next data file."
                        );
                    }
                    e => return Err(e),
                },
            }
        };

        // Now that we know the record was archived successfully -- the record wasn't too large, etc -- we actually
        // need to check if it will fit based on our current buffer size. If not, we recover the record from the
        // writer's internal buffers, as we haven't yet flushed it, and we return it to the caller.
        //
        // Otherwise, we proceed with flushing like we normally would.
        let can_write_record = self.can_write_record(token.serialized_len());
        let writer = self
            .writer
            .as_mut()
            .expect("writer should exist after `ensure_ready_for_write`");

        let (bytes_written, flush_result) = if can_write_record {
            // We always return errors here because flushing the record won't return a recoverable error like
            // `DataFileFull`, as that gets checked during archiving.
            writer.flush_record(token).await?
        } else {
            // The record would not fit given the current size of the buffer, so we need to recover it from the
            // writer and hand it back. This looks a little weird because we want to surface deserialize/decoding
            // errors if we encounter them, but if we recover the record successfully, we're returning
            // `Ok(Err(record))` to signal that our attempt failed but the record is able to be retried again later.
            return Ok(Err(writer.recover_archived_record(&token)?));
        };

        // Track our write since things appear to have succeeded. This only updates our internal
        // state as we have not yet authoritatively flushed the write to the data file. This tracks
        // not only how many bytes we have buffered, but also how many events, which in turn drives
        // record ID generation.  We do this after the write appears to succeed to avoid issues with
        // setting the ledger state to a record ID that we may never have actually written, which
        // could lead to record ID gaps.
        self.track_write(record_events.get(), bytes_written as u64);

        // If we did flush some buffered writes during this write, however, we now compensate for
        // that after updating our internal state.  We'll also notify the reader, too, since the
        // data should be available to read:
        if let Some(flush_result) = flush_result {
            self.flush_write_state_partial(flush_result.events_flushed, flush_result.bytes_flushed);
            self.ledger.notify_writer_waiters();
        }

        trace!(
            record_id,
            record_events,
            bytes_written,
            data_file_id = self.ledger.get_current_writer_file_id(),
            "Wrote record."
        );

        Ok(Ok(bytes_written))
    }

    /// Writes a record.
    ///
    /// If the record was written successfully, the number of bytes written to the data file will be
    /// returned.
    ///
    /// # Errors
    ///
    /// If an error occurred while writing the record, an error variant will be returned describing
    /// the error.
    #[instrument(skip_all, level = "debug")]
    pub async fn write_record(&mut self, mut record: T) -> Result<usize, WriterError<T>> {
        loop {
            match self.try_write_record_inner(record).await? {
                Ok(bytes_written) => return Ok(bytes_written),
                Err(old_record) => {
                    record = old_record;
                    self.ledger.wait_for_reader().await;
                }
            }
        }
    }

    #[instrument(skip(self), level = "debug")]
    async fn flush_inner(&mut self, force_full_flush: bool) -> io::Result<()> {
        // We always flush the `TrackingBufWriter` when this is called, but we don't always flush to
        // disk or flush the ledger.  This is enough for readers on Linux since the file ends up in
        // the page cache, as we don't do any O_DIRECT fanciness, and the new contents can be
        // immediately read.
        //
        // TODO: Windows has a page cache as well, and macOS _should_, but we should verify this
        // behavior works on those platforms as well.
        if let Some(writer) = self.writer.as_mut() {
            writer.flush().await?;
            self.ledger.notify_writer_waiters();
        }

        if self.ledger.should_flush() || force_full_flush {
            if let Some(writer) = self.writer.as_mut() {
                writer.sync_all().await?;
            }

            self.ledger.flush()
        } else {
            Ok(())
        }
    }

    /// Flushes the writer.
    ///
    /// This must be called for the reader to be able to make progress.
    ///
    /// This does not ensure that the data is fully synchronized (i.e. `fsync`) to disk, however it
    /// may sometimes perform a full synchronization if the time since the last full synchronization
    /// occurred has exceeded a configured limit.
    ///
    /// # Errors
    ///
    /// If there is an error while flushing either the current data file or the ledger, an error
    /// variant will be returned describing the error.
    #[instrument(skip(self), level = "trace")]
    pub async fn flush(&mut self) -> io::Result<()> {
        self.flush_inner(false).await?;
        self.flush_write_state();
        Ok(())
    }
}

impl<T, FS> BufferWriter<T, FS>
where
    FS: Filesystem,
    FS::File: Unpin,
{
    /// Closes this [`BufferWriter`], marking it as done.
    ///
    /// Closing the writer signals to the reader that no more records will be written until the
    /// buffer is reopened.  Writers and readers effectively share a "session", so until the writer
    /// and reader both close, the buffer cannot be reopened by another Vector instance.
    ///
    /// In turn, the reader is able to know that when the writer is marked as done, and it cannot
    /// read any more data, that nothing else is actually coming, and it can terminate by beginning
    /// to return `None`.
    #[instrument(skip(self), level = "trace")]
    pub fn close(&mut self) {
        if self.ledger.mark_writer_done() {
            debug!("Writer marked as closed.");
            self.ledger.notify_writer_waiters();
        }
    }
}

impl<T, FS> Drop for BufferWriter<T, FS>
where
    FS: Filesystem,
    FS::File: Unpin,
{
    fn drop(&mut self) {
        self.close();
    }
}