vector_buffers/variants/disk_v2/mod.rs
//! # Disk buffer v2.
//!
//! This disk buffer implementation focuses on a simplistic on-disk format with minimal
//! reader/writer coordination, and no exotic I/O techniques, such that the buffer is easy to write
//! to and read from and can provide simplistic, but reliable, recovery mechanisms when errors or
//! corruption are encountered.
//!
//! ## Design constraints
//!
//! These constraints, or more often, invariants, are the groundwork for ensuring that the design
//! can stay simple and understandable:
//!
//! - data files do not exceed 128MB
//! - no more than 65,536 data files can exist at any given time
//! - buffer can grow to a maximum of ~8TB in total size (65k files * 128MB)
//! - all records are checksummed (CRC32C)
//! - all records are written sequentially/contiguously, and do not span over multiple data files
//! - writers create and write to data files, while readers read from and delete data files
//! - endianness of the files is based on the host system (we don't support loading the buffer files
//!   on a system with different endianness)
//!
//! ## High-level design
//!
//! ### Records
//!
//! A record is a length-prefixed payload, where an arbitrary number of bytes are contained,
//! alongside a monotonically increasing ID, and protected by a CRC32C checksum. Since a record
//! simply stores opaque bytes, one or more events can be stored per record.
//!
//! The writer assigns record IDs based on the number of events written to a record, such that a
//! record with ID N can be determined to contain M-N events, where M is the record ID of the next
//! record.
//!
//! #### On-disk format
//!
//! Records are represented by the following pseudo-structure:
//!
//! ```text
//! record:
//!   record_len: uint64
//!   checksum: uint32(CRC32C of record_id + payload)
//!   record_id: uint64
//!   payload: uint8[record_len]
//! ```
//!
//! We say "pseudo-structure" as a helper serialization library, [`rkyv`][rkyv], is used to handle
//! serialization, and zero-copy deserialization, of records. This effectively adds some amount of
//! padding to record fields, due to the need to structure record field data in a way that makes it
//! transparent to access during zero-copy deserialization, when the raw buffer of a record that was
//! read can be accessed as if it were a native Rust type/value.
//!
//! While this padding/overhead is small, and fixed, we do not quantify it here as it can
//! potentially change based on the payload that a record contains. The only safe way to access the
//! records in a disk buffer is through the reader/writer interface in this module.
//!
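//! As a rough illustration only (the actual type lives in `record.rs` and may differ in field
//! names and layout, and the leading `record_len` is written as a framing prefix rather than a
//! struct field), the record described above maps onto an `rkyv`-serializable struct along these
//! lines:
//!
//! ```text
//! #[derive(Archive, Serialize)]
//! struct Record {
//!     // CRC32C over the record ID and payload.
//!     checksum: u32,
//!     // Monotonically increasing record ID.
//!     id: u64,
//!     // Opaque bytes holding one or more serialized events.
//!     payload: Vec<u8>,
//! }
//! ```
//!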
//! ### Data files
//!
//! Data files contain the buffered records and nothing else. Records are written
//! sequentially/contiguously, and are not padded out to meet a minimum block/write size, except for
//! internal padding requirements of the serialization library used.
//!
//! Data files have a maximum size, configured statically within a given Vector binary, which can
//! never be exceeded: if a write would cause a data file to grow past the maximum file size, it
//! must be written to the next data file.
//!
//! A maximum number of 65,536 data files can exist at any given time, due to the inclusion of a
//! file ID in the data file name, which is represented by a 16-bit unsigned integer.
//!
//! ### Ledger
//!
//! The ledger is a small file which tracks two important items for both the reader and writer:
//! which data file they're currently reading or writing to, and what record ID they left off on.
//!
//! The ledger is read during buffer initialization to determine where the reader should pick up
//! reading from, but it is also used to detect where the writer left off, and whether records are
//! missing from the current writer data file, by comparing what the writer believes it did (as in
//! write/flush bytes to disk) against the actual data in the current writer data file.
//!
//! The ledger is a memory-mapped file that is updated atomically in terms of its fields, but is not
//! updated atomically in terms of reader/writer activity.
//!
//! #### On-disk format
//!
//! Like records, the ledger file consists of a simplified structure that is optimized for being
//! shared via a memory-mapped file interface between the reader and writer.
//!
//! ```text
//! buffer.db:
//!   writer_next_record_id: uint64
//!   writer_current_data_file_id: uint16
//!   reader_current_data_file_id: uint16
//!   reader_last_record_id: uint64
//! ```
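//!
//! Since the ledger is memory-mapped and shared between the reader and writer, each of its fields
//! is updated atomically in place. A minimal sketch of that idea, using illustrative names (the
//! real state type lives in `ledger.rs` and may differ):
//!
//! ```text
//! #[repr(C)]
//! struct LedgerState {
//!     writer_next_record_id: AtomicU64,
//!     writer_current_data_file_id: AtomicU16,
//!     reader_current_data_file_id: AtomicU16,
//!     reader_last_record_id: AtomicU64,
//! }
//! ```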
//!
//! As the disk buffer structure is meant to emulate a ring buffer, most of the bookkeeping revolves
//! around the writer and reader being able to quickly figure out where they left off. Record and
//! data file IDs are simply rolled over when they reach the maximum of their data type, and are
//! incremented monotonically as new data files are created, rather than trying to always allocate
//! from the lowest available ID.
//!
//! ## Buffer operation
//!
//! ### Writing records
//!
//! As mentioned above, records are added to a data file sequentially, and contiguously, with no
//! gaps or data alignment adjustments, excluding the padding/alignment used by `rkyv` itself to
//! allow for zero-copy deserialization. This continues until adding another record would exceed the
//! configured data file size limit. When this occurs, the current data file is flushed and
//! synchronized to disk, and a new data file will be opened.
//!
//! If the number of data files on disk exceeds the maximum (65,536), or if the total data file size
//! limit is exceeded, the writer will wait until enough space has been freed such that the record
//! can be written. As data files are only deleted after being read entirely, this means that space
//! is recovered in increments of the target data file size, which is 128MB. Thus, the minimum size
//! for a buffer must be equal to or greater than the target size of a single data file.
//! Additionally, as data files are uniquely named based on an incrementing integer, which wraps
//! around at 65,536 (2^16), the maximum total data file size for a given buffer is ~8TB (65k files
//! * 128MB).
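//!
//! A simplified sketch of the writer's rollover decision (illustrative only; the real logic lives
//! in `writer.rs` and also handles the total buffer size limit and waiting on the reader):
//!
//! ```text
//! if current_file_len + archived_record_len > max_data_file_size {
//!     // Flush and fsync the current data file before moving on.
//!     flush_and_sync(current_file);
//!     // File IDs are 16-bit, so they wrap around at 65,536.
//!     current_file_id = current_file_id.wrapping_add(1);
//!     current_file = create_data_file(current_file_id);
//! }
//! ```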
//!
//! ### Reading records
//!
//! Due to the on-disk layout, reading records is an incredibly straightforward process: we open a
//! file, read it until there's no more data and we know the writer is done writing to the file, and
//! then we open the next one, and repeat the process.
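//!
//! In pseudocode, the reader's main loop looks roughly like this (error handling, corruption
//! recovery, and acknowledgement handling omitted; names are illustrative):
//!
//! ```text
//! loop {
//!     match read_next_record(current_file) {
//!         Some(record) => emit(record),
//!         // No more data, and the writer has moved on: advance to the next data file.
//!         None if writer_done_with(current_file_id) => {
//!             current_file_id = current_file_id.wrapping_add(1);
//!             current_file = open_data_file(current_file_id);
//!         }
//!         // No more data, but the writer may still append: wait for it to make progress.
//!         None => wait_for_writer_progress(),
//!     }
//! }
//! ```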
//!
//! ### Deleting acknowledged records
//!
//! As the reader emits records, we cannot yet consider them fully processed until they are
//! acknowledged. The acknowledgement process is tied into the normal acknowledgement machinery, and
//! the reader collects and processes those acknowledgements incrementally as reads occur.
//!
//! When all records from a data file have been fully acknowledged, the data file is scheduled for
//! deletion. We only delete entire data files, rather than truncating them piecemeal, which reduces
//! the I/O burden of the buffer. This does mean, however, that a data file will stick around until
//! it is entirely processed and acknowledged. We compensate for this fact in the buffer
//! configuration by adjusting the logical buffer size based on when records are acknowledged, so
//! that the writer can make progress as records are acknowledged, even if the buffer is close to,
//! or at, the maximum buffer size limit.
//!
//! ### Record ID generation, and its relation to events
//!
//! While the buffer talks a lot about writing "records", records are ostensibly a single event, or
//! a collection of events. We manage the organization and grouping of events at a higher level
//! (i.e. `EventArray`), but we're still required to confront this fact at the buffer layer. In
//! order to maintain as little extra metadata as possible in records, and within the ledger, we
//! encode the number of events in a record into the record ID. We do this by using the value
//! returned by `EventCount::event_count` on a per-record basis.
//!
//! For example, a fresh buffer starts at a record ID of 1 for the writer: that is, the next write
//! will start at 1. If we write a record that contains 10 events, we add that event count to the
//! record ID we started from, which gives us 11. The next record write will start at 11, and the
//! pattern continues going forward.
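//!
//! In other words, the writer's ID assignment boils down to something like the following
//! (illustrative; the real code also accounts for wrap-around and error cases):
//!
//! ```text
//! let record_id = writer_next_record_id;
//! // Advance by the number of events contained in this record.
//! writer_next_record_id = writer_next_record_id.wrapping_add(event_count);
//! ```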
//!
//! The other reason we do this is to allow us to quickly and easily determine how many events exist
//! in a buffer. Since we have the invariant that record IDs are tied to event count, we can quickly
//! and easily find the first and last unread record in the buffer, and do simple subtraction to
//! calculate how many events we have outstanding. While there is logic that handles corrupted
//! records, or other corner case errors, the core premise, and logic, follows this pattern.
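//!
//! As a worked example with illustrative numbers: if the reader's next unread record has ID 11 and
//! the writer's next record ID is 43, then 43 - 11 = 32 events are outstanding in the buffer,
//! ignoring wrap-around and any adjustments for corrupted or skipped records.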
//!
//! We need to track our reader progress, both in terms of how much data we've read in the current
//! data file and the ID of the last record we read. This is required not only for ensuring our
//! general buffer accounting (event count, buffer size, etc.) is accurate, but also to be able to
//! handle corrupted records.
//!
//! We make sure to track enough information such that when we encounter a corrupted record, or if
//! we skip records due to missing data, we can figure out how many events we've dropped or lost,
//! and handle the necessary adjustments to the buffer accounting.
//!
//! [rkyv]: https://docs.rs/rkyv

use core::fmt;
use std::{
    error::Error,
    marker::PhantomData,
    num::NonZeroU64,
    path::{Path, PathBuf},
    sync::Arc,
};

use async_trait::async_trait;
use snafu::{ResultExt, Snafu};
use vector_common::finalization::Finalizable;

mod backed_archive;
mod common;
mod io;
mod ledger;
mod reader;
mod record;
mod ser;
mod writer;

#[cfg(test)]
mod tests;

use self::ledger::Ledger;
pub use self::{
    common::{DiskBufferConfig, DiskBufferConfigBuilder},
    io::{Filesystem, ProductionFilesystem},
    ledger::LedgerLoadCreateError,
    reader::{BufferReader, ReaderError},
    writer::{BufferWriter, WriterError},
};
use crate::{
    buffer_usage_data::BufferUsageHandle,
    topology::{
        builder::IntoBuffer,
        channel::{ReceiverAdapter, SenderAdapter},
    },
    Bufferable,
};

/// Error that occurred when creating/loading a disk buffer.
#[derive(Debug, Snafu)]
pub enum BufferError<T>
where
    T: Bufferable,
{
    /// Failed to create/load the ledger.
    #[snafu(display("failed to load/create ledger: {}", source))]
    LedgerError { source: LedgerLoadCreateError },

    /// Failed to initialize/catch the reader up to where it left off.
    #[snafu(display("failed to seek to position where reader left off: {}", source))]
    ReaderSeekFailed { source: ReaderError<T> },

    /// Failed to initialize/catch the writer up to where it left off.
    #[snafu(display("failed to seek to position where writer left off: {}", source))]
    WriterSeekFailed { source: WriterError<T> },
}

/// Helper type for creating a disk buffer.
pub struct Buffer<T> {
    _t: PhantomData<T>,
}

impl<T> Buffer<T>
where
    T: Bufferable,
{
    #[cfg_attr(test, instrument(skip(config, usage_handle), level = "trace"))]
    pub(crate) async fn from_config_inner<FS>(
        config: DiskBufferConfig<FS>,
        usage_handle: BufferUsageHandle,
    ) -> Result<(BufferWriter<T, FS>, BufferReader<T, FS>, Arc<Ledger<FS>>), BufferError<T>>
    where
        FS: Filesystem + fmt::Debug + Clone + 'static,
        FS::File: Unpin,
    {
        let ledger = Ledger::load_or_create(config, usage_handle)
            .await
            .context(LedgerSnafu)?;
        let ledger = Arc::new(ledger);

        let mut writer = BufferWriter::new(Arc::clone(&ledger));
        writer
            .validate_last_write()
            .await
            .context(WriterSeekFailedSnafu)?;

        let finalizer = Arc::clone(&ledger).spawn_finalizer();

        let mut reader = BufferReader::new(Arc::clone(&ledger), finalizer);
        reader
            .seek_to_next_record()
            .await
            .context(ReaderSeekFailedSnafu)?;

        ledger.synchronize_buffer_usage();

        Ok((writer, reader, ledger))
    }

    /// Creates a new disk buffer from the given [`DiskBufferConfig`].
    ///
    /// If successful, a [`BufferWriter`] and [`BufferReader`] value, representing the write/read
    /// sides of the buffer, respectively, will be returned. Records are considered durably
    /// processed and able to be deleted from the buffer when they are dropped by the reader, via
    /// event finalization.
    ///
    /// # Errors
    ///
    /// If an error occurred during the creation or loading of the disk buffer, an error variant
    /// will be returned describing the error.
    #[cfg_attr(test, instrument(skip(config, usage_handle), level = "trace"))]
    pub async fn from_config<FS>(
        config: DiskBufferConfig<FS>,
        usage_handle: BufferUsageHandle,
    ) -> Result<(BufferWriter<T, FS>, BufferReader<T, FS>), BufferError<T>>
    where
        FS: Filesystem + fmt::Debug + Clone + 'static,
        FS::File: Unpin,
    {
        let (writer, reader, _) = Self::from_config_inner(config, usage_handle).await?;

        Ok((writer, reader))
    }
}

pub struct DiskV2Buffer {
    id: String,
    data_dir: PathBuf,
    max_size: NonZeroU64,
}

impl DiskV2Buffer {
    pub fn new(id: String, data_dir: PathBuf, max_size: NonZeroU64) -> Self {
        Self {
            id,
            data_dir,
            max_size,
        }
    }
}

#[async_trait]
impl<T> IntoBuffer<T> for DiskV2Buffer
where
    T: Bufferable + Clone + Finalizable,
{
    fn provides_instrumentation(&self) -> bool {
        true
    }

    async fn into_buffer_parts(
        self: Box<Self>,
        usage_handle: BufferUsageHandle,
    ) -> Result<(SenderAdapter<T>, ReceiverAdapter<T>), Box<dyn Error + Send + Sync>> {
        let (writer, reader) = build_disk_v2_buffer(
            usage_handle,
            &self.data_dir,
            self.id.as_str(),
            self.max_size,
        )
        .await?;

        Ok((writer.into(), reader.into()))
    }
}

async fn build_disk_v2_buffer<T>(
    usage_handle: BufferUsageHandle,
    data_dir: &Path,
    id: &str,
    max_size: NonZeroU64,
) -> Result<
    (
        BufferWriter<T, ProductionFilesystem>,
        BufferReader<T, ProductionFilesystem>,
    ),
    Box<dyn Error + Send + Sync>,
>
where
    T: Bufferable + Clone,
{
    usage_handle.set_buffer_limits(Some(max_size.get()), None);

    let buffer_path = get_disk_v2_data_dir_path(data_dir, id);
    let config = DiskBufferConfigBuilder::from_path(buffer_path)
        .max_buffer_size(max_size.get())
        .build()?;
    Buffer::from_config(config, usage_handle)
        .await
        .map_err(Into::into)
}

pub(crate) fn get_disk_v2_data_dir_path(base_dir: &Path, buffer_id: &str) -> PathBuf {
    base_dir.join("buffer").join("v2").join(buffer_id)
}
369}