vector_buffers/variants/disk_v2/writer.rs
use std::{
    cmp::Ordering,
    convert::Infallible as StdInfallible,
    fmt,
    io::{self, ErrorKind},
    marker::PhantomData,
    num::NonZeroUsize,
    sync::Arc,
};

use bytes::BufMut;
use crc32fast::Hasher;
use rkyv::{
    ser::{
        serializers::{
            AlignedSerializer, AllocScratch, AllocScratchError, BufferScratch, CompositeSerializer,
            CompositeSerializerError, FallbackScratch,
        },
        Serializer,
    },
    AlignedVec, Infallible,
};
use snafu::{ResultExt, Snafu};
use tokio::io::{AsyncWrite, AsyncWriteExt};

use super::{
    common::{create_crc32c_hasher, DiskBufferConfig},
    io::Filesystem,
    ledger::Ledger,
    record::{validate_record_archive, Record, RecordStatus},
};
use crate::{
    encoding::{AsMetadata, Encodable},
    variants::disk_v2::{
        io::AsyncFile,
        reader::decode_record_payload,
        record::{try_as_record_archive, RECORD_HEADER_LEN},
    },
    Bufferable,
};

/// Error that occurred during calls to [`BufferWriter`].
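///
/// Internally, [`RecordWriter::archive_record`] can return [`WriterError::DataFileFull`], which
/// hands the original record back so the caller can roll to the next data file and retry. A
/// minimal sketch of that handling (the `writer`, `id`, and `record` bindings are hypothetical):
///
/// ```ignore
/// match writer.archive_record(id, record) {
///     // Flush the token via `flush_record` to actually write the bytes.
///     Ok(token) => { /* ... */ }
///     Err(WriterError::DataFileFull { record, .. }) => {
///         // Roll to the next data file, then retry with the recovered `record`.
///     }
///     Err(e) => return Err(e),
/// }
/// ```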
#[derive(Debug, Snafu)]
pub enum WriterError<T>
where
    T: Bufferable,
{
    /// A general I/O error occurred.
    ///
    /// Different methods will capture specific I/O errors depending on the situation, as some
    /// errors may be expected and considered normal by design. For all I/O errors that are
    /// considered atypical, they will be returned as this variant.
    #[snafu(display("write I/O error: {}", source))]
    Io { source: io::Error },

    /// The record attempting to be written was too large.
    ///
    /// In practice, most encoders will throw their own error if they cannot write all of the
    /// necessary bytes during encoding, and so this error will typically only be emitted when the
    /// encoder throws no error during the encoding step itself, but manages to fill up the encoding
    /// buffer to the limit.
    #[snafu(display("record too large: limit is {}", limit))]
    RecordTooLarge { limit: usize },

    /// The data file did not have enough remaining space to write the record.
    ///
    /// This could be because the data file is legitimately full, but is more commonly related to a
    /// record being big enough that it would exceed the max data file size.
    ///
    /// The record that was given to write is returned.
    #[snafu(display("data file full or record would exceed max data file size"))]
    DataFileFull { record: T, serialized_len: usize },

    /// A record reported that it contained more events than the number of bytes when encoded.
    ///
    /// This is nonsensical because we don't intend to ever support encoding zero-sized types
    /// through the buffer, and the logic we use to count the number of actual events in the buffer
    /// transitively depends on not being able to represent more than one event per encoded byte.
    #[snafu(display(
        "record reported event count ({}) higher than encoded length ({})",
        event_count,
        encoded_len
    ))]
    NonsensicalEventCount {
        encoded_len: usize,
        event_count: usize,
    },

    /// The encoder encountered an issue during encoding.
    ///
    /// For common encoders, failure to write all of the bytes of the input will be the most common
    /// error; in fact, for some encoders, it's the only possible error that can occur.
    #[snafu(display("failed to encode record: {:?}", source))]
    FailedToEncode {
        source: <T as Encodable>::EncodeError,
    },

    /// The writer failed to serialize the record.
    ///
    /// As records are encoded and then wrapped in a container which carries metadata about the size
    /// of the encoded record, and so on, there is a chance that we could fail to serialize that
    /// container during the write step.
    ///
    /// In practice, this should generally only occur if the system is unable to allocate enough
    /// memory during the serialization step -- that is, the system itself is literally out of
    /// memory to give to processes. Rare, indeed.
    #[snafu(display("failed to serialize encoded record to buffer: {}", reason))]
    FailedToSerialize { reason: String },

    /// The writer failed to validate the last written record.
    ///
    /// Specifically, for `BufferWriter`, this can only ever be returned when creating the buffer, during
    /// validation of the last written record. While it's technically possible that it may be
    /// something else, this error is most likely to occur when the records in a buffer were written
    /// in a different version of Vector that cannot be decoded in this version of Vector.
    #[snafu(display("failed to validate the last written record: {}", reason))]
    FailedToValidate { reason: String },

    /// The writer entered an inconsistent state that represents an unrecoverable error.
    ///
    /// In some cases, like expecting to be able to decode an event we just encoded, we might hit an
    /// error. This would be an entirely unexpected error -- how is it possible to not be able to
    /// decode an event we literally just encoded on the line above? -- and as such, the only
    /// reasonable thing to do would be to give up.
    ///
    /// This error is the writer, and thus the buffer, giving up.
    #[snafu(display("writer entered inconsistent state: {}", reason))]
    InconsistentState { reason: String },

    /// The record reported an event count of zero.
    ///
    /// Empty records are not supported.
    EmptyRecord,
}

impl<T: Bufferable + PartialEq> PartialEq for WriterError<T> {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::Io { source: l_source }, Self::Io { source: r_source }) => {
                l_source.kind() == r_source.kind()
            }
            (Self::RecordTooLarge { limit: l_limit }, Self::RecordTooLarge { limit: r_limit }) => {
                l_limit == r_limit
            }
            (
                Self::DataFileFull {
                    record: l_record,
                    serialized_len: l_serialized_len,
                },
                Self::DataFileFull {
                    record: r_record,
                    serialized_len: r_serialized_len,
                },
            ) => l_record == r_record && l_serialized_len == r_serialized_len,
            (
                Self::NonsensicalEventCount {
                    encoded_len: l_encoded_len,
                    event_count: l_event_count,
                },
                Self::NonsensicalEventCount {
                    encoded_len: r_encoded_len,
                    event_count: r_event_count,
                },
            ) => l_encoded_len == r_encoded_len && l_event_count == r_event_count,
            (
                Self::FailedToSerialize { reason: l_reason },
                Self::FailedToSerialize { reason: r_reason },
            )
            | (
                Self::FailedToValidate { reason: l_reason },
                Self::FailedToValidate { reason: r_reason },
            )
            | (
                Self::InconsistentState { reason: l_reason },
                Self::InconsistentState { reason: r_reason },
            ) => l_reason == r_reason,
            _ => core::mem::discriminant(self) == core::mem::discriminant(other),
        }
    }
}

impl<T> From<CompositeSerializerError<StdInfallible, AllocScratchError, StdInfallible>>
    for WriterError<T>
where
    T: Bufferable,
{
    fn from(e: CompositeSerializerError<StdInfallible, AllocScratchError, StdInfallible>) -> Self {
        match e {
            CompositeSerializerError::ScratchSpaceError(sse) => WriterError::FailedToSerialize {
                reason: format!("insufficient space to serialize encoded record: {sse}"),
            },
            // Only our scratch space strategy is fallible, so we should never get here.
            _ => unreachable!(),
        }
    }
}

impl<T> From<io::Error> for WriterError<T>
where
    T: Bufferable,
{
    fn from(source: io::Error) -> Self {
        WriterError::Io { source }
    }
}

#[derive(Debug)]
pub(super) struct WriteToken {
    event_count: usize,
    serialized_len: usize,
}

impl WriteToken {
    pub fn event_count(&self) -> usize {
        self.event_count
    }

    pub fn serialized_len(&self) -> usize {
        self.serialized_len
    }
}

#[derive(Debug, Default, PartialEq)]
pub(super) struct FlushResult {
    pub events_flushed: u64,
    pub bytes_flushed: u64,
}

/// Wraps an [`AsyncWrite`] value and buffers individual writes, while signalling implicit flushes.
///
/// As the [`BufferWriter`] must track when writes have theoretically made it to disk, we care about
/// situations where the internal write buffer for a data file has been flushed to make room. In
/// order to provide this information, we track the number of events represented by a record when
/// writing its serialized form.
///
/// If an implicit buffer flush must be performed before a write can complete, or a manual flush is
/// requested, we return this information to the caller, letting them know how many events, and how
/// many bytes, were flushed.
struct TrackingBufWriter<W> {
    inner: W,
    buf: Vec<u8>,
    unflushed_events: usize,
}

impl<W: AsyncWrite + Unpin> TrackingBufWriter<W> {
    /// Creates a new `TrackingBufWriter` with the specified buffer capacity.
    fn with_capacity(cap: usize, inner: W) -> Self {
        Self {
            inner,
            buf: Vec::with_capacity(cap),
            unflushed_events: 0,
        }
    }

    /// Writes the given buffer.
    ///
    /// If enough internal buffer capacity is available, then this write will be buffered internally
    /// until [`flush`] is called. If there's not enough remaining internal buffer capacity, then
    /// the internal buffer will be flushed to the inner writer first. If the given buffer is
    /// larger than the internal buffer capacity, then it will be written directly to the inner
    /// writer.
    ///
    /// Internally, a counter is kept of how many buffered events are waiting to be flushed. This
    /// count is incremented every time `write` can fully buffer the record without having to flush
    /// to the inner writer.
    ///
    /// If this call requires the internal buffer to be flushed out to the inner writer, then the
    /// write result will indicate how many buffered events were flushed, and their total size in
    /// bytes. Additionally, if the given buffer is larger than the internal buffer itself, it will
    /// also be included in the write result as well.
    ///
    /// # Errors
    ///
    /// If a write to the inner writer occurs, and that write encounters an error, an error variant
    /// will be returned describing the error.
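    ///
    /// A sketch of the flush semantics, assuming a 1 KiB internal buffer where each record
    /// represents a single event:
    ///
    /// ```ignore
    /// let r1 = writer.write(1, &[0u8; 400]).await?; // buffered: None
    /// let r2 = writer.write(1, &[0u8; 400]).await?; // buffered: None
    /// // A third write would exceed capacity, so the first two are flushed first:
    /// let r3 = writer.write(1, &[0u8; 400]).await?;
    /// // r3 == Some(FlushResult { events_flushed: 2, bytes_flushed: 800 })
    /// ```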
    async fn write(&mut self, event_count: usize, buf: &[u8]) -> io::Result<Option<FlushResult>> {
        let mut flush_result = None;

        // If this write would cause us to exceed our internal buffer capacity, flush whatever we
        // have buffered already.
        if self.buf.len() + buf.len() > self.buf.capacity() {
            flush_result = self.flush().await?;
        }

        // If the given buffer is too large to be buffered at all, then bypass the internal buffer.
        if buf.len() >= self.buf.capacity() {
            self.inner.write_all(buf).await?;

            let flush_result = flush_result.get_or_insert(FlushResult::default());
            flush_result.events_flushed += event_count as u64;
            flush_result.bytes_flushed += buf.len() as u64;
        } else {
            self.buf.extend_from_slice(buf);
            self.unflushed_events += event_count;
        }

        Ok(flush_result)
    }

    /// Flushes the internal buffer to the underlying writer.
    ///
    /// Internally, a counter is kept of how many buffered events are waiting to be flushed. This
    /// count is incremented every time `write` can fully buffer the record without having to flush
    /// to the inner writer.
    ///
    /// If any buffered records are present, then the write result will indicate how many
    /// individual events were flushed, including their total size in bytes.
    ///
    /// # Errors
    ///
    /// If a write to the underlying writer occurs, and that write encounters an error, an error variant
    /// will be returned describing the error.
    async fn flush(&mut self) -> io::Result<Option<FlushResult>> {
        if self.buf.is_empty() {
            return Ok(None);
        }

        let events_flushed = self.unflushed_events as u64;
        let bytes_flushed = self.buf.len() as u64;

        let result = self.inner.write_all(&self.buf[..]).await;
        self.unflushed_events = 0;
        self.buf.clear();

        result.map(|()| {
            Some(FlushResult {
                events_flushed,
                bytes_flushed,
            })
        })
    }

    /// Gets a reference to the underlying writer.
    #[cfg(test)]
    fn get_ref(&self) -> &W {
        &self.inner
    }

    /// Gets a mutable reference to the underlying writer.
    fn get_mut(&mut self) -> &mut W {
        &mut self.inner
    }
}

impl<W: fmt::Debug> fmt::Debug for TrackingBufWriter<W> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("TrackingBufWriter")
            .field("writer", &self.inner)
            .field(
                "buffer",
                &format_args!("{}/{}", self.buf.len(), self.buf.capacity()),
            )
            .field("unflushed_events", &self.unflushed_events)
            .finish()
    }
}

/// Buffered writer that handles encoding, checksumming, and serialization of records.
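///
/// Writes are two-phase: [`archive_record`](Self::archive_record) encodes and serializes the
/// record into an internal buffer and returns a [`WriteToken`], and
/// [`flush_record`](Self::flush_record) then pushes those serialized bytes towards the data file.
/// A rough sketch of the flow (the bindings here are hypothetical):
///
/// ```ignore
/// let token = record_writer.archive_record(record_id, record)?;
/// let (bytes_written, maybe_flush) = record_writer.flush_record(token).await?;
/// ```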
#[derive(Debug)]
pub(super) struct RecordWriter<W, T> {
    writer: TrackingBufWriter<W>,
    encode_buf: Vec<u8>,
    ser_buf: AlignedVec,
    ser_scratch: AlignedVec,
    checksummer: Hasher,
    max_record_size: usize,
    current_data_file_size: u64,
    max_data_file_size: u64,
    _t: PhantomData<T>,
}

impl<W, T> RecordWriter<W, T>
where
    W: AsyncFile + Unpin,
    T: Bufferable,
{
    /// Creates a new [`RecordWriter`] around the provided writer.
    ///
    /// Internally, the writer is wrapped in a [`TrackingBufWriter`], so callers should not pass in
    /// an already buffered writer.
    pub fn new(
        writer: W,
        current_data_file_size: u64,
        write_buffer_size: usize,
        max_data_file_size: u64,
        max_record_size: usize,
    ) -> Self {
        // These should also be getting checked at a higher level, but we're double-checking them here to be absolutely sure.
        let max_record_size_converted = u64::try_from(max_record_size)
            .expect("Maximum record size must be less than 2^64 bytes.");

        debug_assert!(
            max_record_size > RECORD_HEADER_LEN,
            "maximum record length must be larger than size of record header itself"
        );
        debug_assert!(
            max_data_file_size >= max_record_size_converted,
            "must always be able to fit at least one record into a data file"
        );

        // We subtract the length of the record header from our allowed maximum record size, because we have to make sure
        // that when we go to actually wrap and serialize the encoded record, we're limiting the actual bytes we write
        // to disk to within `max_record_size`.
        //
        // This could lead to us reducing the encode buffer size limit by slightly more than necessary, since
        // `RECORD_HEADER_LEN` might be overaligned compared to what would actually be necessary for the
        // encoded/serialized record... but that's OK: it's only going to differ by 8 bytes at most.
        let max_record_size = max_record_size - RECORD_HEADER_LEN;

        Self {
            writer: TrackingBufWriter::with_capacity(write_buffer_size, writer),
            encode_buf: Vec::with_capacity(16_384),
            ser_buf: AlignedVec::with_capacity(16_384),
            ser_scratch: AlignedVec::with_capacity(16_384),
            checksummer: create_crc32c_hasher(),
            max_record_size,
            current_data_file_size,
            max_data_file_size,
            _t: PhantomData,
        }
    }

    /// Gets a reference to the underlying writer.
    #[cfg(test)]
    pub fn get_ref(&self) -> &W {
        self.writer.get_ref()
    }

    /// Whether or not `amount` bytes could be written while obeying the data file size limit.
    ///
    /// If no bytes have been written at all to a data file, then `amount` is allowed to exceed the
    /// limit, otherwise a record would never be able to be written.
    fn can_write(&self, amount: usize) -> bool {
        let amount = u64::try_from(amount).expect("`amount` should never need 2^64 bytes.");

        self.current_data_file_size + amount <= self.max_data_file_size
    }

    /// Archives a record.
    ///
    /// This encodes the record, as well as serializes it into its archival format that will be
    /// stored on disk. The total size of the archived record, including the length delimiter
    /// inserted before the archived record, will be returned.
    ///
    /// # Errors
    ///
    /// Errors can occur during the encoding or serialization stage. If an error occurs
    /// during any of these stages, an appropriate error variant will be returned describing the error.
    #[instrument(skip(self, record), level = "trace")]
    pub fn archive_record(&mut self, id: u64, record: T) -> Result<WriteToken, WriterError<T>> {
        let event_count = record.event_count();

        self.encode_buf.clear();
        self.ser_buf.clear();
        self.ser_scratch.clear();

        // We first encode the record, which puts it into the desired encoded form. This is where
        // we assert the record is within size limits, etc.
        //
        // NOTE: Some encoders may not write to the buffer in a way that fills it up before
        // themselves returning an error because they know the buffer is too small. This means we
        // may often return the "failed to encode" error variant when the true error is that the
        // payload size, when encoded, exceeds our limit.
        //
        // Unfortunately, there's not a whole lot for us to do here beyond allowing our buffer to
        // grow beyond the limit so that we can try to allow encoding to succeed so that we can grab
        // the actual encoded size and then check it against the limit.
        //
        // C'est la vie.
        let encode_result = {
            let mut encode_buf = (&mut self.encode_buf).limit(self.max_record_size);
            record.encode(&mut encode_buf)
        };
        let encoded_len = encode_result
            .map(|()| self.encode_buf.len())
            .context(FailedToEncodeSnafu)?;
        if encoded_len > self.max_record_size {
            return Err(WriterError::RecordTooLarge {
                limit: self.max_record_size,
            });
        }

        let metadata = T::get_metadata().into_u32();
        let wrapped_record =
            Record::with_checksum(id, metadata, &self.encode_buf, &self.checksummer);

        // Push 8 dummy bytes where our length delimiter will sit. We'll fix this up after
        // serialization. Notably, `AlignedSerializer` will report the serializer position as
        // the length of its backing store, which now includes our 8 bytes, so we _subtract_
        // those from the position when figuring out the actual value to write back after.
        //
        // We write it this way -- in the serializer buffer, and not as a separate write -- so that
        // we can do a single write but also so that we always have an aligned buffer.
        self.ser_buf
            .extend_from_slice(&[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]);
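
        // The resulting on-disk layout for each record is thus:
        //
        //   [ length delimiter: u64, big-endian ][ archived record: `length` bytes ]
        //
        // where the delimiter value excludes its own eight bytes; see the length fix-up after
        // serialization below.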

        // Now serialize the record, which puts it into its archived form. This is what powers our
        // ability to do zero-copy deserialization from disk.
        let mut serializer = CompositeSerializer::new(
            AlignedSerializer::new(&mut self.ser_buf),
            FallbackScratch::new(
                BufferScratch::new(&mut self.ser_scratch),
                AllocScratch::new(),
            ),
            Infallible,
        );

        let serialized_len = serializer
            .serialize_value(&wrapped_record)
            .map(|_| serializer.pos())?;

        // Sanity check before we do our length math.
        if serialized_len <= 8 || self.ser_buf.len() != serialized_len {
            return Err(WriterError::FailedToSerialize {
                reason: format!(
                    "serializer position invalid for context: pos={} len={}",
                    serialized_len,
                    self.ser_buf.len(),
                ),
            });
        }

        // With the record archived and serialized, do our final check to ensure we can fit this
        // write. We're doing this earlier than the actual call to flush it because it gives us
        // a chance to hand back the event so that the caller can roll to a new data file first
        // before attempting the write again.
        if !self.can_write(serialized_len) {
            debug!(
                current_data_file_size = self.current_data_file_size,
                max_data_file_size = self.max_data_file_size,
                archive_on_disk_len = serialized_len,
                "Archived record is too large to fit in remaining free space of current data file."
            );

            // We have to decode the record back out to actually be able to give it back. If we
            // can't decode it for some reason, this is entirely an unrecoverable error, since an
            // encoded record should always be decodable within the same process that encoded it.
            let record = T::decode(T::get_metadata(), &self.encode_buf[..]).map_err(|_| {
                WriterError::InconsistentState {
                    reason: "failed to decode record immediately after encoding it".to_string(),
                }
            })?;

            return Err(WriterError::DataFileFull {
                record,
                serialized_len,
            });
        }

        // Fix up our length delimiter.
        let archive_len = serialized_len - 8;
        let wire_archive_len: u64 = archive_len
            .try_into()
            .expect("archive len should always fit into a u64");
        let archive_len_buf = wire_archive_len.to_be_bytes();

        let length_delimiter_dst = &mut self.ser_buf[0..8];
        length_delimiter_dst.copy_from_slice(&archive_len_buf[..]);

        Ok(WriteToken {
            event_count,
            serialized_len,
        })
    }

    /// Writes a record.
    ///
    /// If the write is successful, the number of bytes written to the buffer are returned.
    /// Additionally, if any internal buffers required an implicit flush, the result of that flush
    /// operation is returned as well.
    ///
    /// As we internally buffer writes to the underlying data file, to reduce the number of syscalls
    /// required to push serialized records to the data file, we sometimes will write a record
    /// which would overflow the internal buffer. Doing so means we have to first flush the buffer
    /// before continuing with buffering the current write. As some invariants are based on knowing
    /// when a record has actually been written to the data file, we return any information about
    /// implicit flushes so that the writer can be aware of when data has actually made it to the
    /// data file or not.
    ///
    /// # Errors
    ///
    /// Errors can occur during the encoding, serialization, or I/O stage. If an error occurs
    /// during any of these stages, an appropriate error variant will be returned describing the error.
    #[instrument(skip(self, record), level = "trace")]
    #[cfg(test)]
    pub async fn write_record(
        &mut self,
        id: u64,
        record: T,
    ) -> Result<(usize, Option<FlushResult>), WriterError<T>> {
        let token = self.archive_record(id, record)?;
        self.flush_record(token).await
    }

    /// Flushes the previously-archived record.
    ///
    /// If the flush is successful, the number of bytes written to the buffer are returned.
    /// Additionally, if any internal buffers required an implicit flush, the result of that flush
    /// operation is returned as well.
    ///
    /// As we internally buffer writes to the underlying data file, to reduce the number of syscalls
    /// required to push serialized records to the data file, we sometimes will write a record
    /// which would overflow the internal buffer. Doing so means we have to first flush the buffer
    /// before continuing with buffering the current write. As some invariants are based on knowing
    /// when a record has actually been written to the data file, we return any information about
    /// implicit flushes so that the writer can be aware of when data has actually made it to the
    /// data file or not.
    #[instrument(skip(self), level = "trace")]
    pub async fn flush_record(
        &mut self,
        token: WriteToken,
    ) -> Result<(usize, Option<FlushResult>), WriterError<T>> {
        // Make sure the write token we've been given matches whatever the last call to `archive_record` generated.
        let event_count = token.event_count();
        let serialized_len = token.serialized_len();
        debug_assert_eq!(
            serialized_len,
            self.ser_buf.len(),
            "using write token from non-contiguous archival call"
        );

        let flush_result = self
            .writer
            .write(event_count, &self.ser_buf[..])
            .await
            .context(IoSnafu)?;

        // Update our current data file size.
        self.current_data_file_size += u64::try_from(serialized_len)
            .expect("Serialized length of record should never exceed 2^64 bytes.");

        Ok((serialized_len, flush_result))
    }

    /// Recovers an archived record that has not yet been flushed.
    ///
    /// In some cases, we must archive a record to see how large the resulting archived record is, and potentially
    /// recover the original record if it's too large, and so on.
    ///
    /// This method allows decoding an archived record that is still sitting in the internal buffers waiting to be
    /// flushed. Technically, this decodes the original record back from its archived/encoded form, and so this isn't
    /// a clone, but it does mean incurring the cost of decoding directly.
    ///
    /// # Errors
    ///
    /// If the archived record cannot be deserialized from its archival form, or can't be decoded back to its original
    /// form `T`, an error variant will be returned describing the error. Notably, the only error we return is
    /// `InconsistentState`, as being unable to immediately deserialize and decode a record we just serialized and
    /// encoded implies a fatal, and unrecoverable, error with the buffer implementation as a whole.
    #[instrument(skip(self), level = "trace")]
    pub fn recover_archived_record(&mut self, token: &WriteToken) -> Result<T, WriterError<T>> {
        // Make sure the write token we've been given matches whatever the last call to `archive_record` generated.
        let serialized_len = token.serialized_len();
        debug_assert_eq!(
            serialized_len,
            self.ser_buf.len(),
            "using write token from non-contiguous archival call"
        );

        // First, decode the archival wrapper. This means skipping the length delimiter.
        let wrapped_record = try_as_record_archive(&self.ser_buf[8..]).map_err(|_| {
            WriterError::InconsistentState {
                reason: "failed to decode archived record immediately after archiving it"
                    .to_string(),
            }
        })?;

        // Now we can actually decode it as `T`.
        let record_metadata = T::Metadata::from_u32(wrapped_record.metadata()).ok_or(
            WriterError::InconsistentState {
                reason: "failed to decode record metadata immediately after encoding it"
                    .to_string(),
            },
        )?;

        T::decode(record_metadata, wrapped_record.payload()).map_err(|_| {
            WriterError::InconsistentState {
                reason: "failed to decode record immediately after encoding it".to_string(),
            }
        })
    }

    /// Flushes the writer.
    ///
    /// This flushes both the internal buffered writer and the underlying writer object.
    ///
    /// # Errors
    ///
    /// If there is an I/O error while flushing either the buffered writer or the underlying writer,
    /// an error variant will be returned describing the error.
    #[instrument(skip(self), level = "debug")]
    pub async fn flush(&mut self) -> io::Result<Option<FlushResult>> {
        self.writer.flush().await
    }

    /// Synchronizes the underlying file to disk.
    ///
    /// This tries to synchronize both data and metadata.
    ///
    /// # Errors
    ///
    /// If there is an I/O error while syncing the file, an error variant will be returned
    /// describing the error.
    #[instrument(skip(self), level = "debug")]
    pub async fn sync_all(&mut self) -> io::Result<()> {
        self.writer.get_mut().sync_all().await
    }
}

/// Writes records to the buffer.
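///
/// A rough usage sketch (assuming an already-loaded `ledger`; error handling elided):
///
/// ```ignore
/// let mut writer = BufferWriter::new(ledger);
/// writer.validate_last_write().await?;
/// let bytes_written = writer.write_record(record).await?;
/// writer.flush().await?;
/// ```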
#[derive(Debug)]
pub struct BufferWriter<T, FS>
where
    FS: Filesystem,
    FS::File: Unpin,
{
    ledger: Arc<Ledger<FS>>,
    config: DiskBufferConfig<FS>,
    writer: Option<RecordWriter<FS::File, T>>,
    next_record_id: u64,
    unflushed_events: u64,
    data_file_size: u64,
    unflushed_bytes: u64,
    data_file_full: bool,
    skip_to_next: bool,
    ready_to_write: bool,
    _t: PhantomData<T>,
}

impl<T, FS> BufferWriter<T, FS>
where
    T: Bufferable,
    FS: Filesystem + fmt::Debug + Clone,
    FS::File: Unpin,
{
    /// Creates a new [`BufferWriter`] attached to the given [`Ledger`].
    pub(crate) fn new(ledger: Arc<Ledger<FS>>) -> Self {
        let config = ledger.config().clone();
        let next_record_id = ledger.state().get_next_writer_record_id();
        BufferWriter {
            ledger,
            config,
            writer: None,
            data_file_size: 0,
            data_file_full: false,
            unflushed_bytes: 0,
            skip_to_next: false,
            ready_to_write: false,
            next_record_id,
            unflushed_events: 0,
            _t: PhantomData,
        }
    }

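    // Record IDs advance by event count rather than by record. For example, if `next_record_id`
    // is 10 and the unflushed records written so far hold 3 events in total, the next record to
    // be written is assigned ID 13.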
    fn get_next_record_id(&mut self) -> u64 {
        self.next_record_id.wrapping_add(self.unflushed_events)
    }

    fn track_write(&mut self, event_count: usize, record_size: u64) {
        self.data_file_size += record_size;
        self.unflushed_events += event_count as u64;
        self.unflushed_bytes += record_size;
    }

    fn flush_write_state(&mut self) {
        self.flush_write_state_partial(self.unflushed_events, self.unflushed_bytes);
    }

    fn flush_write_state_partial(&mut self, flushed_events: u64, flushed_bytes: u64) {
        debug_assert!(
            flushed_events <= self.unflushed_events,
            "tried to flush more events than are currently unflushed"
        );
        debug_assert!(
            flushed_bytes <= self.unflushed_bytes,
            "tried to flush more bytes than are currently unflushed"
        );

        self.next_record_id = self
            .ledger
            .state()
            .increment_next_writer_record_id(flushed_events);
        self.unflushed_events -= flushed_events;
        self.unflushed_bytes -= flushed_bytes;

        self.ledger.track_write(flushed_events, flushed_bytes);
    }

    fn can_write(&self) -> bool {
        !self.data_file_full && self.data_file_size < self.config.max_data_file_size
    }

    fn can_write_record(&self, amount: usize) -> bool {
        let total_buffer_size = self.ledger.get_total_buffer_size() + self.unflushed_bytes;
        let potential_write_len =
            u64::try_from(amount).expect("Vector only supports 64-bit architectures.");

        self.can_write() && total_buffer_size + potential_write_len <= self.config.max_buffer_size
    }

    #[instrument(skip(self), level = "debug")]
    fn mark_data_file_full(&mut self) {
        self.data_file_full = true;
    }

    #[instrument(skip(self), level = "debug")]
    fn reset(&mut self) {
        self.writer = None;
        self.data_file_size = 0;
        self.data_file_full = false;
    }

    #[instrument(skip(self), level = "debug")]
    fn mark_for_skip(&mut self) {
        self.skip_to_next = true;
    }

    fn should_skip(&mut self) -> bool {
        let should_skip = self.skip_to_next;
        if should_skip {
            self.skip_to_next = false;
        }

        should_skip
    }

    /// Validates that the last write in the current writer data file matches the ledger.
    ///
    /// # Errors
    ///
    /// If the current data file is not empty, and there is an error reading it to perform
    /// validation, an error variant will be returned that describes the error.
    ///
    /// Practically speaking, however, this method will only return I/O-related errors as all
    /// logical errors, such as the record being invalid, are captured in order to logically adjust
    /// the writer/ledger state to start a new file, etc.
    #[instrument(skip(self), level = "debug")]
    pub(super) async fn validate_last_write(&mut self) -> Result<(), WriterError<T>> {
        // We don't try validating again after doing so initially.
        if self.ready_to_write {
            warn!("Writer already initialized.");
            return Ok(());
        }

        debug!(
            current_writer_data_file = ?self.ledger.get_current_writer_data_file_path(),
            "Validating last written record in current data file."
        );
        self.ensure_ready_for_write().await.context(IoSnafu)?;

        // If our current file is empty, there's no sense doing this check.
        if self.data_file_size == 0 {
            self.ready_to_write = true;
            return Ok(());
        }

        // We do a neat little trick here where we open an immutable memory-mapped region against our
        // current writer data file, which lets us treat it as one big buffer... which is useful for
        // asking `rkyv` to deserialize just the last record from the file, without having to seek
        // directly to the start of the record where the length delimiter is.
        let data_file_path = self.ledger.get_current_writer_data_file_path();
        let data_file_mmap = self
            .ledger
            .filesystem()
            .open_mmap_readable(&data_file_path)
            .await
            .context(IoSnafu)?;

        // We have bytes, so we should have an archived record... hopefully! Go through the motions
        // of verifying it. If we hit any invalid states, then we should bump to the next data file
        // since the reader will have to stop once it hits the first error in a given file.
        let should_skip_to_next_file = match validate_record_archive(
            data_file_mmap.as_ref(),
            &Hasher::new(),
        ) {
            RecordStatus::Valid {
                id: last_record_id, ..
            } => {
                // We now know the record is valid from the perspective of being framed correctly,
                // and the checksum matching, etc. We'll attempt to actually decode it now so we
                // can get the actual item that was written, which we need to understand where the
                // next writer record ID should be.
                let record = try_as_record_archive(data_file_mmap.as_ref())
                    .expect("record was already validated");
                let item = decode_record_payload::<T>(record).map_err(|e| {
                    WriterError::FailedToValidate {
                        reason: e.to_string(),
                    }
                })?;

                // Since we have a valid record, checksum and all, see if the writer record ID
                // in the ledger lines up with the record ID we have here. Specifically, the record
                // ID plus the number of events in the record should be the next record ID that gets used.
                let ledger_next = self.ledger.state().get_next_writer_record_id();
                let record_events =
                    u64::try_from(item.event_count()).expect("event count should never exceed u64");
                let record_next = last_record_id.wrapping_add(record_events);

                match ledger_next.cmp(&record_next) {
                    Ordering::Equal => {
                        // We're exactly where the ledger thinks we should be, so nothing to do.
                        debug!(
                            ledger_next,
                            last_record_id,
                            record_events,
                            "Synchronized with ledger. Writer ready."
                        );
                        false
                    }
                    Ordering::Greater => {
                        // Our last write is behind where the ledger thinks we should be, so we
                        // likely missed flushing some records, or partially flushed the data file.
                        // Better roll over to be safe.
                        error!(
                            ledger_next, last_record_id, record_events,
                            "Last record written to data file is behind expected position. Events have likely been lost.");
                        true
                    }
                    Ordering::Less => {
                        // We're actually _ahead_ of the ledger, which is to say we wrote a valid
                        // record to the data file, but never incremented our "writer next record
                        // ID" field. Given that record IDs are monotonic, it's safe to forward
                        // ourselves to make the "writer next record ID" in the ledger match the
                        // reality of the data file. If there were somehow gaps in the data file,
                        // the reader will detect it, and this way, we avoid duplicate record IDs.
                        debug!(
                            ledger_next,
                            last_record_id,
                            record_events,
                            new_ledger_next = record_next,
                            "Ledger desynchronized from data files. Fast forwarding ledger state."
                        );
                        let ledger_record_delta = record_next - ledger_next;
                        let next_record_id = self
                            .ledger
                            .state()
                            .increment_next_writer_record_id(ledger_record_delta);
                        self.next_record_id = next_record_id;
                        self.unflushed_events = 0;

                        false
                    }
                }
            }
            // The record payload was corrupted, somehow: we know the checksum failed to match on
            // both sides, but it could be cosmic radiation that flipped a bit or some process
            // trampled over the data file... who knows.
            //
            // We skip to the next data file to try and start from a clean slate.
            RecordStatus::Corrupted { .. } => {
                error!(
                    "Last written record did not match the expected checksum. Corruption likely."
                );
                true
            }
            // The record itself was corrupted, somehow: it was sufficiently different that `rkyv`
            // couldn't even validate it, which likely means missing bytes but could also be certain
            // bytes being invalid for the struct fields they represent. Like invalid checksums, we
            // really don't know why it happened, only that it happened.
            //
            // We skip to the next data file to try and start from a clean slate.
            RecordStatus::FailedDeserialization(de) => {
                let reason = de.into_inner();
                error!(
                    ?reason,
                    "Last written record was unable to be deserialized. Corruption likely."
                );
                true
            }
        };

        // Reset our internal state, which closes the initial data file we opened, and mark
        // ourselves as needing to skip to the next data file. This is a little convoluted, but we
        // need to ensure we follow the normal behavior of trying to open the next data file,
        // waiting for the reader to delete it if it already exists and hasn't been fully read yet,
        // etc.
        //
        // Essentially, we defer the actual skipping to avoid deadlocking here trying to open a
        // data file we might not be able to open yet.
        if should_skip_to_next_file {
            self.reset();
            self.mark_for_skip();
        }

        self.ready_to_write = true;

        Ok(())
    }

    fn is_buffer_full(&self) -> bool {
        let total_buffer_size = self.ledger.get_total_buffer_size() + self.unflushed_bytes;
        let max_buffer_size = self.config.max_buffer_size;
        total_buffer_size >= max_buffer_size
    }

    /// Ensures this writer is ready to attempt writing the next record.
    #[instrument(skip(self), level = "debug")]
    async fn ensure_ready_for_write(&mut self) -> io::Result<()> {
        // Check the overall size of the buffer and figure out if we can write.
        loop {
            // If we haven't yet exceeded the maximum buffer size, then we can proceed. Likewise, if
            // we're still validating our last write, then we know it doesn't matter if the buffer
            // is full or not because we're not doing any actual writing here.
            //
            // Otherwise, wait for the reader to signal that they've made some progress.
            if !self.is_buffer_full() || !self.ready_to_write {
                break;
            }

            trace!(
                total_buffer_size = self.ledger.get_total_buffer_size() + self.unflushed_bytes,
                max_buffer_size = self.config.max_buffer_size,
                "Buffer size limit reached. Waiting for reader progress."
            );

            self.ledger.wait_for_reader().await;
        }

        // If we already have an open writer, and we have no more space in the data file to write,
        // flush and close the file and mark ourselves as needing to open the _next_ data file.
        //
        // Likewise, if initialization detected an invalid record on the starting data file, and we
        // need to skip to the next file, we honor that here.
        let mut should_open_next = self.should_skip();
        if self.writer.is_some() {
            if self.can_write() {
                return Ok(());
            }

            // Our current data file is full, so we need to open a new one. Signal to the loop
            // that we want to try and open the next file, and not the current file,
            // essentially to avoid marking the writer as already having moved on to the next
            // file before we're sure it isn't already an existing file on disk waiting to be
            // read.
            //
            // We still flush ourselves to disk, etc, to make sure all of the data is there.
            should_open_next = true;
            self.flush_inner(true).await?;
            self.flush_write_state();

            self.reset();
        }

        loop {
            // Normally, readers will keep up with the writers, and so there will only ever be a
            // single data file or two on disk. If there was an issue with a sink reading from this
            // buffer, though, we could conceivably have a stalled reader while the writer
            // progresses and continues to create new data files.
            //
            // At some point, the file ID will wrap around and the writer will want to open a "new"
            // file for writing that already exists: a previously-written file that has not been
            // read yet.
            //
            // In order to handle this situation, we loop here, trying to create the file. Readers
            // are responsible for deleting a file once they have read it entirely, so our first
            // loop iteration is the happy path, trying to create the new file. If we can't create
            // it, this may be because it already exists and we're just picking up where we left off
            // from last time, but it could also be a data file that a reader hasn't completed yet.
            let data_file_path = if should_open_next {
                self.ledger.get_next_writer_data_file_path()
            } else {
                self.ledger.get_current_writer_data_file_path()
            };

            let maybe_data_file = self
                .ledger
                .filesystem()
                .open_file_writable_atomic(&data_file_path)
                .await;
            let file = match maybe_data_file {
                // We were able to create the file, so we're good to proceed.
                Ok(data_file) => Some((data_file, 0)),
                // We got back an error trying to open the file: might be that it already exists,
                // might be something else.
                Err(e) => match e.kind() {
                    ErrorKind::AlreadyExists => {
                        // We open the file again, without the atomic "create new" behavior. If we
                        // can do that successfully, we check its length. There are three main
                        // situations we encounter:
                        // - the reader may have deleted the data file between the atomic create
                        //   open and this one, and so we would expect the file length to be zero
                        // - the file still exists, and it's full: the reader may still be reading
                        //   it, or waiting for acknowledgements to be able to delete it
                        // - it may not be full, which could be because it's the data file the
                        //   writer left off on last time
                        let data_file = self
                            .ledger
                            .filesystem()
                            .open_file_writable(&data_file_path)
                            .await?;
                        let metadata = data_file.metadata().await?;
                        let file_len = metadata.len();
                        if file_len == 0 || !should_open_next {
                            // The file is either empty, which means we created it and "own it" now,
                            // or it's not empty but we're not skipping to the next file, which can
                            // only mean that we're still initializing, and so this would be the
                            // data file we left off writing to.
                            Some((data_file, file_len))
                        } else {
                            // The file isn't empty, and we're not in initialization anymore, which
                            // means this data file is one that the reader still hasn't finished
                            // reading through yet, and so we must wait for the reader to delete it
                            // before we can proceed.
                            None
                        }
                    }
                    // Legitimate I/O error with the operation, bubble this up.
                    _ => return Err(e),
                },
            };

            if let Some((data_file, data_file_size)) = file {
                // We successfully opened the file and it can be written to.
                debug!(
                    data_file_path = data_file_path.to_string_lossy().as_ref(),
                    existing_file_size = data_file_size,
                    "Opened data file for writing."
                );

                // Make sure the file is flushed to disk, especially if we just created it.
                data_file.sync_all().await?;

                self.writer = Some(RecordWriter::new(
                    data_file,
                    data_file_size,
                    self.config.write_buffer_size,
                    self.config.max_data_file_size,
                    self.config.max_record_size,
                ));
                self.data_file_size = data_file_size;

                // If we opened the "next" data file, we need to increment the current writer
                // file ID now to signal that the writer has moved on.
                if should_open_next {
                    self.ledger.state().increment_writer_file_id();
                    self.ledger.notify_writer_waiters();

                    debug!(
                        new_writer_file_id = self.ledger.get_current_writer_file_id(),
                        "Writer now on new data file."
                    );
                }

                return Ok(());
            }

            // The file is still present and waiting for a reader to finish reading it in order
            // to delete it. Wait until the reader signals progress and try again.
            debug!("Target data file is still present and not yet processed. Waiting for reader.");
            self.ledger.wait_for_reader().await;
        }
    }

    /// Attempts to write a record.
    ///
    /// If the buffer is currently full, the original record will be immediately returned.
    /// Otherwise, a write will be executed, which will run to completion, and `None` will be returned.
    ///
    /// # Errors
    ///
    /// If an error occurred while writing the record, an error variant will be returned describing
    /// the error.
    pub async fn try_write_record(&mut self, record: T) -> Result<Option<T>, WriterError<T>> {
        self.try_write_record_inner(record).await.map(Result::err)
    }

    #[instrument(skip_all, level = "debug")]
    async fn try_write_record_inner(
        &mut self,
        mut record: T,
    ) -> Result<Result<usize, T>, WriterError<T>> {
        // If the buffer is already full, we definitely can't complete this write.
        if self.is_buffer_full() {
            return Ok(Err(record));
        }

        let record_events: NonZeroUsize = record
            .event_count()
            .try_into()
            .map_err(|_| WriterError::EmptyRecord)?;

        // Grab the next record ID and attempt to write the record.
        let record_id = self.get_next_record_id();

        let token = loop {
            // Make sure we have an open data file to write to, which might also be us opening the
            // next data file because our first attempt at writing had to finalize a data file that
            // was already full.
            self.ensure_ready_for_write().await.context(IoSnafu)?;

            let writer = self
                .writer
                .as_mut()
                .expect("writer should exist after `ensure_ready_for_write`");

            // Archive the record, which if it succeeds in terms of encoding, etc, will give us a token that we can use
            // to eventually write it to storage. This may fail if the record writer detects it can't fit the archived
            // record in the current data file, so we handle that separately. All other errors must be handled by the caller.
            match writer.archive_record(record_id, record) {
                Ok(token) => break token,
                Err(we) => match we {
                    WriterError::DataFileFull {
                        record: old_record,
                        serialized_len,
                    } => {
                        // The data file is full, so we need to roll to the next one before attempting
                        // the write again. We also recapture the record for the next write attempt.
                        self.mark_data_file_full();
                        record = old_record;

                        debug!(
                            current_data_file_size = self.data_file_size,
                            max_data_file_size = self.config.max_data_file_size,
                            last_attempted_write_size = serialized_len,
                            "Current data file reached maximum size. Rolling to the next data file."
                        );
                    }
                    e => return Err(e),
                },
            }
        };

        // Now that we know the record was archived successfully -- record wasn't too large, etc -- we actually need
        // to check if it will fit based on our current buffer size. If not, we recover the record from the writer's
        // internal buffers, as we haven't yet flushed it, and we return it to the caller.
        //
        // Otherwise, we proceed with flushing like we normally would.
        let can_write_record = self.can_write_record(token.serialized_len());
        let writer = self
            .writer
            .as_mut()
            .expect("writer should exist after `ensure_ready_for_write`");

        let (bytes_written, flush_result) = if can_write_record {
            // We always return errors here because flushing the record won't return a recoverable error like
            // `DataFileFull`, as that gets checked during archiving.
            writer.flush_record(token).await?
        } else {
            // The record would not fit given the current size of the buffer, so we need to recover it from the
            // writer and hand it back. This looks a little weird because we want to surface deserialize/decoding
            // errors if we encounter them, but if we recover the record successfully, we're returning
            // `Ok(Err(record))` to signal that our attempt failed but the record is able to be retried again later.
            return Ok(Err(writer.recover_archived_record(&token)?));
        };

        // Track our write since things appear to have succeeded. This only updates our internal
        // state as we have not yet authoritatively flushed the write to the data file. This tracks
        // not only how many bytes we have buffered, but also how many events, which in turn drives
        // record ID generation. We do this after the write appears to succeed to avoid issues with
        // setting the ledger state to a record ID that we may never have actually written, which
        // could lead to record ID gaps.
        self.track_write(record_events.get(), bytes_written as u64);

        // If we did flush some buffered writes during this write, however, we now compensate for
        // that after updating our internal state. We'll also notify the reader, too, since the
        // data should be available to read:
        if let Some(flush_result) = flush_result {
            self.flush_write_state_partial(flush_result.events_flushed, flush_result.bytes_flushed);
            self.ledger.notify_writer_waiters();
        }

        trace!(
            record_id,
            record_events,
            bytes_written,
            data_file_id = self.ledger.get_current_writer_file_id(),
            "Wrote record."
        );

        Ok(Ok(bytes_written))
    }

    /// Writes a record.
    ///
    /// If the record was written successfully, the number of bytes written to the data file will be
    /// returned.
    ///
    /// # Errors
    ///
    /// If an error occurred while writing the record, an error variant will be returned describing
    /// the error.
    #[instrument(skip_all, level = "debug")]
    pub async fn write_record(&mut self, mut record: T) -> Result<usize, WriterError<T>> {
        loop {
            match self.try_write_record_inner(record).await? {
                Ok(bytes_written) => return Ok(bytes_written),
                Err(old_record) => {
                    record = old_record;
                    self.ledger.wait_for_reader().await;
                }
            }
        }
    }

    #[instrument(skip(self), level = "debug")]
    async fn flush_inner(&mut self, force_full_flush: bool) -> io::Result<()> {
        // We always flush the `TrackingBufWriter` when this is called, but we don't always flush to
        // disk or flush the ledger. This is enough for readers on Linux since the file ends up in
        // the page cache, as we don't do any O_DIRECT fanciness, and the new contents can be
        // immediately read.
        //
        // TODO: Windows has a page cache as well, and macOS _should_, but we should verify this
        // behavior works on those platforms as well.
        if let Some(writer) = self.writer.as_mut() {
            writer.flush().await?;
            self.ledger.notify_writer_waiters();
        }

        if self.ledger.should_flush() || force_full_flush {
            if let Some(writer) = self.writer.as_mut() {
                writer.sync_all().await?;
            }

            self.ledger.flush()
        } else {
            Ok(())
        }
    }

    /// Flushes the writer.
    ///
    /// This must be called for the reader to be able to make progress.
    ///
    /// This does not ensure that the data is fully synchronized (i.e. `fsync`) to disk, however it
    /// may sometimes perform a full synchronization if the time since the last full synchronization
    /// occurred has exceeded a configured limit.
    ///
    /// # Errors
    ///
    /// If there is an error while flushing either the current data file or the ledger, an error
    /// variant will be returned describing the error.
    #[instrument(skip(self), level = "trace")]
    pub async fn flush(&mut self) -> io::Result<()> {
        self.flush_inner(false).await?;
        self.flush_write_state();
        Ok(())
    }
}

impl<T, FS> BufferWriter<T, FS>
where
    FS: Filesystem,
    FS::File: Unpin,
{
    /// Closes this [`BufferWriter`], marking it as done.
    ///
    /// Closing the writer signals to the reader that no more records will be written until the
    /// buffer is reopened. Writers and readers effectively share a "session", so until the writer
    /// and reader both close, the buffer cannot be reopened by another Vector instance.
    ///
    /// In turn, the reader is able to know that when the writer is marked as done, and it cannot
    /// read any more data, that nothing else is actually coming, and it can terminate by beginning
    /// to return `None`.
    #[instrument(skip(self), level = "trace")]
    pub fn close(&mut self) {
        if self.ledger.mark_writer_done() {
            debug!("Writer marked as closed.");
            self.ledger.notify_writer_waiters();
        }
    }
}

impl<T, FS> Drop for BufferWriter<T, FS>
where
    FS: Filesystem,
    FS::File: Unpin,
{
    fn drop(&mut self) {
        self.close();
    }
}