1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
use std::{
    future::Future,
    pin::Pin,
    task::{ready, Context, Poll},
};

use futures::{
    channel::oneshot,
    future::{select, BoxFuture, Either},
    pin_mut,
    stream::FuturesOrdered,
    FutureExt, StreamExt,
};

use crate::shutdown::ShutdownSignal;

/// Lifecycle encapsulates logic for managing a lifecycle of multiple futures
/// that are bounded together by a shared shutdown condition.
///
/// If any of the futures completes, or global shutdown it requested, all of the
/// managed futures are requested to shutdown. They can do so gracefully after
/// completing their work.
#[derive(Debug)]
pub struct Lifecycle<'bound> {
    futs: FuturesOrdered<BoxFuture<'bound, ()>>,
    fut_shutdowns: Vec<oneshot::Sender<()>>,
}

/// Holds a "global" shutdown signal or shutdown signal token.
/// Effectively used to hold the token or signal such that it can be dropped
/// after the shutdown is complete.
#[derive(Debug)]
pub enum GlobalShutdownToken {
    /// The global shutdown signal was consumed.
    Token,
    /// The [`ShutdownSignal`] wasn't consumed, and still holds on to the
    /// [`ShutdownSignalToken`].
    Unused,
}

impl<'bound> Lifecycle<'bound> {
    /// Create a new [`Lifecycle`].
    pub fn new() -> Self {
        Self {
            futs: FuturesOrdered::new(),
            fut_shutdowns: Vec::new(),
        }
    }

    /// Add a new future to be managed by the [`Lifecycle`].
    ///
    /// Returns a [`Slot`] to be bound with the `Future`, and
    /// a [`ShutdownHandle`] that is to be used by the bound future to wait for
    /// shutdown.
    pub fn add(&mut self) -> (Slot<'bound, '_>, ShutdownHandle) {
        let (tx, rx) = oneshot::channel();
        let slot = Slot {
            lifecycle: self,
            shutdown_trigger: tx,
        };
        let shutdown_handle = ShutdownHandle(rx);
        (slot, shutdown_handle)
    }

    /// Run the managed futures and keep track of the shutdown process.
    pub async fn run(mut self, mut global_shutdown: ShutdownSignal) -> GlobalShutdownToken {
        let first_task_fut = self.futs.next();
        pin_mut!(first_task_fut);

        let token = match select(first_task_fut, &mut global_shutdown).await {
            Either::Left((None, _)) => {
                trace!(message = "Lifecycle had no tasks upon run, we're done.");
                GlobalShutdownToken::Unused
            }
            Either::Left((Some(()), _)) => {
                trace!(message = "Lifecycle had the first task completed.");
                GlobalShutdownToken::Unused
            }
            Either::Right((_shutdown_signal_token, _)) => {
                trace!(message = "Lifecycle got a global shutdown request.");
                GlobalShutdownToken::Token
            }
        };

        // Send the shutdowns to all managed futures.
        for fut_shutdown in self.fut_shutdowns {
            if fut_shutdown.send(()).is_err() {
                trace!(
                    message = "Error while sending a future shutdown, \
                        the receiver is already dropped; \
                        this is not a problem."
                );
            }
        }

        // Wait for all the futures to complete.
        while let Some(()) = self.futs.next().await {
            trace!(message = "A lifecycle-managed future completed after shutdown was requested.");
        }

        // Return the global shutdown token so that caller can perform it's
        // cleanup.
        token
    }
}

/// Represents an unbounded slot at the lifecycle.
#[derive(Debug)]
pub struct Slot<'bound, 'lc> {
    lifecycle: &'lc mut Lifecycle<'bound>,
    shutdown_trigger: oneshot::Sender<()>,
}

impl<'bound, 'lc> Slot<'bound, 'lc> {
    /// Bind the lifecycle slot to a concrete future.
    /// The passed future MUST start it's shutdown process when requested to
    /// shutdown via the signal passed from the corresponding
    /// [`ShutdownHandle`].
    pub fn bind(self, future: BoxFuture<'bound, ()>) {
        self.lifecycle.futs.push_back(future);
        self.lifecycle.fut_shutdowns.push(self.shutdown_trigger);
    }
}

/// A handle that allows waiting for the lifecycle-issued shutdown.
#[derive(Debug)]
pub struct ShutdownHandle(oneshot::Receiver<()>);

impl Future for ShutdownHandle {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        _ = ready!(self.0.poll_unpin(cx));
        Poll::Ready(())
    }
}