vector_buffers::topology::acks

Struct OrderedAcknowledgements

source
pub struct OrderedAcknowledgements<N, D> { /* private fields */ }
Expand description

OrderedAcknowledgements allows determining when a record is eligible for deletion.

§Purpose

In disk buffers, a record may potentially represent multiple events. As these events may be processed at different times by a sink, and in a potentially different order than when stored in the record, a record cannot be considered fully processed until all of the events have been accounted for. As well, only once a record has been fully processed can it be considered for deletion to free up space in the buffer.

To complicate matters, a record may sometimes not be decodable – on-disk corruption, invalid encoding scheme that is no longer supported, etc – but still needs to be accounted for to know when it can be deleted, and so that the correct metrics can be generated to determine how many events were lost by the record not being able to be processed normally.

§Functionality

OrderedAcknowledgements provides the ability to add “markers”, which are a virtual token mapped to a record. Markers track the ID of a record, how long the record is (if known), and optional data that is specific to the record. It also provides the ability to add acknowledgements which can then be consumed to allow yielding markers which have collected enough acknowledgements and are thus “eligible”.

§Detecting record gaps and the length of undecodable records

Additionally, and as hinted at above, markers can be added without a known length: this may happen when a record is read but it cannot be decoded, and thus determining the true length is not possible.

When markers that have an unknown length are added, OrderedAcknowledgements will do one of two things:

  • figure out if the marker is ahead of the next expected marker ID, and add a synthetic “gap” marker to compensate
  • update the unknown length with an assumed length, based on the difference between its ID and the ID of the next marker that gets added

In this way, OrderedAcknowledgements provides a contiguous range of marker IDs, which allows detecting not only the presumed length of a record that couldn’t be decoded, but also if any records were deleted from disk or unable to be read at all. Based on the invariant of expecting IDs to be monotonic and contiguous, we know that if we expect our next marker ID to be 5, but instead get one with an ID of 8, that there’s 3 missing events in the middle that have not been accounted for.

Similarly, even when we don’t know what the next expected marker ID should be, we can determine the number of events that were lost when the next marker is added, as marker IDs represent the start of a record, and so simple arithmetic can determine the number of events that have theoretically been lost.

Implementations§

source§

impl<N, D> OrderedAcknowledgements<N, D>

source

pub fn from_acked(acked_marker_id: N) -> Self

source

pub fn add_acknowledgements(&mut self, amount: N)

Adds the given number of acknowledgements.

Acknowledgements should be given by the caller to update the acknowledgement state before trying to get any eligible markers.

§Panics

Will panic if adding ack amount overflows.

source

pub fn add_marker( &mut self, id: N, marker_len: Option<N>, data: Option<D>, ) -> Result<(), MarkerError>

Adds a marker.

The marker is tracked internally, and once the acknowledgement state has been advanced enough such that it is at or ahead of the marker, the marker will become eligible.

§Gap detection and unknown length markers

When a gap is detected between the given marker ID and the next expected marker ID, we insert a synthetic marker to represent that gap. For example, if we had a marker with an ID of 0 and a length of 5, we would expect the next marker to have an ID of 5. If instead, a marker with an ID of 7 was given, that would represent a gap of 2. We insert a synthetic marker with an ID of 5 and a length of 2 before adding the marker with the ID of 7. This keeps the marker range contiguous and allows getting an eligible marker for the gap so the caller can detect that a gap occurred.

Likewise, when a caller inserts an unknown length marker, we cannot know its length until the next marker is added. When that happens, we assume the given marker ID is monotonic, and thus that the length of the previous marker, which has an unknown length, must have a length equal to the difference between the given marker ID and the unknown length marker ID. We update the unknown length marker to reflect this.

In both cases, the markers will have a length that indicates that the amount represents a gap, and not a marker that was directly added by the caller themselves.

§Errors

When other pending markers are present, and the given ID is logically behind the next expected marker ID, Err(MarkerError::MonotonicityViolation) is returned.

§Panics

Panics if pending markers is empty when last pending marker is an unknown size.

source

pub fn get_next_eligible_marker(&mut self) -> Option<EligibleMarker<N, D>>

Gets the next marker which has been fully acknowledged.

A pending marker becomes eligible when the acknowledged marker ID is at or past the pending marker ID plus the marker length.

For pending markers with an unknown length, another pending marker must be present after it in order to calculate the ID offsets and determine the marker length.

Trait Implementations§

source§

impl<N, D> Debug for OrderedAcknowledgements<N, D>
where N: Debug, D: Debug,

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

§

impl<N, D> Freeze for OrderedAcknowledgements<N, D>
where N: Freeze,

§

impl<N, D> RefUnwindSafe for OrderedAcknowledgements<N, D>

§

impl<N, D> Send for OrderedAcknowledgements<N, D>
where N: Send, D: Send,

§

impl<N, D> Sync for OrderedAcknowledgements<N, D>
where N: Sync, D: Sync,

§

impl<N, D> Unpin for OrderedAcknowledgements<N, D>
where N: Unpin, D: Unpin,

§

impl<N, D> UnwindSafe for OrderedAcknowledgements<N, D>
where N: UnwindSafe, D: UnwindSafe,

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
§

impl<T> ArchivePointee for T

§

type ArchivedMetadata = ()

The archived version of the pointer metadata for this type.
§

fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata

Converts some archived metadata to the pointer metadata for itself.
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
§

impl<F, W, T, D> Deserialize<With<T, W>, D> for F
where W: DeserializeWith<F, T, D>, D: Fallible + ?Sized, F: ?Sized,

§

fn deserialize( &self, deserializer: &mut D, ) -> Result<With<T, W>, <D as Fallible>::Error>

Deserializes using the given deserializer
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided [Span], returning an Instrumented wrapper. Read more
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

§

impl<T> LayoutRaw for T

§

fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>

Gets the layout of the type.
§

impl<Source, Target> OctetsInto<Target> for Source
where Target: OctetsFrom<Source>,

§

type Error = <Target as OctetsFrom<Source>>::Error

§

fn try_octets_into( self, ) -> Result<Target, <Source as OctetsInto<Target>>::Error>

Performs the conversion.
§

fn octets_into(self) -> Target
where Self::Error: Into<Infallible>,

Performs an infallible conversion.
§

impl<D> OwoColorize for D

§

fn fg<C>(&self) -> FgColorDisplay<'_, C, Self>
where C: Color,

Set the foreground color generically Read more
§

fn bg<C>(&self) -> BgColorDisplay<'_, C, Self>
where C: Color,

Set the background color generically. Read more
§

fn black(&self) -> FgColorDisplay<'_, Black, Self>

Change the foreground color to black
§

fn on_black(&self) -> BgColorDisplay<'_, Black, Self>

Change the background color to black
§

fn red(&self) -> FgColorDisplay<'_, Red, Self>

Change the foreground color to red
§

fn on_red(&self) -> BgColorDisplay<'_, Red, Self>

Change the background color to red
§

fn green(&self) -> FgColorDisplay<'_, Green, Self>

Change the foreground color to green
§

fn on_green(&self) -> BgColorDisplay<'_, Green, Self>

Change the background color to green
§

fn yellow(&self) -> FgColorDisplay<'_, Yellow, Self>

Change the foreground color to yellow
§

fn on_yellow(&self) -> BgColorDisplay<'_, Yellow, Self>

Change the background color to yellow
§

fn blue(&self) -> FgColorDisplay<'_, Blue, Self>

Change the foreground color to blue
§

fn on_blue(&self) -> BgColorDisplay<'_, Blue, Self>

Change the background color to blue
§

fn magenta(&self) -> FgColorDisplay<'_, Magenta, Self>

Change the foreground color to magenta
§

fn on_magenta(&self) -> BgColorDisplay<'_, Magenta, Self>

Change the background color to magenta
§

fn purple(&self) -> FgColorDisplay<'_, Magenta, Self>

Change the foreground color to purple
§

fn on_purple(&self) -> BgColorDisplay<'_, Magenta, Self>

Change the background color to purple
§

fn cyan(&self) -> FgColorDisplay<'_, Cyan, Self>

Change the foreground color to cyan
§

fn on_cyan(&self) -> BgColorDisplay<'_, Cyan, Self>

Change the background color to cyan
§

fn white(&self) -> FgColorDisplay<'_, White, Self>

Change the foreground color to white
§

fn on_white(&self) -> BgColorDisplay<'_, White, Self>

Change the background color to white
§

fn default_color(&self) -> FgColorDisplay<'_, Default, Self>

Change the foreground color to the terminal default
§

fn on_default_color(&self) -> BgColorDisplay<'_, Default, Self>

Change the background color to the terminal default
§

fn bright_black(&self) -> FgColorDisplay<'_, BrightBlack, Self>

Change the foreground color to bright black
§

fn on_bright_black(&self) -> BgColorDisplay<'_, BrightBlack, Self>

Change the background color to bright black
§

fn bright_red(&self) -> FgColorDisplay<'_, BrightRed, Self>

Change the foreground color to bright red
§

fn on_bright_red(&self) -> BgColorDisplay<'_, BrightRed, Self>

Change the background color to bright red
§

fn bright_green(&self) -> FgColorDisplay<'_, BrightGreen, Self>

Change the foreground color to bright green
§

fn on_bright_green(&self) -> BgColorDisplay<'_, BrightGreen, Self>

Change the background color to bright green
§

fn bright_yellow(&self) -> FgColorDisplay<'_, BrightYellow, Self>

Change the foreground color to bright yellow
§

fn on_bright_yellow(&self) -> BgColorDisplay<'_, BrightYellow, Self>

Change the background color to bright yellow
§

fn bright_blue(&self) -> FgColorDisplay<'_, BrightBlue, Self>

Change the foreground color to bright blue
§

fn on_bright_blue(&self) -> BgColorDisplay<'_, BrightBlue, Self>

Change the background color to bright blue
§

fn bright_magenta(&self) -> FgColorDisplay<'_, BrightMagenta, Self>

Change the foreground color to bright magenta
§

fn on_bright_magenta(&self) -> BgColorDisplay<'_, BrightMagenta, Self>

Change the background color to bright magenta
§

fn bright_purple(&self) -> FgColorDisplay<'_, BrightMagenta, Self>

Change the foreground color to bright purple
§

fn on_bright_purple(&self) -> BgColorDisplay<'_, BrightMagenta, Self>

Change the background color to bright purple
§

fn bright_cyan(&self) -> FgColorDisplay<'_, BrightCyan, Self>

Change the foreground color to bright cyan
§

fn on_bright_cyan(&self) -> BgColorDisplay<'_, BrightCyan, Self>

Change the background color to bright cyan
§

fn bright_white(&self) -> FgColorDisplay<'_, BrightWhite, Self>

Change the foreground color to bright white
§

fn on_bright_white(&self) -> BgColorDisplay<'_, BrightWhite, Self>

Change the background color to bright white
§

fn bold(&self) -> BoldDisplay<'_, Self>

Make the text bold
§

fn dimmed(&self) -> DimDisplay<'_, Self>

Make the text dim
§

fn italic(&self) -> ItalicDisplay<'_, Self>

Make the text italicized
§

fn underline(&self) -> UnderlineDisplay<'_, Self>

Make the text underlined
Make the text blink
Make the text blink (but fast!)
§

fn reversed(&self) -> ReversedDisplay<'_, Self>

Swap the foreground and background colors
§

fn hidden(&self) -> HiddenDisplay<'_, Self>

Hide the text
§

fn strikethrough(&self) -> StrikeThroughDisplay<'_, Self>

Cross out the text
§

fn color<Color>(&self, color: Color) -> FgDynColorDisplay<'_, Color, Self>
where Color: DynColor,

Set the foreground color at runtime. Only use if you do not know which color will be used at compile-time. If the color is constant, use either [OwoColorize::fg] or a color-specific method, such as [OwoColorize::green], Read more
§

fn on_color<Color>(&self, color: Color) -> BgDynColorDisplay<'_, Color, Self>
where Color: DynColor,

Set the background color at runtime. Only use if you do not know what color to use at compile-time. If the color is constant, use either [OwoColorize::bg] or a color-specific method, such as [OwoColorize::on_yellow], Read more
§

fn fg_rgb<const R: u8, const G: u8, const B: u8>( &self, ) -> FgColorDisplay<'_, CustomColor<R, G, B>, Self>

Set the foreground color to a specific RGB value.
§

fn bg_rgb<const R: u8, const G: u8, const B: u8>( &self, ) -> BgColorDisplay<'_, CustomColor<R, G, B>, Self>

Set the background color to a specific RGB value.
§

fn truecolor(&self, r: u8, g: u8, b: u8) -> FgDynColorDisplay<'_, Rgb, Self>

Sets the foreground color to an RGB value.
§

fn on_truecolor(&self, r: u8, g: u8, b: u8) -> BgDynColorDisplay<'_, Rgb, Self>

Sets the background color to an RGB value.
§

fn style(&self, style: Style) -> Styled<&Self>

Apply a runtime-determined style
§

fn if_supports_color<'a, Out, ApplyFn>( &'a self, stream: impl Into<Stream>, apply: ApplyFn, ) -> SupportsColorsDisplay<'a, Self, Out, ApplyFn>
where ApplyFn: Fn(&'a Self) -> Out,

Apply a given transformation function to all formatters if the given stream supports at least basic ANSI colors, allowing you to conditionally apply given styles/colors. Read more
§

impl<T> Pointable for T

§

const ALIGN: usize = _

The alignment of pointer.
§

type Init = T

The type for initializers.
§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
§

impl<T> Pointee for T

§

type Metadata = ()

The type for metadata in pointers and references to Self.
source§

impl<T> Same for T

source§

type Output = T

Should always be Self
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

source§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a [WithDispatch] wrapper. Read more
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a [WithDispatch] wrapper. Read more
§

impl<T> ErasedDestructor for T
where T: 'static,

§

impl<T> MaybeSend for T
where T: Send,

§

impl<T> MaybeSendSync for T