vector_buffers/topology/
acks.rs

1use std::{collections::VecDeque, fmt};
2
3use num_traits::{Bounded, CheckedAdd, CheckedSub, Unsigned, WrappingAdd, WrappingSub};
4
/// The length of a pending marker, as far as it is currently known.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PendingMarkerLength<N> {
    /// The length was declared by the caller when the marker was added.
    Known(N),
    /// The length was inferred rather than declared: either the marker is a synthetic gap marker,
    /// or it was added without a length and the length was later derived from the ID of the marker
    /// added after it.
    Assumed(N),
    /// The marker was added without a length, and no later marker has been added yet to infer one.
    Unknown,
}
11
12struct PendingMarker<N, D> {
13    id: N,
14    len: PendingMarkerLength<N>,
15    data: Option<D>,
16}
17
18impl<N, D> fmt::Debug for PendingMarker<N, D>
19where
20    N: fmt::Debug,
21    D: fmt::Debug,
22{
23    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24        f.debug_struct("PendingMarker")
25            .field("id", &self.id)
26            .field("len", &self.len)
27            .field("data", &self.data)
28            .finish()
29    }
30}
31
/// The length reported for a marker once it has become eligible.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum EligibleMarkerLength<N> {
    /// The length was declared upfront when the marker was added.
    Known(N),

    /// The length was calculated from imperfect information. It should accurately describe a range
    /// that covers any gaps in the marker range, but that range may correspond to one true marker,
    /// to several markers, or to none at all.
    Assumed(N),
}

impl<N: Copy> EligibleMarkerLength<N> {
    /// Returns the raw length value, regardless of whether it is known or assumed.
    fn len(&self) -> N {
        // Both variants carry exactly one `N`, so this pattern is irrefutable.
        let (Self::Known(value) | Self::Assumed(value)) = *self;
        value
    }
}
51
52/// A marker that has been fully acknowledged.
53pub struct EligibleMarker<N, D> {
54    pub id: N,
55    pub len: EligibleMarkerLength<N>,
56    pub data: Option<D>,
57}
58
59impl<N, D> PartialEq for EligibleMarker<N, D>
60where
61    N: PartialEq,
62{
63    fn eq(&self, other: &Self) -> bool {
64        self.id == other.id && self.len == other.len
65    }
66}
67
68impl<N, D> fmt::Debug for EligibleMarker<N, D>
69where
70    N: fmt::Debug,
71    D: fmt::Debug,
72{
73    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74        f.debug_struct("EligibleMarker")
75            .field("id", &self.id)
76            .field("len", &self.len)
77            .field("data", &self.data)
78            .finish()
79    }
80}
81
/// Error returned by `OrderedAcknowledgements::add_marker`.
///
/// In general, this error represents a breaking of ID monotonicity, or more likely, the loss of
/// records, where entire records may have been skipped because an attempted add provides an ID
/// that is not the next expected ID.
///
/// While the exact condition must be determined by the caller, we attempt to provide as much
/// information as we reasonably can based on the data we have, whether it's simply that the ID
/// didn't match the next expected ID, or that we know it is definitively ahead of or behind the
/// next expected ID.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum MarkerError {
    /// The given marker ID is behind the next expected marker ID.
    ///
    /// As `OrderedAcknowledgements` expects monotonic marker IDs, this represents a violation of
    /// the acknowledgement state, and must be handled by the caller.  Generally speaking, this is
    /// an unrecoverable error.
    MonotonicityViolation,
}

impl fmt::Display for MarkerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::MonotonicityViolation => {
                f.write_str("marker ID is behind the next expected marker ID")
            }
        }
    }
}

// Implementing `Error` lets callers box this error or propagate it with `?` into generic error
// types.
impl std::error::Error for MarkerError {}
101
/// Result of comparing a potential pending marker ID with the expected next pending marker ID.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MarkerOffset<N> {
    /// The given marker ID is aligned with the next expected marker ID.
    Aligned,

    /// The given marker ID is ahead of the next expected marker ID.
    ///
    /// When the last pending marker has a fixed size, we can calculate the exact marker ID that we
    /// expect to see next. In turn, we can tell how far ahead the given marker ID is from the next
    /// expected marker ID.
    ///
    /// The first field is the next expected marker ID, and the second field is the amount (gap) by
    /// which the given marker ID and the next expected marker ID differ.
    Gap(N, N),

    /// The given marker ID may or may not be aligned.
    ///
    /// This occurs when the last pending marker does not have a known size, as we cannot determine
    /// whether the given marker ID is the next true marker ID without knowing where the last
    /// pending marker should end.
    ///
    /// The last pending marker ID is provided.
    NotEnoughInformation(N),

    /// The given marker ID is behind the next expected marker ID.
    ///
    /// As `OrderedAcknowledgements` expects monotonic marker IDs, this represents a violation of
    /// the acknowledgement state, and must be handled by the caller.  Generally speaking, this is
    /// an unrecoverable error.
    MonotonicityViolation,
}
133
134/// `OrderedAcknowledgements` allows determining when a record is eligible for deletion.
135///
136/// ### Purpose
137///
138/// In disk buffers, a record may potentially represent multiple events. As these events
139/// may be processed at different times by a sink, and in a potentially different order than when
140/// stored in the record, a record cannot be considered fully processed until all of the events have
141/// been accounted for.  As well, only once a record has been fully processed can it be considered
142/// for deletion to free up space in the buffer.
143///
144/// To complicate matters, a record may sometimes not be decodable -- on-disk corruption, invalid
145/// encoding scheme that is no longer supported, etc -- but still needs to be accounted for to know
146/// when it can be deleted, and so that the correct metrics can be generated to determine how many
147/// events were lost by the record not being able to be processed normally.
148///
149/// ### Functionality
150///
151/// `OrderedAcknowledgements` provides the ability to add "markers", which are a virtual token mapped
152/// to a record. Markers track the ID of a record, how long the record is (if known), and optional
153/// data that is specific to the record.  It also provides the ability to add acknowledgements which
154/// can then be consumed to allow yielding markers which have collected enough acknowledgements and
155/// are thus "eligible".
156///
157/// ### Detecting record gaps and the length of undecodable records
158///
159/// Additionally, and as hinted at above, markers can be added without a known length: this may
160/// happen when a record is read but it cannot be decoded, and thus determining the true length is
161/// not possible.
162///
163/// When markers that have an unknown length are added, `OrderedAcknowledgements` will do one of two things:
164/// - figure out if the marker is ahead of the next expected marker ID, and add a synthetic "gap"
165///   marker to compensate
166/// - update the unknown length with an assumed length, based on the difference between its ID and
167///   the ID of the next marker that gets added
168///
169/// In this way, `OrderedAcknowledgements` provides a contiguous range of marker IDs, which allows
170/// detecting not only the presumed length of a record that couldn't be decoded, but also if any
171/// records were deleted from disk or unable to be read at all.  Based on the invariant of expecting
172/// IDs to be monotonic and contiguous, we know that if we expect our next marker ID to be 5, but
173/// instead get one with an ID of 8, that there's 3 missing events in the middle that have not been
174/// accounted for.
175///
176/// Similarly, even when we don't know what the next expected marker ID should be, we can determine
177/// the number of events that were lost when the next marker is added, as marker IDs represent the
178/// start of a record, and so simple arithmetic can determine the number of events that have
179/// theoretically been lost.
180pub struct OrderedAcknowledgements<N, D> {
181    unclaimed_acks: N,
182    acked_marker_id: N,
183    pending_markers: VecDeque<PendingMarker<N, D>>,
184}
185
186impl<N, D> OrderedAcknowledgements<N, D>
187where
188    N: fmt::Display
189        + Bounded
190        + CheckedAdd
191        + CheckedSub
192        + Copy
193        + PartialEq
194        + PartialOrd
195        + Unsigned
196        + WrappingAdd
197        + WrappingSub,
198{
199    pub fn from_acked(acked_marker_id: N) -> Self {
200        Self {
201            unclaimed_acks: N::min_value(),
202            acked_marker_id,
203            pending_markers: VecDeque::new(),
204        }
205    }
206
207    /// Adds the given number of acknowledgements.
208    ///
209    /// Acknowledgements should be given by the caller to update the acknowledgement state before
210    /// trying to get any eligible markers.
211    ///
212    /// # Panics
213    ///
214    /// Will panic if adding ack amount overflows.
215    pub fn add_acknowledgements(&mut self, amount: N) {
216        self.unclaimed_acks = self
217            .unclaimed_acks
218            .checked_add(&amount)
219            .expect("overflowing unclaimed acknowledgements is a serious bug");
220
221        trace!(
222            unclaimed_acks = %self.unclaimed_acks,
223            added_acks = %amount,
224            "Added acknowledgements."
225        );
226    }
227
228    /// Gets the marker ID offset for the given ID.
229    ///
230    /// If the given ID matches our next expected marker ID, then `MarkerOffset::Aligned` is
231    /// returned.
232    ///
233    /// Otherwise, we return one of the following variants:
234    /// - if we have no pending markers, `MarkerOffset::Gap` is returned, and contains the delta
235    ///   between the given ID and the next expected marker ID
236    /// - if we have pending markers, and the given ID is logically behind the next expected marker
237    ///   ID, `MarkerOffset::MonotonicityViolation` is returned, indicating that the monotonicity
238    ///   invariant has been violated
239    /// - if we have pending markers, and the given ID is logically ahead of the next expected
240    ///   marker, `MarkerOffset::Gap` is returned, specifying how far ahead of the next expected
241    ///   marker ID it is
242    /// - if we have pending markers, and the last pending marker has an unknown length,
243    ///   `MarkerOffset::NotEnoughInformation` is returned, as we require a fixed-size marker to
244    ///   correctly calculate the next expected marker ID
245    fn get_marker_id_offset(&self, id: N) -> MarkerOffset<N> {
246        if self.pending_markers.is_empty() {
247            // We have no pending markers, but our acknowledged ID offset should match the marker ID
248            // being given here, otherwise it would imply that the markers were not contiguous.
249            //
250            // We return the difference between the ID and our acknowledged ID offset with the
251            // assumption that the new ID is monotonic.  Since IDs wraparound, we don't bother
252            // looking at if it's higher or lower because we can't reasonably tell if this record ID
253            // is actually correct but other markers in between went missing, etc.
254            //
255            // Basically, it's up to the caller to figure this out.  We're just trying to give them
256            // as much information as we can.
257            if self.acked_marker_id != id {
258                return MarkerOffset::Gap(
259                    self.acked_marker_id,
260                    id.wrapping_sub(&self.acked_marker_id),
261                );
262            }
263        } else {
264            let back = self
265                .pending_markers
266                .back()
267                .expect("pending markers should have items");
268
269            // When we know the length of the previously added pending marker, we can figure out
270            // where this marker's ID should land, as we don't not allow for noncontiguous marker ID
271            // ranges.
272            if let PendingMarkerLength::Known(len) = back.len {
273                // If we know the length of the back item, then we know exactly what the ID for the
274                // next marker to follow it should be.  If this incoming marker doesn't match,
275                // something is wrong.
276                let expected_next = back.id.wrapping_add(&len);
277                if id != expected_next {
278                    if expected_next < back.id && id < expected_next {
279                        return MarkerOffset::MonotonicityViolation;
280                    }
281
282                    return MarkerOffset::Gap(expected_next, id.wrapping_sub(&expected_next));
283                }
284            } else {
285                // Without a fixed-size marker, we cannot be sure whether this marker ID is aligned
286                // or not.
287                return MarkerOffset::NotEnoughInformation(back.id);
288            }
289        }
290
291        MarkerOffset::Aligned
292    }
293
294    /// Adds a marker.
295    ///
296    /// The marker is tracked internally, and once the acknowledgement state has been advanced
297    /// enough such that it is at or ahead of the marker, the marker will become eligible.
298    ///
299    /// ## Gap detection and unknown length markers
300    ///
301    /// When a gap is detected between the given marker ID and the next expected marker ID, we
302    /// insert a synthetic marker to represent that gap.  For example, if we had a marker with an ID
303    /// of 0 and a length of 5,  we would expect the next marker to have an ID of 5.  If instead, a
304    /// marker with an ID of 7 was given, that would represent a gap of 2.  We insert a synthetic
305    /// marker with an ID of 5 and a length of 2 before adding the marker with the ID of 7. This
306    /// keeps the marker range contiguous and allows getting an eligible marker for the gap so the
307    /// caller can detect that a gap occurred.
308    ///
309    /// Likewise, when a caller inserts an unknown length marker, we cannot know its length until
310    /// the next marker is added.  When that happens, we assume the given marker ID is monotonic,
311    /// and thus that the length of the previous marker, which has an unknown length, must have a
312    /// length equal to the difference between the given marker ID and the unknown length marker
313    /// ID.  We update the unknown length marker to reflect this.
314    ///
315    /// In both cases, the markers will have a length that indicates that the amount represents a
316    /// gap, and not a marker that was directly added by the caller themselves.
317    ///
318    /// ## Errors
319    ///
320    /// When other pending markers are present, and the given ID is logically behind the next
321    /// expected marker ID, `Err(MarkerError::MonotonicityViolation)` is returned.
322    ///
323    /// # Panics
324    ///
325    /// Panics if pending markers is empty when last pending marker is an unknown size.
326    pub fn add_marker(
327        &mut self,
328        id: N,
329        marker_len: Option<N>,
330        data: Option<D>,
331    ) -> Result<(), MarkerError> {
332        // First, figure out where this given marker ID stands compared to our next expected marker
333        // ID, and the pending marker state in general.
334        match self.get_marker_id_offset(id) {
335            // The last pending marker is fixed-size, and the given marker ID is past where that
336            // marker ends, so we need to inject a synthetic gap marker to compensate for that.
337            MarkerOffset::Gap(expected_id, amount) => {
338                self.pending_markers.push_back(PendingMarker {
339                    id: expected_id,
340                    len: PendingMarkerLength::Assumed(amount),
341                    data: None,
342                });
343            }
344            // The last pending marker is an unknown size, so we're using this given marker ID to
345            // calculate the length of that last pending marker, and in turn, we're going to adjust
346            // its length before adding the new pending marker.
347            MarkerOffset::NotEnoughInformation(last_marker_id) => {
348                let len = id.wrapping_sub(&last_marker_id);
349                let last_marker = self
350                    .pending_markers
351                    .back_mut()
352                    .unwrap_or_else(|| unreachable!("pending markers should not be empty"));
353
354                last_marker.len = PendingMarkerLength::Assumed(len);
355            }
356            // We detected a monotonicity violation, which we can't do anything about, so just
357            // immediately inform the caller.
358            MarkerOffset::MonotonicityViolation => return Err(MarkerError::MonotonicityViolation),
359            // We have enough information to determine the given marker ID is the next expected
360            // marker ID, so we can proceed normally.
361            MarkerOffset::Aligned => {}
362        }
363
364        // Now insert our new pending marker.
365        self.pending_markers.push_back(PendingMarker {
366            id,
367            len: marker_len.map_or(PendingMarkerLength::Unknown, PendingMarkerLength::Known),
368            data,
369        });
370
371        Ok(())
372    }
373
374    /// Gets the next marker which has been fully acknowledged.
375    ///
376    /// A pending marker becomes eligible when the acknowledged marker ID is at or past the pending
377    /// marker ID plus the marker length.
378    ///
379    /// For pending markers with an unknown length, another pending marker must be present after it
380    /// in order to calculate the ID offsets and determine the marker length.
381    #[cfg_attr(test, instrument(skip(self), level = "trace"))]
382    pub fn get_next_eligible_marker(&mut self) -> Option<EligibleMarker<N, D>> {
383        let effective_acked_marker_id = self.acked_marker_id.wrapping_add(&self.unclaimed_acks);
384
385        trace!(
386            acked_marker_id = %self.acked_marker_id,
387            %effective_acked_marker_id,
388            unclaimed_acks = %self.unclaimed_acks,
389            pending_markers = self.pending_markers.len(),
390            "Searching for eligible marker."
391        );
392
393        let maybe_eligible_marker =
394            self.pending_markers
395                .front()
396                .and_then(|marker| match marker.len {
397                    // If the acked marker ID is ahead of this marker ID, plus its length, it's been fully
398                    // acknowledged and we can consume and yield the marker.  We have to double
399                    // verify this by checking that there's enough unclaimed acks to support this
400                    // length because otherwise we might fall victim to markers that simply generate
401                    // a required acked marker ID that is equal to the actual acked marker ID when
402                    // an amount of unclaimed acks exists that is not enough for this marker but is
403                    // enough to align the effective/required IDs.
404                    PendingMarkerLength::Known(len) => {
405                        let required_acked_marker_id = marker.id.wrapping_add(&len);
406                        if required_acked_marker_id <= effective_acked_marker_id
407                            && self.unclaimed_acks >= len
408                        {
409                            Some((EligibleMarkerLength::Known(len), len))
410                        } else {
411                            None
412                        }
413                    }
414                    // The marker has an assumed length, which means a marker was added after it,
415                    // which implies that it is de facto eligible as unknown length markers do not
416                    // consume acknowledgements and so are immediately eligible once an assumed
417                    // length can be determined.
418                    PendingMarkerLength::Assumed(len) => {
419                        Some((EligibleMarkerLength::Assumed(len), N::min_value()))
420                    }
421                    // We don't yet know what the length is for this marker, so we're stuck waiting
422                    // for another marker to be added before that can be determined.
423                    PendingMarkerLength::Unknown => None,
424                });
425
426        // If we actually got an eligible marker, we need to actually remove it from the pending
427        // marker queue and potentially adjust the amount of unclaimed acks we have.
428        match maybe_eligible_marker {
429            Some((len, acks_to_claim)) => {
430                // If we actually got an eligible marker, we need to actually remove it from the pending
431                // marker queue, potentially adjust the amount of unclaimed acks we have, and adjust
432                // our acked marker ID.
433                let PendingMarker { id, data, .. } = self
434                    .pending_markers
435                    .pop_front()
436                    .unwrap_or_else(|| unreachable!("pending markers cannot be empty"));
437
438                if acks_to_claim > N::min_value() {
439                    self.unclaimed_acks = self
440                        .unclaimed_acks
441                        .checked_sub(&acks_to_claim)
442                        .unwrap_or_else(|| {
443                            unreachable!("should not be able to claim more acks than are unclaimed")
444                        });
445                }
446
447                self.acked_marker_id = id.wrapping_add(&len.len());
448
449                Some(EligibleMarker { id, len, data })
450            }
451            None => None,
452        }
453    }
454}
455
456impl<N, D> fmt::Debug for OrderedAcknowledgements<N, D>
457where
458    N: fmt::Debug,
459    D: fmt::Debug,
460{
461    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
462        f.debug_struct("OrderedAcknowledgements")
463            .field("unclaimed_acks", &self.unclaimed_acks)
464            .field("acked_marker_id", &self.acked_marker_id)
465            .field("pending_markers", &self.pending_markers)
466            .finish()
467    }
468}
469
470#[cfg(test)]
471mod tests {
472    use std::collections::{HashSet, VecDeque};
473
474    use proptest::{
475        arbitrary::any,
476        collection::vec as arb_vec,
477        prop_assert_eq, prop_oneof, proptest,
478        strategy::{Just, Strategy},
479    };
480
481    use super::{EligibleMarker, EligibleMarkerLength};
482    use crate::topology::acks::{MarkerError, OrderedAcknowledgements, PendingMarkerLength};
483
484    #[derive(Debug, Clone, Copy)]
485    enum Action {
486        Acknowledge(u64),
487        AddMarker((u64, Option<u64>)),
488        GetNextEligibleMarker,
489    }
490
491    #[derive(Debug, PartialEq)]
492    enum ActionResult {
493        // Number of unclaimed acknowledgements.
494        Acknowledge(u64),
495        AddMarker(Result<(), MarkerError>),
496        GetNextEligibleMarker(Option<EligibleMarker<u64, ()>>),
497    }
498
499    fn arb_ordered_acks_action() -> impl Strategy<Value = Action> {
500        prop_oneof![
501            10 => any::<u32>().prop_map(|n| Action::Acknowledge(u64::from(n))),
502            10 => any::<(u64, Option<u64>)>().prop_map(|(a, b)| Action::AddMarker((a, b))),
503            10 => Just(Action::GetNextEligibleMarker),
504        ]
505    }
506
507    fn apply_action_sut(
508        sut: &mut OrderedAcknowledgements<u64, ()>,
509        action: Action,
510    ) -> ActionResult {
511        match action {
512            Action::Acknowledge(amount) => {
513                sut.add_acknowledgements(amount);
514                ActionResult::Acknowledge(sut.unclaimed_acks)
515            }
516            Action::AddMarker((id, maybe_len)) => {
517                let result = sut.add_marker(id, maybe_len, None);
518                ActionResult::AddMarker(result)
519            }
520            Action::GetNextEligibleMarker => {
521                let result = sut.get_next_eligible_marker();
522                ActionResult::GetNextEligibleMarker(result)
523            }
524        }
525    }
526
527    macro_rules! step {
528        ($action_name:ident, result => $result_input:expr) => {
529            (
530                Action::$action_name,
531                ActionResult::$action_name($result_input),
532            )
533        };
534        ($action_name:ident, input => $action_input:expr, result => $result_input:expr) => {
535            (
536                Action::$action_name($action_input),
537                ActionResult::$action_name($result_input),
538            )
539        };
540    }
541
542    #[test]
543    fn basic_cases() {
544        // Smoke test.
545        run_test_case("empty", vec![step!(GetNextEligibleMarker, result => None)]);
546
547        // Simple through-and-through:
548        run_test_case(
549            "through_and_through",
550            vec![
551                step!(AddMarker, input => (0, Some(5)), result => Ok(())),
552                step!(Acknowledge, input => 5, result => 5),
553                step!(GetNextEligibleMarker, result => Some(
554                    EligibleMarker {
555                        id: 0, len: EligibleMarkerLength::Known(5), data: None,
556                    }
557                )),
558            ],
559        );
560    }
561
562    #[test]
563    fn invariant_cases() {
564        // Checking for an eligible record between incremental acknowledgement:
565        run_test_case(
566            "eligible_multi_ack",
567            vec![
568                step!(AddMarker, input => (0, Some(13)), result => Ok(())),
569                step!(Acknowledge, input => 5, result => 5),
570                step!(GetNextEligibleMarker, result => None),
571                step!(Acknowledge, input => 5, result => 10),
572                step!(GetNextEligibleMarker, result => None),
573                step!(Acknowledge, input => 5, result => 15),
574                step!(GetNextEligibleMarker, result => Some(
575                    EligibleMarker {
576                        id: 0, len: EligibleMarkerLength::Known(13), data: None,
577                    }
578                )),
579            ],
580        );
581
582        // Unknown length markers can't be returned until a marker exists after them, even if we
583        // could maximally acknowledge them:
584        run_test_case(
585            "unknown_len_no_subsequent_marker",
586            vec![
587                step!(AddMarker, input => (0, None), result => Ok(())),
588                step!(Acknowledge, input => 5, result => 5),
589                step!(GetNextEligibleMarker, result => None),
590            ],
591        );
592
593        // We can always get back an unknown marker, with its length, regardless of
594        // acknowledgements, so long as there's a marker exists after them: fixed.
595        run_test_case(
596            "unknown_len_subsequent_marker_fixed",
597            vec![
598                step!(AddMarker, input => (0, None), result => Ok(())),
599                step!(AddMarker, input => (5, Some(1)), result => Ok(())),
600                step!(GetNextEligibleMarker, result => Some(
601                    EligibleMarker {
602                        id: 0, len: EligibleMarkerLength::Assumed(5), data: None,
603                    }
604                )),
605                step!(GetNextEligibleMarker, result => None),
606            ],
607        );
608
609        // We can always get back an unknown marker, with its length, regardless of
610        // acknowledgements, so long as there's a marker exists after them: unknown.
611        run_test_case(
612            "unknown_len_subsequent_marker_unknown",
613            vec![
614                step!(AddMarker, input => (0, None), result => Ok(())),
615                step!(AddMarker, input => (5, None), result => Ok(())),
616                step!(GetNextEligibleMarker, result => Some(
617                    EligibleMarker {
618                        id: 0, len: EligibleMarkerLength::Assumed(5), data: None,
619                    }
620                )),
621                step!(GetNextEligibleMarker, result => None),
622            ],
623        );
624
625        // Can add a marker without a known length and it will generate a synthetic gap marker
626        // that is immediately eligible:
627        run_test_case(
628            "unknown_len_no_pending_synthetic_gap",
629            vec![
630                step!(AddMarker, input => (1, None), result => Ok(())),
631                step!(GetNextEligibleMarker, result => Some(
632                    EligibleMarker {
633                        id: 0, len: EligibleMarkerLength::Assumed(1), data: None,
634                    }
635                )),
636                step!(GetNextEligibleMarker, result => None),
637            ],
638        );
639
640        // When another marker exists, and is fixed size, we correctly detect when trying to add
641        // another marker whose ID comes before the last pending marker we have:
642        run_test_case(
643            "detect_monotonicity_violation",
644            vec![
645                step!(AddMarker, input => (u64::MAX, Some(3)), result => Ok(())),
646                step!(AddMarker, input => (1, Some(2)), result => Err(MarkerError::MonotonicityViolation)),
647            ],
648        );
649
        // When another marker exists, and is fixed size, we correctly detect when trying to add
        // another marker whose ID comes after the end of the last pending marker (its ID plus its
        // length) by inserting a synthetic gap marker with an assumed length, which is eligible
        // as soon as the markers ahead of it have been yielded:
654        run_test_case(
655            "unknown_len_updated_fixed_marker",
656            vec![
657                step!(AddMarker, input => (0, Some(4)), result => Ok(())),
658                step!(AddMarker, input => (9, Some(3)), result => Ok(())),
659                step!(Acknowledge, input => 4, result => 4),
660                step!(GetNextEligibleMarker, result => Some(
661                    EligibleMarker {
662                        id: 0, len: EligibleMarkerLength::Known(4), data: None,
663                    }
664                )),
665                step!(GetNextEligibleMarker, result => Some(
666                    EligibleMarker {
667                        id: 4, len: EligibleMarkerLength::Assumed(5), data: None,
668                    }
669                )),
670                step!(GetNextEligibleMarker, result => None),
671            ],
672        );
673    }
674
675    #[test]
676    fn advanced_cases() {
677        // A marker with a length of 0 should be immediately available:
678        run_test_case(
679            "zero_length_eligible",
680            vec![
681                step!(AddMarker, input => (0, Some(0)), result => Ok(())),
682                step!(GetNextEligibleMarker, result => Some(
683                    EligibleMarker {
684                        id: 0, len: EligibleMarkerLength::Known(0), data: None,
685                    }
686                )),
687            ],
688        );
689
690        // When we have a fixed-size marker whose required acked marker ID lands right on the
691        // current acked marker ID, it should not be eligible unless there are enough unclaimed
692        // acks to actually account for it:
693        run_test_case(
694            "fixed_size_u64_boundary_overlap",
695            vec![
696                step!(AddMarker, input => (2_686_784_444_737_799_532, Some(15_759_959_628_971_752_084)), result => Ok(())),
697                step!(AddMarker, input => (0, None), result => Ok(())),
698                step!(AddMarker, input => (8_450_737_568, None), result => Ok(())),
699                step!(GetNextEligibleMarker, result => Some(
700                    EligibleMarker {
701                        id: 0, len: EligibleMarkerLength::Assumed(2_686_784_444_737_799_532), data: None,
702                    }
703                )),
704                step!(GetNextEligibleMarker, result => None),
705                step!(Acknowledge, input => 15_759_959_628_971_752_084, result => 15_759_959_628_971_752_084),
706                step!(GetNextEligibleMarker, result => Some(
707                    EligibleMarker {
708                        id: 2_686_784_444_737_799_532, len: EligibleMarkerLength::Known(15_759_959_628_971_752_084), data: None,
709                    }
710                )),
711                step!(GetNextEligibleMarker, result => Some(
712                    EligibleMarker {
713                        id: 0, len: EligibleMarkerLength::Assumed(8_450_737_568), data: None,
714                    }
715                )),
716                step!(GetNextEligibleMarker, result => None),
717            ],
718        );
719    }
720
721    fn run_test_case(name: &str, case: Vec<(Action, ActionResult)>) {
722        let mut sut = OrderedAcknowledgements::from_acked(0u64);
723        for (action, expected_result) in case {
724            let actual_result = apply_action_sut(&mut sut, action);
725            assert_eq!(
726                expected_result, actual_result,
727                "{name}: ran action {action:?} expecting result {expected_result:?}, but got result {actual_result:?} instead"
728            );
729        }
730    }
731
732    #[test]
733    #[should_panic(expected = "overflowing unclaimed acknowledgements is a serious bug")]
734    fn panic_when_unclaimed_acks_overflows() {
735        let actions = vec![Action::Acknowledge(u64::MAX), Action::Acknowledge(1)];
736
737        let mut sut = OrderedAcknowledgements::<u64, ()>::from_acked(0);
738        for action in actions {
739            apply_action_sut(&mut sut, action);
740        }
741    }
742
743    proptest! {
744        #[test]
745        fn property_test(
746            mut acked_marker_id in any::<u64>(),
747            actions in arb_vec(arb_ordered_acks_action(), 0..1000)
748        ) {
749            let mut sut = OrderedAcknowledgements::from_acked(acked_marker_id);
750
751            let mut unclaimed_acks = 0;
752            let mut marker_state = HashSet::new();
753            let mut marker_stack: VecDeque<(u64, PendingMarkerLength<u64>)> = VecDeque::new();
754
755            for action in actions {
756                match action {
757                    Action::Acknowledge(amount) => {
758                        unclaimed_acks += amount;
759
760                        prop_assert_eq!(
761                            ActionResult::Acknowledge(unclaimed_acks),
762                            apply_action_sut(&mut sut, action)
763                        );
764                    },
765                    Action::AddMarker((id, maybe_len)) => {
766                        // We do gap detection/unknown length fix-up first.
767                        let expected_result = if marker_stack.is_empty() {
768                            // Our only comparison is the acked marker ID, which, if it doesn't
769                            // match, we generate a gap marker for.
770                            if id != acked_marker_id {
771                                assert!(marker_state.insert(acked_marker_id), "should not be able to add marker that is already in-flight");
772
773                                let len = PendingMarkerLength::Assumed(id.wrapping_sub(acked_marker_id));
774                                marker_stack.push_back((acked_marker_id, len));
775                            }
776
777                            Ok(())
778                        } else {
779                            let (back_id, back_len) = marker_stack.back().copied().expect("must exist");
780                            match back_len {
781                                PendingMarkerLength::Known(len) => {
782                                    // We know exactly where the next marker ID should be.
783                                    let expected_id = back_id.wrapping_add(len);
784                                    if id == expected_id {
785                                        Ok(())
786                                    } else if expected_id < back_id && id < expected_id {
787                                        // Assuming monotonic IDs from the caller, this is a
788                                        // monotonicity violation.
789                                        Err(MarkerError::MonotonicityViolation)
790                                    } else {
791                                        assert!(marker_state.insert(expected_id), "should not be able to add marker that is already in-flight");
792
793                                        // Gap between ID and expected ID, add a gap marker.
794                                        let len = PendingMarkerLength::Assumed(id.wrapping_sub(expected_id));
795                                        marker_stack.push_back((expected_id, len));
796
797                                        Ok(())
798                                    }
799                                },
800                                PendingMarkerLength::Assumed(_) => panic!("should never have an assumed length at back"),
801                                PendingMarkerLength::Unknown => {
802                                    // Now that we have an ID range, we have enough information to
803                                    // give the unknown length marker an assumed length.
804                                    let len = id.wrapping_sub(back_id);
805                                    let (_, back_len_mut) = marker_stack.back_mut().expect("must exist");
806                                    *back_len_mut = PendingMarkerLength::Assumed(len);
807
808                                    Ok(())
809                                },
810                            }
811                        };
812
813                        let sut_result = apply_action_sut(&mut sut, action);
814                        match sut_result {
815                            ActionResult::AddMarker(result) => {
816                                prop_assert_eq!(expected_result, result.clone());
817
818                                // If adding the pending marker actually succeeded, now we need to track it.
819                                if result.is_ok() {
820                                    let len = maybe_len
821                                        .map_or(PendingMarkerLength::Unknown, PendingMarkerLength::Known);
822
823                                    assert!(marker_state.insert(id), "should not be able to add marker that is already in-flight");
824                                    marker_stack.push_back((id, len));
825                                }
826                            },
827                            a => panic!("got unexpected action after adding pending marker: {a:?}"),
828                        }
829                    },
830                    Action::GetNextEligibleMarker => match sut.get_next_eligible_marker() {
831                        Some(marker) => {
832                            let (expected_marker_id, original_len) = marker_stack.pop_front()
833                                .expect("marker stack should not be empty");
834
835                            assert_eq!(expected_marker_id, marker.id, "SUT eligible marker doesn't match expected ID");
836
837                            let unclaimed_acks_to_consume = match (original_len, marker.len) {
838                                (PendingMarkerLength::Known(a), EligibleMarkerLength::Known(b)) => {
839                                    prop_assert_eq!(a, b, "SUT marker len and model marker len should match");
840                                    b
841                                },
842                                (PendingMarkerLength::Assumed(a), EligibleMarkerLength::Assumed(b)) => {
843                                    prop_assert_eq!(a, b, "SUT marker len and model marker len should match");
844                                    0
845                                }
846                                (PendingMarkerLength::Unknown, _) =>
847                                    panic!("SUT had eligible marker but marker stack had unknown length marker"),
848                                (a, b) => panic!("unknown SUT/model marker len combination: {a:?}, {b:?}"),
849                            };
850
851                            assert!(unclaimed_acks_to_consume <= unclaimed_acks,
852                                "can't have returned an eligible fixed-size item with a length greater than unclaimed acks");
853
854                            unclaimed_acks -= unclaimed_acks_to_consume;
855
856                            assert!(marker_state.remove(&marker.id), "should be state entry for marker if it was eligible");
857
858                            // Update our acked marker ID.
859                            acked_marker_id = marker.id.wrapping_add(marker.len.len());
860                        },
861                        None => {
862                            // This may be valid based on the given state of pending vs unclaimed
863                            // acks, as well as whether we have enough information to figure out any
864                            // unknown length markers.
865                            if let Some((marker_id, marker_len)) = marker_stack.front() {
866                                match marker_len {
867                                    PendingMarkerLength::Assumed(_) =>
868                                        panic!("SUT returned None, but model had assumed length marker"),
869                                    PendingMarkerLength::Known(len) => {
870                                        let required_acked_offset = marker_id.wrapping_add(*len);
871                                        let effective_offset = acked_marker_id.wrapping_add(unclaimed_acks);
872                                        let is_eligible = required_acked_offset <= effective_offset && required_acked_offset >= *marker_id;
873                                        assert!(!is_eligible,
874                                            "SUT returned None but next fixed-size marker on stack is eligible: id: {marker_id}, len: {len}, acked_id_offset: {acked_marker_id}");
875                                    },
876                                    PendingMarkerLength::Unknown => {
877                                        // If we have an unknown marker, the only we shouldn't be
878                                        // able to return it is if there's no other markers to
879                                        // calculate the gap from.
880                                        //
881                                        // This is most likely a bug in our model if it happens.
882                                        assert!(marker_stack.len() == 1,
883                                            "SUT returned None but unknown length marker is calculable");
884                                    },
885                                }
886                            }
887                        }
888                    },
889                }
890            }
891        }
892    }
893}