vector_buffers/variants/disk_v2/record.rs
1use std::{mem, ptr::addr_of};
2
3use bytecheck::{CheckBytes, ErrorBox, StructCheckError};
4use crc32fast::Hasher;
5use rkyv::{
6 boxed::ArchivedBox,
7 with::{CopyOptimize, RefAsBox},
8 Archive, Archived, Serialize,
9};
10
11use super::{
12 common::align16,
13 ser::{try_as_archive, DeserializeError},
14};
15
/// Number of bytes accounted for by a record's fixed-size header.
///
/// Computed as the in-memory size of [`ArchivedRecord`] plus 8 additional bytes, passed through
/// `align16`.
// NOTE(review): the extra 8 bytes presumably cover a length delimiter written alongside each
// record, and `align16` presumably rounds up to a multiple of 16 -- neither is visible in this
// file, so confirm against the writer and `common::align16`.
pub const RECORD_HEADER_LEN: usize = align16(mem::size_of::<ArchivedRecord<'_>>() + 8);
17
/// Result of checking if a buffer contained a valid record.
pub enum RecordStatus {
    /// The record was able to be read from the buffer, and the checksum is valid.
    ///
    /// Contains the ID for the given record.
    Valid { id: u64 },
    /// The record was able to be read from the buffer, but the checksum was not valid.
    ///
    /// `calculated` is the checksum recomputed from the record contents, while `actual` is the
    /// checksum that was stored in the record itself.
    Corrupted { calculated: u32, actual: u32 },
    /// The record was not able to be read from the buffer due to an error during deserialization.
    FailedDeserialization(DeserializeError),
}
29
/// Record container.
///
/// [`Record`] encapsulates the encoded form of a record written into the buffer. It is a simple wrapper that
/// carries only the necessary metadata: the record checksum, and a record ID used internally for
/// properly tracking the state of the reader and writer.
///
/// # Warning
///
/// - Do not add fields to this struct.
/// - Do not remove fields from this struct.
/// - Do not change the type of fields in this struct.
/// - Do not change the order of fields in this struct.
///
/// Doing so will change the serialized representation. This will break things.
///
/// Do not do any of the listed things unless you _absolutely_ know what you're doing. :)
#[derive(Archive, Serialize, Debug)]
// Switch back to the derived implementation of CheckBytes once the upstream ICE issue is fixed.
//
// Upstream issue: https://github.com/rkyv/rkyv/issues/221
//#[archive_attr(derive(CheckBytes))]
pub struct Record<'a> {
    /// The checksum of the record.
    ///
    /// The checksum is CRC32C(BE(id) + BE(metadata) + payload), where BE(x) returns a byte slice of
    /// the given integer in big endian format.
    // NOTE(review): the checksum is produced by `crc32fast::Hasher`, which per its crate
    // documentation implements CRC-32 (IEEE) rather than CRC32C (Castagnoli) -- confirm and
    // correct the algorithm name above if so.
    pub(super) checksum: u32,

    /// The record ID.
    ///
    /// This is monotonic across records.
    id: u64,

    /// The record metadata.
    ///
    /// Based on `Encodable::Metadata`.
    pub(super) metadata: u32,

    /// The record payload.
    ///
    /// This is the encoded form of the actual record itself.
    #[with(CopyOptimize, RefAsBox)]
    payload: &'a [u8],
}
74
75// Manual implementation of CheckBytes required as the derived version currently causes an internal
76// compiler error.
77//
78// Upstream issue: https://github.com/rkyv/rkyv/issues/221
79impl<'a, C: ?Sized> CheckBytes<C> for ArchivedRecord<'a>
80where
81 rkyv::with::With<&'a [u8], RefAsBox>: Archive<Archived = ArchivedBox<[u8]>>,
82 ArchivedBox<[u8]>: CheckBytes<C>,
83{
84 type Error = StructCheckError;
85 unsafe fn check_bytes<'b>(
86 value: *const Self,
87 context: &mut C,
88 ) -> Result<&'b Self, Self::Error> {
89 Archived::<u32>::check_bytes(addr_of!((*value).checksum), context).map_err(|e| {
90 StructCheckError {
91 field_name: "checksum",
92 inner: ErrorBox::new(e),
93 }
94 })?;
95 Archived::<u64>::check_bytes(addr_of!((*value).id), context).map_err(|e| {
96 StructCheckError {
97 field_name: "id",
98 inner: ErrorBox::new(e),
99 }
100 })?;
101 Archived::<u32>::check_bytes(addr_of!((*value).metadata), context).map_err(|e| {
102 StructCheckError {
103 field_name: "schema_metadata",
104 inner: ErrorBox::new(e),
105 }
106 })?;
107 ArchivedBox::<[u8]>::check_bytes(addr_of!((*value).payload), context).map_err(|e| {
108 StructCheckError {
109 field_name: "payload",
110 inner: ErrorBox::new(e),
111 }
112 })?;
113 Ok(&*value)
114 }
115}
116
117impl<'a> Record<'a> {
118 /// Creates a [`Record`] from the ID and payload, and calculates the checksum.
119 pub fn with_checksum(id: u64, metadata: u32, payload: &'a [u8], checksummer: &Hasher) -> Self {
120 let checksum = generate_checksum(checksummer, id, metadata, payload);
121 Self {
122 checksum,
123 id,
124 metadata,
125 payload,
126 }
127 }
128}
129
130impl ArchivedRecord<'_> {
131 /// Gets the metadata of this record.
132 pub fn metadata(&self) -> u32 {
133 self.metadata
134 }
135
136 /// Gets the payload of this record.
137 pub fn payload(&self) -> &[u8] {
138 &self.payload
139 }
140
141 /// Verifies if the stored checksum of this record matches the record itself.
142 pub fn verify_checksum(&self, checksummer: &Hasher) -> RecordStatus {
143 let calculated = generate_checksum(checksummer, self.id, self.metadata, &self.payload);
144 if self.checksum == calculated {
145 RecordStatus::Valid { id: self.id }
146 } else {
147 RecordStatus::Corrupted {
148 calculated,
149 actual: self.checksum,
150 }
151 }
152 }
153}
154
155fn generate_checksum(checksummer: &Hasher, id: u64, metadata: u32, payload: &[u8]) -> u32 {
156 let mut checksummer = checksummer.clone();
157 checksummer.reset();
158
159 checksummer.update(&id.to_be_bytes()[..]);
160 checksummer.update(&metadata.to_be_bytes()[..]);
161 checksummer.update(payload);
162 checksummer.finalize()
163}
164
165/// Checks whether the given buffer contains a valid [`Record`] archive.
166///
167/// The record archive is assumed to have been serialized as the very last item in `buf`, and
168/// it is also assumed that the provided `buf` has an alignment of 8 bytes.
169///
170/// If a record archive was able to be read from the buffer, then the status will indicate whether
171/// or not the checksum in the record matched the recalculated checksum. Otherwise, the
172/// deserialization error encountered will be provided, which describes the error in a more verbose,
173/// debugging-oriented fashion.
174#[cfg_attr(test, instrument(skip_all, level = "trace"))]
175pub fn validate_record_archive(buf: &[u8], checksummer: &Hasher) -> RecordStatus {
176 match try_as_record_archive(buf) {
177 Ok(archive) => archive.verify_checksum(checksummer),
178 Err(e) => RecordStatus::FailedDeserialization(e),
179 }
180}
181
/// Attempts to deserialize an archived record from the given buffer.
///
/// The record archive is assumed to have been serialized as the very last item in `buf`, and
/// it is also assumed that the provided `buf` has an alignment of 16 bytes.
///
/// If a record archive was able to be read from the buffer, then a reference to its archived form
/// will be returned. Otherwise, the deserialization error encountered will be provided, which
/// describes the error in a more verbose, debugging-oriented fashion.
///
/// This is a thin wrapper that calls `try_as_archive` with [`Record`] as the archived type.
#[cfg_attr(test, instrument(skip_all, level = "trace"))]
pub fn try_as_record_archive(buf: &[u8]) -> Result<&ArchivedRecord<'_>, DeserializeError> {
    try_as_archive::<Record<'_>>(buf)
}