1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use std::{cmp, num::NonZeroUsize, pin::Pin};

use futures::{
    task::{Context, Poll},
    {Stream, StreamExt},
};

use crate::event::{EventArray, EventContainer};

/// Initial capacity, counted in `EventArray`s (not `Event`s), of the `enqueued` buffer. It is also
/// the minimum capacity of every replacement buffer that `flush` allocates.
const ARRAY_BUFFER_DEFAULT_SIZE: usize = 16;

/// A stream combinator aimed at improving the performance of event streams under load.
///
/// This is similar in spirit to `StreamExt::ready_chunks`, but built specifically for
/// `EventArray`. The more general `FoldReady` is left as an exercise to the reader.
///
/// Every item yielded is the batch of `EventArray`s that the inner stream had immediately
/// available, cut off once the batch holds at least `enqueued_limit` events.
pub struct ReadyArrays<T> {
    inner: T,
    /// Set once `inner` has yielded `None`. After that point `inner` is never
    /// polled again: the `Stream` contract permits a finished stream to panic
    /// or hang when polled, so we must not rely on it being fused.
    inner_done: bool,
    /// Storage for ready `EventArray` instances. The size of `enqueued` is
    /// distinct from the `enqueued_size` field. While that field is in units of
    /// `Event`s this field is in units of `EventArray`. In the worst case where
    /// all `EventArray`s from the inner stream contain only a single `Event`
    /// the size of `enqueued` will grow to `enqueued_limit`.
    enqueued: Vec<EventArray>,
    /// Distinct from `enqueued.len()`, counts the number of total `Event`
    /// instances in all sub-arrays.
    enqueued_size: usize,
    /// Limit for the total number of `Event` instances, soft.
    enqueued_limit: usize,
}

impl<T> ReadyArrays<T>
where
    T: Stream<Item = EventArray> + Unpin,
{
    /// Create a new `ReadyArrays` with a specified capacity.
    ///
    /// The specified capacity is on the total number of `Event` instances
    /// enqueued here at one time. This is a soft limit. Chunks may be returned
    /// that contain more than that number of items.
    pub fn with_capacity(inner: T, capacity: NonZeroUsize) -> Self {
        Self {
            inner,
            inner_done: false,
            enqueued: Vec::with_capacity(ARRAY_BUFFER_DEFAULT_SIZE),
            enqueued_size: 0,
            enqueued_limit: capacity.get(),
        }
    }

    /// Hand back everything currently enqueued and reset the internal state
    /// so that accumulation starts over with an empty buffer.
    fn flush(&mut self) -> Vec<EventArray> {
        // Size the next `enqueued` to the maximum of ARRAY_BUFFER_DEFAULT_SIZE
        // or the current length of `self.enqueued`. This means, in practice,
        // that we will always allocate at least the base size but possibly up
        // to `enqueued_limit` if the underlying stream passes singleton
        // EventArrays.
        let next_capacity = cmp::max(self.enqueued.len(), ARRAY_BUFFER_DEFAULT_SIZE);
        self.enqueued_size = 0;
        std::mem::replace(&mut self.enqueued, Vec::with_capacity(next_capacity))
    }
}

impl<T> Stream for ReadyArrays<T>
where
    T: Stream<Item = EventArray> + Unpin,
{
    type Item = Vec<EventArray>;

    /// Drain the inner stream for as long as it is immediately ready.
    ///
    /// Returns a batch as soon as the soft event limit is reached, the inner
    /// stream reports `Pending`, or the inner stream ends, whichever happens
    /// first. Returns `Pending` only when nothing is enqueued, and `None` once
    /// the inner stream has ended and every enqueued array has been handed out.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The inner stream already finished on an earlier poll. Whatever was
        // enqueued at that moment was flushed then, so there is nothing left
        // to yield, and polling `inner` again would violate its contract.
        if self.inner_done {
            return Poll::Ready(None);
        }

        loop {
            match self.inner.poll_next_unpin(cx) {
                Poll::Ready(Some(array)) => {
                    self.enqueued_size += array.len();
                    self.enqueued.push(array);
                    // NOTE pushing and then checking sizes here is what gives
                    // this struct a 'soft' limit guarantee. If we had a stash
                    // field we could make a hard limit, at the expense of
                    // sending undersized `Item`s. Slightly too big is fine.
                    if self.enqueued_size >= self.enqueued_limit {
                        return Poll::Ready(Some(self.flush()));
                    }
                }
                Poll::Ready(None) => {
                    // The inner stream is exhausted. Remember that so it is
                    // never polled again, then flush everything we've got
                    // enqueued. If nothing is enqueued we are complete right
                    // away; otherwise the next poll reports completion.
                    self.inner_done = true;
                    return Poll::Ready((!self.enqueued.is_empty()).then(|| self.flush()));
                }
                Poll::Pending => {
                    // When the inner stream signals pending flush everything
                    // we've got enqueued here. With nothing enqueued we
                    // signal pending ourselves; the inner stream has already
                    // been handed `cx` for the wake-up.
                    return if self.enqueued.is_empty() {
                        Poll::Pending
                    } else {
                        Poll::Ready(Some(self.flush()))
                    };
                }
            }
        }
    }
}