vector_buffers/topology/acks.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893
use std::{collections::VecDeque, fmt};
use num_traits::{Bounded, CheckedAdd, CheckedSub, Unsigned, WrappingAdd, WrappingSub};
/// The length of a marker that has been added but not yet yielded as eligible.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PendingMarkerLength<N> {
    /// The length was supplied by the caller when the marker was added.
    Known(N),
    /// The length was derived from the distance between two marker IDs, either for a synthetic
    /// gap marker or for a marker that was originally added without a length.
    Assumed(N),
    /// No length is available yet; it is filled in when the next marker is added.
    Unknown,
}
/// A marker that has been added but not yet yielded as eligible.
struct PendingMarker<N, D> {
/// ID of the marker: the ID given by the caller, or the start of the gap for a synthetic gap
/// marker.
id: N,
/// Length of the marker, which may still be unknown.
len: PendingMarkerLength<N>,
/// Optional caller-supplied data carried along with the marker.
data: Option<D>,
}
impl<N, D> fmt::Debug for PendingMarker<N, D>
where
    N: fmt::Debug,
    D: fmt::Debug,
{
    /// Writes the marker as a debug struct listing `id`, `len` and `data`, in that order.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { id, len, data } = self;

        let mut builder = f.debug_struct("PendingMarker");
        builder.field("id", id);
        builder.field("len", len);
        builder.field("data", data);
        builder.finish()
    }
}
/// The length of an eligible marker.
///
/// Both variants carry the length itself; the variant records where that length came from.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EligibleMarkerLength<N> {
/// The marker's length was declared upfront when added.
Known(N),
/// The marker's length was calculated based on imperfect information. While it should
/// accurately represent a range that covers any gaps in the marker range, it may or may not
/// correspond to exactly one true marker, and may span multiple markers.
Assumed(N),
}
impl<N: Copy> EligibleMarkerLength<N> {
fn len(&self) -> N {
match self {
EligibleMarkerLength::Known(len) | EligibleMarkerLength::Assumed(len) => *len,
}
}
}
/// A marker that has been fully acknowledged.
pub struct EligibleMarker<N, D> {
/// ID of the marker.
pub id: N,
/// Length of the marker, and whether that length was declared or assumed.
pub len: EligibleMarkerLength<N>,
/// Optional data that was attached to the marker when it was added. Synthetic gap markers
/// carry no data.
pub data: Option<D>,
}
impl<N, D> PartialEq for EligibleMarker<N, D>
where
    N: PartialEq,
{
    /// Compares two markers by `id` and `len` only.
    ///
    /// `data` takes no part in the comparison, so `D` does not need to implement `PartialEq`.
    fn eq(&self, other: &Self) -> bool {
        let Self { id: lhs_id, len: lhs_len, data: _ } = self;
        let Self { id: rhs_id, len: rhs_len, data: _ } = other;

        lhs_id == rhs_id && lhs_len == rhs_len
    }
}
impl<N, D> fmt::Debug for EligibleMarker<N, D>
where
    N: fmt::Debug,
    D: fmt::Debug,
{
    /// Writes the marker as a debug struct listing `id`, `len` and `data`, in that order.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { id, len, data } = self;

        let mut builder = f.debug_struct("EligibleMarker");
        builder.field("id", id);
        builder.field("len", len);
        builder.field("data", data);
        builder.finish()
    }
}
/// Error returned by `OrderedAcknowledgements::add_marker`.
///
/// In general, this error represents a breaking of ID monotonicity, or more likely, the loss of
/// records, where entire records may have been skipped because an attempted add provides an ID
/// that is not the next expected ID.
///
/// While the exact condition must be determined by the caller, we attempt to provide as much
/// information as we reasonably can based on the data we have, whether it's simply that the ID
/// didn't match the next expected ID, or that we know it is definitively ahead or behind the next
/// expected ID.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum MarkerError {
    /// The given marker ID is behind the next expected marker ID.
    ///
    /// As `OrderedAcknowledgements` expects monotonic marker IDs, this represents a violation of
    /// the acknowledgement state, and must be handled by the caller. Generally speaking, this is
    /// an unrecoverable error.
    MonotonicityViolation,
}

impl fmt::Display for MarkerError {
    /// Writes a short, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::MonotonicityViolation => {
                f.write_str("marker ID is behind the next expected marker ID")
            }
        }
    }
}

// Lets callers box the error or propagate it through `?` alongside other error types.
impl std::error::Error for MarkerError {}
/// Result of comparing a potential pending marker ID with the expected next pending marker ID.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MarkerOffset<N> {
    /// The given marker ID is aligned with the next expected marker ID.
    Aligned,
    /// The given marker ID is ahead of the next expected marker ID.
    ///
    /// When the last pending marker has a fixed size, we can calculate the exact marker ID that
    /// we expect to see next. In turn, we can tell how far ahead the given marker ID is from the
    /// next expected marker ID.
    ///
    /// The next expected marker ID, and the amount (gap) by which the given marker ID and the
    /// next expected marker ID differ, are provided, in that order.
    Gap(N, N),
    /// The given marker ID may or may not be aligned.
    ///
    /// This occurs when the last pending marker has an unknown size, as we cannot determine
    /// whether the given marker ID is the next true marker ID without knowing where the last
    /// pending marker should end.
    ///
    /// The last pending marker ID is provided.
    NotEnoughInformation(N),
    /// The given marker ID is behind the next expected marker ID.
    ///
    /// As `OrderedAcknowledgements` expects monotonic marker IDs, this represents a violation of
    /// the acknowledgement state, and must be handled by the caller. Generally speaking, this is
    /// an unrecoverable error.
    MonotonicityViolation,
}
/// `OrderedAcknowledgements` allows determining when a record is eligible for deletion.
///
/// ### Purpose
///
/// In disk buffers, a record may potentially represent multiple events. As these events
/// may be processed at different times by a sink, and in a potentially different order than when
/// stored in the record, a record cannot be considered fully processed until all of the events have
/// been accounted for. As well, only once a record has been fully processed can it be considered
/// for deletion to free up space in the buffer.
///
/// To complicate matters, a record may sometimes not be decodable -- on-disk corruption, invalid
/// encoding scheme that is no longer supported, etc -- but still needs to be accounted for to know
/// when it can be deleted, and so that the correct metrics can be generated to determine how many
/// events were lost by the record not being able to be processed normally.
///
/// ### Functionality
///
/// `OrderedAcknowledgements` provides the ability to add "markers", which are a virtual token mapped
/// to a record. Markers track the ID of a record, how long the record is (if known), and optional
/// data that is specific to the record. It also provides the ability to add acknowledgements which
/// can then be consumed to allow yielding markers which have collected enough acknowledgements and
/// are thus "eligible".
///
/// ### Detecting record gaps and the length of undecodable records
///
/// Additionally, and as hinted at above, markers can be added without a known length: this may
/// happen when a record is read but it cannot be decoded, and thus determining the true length is
/// not possible.
///
/// When markers that have an unknown length are added, `OrderedAcknowledgements` will do one of two things:
/// - figure out if the marker is ahead of the next expected marker ID, and add a synthetic "gap"
/// marker to compensate
/// - update the unknown length with an assumed length, based on the difference between its ID and
/// the ID of the next marker that gets added
///
/// In this way, `OrderedAcknowledgements` provides a contiguous range of marker IDs, which allows
/// detecting not only the presumed length of a record that couldn't be decoded, but also if any
/// records were deleted from disk or unable to be read at all. Based on the invariant of expecting
/// IDs to be monotonic and contiguous, we know that if we expect our next marker ID to be 5, but
/// instead get one with an ID of 8, then there are 3 missing events in the middle that have not
/// been accounted for.
///
/// Similarly, even when we don't know what the next expected marker ID should be, we can determine
/// the number of events that were lost when the next marker is added, as marker IDs represent the
/// start of a record, and so simple arithmetic can determine the number of events that have
/// theoretically been lost.
pub struct OrderedAcknowledgements<N, D> {
/// Acknowledgements that have been added but not yet claimed by a yielded fixed-size marker.
unclaimed_acks: N,
/// The initial value given to `from_acked`, or the ID plus length of the most recently yielded
/// marker.
acked_marker_id: N,
/// Markers that have been added but not yet yielded; new markers are pushed at the back and
/// eligible markers are popped from the front.
pending_markers: VecDeque<PendingMarker<N, D>>,
}
impl<N, D> OrderedAcknowledgements<N, D>
where
N: fmt::Display
+ Bounded
+ CheckedAdd
+ CheckedSub
+ Copy
+ PartialEq
+ PartialOrd
+ Unsigned
+ WrappingAdd
+ WrappingSub,
{
/// Creates an `OrderedAcknowledgements` with no pending markers and no unclaimed
/// acknowledgements, starting from the given acknowledged marker ID.
pub fn from_acked(acked_marker_id: N) -> Self {
    let unclaimed_acks = N::min_value();
    let pending_markers = VecDeque::new();

    Self {
        unclaimed_acks,
        acked_marker_id,
        pending_markers,
    }
}
/// Adds the given number of acknowledgements to the unclaimed total.
///
/// Callers should add acknowledgements to bring the acknowledgement state up to date before
/// asking for eligible markers.
///
/// # Panics
///
/// Panics if the unclaimed acknowledgement total would overflow.
pub fn add_acknowledgements(&mut self, amount: N) {
    let updated_total = self
        .unclaimed_acks
        .checked_add(&amount)
        .expect("overflowing unclaimed acknowledgements is a serious bug");
    self.unclaimed_acks = updated_total;

    trace!(
        unclaimed_acks = %self.unclaimed_acks,
        added_acks = %amount,
        "Added acknowledgements."
    );
}
/// Gets the marker ID offset for the given ID.
///
/// If the given ID matches our next expected marker ID, then `MarkerOffset::Aligned` is
/// returned.
///
/// Otherwise, we return one of the following variants:
/// - if we have no pending markers, `MarkerOffset::Gap` is returned, and contains the acknowledged
///   marker ID and the (wrapping) delta between the given ID and that acknowledged marker ID
/// - if we have pending markers, and the given ID is logically behind the next expected marker
///   ID, `MarkerOffset::MonotonicityViolation` is returned, indicating that the monotonicity
///   invariant has been violated; this is only detected when the next expected marker ID has
///   wrapped around below the last pending marker ID and the given ID is below it
/// - if we have pending markers, and the given ID otherwise differs from the next expected
///   marker ID, `MarkerOffset::Gap` is returned, specifying the next expected marker ID and how
///   far ahead of it (with wrapping arithmetic) the given ID is
/// - if we have pending markers, and the last pending marker does not have a known length,
///   `MarkerOffset::NotEnoughInformation` is returned, as we require a fixed-size marker to
///   correctly calculate the next expected marker ID
fn get_marker_id_offset(&self, id: N) -> MarkerOffset<N> {
    match self.pending_markers.back() {
        None => {
            // We have no pending markers, but our acknowledged ID offset should match the marker
            // ID being given here, otherwise it would imply that the markers were not contiguous.
            //
            // We return the difference between the ID and our acknowledged ID offset with the
            // assumption that the new ID is monotonic. Since IDs wrap around, we don't bother
            // looking at whether it's higher or lower because we can't reasonably tell if this
            // record ID is actually correct but other markers in between went missing, etc.
            //
            // Basically, it's up to the caller to figure this out. We're just trying to give them
            // as much information as we can.
            if self.acked_marker_id == id {
                MarkerOffset::Aligned
            } else {
                MarkerOffset::Gap(
                    self.acked_marker_id,
                    id.wrapping_sub(&self.acked_marker_id),
                )
            }
        }
        Some(back) => match back.len {
            // When we know the length of the previously added pending marker, we know exactly
            // what the ID of the marker that follows it should be, as we do not allow for
            // noncontiguous marker ID ranges. If this incoming marker doesn't match, something
            // is wrong.
            PendingMarkerLength::Known(len) => {
                let expected_next = back.id.wrapping_add(&len);
                if id == expected_next {
                    MarkerOffset::Aligned
                } else if expected_next < back.id && id < expected_next {
                    // The expected ID wrapped around and the given ID sits below it, so the
                    // given ID is behind where the last pending marker ends.
                    MarkerOffset::MonotonicityViolation
                } else {
                    MarkerOffset::Gap(expected_next, id.wrapping_sub(&expected_next))
                }
            }
            // Without a fixed-size marker, we cannot be sure whether this marker ID is aligned
            // or not.
            PendingMarkerLength::Assumed(_) | PendingMarkerLength::Unknown => {
                MarkerOffset::NotEnoughInformation(back.id)
            }
        },
    }
}
/// Adds a marker.
///
/// The marker is tracked internally, and becomes eligible once the acknowledgement state has
/// advanced far enough to be at or ahead of it.
///
/// ## Gap detection and unknown length markers
///
/// When a gap is detected between the given marker ID and the next expected marker ID, a
/// synthetic marker is inserted to represent that gap. For example, after a marker with an ID of
/// 0 and a length of 5, the next marker is expected to have an ID of 5. If a marker with an ID of
/// 7 is given instead, that is a gap of 2: a synthetic marker with an ID of 5 and a length of 2 is
/// inserted before the marker with the ID of 7. This keeps the marker range contiguous and lets
/// the caller observe the gap as an eligible marker.
///
/// Likewise, the length of a marker added without a length cannot be known until the next marker
/// is added. At that point the given marker ID is assumed to be monotonic, so the unknown length
/// marker is updated to have a length equal to the difference between the given marker ID and
/// its own ID.
///
/// In both cases, the resulting length is flagged as assumed rather than as one declared by the
/// caller.
///
/// ## Errors
///
/// When other pending markers are present, and the given ID is logically behind the next
/// expected marker ID, `Err(MarkerError::MonotonicityViolation)` is returned.
///
/// # Panics
///
/// Panics if pending markers is empty when last pending marker is an unknown size.
pub fn add_marker(
    &mut self,
    id: N,
    marker_len: Option<N>,
    data: Option<D>,
) -> Result<(), MarkerError> {
    // Work out where the given ID sits relative to the next expected marker ID, and patch up
    // the pending marker state accordingly before the new marker goes in.
    match self.get_marker_id_offset(id) {
        // The given ID is exactly the one we expected: nothing to fix up.
        MarkerOffset::Aligned => {}
        // Nothing can be done about a monotonicity violation other than telling the caller.
        MarkerOffset::MonotonicityViolation => return Err(MarkerError::MonotonicityViolation),
        // The given ID does not line up with the expected one, so a synthetic gap marker is
        // queued first to keep the ID range contiguous.
        MarkerOffset::Gap(gap_start, gap_len) => {
            let gap_marker = PendingMarker {
                id: gap_start,
                len: PendingMarkerLength::Assumed(gap_len),
                data: None,
            };
            self.pending_markers.push_back(gap_marker);
        }
        // The last pending marker has no length yet; the distance from its ID to the given ID
        // becomes its assumed length.
        MarkerOffset::NotEnoughInformation(previous_id) => {
            let assumed_len = id.wrapping_sub(&previous_id);
            match self.pending_markers.back_mut() {
                Some(previous) => previous.len = PendingMarkerLength::Assumed(assumed_len),
                None => unreachable!("pending markers should not be empty"),
            }
        }
    }

    // Finally, queue the marker the caller asked for.
    let len = match marker_len {
        Some(known) => PendingMarkerLength::Known(known),
        None => PendingMarkerLength::Unknown,
    };
    self.pending_markers.push_back(PendingMarker { id, len, data });

    Ok(())
}
/// Gets the next marker which has been fully acknowledged.
///
/// Only the oldest pending marker is considered. A pending marker with a known length becomes
/// eligible when the acknowledged marker ID, advanced by the unclaimed acknowledgements, is at
/// or past the marker ID plus the marker length, and enough unclaimed acknowledgements exist to
/// cover that length.
///
/// A pending marker with an unknown length needs another pending marker to be added after it,
/// so that its length can be calculated from the ID offsets; once it has that assumed length it
/// is eligible without consuming any acknowledgements.
#[cfg_attr(test, instrument(skip(self), level = "trace"))]
pub fn get_next_eligible_marker(&mut self) -> Option<EligibleMarker<N, D>> {
    let effective_acked_marker_id = self.acked_marker_id.wrapping_add(&self.unclaimed_acks);

    trace!(
        acked_marker_id = %self.acked_marker_id,
        %effective_acked_marker_id,
        unclaimed_acks = %self.unclaimed_acks,
        pending_markers = self.pending_markers.len(),
        "Searching for eligible marker."
    );

    // No pending markers means nothing can be eligible.
    let front = self.pending_markers.front()?;

    let (len, acks_to_claim) = match front.len {
        // We don't yet know the length of this marker, so we're stuck waiting for another marker
        // to be added before that can be determined.
        PendingMarkerLength::Unknown => return None,
        // An assumed length means a marker was added after this one. Such markers do not consume
        // acknowledgements, so they are eligible as soon as the assumed length exists.
        PendingMarkerLength::Assumed(len) => (EligibleMarkerLength::Assumed(len), N::min_value()),
        // A fixed-size marker is fully acknowledged once the effective acked marker ID reaches
        // the marker ID plus its length. The unclaimed ack count is checked as well: otherwise a
        // marker whose required acked marker ID merely lines up with the effective one could be
        // yielded even though the unclaimed acks do not actually cover its length.
        PendingMarkerLength::Known(len) => {
            let required_acked_marker_id = front.id.wrapping_add(&len);
            let fully_acked = required_acked_marker_id <= effective_acked_marker_id
                && self.unclaimed_acks >= len;
            if !fully_acked {
                return None;
            }
            (EligibleMarkerLength::Known(len), len)
        }
    };

    // The front marker is eligible: take it off the queue, claim its acknowledgements, and move
    // the acked marker ID to the end of the marker.
    let PendingMarker { id, data, .. } = self
        .pending_markers
        .pop_front()
        .unwrap_or_else(|| unreachable!("pending markers cannot be empty"));

    if acks_to_claim > N::min_value() {
        self.unclaimed_acks = self
            .unclaimed_acks
            .checked_sub(&acks_to_claim)
            .unwrap_or_else(|| {
                unreachable!("should not be able to claim more acks than are unclaimed")
            });
    }

    self.acked_marker_id = id.wrapping_add(&len.len());

    Some(EligibleMarker { id, len, data })
}
}
impl<N, D> fmt::Debug for OrderedAcknowledgements<N, D>
where
    N: fmt::Debug,
    D: fmt::Debug,
{
    /// Writes the acknowledgement state as a debug struct listing `unclaimed_acks`,
    /// `acked_marker_id` and `pending_markers`, in that order.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            unclaimed_acks,
            acked_marker_id,
            pending_markers,
        } = self;

        let mut builder = f.debug_struct("OrderedAcknowledgements");
        builder.field("unclaimed_acks", unclaimed_acks);
        builder.field("acked_marker_id", acked_marker_id);
        builder.field("pending_markers", pending_markers);
        builder.finish()
    }
}
#[cfg(test)]
mod tests {
use std::collections::{HashSet, VecDeque};
use proptest::{
arbitrary::any,
collection::vec as arb_vec,
prop_assert_eq, prop_oneof, proptest,
strategy::{Just, Strategy},
};
use super::{EligibleMarker, EligibleMarkerLength};
use crate::topology::acks::{MarkerError, OrderedAcknowledgements, PendingMarkerLength};
// A single operation that a test can apply to an `OrderedAcknowledgements`.
#[derive(Debug, Clone, Copy)]
enum Action {
// Add the given number of acknowledgements.
Acknowledge(u64),
// Add a marker with the given ID and optional length.
AddMarker((u64, Option<u64>)),
// Ask for the next eligible marker.
GetNextEligibleMarker,
}
// The observable outcome of applying the `Action` of the same name.
#[derive(Debug, PartialEq)]
enum ActionResult {
// Number of unclaimed acknowledgements.
Acknowledge(u64),
// Result returned by `add_marker`.
AddMarker(Result<(), MarkerError>),
// Marker returned by `get_next_eligible_marker`, if any. Note that `EligibleMarker` equality
// only compares `id` and `len`.
GetNextEligibleMarker(Option<EligibleMarker<u64, ()>>),
}
fn arb_ordered_acks_action() -> impl Strategy<Value = Action> {
prop_oneof![
10 => any::<u32>().prop_map(|n| Action::Acknowledge(u64::from(n))),
10 => any::<(u64, Option<u64>)>().prop_map(|(a, b)| Action::AddMarker((a, b))),
10 => Just(Action::GetNextEligibleMarker),
]
}
// Applies `action` to the system under test and returns what was observed.
//
// Markers are always added without data. For acknowledgements, the observed value is the
// unclaimed acknowledgement count after the addition.
fn apply_action_sut(
    sut: &mut OrderedAcknowledgements<u64, ()>,
    action: Action,
) -> ActionResult {
    match action {
        Action::GetNextEligibleMarker => {
            ActionResult::GetNextEligibleMarker(sut.get_next_eligible_marker())
        }
        Action::AddMarker((id, maybe_len)) => {
            ActionResult::AddMarker(sut.add_marker(id, maybe_len, None))
        }
        Action::Acknowledge(amount) => {
            sut.add_acknowledgements(amount);
            ActionResult::Acknowledge(sut.unclaimed_acks)
        }
    }
}
// Builds one `(Action, ActionResult)` pair for a scripted test case.
//
// Both halves of the pair use the same variant name. The first form is for actions without an
// input (`step!(Name, result => ...)`), the second for actions that take one
// (`step!(Name, input => ..., result => ...)`).
macro_rules! step {
($action_name:ident, result => $result_input:expr) => {
(
Action::$action_name,
ActionResult::$action_name($result_input),
)
};
($action_name:ident, input => $action_input:expr, result => $result_input:expr) => {
(
Action::$action_name($action_input),
ActionResult::$action_name($result_input),
)
};
}
#[test]
fn basic_cases() {
    // Smoke test: nothing has been added, so nothing is eligible.
    let empty = vec![step!(GetNextEligibleMarker, result => None)];
    run_test_case("empty", empty);

    // Simple through-and-through: add a fixed-size marker, acknowledge all of it, get it back.
    let through_and_through = vec![
        step!(AddMarker, input => (0, Some(5)), result => Ok(())),
        step!(Acknowledge, input => 5, result => 5),
        step!(GetNextEligibleMarker, result => Some(
            EligibleMarker {
                id: 0, len: EligibleMarkerLength::Known(5), data: None,
            }
        )),
    ];
    run_test_case("through_and_through", through_and_through);
}
// Scripted scenarios covering the core invariants: partial acknowledgement, unknown length
// markers, synthetic gap markers, and monotonicity violations. Every scenario starts from an
// acknowledged marker ID of 0.
#[test]
fn invariant_cases() {
// Checking for an eligible record between incremental acknowledgement:
run_test_case(
"eligible_multi_ack",
vec![
step!(AddMarker, input => (0, Some(13)), result => Ok(())),
step!(Acknowledge, input => 5, result => 5),
step!(GetNextEligibleMarker, result => None),
step!(Acknowledge, input => 5, result => 10),
step!(GetNextEligibleMarker, result => None),
step!(Acknowledge, input => 5, result => 15),
step!(GetNextEligibleMarker, result => Some(
EligibleMarker {
id: 0, len: EligibleMarkerLength::Known(13), data: None,
}
)),
],
);
// Unknown length markers can't be returned until a marker exists after them, even if we
// could maximally acknowledge them:
run_test_case(
"unknown_len_no_subsequent_marker",
vec![
step!(AddMarker, input => (0, None), result => Ok(())),
step!(Acknowledge, input => 5, result => 5),
step!(GetNextEligibleMarker, result => None),
],
);
// We can always get back an unknown length marker, with its assumed length, regardless of
// acknowledgements, so long as a marker exists after it: here a fixed-size one.
run_test_case(
"unknown_len_subsequent_marker_fixed",
vec![
step!(AddMarker, input => (0, None), result => Ok(())),
step!(AddMarker, input => (5, Some(1)), result => Ok(())),
step!(GetNextEligibleMarker, result => Some(
EligibleMarker {
id: 0, len: EligibleMarkerLength::Assumed(5), data: None,
}
)),
step!(GetNextEligibleMarker, result => None),
],
);
// We can always get back an unknown length marker, with its assumed length, regardless of
// acknowledgements, so long as a marker exists after it: here another unknown length one.
run_test_case(
"unknown_len_subsequent_marker_unknown",
vec![
step!(AddMarker, input => (0, None), result => Ok(())),
step!(AddMarker, input => (5, None), result => Ok(())),
step!(GetNextEligibleMarker, result => Some(
EligibleMarker {
id: 0, len: EligibleMarkerLength::Assumed(5), data: None,
}
)),
step!(GetNextEligibleMarker, result => None),
],
);
// Adding a marker whose ID is ahead of the acknowledged marker ID, while nothing is pending,
// generates a synthetic gap marker that is immediately eligible:
run_test_case(
"unknown_len_no_pending_synthetic_gap",
vec![
step!(AddMarker, input => (1, None), result => Ok(())),
step!(GetNextEligibleMarker, result => Some(
EligibleMarker {
id: 0, len: EligibleMarkerLength::Assumed(1), data: None,
}
)),
step!(GetNextEligibleMarker, result => None),
],
);
// When another marker exists, and is fixed size, we correctly detect when trying to add
// another marker whose ID comes before the end of the last pending marker we have. Here the
// next expected ID wraps around to 2, and the given ID of 1 is behind it:
run_test_case(
"detect_monotonicity_violation",
vec![
step!(AddMarker, input => (u64::MAX, Some(3)), result => Ok(())),
step!(AddMarker, input => (1, Some(2)), result => Err(MarkerError::MonotonicityViolation)),
],
);
// When another marker exists, and is fixed size, we correctly detect when trying to add
// another marker whose ID comes after the end of the last pending marker we have, by
// inserting a synthetic gap marker with an assumed length, which is eligible without any
// further acknowledgements once the marker before it has been yielded:
run_test_case(
"unknown_len_updated_fixed_marker",
vec![
step!(AddMarker, input => (0, Some(4)), result => Ok(())),
step!(AddMarker, input => (9, Some(3)), result => Ok(())),
step!(Acknowledge, input => 4, result => 4),
step!(GetNextEligibleMarker, result => Some(
EligibleMarker {
id: 0, len: EligibleMarkerLength::Known(4), data: None,
}
)),
step!(GetNextEligibleMarker, result => Some(
EligibleMarker {
id: 4, len: EligibleMarkerLength::Assumed(5), data: None,
}
)),
step!(GetNextEligibleMarker, result => None),
],
);
}
// Scripted scenarios for edge cases: zero-length markers and ID arithmetic that wraps around
// the `u64` boundary. Every scenario starts from an acknowledged marker ID of 0.
#[test]
fn advanced_cases() {
// A marker with a length of 0 should be immediately available:
run_test_case(
"zero_length_eligible",
vec![
step!(AddMarker, input => (0, Some(0)), result => Ok(())),
step!(GetNextEligibleMarker, result => Some(
EligibleMarker {
id: 0, len: EligibleMarkerLength::Known(0), data: None,
}
)),
],
);
// When we have a fixed-size marker whose required acked marker ID lands right on the
// current acked marker ID, it should not be eligible unless there are enough unclaimed
// acks to actually account for it. The first marker's ID plus its length wraps around to
// exactly 0, which is why the second marker (ID 0) is accepted as aligned:
run_test_case(
"fixed_size_u64_boundary_overlap",
vec![
step!(AddMarker, input => (2_686_784_444_737_799_532, Some(15_759_959_628_971_752_084)), result => Ok(())),
step!(AddMarker, input => (0, None), result => Ok(())),
step!(AddMarker, input => (8_450_737_568, None), result => Ok(())),
step!(GetNextEligibleMarker, result => Some(
EligibleMarker {
id: 0, len: EligibleMarkerLength::Assumed(2_686_784_444_737_799_532), data: None,
}
)),
step!(GetNextEligibleMarker, result => None),
step!(Acknowledge, input => 15_759_959_628_971_752_084, result => 15_759_959_628_971_752_084),
step!(GetNextEligibleMarker, result => Some(
EligibleMarker {
id: 2_686_784_444_737_799_532, len: EligibleMarkerLength::Known(15_759_959_628_971_752_084), data: None,
}
)),
step!(GetNextEligibleMarker, result => Some(
EligibleMarker {
id: 0, len: EligibleMarkerLength::Assumed(8_450_737_568), data: None,
}
)),
step!(GetNextEligibleMarker, result => None),
],
);
}
// Runs a scripted test case against a fresh `OrderedAcknowledgements` that starts at an
// acknowledged marker ID of 0, asserting after every step that the observed result matches the
// expected one. `name` only labels assertion failures.
fn run_test_case(name: &str, case: Vec<(Action, ActionResult)>) {
    let mut sut = OrderedAcknowledgements::from_acked(0u64);

    case.into_iter().for_each(|(action, expected_result)| {
        let actual_result = apply_action_sut(&mut sut, action);
        assert_eq!(
            expected_result, actual_result,
            "{name}: ran action {action:?} expecting result {expected_result:?}, but got result {actual_result:?} instead"
        );
    });
}
// Acknowledging `u64::MAX` and then one more must trip the overflow check rather than wrap.
#[test]
#[should_panic(expected = "overflowing unclaimed acknowledgements is a serious bug")]
fn panic_when_unclaimed_acks_overflows() {
    let mut sut = OrderedAcknowledgements::<u64, ()>::from_acked(0);

    vec![Action::Acknowledge(u64::MAX), Action::Acknowledge(1)]
        .into_iter()
        .for_each(|action| {
            apply_action_sut(&mut sut, action);
        });
}
proptest! {
// Model-based property test: a random sequence of actions is applied both to the SUT and to a
// simple model (`unclaimed_acks`, `acked_marker_id`, `marker_state`, `marker_stack`), and the
// two are checked against each other after every action.
#[test]
fn property_test(
mut acked_marker_id in any::<u64>(),
actions in arb_vec(arb_ordered_acks_action(), 0..1000)
) {
let mut sut = OrderedAcknowledgements::from_acked(acked_marker_id);
// Model state: unclaimed acks, the IDs of all in-flight markers, and the in-flight markers
// (ID and length) in the order they are expected to be yielded.
let mut unclaimed_acks = 0;
let mut marker_state = HashSet::new();
let mut marker_stack: VecDeque<(u64, PendingMarkerLength<u64>)> = VecDeque::new();
for action in actions {
match action {
Action::Acknowledge(amount) => {
unclaimed_acks += amount;
prop_assert_eq!(
ActionResult::Acknowledge(unclaimed_acks),
apply_action_sut(&mut sut, action)
);
},
Action::AddMarker((id, maybe_len)) => {
// We do gap detection/unknown length fix-up first.
let expected_result = if marker_stack.is_empty() {
// Our only comparison is the acked marker ID, which, if it doesn't
// match, we generate a gap marker for.
if id != acked_marker_id {
assert!(marker_state.insert(acked_marker_id), "should not be able to add marker that is already in-flight");
let len = PendingMarkerLength::Assumed(id.wrapping_sub(acked_marker_id));
marker_stack.push_back((acked_marker_id, len));
}
Ok(())
} else {
let (back_id, back_len) = marker_stack.back().copied().expect("must exist");
match back_len {
PendingMarkerLength::Known(len) => {
// We know exactly where the next marker ID should be.
let expected_id = back_id.wrapping_add(len);
if id == expected_id {
Ok(())
} else if expected_id < back_id && id < expected_id {
// Assuming monotonic IDs from the caller, this is a
// monotonicity violation.
Err(MarkerError::MonotonicityViolation)
} else {
assert!(marker_state.insert(expected_id), "should not be able to add marker that is already in-flight");
// Gap between ID and expected ID, add a gap marker.
let len = PendingMarkerLength::Assumed(id.wrapping_sub(expected_id));
marker_stack.push_back((expected_id, len));
Ok(())
}
},
PendingMarkerLength::Assumed(_) => panic!("should never have an assumed length at back"),
PendingMarkerLength::Unknown => {
// Now that we have an ID range, we have enough information to
// give the unknown length marker an assumed length.
let len = id.wrapping_sub(back_id);
let (_, back_len_mut) = marker_stack.back_mut().expect("must exist");
*back_len_mut = PendingMarkerLength::Assumed(len);
Ok(())
},
}
};
let sut_result = apply_action_sut(&mut sut, action);
match sut_result {
ActionResult::AddMarker(result) => {
prop_assert_eq!(expected_result, result.clone());
// If adding the pending marker actually succeeded, now we need to track it.
if result.is_ok() {
let len = maybe_len
.map_or(PendingMarkerLength::Unknown, PendingMarkerLength::Known);
assert!(marker_state.insert(id), "should not be able to add marker that is already in-flight");
marker_stack.push_back((id, len));
}
},
a => panic!("got unexpected action after adding pending marker: {a:?}"),
}
},
Action::GetNextEligibleMarker => match sut.get_next_eligible_marker() {
Some(marker) => {
// The SUT yielded a marker, so it must be the one at the front of the model.
let (expected_marker_id, original_len) = marker_stack.pop_front()
.expect("marker stack should not be empty");
assert_eq!(expected_marker_id, marker.id, "SUT eligible marker doesn't match expected ID");
// Only fixed-size markers consume acknowledgements.
let unclaimed_acks_to_consume = match (original_len, marker.len) {
(PendingMarkerLength::Known(a), EligibleMarkerLength::Known(b)) => {
prop_assert_eq!(a, b, "SUT marker len and model marker len should match");
b
},
(PendingMarkerLength::Assumed(a), EligibleMarkerLength::Assumed(b)) => {
prop_assert_eq!(a, b, "SUT marker len and model marker len should match");
0
}
(PendingMarkerLength::Unknown, _) =>
panic!("SUT had eligible marker but marker stack had unknown length marker"),
(a, b) => panic!("unknown SUT/model marker len combination: {a:?}, {b:?}"),
};
assert!(unclaimed_acks_to_consume <= unclaimed_acks,
"can't have returned an eligible fixed-size item with a length greater than unclaimed acks");
unclaimed_acks -= unclaimed_acks_to_consume;
assert!(marker_state.remove(&marker.id), "should be state entry for marker if it was eligible");
// Update our acked marker ID.
acked_marker_id = marker.id.wrapping_add(marker.len.len());
},
None => {
// This may be valid based on the given state of pending vs unclaimed
// acks, as well as whether we have enough information to figure out any
// unknown length markers.
if let Some((marker_id, marker_len)) = marker_stack.front() {
match marker_len {
PendingMarkerLength::Assumed(_) =>
panic!("SUT returned None, but model had assumed length marker"),
PendingMarkerLength::Known(len) => {
let required_acked_offset = marker_id.wrapping_add(*len);
let effective_offset = acked_marker_id.wrapping_add(unclaimed_acks);
let is_eligible = required_acked_offset <= effective_offset && required_acked_offset >= *marker_id;
assert!(!is_eligible,
"SUT returned None but next fixed-size marker on stack is eligible: id: {marker_id}, len: {len}, acked_id_offset: {acked_marker_id}");
},
PendingMarkerLength::Unknown => {
// If we have an unknown length marker, the only reason we
// shouldn't be able to return it is if there are no other
// markers to calculate the gap from.
//
// This is most likely a bug in our model if it happens.
assert!(marker_stack.len() == 1,
"SUT returned None but unknown length marker is calculable");
},
}
}
}
},
}
}
}
}
}