vector_buffers/variants/disk_v2/writer.rs
use std::{
    cmp::Ordering,
    convert::Infallible as StdInfallible,
    fmt,
    io::{self, ErrorKind},
    marker::PhantomData,
    num::NonZeroUsize,
    sync::Arc,
};

use bytes::BufMut;
use crc32fast::Hasher;
use rkyv::{
    AlignedVec, Infallible,
    ser::{
        Serializer,
        serializers::{
            AlignedSerializer, AllocScratch, AllocScratchError, BufferScratch, CompositeSerializer,
            CompositeSerializerError, FallbackScratch,
        },
    },
};
use snafu::{ResultExt, Snafu};
use tokio::io::{AsyncWrite, AsyncWriteExt};

use super::{
    common::{DiskBufferConfig, create_crc32c_hasher},
    io::Filesystem,
    ledger::Ledger,
    record::{Record, RecordStatus, validate_record_archive},
};
use crate::{
    Bufferable,
    encoding::{AsMetadata, Encodable},
    variants::disk_v2::{
        io::AsyncFile,
        reader::decode_record_payload,
        record::{RECORD_HEADER_LEN, try_as_record_archive},
    },
};

/// Error that occurred during calls to [`BufferWriter`].
#[derive(Debug, Snafu)]
pub enum WriterError<T>
where
    T: Bufferable,
{
    /// A general I/O error occurred.
    ///
    /// Different methods will capture specific I/O errors depending on the situation, as some
    /// errors may be expected and considered normal by design. All I/O errors that are
    /// considered atypical will be returned as this variant.
    #[snafu(display("write I/O error: {}", source))]
    Io { source: io::Error },

    /// The record attempting to be written was too large.
    ///
    /// In practice, most encoders will throw their own error if they cannot write all of the
    /// necessary bytes during encoding, so this error will typically only be emitted when the
    /// encoder throws no error during the encoding step itself, but manages to fill up the encoding
    /// buffer to the limit.
    #[snafu(display("record too large: limit is {}", limit))]
    RecordTooLarge { limit: usize },

    /// The data file did not have enough remaining space to write the record.
    ///
    /// This could be because the data file is legitimately full, but is more commonly related to a
    /// record being big enough that it would exceed the maximum data file size.
    ///
    /// The record that was given to write is returned.
    #[snafu(display("data file full or record would exceed max data file size"))]
    DataFileFull { record: T, serialized_len: usize },

    /// A record reported that it contained more events than the number of bytes when encoded.
    ///
    /// This is nonsensical because we don't intend to ever support encoding zero-sized types
    /// through the buffer, and the logic we use to count the number of actual events in the buffer
    /// transitively depends on not being able to represent more than one event per encoded byte.
    #[snafu(display(
        "record reported event count ({}) higher than encoded length ({})",
        event_count,
        encoded_len
    ))]
    NonsensicalEventCount {
        encoded_len: usize,
        event_count: usize,
    },

    /// The encoder encountered an issue during encoding.
    ///
    /// For common encoders, failure to write all of the bytes of the input will be the most common
    /// error; in fact, for some encoders, it's the only possible error that can occur.
    #[snafu(display("failed to encode record: {:?}", source))]
    FailedToEncode {
        source: <T as Encodable>::EncodeError,
    },

    /// The writer failed to serialize the record.
    ///
    /// As records are encoded and then wrapped in a container which carries metadata about the size
    /// of the encoded record, and so on, there is a chance that we could fail to serialize that
    /// container during the write step.
    ///
    /// In practice, this should generally only occur if the system is unable to allocate enough
    /// memory during the serialization step, i.e. the system itself is literally out of memory to
    /// give to processes. Rare, indeed.
    #[snafu(display("failed to serialize encoded record to buffer: {}", reason))]
    FailedToSerialize { reason: String },

    /// The writer failed to validate the last written record.
    ///
    /// Specifically, for `BufferWriter`, this can only ever be returned when creating the buffer, during
    /// validation of the last written record. While it's technically possible that it may be
    /// something else, this error is most likely to occur when the records in a buffer were written
    /// in a different version of Vector that cannot be decoded in this version of Vector.
    #[snafu(display("failed to validate the last written record: {}", reason))]
    FailedToValidate { reason: String },

    /// The writer entered an inconsistent state that represents an unrecoverable error.
    ///
    /// In some cases, like expecting to be able to decode an event we just encoded, we might hit an
    /// error. This would be an entirely unexpected error -- how is it possible to not be able to
    /// decode an event we literally just encoded on the line above? -- and as such, the only
    /// reasonable thing to do would be to give up.
    ///
    /// This error is the writer, and thus the buffer, giving up.
    #[snafu(display("writer entered inconsistent state: {}", reason))]
    InconsistentState { reason: String },

    /// The record reported an event count of zero.
    ///
    /// Empty records are not supported.
    EmptyRecord,
}

impl<T: Bufferable + PartialEq> PartialEq for WriterError<T> {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::Io { source: l_source }, Self::Io { source: r_source }) => {
                l_source.kind() == r_source.kind()
            }
            (Self::RecordTooLarge { limit: l_limit }, Self::RecordTooLarge { limit: r_limit }) => {
                l_limit == r_limit
            }
            (
                Self::DataFileFull {
                    record: l_record,
                    serialized_len: l_serialized_len,
                },
                Self::DataFileFull {
                    record: r_record,
                    serialized_len: r_serialized_len,
                },
            ) => l_record == r_record && l_serialized_len == r_serialized_len,
            (
                Self::NonsensicalEventCount {
                    encoded_len: l_encoded_len,
                    event_count: l_event_count,
                },
                Self::NonsensicalEventCount {
                    encoded_len: r_encoded_len,
                    event_count: r_event_count,
                },
            ) => l_encoded_len == r_encoded_len && l_event_count == r_event_count,
            (
                Self::FailedToSerialize { reason: l_reason },
                Self::FailedToSerialize { reason: r_reason },
            )
            | (
                Self::FailedToValidate { reason: l_reason },
                Self::FailedToValidate { reason: r_reason },
            )
            | (
                Self::InconsistentState { reason: l_reason },
                Self::InconsistentState { reason: r_reason },
            ) => l_reason == r_reason,
            _ => core::mem::discriminant(self) == core::mem::discriminant(other),
        }
    }
}

impl<T> From<CompositeSerializerError<StdInfallible, AllocScratchError, StdInfallible>>
    for WriterError<T>
where
    T: Bufferable,
{
    fn from(e: CompositeSerializerError<StdInfallible, AllocScratchError, StdInfallible>) -> Self {
        match e {
            CompositeSerializerError::ScratchSpaceError(sse) => WriterError::FailedToSerialize {
                reason: format!("insufficient space to serialize encoded record: {sse}"),
            },
            // Only our scratch space strategy is fallible, so we should never get here.
            _ => unreachable!(),
        }
    }
}

impl<T> From<io::Error> for WriterError<T>
where
    T: Bufferable,
{
    fn from(source: io::Error) -> Self {
        WriterError::Io { source }
    }
}

#[derive(Debug)]
pub(super) struct WriteToken {
    event_count: usize,
    serialized_len: usize,
}

impl WriteToken {
    pub fn event_count(&self) -> usize {
        self.event_count
    }

    pub fn serialized_len(&self) -> usize {
        self.serialized_len
    }
}

/// Result of flushing buffered writes to the underlying writer: how many events, and how many
/// bytes, were flushed.
#[derive(Debug, Default, PartialEq)]
pub(super) struct FlushResult {
    pub events_flushed: u64,
    pub bytes_flushed: u64,
}

/// Wraps an [`AsyncWrite`] value and buffers individual writes, while signalling implicit flushes.
///
/// As the [`BufferWriter`] must track when writes have theoretically made it to disk, we care about
/// situations where the internal write buffer for a data file has been flushed to make room. In
/// order to provide this information, we track the number of events represented by a record when
/// writing its serialized form.
///
/// If an implicit buffer flush must be performed before a write can complete, or a manual flush is
/// requested, we return this information to the caller, letting them know how many events, and how
/// many bytes, were flushed.
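///
/// # Example
///
/// A minimal sketch of how the implicit-flush signal might be consumed (illustrative only;
/// `event_count` and `serialized` are assumed to come from the record archival step):
///
/// ```rust,ignore
/// let mut writer = TrackingBufWriter::with_capacity(64 * 1024, data_file);
///
/// // Buffers internally unless this write would overflow the 64 KiB buffer.
/// if let Some(flushed) = writer.write(event_count, &serialized[..]).await? {
///     // Previously-buffered events just hit the inner writer; account for them.
///     track_flushed(flushed.events_flushed, flushed.bytes_flushed);
/// }
///
/// // An explicit flush reports whatever was still buffered, if anything.
/// let maybe_flushed = writer.flush().await?;
/// ```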
struct TrackingBufWriter<W> {
    inner: W,
    buf: Vec<u8>,
    unflushed_events: usize,
}

impl<W: AsyncWrite + Unpin> TrackingBufWriter<W> {
    /// Creates a new `TrackingBufWriter` with the specified buffer capacity.
    fn with_capacity(cap: usize, inner: W) -> Self {
        Self {
            inner,
            buf: Vec::with_capacity(cap),
            unflushed_events: 0,
        }
    }

    /// Writes the given buffer.
    ///
    /// If enough internal buffer capacity is available, then this write will be buffered internally
    /// until [`flush`] is called. If there's not enough remaining internal buffer capacity, then
    /// the internal buffer will be flushed to the inner writer first. If the given buffer is
    /// larger than the internal buffer capacity, then it will be written directly to the inner
    /// writer.
    ///
    /// Internally, a counter is kept of how many buffered events are waiting to be flushed. This
    /// count is incremented every time `write` can fully buffer the record without having to flush
    /// to the inner writer.
    ///
    /// If this call requires the internal buffer to be flushed out to the inner writer, then the
    /// write result will indicate how many buffered events were flushed, and their total size in
    /// bytes. Additionally, if the given buffer is larger than the internal buffer itself, it will
    /// be included in the write result as well.
    ///
    /// # Errors
    ///
    /// If a write to the inner writer occurs, and that write encounters an error, an error variant
    /// will be returned describing the error.
    async fn write(&mut self, event_count: usize, buf: &[u8]) -> io::Result<Option<FlushResult>> {
        let mut flush_result = None;

        // If this write would cause us to exceed our internal buffer capacity, flush whatever we
        // have buffered already.
        if self.buf.len() + buf.len() > self.buf.capacity() {
            flush_result = self.flush().await?;
        }

        // If the given buffer is too large to be buffered at all, then bypass the internal buffer.
        if buf.len() >= self.buf.capacity() {
            self.inner.write_all(buf).await?;

            let flush_result = flush_result.get_or_insert(FlushResult::default());
            flush_result.events_flushed += event_count as u64;
            flush_result.bytes_flushed += buf.len() as u64;
        } else {
            self.buf.extend_from_slice(buf);
            self.unflushed_events += event_count;
        }

        Ok(flush_result)
    }

    /// Flushes the internal buffer to the underlying writer.
    ///
    /// Internally, a counter is kept of how many buffered events are waiting to be flushed, which
    /// is incremented every time `write` fully buffers a record without having to flush to the
    /// inner writer.
    ///
    /// If any buffered records are present, then the write result will indicate how many
    /// individual events were flushed, including their total size in bytes.
    ///
    /// # Errors
    ///
    /// If a write to the underlying writer occurs, and that write encounters an error, an error variant
    /// will be returned describing the error.
    async fn flush(&mut self) -> io::Result<Option<FlushResult>> {
        if self.buf.is_empty() {
            return Ok(None);
        }

        let events_flushed = self.unflushed_events as u64;
        let bytes_flushed = self.buf.len() as u64;

        let result = self.inner.write_all(&self.buf[..]).await;
        self.unflushed_events = 0;
        self.buf.clear();

        result.map(|()| {
            Some(FlushResult {
                events_flushed,
                bytes_flushed,
            })
        })
    }

    /// Gets a reference to the underlying writer.
    #[cfg(test)]
    fn get_ref(&self) -> &W {
        &self.inner
    }

    /// Gets a mutable reference to the underlying writer.
    fn get_mut(&mut self) -> &mut W {
        &mut self.inner
    }
}

impl<W: fmt::Debug> fmt::Debug for TrackingBufWriter<W> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("TrackingBufWriter")
            .field("writer", &self.inner)
            .field(
                "buffer",
                &format_args!("{}/{}", self.buf.len(), self.buf.capacity()),
            )
            .field("unflushed_events", &self.unflushed_events)
            .finish()
    }
}

/// Buffered writer that handles encoding, checksumming, and serialization of records.
#[derive(Debug)]
pub(super) struct RecordWriter<W, T> {
    writer: TrackingBufWriter<W>,
    encode_buf: Vec<u8>,
    ser_buf: AlignedVec,
    ser_scratch: AlignedVec,
    checksummer: Hasher,
    max_record_size: usize,
    current_data_file_size: u64,
    max_data_file_size: u64,
    _t: PhantomData<T>,
}

impl<W, T> RecordWriter<W, T>
where
    W: AsyncFile + Unpin,
    T: Bufferable,
{
    /// Creates a new [`RecordWriter`] around the provided writer.
    ///
    /// Internally, the writer is wrapped in a [`TrackingBufWriter`], so callers should not pass in
    /// an already buffered writer.
    pub fn new(
        writer: W,
        current_data_file_size: u64,
        write_buffer_size: usize,
        max_data_file_size: u64,
        max_record_size: usize,
    ) -> Self {
        // These should also be getting checked at a higher level, but we're double-checking them here to be absolutely sure.
        let max_record_size_converted = u64::try_from(max_record_size)
            .expect("Maximum record size must be less than 2^64 bytes.");

        debug_assert!(
            max_record_size > RECORD_HEADER_LEN,
            "maximum record length must be larger than size of record header itself"
        );
        debug_assert!(
            max_data_file_size >= max_record_size_converted,
            "must always be able to fit at least one record into a data file"
        );

        // We subtract the length of the record header from our allowed maximum record size, because we have to make sure
        // that when we go to actually wrap and serialize the encoded record, we're limiting the actual bytes we write
        // to disk to within `max_record_size`.
        //
        // This could lead to us reducing the encode buffer size limit by slightly more than necessary, since
        // `RECORD_HEADER_LEN` might be overaligned compared to what would actually be necessary when we look at the
        // encoded/serialized record... but that's OK, as it's only going to differ by 8 bytes at most.
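        //
        // As a worked example (hypothetical numbers): with a 1 MiB `max_record_size` and an
        // 8-byte `RECORD_HEADER_LEN`, the encode buffer below would be limited to
        // 1_048_576 - 8 = 1_048_568 bytes, keeping header plus payload within the 1 MiB limit.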
        let max_record_size = max_record_size - RECORD_HEADER_LEN;

        Self {
            writer: TrackingBufWriter::with_capacity(write_buffer_size, writer),
            encode_buf: Vec::with_capacity(16_384),
            ser_buf: AlignedVec::with_capacity(16_384),
            ser_scratch: AlignedVec::with_capacity(16_384),
            checksummer: create_crc32c_hasher(),
            max_record_size,
            current_data_file_size,
            max_data_file_size,
            _t: PhantomData,
        }
    }

    /// Gets a reference to the underlying writer.
    #[cfg(test)]
    pub fn get_ref(&self) -> &W {
        self.writer.get_ref()
    }

    /// Whether or not `amount` bytes could be written while obeying the data file size limit.
    ///
    /// If no bytes have been written to a data file at all, then `amount` is allowed to exceed the
    /// limit, otherwise a record would never be able to be written.
    fn can_write(&self, amount: usize) -> bool {
        let amount = u64::try_from(amount).expect("`amount` should never exceed 2^64 bytes.");

        self.current_data_file_size + amount <= self.max_data_file_size
    }

    /// Archives a record.
    ///
    /// This encodes the record, as well as serializes it into its archival format that will be
    /// stored on disk. The total size of the archived record, including the length delimiter
    /// inserted before the archived record, will be returned.
    ///
    /// # Errors
    ///
    /// Errors can occur during the encoding or serialization stage. If an error occurs
    /// during any of these stages, an appropriate error variant will be returned describing the error.
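    ///
    /// Roughly, the bytes staged for the data file look like this (a sketch; the exact field
    /// layout of the archived record is determined by `rkyv` and the `Record` type):
    ///
    /// ```text
    /// +----------------------------+---------------------------------------------+
    /// | length delimiter (u64, BE) | archived `Record` (id, metadata, checksum,  |
    /// |   = serialized_len - 8     | encoded payload)                            |
    /// +----------------------------+---------------------------------------------+
    /// ```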
    #[instrument(skip(self, record), level = "trace")]
    pub fn archive_record(&mut self, id: u64, record: T) -> Result<WriteToken, WriterError<T>> {
        let event_count = record.event_count();

        self.encode_buf.clear();
        self.ser_buf.clear();
        self.ser_scratch.clear();

        // We first encode the record, which puts it into the desired encoded form. This is where
        // we assert the record is within size limits, etc.
        //
        // NOTE: Some encoders may not write to the buffer in a way that fills it up before
        // themselves returning an error because they know the buffer is too small. This means we
        // may often return the "failed to encode" error variant when the true error is that the
        // payload size, when encoded, exceeds our limit.
        //
        // Unfortunately, there's not a whole lot for us to do here beyond allowing our buffer to
        // grow past the limit so that encoding can succeed, letting us grab the actual encoded
        // size and then check it against the limit.
        //
        // C'est la vie.
        let encode_result = {
            let mut encode_buf = (&mut self.encode_buf).limit(self.max_record_size);
            record.encode(&mut encode_buf)
        };
        let encoded_len = encode_result
            .map(|()| self.encode_buf.len())
            .context(FailedToEncodeSnafu)?;
        if encoded_len > self.max_record_size {
            return Err(WriterError::RecordTooLarge {
                limit: self.max_record_size,
            });
        }

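        // A rough sketch of what the checksum construction is assumed to look like (the
        // authoritative logic lives in `Record::with_checksum`, in `record.rs`):
        //
        //   let mut hasher = self.checksummer.clone();
        //   hasher.update(&self.encode_buf);
        //   let checksum = hasher.finalize();
        //
        // i.e. the pre-seeded hasher is cloned per record and fed the encoded payload, and the
        // resulting checksum is stored alongside `id` and `metadata` in the wrapper record.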
        let metadata = T::get_metadata().into_u32();
        let wrapped_record =
            Record::with_checksum(id, metadata, &self.encode_buf, &self.checksummer);

        // Push 8 dummy bytes where our length delimiter will sit. We'll fix this up after
        // serialization. Notably, `AlignedSerializer` will report the serializer position as
        // the length of its backing store, which now includes our 8 bytes, so we _subtract_
        // those from the position when figuring out the actual value to write back after.
        //
        // We write it this way -- in the serializer buffer, and not as a separate write -- so that
        // we can do a single write but also so that we always have an aligned buffer.
        self.ser_buf
            .extend_from_slice(&[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]);

        // Now serialize the record, which puts it into its archived form. This is what powers our
        // ability to do zero-copy deserialization from disk.
        let mut serializer = CompositeSerializer::new(
            AlignedSerializer::new(&mut self.ser_buf),
            FallbackScratch::new(
                BufferScratch::new(&mut self.ser_scratch),
                AllocScratch::new(),
            ),
            Infallible,
        );

        let serialized_len = serializer
            .serialize_value(&wrapped_record)
            .map(|_| serializer.pos())?;

        // Sanity check before we do our length math.
        if serialized_len <= 8 || self.ser_buf.len() != serialized_len {
            return Err(WriterError::FailedToSerialize {
                reason: format!(
                    "serializer position invalid for context: pos={} len={}",
                    serialized_len,
                    self.ser_buf.len(),
                ),
            });
        }

        // With the record archived and serialized, do our final check to ensure we can fit this
        // write. We're doing this earlier than the actual call to flush it because it gives us
        // a chance to hand back the event so that the caller can roll to a new data file first
        // before attempting the write again.
        if !self.can_write(serialized_len) {
            debug!(
                current_data_file_size = self.current_data_file_size,
                max_data_file_size = self.max_data_file_size,
                archive_on_disk_len = serialized_len,
                "Archived record is too large to fit in remaining free space of current data file."
            );

            // We have to decode the record back out to actually be able to give it back. If we
            // can't decode it for some reason, this is entirely an unrecoverable error, since an
            // encoded record should always be decodable within the same process that encoded it.
            let record = T::decode(T::get_metadata(), &self.encode_buf[..]).map_err(|_| {
                WriterError::InconsistentState {
                    reason: "failed to decode record immediately after encoding it".to_string(),
                }
            })?;

            return Err(WriterError::DataFileFull {
                record,
                serialized_len,
            });
        }

        // Fix up our length delimiter.
        let archive_len = serialized_len - 8;
        let wire_archive_len: u64 = archive_len
            .try_into()
            .expect("archive len should always fit into a u64");
        let archive_len_buf = wire_archive_len.to_be_bytes();

        let length_delimiter_dst = &mut self.ser_buf[0..8];
        length_delimiter_dst.copy_from_slice(&archive_len_buf[..]);

        Ok(WriteToken {
            event_count,
            serialized_len,
        })
    }

    /// Writes a record.
    ///
    /// If the write is successful, the number of bytes written to the buffer are returned.
    /// Additionally, if any internal buffers required an implicit flush, the result of that flush
    /// operation is returned as well.
    ///
    /// As we internally buffer writes to the underlying data file, to reduce the number of
    /// syscalls required to push serialized records to the data file, we will sometimes write a
    /// record which would overflow the internal buffer. Doing so means we have to first flush the
    /// buffer before continuing with buffering the current write. As some invariants are based on
    /// knowing when a record has actually been written to the data file, we return any information
    /// about implicit flushes so that the writer can be aware of when data has actually made it to
    /// the data file or not.
    ///
    /// # Errors
    ///
    /// Errors can occur during the encoding, serialization, or I/O stage. If an error occurs
    /// during any of these stages, an appropriate error variant will be returned describing the error.
    #[instrument(skip(self, record), level = "trace")]
    #[cfg(test)]
    pub async fn write_record(
        &mut self,
        id: u64,
        record: T,
    ) -> Result<(usize, Option<FlushResult>), WriterError<T>> {
        let token = self.archive_record(id, record)?;
        self.flush_record(token).await
    }

    /// Flushes the previously-archived record.
    ///
    /// If the flush is successful, the number of bytes written to the buffer are returned.
    /// Additionally, if any internal buffers required an implicit flush, the result of that flush
    /// operation is returned as well.
    ///
    /// As we internally buffer writes to the underlying data file, to reduce the number of
    /// syscalls required to push serialized records to the data file, we will sometimes write a
    /// record which would overflow the internal buffer. Doing so means we have to first flush the
    /// buffer before continuing with buffering the current write. As some invariants are based on
    /// knowing when a record has actually been written to the data file, we return any information
    /// about implicit flushes so that the writer can be aware of when data has actually made it to
    /// the data file or not.
    #[instrument(skip(self), level = "trace")]
    pub async fn flush_record(
        &mut self,
        token: WriteToken,
    ) -> Result<(usize, Option<FlushResult>), WriterError<T>> {
        // Make sure the write token we've been given matches whatever the last call to `archive_record` generated.
        let event_count = token.event_count();
        let serialized_len = token.serialized_len();
        debug_assert_eq!(
            serialized_len,
            self.ser_buf.len(),
            "using write token from non-contiguous archival call"
        );

        let flush_result = self
            .writer
            .write(event_count, &self.ser_buf[..])
            .await
            .context(IoSnafu)?;

        // Update our current data file size.
        self.current_data_file_size += u64::try_from(serialized_len)
            .expect("Serialized length of record should never exceed 2^64 bytes.");

        Ok((serialized_len, flush_result))
    }

    /// Recovers an archived record that has not yet been flushed.
    ///
    /// In some cases, we must archive a record to see how large the resulting archived record is, and potentially
    /// recover the original record if it's too large, and so on.
    ///
    /// This method allows decoding an archived record that is still sitting in the internal buffers waiting to be
    /// flushed. Technically, this decodes the original record back from its archived/encoded form, so this isn't a
    /// clone, but it does mean incurring the cost of decoding directly.
    ///
    /// # Errors
    ///
    /// If the archived record cannot be deserialized from its archival form, or can't be decoded back to its original
    /// form `T`, an error variant will be returned describing the error. Notably, the only error we return is
    /// `InconsistentState`, as being unable to immediately deserialize and decode a record we just serialized and
    /// encoded implies a fatal, and unrecoverable, error with the buffer implementation as a whole.
    #[instrument(skip(self), level = "trace")]
    pub fn recover_archived_record(&mut self, token: &WriteToken) -> Result<T, WriterError<T>> {
        // Make sure the write token we've been given matches whatever the last call to `archive_record` generated.
        let serialized_len = token.serialized_len();
        debug_assert_eq!(
            serialized_len,
            self.ser_buf.len(),
            "using write token from non-contiguous archival call"
        );

        // First, decode the archival wrapper. This means skipping the length delimiter.
        let wrapped_record = try_as_record_archive(&self.ser_buf[8..]).map_err(|_| {
            WriterError::InconsistentState {
                reason: "failed to decode archived record immediately after archiving it"
                    .to_string(),
            }
        })?;

        // Now we can actually decode it as `T`.
        let record_metadata = T::Metadata::from_u32(wrapped_record.metadata()).ok_or(
            WriterError::InconsistentState {
                reason: "failed to decode record metadata immediately after encoding it"
                    .to_string(),
            },
        )?;

        T::decode(record_metadata, wrapped_record.payload()).map_err(|_| {
            WriterError::InconsistentState {
                reason: "failed to decode record immediately after encoding it".to_string(),
            }
        })
    }

    /// Flushes the writer.
    ///
    /// This flushes both the internal buffered writer and the underlying writer object.
    ///
    /// # Errors
    ///
    /// If there is an I/O error while flushing either the buffered writer or the underlying writer,
    /// an error variant will be returned describing the error.
    #[instrument(skip(self), level = "debug")]
    pub async fn flush(&mut self) -> io::Result<Option<FlushResult>> {
        self.writer.flush().await
    }

    /// Synchronizes the underlying file to disk.
    ///
    /// This tries to synchronize both data and metadata.
    ///
    /// # Errors
    ///
    /// If there is an I/O error while syncing the file, an error variant will be returned
    /// describing the error.
    #[instrument(skip(self), level = "debug")]
    pub async fn sync_all(&mut self) -> io::Result<()> {
        self.writer.get_mut().sync_all().await
    }
}

/// Writes records to the buffer.
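///
/// # Example
///
/// A minimal sketch of the write path, assuming a `ledger` constructed elsewhere during buffer
/// initialization (names are illustrative, not a stable public API):
///
/// ```rust,ignore
/// let mut writer = BufferWriter::new(Arc::clone(&ledger));
/// writer.validate_last_write().await?;
///
/// // Write a record, then flush so the reader can observe it.
/// let bytes_written = writer.write_record(record).await?;
/// writer.flush().await?;
///
/// // Signal that no further records are coming.
/// writer.close();
/// ```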
#[derive(Debug)]
pub struct BufferWriter<T, FS>
where
    FS: Filesystem,
    FS::File: Unpin,
{
    ledger: Arc<Ledger<FS>>,
    config: DiskBufferConfig<FS>,
    writer: Option<RecordWriter<FS::File, T>>,
    next_record_id: u64,
    unflushed_events: u64,
    data_file_size: u64,
    unflushed_bytes: u64,
    data_file_full: bool,
    skip_to_next: bool,
    ready_to_write: bool,
    _t: PhantomData<T>,
}

impl<T, FS> BufferWriter<T, FS>
where
    T: Bufferable,
    FS: Filesystem + fmt::Debug + Clone,
    FS::File: Unpin,
{
    /// Creates a new [`BufferWriter`] attached to the given [`Ledger`].
    pub(crate) fn new(ledger: Arc<Ledger<FS>>) -> Self {
        let config = ledger.config().clone();
        let next_record_id = ledger.state().get_next_writer_record_id();
        BufferWriter {
            ledger,
            config,
            writer: None,
            data_file_size: 0,
            data_file_full: false,
            unflushed_bytes: 0,
            skip_to_next: false,
            ready_to_write: false,
            next_record_id,
            unflushed_events: 0,
            _t: PhantomData,
        }
    }

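    /// Gets the record ID to assign to the next write.
    ///
    /// Record IDs advance by the event count of each record, wrapping on `u64` overflow: for
    /// example, if the ledger's next writer record ID is 100 and five events are buffered but
    /// unflushed, the next record is assigned ID 105.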
    fn get_next_record_id(&mut self) -> u64 {
        self.next_record_id.wrapping_add(self.unflushed_events)
    }

    fn track_write(&mut self, event_count: usize, record_size: u64) {
        self.data_file_size += record_size;
        self.unflushed_events += event_count as u64;
        self.unflushed_bytes += record_size;
    }

    fn flush_write_state(&mut self) {
        self.flush_write_state_partial(self.unflushed_events, self.unflushed_bytes);
    }

    fn flush_write_state_partial(&mut self, flushed_events: u64, flushed_bytes: u64) {
        debug_assert!(
            flushed_events <= self.unflushed_events,
            "tried to flush more events than are currently unflushed"
        );
        debug_assert!(
            flushed_bytes <= self.unflushed_bytes,
            "tried to flush more bytes than are currently unflushed"
        );

        self.next_record_id = self
            .ledger
            .state()
            .increment_next_writer_record_id(flushed_events);
        self.unflushed_events -= flushed_events;
        self.unflushed_bytes -= flushed_bytes;

        self.ledger.track_write(flushed_events, flushed_bytes);
    }

    fn can_write(&self) -> bool {
        !self.data_file_full && self.data_file_size < self.config.max_data_file_size
    }

    fn can_write_record(&self, amount: usize) -> bool {
        let total_buffer_size = self.ledger.get_total_buffer_size() + self.unflushed_bytes;
        let potential_write_len =
            u64::try_from(amount).expect("Vector only supports 64-bit architectures.");

        self.can_write() && total_buffer_size + potential_write_len <= self.config.max_buffer_size
    }

    #[instrument(skip(self), level = "debug")]
    fn mark_data_file_full(&mut self) {
        self.data_file_full = true;
    }

    #[instrument(skip(self), level = "debug")]
    fn reset(&mut self) {
        self.writer = None;
        self.data_file_size = 0;
        self.data_file_full = false;
    }

    #[instrument(skip(self), level = "debug")]
    fn mark_for_skip(&mut self) {
        self.skip_to_next = true;
    }

    fn should_skip(&mut self) -> bool {
        let should_skip = self.skip_to_next;
        if should_skip {
            self.skip_to_next = false;
        }

        should_skip
    }

    /// Validates that the last write in the current writer data file matches the ledger.
    ///
    /// # Errors
    ///
    /// If the current data file is not empty, and there is an error reading it to perform
    /// validation, an error variant will be returned that describes the error.
    ///
    /// Practically speaking, however, this method will only return I/O-related errors, as all
    /// logical errors, such as the record being invalid, are captured in order to logically adjust
    /// the writer/ledger state to start a new file, etc.
    #[instrument(skip(self), level = "debug")]
    pub(super) async fn validate_last_write(&mut self) -> Result<(), WriterError<T>> {
        // We don't try validating again after doing so initially.
        if self.ready_to_write {
            warn!("Writer already initialized.");
            return Ok(());
        }

        debug!(
            current_writer_data_file = ?self.ledger.get_current_writer_data_file_path(),
            "Validating last written record in current data file."
        );
        self.ensure_ready_for_write().await.context(IoSnafu)?;

        // If our current file is empty, there's no sense doing this check.
        if self.data_file_size == 0 {
            self.ready_to_write = true;
            return Ok(());
        }

        // We do a neat little trick here where we open an immutable memory-mapped region against our
        // current writer data file, which lets us treat it as one big buffer... which is useful for
        // asking `rkyv` to deserialize just the last record from the file, without having to seek
        // directly to the start of the record where the length delimiter is.
        let data_file_path = self.ledger.get_current_writer_data_file_path();
        let data_file_mmap = self
            .ledger
            .filesystem()
            .open_mmap_readable(&data_file_path)
            .await
            .context(IoSnafu)?;

        // We have bytes, so we should have an archived record... hopefully! Go through the motions
        // of verifying it. If we hit any invalid states, then we should bump to the next data file
        // since the reader will have to stop once it hits the first error in a given file.
        let should_skip_to_next_file = match validate_record_archive(
            data_file_mmap.as_ref(),
            &Hasher::new(),
        ) {
            RecordStatus::Valid {
                id: last_record_id, ..
            } => {
                // We now know the record is valid from the perspective of being framed correctly,
                // and the checksum matching, etc. We'll attempt to actually decode it now so we
                // can get the actual item that was written, which we need to understand where the
                // next writer record ID should be.
                let record = try_as_record_archive(data_file_mmap.as_ref())
                    .expect("record was already validated");
                let item = decode_record_payload::<T>(record).map_err(|e| {
                    WriterError::FailedToValidate {
                        reason: e.to_string(),
                    }
                })?;

                // Since we have a valid record, checksum and all, see if the writer record ID
                // in the ledger lines up with the record ID we have here. Specifically, the record
                // ID plus the number of events in the record should be the next record ID that gets used.
                let ledger_next = self.ledger.state().get_next_writer_record_id();
                let record_events =
                    u64::try_from(item.event_count()).expect("event count should never exceed u64");
                let record_next = last_record_id.wrapping_add(record_events);

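                // Worked example (illustrative numbers): if the last record on disk has ID 10 and
                // holds 3 events, `record_next` is 13. A `ledger_next` of 13 means the ledger and
                // data file agree; a greater value (say, 15) means the data file is missing writes
                // the ledger already accounted for; a lesser value (say, 11) means the ledger
                // missed an increment and can safely be fast-forwarded to 13.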
                match ledger_next.cmp(&record_next) {
                    Ordering::Equal => {
                        // We're exactly where the ledger thinks we should be, so nothing to do.
                        debug!(
                            ledger_next,
                            last_record_id,
                            record_events,
                            "Synchronized with ledger. Writer ready."
                        );
                        false
                    }
                    Ordering::Greater => {
                        // Our last write is behind where the ledger thinks we should be, so we
                        // likely missed flushing some records, or partially flushed the data file.
                        // Better to roll over to be safe.
                        error!(
                            ledger_next,
                            last_record_id,
                            record_events,
                            "Last record written to data file is behind expected position. Events have likely been lost."
                        );
                        true
                    }
                    Ordering::Less => {
                        // We're actually _ahead_ of the ledger, which is to say we wrote a valid
                        // record to the data file, but never incremented our "writer next record
                        // ID" field. Given that record IDs are monotonic, it's safe to forward
                        // ourselves to make the "writer next record ID" in the ledger match the
                        // reality of the data file. If there were somehow gaps in the data file,
                        // the reader will detect it, and this way, we avoid duplicate record IDs.
                        debug!(
                            ledger_next,
                            last_record_id,
                            record_events,
                            new_ledger_next = record_next,
                            "Ledger desynchronized from data files. Fast forwarding ledger state."
                        );
                        let ledger_record_delta = record_next - ledger_next;
                        let next_record_id = self
                            .ledger
                            .state()
                            .increment_next_writer_record_id(ledger_record_delta);
                        self.next_record_id = next_record_id;
                        self.unflushed_events = 0;

                        false
                    }
                }
            }
            // The record payload was corrupted, somehow: we know the checksum failed to match on
            // both sides, but it could be cosmic radiation that flipped a bit or some process
            // trampled over the data file... who knows.
            //
            // We skip to the next data file to try and start from a clean slate.
            RecordStatus::Corrupted { .. } => {
                error!(
                    "Last written record did not match the expected checksum. Corruption likely."
                );
                true
            }
            // The record itself was corrupted, somehow: it was sufficiently different that `rkyv`
            // couldn't even validate it, which likely means missing bytes, but could also be certain
            // bytes being invalid for the struct fields they represent. Like invalid checksums, we
            // really don't know why it happened, only that it happened.
            //
            // We skip to the next data file to try and start from a clean slate.
            RecordStatus::FailedDeserialization(de) => {
                let reason = de.into_inner();
                error!(
                    ?reason,
                    "Last written record was unable to be deserialized. Corruption likely."
                );
                true
            }
        };

        // Reset our internal state, which closes the initial data file we opened, and mark
        // ourselves as needing to skip to the next data file. This is a little convoluted, but we
        // need to ensure we follow the normal behavior of trying to open the next data file,
        // waiting for the reader to delete it if it already exists and hasn't been fully read yet,
        // etc.
        //
        // Essentially, we defer the actual skipping to avoid deadlocking here trying to open a
        // data file we might not be able to open yet.
        if should_skip_to_next_file {
            self.reset();
            self.mark_for_skip();
        }

        self.ready_to_write = true;

        Ok(())
    }

    fn is_buffer_full(&self) -> bool {
        let total_buffer_size = self.ledger.get_total_buffer_size() + self.unflushed_bytes;
        let max_buffer_size = self.config.max_buffer_size;
        total_buffer_size >= max_buffer_size
    }

    /// Ensures this writer is ready to attempt writing the next record.
    #[instrument(skip(self), level = "debug")]
    async fn ensure_ready_for_write(&mut self) -> io::Result<()> {
        // Check the overall size of the buffer and figure out if we can write.
        loop {
            // If we haven't yet exceeded the maximum buffer size, then we can proceed. Likewise, if
            // we're still validating our last write, then we know it doesn't matter if the buffer
            // is full or not because we're not doing any actual writing here.
            //
            // Otherwise, wait for the reader to signal that they've made some progress.
            if !self.is_buffer_full() || !self.ready_to_write {
                break;
            }

            trace!(
                total_buffer_size = self.ledger.get_total_buffer_size() + self.unflushed_bytes,
                max_buffer_size = self.config.max_buffer_size,
                "Buffer size limit reached. Waiting for reader progress."
            );

            self.ledger.wait_for_reader().await;
        }

        // If we already have an open writer, and we have no more space in the data file to write,
        // flush and close the file and mark ourselves as needing to open the _next_ data file.
        //
        // Likewise, if initialization detected an invalid record on the starting data file, and we
        // need to skip to the next file, we honor that here.
        let mut should_open_next = self.should_skip();
        if self.writer.is_some() {
            if self.can_write() {
                return Ok(());
            }

            // Our current data file is full, so we need to open a new one. Signal to the loop
            // that we want to try and open the next file, and not the current file,
            // essentially to avoid marking the writer as already having moved on to the next
            // file before we're sure it isn't already an existing file on disk waiting to be
            // read.
            //
            // We still flush ourselves to disk, etc, to make sure all of the data is there.
            should_open_next = true;
            self.flush_inner(true).await?;
            self.flush_write_state();

            self.reset();
        }

        loop {
            // Normally, readers will keep up with the writers, and so there will only ever be a
            // single data file or two on disk. If there was an issue with a sink reading from this
            // buffer, though, we could conceivably have a stalled reader while the writer
            // progresses and continues to create new data files.
            //
            // At some point, the file ID will wrap around and the writer will want to open a "new"
            // file for writing that already exists: a previously-written file that has not been
            // read yet.
            //
            // In order to handle this situation, we loop here, trying to create the file. Readers
            // are responsible for deleting a file once they have read it entirely, so our first loop
            // iteration is the happy path, trying to create the new file. If we can't create it,
            // this may be because it already exists and we're just picking up where we left off
            // from last time, but it could also be a data file that a reader hasn't completed yet.
            let data_file_path = if should_open_next {
                self.ledger.get_next_writer_data_file_path()
            } else {
                self.ledger.get_current_writer_data_file_path()
            };

            let maybe_data_file = self
                .ledger
                .filesystem()
                .open_file_writable_atomic(&data_file_path)
                .await;
            let file = match maybe_data_file {
                // We were able to create the file, so we're good to proceed.
                Ok(data_file) => Some((data_file, 0)),
                // We got back an error trying to open the file: might be that it already exists,
                // might be something else.
                Err(e) => match e.kind() {
                    ErrorKind::AlreadyExists => {
                        // We open the file again, without the atomic "create new" behavior. If we
                        // can do that successfully, we check its length. There are three main
                        // situations we encounter:
                        // - the reader may have deleted the data file between the atomic create
                        //   open and this one, and so we would expect the file length to be zero
                        // - the file still exists, and it's full: the reader may still be reading
                        //   it, or waiting for acknowledgements to be able to delete it
                        // - it may not be full, which could be because it's the data file the
                        //   writer left off on last time
                        let data_file = self
                            .ledger
                            .filesystem()
                            .open_file_writable(&data_file_path)
                            .await?;
                        let metadata = data_file.metadata().await?;
                        let file_len = metadata.len();
                        if file_len == 0 || !should_open_next {
                            // The file is either empty, which means we created it and "own it" now,
                            // or it's not empty but we're not skipping to the next file, which can
                            // only mean that we're still initializing, and so this would be the
                            // data file we left off writing to.
                            Some((data_file, file_len))
                        } else {
                            // The file isn't empty, and we're not in initialization anymore, which
                            // means this data file is one that the reader still hasn't finished
                            // reading through yet, and so we must wait for the reader to delete it
                            // before we can proceed.
                            None
                        }
                    }
                    // Legitimate I/O error with the operation, bubble this up.
                    _ => return Err(e),
                },
            };

            if let Some((data_file, data_file_size)) = file {
                // We successfully opened the file and it can be written to.
                debug!(
                    data_file_path = data_file_path.to_string_lossy().as_ref(),
                    existing_file_size = data_file_size,
                    "Opened data file for writing."
                );

                // Make sure the file is flushed to disk, especially if we just created it.
                data_file.sync_all().await?;

                self.writer = Some(RecordWriter::new(
                    data_file,
                    data_file_size,
                    self.config.write_buffer_size,
                    self.config.max_data_file_size,
                    self.config.max_record_size,
                ));
                self.data_file_size = data_file_size;

                // If we opened the "next" data file, we need to increment the current writer
                // file ID now to signal that the writer has moved on.
                if should_open_next {
                    self.ledger.state().increment_writer_file_id();
                    self.ledger.notify_writer_waiters();

                    debug!(
                        new_writer_file_id = self.ledger.get_current_writer_file_id(),
                        "Writer now on new data file."
                    );
                }

                return Ok(());
            }

            // The file is still present and waiting for a reader to finish reading it in order
            // to delete it. Wait until the reader signals progress and try again.
            debug!("Target data file is still present and not yet processed. Waiting for reader.");
            self.ledger.wait_for_reader().await;
        }
    }

    /// Attempts to write a record.
    ///
    /// If the buffer is currently full, the original record will be immediately returned.
    /// Otherwise, a write will be executed, which will run to completion, and `None` will be returned.
    ///
    /// # Errors
    ///
    /// If an error occurred while writing the record, an error variant will be returned describing
    /// the error.
    pub async fn try_write_record(&mut self, record: T) -> Result<Option<T>, WriterError<T>> {
        self.try_write_record_inner(record).await.map(Result::err)
    }

    #[instrument(skip_all, level = "debug")]
    async fn try_write_record_inner(
        &mut self,
        mut record: T,
    ) -> Result<Result<usize, T>, WriterError<T>> {
        // If the buffer is already full, we definitely can't complete this write.
        if self.is_buffer_full() {
            return Ok(Err(record));
        }

        let record_events: NonZeroUsize = record
            .event_count()
            .try_into()
            .map_err(|_| WriterError::EmptyRecord)?;

        // Grab the next record ID and attempt to write the record.
        let record_id = self.get_next_record_id();

        let token = loop {
            // Make sure we have an open data file to write to, which might also be us opening the
            // next data file because our first attempt at writing had to finalize a data file that
            // was already full.
            self.ensure_ready_for_write().await.context(IoSnafu)?;

            let writer = self
                .writer
                .as_mut()
                .expect("writer should exist after `ensure_ready_for_write`");

            // Archive the record, which, if it succeeds in terms of encoding, etc, will give us a token we can use
            // to eventually write it to storage. This may fail if the record writer detects it can't fit the archived
            // record in the current data file, so we handle that separately. All other errors must be handled by the caller.
            match writer.archive_record(record_id, record) {
                Ok(token) => break token,
                Err(we) => match we {
                    WriterError::DataFileFull {
                        record: old_record,
                        serialized_len,
                    } => {
                        // The data file is full, so we need to roll to the next one before attempting
                        // the write again. We also recapture the record for the next write attempt.
                        self.mark_data_file_full();
                        record = old_record;

                        debug!(
                            current_data_file_size = self.data_file_size,
                            max_data_file_size = self.config.max_data_file_size,
                            last_attempted_write_size = serialized_len,
                            "Current data file reached maximum size. Rolling to the next data file."
                        );
                    }
                    e => return Err(e),
                },
            }
        };

        // Now that we know the record was archived successfully -- record wasn't too large, etc -- we actually need
        // to check if it will fit based on our current buffer size. If not, we recover the record from the writer's
        // internal buffers, as we haven't yet flushed it, and we return it to the caller.
        //
        // Otherwise, we proceed with flushing like we normally would.
        let can_write_record = self.can_write_record(token.serialized_len());
        let writer = self
            .writer
            .as_mut()
            .expect("writer should exist after `ensure_ready_for_write`");

        let (bytes_written, flush_result) = if can_write_record {
            // We always return errors here because flushing the record won't return a recoverable error like
            // `DataFileFull`, as that gets checked during archiving.
            writer.flush_record(token).await?
        } else {
            // The record would not fit given the current size of the buffer, so we need to recover it from the
            // writer and hand it back. This looks a little weird because we want to surface deserialize/decoding
            // errors if we encounter them, but if we recover the record successfully, we're returning
            // `Ok(Err(record))` to signal that our attempt failed but the record is able to be retried again later.
            return Ok(Err(writer.recover_archived_record(&token)?));
        };

        // Track our write since things appear to have succeeded. This only updates our internal
        // state as we have not yet authoritatively flushed the write to the data file. This tracks
        // not only how many bytes we have buffered, but also how many events, which in turn drives
        // record ID generation. We do this after the write appears to succeed to avoid issues with
        // setting the ledger state to a record ID that we may never have actually written, which
        // could lead to record ID gaps.
        self.track_write(record_events.get(), bytes_written as u64);

        // If we did flush some buffered writes during this write, however, we now compensate for
        // that after updating our internal state. We'll also notify the reader, too, since the
        // data should be available to read:
        if let Some(flush_result) = flush_result {
            self.flush_write_state_partial(flush_result.events_flushed, flush_result.bytes_flushed);
            self.ledger.notify_writer_waiters();
        }

        trace!(
            record_id,
            record_events = record_events.get(),
            bytes_written,
            data_file_id = self.ledger.get_current_writer_file_id(),
            "Wrote record."
        );

        Ok(Ok(bytes_written))
    }

    /// Writes a record.
    ///
    /// If the record was written successfully, the number of bytes written to the data file will be
    /// returned.
    ///
    /// # Errors
    ///
    /// If an error occurred while writing the record, an error variant will be returned describing
    /// the error.
    #[instrument(skip_all, level = "debug")]
    pub async fn write_record(&mut self, mut record: T) -> Result<usize, WriterError<T>> {
        loop {
            match self.try_write_record_inner(record).await? {
                Ok(bytes_written) => return Ok(bytes_written),
                Err(old_record) => {
                    record = old_record;
                    self.ledger.wait_for_reader().await;
                }
            }
        }
    }

    #[instrument(skip(self), level = "debug")]
    async fn flush_inner(&mut self, force_full_flush: bool) -> io::Result<()> {
        // We always flush the internal write buffer when this is called, but we don't always flush
        // to disk or flush the ledger. This is enough for readers on Linux since the file ends up
        // in the page cache, as we don't do any O_DIRECT fanciness, and the new contents can be
        // immediately read.
        //
        // TODO: Windows has a page cache as well, and macOS _should_, but we should verify this
        // behavior works on those platforms as well.
        if let Some(writer) = self.writer.as_mut() {
            writer.flush().await?;
            self.ledger.notify_writer_waiters();
        }

        if self.ledger.should_flush() || force_full_flush {
            if let Some(writer) = self.writer.as_mut() {
                writer.sync_all().await?;
            }

            self.ledger.flush()
        } else {
            Ok(())
        }
    }

    /// Flushes the writer.
    ///
    /// This must be called for the reader to be able to make progress.
    ///
    /// This does not ensure that the data is fully synchronized (i.e. `fsync`) to disk; however, it
    /// may sometimes perform a full synchronization if the time since the last full synchronization
    /// has exceeded a configured limit.
    ///
    /// # Errors
    ///
    /// If there is an error while flushing either the current data file or the ledger, an error
    /// variant will be returned describing the error.
    #[instrument(skip(self), level = "trace")]
    pub async fn flush(&mut self) -> io::Result<()> {
        self.flush_inner(false).await?;
        self.flush_write_state();
        Ok(())
    }
}

impl<T, FS> BufferWriter<T, FS>
where
    FS: Filesystem,
    FS::File: Unpin,
{
    /// Closes this [`BufferWriter`], marking it as done.
    ///
    /// Closing the writer signals to the reader that no more records will be written until the
    /// buffer is reopened. Writers and readers effectively share a "session", so until the writer
    /// and reader both close, the buffer cannot be reopened by another Vector instance.
    ///
    /// In turn, the reader is able to know that when the writer is marked as done, and it cannot
    /// read any more data, nothing else is actually coming, and it can terminate by beginning
    /// to return `None`.
    #[instrument(skip(self), level = "trace")]
    pub fn close(&mut self) {
        if self.ledger.mark_writer_done() {
            debug!("Writer marked as closed.");
            self.ledger.notify_writer_waiters();
        }
    }
}

impl<T, FS> Drop for BufferWriter<T, FS>
where
    FS: Filesystem,
    FS::File: Unpin,
{
    fn drop(&mut self) {
        self.close();
    }
}