vector_buffers/variants/disk_v2/
record.rs

1use std::{mem, ptr::addr_of};
2
3use bytecheck::{CheckBytes, ErrorBox, StructCheckError};
4use crc32fast::Hasher;
5use rkyv::{
6    boxed::ArchivedBox,
7    with::{CopyOptimize, RefAsBox},
8    Archive, Archived, Serialize,
9};
10
11use super::{
12    common::align16,
13    ser::{try_as_archive, DeserializeError},
14};
15
/// Length, in bytes, accounted for the header portion of a serialized record.
///
/// Computed as the in-memory size of [`ArchivedRecord`] plus 8 additional bytes, with the sum
/// passed through `align16`.
///
// NOTE(review): the extra 8 bytes presumably cover a length prefix written alongside the archive,
// and `align16` presumably rounds up to a multiple of 16 -- confirm both against the writer and
// `common::align16`.
pub const RECORD_HEADER_LEN: usize = align16(mem::size_of::<ArchivedRecord<'_>>() + 8);
17
/// Result of checking if a buffer contained a valid record.
pub enum RecordStatus {
    /// The record was able to be read from the buffer, and the checksum is valid.
    ///
    /// Contains the ID for the given record.
    Valid { id: u64 },
    /// The record was able to be read from the buffer, but the checksum was not valid.
    ///
    /// `calculated` is the checksum recomputed from the record contents, while `actual` is the
    /// checksum that was stored in the record itself.
    Corrupted { calculated: u32, actual: u32 },
    /// The record was not able to be read from the buffer due to an error during deserialization.
    FailedDeserialization(DeserializeError),
}
29
/// Record container.
///
/// [`Record`] encapsulates the encoded form of a record written into the buffer.  It is a simple
/// wrapper that carries only the necessary metadata alongside the payload: the record checksum, a
/// record ID used internally for properly tracking the state of the reader and writer, and the
/// record metadata value.
///
/// # Warning
///
/// - Do not add fields to this struct.
/// - Do not remove fields from this struct.
/// - Do not change the type of fields in this struct.
/// - Do not change the order of fields in this struct.
///
/// Doing so will change the serialized representation.  This will break things.
///
/// Do not do any of the listed things unless you _absolutely_ know what you're doing. :)
#[derive(Archive, Serialize, Debug)]
// Switch back to the derived implementation of CheckBytes once the upstream ICE issue is fixed.
//
// Upstream issue: https://github.com/rkyv/rkyv/issues/221
//#[archive_attr(derive(CheckBytes))]
pub struct Record<'a> {
    /// The checksum of the record.
    ///
    /// The checksum is CRC32C(BE(id) + BE(metadata) + payload), where BE(x) returns a byte slice of
    /// the given integer in big endian format, and `+` denotes feeding the pieces to the hasher in
    /// that order.
    ///
    // NOTE(review): the checksum is produced with `crc32fast::Hasher`; that crate documents itself
    // as CRC32 (IEEE) rather than CRC32C -- confirm which name is correct before relying on it.
    pub(super) checksum: u32,

    /// The record ID.
    ///
    /// This is monotonic across records.
    id: u64,

    /// The record metadata.
    ///
    /// Based on `Encodable::Metadata`.
    pub(super) metadata: u32,

    /// The record payload.
    ///
    /// This is the encoded form of the actual record itself.
    #[with(CopyOptimize, RefAsBox)]
    payload: &'a [u8],
}
74
75// Manual implementation of CheckBytes required as the derived version currently causes an internal
76// compiler error.
77//
78// Upstream issue: https://github.com/rkyv/rkyv/issues/221
79impl<'a, C: ?Sized> CheckBytes<C> for ArchivedRecord<'a>
80where
81    rkyv::with::With<&'a [u8], RefAsBox>: Archive<Archived = ArchivedBox<[u8]>>,
82    ArchivedBox<[u8]>: CheckBytes<C>,
83{
84    type Error = StructCheckError;
85    unsafe fn check_bytes<'b>(
86        value: *const Self,
87        context: &mut C,
88    ) -> Result<&'b Self, Self::Error> {
89        Archived::<u32>::check_bytes(addr_of!((*value).checksum), context).map_err(|e| {
90            StructCheckError {
91                field_name: "checksum",
92                inner: ErrorBox::new(e),
93            }
94        })?;
95        Archived::<u64>::check_bytes(addr_of!((*value).id), context).map_err(|e| {
96            StructCheckError {
97                field_name: "id",
98                inner: ErrorBox::new(e),
99            }
100        })?;
101        Archived::<u32>::check_bytes(addr_of!((*value).metadata), context).map_err(|e| {
102            StructCheckError {
103                field_name: "schema_metadata",
104                inner: ErrorBox::new(e),
105            }
106        })?;
107        ArchivedBox::<[u8]>::check_bytes(addr_of!((*value).payload), context).map_err(|e| {
108            StructCheckError {
109                field_name: "payload",
110                inner: ErrorBox::new(e),
111            }
112        })?;
113        Ok(&*value)
114    }
115}
116
117impl<'a> Record<'a> {
118    /// Creates a [`Record`] from the ID and payload, and calculates the checksum.
119    pub fn with_checksum(id: u64, metadata: u32, payload: &'a [u8], checksummer: &Hasher) -> Self {
120        let checksum = generate_checksum(checksummer, id, metadata, payload);
121        Self {
122            checksum,
123            id,
124            metadata,
125            payload,
126        }
127    }
128}
129
130impl ArchivedRecord<'_> {
131    /// Gets the metadata of this record.
132    pub fn metadata(&self) -> u32 {
133        self.metadata
134    }
135
136    /// Gets the payload of this record.
137    pub fn payload(&self) -> &[u8] {
138        &self.payload
139    }
140
141    /// Verifies if the stored checksum of this record matches the record itself.
142    pub fn verify_checksum(&self, checksummer: &Hasher) -> RecordStatus {
143        let calculated = generate_checksum(checksummer, self.id, self.metadata, &self.payload);
144        if self.checksum == calculated {
145            RecordStatus::Valid { id: self.id }
146        } else {
147            RecordStatus::Corrupted {
148                calculated,
149                actual: self.checksum,
150            }
151        }
152    }
153}
154
155fn generate_checksum(checksummer: &Hasher, id: u64, metadata: u32, payload: &[u8]) -> u32 {
156    let mut checksummer = checksummer.clone();
157    checksummer.reset();
158
159    checksummer.update(&id.to_be_bytes()[..]);
160    checksummer.update(&metadata.to_be_bytes()[..]);
161    checksummer.update(payload);
162    checksummer.finalize()
163}
164
165/// Checks whether the given buffer contains a valid [`Record`] archive.
166///
167/// The record archive is assumed to have been serialized as the very last item in `buf`, and
168/// it is also assumed that the provided `buf` has an alignment of 8 bytes.
169///
170/// If a record archive was able to be read from the buffer, then the status will indicate whether
171/// or not the checksum in the record matched the recalculated checksum.  Otherwise, the
172/// deserialization error encountered will be provided, which describes the error in a more verbose,
173/// debugging-oriented fashion.
174#[cfg_attr(test, instrument(skip_all, level = "trace"))]
175pub fn validate_record_archive(buf: &[u8], checksummer: &Hasher) -> RecordStatus {
176    match try_as_record_archive(buf) {
177        Ok(archive) => archive.verify_checksum(checksummer),
178        Err(e) => RecordStatus::FailedDeserialization(e),
179    }
180}
181
/// Attempts to deserialize an archived record from the given buffer.
///
/// The record archive is assumed to have been serialized as the very last item in `buf`, and
/// it is also assumed that the provided `buf` has an alignment of 16 bytes.
///
/// If a record archive was able to be read from the buffer, then a reference to its archived form
/// will be returned.  Otherwise, the deserialization error encountered will be provided, which
/// describes the error in a more verbose, debugging-oriented fashion.
///
/// This is a thin wrapper that delegates to `try_as_archive` with [`Record`] as the archived type.
#[cfg_attr(test, instrument(skip_all, level = "trace"))]
pub fn try_as_record_archive(buf: &[u8]) -> Result<&ArchivedRecord<'_>, DeserializeError> {
    try_as_archive::<Record<'_>>(buf)
}