vector_common/
finalizer.rs

1#![allow(clippy::module_name_repetitions)]
2
3use std::marker::{PhantomData, Unpin};
4use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, task::Context, task::Poll};
5
6use futures::stream::{BoxStream, FuturesOrdered, FuturesUnordered};
7use futures::{future::OptionFuture, FutureExt, Stream, StreamExt};
8use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
9use tokio::sync::Notify;
10
11use crate::finalization::{BatchStatus, BatchStatusReceiver};
12use crate::shutdown::ShutdownSignal;
13
14/// The `OrderedFinalizer` framework produces a stream of acknowledged
15/// event batch identifiers from a source in a single background task
16/// *in the order they are received from the source*, using
17/// `FinalizerSet`.
18pub type OrderedFinalizer<T> = FinalizerSet<T, FuturesOrdered<FinalizerFuture<T>>>;
19
20/// The `UnorderedFinalizer` framework produces a stream of
21/// acknowledged event batch identifiers from a source in a single
22/// background task *in the order that finalization happens on the
23/// event batches*, using `FinalizerSet`.
24pub type UnorderedFinalizer<T> = FinalizerSet<T, FuturesUnordered<FinalizerFuture<T>>>;
25
26/// The `FinalizerSet` framework here is a mechanism for creating a
27/// stream of acknowledged (finalized) event batch identifiers from a
28/// source as done in a single background task. It does this by
29/// pushing the batch status receiver along with an identifier into
30/// either a `FuturesOrdered` or `FuturesUnordered`, waiting on the
31/// stream of acknowledgements that comes out, extracting just the
32/// identifier and sending that into the returned stream. The type `T`
33/// is the source-specific data associated with each entry.
34#[derive(Debug)]
35pub struct FinalizerSet<T, S> {
36    sender: Option<UnboundedSender<(BatchStatusReceiver, T)>>,
37    flush: Arc<Notify>,
38    _phantom: PhantomData<S>,
39}
40
41impl<T, S> FinalizerSet<T, S>
42where
43    T: Send + Debug + 'static,
44    S: FuturesSet<FinalizerFuture<T>> + Default + Send + Unpin + 'static,
45{
46    /// Produce a finalizer set along with the output stream of
47    /// received acknowledged batch identifiers.
48    ///
49    /// The output stream will end when the source closes the producer side of the channel, and
50    /// acknowledgements in the channel are drained.
51    ///
52    /// If the optional shutdown signal is provided, the output stream will end immediately when a
53    /// shutdown signal is received. This is not recommended, and can cause some acknowledgements
54    /// to go unprocessed. Sources may process the message(s) that correspond to those
55    /// acknowledgements again.
56    #[must_use]
57    pub fn new(shutdown: Option<ShutdownSignal>) -> (Self, BoxStream<'static, (BatchStatus, T)>) {
58        let (todo_tx, todo_rx) = mpsc::unbounded_channel();
59        let flush1 = Arc::new(Notify::new());
60        let flush2 = Arc::clone(&flush1);
61        (
62            Self {
63                sender: Some(todo_tx),
64                flush: flush1,
65                _phantom: PhantomData,
66            },
67            finalizer_stream(shutdown, todo_rx, S::default(), flush2).boxed(),
68        )
69    }
70
71    /// This returns an optional finalizer set along with a generic
72    /// stream of acknowledged identifiers. In the case the finalizer
73    /// is not to be used, a special empty stream is returned that is
74    /// always pending and so never wakes.
75    #[must_use]
76    pub fn maybe_new(
77        maybe: bool,
78        shutdown: Option<ShutdownSignal>,
79    ) -> (Option<Self>, BoxStream<'static, (BatchStatus, T)>) {
80        if maybe {
81            let (finalizer, stream) = Self::new(shutdown);
82            (Some(finalizer), stream)
83        } else {
84            (None, EmptyStream::default().boxed())
85        }
86    }
87
88    pub fn add(&self, entry: T, receiver: BatchStatusReceiver) {
89        if let Some(sender) = &self.sender {
90            if let Err(error) = sender.send((receiver, entry)) {
91                error!(message = "FinalizerSet task ended prematurely.", %error);
92            }
93        }
94    }
95
96    pub fn flush(&self) {
97        self.flush.notify_one();
98    }
99}
100
/// Builds the stream that backs [`FinalizerSet`].
///
/// A single loop handles, with priority in this order:
/// 1. `shutdown` (if one was given) resolving: stop the loop;
/// 2. a `flush` notification: drop every pending status receiver;
/// 3. the next finished receiver in `status_receivers`: yield `(status, entry)`;
/// 4. a new entry from `new_entries`: add it to `status_receivers`; a closed
///    channel stops the loop.
///
/// After the loop stops, the receivers already in `status_receivers` are awaited
/// and their results yielded before the stream ends. Entries left in
/// `new_entries` are not processed.
fn finalizer_stream<T, S>(
    shutdown: Option<ShutdownSignal>,
    mut new_entries: UnboundedReceiver<(BatchStatusReceiver, T)>,
    mut status_receivers: S,
    flush: Arc<Notify>,
) -> impl Stream<Item = (BatchStatus, T)>
where
    S: Default + FuturesSet<FinalizerFuture<T>> + Unpin,
{
    // An `OptionFuture` wrapping `None` resolves immediately, so the shutdown
    // branch below must be disabled explicitly when no signal was given.
    let handle_shutdown = shutdown.is_some();
    let mut shutdown = OptionFuture::from(shutdown);

    async_stream::stream! {
        loop {
            tokio::select! {
                // Check branches top to bottom rather than in random order, so
                // shutdown and flush win over everything else.
                biased;
                _ = &mut shutdown, if handle_shutdown => break,
                () = flush.notified() => {
                    // Drop all the existing status receivers and start over.
                    status_receivers = S::default();
                },
                // Prefer to remove finalizers than to add new finalizers to prevent unbounded
                // growth under load.
                finished = status_receivers.next(), if !status_receivers.is_empty() => match finished {
                    Some((status, entry)) => yield (status, entry),
                    // The `is_empty` guard above prevents this from being reachable.
                    None => unreachable!(),
                },
                // Accept a new entry only when none of the branches above is ready.
                new_entry = new_entries.recv() => match new_entry {
                    Some((receiver, entry)) => {
                        status_receivers.push(FinalizerFuture {
                            receiver,
                            entry: Some(entry),
                        });
                    }
                    // The end of the new entry channel signals shutdown
                    None => break,
                },
            }
        }

        // We've either seen a shutdown signal or the new entry sender
        // was closed. Wait for the last statuses to come in before
        // indicating we are done.
        while let Some((status, entry)) = status_receivers.next().await {
            yield (status, entry);
        }

        // Hold on to the shutdown signal until here to prevent
        // notification of completion before this stream is done.
        drop(shutdown);
    }
}
155
156pub trait FuturesSet<Fut: Future>: Stream<Item = Fut::Output> {
157    fn is_empty(&self) -> bool;
158    fn push(&mut self, future: Fut);
159}
160
161impl<Fut: Future> FuturesSet<Fut> for FuturesOrdered<Fut> {
162    fn is_empty(&self) -> bool {
163        Self::is_empty(self)
164    }
165
166    fn push(&mut self, future: Fut) {
167        Self::push_back(self, future);
168    }
169}
170
171impl<Fut: Future> FuturesSet<Fut> for FuturesUnordered<Fut> {
172    fn is_empty(&self) -> bool {
173        Self::is_empty(self)
174    }
175
176    fn push(&mut self, future: Fut) {
177        Self::push(self, future);
178    }
179}
180
181#[pin_project::pin_project]
182pub struct FinalizerFuture<T> {
183    receiver: BatchStatusReceiver,
184    entry: Option<T>,
185}
186
187impl<T> Future for FinalizerFuture<T> {
188    type Output = (<BatchStatusReceiver as Future>::Output, T);
189    fn poll(mut self: Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
190        let status = std::task::ready!(self.receiver.poll_unpin(ctx));
191        // The use of this above in a `Futures{Ordered|Unordered|`
192        // will only take this once before dropping the future.
193        Poll::Ready((status, self.entry.take().unwrap_or_else(|| unreachable!())))
194    }
195}
196
/// A stream that never yields an item and never ends.
///
/// It is returned by [`FinalizerSet::maybe_new`] when no finalizer is wanted.
///
/// `Clone`, `Copy` and `Debug` are implemented by hand rather than derived.
/// The derives would add needless `T: Clone` / `T: Copy` / `T: Debug` bounds,
/// even though the type only holds a `PhantomData<T>`.
pub struct EmptyStream<T>(PhantomData<T>);

impl<T> Clone for EmptyStream<T> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<T> Copy for EmptyStream<T> {}

impl<T> Debug for EmptyStream<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("EmptyStream")
    }
}

impl<T> Default for EmptyStream<T> {
    fn default() -> Self {
        Self(PhantomData)
    }
}
205
206impl<T> Stream for EmptyStream<T> {
207    type Item = T;
208
209    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
210        Poll::Pending
211    }
212
213    fn size_hint(&self) -> (usize, Option<usize>) {
214        (0, Some(0))
215    }
216}