//! # Disk buffer v2.
//!
//! This disk buffer implementation focuses on a simplistic on-disk format with minimal
//! reader/writer coordination, and no exotic I/O techniques, such that the buffer is easy to write
//! to and read from and can provide simplistic, but reliable, recovery mechanisms when errors or
//! corruption are encountered.
//!
//! ## Design constraints
//!
//! These constraints, or more often, invariants, are the groundwork for ensuring that the design
//! can stay simple and understandable:
//!
//! - data files do not exceed 128MB
//! - no more than 65,536 data files can exist at any given time
//! - buffer can grow to a maximum of ~8TB in total size (65k files * 128MB)
//! - all records are checksummed (CRC32C)
//! - all records are written sequentially/contiguously, and do not span over multiple data files
//! - writers create and write to data files, while readers read from and delete data files
//! - endianness of the files is based on the host system (we don't support loading the buffer files
//! on a system with different endianness)
//!
//! ## High-level design
//!
//! ### Records
//!
//! A record is a length-prefixed payload containing an arbitrary number of bytes, alongside a
//! monotonically increasing ID, and protected by a CRC32C checksum. Since a record simply stores
//! opaque bytes, one or more events can be stored per record.
//!
//! The writer assigns record IDs based on the number of events written to a record, such that a
//! record with ID N can be determined to contain M-N events, where M is the record ID of the next
//! record.
//!
//! #### On-disk format
//!
//! Records are represented by the following pseudo-structure:
//!
//! ```text
//! record:
//!   record_len: uint64
//!   checksum: uint32(CRC32C of record_id + payload)
//!   record_id: uint64
//!   payload: uint8[record_len]
//! ```
//!
//! We say "pseudo-structure" as a helper serialization library, [`rkyv`][rkyv], is used to handle
//! serialization, and zero-copy deserialization, of records. This effectively adds some amount of
//! padding to record fields, due to the need to structure record field data in a way that makes it
//! transparent to access during zero-copy deserialization, when the raw buffer of a record that
//! was read can be accessed as if it were a native Rust type/value.
//!
//! While this padding/overhead is small, and fixed, we do not quantify it here as it can
//! potentially change based on the payload that a record contains. The only safe way to access the
//! records in a disk buffer is through the reader/writer interface in this module.
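//!
//! As an illustration only, the CRC32C checksum in the pseudo-structure above covers the record ID
//! and the payload; one plausible way to compute it, assuming the `crc32c` crate is used for
//! hashing (the buffer's actual hashing logic lives in the `record` module):
//!
//! ```rust,ignore
//! // Illustrative sketch: checksum the record ID (host endianness) followed by the payload.
//! fn record_checksum(record_id: u64, payload: &[u8]) -> u32 {
//!     let crc = crc32c::crc32c(&record_id.to_ne_bytes());
//!     crc32c::crc32c_append(crc, payload)
//! }
//! ```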
//!
//! ### Data files
//!
//! Data files contain the buffered records and nothing else. Records are written
//! sequentially/contiguously, and are not padded out to meet a minimum block/write size, except for
//! internal padding requirements of the serialization library used.
//!
//! Data files have a maximum size, configured statically within a given Vector binary, which can
//! never be exceeded: if a write would cause a data file to grow past the maximum file size, it
//! must be written to the next data file.
//!
//! A maximum number of 65,536 data files can exist at any given time, due to the inclusion of a
//! file ID in the data file name, which is represented by a 16-bit unsigned integer.
//!
//! ### Ledger
//!
//! The ledger is a small file which tracks two important items for both the reader and writer:
//! which data file they're currently reading or writing to, and what record ID they left off on.
//!
//! The ledger is read during buffer initialization to determine where a reader should pick up
//! reading from, but is also used to attempt to detect where a writer left off, and whether
//! records are missing from the current writer data file, by comparing what the writer believes it
//! did (i.e. wrote/flushed bytes to disk) against the actual data present in the current writer
//! data file.
//!
//! The ledger is a memory-mapped file that is updated atomically in terms of its fields, but is not
//! updated atomically in terms of reader/writer activity.
//!
//! #### On-disk format
//!
//! Like records, the ledger file consists of a simplified structure that is optimized for being shared
//! via a memory-mapped file interface between the reader and writer.
//!
//! ```text
//! buffer.db:
//!   writer_next_record_id: uint64
//!   writer_current_data_file_id: uint16
//!   reader_current_data_file_id: uint16
//!   reader_last_record_id: uint64
//! ```
//!
//! As the disk buffer structure is meant to emulate a ring buffer, most of the bookkeeping
//! revolves around the writer and reader being able to quickly figure out where they left off.
//! data file IDs are simply rolled over when they reach the maximum of their data type, and are
//! incremented monotonically as new data files are created, rather than trying to always allocate
//! from the lowest available ID.
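//!
//! As a rough illustration of this bookkeeping (the real ledger state is defined in the `ledger`
//! module and is accessed through a memory-mapped rkyv archive), the fields above can be thought
//! of as a set of shared atomics, with data file IDs simply wrapping around at the edge of `u16`:
//!
//! ```rust,ignore
//! use std::sync::atomic::{AtomicU16, AtomicU64, Ordering};
//!
//! // Illustrative mirror of the on-disk ledger fields; names follow the pseudo-structure above.
//! struct LedgerState {
//!     writer_next_record_id: AtomicU64,
//!     writer_current_data_file_id: AtomicU16,
//!     reader_current_data_file_id: AtomicU16,
//!     reader_last_record_id: AtomicU64,
//! }
//!
//! impl LedgerState {
//!     // Data file IDs are 16-bit, so they roll over once they pass 65,535.
//!     fn increment_writer_file_id(&self) {
//!         let current = self.writer_current_data_file_id.load(Ordering::Acquire);
//!         self.writer_current_data_file_id
//!             .store(current.wrapping_add(1), Ordering::Release);
//!     }
//! }
//! ```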
//!
//! ## Buffer operation
//!
//! ### Writing records
//!
//! As mentioned above, records are added to a data file sequentially, and contiguously, with no
//! gaps or data alignment adjustments, excluding the padding/alignment used by `rkyv` itself to
//! allow for zero-copy deserialization. This continues until adding another would exceed the
//! configured data file size limit. When this occurs, the current data file is flushed and
//! synchronized to disk, and a new data file will be opened.
//!
//! If the number of data files on disk exceeds the maximum (65,536), or if the total buffer size
//! limit would be exceeded, the writer will wait until enough space has been freed such that the
//! record can be written. As data files are only deleted after being read entirely, this means that
//! space is recovered in increments of the target data file size, which is 128MB. Thus, the minimum
//! size for a buffer must be equal to or greater than the target size of a single data file.
//! Additionally, as data files are uniquely named based on an incrementing integer, which wraps
//! around at 65,536 (2^16), the maximum total size of the data files for a given buffer is ~8TB
//! (65k files * 128MB).
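//!
//! A simplified sketch of the size check involved (constants and names here are illustrative; the
//! real limits come from [`DiskBufferConfig`]):
//!
//! ```rust,ignore
//! const MAX_DATA_FILE_SIZE: u64 = 128 * 1024 * 1024; // 128MB per data file
//! const MAX_DATA_FILES: u64 = 1 << 16; // 65,536 files, so ~8TB of data files at most
//!
//! // If appending the serialized record would exceed the data file size limit, the current data
//! // file must be flushed/synchronized and the record written to the next data file instead.
//! fn fits_in_current_data_file(current_file_len: u64, serialized_record_len: u64) -> bool {
//!     current_file_len + serialized_record_len <= MAX_DATA_FILE_SIZE
//! }
//! ```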
//!
//! ### Reading records
//!
//! Due to the on-disk layout, reading records is an incredibly straightforward process: we open a
//! file, read it until there's no more data and we know the writer is done writing to the file, and
//! then we open the next one, and repeat the process.
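//!
//! A rough control-flow sketch of that loop (the types and helper names here are hypothetical
//! stand-ins for logic in the `reader` module):
//!
//! ```rust,ignore
//! async fn read_next_record(reader: &mut Reader) -> Vec<u8> {
//!     loop {
//!         // Try to read the next record from the current data file.
//!         if let Some(record) = reader.try_read_record().await {
//!             return record;
//!         }
//!
//!         if reader.writer_has_moved_past_current_file() {
//!             // No more data will land in this file, so move on to the next data file. (The old
//!             // file is only deleted later, once all of its records are acknowledged.)
//!             reader.advance_to_next_data_file().await;
//!         } else {
//!             // The writer is still appending to this file, so wait for more data.
//!             reader.wait_for_writer().await;
//!         }
//!     }
//! }
//! ```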
//!
//! ### Deleting acknowledged records
//!
//! As the reader emits records, we cannot consider them fully processed until they are
//! acknowledged. The acknowledgement process is tied into the normal acknowledgement machinery, and
//! the reader collects and processes those acknowledgements incrementally as reads occur.
//!
//! When all records from a data file have been fully acknowledged, the data file is scheduled for
//! deletion. We only delete entire data files, rather than truncating them piecemeal, which reduces
//! the I/O burden of the buffer. This does mean, however, that a data file will stick around until
//! it is entirely processed and acknowledged. We compensate for this fact in the buffer
//! configuration by adjusting the logical buffer size based on when records are acknowledged, so
//! that the writer can make progress as records are acknowledged, even if the buffer is close to,
//! or at the maximum buffer size limit.
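//!
//! In other words, acknowledgements feed back into the buffer accounting roughly like this
//! (illustrative only; the real accounting is handled by the reader and ledger):
//!
//! ```rust,ignore
//! use std::sync::atomic::{AtomicU64, Ordering};
//!
//! // When records are acknowledged, their bytes are subtracted from the logical buffer size so
//! // that the writer can make progress, even though the data file itself is only deleted once
//! // every record it contains has been acknowledged.
//! fn on_records_acknowledged(logical_buffer_size: &AtomicU64, acked_bytes: u64) {
//!     logical_buffer_size.fetch_sub(acked_bytes, Ordering::AcqRel);
//! }
//! ```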
//!
//! ### Record ID generation, and its relation to events
//!
//! While the buffer talks a lot about writing "records", a record is ostensibly a single event, or
//! a collection of events. We manage the organization and grouping of events at a higher level
//! (i.e. `EventArray`), but we're still required to confront this fact at the buffer layer. In
//! order to maintain as little extra metadata as possible in records, and within the ledger, we
//! encode the number of events in a record into the record ID. We do this by using the value
//! returned by `EventCount::event_count` on a per-record basis.
//!
//! For example, a fresh buffer starts at a record ID of 1 for the writer: that is, the next write
//! will start at 1. If we write a record that contains 10 events, we add that event count to the
//! record ID we started from, which gives us 11. The next record write will start at 11, and the
//! pattern continues going forward.
//!
//! The other reason we do this is to allow us to quickly and easily determine how many events exist
//! in a buffer. Since we have the invariant of knowing that record IDs are tied, in a way, to event
//! count, we can quickly and easily find the first and last unread record in the buffer, and do
//! simple subtraction to calculate how many events we have outstanding. While there is logic that
//! handles corrupted records, or other corner case errors, the core premise, and logic, follows
//! this pattern.
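//!
//! The arithmetic itself is trivial (sketched below with illustrative names); the only wrinkle is
//! that record IDs use wrapping arithmetic, since they roll over at the edge of `u64`:
//!
//! ```rust,ignore
//! // The writer advances its next record ID by the number of events in the record it just wrote,
//! // e.g. writing a 10-event record starting at ID 1 leaves the next record ID at 11.
//! fn next_record_id(current_id: u64, event_count: u64) -> u64 {
//!     current_id.wrapping_add(event_count)
//! }
//!
//! // Outstanding events are the distance between where the writer will write next and where the
//! // reader will read next, e.g. writer at 11 and reader at 1 means 10 events are buffered.
//! fn outstanding_events(writer_next_record_id: u64, reader_next_record_id: u64) -> u64 {
//!     writer_next_record_id.wrapping_sub(reader_next_record_id)
//! }
//! ```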
//!
//! We need to track our reader progress, both in the form of how much data we've read in this data
//! file, as well as the record ID. This is required not only for ensuring our general buffer
//! accounting (event count, buffer size, etc) is accurate, but also to be able to handle corrupted
//! records.
//!
//! We make sure to track enough information such that when we encounter a corrupted record, or if
//! we skip records due to missing data, we can figure out how many events we've dropped or lost,
//! and handle the necessary adjustments to the buffer accounting.
//!
//! [rkyv]: https://docs.rs/rkyv

use core::fmt;
use std::{
    error::Error,
    marker::PhantomData,
    num::NonZeroU64,
    path::{Path, PathBuf},
    sync::Arc,
};

use async_trait::async_trait;
use snafu::{ResultExt, Snafu};
use vector_common::finalization::Finalizable;

mod backed_archive;
mod common;
mod io;
mod ledger;
mod reader;
mod record;
mod ser;
mod writer;

#[cfg(test)]
mod tests;

use self::ledger::Ledger;
pub use self::{
    common::{DiskBufferConfig, DiskBufferConfigBuilder},
    io::{Filesystem, ProductionFilesystem},
    ledger::LedgerLoadCreateError,
    reader::{BufferReader, ReaderError},
    writer::{BufferWriter, WriterError},
};

use crate::{
    buffer_usage_data::BufferUsageHandle,
    topology::{
        builder::IntoBuffer,
        channel::{ReceiverAdapter, SenderAdapter},
    },
    Bufferable,
};

/// Error that occurred when creating/loading a disk buffer.
#[derive(Debug, Snafu)]
pub enum BufferError<T>
where
    T: Bufferable,
{
    /// Failed to create/load the ledger.
    #[snafu(display("failed to load/create ledger: {}", source))]
    LedgerError { source: LedgerLoadCreateError },

    /// Failed to initialize/catch the reader up to where it left off.
    #[snafu(display("failed to seek to position where reader left off: {}", source))]
    ReaderSeekFailed { source: ReaderError<T> },

    /// Failed to initialize/catch the writer up to where it left off.
    #[snafu(display("failed to seek to position where writer left off: {}", source))]
    WriterSeekFailed { source: WriterError<T> },
}

/// Helper type for creating a disk buffer.
pub struct Buffer<T> {
    _t: PhantomData<T>,
}

impl<T> Buffer<T>
where
    T: Bufferable,
{
    #[cfg_attr(test, instrument(skip(config, usage_handle), level = "trace"))]
    pub(crate) async fn from_config_inner<FS>(
        config: DiskBufferConfig<FS>,
        usage_handle: BufferUsageHandle,
    ) -> Result<(BufferWriter<T, FS>, BufferReader<T, FS>, Arc<Ledger<FS>>), BufferError<T>>
    where
        FS: Filesystem + fmt::Debug + Clone + 'static,
        FS::File: Unpin,
    {
        let ledger = Ledger::load_or_create(config, usage_handle)
            .await
            .context(LedgerSnafu)?;
        let ledger = Arc::new(ledger);

        let mut writer = BufferWriter::new(Arc::clone(&ledger));
        writer
            .validate_last_write()
            .await
            .context(WriterSeekFailedSnafu)?;

        let finalizer = Arc::clone(&ledger).spawn_finalizer();

        let mut reader = BufferReader::new(Arc::clone(&ledger), finalizer);
        reader
            .seek_to_next_record()
            .await
            .context(ReaderSeekFailedSnafu)?;

        ledger.synchronize_buffer_usage();

        Ok((writer, reader, ledger))
    }

    /// Creates a new disk buffer from the given [`DiskBufferConfig`].
    ///
    /// If successful, a [`BufferWriter`] and [`BufferReader`] value, representing the write/read
    /// sides of the buffer, respectively, will be returned. Records are considered durably
    /// processed and able to be deleted from the buffer when they are dropped by the reader, via
    /// event finalization.
    ///
    /// # Errors
    ///
    /// If an error occurred during the creation or loading of the disk buffer, an error variant
    /// will be returned describing the error.
    #[cfg_attr(test, instrument(skip(config, usage_handle), level = "trace"))]
    pub async fn from_config<FS>(
        config: DiskBufferConfig<FS>,
        usage_handle: BufferUsageHandle,
    ) -> Result<(BufferWriter<T, FS>, BufferReader<T, FS>), BufferError<T>>
    where
        FS: Filesystem + fmt::Debug + Clone + 'static,
        FS::File: Unpin,
    {
        let (writer, reader, _) = Self::from_config_inner(config, usage_handle).await?;
        Ok((writer, reader))
    }
}
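
/// A disk buffer (v2) that can be plugged into a buffer topology via its [`IntoBuffer`]
/// implementation.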
pub struct DiskV2Buffer {
    id: String,
    data_dir: PathBuf,
    max_size: NonZeroU64,
}

impl DiskV2Buffer {
    pub fn new(id: String, data_dir: PathBuf, max_size: NonZeroU64) -> Self {
        Self {
            id,
            data_dir,
            max_size,
        }
    }
}

#[async_trait]
impl<T> IntoBuffer<T> for DiskV2Buffer
where
    T: Bufferable + Clone + Finalizable,
{
    fn provides_instrumentation(&self) -> bool {
        true
    }

    async fn into_buffer_parts(
        self: Box<Self>,
        usage_handle: BufferUsageHandle,
    ) -> Result<(SenderAdapter<T>, ReceiverAdapter<T>), Box<dyn Error + Send + Sync>> {
        let (writer, reader) = build_disk_v2_buffer(
            usage_handle,
            &self.data_dir,
            self.id.as_str(),
            self.max_size,
        )
        .await?;

        Ok((writer.into(), reader.into()))
    }
}
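
/// Builds the writer/reader halves of a disk v2 buffer, rooted at the standard per-buffer data
/// directory beneath `data_dir`.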
async fn build_disk_v2_buffer<T>(
    usage_handle: BufferUsageHandle,
    data_dir: &Path,
    id: &str,
    max_size: NonZeroU64,
) -> Result<
    (
        BufferWriter<T, ProductionFilesystem>,
        BufferReader<T, ProductionFilesystem>,
    ),
    Box<dyn Error + Send + Sync>,
>
where
    T: Bufferable + Clone,
{
    usage_handle.set_buffer_limits(Some(max_size.get()), None);

    let buffer_path = get_disk_v2_data_dir_path(data_dir, id);
    let config = DiskBufferConfigBuilder::from_path(buffer_path)
        .max_buffer_size(max_size.get())
        .build()?;

    Buffer::from_config(config, usage_handle)
        .await
        .map_err(Into::into)
}
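
/// Gets the path to the data directory for a disk v2 buffer: `<base_dir>/buffer/v2/<buffer_id>`.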
pub(crate) fn get_disk_v2_data_dir_path(base_dir: &Path, buffer_id: &str) -> PathBuf {
    base_dir.join("buffer").join("v2").join(buffer_id)
}