pub struct OrderedAcknowledgements<N, D> { /* private fields */ }
Expand description
OrderedAcknowledgements
allows determining when a record is eligible for deletion.
§Purpose
In disk buffers, a record may potentially represent multiple events. As these events may be processed at different times by a sink, and in a potentially different order than when stored in the record, a record cannot be considered fully processed until all of the events have been accounted for. As well, only once a record has been fully processed can it be considered for deletion to free up space in the buffer.
To complicate matters, a record may sometimes not be decodable – on-disk corruption, invalid encoding scheme that is no longer supported, etc – but still needs to be accounted for to know when it can be deleted, and so that the correct metrics can be generated to determine how many events were lost by the record not being able to be processed normally.
§Functionality
OrderedAcknowledgements
provides the ability to add “markers”, which are a virtual token mapped
to a record. Markers track the ID of a record, how long the record is (if known), and optional
data that is specific to the record. It also provides the ability to add acknowledgements which
can then be consumed to allow yielding markers which have collected enough acknowledgements and
are thus “eligible”.
§Detecting record gaps and the length of undecodable records
Additionally, and as hinted at above, markers can be added without a known length: this may happen when a record is read but it cannot be decoded, and thus determining the true length is not possible.
When markers that have an unknown length are added, OrderedAcknowledgements
will do one of two things:
- figure out if the marker is ahead of the next expected marker ID, and add a synthetic “gap” marker to compensate
- update the unknown length with an assumed length, based on the difference between its ID and the ID of the next marker that gets added
In this way, OrderedAcknowledgements
provides a contiguous range of marker IDs, which allows
detecting not only the presumed length of a record that couldn’t be decoded, but also if any
records were deleted from disk or unable to be read at all. Based on the invariant of expecting
IDs to be monotonic and contiguous, we know that if we expect our next marker ID to be 5, but
instead get one with an ID of 8, that there’s 3 missing events in the middle that have not been
accounted for.
Similarly, even when we don’t know what the next expected marker ID should be, we can determine the number of events that were lost when the next marker is added, as marker IDs represent the start of a record, and so simple arithmetic can determine the number of events that have theoretically been lost.
Implementations§
source§impl<N, D> OrderedAcknowledgements<N, D>where
N: Display + Bounded + CheckedAdd + CheckedSub + Copy + PartialEq + PartialOrd + Unsigned + WrappingAdd + WrappingSub,
impl<N, D> OrderedAcknowledgements<N, D>where
N: Display + Bounded + CheckedAdd + CheckedSub + Copy + PartialEq + PartialOrd + Unsigned + WrappingAdd + WrappingSub,
pub fn from_acked(acked_marker_id: N) -> Self
sourcepub fn add_acknowledgements(&mut self, amount: N)
pub fn add_acknowledgements(&mut self, amount: N)
Adds the given number of acknowledgements.
Acknowledgements should be given by the caller to update the acknowledgement state before trying to get any eligible markers.
§Panics
Will panic if adding ack amount overflows.
sourcepub fn add_marker(
&mut self,
id: N,
marker_len: Option<N>,
data: Option<D>,
) -> Result<(), MarkerError>
pub fn add_marker( &mut self, id: N, marker_len: Option<N>, data: Option<D>, ) -> Result<(), MarkerError>
Adds a marker.
The marker is tracked internally, and once the acknowledgement state has been advanced enough such that it is at or ahead of the marker, the marker will become eligible.
§Gap detection and unknown length markers
When a gap is detected between the given marker ID and the next expected marker ID, we insert a synthetic marker to represent that gap. For example, if we had a marker with an ID of 0 and a length of 5, we would expect the next marker to have an ID of 5. If instead, a marker with an ID of 7 was given, that would represent a gap of 2. We insert a synthetic marker with an ID of 5 and a length of 2 before adding the marker with the ID of 7. This keeps the marker range contiguous and allows getting an eligible marker for the gap so the caller can detect that a gap occurred.
Likewise, when a caller inserts an unknown length marker, we cannot know its length until the next marker is added. When that happens, we assume the given marker ID is monotonic, and thus that the length of the previous marker, which has an unknown length, must have a length equal to the difference between the given marker ID and the unknown length marker ID. We update the unknown length marker to reflect this.
In both cases, the markers will have a length that indicates that the amount represents a gap, and not a marker that was directly added by the caller themselves.
§Errors
When other pending markers are present, and the given ID is logically behind the next
expected marker ID, Err(MarkerError::MonotonicityViolation)
is returned.
§Panics
Panics if pending markers is empty when last pending marker is an unknown size.
sourcepub fn get_next_eligible_marker(&mut self) -> Option<EligibleMarker<N, D>>
pub fn get_next_eligible_marker(&mut self) -> Option<EligibleMarker<N, D>>
Gets the next marker which has been fully acknowledged.
A pending marker becomes eligible when the acknowledged marker ID is at or past the pending marker ID plus the marker length.
For pending markers with an unknown length, another pending marker must be present after it in order to calculate the ID offsets and determine the marker length.
Trait Implementations§
Auto Trait Implementations§
impl<N, D> Freeze for OrderedAcknowledgements<N, D>where
N: Freeze,
impl<N, D> RefUnwindSafe for OrderedAcknowledgements<N, D>where
N: RefUnwindSafe,
D: RefUnwindSafe,
impl<N, D> Send for OrderedAcknowledgements<N, D>
impl<N, D> Sync for OrderedAcknowledgements<N, D>
impl<N, D> Unpin for OrderedAcknowledgements<N, D>
impl<N, D> UnwindSafe for OrderedAcknowledgements<N, D>where
N: UnwindSafe,
D: UnwindSafe,
Blanket Implementations§
§impl<T> ArchivePointee for T
impl<T> ArchivePointee for T
§type ArchivedMetadata = ()
type ArchivedMetadata = ()
§fn pointer_metadata(
_: &<T as ArchivePointee>::ArchivedMetadata,
) -> <T as Pointee>::Metadata
fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<F, W, T, D> Deserialize<With<T, W>, D> for F
impl<F, W, T, D> Deserialize<With<T, W>, D> for F
§fn deserialize(
&self,
deserializer: &mut D,
) -> Result<With<T, W>, <D as Fallible>::Error>
fn deserialize( &self, deserializer: &mut D, ) -> Result<With<T, W>, <D as Fallible>::Error>
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
§impl<T> LayoutRaw for T
impl<T> LayoutRaw for T
§fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
§impl<Source, Target> OctetsInto<Target> for Sourcewhere
Target: OctetsFrom<Source>,
impl<Source, Target> OctetsInto<Target> for Sourcewhere
Target: OctetsFrom<Source>,
type Error = <Target as OctetsFrom<Source>>::Error
§fn try_octets_into(
self,
) -> Result<Target, <Source as OctetsInto<Target>>::Error>
fn try_octets_into( self, ) -> Result<Target, <Source as OctetsInto<Target>>::Error>
§fn octets_into(self) -> Targetwhere
Self::Error: Into<Infallible>,
fn octets_into(self) -> Targetwhere
Self::Error: Into<Infallible>,
§impl<D> OwoColorize for D
impl<D> OwoColorize for D
§fn fg<C>(&self) -> FgColorDisplay<'_, C, Self>where
C: Color,
fn fg<C>(&self) -> FgColorDisplay<'_, C, Self>where
C: Color,
§fn bg<C>(&self) -> BgColorDisplay<'_, C, Self>where
C: Color,
fn bg<C>(&self) -> BgColorDisplay<'_, C, Self>where
C: Color,
§fn on_magenta(&self) -> BgColorDisplay<'_, Magenta, Self>
fn on_magenta(&self) -> BgColorDisplay<'_, Magenta, Self>
§fn default_color(&self) -> FgColorDisplay<'_, Default, Self>
fn default_color(&self) -> FgColorDisplay<'_, Default, Self>
§fn on_default_color(&self) -> BgColorDisplay<'_, Default, Self>
fn on_default_color(&self) -> BgColorDisplay<'_, Default, Self>
§fn bright_black(&self) -> FgColorDisplay<'_, BrightBlack, Self>
fn bright_black(&self) -> FgColorDisplay<'_, BrightBlack, Self>
§fn on_bright_black(&self) -> BgColorDisplay<'_, BrightBlack, Self>
fn on_bright_black(&self) -> BgColorDisplay<'_, BrightBlack, Self>
§fn bright_red(&self) -> FgColorDisplay<'_, BrightRed, Self>
fn bright_red(&self) -> FgColorDisplay<'_, BrightRed, Self>
§fn on_bright_red(&self) -> BgColorDisplay<'_, BrightRed, Self>
fn on_bright_red(&self) -> BgColorDisplay<'_, BrightRed, Self>
§fn bright_green(&self) -> FgColorDisplay<'_, BrightGreen, Self>
fn bright_green(&self) -> FgColorDisplay<'_, BrightGreen, Self>
§fn on_bright_green(&self) -> BgColorDisplay<'_, BrightGreen, Self>
fn on_bright_green(&self) -> BgColorDisplay<'_, BrightGreen, Self>
§fn bright_yellow(&self) -> FgColorDisplay<'_, BrightYellow, Self>
fn bright_yellow(&self) -> FgColorDisplay<'_, BrightYellow, Self>
§fn on_bright_yellow(&self) -> BgColorDisplay<'_, BrightYellow, Self>
fn on_bright_yellow(&self) -> BgColorDisplay<'_, BrightYellow, Self>
§fn bright_blue(&self) -> FgColorDisplay<'_, BrightBlue, Self>
fn bright_blue(&self) -> FgColorDisplay<'_, BrightBlue, Self>
§fn on_bright_blue(&self) -> BgColorDisplay<'_, BrightBlue, Self>
fn on_bright_blue(&self) -> BgColorDisplay<'_, BrightBlue, Self>
§fn bright_magenta(&self) -> FgColorDisplay<'_, BrightMagenta, Self>
fn bright_magenta(&self) -> FgColorDisplay<'_, BrightMagenta, Self>
§fn on_bright_magenta(&self) -> BgColorDisplay<'_, BrightMagenta, Self>
fn on_bright_magenta(&self) -> BgColorDisplay<'_, BrightMagenta, Self>
§fn bright_purple(&self) -> FgColorDisplay<'_, BrightMagenta, Self>
fn bright_purple(&self) -> FgColorDisplay<'_, BrightMagenta, Self>
§fn on_bright_purple(&self) -> BgColorDisplay<'_, BrightMagenta, Self>
fn on_bright_purple(&self) -> BgColorDisplay<'_, BrightMagenta, Self>
§fn bright_cyan(&self) -> FgColorDisplay<'_, BrightCyan, Self>
fn bright_cyan(&self) -> FgColorDisplay<'_, BrightCyan, Self>
§fn on_bright_cyan(&self) -> BgColorDisplay<'_, BrightCyan, Self>
fn on_bright_cyan(&self) -> BgColorDisplay<'_, BrightCyan, Self>
§fn bright_white(&self) -> FgColorDisplay<'_, BrightWhite, Self>
fn bright_white(&self) -> FgColorDisplay<'_, BrightWhite, Self>
§fn on_bright_white(&self) -> BgColorDisplay<'_, BrightWhite, Self>
fn on_bright_white(&self) -> BgColorDisplay<'_, BrightWhite, Self>
§fn blink_fast(&self) -> BlinkFastDisplay<'_, Self>
fn blink_fast(&self) -> BlinkFastDisplay<'_, Self>
§fn strikethrough(&self) -> StrikeThroughDisplay<'_, Self>
fn strikethrough(&self) -> StrikeThroughDisplay<'_, Self>
§fn color<Color>(&self, color: Color) -> FgDynColorDisplay<'_, Color, Self>where
Color: DynColor,
fn color<Color>(&self, color: Color) -> FgDynColorDisplay<'_, Color, Self>where
Color: DynColor,
OwoColorize::fg
] or
a color-specific method, such as [OwoColorize::green
], Read more§fn on_color<Color>(&self, color: Color) -> BgDynColorDisplay<'_, Color, Self>where
Color: DynColor,
fn on_color<Color>(&self, color: Color) -> BgDynColorDisplay<'_, Color, Self>where
Color: DynColor,
OwoColorize::bg
] or
a color-specific method, such as [OwoColorize::on_yellow
], Read more