vector/sources/kubernetes_logs/
lifecycle.rs1use std::{
2 future::Future,
3 pin::Pin,
4 task::{ready, Context, Poll},
5};
6
7use futures::{
8 channel::oneshot,
9 future::{select, BoxFuture, Either},
10 pin_mut,
11 stream::FuturesOrdered,
12 FutureExt, StreamExt,
13};
14
15use crate::shutdown::ShutdownSignal;
16
17#[derive(Debug)]
24pub struct Lifecycle<'bound> {
25 futs: FuturesOrdered<BoxFuture<'bound, ()>>,
26 fut_shutdowns: Vec<oneshot::Sender<()>>,
27}
28
/// Reports whether the global shutdown signal fired during `Lifecycle::run`.
#[derive(Debug)]
pub enum GlobalShutdownToken {
    /// The global shutdown signal resolved and triggered the shutdown of the
    /// managed futures.
    Token,
    /// The global shutdown signal was not what ended the run: either there
    /// were no managed futures, or a managed future completed first.
    Unused,
}
40
41impl<'bound> Lifecycle<'bound> {
42 pub fn new() -> Self {
44 Self {
45 futs: FuturesOrdered::new(),
46 fut_shutdowns: Vec::new(),
47 }
48 }
49
50 pub fn add(&mut self) -> (Slot<'bound, '_>, ShutdownHandle) {
56 let (tx, rx) = oneshot::channel();
57 let slot = Slot {
58 lifecycle: self,
59 shutdown_trigger: tx,
60 };
61 let shutdown_handle = ShutdownHandle(rx);
62 (slot, shutdown_handle)
63 }
64
65 pub async fn run(mut self, mut global_shutdown: ShutdownSignal) -> GlobalShutdownToken {
67 let first_task_fut = self.futs.next();
68 pin_mut!(first_task_fut);
69
70 let token = match select(first_task_fut, &mut global_shutdown).await {
71 Either::Left((None, _)) => {
72 trace!(message = "Lifecycle had no tasks upon run, we're done.");
73 GlobalShutdownToken::Unused
74 }
75 Either::Left((Some(()), _)) => {
76 trace!(message = "Lifecycle had the first task completed.");
77 GlobalShutdownToken::Unused
78 }
79 Either::Right((_shutdown_signal_token, _)) => {
80 trace!(message = "Lifecycle got a global shutdown request.");
81 GlobalShutdownToken::Token
82 }
83 };
84
85 for fut_shutdown in self.fut_shutdowns {
87 if fut_shutdown.send(()).is_err() {
88 trace!(
89 message = "Error while sending a future shutdown, \
90 the receiver is already dropped; \
91 this is not a problem."
92 );
93 }
94 }
95
96 while let Some(()) = self.futs.next().await {
98 trace!(message = "A lifecycle-managed future completed after shutdown was requested.");
99 }
100
101 token
104 }
105}
106
107#[derive(Debug)]
109pub struct Slot<'bound, 'lc> {
110 lifecycle: &'lc mut Lifecycle<'bound>,
111 shutdown_trigger: oneshot::Sender<()>,
112}
113
114impl<'bound> Slot<'bound, '_> {
115 pub fn bind(self, future: BoxFuture<'bound, ()>) {
120 self.lifecycle.futs.push_back(future);
121 self.lifecycle.fut_shutdowns.push(self.shutdown_trigger);
122 }
123}
124
125#[derive(Debug)]
127pub struct ShutdownHandle(oneshot::Receiver<()>);
128
129impl Future for ShutdownHandle {
130 type Output = ();
131
132 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
133 _ = ready!(self.0.poll_unpin(cx));
134 Poll::Ready(())
135 }
136}