vector_buffers/variants/disk_v2/
record.rs

1use std::{mem, ptr::addr_of};
2
3use bytecheck::{CheckBytes, ErrorBox, StructCheckError};
4use crc32fast::Hasher;
5use rkyv::{
6    Archive, Archived, Serialize,
7    boxed::ArchivedBox,
8    with::{CopyOptimize, RefAsBox},
9};
10
11use super::{
12    common::align16,
13    ser::{DeserializeError, try_as_archive},
14};
15
/// Size constant derived from the archived record layout.
///
/// Computed as `size_of::<ArchivedRecord>()` plus 8 bytes, with the sum passed through `align16`.
// NOTE(review): the extra 8 bytes presumably cover a length prefix written ahead of each record,
// and `align16` presumably rounds up to a multiple of 16 -- confirm against the writer and
// `common::align16`.
pub const RECORD_HEADER_LEN: usize = align16(mem::size_of::<ArchivedRecord<'_>>() + 8);
17
/// Result of checking if a buffer contained a valid record.
pub enum RecordStatus {
    /// The record was able to be read from the buffer, and the checksum is valid.
    ///
    /// Contains the ID for the given record.
    Valid { id: u64 },
    /// The record was able to be read from the buffer, but the checksum was not valid.
    ///
    /// `calculated` is the checksum recomputed from the record's contents, while `actual` is the
    /// checksum that was stored in the record itself.
    Corrupted { calculated: u32, actual: u32 },
    /// The record was not able to be read from the buffer due to an error during deserialization.
    FailedDeserialization(DeserializeError),
}
29
/// Record container.
///
/// [`Record`] encapsulates the encoded form of a record written into the buffer.  It is a simple
/// wrapper that carries only the necessary metadata: the record checksum, and a record ID used
/// internally for properly tracking the state of the reader and writer.
///
/// # Warning
///
/// - Do not add fields to this struct.
/// - Do not remove fields from this struct.
/// - Do not change the type of fields in this struct.
/// - Do not change the order of fields in this struct.
///
/// Doing so will change the serialized representation.  This will break things.
///
/// Do not do any of the listed things unless you _absolutely_ know what you're doing. :)
#[derive(Archive, Serialize, Debug)]
// Switch back to the derived implementation of CheckBytes once the upstream ICE issue is fixed.
//
// Upstream issue: https://github.com/rkyv/rkyv/issues/221
//#[archive_attr(derive(CheckBytes))]
pub struct Record<'a> {
    /// The checksum of the record.
    ///
    /// The checksum is CRC32(BE(id) + BE(metadata) + payload), where BE(x) returns a byte slice of
    /// the given integer in big endian format and `+` denotes concatenation.  It is computed with
    /// `crc32fast::Hasher`.
    pub(super) checksum: u32,

    /// The record ID.
    ///
    /// This is monotonic across records.
    id: u64,

    /// The record metadata.
    ///
    /// Based on `Encodable::Metadata`.
    pub(super) metadata: u32,

    /// The record payload.
    ///
    /// This is the encoded form of the actual record itself.
    #[with(CopyOptimize, RefAsBox)]
    payload: &'a [u8],
}
74
75// Manual implementation of CheckBytes required as the derived version currently causes an internal
76// compiler error.
77//
78// Upstream issue: https://github.com/rkyv/rkyv/issues/221
79impl<'a, C: ?Sized> CheckBytes<C> for ArchivedRecord<'a>
80where
81    rkyv::with::With<&'a [u8], RefAsBox>: Archive<Archived = ArchivedBox<[u8]>>,
82    ArchivedBox<[u8]>: CheckBytes<C>,
83{
84    type Error = StructCheckError;
85    unsafe fn check_bytes<'b>(
86        value: *const Self,
87        context: &mut C,
88    ) -> Result<&'b Self, Self::Error> {
89        unsafe {
90            Archived::<u32>::check_bytes(addr_of!((*value).checksum), context).map_err(|e| {
91                StructCheckError {
92                    field_name: "checksum",
93                    inner: ErrorBox::new(e),
94                }
95            })?;
96            Archived::<u64>::check_bytes(addr_of!((*value).id), context).map_err(|e| {
97                StructCheckError {
98                    field_name: "id",
99                    inner: ErrorBox::new(e),
100                }
101            })?;
102            Archived::<u32>::check_bytes(addr_of!((*value).metadata), context).map_err(|e| {
103                StructCheckError {
104                    field_name: "schema_metadata",
105                    inner: ErrorBox::new(e),
106                }
107            })?;
108            ArchivedBox::<[u8]>::check_bytes(addr_of!((*value).payload), context).map_err(|e| {
109                StructCheckError {
110                    field_name: "payload",
111                    inner: ErrorBox::new(e),
112                }
113            })?;
114            Ok(&*value)
115        }
116    }
117}
118
119impl<'a> Record<'a> {
120    /// Creates a [`Record`] from the ID and payload, and calculates the checksum.
121    pub fn with_checksum(id: u64, metadata: u32, payload: &'a [u8], checksummer: &Hasher) -> Self {
122        let checksum = generate_checksum(checksummer, id, metadata, payload);
123        Self {
124            checksum,
125            id,
126            metadata,
127            payload,
128        }
129    }
130}
131
132impl ArchivedRecord<'_> {
133    /// Gets the metadata of this record.
134    pub fn metadata(&self) -> u32 {
135        self.metadata
136    }
137
138    /// Gets the payload of this record.
139    pub fn payload(&self) -> &[u8] {
140        &self.payload
141    }
142
143    /// Verifies if the stored checksum of this record matches the record itself.
144    pub fn verify_checksum(&self, checksummer: &Hasher) -> RecordStatus {
145        let calculated = generate_checksum(checksummer, self.id, self.metadata, &self.payload);
146        if self.checksum == calculated {
147            RecordStatus::Valid { id: self.id }
148        } else {
149            RecordStatus::Corrupted {
150                calculated,
151                actual: self.checksum,
152            }
153        }
154    }
155}
156
157fn generate_checksum(checksummer: &Hasher, id: u64, metadata: u32, payload: &[u8]) -> u32 {
158    let mut checksummer = checksummer.clone();
159    checksummer.reset();
160
161    checksummer.update(&id.to_be_bytes()[..]);
162    checksummer.update(&metadata.to_be_bytes()[..]);
163    checksummer.update(payload);
164    checksummer.finalize()
165}
166
167/// Checks whether the given buffer contains a valid [`Record`] archive.
168///
169/// The record archive is assumed to have been serialized as the very last item in `buf`, and
170/// it is also assumed that the provided `buf` has an alignment of 8 bytes.
171///
172/// If a record archive was able to be read from the buffer, then the status will indicate whether
173/// or not the checksum in the record matched the recalculated checksum.  Otherwise, the
174/// deserialization error encountered will be provided, which describes the error in a more verbose,
175/// debugging-oriented fashion.
176#[cfg_attr(test, instrument(skip_all, level = "trace"))]
177pub fn validate_record_archive(buf: &[u8], checksummer: &Hasher) -> RecordStatus {
178    match try_as_record_archive(buf) {
179        Ok(archive) => archive.verify_checksum(checksummer),
180        Err(e) => RecordStatus::FailedDeserialization(e),
181    }
182}
183
184/// Attempts to deserialize an archived record from the given buffer.
185///
186/// The record archive is assumed to have been serialized as the very last item in `buf`, and
187/// it is also assumed that the provided `buf` has an alignment of 16 bytes.
188///
189/// If a record archive was able to be read from the buffer, then a reference to its archived form
190/// will be returned.  Otherwise, the deserialization error encountered will be provided, which describes the error in a more verbose,
191/// debugging-oriented fashion.
192#[cfg_attr(test, instrument(skip_all, level = "trace"))]
193pub fn try_as_record_archive(buf: &[u8]) -> Result<&ArchivedRecord<'_>, DeserializeError> {
194    try_as_archive::<Record<'_>>(buf)
195}