vector_buffers/variants/disk_v2/mod.rs

//! # Disk buffer v2.
//!
//! This disk buffer implementation focuses on a simple on-disk format with minimal reader/writer
//! coordination, and no exotic I/O techniques, such that the buffer is easy to write to and read
//! from, and can provide simple, but reliable, recovery mechanisms when errors or corruption are
//! encountered.
//!
//! ## Design constraints
//!
//! These constraints, or more often, invariants, are the groundwork for ensuring that the design
//! can stay simple and understandable:
//!
//! - data files do not exceed 128MB
//! - no more than 65,536 data files can exist at any given time
//! - buffer can grow to a maximum of ~8TB in total size (65k files * 128MB)
//! - all records are checksummed (CRC32C)
//! - all records are written sequentially/contiguously, and do not span over multiple data files
//! - writers create and write to data files, while readers read from and delete data files
//! - endianness of the files is based on the host system (we don't support loading the buffer files
//!   on a system with different endianness)
//!
//! ## High-level design
//!
//! ### Records
//!
//! A record is a length-prefixed payload containing an arbitrary number of bytes, alongside a
//! monotonically increasing ID, all protected by a CRC32C checksum. Since a record simply stores
//! opaque bytes, one or more events can be stored per record.
//!
//! The writer assigns record IDs based on the number of events written to a record, such that a
//! record with ID N can be determined to contain M-N events, where M is the record ID of the next
//! record.
//!
//! #### On-disk format
//!
//! Records are represented by the following pseudo-structure:
//!
//! ```text
//! record:
//!   record_len: uint64
//!   checksum:   uint32(CRC32C of record_id + payload)
//!   record_id:  uint64
//!   payload:    uint8[record_len]
//! ```
//!
//! We say "pseudo-structure" as a helper serialization library, [`rkyv`][rkyv], is used to handle
//! serialization, and zero-copy deserialization, of records. This effectively adds some amount of
//! padding to record fields, due to the need to structure record field data in a way that makes it
//! transparently accessible during zero-copy deserialization, where the raw buffer of a record that
//! was read can be accessed as if it were a native Rust type/value.
//!
//! While this padding/overhead is small, and fixed, we do not quantify it here as it can
//! potentially change based on the payload that a record contains. The only safe way to access the
//! records in a disk buffer is through the reader/writer interface in this module.
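//!
//! For illustration only, a record modeled with [`rkyv`][rkyv] might look roughly like the sketch
//! below. The field names and types are assumptions made for the example, and the `record_len`
//! prefix shown above is omitted here; the real definition lives in the `record` submodule:
//!
//! ```rust,ignore
//! use rkyv::{Archive, Serialize};
//!
//! #[derive(Archive, Serialize)]
//! struct Record {
//!     /// CRC32C checksum covering `id` and `payload`.
//!     checksum: u32,
//!     /// Monotonically increasing record ID.
//!     id: u64,
//!     /// Opaque bytes holding one or more encoded events.
//!     payload: Vec<u8>,
//! }
//! ```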
//!
//! ### Data files
//!
//! Data files contain the buffered records and nothing else. Records are written
//! sequentially/contiguously, and are not padded out to meet a minimum block/write size, except for
//! internal padding requirements of the serialization library used.
//!
//! Data files have a maximum size, configured statically within a given Vector binary, which can
//! never be exceeded: if a write would cause a data file to grow past the maximum file size, it
//! must be written to the next data file.
//!
//! A maximum number of 65,536 data files can exist at any given time, due to the inclusion of a
//! file ID in the data file name, which is represented by a 16-bit unsigned integer.
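//!
//! As a rough sketch of what this implies (the file naming scheme shown below is hypothetical; the
//! real format is defined alongside the ledger/configuration code), file IDs simply wrap around at
//! 2^16 rather than reusing the lowest free ID:
//!
//! ```rust,ignore
//! /// Hypothetical data file naming based on the 16-bit file ID.
//! fn data_file_name(file_id: u16) -> String {
//!     format!("buffer-data-{file_id}.dat")
//! }
//!
//! // Moving to the next data file just wraps the ID: 65_535 rolls over to 0.
//! let next_file_id = current_file_id.wrapping_add(1);
//! ```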
//!
//! ### Ledger
//!
//! The ledger is a small file which tracks two important items for both the reader and writer:
//! which data file they're currently reading or writing to, and what record ID they left off on.
//!
//! The ledger is read during buffer initialization to determine where the reader should pick up
//! reading from, but is also used to attempt to detect where the writer left off, and whether
//! records are missing from the current writer data file, by comparing what the writer believes it
//! did (i.e. wrote/flushed bytes to disk) against the actual data in the current writer data file.
//!
//! The ledger is a memory-mapped file that is updated atomically in terms of its fields, but is not
//! updated atomically in terms of reader/writer activity.
//!
//! #### On-disk format
//!
//! Like records, the ledger file consists of a simplified structure that is optimized for being
//! shared via a memory-mapped file interface between the reader and writer.
//!
//! ```text
//! buffer.db:
//!   writer_next_record_id:       uint64
//!   writer_current_data_file_id: uint16
//!   reader_current_data_file_id: uint16
//!   reader_last_record_id:       uint64
//! ```
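//!
//! A minimal sketch of how this state might be represented in memory, assuming each field is backed
//! by an atomic that the reader and writer update independently through the memory map (the real
//! type lives in the `ledger` submodule and its exact layout may differ):
//!
//! ```rust,ignore
//! use std::sync::atomic::{AtomicU16, AtomicU64};
//!
//! struct LedgerState {
//!     /// Next record ID the writer will assign.
//!     writer_next_record_id: AtomicU64,
//!     /// Data file the writer is currently writing to.
//!     writer_current_data_file_id: AtomicU16,
//!     /// Data file the reader is currently reading from.
//!     reader_current_data_file_id: AtomicU16,
//!     /// Last record ID the reader has fully processed.
//!     reader_last_record_id: AtomicU64,
//! }
//! ```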
//!
//! As the disk buffer structure is meant to emulate a ring buffer, most of the bookkeeping revolves
//! around the writer and reader being able to quickly figure out where they left off. Record and
//! data file IDs are simply rolled over when they reach the maximum of their data type, and are
//! incremented monotonically as new data files are created, rather than trying to always allocate
//! from the lowest available ID.
//!
//! ## Buffer operation
//!
//! ### Writing records
//!
//! As mentioned above, records are added to a data file sequentially, and contiguously, with no
//! gaps or data alignment adjustments, excluding the padding/alignment used by `rkyv` itself to
//! allow for zero-copy deserialization. This continues until adding another record would exceed the
//! configured data file size limit. When this occurs, the current data file is flushed and
//! synchronized to disk, and a new data file will be opened.
//!
//! If the number of data files on disk exceeds the maximum (65,536), or if the total data file size
//! limit is exceeded, the writer will wait until enough space has been freed such that the record
//! can be written. As data files are only deleted after being read entirely, this means that space
//! is recovered in increments of the target data file size, which is 128MB. Thus, the minimum size
//! for a buffer must be equal to or greater than the target size of a single data file.
//! Additionally, as data files are uniquely named based on an incrementing integer, which will wrap
//! around at 65,536 (2^16), the maximum total data file size for a given buffer is ~8TB
//! (65k files * 128MB).
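//!
//! In pseudocode, the rollover decision looks roughly like the following (method names are
//! hypothetical; the real logic lives in the `writer` submodule):
//!
//! ```rust,ignore
//! if current_data_file_size + archived_record_len > max_data_file_size {
//!     // Finish out the current data file: flush buffered writes and fsync it.
//!     writer.flush_and_sync_current_data_file().await?;
//!     // Move to the next data file ID (wrapping at 2^16), creating the file if needed, and
//!     // waiting if doing so would exceed the total buffer size or file count limits.
//!     writer.open_next_data_file().await?;
//! }
//! writer.write_record(archived_record).await?;
//! ```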
//!
//! ### Reading records
//!
//! Due to the on-disk layout, reading records is an incredibly straightforward process: we open a
//! file, read it until there's no more data and we know the writer is done writing to the file, and
//! then we open the next one, and repeat the process.
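//!
//! Sketched out, the read loop looks roughly like this (method names are hypothetical; the real
//! logic lives in the `reader` submodule):
//!
//! ```rust,ignore
//! loop {
//!     match reader.try_read_record().await? {
//!         // Got a record: decode it and hand the events to the caller.
//!         Some(record) => return Ok(Some(decode(record)?)),
//!         // No more data in this file, and the writer has moved on: roll to the next file.
//!         None if reader.writer_done_with_current_file() => reader.roll_to_next_data_file().await?,
//!         // Otherwise, wait for the writer to produce more data.
//!         None => reader.wait_for_writer().await,
//!     }
//! }
//! ```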
//!
//! ### Deleting acknowledged records
//!
//! As the reader emits records, we cannot yet consider them fully processed until they are
//! acknowledged. The acknowledgement process is tied into the normal acknowledgement machinery, and
//! the reader collects and processes those acknowledgements incrementally as reads occur.
//!
//! When all records from a data file have been fully acknowledged, the data file is scheduled for
//! deletion. We only delete entire data files, rather than truncating them piecemeal, which reduces
//! the I/O burden of the buffer. This does mean, however, that a data file will stick around until
//! it is entirely processed and acknowledged. We compensate for this fact in the buffer
//! configuration by adjusting the logical buffer size based on when records are acknowledged, so
//! that the writer can make progress as records are acknowledged, even if the buffer is close to,
//! or at, the maximum buffer size limit.
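//!
//! A rough sketch of the deletion check (names are hypothetical; the real logic lives in the
//! `reader` submodule and also updates the buffer accounting):
//!
//! ```rust,ignore
//! // A data file can only be deleted once every record it contains has been both read and
//! // acknowledged; until then, it stays on disk even if most of it has been processed.
//! if reader.current_data_file_fully_read() && reader.current_data_file_fully_acknowledged() {
//!     reader.delete_current_data_file().await?;
//!     reader.roll_to_next_data_file().await?;
//! }
//! ```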
//!
//! ### Record ID generation, and its relation to events
//!
//! While the buffer talks a lot about writing "records", records are ostensibly a single event, or
//! a collection of events. We manage the organization and grouping of events at a higher level
//! (i.e. `EventArray`), but we're still required to confront this fact at the buffer layer. In
//! order to maintain as little extra metadata as possible in records, and within the ledger, we
//! encode the number of events in a record into the record ID. We do this by using the value
//! returned by `EventCount::event_count` on a per-record basis.
//!
//! For example, a fresh buffer starts at a record ID of 1 for the writer: that is, the next write
//! will start at 1. If we write a record that contains 10 events, we add that event count to the
//! record ID we started from, which gives us 11. The next record write will start at 11, and the
//! pattern continues going forward.
//!
//! The other reason we do this is to allow us to quickly and easily determine how many events exist
//! in a buffer. Since we have the invariant of knowing that record IDs are tied, in a way, to event
//! count, we can quickly and easily find the first and last unread record in the buffer, and do
//! simple subtraction to calculate how many events we have outstanding. While there is logic that
//! handles corrupted records, or other corner case errors, the core premise, and logic, follows
//! this pattern.
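//!
//! As a concrete, illustrative example of the arithmetic, assuming a fresh buffer and ignoring the
//! corner cases mentioned above:
//!
//! ```rust,ignore
//! // A fresh buffer: the writer's next record ID starts at 1, and nothing has been read yet.
//! let mut writer_next_record_id: u64 = 1;
//! let reader_last_record_id: u64 = 0;
//!
//! // Writing a record containing 10 events consumes IDs 1 through 10, so the next write
//! // starts at 11.
//! let event_count: u64 = 10;
//! let record_id = writer_next_record_id; // this record gets ID 1
//! writer_next_record_id = writer_next_record_id.wrapping_add(event_count); // now 11
//!
//! // Outstanding events are then (roughly) simple subtraction between the two positions.
//! let outstanding_events = writer_next_record_id - reader_last_record_id - 1;
//! assert_eq!(outstanding_events, 10);
//! ```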
//!
//! We need to track our reader progress, both in the form of how much data we've read in this data
//! file, as well as the record ID. This is required not only for ensuring our general buffer
//! accounting (event count, buffer size, etc.) is accurate, but also to be able to handle corrupted
//! records.
//!
//! We make sure to track enough information such that when we encounter a corrupted record, or if
//! we skip records due to missing data, we can figure out how many events we've dropped or lost,
//! and handle the necessary adjustments to the buffer accounting.
//!
//! [rkyv]: https://docs.rs/rkyv

use core::fmt;
use std::{
    error::Error,
    marker::PhantomData,
    num::NonZeroU64,
    path::{Path, PathBuf},
    sync::Arc,
};

use async_trait::async_trait;
use snafu::{ResultExt, Snafu};
use vector_common::finalization::Finalizable;

mod backed_archive;
mod common;
mod io;
mod ledger;
mod reader;
mod record;
mod ser;
mod writer;

#[cfg(test)]
mod tests;

use self::ledger::Ledger;
pub use self::{
    common::{DiskBufferConfig, DiskBufferConfigBuilder},
    io::{Filesystem, ProductionFilesystem},
    ledger::LedgerLoadCreateError,
    reader::{BufferReader, ReaderError},
    writer::{BufferWriter, WriterError},
};
use crate::{
    buffer_usage_data::BufferUsageHandle,
    topology::{
        builder::IntoBuffer,
        channel::{ReceiverAdapter, SenderAdapter},
    },
    Bufferable,
};

/// Error that occurred when creating/loading a disk buffer.
#[derive(Debug, Snafu)]
pub enum BufferError<T>
where
    T: Bufferable,
{
    /// Failed to create/load the ledger.
    #[snafu(display("failed to load/create ledger: {}", source))]
    LedgerError { source: LedgerLoadCreateError },

    /// Failed to initialize/catch the reader up to where it left off.
    #[snafu(display("failed to seek to position where reader left off: {}", source))]
    ReaderSeekFailed { source: ReaderError<T> },

    /// Failed to initialize/catch the writer up to where it left off.
    #[snafu(display("failed to seek to position where writer left off: {}", source))]
    WriterSeekFailed { source: WriterError<T> },
}

/// Helper type for creating a disk buffer.
pub struct Buffer<T> {
    _t: PhantomData<T>,
}

impl<T> Buffer<T>
where
    T: Bufferable,
{
    #[cfg_attr(test, instrument(skip(config, usage_handle), level = "trace"))]
    pub(crate) async fn from_config_inner<FS>(
        config: DiskBufferConfig<FS>,
        usage_handle: BufferUsageHandle,
    ) -> Result<(BufferWriter<T, FS>, BufferReader<T, FS>, Arc<Ledger<FS>>), BufferError<T>>
    where
        FS: Filesystem + fmt::Debug + Clone + 'static,
        FS::File: Unpin,
    {
        let ledger = Ledger::load_or_create(config, usage_handle)
            .await
            .context(LedgerSnafu)?;
        let ledger = Arc::new(ledger);

        let mut writer = BufferWriter::new(Arc::clone(&ledger));
        writer
            .validate_last_write()
            .await
            .context(WriterSeekFailedSnafu)?;

        let finalizer = Arc::clone(&ledger).spawn_finalizer();

        let mut reader = BufferReader::new(Arc::clone(&ledger), finalizer);
        reader
            .seek_to_next_record()
            .await
            .context(ReaderSeekFailedSnafu)?;

        ledger.synchronize_buffer_usage();

        Ok((writer, reader, ledger))
    }

    /// Creates a new disk buffer from the given [`DiskBufferConfig`].
    ///
    /// If successful, a [`BufferWriter`] and [`BufferReader`] value, representing the write/read
    /// sides of the buffer, respectively, will be returned. Records are considered durably
    /// processed and able to be deleted from the buffer when they are dropped by the reader, via
    /// event finalization.
    ///
    /// # Errors
    ///
    /// If an error occurred during the creation or loading of the disk buffer, an error variant
    /// will be returned describing the error.
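    ///
    /// # Example
    ///
    /// A rough sketch of wiring up a buffer. The event type, path, usage handle, and the
    /// reader/writer method names shown here are illustrative placeholders, not exact API:
    ///
    /// ```rust,ignore
    /// let config = DiskBufferConfigBuilder::from_path("/var/lib/vector/buffer/v2/my_sink")
    ///     .max_buffer_size(16 * 1024 * 1024 * 1024)
    ///     .build()?;
    /// let (mut writer, mut reader) = Buffer::<MyEvent>::from_config(config, usage_handle).await?;
    ///
    /// writer.write_record(some_event).await?;
    /// writer.flush().await?;
    ///
    /// if let Some(event) = reader.next().await? {
    ///     // Process the event; on-disk deletion happens later, via acknowledgement/finalization.
    /// }
    /// ```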
    #[cfg_attr(test, instrument(skip(config, usage_handle), level = "trace"))]
    pub async fn from_config<FS>(
        config: DiskBufferConfig<FS>,
        usage_handle: BufferUsageHandle,
    ) -> Result<(BufferWriter<T, FS>, BufferReader<T, FS>), BufferError<T>>
    where
        FS: Filesystem + fmt::Debug + Clone + 'static,
        FS::File: Unpin,
    {
        let (writer, reader, _) = Self::from_config_inner(config, usage_handle).await?;

        Ok((writer, reader))
    }
}

pub struct DiskV2Buffer {
    id: String,
    data_dir: PathBuf,
    max_size: NonZeroU64,
}

impl DiskV2Buffer {
    pub fn new(id: String, data_dir: PathBuf, max_size: NonZeroU64) -> Self {
        Self {
            id,
            data_dir,
            max_size,
        }
    }
}

#[async_trait]
impl<T> IntoBuffer<T> for DiskV2Buffer
where
    T: Bufferable + Clone + Finalizable,
{
    fn provides_instrumentation(&self) -> bool {
        true
    }

    async fn into_buffer_parts(
        self: Box<Self>,
        usage_handle: BufferUsageHandle,
    ) -> Result<(SenderAdapter<T>, ReceiverAdapter<T>), Box<dyn Error + Send + Sync>> {
        let (writer, reader) = build_disk_v2_buffer(
            usage_handle,
            &self.data_dir,
            self.id.as_str(),
            self.max_size,
        )
        .await?;

        Ok((writer.into(), reader.into()))
    }
}

async fn build_disk_v2_buffer<T>(
    usage_handle: BufferUsageHandle,
    data_dir: &Path,
    id: &str,
    max_size: NonZeroU64,
) -> Result<
    (
        BufferWriter<T, ProductionFilesystem>,
        BufferReader<T, ProductionFilesystem>,
    ),
    Box<dyn Error + Send + Sync>,
>
where
    T: Bufferable + Clone,
{
    usage_handle.set_buffer_limits(Some(max_size.get()), None);

    let buffer_path = get_disk_v2_data_dir_path(data_dir, id);
    let config = DiskBufferConfigBuilder::from_path(buffer_path)
        .max_buffer_size(max_size.get())
        .build()?;
    Buffer::from_config(config, usage_handle)
        .await
        .map_err(Into::into)
}

pub(crate) fn get_disk_v2_data_dir_path(base_dir: &Path, buffer_id: &str) -> PathBuf {
    base_dir.join("buffer").join("v2").join(buffer_id)
}