vector_stream/
futures_unordered_count.rs

1use std::{
2    future::Future,
3    mem,
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use futures_util::{stream::FuturesUnordered, Stream};
9use pin_project::pin_project;
10
11/// A set of futures which may complete in any order, and results are returned as a count of ready
12/// futures. This is primarily useful for when we need to track that the futures have finished but
13/// do not need to use their actual result value (ie `Output = ()`).
14///
15/// While callers could poll `FuturesUnordered` directly, only one result can be grabbed at a
16/// time. As well, while the `ready_chunks` helper is available from `futures_util`, it uses an
17/// internally fused stream, meaning that it cannot be used with `FuturesUnordered` as the first
18/// `None` result from polling `FuturesUnordered` "fuses" all future polls of `ReadyChunks`,
19/// effectively causing it to return no further items.
20///
21/// `FuturesUnorderedCount` takes the best of both worlds and combines the batching with the
22/// unordered futures polling so that it can be used in a more straightforward way from user code.
23#[pin_project]
24#[derive(Debug)]
25#[must_use = "streams do nothing unless polled"]
26pub(crate) struct FuturesUnorderedCount<F: Future> {
27    #[pin]
28    futures: FuturesUnordered<F>,
29    items: usize,
30}
31
32impl<F: Future> FuturesUnorderedCount<F> {
33    /// Constructs a new, empty `FuturesUnorderedCount`.
34    ///
35    /// The returned `FuturesUnorderedCount` does not contain any futures. In this state,
36    /// `FuturesUnorderedCount::poll_next` will return `Poll::Ready(None)`.
37    pub(crate) fn new() -> Self {
38        Self {
39            futures: FuturesUnordered::new(),
40            items: 0,
41        }
42    }
43
44    /// Pushes a new future into the set.
45    ///
46    /// Callers must poll this stream in order to drive the underlying futures that have been stored.
47    pub(crate) fn push(&mut self, fut: F) {
48        self.futures.push(fut);
49    }
50
51    /// Returns `true` if the set contains no futures.
52    pub(crate) fn is_empty(&self) -> bool {
53        self.futures.is_empty()
54    }
55
56    /// Returns the number of futures contained in the set.
57    ///
58    /// This represents the total number of in-flight futures.
59    pub(crate) fn len(&self) -> usize {
60        self.futures.len()
61    }
62}
63
64impl<F: Future> Stream for FuturesUnorderedCount<F> {
65    type Item = usize;
66
67    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
68        let mut this = self.project();
69
70        loop {
71            match this.futures.as_mut().poll_next(cx) {
72                // The underlying `FuturesUnordered` has no (more) available results, so if we have
73                // anything, return it, otherwise, indicate that we're pending as well.
74                Poll::Pending => {
75                    return if *this.items == 0 {
76                        Poll::Pending
77                    } else {
78                        Poll::Ready(Some(mem::take(this.items)))
79                    }
80                }
81
82                // We got a future result, so bump the counter.
83                Poll::Ready(Some(_item)) => *this.items += 1,
84
85                // We have no pending futures, so simply return whatever have have stored, if
86                // anything, or `None`.
87                Poll::Ready(None) => {
88                    let last = (*this.items > 0).then(|| mem::take(this.items));
89                    return Poll::Ready(last);
90                }
91            }
92        }
93    }
94}