1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
use std::{mem, ptr::addr_of};
use bytecheck::{CheckBytes, ErrorBox, StructCheckError};
use crc32fast::Hasher;
use rkyv::{
boxed::ArchivedBox,
with::{CopyOptimize, RefAsBox},
Archive, Archived, Serialize,
};
use super::{
common::align16,
ser::{try_as_archive, DeserializeError},
};
pub const RECORD_HEADER_LEN: usize = align16(mem::size_of::<ArchivedRecord<'_>>() + 8);
/// Result of checking if a buffer contained a valid record.
pub enum RecordStatus {
/// The record was able to be read from the buffer, and the checksum is valid.
///
/// Contains the ID for the given record, as well as the metadata.
Valid { id: u64 },
/// The record was able to be read from the buffer, but the checksum was not valid.
Corrupted { calculated: u32, actual: u32 },
/// The record was not able to be read from the buffer due to an error during deserialization.
FailedDeserialization(DeserializeError),
}
/// Record container.
///
/// [`Record`] encapsulates the encoded form of a record written into the buffer. It is a simple wrapper that
/// carries only the necessary metadata: the record checksum, and a record ID used internally for
/// properly tracking the state of the reader and writer.
///
/// # Warning
///
/// - Do not add fields to this struct.
/// - Do not remove fields from this struct.
/// - Do not change the type of fields in this struct.
/// - Do not change the order of fields this struct.
///
/// Doing so will change the serialized representation. This will break things.
///
/// Do not do any of the listed things unless you _absolutely_ know what you're doing. :)
#[derive(Archive, Serialize, Debug)]
// Switch back to the derived implementation of CheckBytes once the upstream ICE issue is fixed.
//
// Upstream issue: https://github.com/rkyv/rkyv/issues/221
//#[archive_attr(derive(CheckBytes))]
pub struct Record<'a> {
/// The checksum of the record.
///
/// The checksum is CRC32C(BE(id) + BE(metadata) + payload), where BE(x) returns a byte slice of
/// the given integer in big endian format.
pub(super) checksum: u32,
/// The record ID.
///
/// This is monotonic across records.
id: u64,
/// The record metadata.
///
/// Based on `Encodable::Metadata`.
pub(super) metadata: u32,
/// The record payload.
///
/// This is the encoded form of the actual record itself.
#[with(CopyOptimize, RefAsBox)]
payload: &'a [u8],
}
// Manual implementation of CheckBytes required as the derived version currently causes an internal
// compiler error.
//
// Upstream issue: https://github.com/rkyv/rkyv/issues/221
impl<'a, C: ?Sized> CheckBytes<C> for ArchivedRecord<'a>
where
rkyv::with::With<&'a [u8], RefAsBox>: Archive<Archived = ArchivedBox<[u8]>>,
ArchivedBox<[u8]>: CheckBytes<C>,
{
type Error = StructCheckError;
unsafe fn check_bytes<'b>(
value: *const Self,
context: &mut C,
) -> Result<&'b Self, Self::Error> {
Archived::<u32>::check_bytes(addr_of!((*value).checksum), context).map_err(|e| {
StructCheckError {
field_name: "checksum",
inner: ErrorBox::new(e),
}
})?;
Archived::<u64>::check_bytes(addr_of!((*value).id), context).map_err(|e| {
StructCheckError {
field_name: "id",
inner: ErrorBox::new(e),
}
})?;
Archived::<u32>::check_bytes(addr_of!((*value).metadata), context).map_err(|e| {
StructCheckError {
field_name: "schema_metadata",
inner: ErrorBox::new(e),
}
})?;
ArchivedBox::<[u8]>::check_bytes(addr_of!((*value).payload), context).map_err(|e| {
StructCheckError {
field_name: "payload",
inner: ErrorBox::new(e),
}
})?;
Ok(&*value)
}
}
impl<'a> Record<'a> {
/// Creates a [`Record`] from the ID and payload, and calculates the checksum.
pub fn with_checksum(id: u64, metadata: u32, payload: &'a [u8], checksummer: &Hasher) -> Self {
let checksum = generate_checksum(checksummer, id, metadata, payload);
Self {
checksum,
id,
metadata,
payload,
}
}
}
impl<'a> ArchivedRecord<'a> {
/// Gets the metadata of this record.
pub fn metadata(&self) -> u32 {
self.metadata
}
/// Gets the payload of this record.
pub fn payload(&self) -> &[u8] {
&self.payload
}
/// Verifies if the stored checksum of this record matches the record itself.
pub fn verify_checksum(&self, checksummer: &Hasher) -> RecordStatus {
let calculated = generate_checksum(checksummer, self.id, self.metadata, &self.payload);
if self.checksum == calculated {
RecordStatus::Valid { id: self.id }
} else {
RecordStatus::Corrupted {
calculated,
actual: self.checksum,
}
}
}
}
fn generate_checksum(checksummer: &Hasher, id: u64, metadata: u32, payload: &[u8]) -> u32 {
let mut checksummer = checksummer.clone();
checksummer.reset();
checksummer.update(&id.to_be_bytes()[..]);
checksummer.update(&metadata.to_be_bytes()[..]);
checksummer.update(payload);
checksummer.finalize()
}
/// Checks whether the given buffer contains a valid [`Record`] archive.
///
/// The record archive is assumed to have been serialized as the very last item in `buf`, and
/// it is also assumed that the provided `buf` has an alignment of 8 bytes.
///
/// If a record archive was able to be read from the buffer, then the status will indicate whether
/// or not the checksum in the record matched the recalculated checksum. Otherwise, the
/// deserialization error encountered will be provided, which describes the error in a more verbose,
/// debugging-oriented fashion.
#[cfg_attr(test, instrument(skip_all, level = "trace"))]
pub fn validate_record_archive(buf: &[u8], checksummer: &Hasher) -> RecordStatus {
match try_as_record_archive(buf) {
Ok(archive) => archive.verify_checksum(checksummer),
Err(e) => RecordStatus::FailedDeserialization(e),
}
}
/// Attempts to deserialize an archived record from the given buffer.
///
/// The record archive is assumed to have been serialized as the very last item in `buf`, and
/// it is also assumed that the provided `buf` has an alignment of 16 bytes.
///
/// If a record archive was able to be read from the buffer, then a reference to its archived form
/// will be returned. Otherwise, the deserialization error encountered will be provided, which describes the error in a more verbose,
/// debugging-oriented fashion.
#[cfg_attr(test, instrument(skip_all, level = "trace"))]
pub fn try_as_record_archive(buf: &[u8]) -> Result<&ArchivedRecord<'_>, DeserializeError> {
try_as_archive::<Record<'_>>(buf)
}