1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
use std::{
    future::Future,
    num::NonZeroUsize,
    panic,
    pin::Pin,
    task::{ready, Context, Poll},
};

use futures_util::{
    stream::{Fuse, FuturesOrdered},
    Stream, StreamExt,
};
use pin_project::pin_project;
use tokio::task::JoinHandle;

#[pin_project]
pub struct ConcurrentMap<St, T>
where
    St: Stream,
    T: Send + 'static,
{
    #[pin]
    stream: Fuse<St>,
    limit: Option<NonZeroUsize>,
    in_flight: FuturesOrdered<JoinHandle<T>>,
    f: Box<dyn Fn(St::Item) -> Pin<Box<dyn Future<Output = T> + Send + 'static>> + Send>,
}

impl<St, T> ConcurrentMap<St, T>
where
    St: Stream,
    T: Send + 'static,
{
    pub fn new<F>(stream: St, limit: Option<NonZeroUsize>, f: F) -> Self
    where
        F: Fn(St::Item) -> Pin<Box<dyn Future<Output = T> + Send + 'static>> + Send + 'static,
    {
        Self {
            stream: stream.fuse(),
            limit,
            in_flight: FuturesOrdered::new(),
            f: Box::new(f),
        }
    }
}

impl<St, T> Stream for ConcurrentMap<St, T>
where
    St: Stream,
    T: Send + 'static,
{
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // The underlying stream is done, and we have no more in-flight futures.
        if this.stream.is_done() && this.in_flight.is_empty() {
            return Poll::Ready(None);
        }

        loop {
            let can_poll_stream = match this.limit {
                None => true,
                Some(limit) => this.in_flight.len() < limit.get(),
            };

            if can_poll_stream {
                match this.stream.as_mut().poll_next(cx) {
                    // Even if there's no items from the underlying stream, we still have the in-flight
                    // futures to check, so we don't return just yet.
                    Poll::Pending | Poll::Ready(None) => break,
                    Poll::Ready(Some(item)) => {
                        let fut = (this.f)(item);
                        let handle = tokio::spawn(fut);
                        this.in_flight.push_back(handle);
                    }
                }
            } else {
                // We're at our in-flight limit, so stop generating tasks for the moment.
                break;
            }
        }

        match ready!(this.in_flight.poll_next_unpin(cx)) {
            // Either nothing is in-flight, or nothing is ready.
            None => Poll::Pending,
            Some(result) => match result {
                Ok(item) => Poll::Ready(Some(item)),
                Err(e) => {
                    if let Ok(reason) = e.try_into_panic() {
                        // Resume the panic here on the calling task.
                        panic::resume_unwind(reason);
                    } else {
                        // The task was cancelled, which makes no sense, because _we_ hold the join
                        // handle. Only sensible thing to do is panic, because this is a bug.
                        panic!("concurrent map task cancelled outside of our control");
                    }
                }
            },
        }
    }
}