//! vector/topology/ready_arrays.rs
1use std::{cmp, num::NonZeroUsize, pin::Pin};
2
3use futures::{
4 task::{Context, Poll},
5 {Stream, StreamExt},
6};
7
8use crate::event::{EventArray, EventContainer};
9
/// Initial (and minimum) capacity of the `enqueued` buffer, in units of
/// `EventArray` instances, not `Event`s.
const ARRAY_BUFFER_DEFAULT_SIZE: usize = 16;
11
/// A stream combinator aimed at improving the performance of event streams under load.
///
/// This is similar in spirit to `StreamExt::ready_chunks`, but built specifically for
/// `EventArray`. The more general `FoldReady` is left as an exercise to the reader.
pub struct ReadyArrays<T> {
    /// The wrapped inner stream of `EventArray`s being batched.
    inner: T,
    /// Storage for ready `EventArray` instances. The size of `enqueued` is
    /// distinct from the `enqueued_size` field. While that field is in units of
    /// `Event`s this field is in units of `EventArray`. In the worst case where
    /// all `EventArray`s from the inner stream contain only a single `Event`
    /// the size of `enqueued` will grow to `enqueued_limit`.
    enqueued: Vec<EventArray>,
    /// Distinct from `enqueued.len()`, counts the number of total `Event`
    /// instances in all sub-arrays.
    enqueued_size: usize,
    /// Soft limit on the total number of enqueued `Event` instances. A chunk
    /// is emitted once `enqueued_size` reaches this value, so a chunk may
    /// slightly exceed it (by at most one `EventArray`'s worth of events).
    enqueued_limit: usize,
}
30
31impl<T> ReadyArrays<T>
32where
33 T: Stream<Item = EventArray> + Unpin,
34{
35 /// Create a new `ReadyArrays` with a specified capacity.
36 ///
37 /// The specified capacity is on the total number of `Event` instances
38 /// enqueued here at one time. This is a soft limit. Chunks may be returned
39 /// that contain more than that number of items.
40 pub fn with_capacity(inner: T, capacity: NonZeroUsize) -> Self {
41 Self {
42 inner,
43 enqueued: Vec::with_capacity(ARRAY_BUFFER_DEFAULT_SIZE),
44 enqueued_size: 0,
45 enqueued_limit: capacity.get(),
46 }
47 }
48
49 fn flush(&mut self) -> Vec<EventArray> {
50 // Size the next `enqueued` to the maximum of ARRAY_BUFFER_DEFAULT_SIZE
51 // or the current length of `self.enqueued`. This means, in practice,
52 // that we will always allocate at least the base size but possibly up
53 // to `enqueued_limit` if the underlying stream passes singleton
54 // EventArrays.
55 let mut enqueued =
56 Vec::with_capacity(cmp::max(self.enqueued.len(), ARRAY_BUFFER_DEFAULT_SIZE));
57 std::mem::swap(&mut enqueued, &mut self.enqueued);
58 self.enqueued_size = 0;
59 enqueued
60 }
61}
62
63impl<T> Stream for ReadyArrays<T>
64where
65 T: Stream<Item = EventArray> + Unpin,
66{
67 type Item = Vec<EventArray>;
68
69 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
70 loop {
71 match self.inner.poll_next_unpin(cx) {
72 Poll::Ready(Some(array)) => {
73 self.enqueued_size += array.len();
74 self.enqueued.push(array);
75 // NOTE pushing and then checking sizes here is what gives
76 // this struct a 'soft' limit guarantee. If we had a stash
77 // field we could make a hard limit, at the expense of
78 // sending undersized `Item`s. Slightly too big is fine.
79 if self.enqueued_size >= self.enqueued_limit {
80 return Poll::Ready(Some(self.flush()));
81 }
82 }
83 Poll::Ready(None) => {
84 // When the inner stream is empty flush everything we've got
85 // enqueued here. Next time we're polled we'll signal that
86 // we're complete too.
87 return Poll::Ready((!self.enqueued.is_empty()).then(|| self.flush()));
88 }
89 Poll::Pending => {
90 // When the inner stream signals pending flush everything
91 // we've got enqueued here. Next time we're polled we'll
92 // signal pending.
93 if !self.enqueued.is_empty() {
94 return Poll::Ready(Some(self.flush()));
95 } else {
96 return Poll::Pending;
97 }
98 }
99 }
100 }
101 }
102}