vector_buffers/topology/acks.rs
1use std::{collections::VecDeque, fmt};
2
3use num_traits::{Bounded, CheckedAdd, CheckedSub, Unsigned, WrappingAdd, WrappingSub};
4
/// The length state of a marker that is still pending.
///
/// Derives `Eq` in addition to `PartialEq` so that it agrees with `EligibleMarkerLength` and can
/// be used anywhere total equality is required.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PendingMarkerLength<N> {
    /// The length was supplied by the caller when the marker was added.
    Known(N),

    /// The length was inferred rather than declared: either it is the size of a synthetic gap
    /// marker, or it was computed from the ID of the marker that was added afterwards.
    Assumed(N),

    /// No length is available yet; it can only be inferred once another marker is added.
    Unknown,
}
11
12struct PendingMarker<N, D> {
13 id: N,
14 len: PendingMarkerLength<N>,
15 data: Option<D>,
16}
17
18impl<N, D> fmt::Debug for PendingMarker<N, D>
19where
20 N: fmt::Debug,
21 D: fmt::Debug,
22{
23 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24 f.debug_struct("PendingMarker")
25 .field("id", &self.id)
26 .field("len", &self.len)
27 .field("data", &self.data)
28 .finish()
29 }
30}
31
/// How the length of an eligible marker was determined.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EligibleMarkerLength<N> {
    /// The caller declared the length when the marker was added.
    Known(N),

    /// The length was derived from imperfect information. It should describe a range that covers
    /// any gap in the marker IDs, but that range may correspond to one real marker, to several,
    /// or to none at all.
    Assumed(N),
}

impl<N: Copy> EligibleMarkerLength<N> {
    /// Returns the numeric length, regardless of whether it was declared or assumed.
    fn len(&self) -> N {
        match *self {
            Self::Known(amount) | Self::Assumed(amount) => amount,
        }
    }
}
51
52/// A marker that has been fully acknowledged.
53pub struct EligibleMarker<N, D> {
54 pub id: N,
55 pub len: EligibleMarkerLength<N>,
56 pub data: Option<D>,
57}
58
59impl<N, D> PartialEq for EligibleMarker<N, D>
60where
61 N: PartialEq,
62{
63 fn eq(&self, other: &Self) -> bool {
64 self.id == other.id && self.len == other.len
65 }
66}
67
68impl<N, D> fmt::Debug for EligibleMarker<N, D>
69where
70 N: fmt::Debug,
71 D: fmt::Debug,
72{
73 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74 f.debug_struct("EligibleMarker")
75 .field("id", &self.id)
76 .field("len", &self.len)
77 .field("data", &self.data)
78 .finish()
79 }
80}
81
/// Error returned by `OrderedAcknowledgements::add_marker`.
///
/// In general, this error represents a breaking of ID monotonicity, or more likely, the loss of
/// records, where entire records may have been skipped because an attempted add provides an ID
/// that is not the next expected ID.
///
/// The exact condition must be determined by the caller. The only condition reported through this
/// type is a marker ID that is known to be behind the next expected marker ID; IDs that are ahead
/// of the expected ID are not errors and are compensated for when the marker is added.
///
/// The type implements `Display` and `std::error::Error` so it can be propagated with `?` into
/// boxed or wrapped error types.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum MarkerError {
    /// The given marker ID is behind the next expected marker ID.
    ///
    /// As `OrderedAcknowledgements` expects monotonic marker IDs, this represents a violation of
    /// the acknowledgement state, and must be handled by the caller. Generally speaking, this is
    /// an unrecoverable error.
    MonotonicityViolation,
}

impl fmt::Display for MarkerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::MonotonicityViolation => {
                f.write_str("marker ID is behind the next expected marker ID")
            }
        }
    }
}

impl std::error::Error for MarkerError {}
101
/// Result of comparing a potential pending marker ID with the expected next pending marker ID.
///
/// The type is `Copy` and comparable whenever `N` is, and implements `Debug` so that offsets can
/// be logged or asserted on.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MarkerOffset<N> {
    /// The given marker ID is aligned with the next expected marker ID.
    Aligned,

    /// The given marker ID does not match the next expected marker ID and is treated as being
    /// ahead of it.
    ///
    /// When the last pending marker is fixed-size, we can calculate the exact marker ID that we
    /// expect to see next. In turn, we can tell how far ahead the given marker ID is from the next
    /// expected marker ID. When there are no pending markers at all, the acknowledged marker ID
    /// plays the role of the next expected marker ID.
    ///
    /// The next expected marker ID, and the amount (gap) by which the given marker ID and the next
    /// expected marker ID differ, are provided, in that order.
    Gap(N, N),

    /// The given marker ID may or may not be aligned.
    ///
    /// This occurs when the last pending marker does not have a known size, as we cannot determine
    /// whether the given marker ID is the next true marker ID without knowing where the last
    /// pending marker should end.
    ///
    /// The last pending marker ID is provided.
    NotEnoughInformation(N),

    /// The given marker ID is behind the next expected marker ID.
    ///
    /// As `OrderedAcknowledgements` expects monotonic marker IDs, this represents a violation of
    /// the acknowledgement state, and must be handled by the caller. Generally speaking, this is
    /// an unrecoverable error.
    MonotonicityViolation,
}
133
/// `OrderedAcknowledgements` allows determining when a record is eligible for deletion.
///
/// ### Purpose
///
/// In disk buffers, a record may potentially represent multiple events. As these events
/// may be processed at different times by a sink, and in a potentially different order than when
/// stored in the record, a record cannot be considered fully processed until all of the events have
/// been accounted for. As well, only once a record has been fully processed can it be considered
/// for deletion to free up space in the buffer.
///
/// To complicate matters, a record may sometimes not be decodable -- on-disk corruption, invalid
/// encoding scheme that is no longer supported, etc -- but still needs to be accounted for to know
/// when it can be deleted, and so that the correct metrics can be generated to determine how many
/// events were lost by the record not being able to be processed normally.
///
/// ### Functionality
///
/// `OrderedAcknowledgements` provides the ability to add "markers", which are a virtual token mapped
/// to a record. Markers track the ID of a record, how long the record is (if known), and optional
/// data that is specific to the record. It also provides the ability to add acknowledgements which
/// can then be consumed to allow yielding markers which have collected enough acknowledgements and
/// are thus "eligible".
///
/// ### Detecting record gaps and the length of undecodable records
///
/// Additionally, and as hinted at above, markers can be added without a known length: this may
/// happen when a record is read but it cannot be decoded, and thus determining the true length is
/// not possible.
///
/// When markers are added, `OrderedAcknowledgements` compensates for missing information in one of
/// two ways:
/// - if the new marker is ahead of the next expected marker ID, a synthetic "gap" marker is added
///   in front of it to cover the difference
/// - if the previously added marker has an unknown length, that length is replaced with an assumed
///   length, based on the difference between its ID and the ID of the marker being added
///
/// In this way, `OrderedAcknowledgements` provides a contiguous range of marker IDs, which allows
/// detecting not only the presumed length of a record that couldn't be decoded, but also if any
/// records were deleted from disk or unable to be read at all. Based on the invariant of expecting
/// IDs to be monotonic and contiguous, we know that if we expect our next marker ID to be 5, but
/// instead get one with an ID of 8, then there are 3 missing events in the middle that have not
/// been accounted for.
///
/// Similarly, even when we don't know what the next expected marker ID should be, we can determine
/// the number of events that were lost when the next marker is added, as marker IDs represent the
/// start of a record, and so simple arithmetic can determine the number of events that have
/// theoretically been lost.
pub struct OrderedAcknowledgements<N, D> {
    // Acknowledgements that have been added but not yet claimed by a fixed-size marker becoming
    // eligible.
    unclaimed_acks: N,
    // The ID just past the most recently yielded marker (or the starting ID handed to
    // `from_acked`). When no markers are pending, the next marker is expected to start here.
    acked_marker_id: N,
    // Markers that have been added but not yet yielded, in the order they were added.
    pending_markers: VecDeque<PendingMarker<N, D>>,
}
185
186impl<N, D> OrderedAcknowledgements<N, D>
187where
188 N: fmt::Display
189 + Bounded
190 + CheckedAdd
191 + CheckedSub
192 + Copy
193 + PartialEq
194 + PartialOrd
195 + Unsigned
196 + WrappingAdd
197 + WrappingSub,
198{
199 pub fn from_acked(acked_marker_id: N) -> Self {
200 Self {
201 unclaimed_acks: N::min_value(),
202 acked_marker_id,
203 pending_markers: VecDeque::new(),
204 }
205 }
206
207 /// Adds the given number of acknowledgements.
208 ///
209 /// Acknowledgements should be given by the caller to update the acknowledgement state before
210 /// trying to get any eligible markers.
211 ///
212 /// # Panics
213 ///
214 /// Will panic if adding ack amount overflows.
215 pub fn add_acknowledgements(&mut self, amount: N) {
216 self.unclaimed_acks = self
217 .unclaimed_acks
218 .checked_add(&amount)
219 .expect("overflowing unclaimed acknowledgements is a serious bug");
220
221 trace!(
222 unclaimed_acks = %self.unclaimed_acks,
223 added_acks = %amount,
224 "Added acknowledgements."
225 );
226 }
227
228 /// Gets the marker ID offset for the given ID.
229 ///
230 /// If the given ID matches our next expected marker ID, then `MarkerOffset::Aligned` is
231 /// returned.
232 ///
233 /// Otherwise, we return one of the following variants:
234 /// - if we have no pending markers, `MarkerOffset::Gap` is returned, and contains the delta
235 /// between the given ID and the next expected marker ID
236 /// - if we have pending markers, and the given ID is logically behind the next expected marker
237 /// ID, `MarkerOffset::MonotonicityViolation` is returned, indicating that the monotonicity
238 /// invariant has been violated
239 /// - if we have pending markers, and the given ID is logically ahead of the next expected
240 /// marker, `MarkerOffset::Gap` is returned, specifying how far ahead of the next expected
241 /// marker ID it is
242 /// - if we have pending markers, and the last pending marker has an unknown length,
243 /// `MarkerOffset::NotEnoughInformation` is returned, as we require a fixed-size marker to
244 /// correctly calculate the next expected marker ID
245 fn get_marker_id_offset(&self, id: N) -> MarkerOffset<N> {
246 if self.pending_markers.is_empty() {
247 // We have no pending markers, but our acknowledged ID offset should match the marker ID
248 // being given here, otherwise it would imply that the markers were not contiguous.
249 //
250 // We return the difference between the ID and our acknowledged ID offset with the
251 // assumption that the new ID is monotonic. Since IDs wraparound, we don't bother
252 // looking at if it's higher or lower because we can't reasonably tell if this record ID
253 // is actually correct but other markers in between went missing, etc.
254 //
255 // Basically, it's up to the caller to figure this out. We're just trying to give them
256 // as much information as we can.
257 if self.acked_marker_id != id {
258 return MarkerOffset::Gap(
259 self.acked_marker_id,
260 id.wrapping_sub(&self.acked_marker_id),
261 );
262 }
263 } else {
264 let back = self
265 .pending_markers
266 .back()
267 .expect("pending markers should have items");
268
269 // When we know the length of the previously added pending marker, we can figure out
270 // where this marker's ID should land, as we don't not allow for noncontiguous marker ID
271 // ranges.
272 if let PendingMarkerLength::Known(len) = back.len {
273 // If we know the length of the back item, then we know exactly what the ID for the
274 // next marker to follow it should be. If this incoming marker doesn't match,
275 // something is wrong.
276 let expected_next = back.id.wrapping_add(&len);
277 if id != expected_next {
278 if expected_next < back.id && id < expected_next {
279 return MarkerOffset::MonotonicityViolation;
280 }
281
282 return MarkerOffset::Gap(expected_next, id.wrapping_sub(&expected_next));
283 }
284 } else {
285 // Without a fixed-size marker, we cannot be sure whether this marker ID is aligned
286 // or not.
287 return MarkerOffset::NotEnoughInformation(back.id);
288 }
289 }
290
291 MarkerOffset::Aligned
292 }
293
294 /// Adds a marker.
295 ///
296 /// The marker is tracked internally, and once the acknowledgement state has been advanced
297 /// enough such that it is at or ahead of the marker, the marker will become eligible.
298 ///
299 /// ## Gap detection and unknown length markers
300 ///
301 /// When a gap is detected between the given marker ID and the next expected marker ID, we
302 /// insert a synthetic marker to represent that gap. For example, if we had a marker with an ID
303 /// of 0 and a length of 5, we would expect the next marker to have an ID of 5. If instead, a
304 /// marker with an ID of 7 was given, that would represent a gap of 2. We insert a synthetic
305 /// marker with an ID of 5 and a length of 2 before adding the marker with the ID of 7. This
306 /// keeps the marker range contiguous and allows getting an eligible marker for the gap so the
307 /// caller can detect that a gap occurred.
308 ///
309 /// Likewise, when a caller inserts an unknown length marker, we cannot know its length until
310 /// the next marker is added. When that happens, we assume the given marker ID is monotonic,
311 /// and thus that the length of the previous marker, which has an unknown length, must have a
312 /// length equal to the difference between the given marker ID and the unknown length marker
313 /// ID. We update the unknown length marker to reflect this.
314 ///
315 /// In both cases, the markers will have a length that indicates that the amount represents a
316 /// gap, and not a marker that was directly added by the caller themselves.
317 ///
318 /// ## Errors
319 ///
320 /// When other pending markers are present, and the given ID is logically behind the next
321 /// expected marker ID, `Err(MarkerError::MonotonicityViolation)` is returned.
322 ///
323 /// # Panics
324 ///
325 /// Panics if pending markers is empty when last pending marker is an unknown size.
326 pub fn add_marker(
327 &mut self,
328 id: N,
329 marker_len: Option<N>,
330 data: Option<D>,
331 ) -> Result<(), MarkerError> {
332 // First, figure out where this given marker ID stands compared to our next expected marker
333 // ID, and the pending marker state in general.
334 match self.get_marker_id_offset(id) {
335 // The last pending marker is fixed-size, and the given marker ID is past where that
336 // marker ends, so we need to inject a synthetic gap marker to compensate for that.
337 MarkerOffset::Gap(expected_id, amount) => {
338 self.pending_markers.push_back(PendingMarker {
339 id: expected_id,
340 len: PendingMarkerLength::Assumed(amount),
341 data: None,
342 });
343 }
344 // The last pending marker is an unknown size, so we're using this given marker ID to
345 // calculate the length of that last pending marker, and in turn, we're going to adjust
346 // its length before adding the new pending marker.
347 MarkerOffset::NotEnoughInformation(last_marker_id) => {
348 let len = id.wrapping_sub(&last_marker_id);
349 let last_marker = self
350 .pending_markers
351 .back_mut()
352 .unwrap_or_else(|| unreachable!("pending markers should not be empty"));
353
354 last_marker.len = PendingMarkerLength::Assumed(len);
355 }
356 // We detected a monotonicity violation, which we can't do anything about, so just
357 // immediately inform the caller.
358 MarkerOffset::MonotonicityViolation => return Err(MarkerError::MonotonicityViolation),
359 // We have enough information to determine the given marker ID is the next expected
360 // marker ID, so we can proceed normally.
361 MarkerOffset::Aligned => {}
362 }
363
364 // Now insert our new pending marker.
365 self.pending_markers.push_back(PendingMarker {
366 id,
367 len: marker_len.map_or(PendingMarkerLength::Unknown, PendingMarkerLength::Known),
368 data,
369 });
370
371 Ok(())
372 }
373
374 /// Gets the next marker which has been fully acknowledged.
375 ///
376 /// A pending marker becomes eligible when the acknowledged marker ID is at or past the pending
377 /// marker ID plus the marker length.
378 ///
379 /// For pending markers with an unknown length, another pending marker must be present after it
380 /// in order to calculate the ID offsets and determine the marker length.
381 #[cfg_attr(test, instrument(skip(self), level = "trace"))]
382 pub fn get_next_eligible_marker(&mut self) -> Option<EligibleMarker<N, D>> {
383 let effective_acked_marker_id = self.acked_marker_id.wrapping_add(&self.unclaimed_acks);
384
385 trace!(
386 acked_marker_id = %self.acked_marker_id,
387 %effective_acked_marker_id,
388 unclaimed_acks = %self.unclaimed_acks,
389 pending_markers = self.pending_markers.len(),
390 "Searching for eligible marker."
391 );
392
393 let maybe_eligible_marker =
394 self.pending_markers
395 .front()
396 .and_then(|marker| match marker.len {
397 // If the acked marker ID is ahead of this marker ID, plus its length, it's been fully
398 // acknowledged and we can consume and yield the marker. We have to double
399 // verify this by checking that there's enough unclaimed acks to support this
400 // length because otherwise we might fall victim to markers that simply generate
401 // a required acked marker ID that is equal to the actual acked marker ID when
402 // an amount of unclaimed acks exists that is not enough for this marker but is
403 // enough to align the effective/required IDs.
404 PendingMarkerLength::Known(len) => {
405 let required_acked_marker_id = marker.id.wrapping_add(&len);
406 if required_acked_marker_id <= effective_acked_marker_id
407 && self.unclaimed_acks >= len
408 {
409 Some((EligibleMarkerLength::Known(len), len))
410 } else {
411 None
412 }
413 }
414 // The marker has an assumed length, which means a marker was added after it,
415 // which implies that it is de facto eligible as unknown length markers do not
416 // consume acknowledgements and so are immediately eligible once an assumed
417 // length can be determined.
418 PendingMarkerLength::Assumed(len) => {
419 Some((EligibleMarkerLength::Assumed(len), N::min_value()))
420 }
421 // We don't yet know what the length is for this marker, so we're stuck waiting
422 // for another marker to be added before that can be determined.
423 PendingMarkerLength::Unknown => None,
424 });
425
426 // If we actually got an eligible marker, we need to actually remove it from the pending
427 // marker queue and potentially adjust the amount of unclaimed acks we have.
428 match maybe_eligible_marker {
429 Some((len, acks_to_claim)) => {
430 // If we actually got an eligible marker, we need to actually remove it from the pending
431 // marker queue, potentially adjust the amount of unclaimed acks we have, and adjust
432 // our acked marker ID.
433 let PendingMarker { id, data, .. } = self
434 .pending_markers
435 .pop_front()
436 .unwrap_or_else(|| unreachable!("pending markers cannot be empty"));
437
438 if acks_to_claim > N::min_value() {
439 self.unclaimed_acks = self
440 .unclaimed_acks
441 .checked_sub(&acks_to_claim)
442 .unwrap_or_else(|| {
443 unreachable!("should not be able to claim more acks than are unclaimed")
444 });
445 }
446
447 self.acked_marker_id = id.wrapping_add(&len.len());
448
449 Some(EligibleMarker { id, len, data })
450 }
451 None => None,
452 }
453 }
454}
455
456impl<N, D> fmt::Debug for OrderedAcknowledgements<N, D>
457where
458 N: fmt::Debug,
459 D: fmt::Debug,
460{
461 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
462 f.debug_struct("OrderedAcknowledgements")
463 .field("unclaimed_acks", &self.unclaimed_acks)
464 .field("acked_marker_id", &self.acked_marker_id)
465 .field("pending_markers", &self.pending_markers)
466 .finish()
467 }
468}
469
// Tests for `OrderedAcknowledgements`: hand-written scenarios that are replayed step by step
// against a fresh instance, plus a property test that checks the implementation against a simple
// in-test model of the pending marker queue.
#[cfg(test)]
mod tests {
    use std::collections::{HashSet, VecDeque};

    use proptest::{
        arbitrary::any,
        collection::vec as arb_vec,
        prop_assert_eq, prop_oneof, proptest,
        strategy::{Just, Strategy},
    };

    use super::{EligibleMarker, EligibleMarkerLength};
    use crate::topology::acks::{MarkerError, OrderedAcknowledgements, PendingMarkerLength};

    // A single operation to apply to the system under test (SUT).
    #[derive(Debug, Clone, Copy)]
    enum Action {
        // Add this many acknowledgements.
        Acknowledge(u64),
        // Add a marker with the given ID and optional (known) length.
        AddMarker((u64, Option<u64>)),
        // Ask for the next eligible marker.
        GetNextEligibleMarker,
    }

    // The observable outcome of applying an `Action` to the SUT.
    #[derive(Debug, PartialEq)]
    enum ActionResult {
        // Number of unclaimed acknowledgements.
        Acknowledge(u64),
        AddMarker(Result<(), MarkerError>),
        GetNextEligibleMarker(Option<EligibleMarker<u64, ()>>),
    }

    // Strategy that generates the three kinds of action with equal weight. Acknowledgement
    // amounts are drawn from `u32` and widened to `u64`.
    fn arb_ordered_acks_action() -> impl Strategy<Value = Action> {
        prop_oneof![
            10 => any::<u32>().prop_map(|n| Action::Acknowledge(u64::from(n))),
            10 => any::<(u64, Option<u64>)>().prop_map(|(a, b)| Action::AddMarker((a, b))),
            10 => Just(Action::GetNextEligibleMarker),
        ]
    }

    // Applies a single action to the SUT and captures what it observably produced.
    fn apply_action_sut(
        sut: &mut OrderedAcknowledgements<u64, ()>,
        action: Action,
    ) -> ActionResult {
        match action {
            Action::Acknowledge(amount) => {
                sut.add_acknowledgements(amount);
                ActionResult::Acknowledge(sut.unclaimed_acks)
            }
            Action::AddMarker((id, maybe_len)) => {
                let result = sut.add_marker(id, maybe_len, None);
                ActionResult::AddMarker(result)
            }
            Action::GetNextEligibleMarker => {
                let result = sut.get_next_eligible_marker();
                ActionResult::GetNextEligibleMarker(result)
            }
        }
    }

    // Builds an `(Action, ActionResult)` pair for `run_test_case`. The first form is for actions
    // that take no input; the second is for actions that do.
    macro_rules! step {
        ($action_name:ident, result => $result_input:expr) => {
            (
                Action::$action_name,
                ActionResult::$action_name($result_input),
            )
        };
        ($action_name:ident, input => $action_input:expr, result => $result_input:expr) => {
            (
                Action::$action_name($action_input),
                ActionResult::$action_name($result_input),
            )
        };
    }

    #[test]
    fn basic_cases() {
        // Smoke test.
        run_test_case("empty", vec![step!(GetNextEligibleMarker, result => None)]);

        // Simple through-and-through:
        run_test_case(
            "through_and_through",
            vec![
                step!(AddMarker, input => (0, Some(5)), result => Ok(())),
                step!(Acknowledge, input => 5, result => 5),
                step!(GetNextEligibleMarker, result => Some(
                    EligibleMarker {
                        id: 0, len: EligibleMarkerLength::Known(5), data: None,
                    }
                )),
            ],
        );
    }

    #[test]
    fn invariant_cases() {
        // Checking for an eligible record between incremental acknowledgement:
        run_test_case(
            "eligible_multi_ack",
            vec![
                step!(AddMarker, input => (0, Some(13)), result => Ok(())),
                step!(Acknowledge, input => 5, result => 5),
                step!(GetNextEligibleMarker, result => None),
                step!(Acknowledge, input => 5, result => 10),
                step!(GetNextEligibleMarker, result => None),
                step!(Acknowledge, input => 5, result => 15),
                step!(GetNextEligibleMarker, result => Some(
                    EligibleMarker {
                        id: 0, len: EligibleMarkerLength::Known(13), data: None,
                    }
                )),
            ],
        );

        // Unknown length markers can't be returned until a marker exists after them, even if we
        // could maximally acknowledge them:
        run_test_case(
            "unknown_len_no_subsequent_marker",
            vec![
                step!(AddMarker, input => (0, None), result => Ok(())),
                step!(Acknowledge, input => 5, result => 5),
                step!(GetNextEligibleMarker, result => None),
            ],
        );

        // We can always get back an unknown length marker, with its assumed length, regardless of
        // acknowledgements, so long as a marker exists after it: here the following marker is
        // fixed-size.
        run_test_case(
            "unknown_len_subsequent_marker_fixed",
            vec![
                step!(AddMarker, input => (0, None), result => Ok(())),
                step!(AddMarker, input => (5, Some(1)), result => Ok(())),
                step!(GetNextEligibleMarker, result => Some(
                    EligibleMarker {
                        id: 0, len: EligibleMarkerLength::Assumed(5), data: None,
                    }
                )),
                step!(GetNextEligibleMarker, result => None),
            ],
        );

        // We can always get back an unknown length marker, with its assumed length, regardless of
        // acknowledgements, so long as a marker exists after it: here the following marker is
        // itself of unknown length.
        run_test_case(
            "unknown_len_subsequent_marker_unknown",
            vec![
                step!(AddMarker, input => (0, None), result => Ok(())),
                step!(AddMarker, input => (5, None), result => Ok(())),
                step!(GetNextEligibleMarker, result => Some(
                    EligibleMarker {
                        id: 0, len: EligibleMarkerLength::Assumed(5), data: None,
                    }
                )),
                step!(GetNextEligibleMarker, result => None),
            ],
        );

        // Can add a marker without a known length and it will generate a synthetic gap marker
        // that is immediately eligible:
        run_test_case(
            "unknown_len_no_pending_synthetic_gap",
            vec![
                step!(AddMarker, input => (1, None), result => Ok(())),
                step!(GetNextEligibleMarker, result => Some(
                    EligibleMarker {
                        id: 0, len: EligibleMarkerLength::Assumed(1), data: None,
                    }
                )),
                step!(GetNextEligibleMarker, result => None),
            ],
        );

        // When another marker exists, and is fixed size, we correctly detect when trying to add
        // another marker whose ID comes before the next expected marker ID. The first marker ends
        // past `u64::MAX`, so the next expected ID wraps around to 2 and an ID of 1 is behind it:
        run_test_case(
            "detect_monotonicity_violation",
            vec![
                step!(AddMarker, input => (u64::MAX, Some(3)), result => Ok(())),
                step!(AddMarker, input => (1, Some(2)), result => Err(MarkerError::MonotonicityViolation)),
            ],
        );

        // When another marker exists, and is fixed size, we correctly detect when trying to add
        // another marker whose ID comes after the end of the last pending marker we have: a
        // synthetic gap marker with an assumed length is inserted between the two, and it is
        // immediately eligible once the marker in front of it has been yielded:
        run_test_case(
            "unknown_len_updated_fixed_marker",
            vec![
                step!(AddMarker, input => (0, Some(4)), result => Ok(())),
                step!(AddMarker, input => (9, Some(3)), result => Ok(())),
                step!(Acknowledge, input => 4, result => 4),
                step!(GetNextEligibleMarker, result => Some(
                    EligibleMarker {
                        id: 0, len: EligibleMarkerLength::Known(4), data: None,
                    }
                )),
                step!(GetNextEligibleMarker, result => Some(
                    EligibleMarker {
                        id: 4, len: EligibleMarkerLength::Assumed(5), data: None,
                    }
                )),
                step!(GetNextEligibleMarker, result => None),
            ],
        );
    }

    #[test]
    fn advanced_cases() {
        // A marker with a length of 0 should be immediately available:
        run_test_case(
            "zero_length_eligible",
            vec![
                step!(AddMarker, input => (0, Some(0)), result => Ok(())),
                step!(GetNextEligibleMarker, result => Some(
                    EligibleMarker {
                        id: 0, len: EligibleMarkerLength::Known(0), data: None,
                    }
                )),
            ],
        );

        // When we have a fixed-size marker whose required acked marker ID lands right on the
        // current acked marker ID, it should not be eligible unless there are enough unclaimed
        // acks to actually account for it. The fixed-size marker below ends exactly at the `u64`
        // wraparound point, so the marker after it is expected at ID 0:
        run_test_case(
            "fixed_size_u64_boundary_overlap",
            vec![
                step!(AddMarker, input => (2_686_784_444_737_799_532, Some(15_759_959_628_971_752_084)), result => Ok(())),
                step!(AddMarker, input => (0, None), result => Ok(())),
                step!(AddMarker, input => (8_450_737_568, None), result => Ok(())),
                step!(GetNextEligibleMarker, result => Some(
                    EligibleMarker {
                        id: 0, len: EligibleMarkerLength::Assumed(2_686_784_444_737_799_532), data: None,
                    }
                )),
                step!(GetNextEligibleMarker, result => None),
                step!(Acknowledge, input => 15_759_959_628_971_752_084, result => 15_759_959_628_971_752_084),
                step!(GetNextEligibleMarker, result => Some(
                    EligibleMarker {
                        id: 2_686_784_444_737_799_532, len: EligibleMarkerLength::Known(15_759_959_628_971_752_084), data: None,
                    }
                )),
                step!(GetNextEligibleMarker, result => Some(
                    EligibleMarker {
                        id: 0, len: EligibleMarkerLength::Assumed(8_450_737_568), data: None,
                    }
                )),
                step!(GetNextEligibleMarker, result => None),
            ],
        );
    }

    // Replays the given steps, in order, against a fresh SUT whose acknowledged marker ID starts
    // at 0, asserting after each step that the SUT produced the expected result.
    fn run_test_case(name: &str, case: Vec<(Action, ActionResult)>) {
        let mut sut = OrderedAcknowledgements::from_acked(0u64);
        for (action, expected_result) in case {
            let actual_result = apply_action_sut(&mut sut, action);
            assert_eq!(
                expected_result, actual_result,
                "{name}: ran action {action:?} expecting result {expected_result:?}, but got result {actual_result:?} instead"
            );
        }
    }

    #[test]
    #[should_panic(expected = "overflowing unclaimed acknowledgements is a serious bug")]
    fn panic_when_unclaimed_acks_overflows() {
        // The second acknowledgement pushes the unclaimed counter past `u64::MAX`.
        let actions = vec![Action::Acknowledge(u64::MAX), Action::Acknowledge(1)];

        let mut sut = OrderedAcknowledgements::<u64, ()>::from_acked(0);
        for action in actions {
            apply_action_sut(&mut sut, action);
        }
    }

    proptest! {
        // Runs an arbitrary sequence of actions against the SUT while maintaining a simple model
        // of the expected state, and checks after every action that the two agree.
        #[test]
        fn property_test(
            mut acked_marker_id in any::<u64>(),
            actions in arb_vec(arb_ordered_acks_action(), 0..1000)
        ) {
            let mut sut = OrderedAcknowledgements::from_acked(acked_marker_id);

            // Model state: the unclaimed ack count, the set of marker IDs currently in flight,
            // and the queue of pending (ID, length) pairs in insertion order.
            let mut unclaimed_acks = 0;
            let mut marker_state = HashSet::new();
            let mut marker_stack: VecDeque<(u64, PendingMarkerLength<u64>)> = VecDeque::new();

            for action in actions {
                match action {
                    Action::Acknowledge(amount) => {
                        unclaimed_acks += amount;

                        prop_assert_eq!(
                            ActionResult::Acknowledge(unclaimed_acks),
                            apply_action_sut(&mut sut, action)
                        );
                    },
                    Action::AddMarker((id, maybe_len)) => {
                        // We do gap detection/unknown length fix-up first.
                        let expected_result = if marker_stack.is_empty() {
                            // Our only comparison is the acked marker ID, which, if it doesn't
                            // match, we generate a gap marker for.
                            if id != acked_marker_id {
                                assert!(marker_state.insert(acked_marker_id), "should not be able to add marker that is already in-flight");

                                let len = PendingMarkerLength::Assumed(id.wrapping_sub(acked_marker_id));
                                marker_stack.push_back((acked_marker_id, len));
                            }

                            Ok(())
                        } else {
                            let (back_id, back_len) = marker_stack.back().copied().expect("must exist");
                            match back_len {
                                PendingMarkerLength::Known(len) => {
                                    // We know exactly where the next marker ID should be.
                                    let expected_id = back_id.wrapping_add(len);
                                    if id == expected_id {
                                        Ok(())
                                    } else if expected_id < back_id && id < expected_id {
                                        // Assuming monotonic IDs from the caller, this is a
                                        // monotonicity violation.
                                        Err(MarkerError::MonotonicityViolation)
                                    } else {
                                        assert!(marker_state.insert(expected_id), "should not be able to add marker that is already in-flight");

                                        // Gap between ID and expected ID, add a gap marker.
                                        let len = PendingMarkerLength::Assumed(id.wrapping_sub(expected_id));
                                        marker_stack.push_back((expected_id, len));

                                        Ok(())
                                    }
                                },
                                PendingMarkerLength::Assumed(_) => panic!("should never have an assumed length at back"),
                                PendingMarkerLength::Unknown => {
                                    // Now that we have an ID range, we have enough information to
                                    // give the unknown length marker an assumed length.
                                    let len = id.wrapping_sub(back_id);
                                    let (_, back_len_mut) = marker_stack.back_mut().expect("must exist");
                                    *back_len_mut = PendingMarkerLength::Assumed(len);

                                    Ok(())
                                },
                            }
                        };

                        let sut_result = apply_action_sut(&mut sut, action);
                        match sut_result {
                            ActionResult::AddMarker(result) => {
                                prop_assert_eq!(expected_result, result.clone());

                                // If adding the pending marker actually succeeded, now we need to track it.
                                if result.is_ok() {
                                    let len = maybe_len
                                        .map_or(PendingMarkerLength::Unknown, PendingMarkerLength::Known);

                                    assert!(marker_state.insert(id), "should not be able to add marker that is already in-flight");
                                    marker_stack.push_back((id, len));
                                }
                            },
                            a => panic!("got unexpected action after adding pending marker: {a:?}"),
                        }
                    },
                    Action::GetNextEligibleMarker => match sut.get_next_eligible_marker() {
                        Some(marker) => {
                            // The SUT must yield markers in the same order as the model queue.
                            let (expected_marker_id, original_len) = marker_stack.pop_front()
                                .expect("marker stack should not be empty");

                            assert_eq!(expected_marker_id, marker.id, "SUT eligible marker doesn't match expected ID");

                            // Only fixed-size markers consume acknowledgements; markers with an
                            // assumed length consume none.
                            let unclaimed_acks_to_consume = match (original_len, marker.len) {
                                (PendingMarkerLength::Known(a), EligibleMarkerLength::Known(b)) => {
                                    prop_assert_eq!(a, b, "SUT marker len and model marker len should match");
                                    b
                                },
                                (PendingMarkerLength::Assumed(a), EligibleMarkerLength::Assumed(b)) => {
                                    prop_assert_eq!(a, b, "SUT marker len and model marker len should match");
                                    0
                                }
                                (PendingMarkerLength::Unknown, _) =>
                                    panic!("SUT had eligible marker but marker stack had unknown length marker"),
                                (a, b) => panic!("unknown SUT/model marker len combination: {a:?}, {b:?}"),
                            };

                            assert!(unclaimed_acks_to_consume <= unclaimed_acks,
                                "can't have returned an eligible fixed-size item with a length greater than unclaimed acks");

                            unclaimed_acks -= unclaimed_acks_to_consume;

                            assert!(marker_state.remove(&marker.id), "should be state entry for marker if it was eligible");

                            // Update our acked marker ID.
                            acked_marker_id = marker.id.wrapping_add(marker.len.len());
                        },
                        None => {
                            // This may be valid based on the given state of pending vs unclaimed
                            // acks, as well as whether we have enough information to figure out any
                            // unknown length markers.
                            if let Some((marker_id, marker_len)) = marker_stack.front() {
                                match marker_len {
                                    PendingMarkerLength::Assumed(_) =>
                                        panic!("SUT returned None, but model had assumed length marker"),
                                    PendingMarkerLength::Known(len) => {
                                        let required_acked_offset = marker_id.wrapping_add(*len);
                                        let effective_offset = acked_marker_id.wrapping_add(unclaimed_acks);
                                        let is_eligible = required_acked_offset <= effective_offset && required_acked_offset >= *marker_id;
                                        assert!(!is_eligible,
                                            "SUT returned None but next fixed-size marker on stack is eligible: id: {marker_id}, len: {len}, acked_id_offset: {acked_marker_id}");
                                    },
                                    PendingMarkerLength::Unknown => {
                                        // If we have an unknown length marker, the only reason we
                                        // shouldn't be able to return it is if there are no other
                                        // markers to calculate the gap from.
                                        //
                                        // This is most likely a bug in our model if it happens.
                                        assert!(marker_stack.len() == 1,
                                            "SUT returned None but unknown length marker is calculable");
                                    },
                                }
                            }
                        }
                    },
                }
            }
        }
    }
}