vector/topology/ready_arrays.rs

1use std::{cmp, num::NonZeroUsize, pin::Pin};
2
3use futures::{
4    task::{Context, Poll},
5    {Stream, StreamExt},
6};
7
8use crate::event::{EventArray, EventContainer};
9
10const ARRAY_BUFFER_DEFAULT_SIZE: usize = 16;
11
/// A stream combinator aimed at improving the performance of event streams under load.
///
/// This is similar in spirit to `StreamExt::ready_chunks`, but built specifically for
/// `EventArray`. The more general `FoldReady` is left as an exercise to the reader.
pub struct ReadyArrays<T> {
    /// The wrapped inner stream of `EventArray`s.
    inner: T,
    /// Storage for ready `EventArray` instances. The size of `enqueued` is
    /// distinct from the `enqueued_size` field. While that field is in units of
    /// `Event`s this field is in units of `EventArray`. In the worst case where
    /// all `EventArray`s from the inner stream contain only a single `Event`
    /// the size of `enqueued` will grow to `enqueued_limit`.
    enqueued: Vec<EventArray>,
    /// Distinct from `enqueued.len()`, counts the number of total `Event`
    /// instances in all sub-arrays.
    enqueued_size: usize,
    /// Limit for the total number of `Event` instances, soft.
    enqueued_limit: usize,
}
30
31impl<T> ReadyArrays<T>
32where
33    T: Stream<Item = EventArray> + Unpin,
34{
35    /// Create a new `ReadyArrays` with a specified capacity.
36    ///
37    /// The specified capacity is on the total number of `Event` instances
38    /// enqueued here at one time. This is a soft limit. Chunks may be returned
39    /// that contain more than that number of items.
40    pub fn with_capacity(inner: T, capacity: NonZeroUsize) -> Self {
41        Self {
42            inner,
43            enqueued: Vec::with_capacity(ARRAY_BUFFER_DEFAULT_SIZE),
44            enqueued_size: 0,
45            enqueued_limit: capacity.get(),
46        }
47    }
48
49    fn flush(&mut self) -> Vec<EventArray> {
50        // Size the next `enqueued` to the maximum of ARRAY_BUFFER_DEFAULT_SIZE
51        // or the current length of `self.enqueued`. This means, in practice,
52        // that we will always allocate at least the base size but possibly up
53        // to `enqueued_limit` if the underlying stream passes singleton
54        // EventArrays.
55        let mut enqueued =
56            Vec::with_capacity(cmp::max(self.enqueued.len(), ARRAY_BUFFER_DEFAULT_SIZE));
57        std::mem::swap(&mut enqueued, &mut self.enqueued);
58        self.enqueued_size = 0;
59        enqueued
60    }
61}
62
63impl<T> Stream for ReadyArrays<T>
64where
65    T: Stream<Item = EventArray> + Unpin,
66{
67    type Item = Vec<EventArray>;
68
69    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
70        loop {
71            match self.inner.poll_next_unpin(cx) {
72                Poll::Ready(Some(array)) => {
73                    self.enqueued_size += array.len();
74                    self.enqueued.push(array);
75                    // NOTE pushing and then checking sizes here is what gives
76                    // this struct a 'soft' limit guarantee. If we had a stash
77                    // field we could make a hard limit, at the expense of
78                    // sending undersized `Item`s. Slightly too big is fine.
79                    if self.enqueued_size >= self.enqueued_limit {
80                        return Poll::Ready(Some(self.flush()));
81                    }
82                }
83                Poll::Ready(None) => {
84                    // When the inner stream is empty flush everything we've got
85                    // enqueued here. Next time we're polled we'll signal that
86                    // we're complete too.
87                    return Poll::Ready((!self.enqueued.is_empty()).then(|| self.flush()));
88                }
89                Poll::Pending => {
90                    // When the inner stream signals pending flush everything
91                    // we've got enqueued here. Next time we're polled we'll
92                    // signal pending.
93                    if !self.enqueued.is_empty() {
94                        return Poll::Ready(Some(self.flush()));
95                    } else {
96                        return Poll::Pending;
97                    }
98                }
99            }
100        }
101    }
102}