vector/sources/kubernetes_logs/
lifecycle.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    task::{ready, Context, Poll},
5};
6
7use futures::{
8    channel::oneshot,
9    future::{select, BoxFuture, Either},
10    pin_mut,
11    stream::FuturesOrdered,
12    FutureExt, StreamExt,
13};
14
15use crate::shutdown::ShutdownSignal;
16
17/// Lifecycle encapsulates logic for managing a lifecycle of multiple futures
18/// that are bounded together by a shared shutdown condition.
19///
20/// If any of the futures completes, or global shutdown it requested, all of the
21/// managed futures are requested to shutdown. They can do so gracefully after
22/// completing their work.
23#[derive(Debug)]
24pub struct Lifecycle<'bound> {
25    futs: FuturesOrdered<BoxFuture<'bound, ()>>,
26    fut_shutdowns: Vec<oneshot::Sender<()>>,
27}
28
29/// Holds a "global" shutdown signal or shutdown signal token.
30/// Effectively used to hold the token or signal such that it can be dropped
31/// after the shutdown is complete.
32#[derive(Debug)]
33pub enum GlobalShutdownToken {
34    /// The global shutdown signal was consumed.
35    Token,
36    /// The [`ShutdownSignal`] wasn't consumed, and still holds on to the
37    /// [`ShutdownSignalToken`].
38    Unused,
39}
40
41impl<'bound> Lifecycle<'bound> {
42    /// Create a new [`Lifecycle`].
43    pub fn new() -> Self {
44        Self {
45            futs: FuturesOrdered::new(),
46            fut_shutdowns: Vec::new(),
47        }
48    }
49
50    /// Add a new future to be managed by the [`Lifecycle`].
51    ///
52    /// Returns a [`Slot`] to be bound with the `Future`, and
53    /// a [`ShutdownHandle`] that is to be used by the bound future to wait for
54    /// shutdown.
55    pub fn add(&mut self) -> (Slot<'bound, '_>, ShutdownHandle) {
56        let (tx, rx) = oneshot::channel();
57        let slot = Slot {
58            lifecycle: self,
59            shutdown_trigger: tx,
60        };
61        let shutdown_handle = ShutdownHandle(rx);
62        (slot, shutdown_handle)
63    }
64
65    /// Run the managed futures and keep track of the shutdown process.
66    pub async fn run(mut self, mut global_shutdown: ShutdownSignal) -> GlobalShutdownToken {
67        let first_task_fut = self.futs.next();
68        pin_mut!(first_task_fut);
69
70        let token = match select(first_task_fut, &mut global_shutdown).await {
71            Either::Left((None, _)) => {
72                trace!(message = "Lifecycle had no tasks upon run, we're done.");
73                GlobalShutdownToken::Unused
74            }
75            Either::Left((Some(()), _)) => {
76                trace!(message = "Lifecycle had the first task completed.");
77                GlobalShutdownToken::Unused
78            }
79            Either::Right((_shutdown_signal_token, _)) => {
80                trace!(message = "Lifecycle got a global shutdown request.");
81                GlobalShutdownToken::Token
82            }
83        };
84
85        // Send the shutdowns to all managed futures.
86        for fut_shutdown in self.fut_shutdowns {
87            if fut_shutdown.send(()).is_err() {
88                trace!(
89                    message = "Error while sending a future shutdown, \
90                        the receiver is already dropped; \
91                        this is not a problem."
92                );
93            }
94        }
95
96        // Wait for all the futures to complete.
97        while let Some(()) = self.futs.next().await {
98            trace!(message = "A lifecycle-managed future completed after shutdown was requested.");
99        }
100
101        // Return the global shutdown token so that caller can perform it's
102        // cleanup.
103        token
104    }
105}
106
107/// Represents an unbounded slot at the lifecycle.
108#[derive(Debug)]
109pub struct Slot<'bound, 'lc> {
110    lifecycle: &'lc mut Lifecycle<'bound>,
111    shutdown_trigger: oneshot::Sender<()>,
112}
113
114impl<'bound> Slot<'bound, '_> {
115    /// Bind the lifecycle slot to a concrete future.
116    /// The passed future MUST start it's shutdown process when requested to
117    /// shutdown via the signal passed from the corresponding
118    /// [`ShutdownHandle`].
119    pub fn bind(self, future: BoxFuture<'bound, ()>) {
120        self.lifecycle.futs.push_back(future);
121        self.lifecycle.fut_shutdowns.push(self.shutdown_trigger);
122    }
123}
124
125/// A handle that allows waiting for the lifecycle-issued shutdown.
126#[derive(Debug)]
127pub struct ShutdownHandle(oneshot::Receiver<()>);
128
129impl Future for ShutdownHandle {
130    type Output = ();
131
132    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
133        _ = ready!(self.0.poll_unpin(cx));
134        Poll::Ready(())
135    }
136}