//! Finalizer sets: helpers that turn per-batch status receivers into a
//! stream of `(status, entry)` acknowledgements for a source to consume.
#![allow(clippy::module_name_repetitions)]

use std::marker::{PhantomData, Unpin};
use std::{fmt::Debug, future::Future, pin::Pin, sync::Arc, task::Context, task::Poll};

use futures::stream::{BoxStream, FuturesOrdered, FuturesUnordered};
use futures::{future::OptionFuture, FutureExt, Stream, StreamExt};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::Notify;

use crate::finalization::{BatchStatus, BatchStatusReceiver};
use crate::shutdown::ShutdownSignal;

/// A `FinalizerSet` backed by `FuturesOrdered`.
///
/// The `OrderedFinalizer` framework produces a stream of acknowledged
/// event batch identifiers from a source in a single background task
/// *in the order they are received from the source*.
pub type OrderedFinalizer<T> = FinalizerSet<T, FuturesOrdered<FinalizerFuture<T>>>;

/// A `FinalizerSet` backed by `FuturesUnordered`.
///
/// The `UnorderedFinalizer` framework produces a stream of
/// acknowledged event batch identifiers from a source in a single
/// background task *in the order that finalization happens on the
/// event batches*.
pub type UnorderedFinalizer<T> = FinalizerSet<T, FuturesUnordered<FinalizerFuture<T>>>;

/// The `FinalizerSet` framework here is a mechanism for creating a
/// stream of acknowledged (finalized) event batch identifiers from a
/// source as done in a single background task. It does this by
/// pushing the batch status receiver along with an identifier into
/// either a `FuturesOrdered` or `FuturesUnordered`, waiting on the
/// stream of acknowledgements that comes out, and sending each
/// resulting status together with its identifier into the returned
/// stream. The type `T` is the source-specific data associated with
/// each entry; the type `S` is the futures collection used to track
/// the pending acknowledgements.
#[derive(Debug)]
pub struct FinalizerSet<T, S> {
    /// Producer half of the channel that carries new `(receiver, entry)`
    /// pairs to the output stream. `add` does nothing when this is `None`.
    sender: Option<UnboundedSender<(BatchStatusReceiver, T)>>,
    /// Shared with the output stream; notifying it makes the stream discard
    /// the status receivers it is currently tracking.
    flush: Arc<Notify>,
    /// Records the futures collection type `S` without storing a value of it.
    _phantom: PhantomData<S>,
}

impl<T, S> FinalizerSet<T, S>
where
    T: Send + Debug + 'static,
    S: FuturesSet<FinalizerFuture<T>> + Default + Send + Unpin + 'static,
{
    /// Create a finalizer set together with the output stream of
    /// `(status, entry)` pairs for the batches registered through it.
    ///
    /// The output stream ends once the producer side of the channel has been closed and the
    /// acknowledgements already being tracked have been drained.
    ///
    /// If the optional shutdown signal is provided, the stream stops accepting new entries as
    /// soon as the signal is received and only drains the entries it was already tracking.
    /// Entries still queued in the channel at that point are never reported. This is not
    /// recommended: sources may process the message(s) that correspond to those
    /// acknowledgements again.
    #[must_use]
    pub fn new(shutdown: Option<ShutdownSignal>) -> (Self, BoxStream<'static, (BatchStatus, T)>) {
        let (entry_tx, entry_rx) = mpsc::unbounded_channel();
        // One `Notify` shared between the handle (which signals) and the stream (which waits).
        let notify = Arc::new(Notify::new());

        let set = Self {
            sender: Some(entry_tx),
            flush: Arc::clone(&notify),
            _phantom: PhantomData,
        };
        let acknowledgements = finalizer_stream(shutdown, entry_rx, S::default(), notify).boxed();

        (set, acknowledgements)
    }

    /// Conditionally create a finalizer set.
    ///
    /// When `maybe` is `true` this behaves like [`Self::new`]. Otherwise no
    /// finalizer set is created and the returned stream is an
    /// `EmptyStream`, which is always pending and so never wakes.
    #[must_use]
    pub fn maybe_new(
        maybe: bool,
        shutdown: Option<ShutdownSignal>,
    ) -> (Option<Self>, BoxStream<'static, (BatchStatus, T)>) {
        if !maybe {
            return (None, EmptyStream::default().boxed());
        }

        let (finalizer, stream) = Self::new(shutdown);
        (Some(finalizer), stream)
    }

    /// Register `entry` so that it is emitted on the output stream, paired
    /// with the batch status, once `receiver` resolves.
    ///
    /// A failed send is logged as an error and the entry is dropped.
    pub fn add(&self, entry: T, receiver: BatchStatusReceiver) {
        if let Some(sender) = self.sender.as_ref() {
            sender.send((receiver, entry)).unwrap_or_else(|error| {
                error!(message = "FinalizerSet task ended prematurely.", %error);
            });
        }
    }

    /// Tell the output stream to drop every status receiver it is currently
    /// tracking. The entries associated with them will not be emitted.
    pub fn flush(&self) {
        self.flush.notify_one();
    }
}

/// Build the stream of `(status, entry)` pairs that backs a `FinalizerSet`.
///
/// * `shutdown` - optional signal; when it resolves, the stream stops
///   reading `new_entries` and moves on to draining.
/// * `new_entries` - receiving half of the channel fed by `FinalizerSet::add`.
/// * `status_receivers` - the (initially supplied) collection of pending
///   `FinalizerFuture`s.
/// * `flush` - when notified, all pending futures are discarded.
///
/// The stream runs a main loop until shutdown fires or `new_entries` is
/// closed, then yields the results of whatever futures are still pending and
/// ends.
fn finalizer_stream<T, S>(
    shutdown: Option<ShutdownSignal>,
    mut new_entries: UnboundedReceiver<(BatchStatusReceiver, T)>,
    mut status_receivers: S,
    flush: Arc<Notify>,
) -> impl Stream<Item = (BatchStatus, T)>
where
    S: Default + FuturesSet<FinalizerFuture<T>> + Unpin,
{
    // Remember whether a signal was supplied: the `OptionFuture` below is
    // only polled when this is true (see the guard on the first branch).
    let handle_shutdown = shutdown.is_some();
    let mut shutdown = OptionFuture::from(shutdown);

    async_stream::stream! {
        loop {
            tokio::select! {
                // Branches are checked in the order written, so shutdown and
                // flush take priority over finished or new entries.
                biased;
                _ = &mut shutdown, if handle_shutdown => break,
                () = flush.notified() => {
                    // Drop all the existing status receivers and start over.
                    // Their entries are discarded without being yielded.
                    status_receivers = S::default();
                },
                // Prefer to remove finalizers than to add new finalizers to prevent unbounded
                // growth under load.
                finished = status_receivers.next(), if !status_receivers.is_empty() => match finished {
                    Some((status, entry)) => yield (status, entry),
                    // The `is_empty` guard above prevents this from being reachable.
                    None => unreachable!(),
                },
                // New entries are only polled for inside this loop, i.e. until
                // shutdown is flagged or the channel is closed.
                new_entry = new_entries.recv() => match new_entry {
                    Some((receiver, entry)) => {
                        status_receivers.push(FinalizerFuture {
                            receiver,
                            entry: Some(entry),
                        });
                    }
                    // The end of the new entry channel signals shutdown
                    None => break,
                },
            }
        }

        // We've either seen a shutdown signal or the new entry sender
        // was closed. Wait for the last statuses to come in before
        // indicating we are done. Entries still queued in `new_entries`
        // are not read here.
        while let Some((status, entry)) = status_receivers.next().await {
            yield (status, entry);
        }

        // Hold on to the shutdown signal until here to prevent
        // notification of completion before this stream is done.
        drop(shutdown);
    }
}

/// Common interface over the futures collections a `FinalizerSet` can be
/// built on. Implementors are streams yielding the outputs of the futures
/// pushed into them.
pub trait FuturesSet<Fut: Future>: Stream<Item = Fut::Output> {
    /// Returns `true` when the set currently holds no futures.
    fn is_empty(&self) -> bool;
    /// Adds `future` to the set.
    fn push(&mut self, future: Fut);
}

/// `FuturesSet` implementation that delegates to `FuturesOrdered`.
impl<Fut: Future> FuturesSet<Fut> for FuturesOrdered<Fut> {
    fn is_empty(&self) -> bool {
        // NOTE(review): the path-style call is presumably meant to select
        // `FuturesOrdered`'s own `is_empty` rather than this trait method;
        // keep this call form when editing.
        Self::is_empty(self)
    }

    fn push(&mut self, future: Fut) {
        // New futures are appended at the back of the ordered queue.
        Self::push_back(self, future);
    }
}

/// `FuturesSet` implementation that delegates to `FuturesUnordered`.
impl<Fut: Future> FuturesSet<Fut> for FuturesUnordered<Fut> {
    fn is_empty(&self) -> bool {
        // NOTE(review): the path-style call is presumably meant to select
        // `FuturesUnordered`'s own `is_empty` rather than this trait method;
        // keep this call form when editing.
        Self::is_empty(self)
    }

    fn push(&mut self, future: Fut) {
        // NOTE(review): same as above for `push`. Rewriting this as
        // `self.push(future)` may resolve back to this trait method and
        // recurse, so the path form should be kept.
        Self::push(self, future);
    }
}

/// Future pairing a batch status receiver with the source-specific entry it
/// belongs to. It resolves to `(status, entry)` once the receiver resolves.
#[pin_project::pin_project]
pub struct FinalizerFuture<T> {
    /// Resolves with the status of the batch.
    receiver: BatchStatusReceiver,
    /// The entry handed back alongside the status. Stored as an `Option` so
    /// it can be moved out when the future completes.
    entry: Option<T>,
}

impl<T> Future for FinalizerFuture<T> {
    type Output = (<BatchStatusReceiver as Future>::Output, T);
    fn poll(mut self: Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let status = std::task::ready!(self.receiver.poll_unpin(ctx));
        // The use of this above in a `Futures{Ordered|Unordered|`
        // will only take this once before dropping the future.
        Poll::Ready((status, self.entry.take().unwrap_or_else(|| unreachable!())))
    }
}

/// A placeholder stream that never produces an item. It carries no data,
/// only a marker for its item type `T`.
#[derive(Clone, Copy)]
pub struct EmptyStream<T>(PhantomData<T>);

impl<T> Default for EmptyStream<T> {
    /// Builds the stream; there is no state beyond the item-type marker.
    fn default() -> Self {
        EmptyStream(PhantomData)
    }
}

impl<T> Stream for EmptyStream<T> {
    type Item = T;

    /// Always reports `Pending`. The context is ignored, so no waker is
    /// registered by this stream.
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<T>> {
        Poll::Pending
    }

    /// No items are ever yielded, so both the lower and upper bound are zero.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = 0;
        (remaining, Some(remaining))
    }
}