vector_stream/futures_unordered_count.rs
1use std::{
2 future::Future,
3 mem,
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use futures_util::{stream::FuturesUnordered, Stream};
9use pin_project::pin_project;
10
11/// A set of futures which may complete in any order, and results are returned as a count of ready
12/// futures. This is primarily useful for when we need to track that the futures have finished but
13/// do not need to use their actual result value (ie `Output = ()`).
14///
15/// While callers could poll `FuturesUnordered` directly, only one result can be grabbed at a
16/// time. As well, while the `ready_chunks` helper is available from `futures_util`, it uses an
17/// internally fused stream, meaning that it cannot be used with `FuturesUnordered` as the first
18/// `None` result from polling `FuturesUnordered` "fuses" all future polls of `ReadyChunks`,
19/// effectively causing it to return no further items.
20///
21/// `FuturesUnorderedCount` takes the best of both worlds and combines the batching with the
22/// unordered futures polling so that it can be used in a more straightforward way from user code.
23#[pin_project]
24#[derive(Debug)]
25#[must_use = "streams do nothing unless polled"]
26pub(crate) struct FuturesUnorderedCount<F: Future> {
27 #[pin]
28 futures: FuturesUnordered<F>,
29 items: usize,
30}
31
32impl<F: Future> FuturesUnorderedCount<F> {
33 /// Constructs a new, empty `FuturesUnorderedCount`.
34 ///
35 /// The returned `FuturesUnorderedCount` does not contain any futures. In this state,
36 /// `FuturesUnorderedCount::poll_next` will return `Poll::Ready(None)`.
37 pub(crate) fn new() -> Self {
38 Self {
39 futures: FuturesUnordered::new(),
40 items: 0,
41 }
42 }
43
44 /// Pushes a new future into the set.
45 ///
46 /// Callers must poll this stream in order to drive the underlying futures that have been stored.
47 pub(crate) fn push(&mut self, fut: F) {
48 self.futures.push(fut);
49 }
50
51 /// Returns `true` if the set contains no futures.
52 pub(crate) fn is_empty(&self) -> bool {
53 self.futures.is_empty()
54 }
55
56 /// Returns the number of futures contained in the set.
57 ///
58 /// This represents the total number of in-flight futures.
59 pub(crate) fn len(&self) -> usize {
60 self.futures.len()
61 }
62}
63
64impl<F: Future> Stream for FuturesUnorderedCount<F> {
65 type Item = usize;
66
67 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
68 let mut this = self.project();
69
70 loop {
71 match this.futures.as_mut().poll_next(cx) {
72 // The underlying `FuturesUnordered` has no (more) available results, so if we have
73 // anything, return it, otherwise, indicate that we're pending as well.
74 Poll::Pending => {
75 return if *this.items == 0 {
76 Poll::Pending
77 } else {
78 Poll::Ready(Some(mem::take(this.items)))
79 }
80 }
81
82 // We got a future result, so bump the counter.
83 Poll::Ready(Some(_item)) => *this.items += 1,
84
85 // We have no pending futures, so simply return whatever have have stored, if
86 // anything, or `None`.
87 Poll::Ready(None) => {
88 let last = (*this.items > 0).then(|| mem::take(this.items));
89 return Poll::Ready(last);
90 }
91 }
92 }
93 }
94}