vector_buffers/variants/disk_v2/record.rs
1use std::{mem, ptr::addr_of};
2
3use bytecheck::{CheckBytes, ErrorBox, StructCheckError};
4use crc32fast::Hasher;
5use rkyv::{
6 Archive, Archived, Serialize,
7 boxed::ArchivedBox,
8 with::{CopyOptimize, RefAsBox},
9};
10
11use super::{
12 common::align16,
13 ser::{DeserializeError, try_as_archive},
14};
15
16pub const RECORD_HEADER_LEN: usize = align16(mem::size_of::<ArchivedRecord<'_>>() + 8);
17
/// Result of checking if a buffer contained a valid record.
pub enum RecordStatus {
    /// The record was able to be read from the buffer, and the checksum is valid.
    ///
    /// Contains the ID of the given record.
    Valid { id: u64 },
    /// The record was able to be read from the buffer, but the checksum was not valid.
    ///
    /// `calculated` is the checksum recomputed from the record contents, while `actual` is the
    /// checksum value that was stored in the record.
    Corrupted { calculated: u32, actual: u32 },
    /// The record was not able to be read from the buffer due to an error during deserialization.
    FailedDeserialization(DeserializeError),
}
29
/// Record container.
///
/// [`Record`] encapsulates the encoded form of a record written into the buffer. It is a simple
/// wrapper that carries only the necessary metadata: the record checksum, a record ID used
/// internally for properly tracking the state of the reader and writer, and the record metadata.
///
/// # Warning
///
/// - Do not add fields to this struct.
/// - Do not remove fields from this struct.
/// - Do not change the type of fields in this struct.
/// - Do not change the order of fields in this struct.
///
/// Doing so will change the serialized representation. This will break things.
///
/// Do not do any of the listed things unless you _absolutely_ know what you're doing. :)
#[derive(Archive, Serialize, Debug)]
// Switch back to the derived implementation of CheckBytes once the upstream ICE issue is fixed.
//
// Upstream issue: https://github.com/rkyv/rkyv/issues/221
//#[archive_attr(derive(CheckBytes))]
pub struct Record<'a> {
    /// The checksum of the record.
    ///
    /// The checksum is CRC32(BE(id) + BE(metadata) + payload), as computed by `crc32fast`, where
    /// BE(x) returns a byte slice of the given integer in big endian format.
    pub(super) checksum: u32,

    /// The record ID.
    ///
    /// This is monotonic across records.
    id: u64,

    /// The record metadata.
    ///
    /// Based on `Encodable::Metadata`.
    pub(super) metadata: u32,

    /// The record payload.
    ///
    /// This is the encoded form of the actual record itself.
    #[with(CopyOptimize, RefAsBox)]
    payload: &'a [u8],
}
74
75// Manual implementation of CheckBytes required as the derived version currently causes an internal
76// compiler error.
77//
78// Upstream issue: https://github.com/rkyv/rkyv/issues/221
79impl<'a, C: ?Sized> CheckBytes<C> for ArchivedRecord<'a>
80where
81 rkyv::with::With<&'a [u8], RefAsBox>: Archive<Archived = ArchivedBox<[u8]>>,
82 ArchivedBox<[u8]>: CheckBytes<C>,
83{
84 type Error = StructCheckError;
85 unsafe fn check_bytes<'b>(
86 value: *const Self,
87 context: &mut C,
88 ) -> Result<&'b Self, Self::Error> {
89 unsafe {
90 Archived::<u32>::check_bytes(addr_of!((*value).checksum), context).map_err(|e| {
91 StructCheckError {
92 field_name: "checksum",
93 inner: ErrorBox::new(e),
94 }
95 })?;
96 Archived::<u64>::check_bytes(addr_of!((*value).id), context).map_err(|e| {
97 StructCheckError {
98 field_name: "id",
99 inner: ErrorBox::new(e),
100 }
101 })?;
102 Archived::<u32>::check_bytes(addr_of!((*value).metadata), context).map_err(|e| {
103 StructCheckError {
104 field_name: "schema_metadata",
105 inner: ErrorBox::new(e),
106 }
107 })?;
108 ArchivedBox::<[u8]>::check_bytes(addr_of!((*value).payload), context).map_err(|e| {
109 StructCheckError {
110 field_name: "payload",
111 inner: ErrorBox::new(e),
112 }
113 })?;
114 Ok(&*value)
115 }
116 }
117}
118
119impl<'a> Record<'a> {
120 /// Creates a [`Record`] from the ID and payload, and calculates the checksum.
121 pub fn with_checksum(id: u64, metadata: u32, payload: &'a [u8], checksummer: &Hasher) -> Self {
122 let checksum = generate_checksum(checksummer, id, metadata, payload);
123 Self {
124 checksum,
125 id,
126 metadata,
127 payload,
128 }
129 }
130}
131
132impl ArchivedRecord<'_> {
133 /// Gets the metadata of this record.
134 pub fn metadata(&self) -> u32 {
135 self.metadata
136 }
137
138 /// Gets the payload of this record.
139 pub fn payload(&self) -> &[u8] {
140 &self.payload
141 }
142
143 /// Verifies if the stored checksum of this record matches the record itself.
144 pub fn verify_checksum(&self, checksummer: &Hasher) -> RecordStatus {
145 let calculated = generate_checksum(checksummer, self.id, self.metadata, &self.payload);
146 if self.checksum == calculated {
147 RecordStatus::Valid { id: self.id }
148 } else {
149 RecordStatus::Corrupted {
150 calculated,
151 actual: self.checksum,
152 }
153 }
154 }
155}
156
157fn generate_checksum(checksummer: &Hasher, id: u64, metadata: u32, payload: &[u8]) -> u32 {
158 let mut checksummer = checksummer.clone();
159 checksummer.reset();
160
161 checksummer.update(&id.to_be_bytes()[..]);
162 checksummer.update(&metadata.to_be_bytes()[..]);
163 checksummer.update(payload);
164 checksummer.finalize()
165}
166
167/// Checks whether the given buffer contains a valid [`Record`] archive.
168///
169/// The record archive is assumed to have been serialized as the very last item in `buf`, and
170/// it is also assumed that the provided `buf` has an alignment of 8 bytes.
171///
172/// If a record archive was able to be read from the buffer, then the status will indicate whether
173/// or not the checksum in the record matched the recalculated checksum. Otherwise, the
174/// deserialization error encountered will be provided, which describes the error in a more verbose,
175/// debugging-oriented fashion.
176#[cfg_attr(test, instrument(skip_all, level = "trace"))]
177pub fn validate_record_archive(buf: &[u8], checksummer: &Hasher) -> RecordStatus {
178 match try_as_record_archive(buf) {
179 Ok(archive) => archive.verify_checksum(checksummer),
180 Err(e) => RecordStatus::FailedDeserialization(e),
181 }
182}
183
/// Attempts to deserialize an archived record from the given buffer.
///
/// The record archive is assumed to have been serialized as the very last item in `buf`, and
/// it is also assumed that the provided `buf` has an alignment of 16 bytes.
///
/// If a record archive was able to be read from the buffer, then a reference to its archived form
/// will be returned. Otherwise, the deserialization error encountered will be provided, which
/// describes the error in a more verbose, debugging-oriented fashion.
///
/// This is a thin wrapper that delegates to `try_as_archive` with [`Record`] as the archived type.
#[cfg_attr(test, instrument(skip_all, level = "trace"))]
pub fn try_as_record_archive(buf: &[u8]) -> Result<&ArchivedRecord<'_>, DeserializeError> {
    try_as_archive::<Record<'_>>(buf)
}