1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use futures::{
channel::oneshot,
future::{select, BoxFuture, Either},
pin_mut,
stream::FuturesOrdered,
FutureExt, StreamExt,
};
use crate::shutdown::ShutdownSignal;
/// Lifecycle encapsulates logic for managing a lifecycle of multiple futures
/// that are bounded together by a shared shutdown condition.
///
/// If any of the futures completes, or global shutdown it requested, all of the
/// managed futures are requested to shutdown. They can do so gracefully after
/// completing their work.
#[derive(Debug)]
pub struct Lifecycle<'bound> {
futs: FuturesOrdered<BoxFuture<'bound, ()>>,
fut_shutdowns: Vec<oneshot::Sender<()>>,
}
/// Holds a "global" shutdown signal or shutdown signal token.
/// Effectively used to hold the token or signal such that it can be dropped
/// after the shutdown is complete.
#[derive(Debug)]
pub enum GlobalShutdownToken {
/// The global shutdown signal was consumed.
Token,
/// The [`ShutdownSignal`] wasn't consumed, and still holds on to the
/// [`ShutdownSignalToken`].
Unused,
}
impl<'bound> Lifecycle<'bound> {
/// Create a new [`Lifecycle`].
pub fn new() -> Self {
Self {
futs: FuturesOrdered::new(),
fut_shutdowns: Vec::new(),
}
}
/// Add a new future to be managed by the [`Lifecycle`].
///
/// Returns a [`Slot`] to be bound with the `Future`, and
/// a [`ShutdownHandle`] that is to be used by the bound future to wait for
/// shutdown.
pub fn add(&mut self) -> (Slot<'bound, '_>, ShutdownHandle) {
let (tx, rx) = oneshot::channel();
let slot = Slot {
lifecycle: self,
shutdown_trigger: tx,
};
let shutdown_handle = ShutdownHandle(rx);
(slot, shutdown_handle)
}
/// Run the managed futures and keep track of the shutdown process.
pub async fn run(mut self, mut global_shutdown: ShutdownSignal) -> GlobalShutdownToken {
let first_task_fut = self.futs.next();
pin_mut!(first_task_fut);
let token = match select(first_task_fut, &mut global_shutdown).await {
Either::Left((None, _)) => {
trace!(message = "Lifecycle had no tasks upon run, we're done.");
GlobalShutdownToken::Unused
}
Either::Left((Some(()), _)) => {
trace!(message = "Lifecycle had the first task completed.");
GlobalShutdownToken::Unused
}
Either::Right((_shutdown_signal_token, _)) => {
trace!(message = "Lifecycle got a global shutdown request.");
GlobalShutdownToken::Token
}
};
// Send the shutdowns to all managed futures.
for fut_shutdown in self.fut_shutdowns {
if fut_shutdown.send(()).is_err() {
trace!(
message = "Error while sending a future shutdown, \
the receiver is already dropped; \
this is not a problem."
);
}
}
// Wait for all the futures to complete.
while let Some(()) = self.futs.next().await {
trace!(message = "A lifecycle-managed future completed after shutdown was requested.");
}
// Return the global shutdown token so that caller can perform it's
// cleanup.
token
}
}
/// Represents an unbounded slot at the lifecycle.
#[derive(Debug)]
pub struct Slot<'bound, 'lc> {
lifecycle: &'lc mut Lifecycle<'bound>,
shutdown_trigger: oneshot::Sender<()>,
}
impl<'bound, 'lc> Slot<'bound, 'lc> {
/// Bind the lifecycle slot to a concrete future.
/// The passed future MUST start it's shutdown process when requested to
/// shutdown via the signal passed from the corresponding
/// [`ShutdownHandle`].
pub fn bind(self, future: BoxFuture<'bound, ()>) {
self.lifecycle.futs.push_back(future);
self.lifecycle.fut_shutdowns.push(self.shutdown_trigger);
}
}
/// A handle that allows waiting for the lifecycle-issued shutdown.
#[derive(Debug)]
pub struct ShutdownHandle(oneshot::Receiver<()>);
impl Future for ShutdownHandle {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
_ = ready!(self.0.poll_unpin(cx));
Poll::Ready(())
}
}