vector_common/
finalization.rs

1#![deny(missing_docs)]
2//! This module contains the event metadata required to track an event
3//! as it flows through transforms, being duplicated and merged, and
4//! then report its status when the last copy is delivered or dropped.
5
6use std::{cmp, future::Future, mem, pin::Pin, sync::Arc, task::Poll};
7
8use crossbeam_utils::atomic::AtomicCell;
9use futures::future::FutureExt;
10use tokio::sync::oneshot;
11
12#[cfg(feature = "byte_size_of")]
13use crate::byte_size_of::ByteSizeOf;
14
15/// A collection of event finalizers.
16#[derive(Clone, Debug, Default)]
17pub struct EventFinalizers(Vec<Arc<EventFinalizer>>);
18
19impl Eq for EventFinalizers {}
20
21impl PartialEq for EventFinalizers {
22    fn eq(&self, other: &Self) -> bool {
23        self.0.len() == other.0.len()
24            && (self.0.iter())
25                .zip(other.0.iter())
26                .all(|(a, b)| Arc::ptr_eq(a, b))
27    }
28}
29
30impl PartialOrd for EventFinalizers {
31    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
32        // There is no partial order defined structurally on
33        // `EventFinalizer`. Partial equality is defined on the equality of
34        // `Arc`s. Therefore, partial ordering of `EventFinalizers` is defined
35        // only on the length of the finalizers.
36        self.0.len().partial_cmp(&other.0.len())
37    }
38}
39
#[cfg(feature = "byte_size_of")]
impl ByteSizeOf for EventFinalizers {
    /// Always reports zero allocated bytes for the finalizer collection.
    fn allocated_bytes(&self) -> usize {
        // The finalizer storage is an artifact of tracking and merging
        // events rather than event data, so it is left out of the count.
        const UNTRACKED: usize = 0;
        UNTRACKED
    }
}
49
50impl EventFinalizers {
51    /// Default empty finalizer set for use in `const` contexts.
52    pub const DEFAULT: Self = Self(Vec::new());
53
54    /// Creates a new `EventFinalizers` based on the given event finalizer.
55    #[must_use]
56    pub fn new(finalizer: EventFinalizer) -> Self {
57        Self(vec![Arc::new(finalizer)])
58    }
59
60    /// Returns `true` if the collection contains no event finalizers.
61    #[must_use]
62    pub fn is_empty(&self) -> bool {
63        self.0.is_empty()
64    }
65
66    /// Returns the number of event finalizers in the collection.
67    #[must_use]
68    pub fn len(&self) -> usize {
69        self.0.len()
70    }
71
72    /// Adds a new event finalizer to the collection.
73    pub fn add(&mut self, finalizer: EventFinalizer) {
74        self.0.push(Arc::new(finalizer));
75    }
76
77    /// Merges the event finalizers from `other` into the collection.
78    pub fn merge(&mut self, other: Self) {
79        self.0.extend(other.0);
80    }
81
82    /// Updates the status of all event finalizers in the collection.
83    pub fn update_status(&self, status: EventStatus) {
84        for finalizer in &self.0 {
85            finalizer.update_status(status);
86        }
87    }
88
89    /// Consumes all event finalizers and updates their underlying batches immediately.
90    pub fn update_sources(&mut self) {
91        let finalizers = mem::take(&mut self.0);
92        for finalizer in &finalizers {
93            finalizer.update_batch();
94        }
95    }
96}
97
98impl Finalizable for EventFinalizers {
99    fn take_finalizers(&mut self) -> EventFinalizers {
100        mem::take(self)
101    }
102}
103
104impl std::iter::FromIterator<EventFinalizers> for EventFinalizers {
105    fn from_iter<T: IntoIterator<Item = EventFinalizers>>(iter: T) -> Self {
106        Self(iter.into_iter().flat_map(|f| f.0.into_iter()).collect())
107    }
108}
109
110/// An event finalizer is the shared data required to handle tracking the status of an event, and updating the status of
111/// a batch with that when the event is dropped.
112#[derive(Debug)]
113pub struct EventFinalizer {
114    status: AtomicCell<EventStatus>,
115    batch: BatchNotifier,
116}
117
118#[cfg(feature = "byte_size_of")]
119impl ByteSizeOf for EventFinalizer {
120    fn allocated_bytes(&self) -> usize {
121        // Don't count the batch notifier, as it's shared across
122        // events in a batch.
123        0
124    }
125}
126
127impl EventFinalizer {
128    /// Creates a new `EventFinalizer` attached to the given `batch`.
129    #[must_use]
130    pub fn new(batch: BatchNotifier) -> Self {
131        let status = AtomicCell::new(EventStatus::Dropped);
132        Self { status, batch }
133    }
134
135    /// Updates the status of the event finalizer to `status`.
136    pub fn update_status(&self, status: EventStatus) {
137        self.status
138            .fetch_update(|old_status| Some(old_status.update(status)))
139            .unwrap_or_else(|_| unreachable!());
140    }
141
142    /// Updates the underlying batch status with the status of the event finalizer.
143    ///
144    /// In doing so, the event finalizer is marked as "recorded", which prevents any further updates to it.
145    pub fn update_batch(&self) {
146        let status = self
147            .status
148            .fetch_update(|_| Some(EventStatus::Recorded))
149            .unwrap_or_else(|_| unreachable!());
150        self.batch.update_status(status);
151    }
152}
153
154impl Drop for EventFinalizer {
155    fn drop(&mut self) {
156        self.update_batch();
157    }
158}
159
160/// A convenience newtype wrapper for the one-shot receiver for an
161/// individual batch status.
162#[pin_project::pin_project]
163pub struct BatchStatusReceiver(oneshot::Receiver<BatchStatus>);
164
165impl Future for BatchStatusReceiver {
166    type Output = BatchStatus;
167    fn poll(mut self: Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
168        match self.0.poll_unpin(ctx) {
169            Poll::Pending => Poll::Pending,
170            Poll::Ready(Ok(status)) => Poll::Ready(status),
171            Poll::Ready(Err(error)) => {
172                error!(%error, "Batch status receiver dropped before sending.");
173                Poll::Ready(BatchStatus::Errored)
174            }
175        }
176    }
177}
178
179impl BatchStatusReceiver {
180    /// Wrapper for the underlying `try_recv` function.
181    ///
182    /// # Errors
183    ///
184    /// - `TryRecvError::Empty` if no value has been sent yet.
185    /// - `TryRecvError::Closed` if the sender has dropped without sending a value.
186    pub fn try_recv(&mut self) -> Result<BatchStatus, oneshot::error::TryRecvError> {
187        self.0.try_recv()
188    }
189}
190
191/// A batch notifier contains the status of the current batch along with
192/// a one-shot notifier to send that status back to the source. It is
193/// shared among all events of a batch.
194#[derive(Clone, Debug)]
195pub struct BatchNotifier(Arc<OwnedBatchNotifier>);
196
197impl BatchNotifier {
198    /// Creates a new `BatchNotifier` along with the receiver used to await its finalization status.
199    #[must_use]
200    pub fn new_with_receiver() -> (Self, BatchStatusReceiver) {
201        let (sender, receiver) = oneshot::channel();
202        let notifier = OwnedBatchNotifier {
203            status: AtomicCell::new(BatchStatus::Delivered),
204            notifier: Some(sender),
205        };
206        (Self(Arc::new(notifier)), BatchStatusReceiver(receiver))
207    }
208
209    /// Optionally creates a new `BatchNotifier` along with the receiver used to await its finalization status.
210    #[must_use]
211    pub fn maybe_new_with_receiver(enabled: bool) -> (Option<Self>, Option<BatchStatusReceiver>) {
212        if enabled {
213            let (batch, receiver) = Self::new_with_receiver();
214            (Some(batch), Some(receiver))
215        } else {
216            (None, None)
217        }
218    }
219
220    /// Creates a new `BatchNotifier` and attaches it to a group of events.
221    ///
222    /// The receiver used to await the finalization status of the batch is returned.
223    pub fn apply_to<T: AddBatchNotifier>(items: &mut [T]) -> BatchStatusReceiver {
224        let (batch, receiver) = Self::new_with_receiver();
225        for item in items {
226            item.add_batch_notifier(batch.clone());
227        }
228        receiver
229    }
230
231    /// Optionally creates a new `BatchNotifier` and attaches it to a group of events.
232    ///
233    /// If `enabled`, the receiver used to await the finalization status of the batch is
234    /// returned. Otherwise, `None` is returned.
235    pub fn maybe_apply_to<T: AddBatchNotifier>(
236        enabled: bool,
237        items: &mut [T],
238    ) -> Option<BatchStatusReceiver> {
239        enabled.then(|| Self::apply_to(items))
240    }
241
242    /// Updates the status of the notifier.
243    fn update_status(&self, status: EventStatus) {
244        // The status starts as Delivered and can only change if the new
245        // status is different than that.
246        if status != EventStatus::Delivered && status != EventStatus::Dropped {
247            self.0
248                .status
249                .fetch_update(|old_status| Some(old_status.update(status)))
250                .unwrap_or_else(|_| unreachable!());
251        }
252    }
253}
254
255/// The non-shared data underlying the shared `BatchNotifier`
256#[derive(Debug)]
257pub struct OwnedBatchNotifier {
258    status: AtomicCell<BatchStatus>,
259    notifier: Option<oneshot::Sender<BatchStatus>>,
260}
261
262impl OwnedBatchNotifier {
263    /// Sends the status of the notifier back to the source.
264    fn send_status(&mut self) {
265        if let Some(notifier) = self.notifier.take() {
266            let status = self.status.load();
267            // Ignore the error case, as it will happen during normal
268            // source shutdown and we can't detect that here.
269            _ = notifier.send(status);
270        }
271    }
272}
273
274impl Drop for OwnedBatchNotifier {
275    fn drop(&mut self) {
276        self.send_status();
277    }
278}
279
280/// The status of an individual batch.
281#[derive(Copy, Clone, Debug, Eq, PartialEq)]
282#[repr(u8)]
283#[derive(Default)]
284pub enum BatchStatus {
285    /// All events in the batch were accepted.
286    ///
287    /// This is the default.
288    #[default]
289    Delivered,
290    /// At least one event in the batch had a transient error in delivery.
291    Errored,
292    /// At least one event in the batch had a permanent failure or rejection.
293    Rejected,
294}
295
296impl BatchStatus {
297    /// Updates the delivery status based on another batch's delivery status, returning the result.
298    ///
299    /// As not every status has the same priority, some updates may end up being a no-op either due to not being any
300    /// different or due to being lower priority than the current status.
301    #[allow(clippy::match_same_arms)] // False positive: https://github.com/rust-lang/rust-clippy/issues/860
302    fn update(self, status: EventStatus) -> Self {
303        match (self, status) {
304            // `Dropped` and `Delivered` do not change the status.
305            (_, EventStatus::Dropped | EventStatus::Delivered) => self,
306            // `Rejected` overrides `Errored` and `Delivered`
307            (Self::Rejected, _) | (_, EventStatus::Rejected) => Self::Rejected,
308            // `Errored` overrides `Delivered`
309            (Self::Errored, _) | (_, EventStatus::Errored) => Self::Errored,
310            // No change for `Delivered`
311            _ => self,
312        }
313    }
314}
315
/// The delivery status of one event.
#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)]
#[repr(u8)]
pub enum EventStatus {
    /// All copies of this event were dropped without being finalized.
    ///
    /// This is the default.
    #[default]
    Dropped,
    /// All copies of this event were delivered successfully.
    Delivered,
    /// At least one copy of this event encountered a retriable error.
    Errored,
    /// At least one copy of this event encountered a permanent failure or rejection.
    Rejected,
    /// This status has been recorded and should not be updated.
    Recorded,
}

impl EventStatus {
    /// Combines this status with an incoming one and returns the result.
    ///
    /// Statuses have different priorities, so an update may be a no-op: `Recorded` on either side always wins,
    /// a current `Dropped` adopts whatever comes in, and otherwise `Rejected` outranks `Errored`, which outranks
    /// `Delivered`.
    ///
    /// # Panics
    ///
    /// Updating a `Delivered`, `Errored`, or `Rejected` status to `Dropped` is a programming error and will panic
    /// in debug/test builds. In release builds the current status is returned unchanged.
    #[must_use]
    pub fn update(self, status: Self) -> Self {
        // `Recorded` is terminal, whichever side it appears on.
        if self == Self::Recorded || status == Self::Recorded {
            return Self::Recorded;
        }
        // Nothing has been reported yet, so take the incoming status as is.
        if self == Self::Dropped {
            return status;
        }
        // From here on `self` is one of `Delivered`, `Errored`, `Rejected`.
        match status {
            Self::Dropped => {
                debug_assert!(false, "Updating EventStatus to Dropped is nonsense");
                self
            }
            // `Delivered` never downgrades an existing status.
            Self::Delivered => self,
            // `Errored` overrides `Delivered` but not `Rejected`.
            Self::Errored => {
                if self == Self::Rejected {
                    Self::Rejected
                } else {
                    Self::Errored
                }
            }
            // `Rejected` overrides everything left. `Recorded` was already
            // handled above and is listed only to keep the match exhaustive.
            Self::Rejected | Self::Recorded => status,
        }
    }
}
367
368/// An object to which we can add a batch notifier.
369pub trait AddBatchNotifier {
370    /// Adds a single shared batch notifier to this type.
371    fn add_batch_notifier(&mut self, notifier: BatchNotifier);
372}
373
374/// An object that can be finalized.
375pub trait Finalizable {
376    /// Consumes the finalizers of this object.
377    ///
378    /// Typically used for coalescing the finalizers of multiple items, such as when batching finalizable objects where
379    /// all finalizations will be processed when the batch itself is processed.
380    fn take_finalizers(&mut self) -> EventFinalizers;
381}
382
383impl<T: Finalizable> Finalizable for Vec<T> {
384    fn take_finalizers(&mut self) -> EventFinalizers {
385        self.iter_mut()
386            .fold(EventFinalizers::default(), |mut acc, x| {
387                acc.merge(x.take_finalizers());
388                acc
389            })
390    }
391}
392
#[cfg(test)]
mod tests {
    use tokio::sync::oneshot::error::TryRecvError::Empty;

    use super::*;

    /// Creates a single-finalizer collection on a fresh batch and returns
    /// it together with the receiver for that batch.
    fn make_finalizer() -> (EventFinalizers, BatchStatusReceiver) {
        let (batch, receiver) = BatchNotifier::new_with_receiver();
        let finalizer = EventFinalizers::new(EventFinalizer::new(batch));
        assert_eq!(finalizer.len(), 1);
        (finalizer, receiver)
    }

    #[test]
    fn defaults() {
        let finalizer = EventFinalizers::default();
        assert_eq!(finalizer.len(), 0);
    }

    #[test]
    fn sends_notification() {
        let (fin, mut receiver) = make_finalizer();
        assert_eq!(receiver.try_recv(), Err(Empty));
        drop(fin);
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
    }

    #[test]
    fn early_update() {
        let (mut fin, mut receiver) = make_finalizer();
        fin.update_status(EventStatus::Rejected);
        assert_eq!(receiver.try_recv(), Err(Empty));
        fin.update_sources();
        assert_eq!(fin.len(), 0);
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));
    }

    #[test]
    fn clone_events() {
        let (fin1, mut receiver) = make_finalizer();
        let fin2 = fin1.clone();
        assert_eq!(fin1.len(), 1);
        assert_eq!(fin2.len(), 1);
        assert_eq!(fin1, fin2);

        // The batch is only notified once the last clone is gone.
        assert_eq!(receiver.try_recv(), Err(Empty));
        drop(fin1);
        assert_eq!(receiver.try_recv(), Err(Empty));
        drop(fin2);
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
    }

    #[test]
    fn merge_events() {
        let mut fin0 = EventFinalizers::default();
        let (fin1, mut receiver1) = make_finalizer();
        let (fin2, mut receiver2) = make_finalizer();

        assert_eq!(fin0.len(), 0);
        fin0.merge(fin1);
        assert_eq!(fin0.len(), 1);
        fin0.merge(fin2);
        assert_eq!(fin0.len(), 2);

        assert_eq!(receiver1.try_recv(), Err(Empty));
        assert_eq!(receiver2.try_recv(), Err(Empty));
        drop(fin0);
        assert_eq!(receiver1.try_recv(), Ok(BatchStatus::Delivered));
        assert_eq!(receiver2.try_recv(), Ok(BatchStatus::Delivered));
    }

    #[ignore = "The current implementation does not deduplicate finalizers"]
    #[test]
    fn clone_and_merge_events() {
        let (mut fin1, mut receiver) = make_finalizer();
        let fin2 = fin1.clone();
        fin1.merge(fin2);
        assert_eq!(fin1.len(), 1);

        assert_eq!(receiver.try_recv(), Err(Empty));
        drop(fin1);
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
    }

    #[test]
    fn multi_event_batch() {
        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let event1 = EventFinalizers::new(EventFinalizer::new(batch.clone()));
        let mut event2 = EventFinalizers::new(EventFinalizer::new(batch.clone()));
        let event3 = EventFinalizers::new(EventFinalizer::new(batch.clone()));
        // Also clone one…
        let event4 = event1.clone();
        drop(batch);
        assert_eq!(event1.len(), 1);
        assert_eq!(event2.len(), 1);
        assert_eq!(event3.len(), 1);
        assert_eq!(event4.len(), 1);
        assert_ne!(event1, event2);
        assert_ne!(event1, event3);
        assert_eq!(event1, event4);
        assert_ne!(event2, event3);
        assert_ne!(event2, event4);
        assert_ne!(event3, event4);
        // …and merge another
        event2.merge(event3);
        assert_eq!(event2.len(), 2);

        assert_eq!(receiver.try_recv(), Err(Empty));
        drop(event1);
        assert_eq!(receiver.try_recv(), Err(Empty));
        drop(event2);
        assert_eq!(receiver.try_recv(), Err(Empty));
        drop(event4);
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
    }

    #[test]
    fn event_status_updates() {
        use EventStatus::{Delivered, Dropped, Errored, Recorded, Rejected};

        assert_eq!(Dropped.update(Dropped), Dropped);
        assert_eq!(Dropped.update(Delivered), Delivered);
        assert_eq!(Dropped.update(Errored), Errored);
        assert_eq!(Dropped.update(Rejected), Rejected);
        assert_eq!(Dropped.update(Recorded), Recorded);

        // Updating `Delivered`, `Errored`, or `Rejected` *to* `Dropped` trips
        // a debug assertion, so those pairs are covered by
        // `update_to_dropped_panics_in_debug_builds` instead.
        assert_eq!(Delivered.update(Delivered), Delivered);
        assert_eq!(Delivered.update(Errored), Errored);
        assert_eq!(Delivered.update(Rejected), Rejected);
        assert_eq!(Delivered.update(Recorded), Recorded);

        assert_eq!(Errored.update(Delivered), Errored);
        assert_eq!(Errored.update(Errored), Errored);
        assert_eq!(Errored.update(Rejected), Rejected);
        assert_eq!(Errored.update(Recorded), Recorded);

        assert_eq!(Rejected.update(Delivered), Rejected);
        assert_eq!(Rejected.update(Errored), Rejected);
        assert_eq!(Rejected.update(Rejected), Rejected);
        assert_eq!(Rejected.update(Recorded), Recorded);

        // `Recorded` short-circuits before the `Dropped` check, so this pair
        // is valid and does not assert.
        assert_eq!(Recorded.update(Dropped), Recorded);
        assert_eq!(Recorded.update(Delivered), Recorded);
        assert_eq!(Recorded.update(Errored), Recorded);
        assert_eq!(Recorded.update(Rejected), Recorded);
        assert_eq!(Recorded.update(Recorded), Recorded);
    }

    // The assertion only exists when debug assertions are compiled in.
    #[cfg(debug_assertions)]
    #[test]
    #[should_panic(expected = "Updating EventStatus to Dropped is nonsense")]
    fn update_to_dropped_panics_in_debug_builds() {
        let _ = EventStatus::Delivered.update(EventStatus::Dropped);
    }

    #[test]
    fn batch_status_update() {
        use BatchStatus::{Delivered, Errored, Rejected};

        assert_eq!(Delivered.update(EventStatus::Dropped), Delivered);
        assert_eq!(Delivered.update(EventStatus::Delivered), Delivered);
        assert_eq!(Delivered.update(EventStatus::Errored), Errored);
        assert_eq!(Delivered.update(EventStatus::Rejected), Rejected);
        assert_eq!(Delivered.update(EventStatus::Recorded), Delivered);

        assert_eq!(Errored.update(EventStatus::Dropped), Errored);
        assert_eq!(Errored.update(EventStatus::Delivered), Errored);
        assert_eq!(Errored.update(EventStatus::Errored), Errored);
        assert_eq!(Errored.update(EventStatus::Rejected), Rejected);
        assert_eq!(Errored.update(EventStatus::Recorded), Errored);

        assert_eq!(Rejected.update(EventStatus::Dropped), Rejected);
        assert_eq!(Rejected.update(EventStatus::Delivered), Rejected);
        assert_eq!(Rejected.update(EventStatus::Errored), Rejected);
        assert_eq!(Rejected.update(EventStatus::Rejected), Rejected);
        assert_eq!(Rejected.update(EventStatus::Recorded), Rejected);
    }
}