vector_common/
finalization.rs

1#![deny(missing_docs)]
2//! This module contains the event metadata required to track an event
3//! as it flows through transforms, being duplicated and merged, and
4//! then report its status when the last copy is delivered or dropped.
5
6use std::{cmp, future::Future, mem, pin::Pin, sync::Arc, task::Poll};
7
8use crossbeam_utils::atomic::AtomicCell;
9use futures::future::FutureExt;
10use tokio::sync::oneshot;
11
12#[cfg(feature = "byte_size_of")]
13use crate::byte_size_of::ByteSizeOf;
14
15/// A collection of event finalizers.
16#[derive(Clone, Debug, Default)]
17pub struct EventFinalizers(Vec<Arc<EventFinalizer>>);
18
19impl Eq for EventFinalizers {}
20
21impl PartialEq for EventFinalizers {
22    fn eq(&self, other: &Self) -> bool {
23        self.0.len() == other.0.len()
24            && (self.0.iter())
25                .zip(other.0.iter())
26                .all(|(a, b)| Arc::ptr_eq(a, b))
27    }
28}
29
30impl PartialOrd for EventFinalizers {
31    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
32        // There is no partial order defined structurally on
33        // `EventFinalizer`. Partial equality is defined on the equality of
34        // `Arc`s. Therefore, partial ordering of `EventFinalizers` is defined
35        // only on the length of the finalizers.
36        self.0.len().partial_cmp(&other.0.len())
37    }
38}
39
40#[cfg(feature = "byte_size_of")]
41impl ByteSizeOf for EventFinalizers {
42    fn allocated_bytes(&self) -> usize {
43        // Don't count the allocated data here, it's not really event
44        // data we're interested in tracking but rather an artifact of
45        // tracking and merging events.
46        0
47    }
48}
49
50impl EventFinalizers {
51    /// Default empty finalizer set for use in `const` contexts.
52    pub const DEFAULT: Self = Self(Vec::new());
53
54    /// Creates a new `EventFinalizers` based on the given event finalizer.
55    #[must_use]
56    pub fn new(finalizer: EventFinalizer) -> Self {
57        Self(vec![Arc::new(finalizer)])
58    }
59
60    /// Returns `true` if the collection contains no event finalizers.
61    #[must_use]
62    pub fn is_empty(&self) -> bool {
63        self.0.is_empty()
64    }
65
66    /// Returns the number of event finalizers in the collection.
67    #[must_use]
68    pub fn len(&self) -> usize {
69        self.0.len()
70    }
71
72    /// Adds a new event finalizer to the collection.
73    pub fn add(&mut self, finalizer: EventFinalizer) {
74        self.0.push(Arc::new(finalizer));
75    }
76
77    /// Merges the event finalizers from `other` into the collection.
78    pub fn merge(&mut self, other: Self) {
79        self.0.extend(other.0);
80    }
81
82    /// Updates the status of all event finalizers in the collection.
83    pub fn update_status(&self, status: EventStatus) {
84        for finalizer in &self.0 {
85            finalizer.update_status(status);
86        }
87    }
88
89    /// Consumes all event finalizers and updates their underlying batches immediately.
90    pub fn update_sources(&mut self) {
91        let finalizers = mem::take(&mut self.0);
92        for finalizer in &finalizers {
93            finalizer.update_batch();
94        }
95    }
96}
97
98impl Finalizable for EventFinalizers {
99    fn take_finalizers(&mut self) -> EventFinalizers {
100        mem::take(self)
101    }
102}
103
104impl std::iter::FromIterator<EventFinalizers> for EventFinalizers {
105    fn from_iter<T: IntoIterator<Item = EventFinalizers>>(iter: T) -> Self {
106        Self(iter.into_iter().flat_map(|f| f.0.into_iter()).collect())
107    }
108}
109
110/// An event finalizer is the shared data required to handle tracking the status of an event, and updating the status of
111/// a batch with that when the event is dropped.
112#[derive(Debug)]
113pub struct EventFinalizer {
114    status: AtomicCell<EventStatus>,
115    batch: BatchNotifier,
116}
117
118#[cfg(feature = "byte_size_of")]
119impl ByteSizeOf for EventFinalizer {
120    fn allocated_bytes(&self) -> usize {
121        // Don't count the batch notifier, as it's shared across
122        // events in a batch.
123        0
124    }
125}
126
127impl EventFinalizer {
128    /// Creates a new `EventFinalizer` attached to the given `batch`.
129    #[must_use]
130    pub fn new(batch: BatchNotifier) -> Self {
131        let status = AtomicCell::new(EventStatus::Dropped);
132        Self { status, batch }
133    }
134
135    /// Updates the status of the event finalizer to `status`.
136    pub fn update_status(&self, status: EventStatus) {
137        self.status
138            .fetch_update(|old_status| Some(old_status.update(status)))
139            .unwrap_or_else(|_| unreachable!());
140    }
141
142    /// Updates the underlying batch status with the status of the event finalizer.
143    ///
144    /// In doing so, the event finalizer is marked as "recorded", which prevents any further updates to it.
145    pub fn update_batch(&self) {
146        let status = self
147            .status
148            .fetch_update(|_| Some(EventStatus::Recorded))
149            .unwrap_or_else(|_| unreachable!());
150        self.batch.update_status(status);
151    }
152}
153
154impl Drop for EventFinalizer {
155    fn drop(&mut self) {
156        self.update_batch();
157    }
158}
159
160/// A convenience newtype wrapper for the one-shot receiver for an
161/// individual batch status.
162#[pin_project::pin_project]
163pub struct BatchStatusReceiver(oneshot::Receiver<BatchStatus>);
164
165impl Future for BatchStatusReceiver {
166    type Output = BatchStatus;
167    fn poll(mut self: Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
168        match self.0.poll_unpin(ctx) {
169            Poll::Pending => Poll::Pending,
170            Poll::Ready(Ok(status)) => Poll::Ready(status),
171            Poll::Ready(Err(error)) => {
172                error!(%error, "Batch status receiver dropped before sending.");
173                Poll::Ready(BatchStatus::Errored)
174            }
175        }
176    }
177}
178
179impl BatchStatusReceiver {
180    /// Wrapper for the underlying `try_recv` function.
181    ///
182    /// # Errors
183    ///
184    /// - `TryRecvError::Empty` if no value has been sent yet.
185    /// - `TryRecvError::Closed` if the sender has dropped without sending a value.
186    pub fn try_recv(&mut self) -> Result<BatchStatus, oneshot::error::TryRecvError> {
187        self.0.try_recv()
188    }
189}
190
191/// A batch notifier contains the status of the current batch along with
192/// a one-shot notifier to send that status back to the source. It is
193/// shared among all events of a batch.
194#[derive(Clone, Debug)]
195pub struct BatchNotifier(Arc<OwnedBatchNotifier>);
196
197impl BatchNotifier {
198    /// Creates a new `BatchNotifier` along with the receiver used to await its finalization status.
199    #[must_use]
200    pub fn new_with_receiver() -> (Self, BatchStatusReceiver) {
201        let (sender, receiver) = oneshot::channel();
202        let notifier = OwnedBatchNotifier {
203            status: AtomicCell::new(BatchStatus::Delivered),
204            notifier: Some(sender),
205        };
206        (Self(Arc::new(notifier)), BatchStatusReceiver(receiver))
207    }
208
209    /// Optionally creates a new `BatchNotifier` along with the receiver used to await its finalization status.
210    #[must_use]
211    pub fn maybe_new_with_receiver(enabled: bool) -> (Option<Self>, Option<BatchStatusReceiver>) {
212        if enabled {
213            let (batch, receiver) = Self::new_with_receiver();
214            (Some(batch), Some(receiver))
215        } else {
216            (None, None)
217        }
218    }
219
220    /// Creates a new `BatchNotifier` and attaches it to a group of events.
221    ///
222    /// The receiver used to await the finalization status of the batch is returned.
223    pub fn apply_to<T: AddBatchNotifier>(items: &mut [T]) -> BatchStatusReceiver {
224        let (batch, receiver) = Self::new_with_receiver();
225        for item in items {
226            item.add_batch_notifier(batch.clone());
227        }
228        receiver
229    }
230
231    /// Optionally creates a new `BatchNotifier` and attaches it to a group of events.
232    ///
233    /// If `enabled`, the receiver used to await the finalization status of the batch is
234    /// returned. Otherwise, `None` is returned.
235    pub fn maybe_apply_to<T: AddBatchNotifier>(
236        enabled: bool,
237        items: &mut [T],
238    ) -> Option<BatchStatusReceiver> {
239        enabled.then(|| Self::apply_to(items))
240    }
241
242    /// Updates the status of the notifier.
243    fn update_status(&self, status: EventStatus) {
244        // The status starts as Delivered and can only change if the new
245        // status is different than that.
246        if status != EventStatus::Delivered && status != EventStatus::Dropped {
247            self.0
248                .status
249                .fetch_update(|old_status| Some(old_status.update(status)))
250                .unwrap_or_else(|_| unreachable!());
251        }
252    }
253}
254
255/// The non-shared data underlying the shared `BatchNotifier`
256#[derive(Debug)]
257pub struct OwnedBatchNotifier {
258    status: AtomicCell<BatchStatus>,
259    notifier: Option<oneshot::Sender<BatchStatus>>,
260}
261
262impl OwnedBatchNotifier {
263    /// Sends the status of the notifier back to the source.
264    fn send_status(&mut self) {
265        if let Some(notifier) = self.notifier.take() {
266            let status = self.status.load();
267            // Ignore the error case, as it will happen during normal
268            // source shutdown and we can't detect that here.
269            _ = notifier.send(status);
270        }
271    }
272}
273
274impl Drop for OwnedBatchNotifier {
275    fn drop(&mut self) {
276        self.send_status();
277    }
278}
279
280/// The status of an individual batch.
281#[derive(Copy, Clone, Debug, Eq, PartialEq)]
282#[repr(u8)]
283pub enum BatchStatus {
284    /// All events in the batch were accepted.
285    ///
286    /// This is the default.
287    Delivered,
288    /// At least one event in the batch had a transient error in delivery.
289    Errored,
290    /// At least one event in the batch had a permanent failure or rejection.
291    Rejected,
292}
293
294impl Default for BatchStatus {
295    fn default() -> Self {
296        Self::Delivered
297    }
298}
299
300impl BatchStatus {
301    /// Updates the delivery status based on another batch's delivery status, returning the result.
302    ///
303    /// As not every status has the same priority, some updates may end up being a no-op either due to not being any
304    /// different or due to being lower priority than the current status.
305    #[allow(clippy::match_same_arms)] // False positive: https://github.com/rust-lang/rust-clippy/issues/860
306    fn update(self, status: EventStatus) -> Self {
307        match (self, status) {
308            // `Dropped` and `Delivered` do not change the status.
309            (_, EventStatus::Dropped | EventStatus::Delivered) => self,
310            // `Rejected` overrides `Errored` and `Delivered`
311            (Self::Rejected, _) | (_, EventStatus::Rejected) => Self::Rejected,
312            // `Errored` overrides `Delivered`
313            (Self::Errored, _) | (_, EventStatus::Errored) => Self::Errored,
314            // No change for `Delivered`
315            _ => self,
316        }
317    }
318}
319
/// The delivery status of a single event, aggregated across all of its copies.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[repr(u8)]
pub enum EventStatus {
    /// Every copy of the event was dropped without being finalized.
    ///
    /// This is the default.
    Dropped,
    /// Every copy of the event was delivered successfully.
    Delivered,
    /// At least one copy of the event hit a retriable error.
    Errored,
    /// At least one copy of the event failed permanently or was rejected.
    Rejected,
    /// The status has already been recorded and must not change any more.
    Recorded,
}

impl Default for EventStatus {
    fn default() -> Self {
        EventStatus::Dropped
    }
}

impl EventStatus {
    /// Merges `status` into this status and returns the result.
    ///
    /// Statuses are prioritized, so an update may be a no-op when it carries nothing new or
    /// ranks below the current status:
    ///
    /// - `Recorded` is sticky. It wins over everything and is never replaced.
    /// - `Dropped` is the initial state and is replaced by whatever status arrives next.
    /// - Otherwise, `Rejected` beats `Errored`, which beats `Delivered`.
    ///
    /// # Panics
    ///
    /// Moving a `Delivered`, `Errored`, or `Rejected` status to `Dropped` is a programming
    /// error. It trips a `debug_assert!` in debug/test builds and is otherwise ignored.
    #[allow(clippy::match_same_arms)] // Several priority rules intentionally share a result.
    #[must_use]
    pub fn update(self, status: Self) -> Self {
        match self {
            // Nothing may overwrite a recorded status.
            Self::Recorded => Self::Recorded,
            // The initial state accepts any new status, `Recorded` included.
            Self::Dropped => status,
            Self::Delivered | Self::Errored | Self::Rejected => match status {
                Self::Recorded => Self::Recorded,
                Self::Dropped => {
                    debug_assert!(false, "Updating EventStatus to Dropped is nonsense");
                    self
                }
                Self::Rejected => Self::Rejected,
                Self::Errored if self == Self::Rejected => Self::Rejected,
                Self::Errored => Self::Errored,
                Self::Delivered => self,
            },
        }
    }
}
375
376/// An object to which we can add a batch notifier.
377pub trait AddBatchNotifier {
378    /// Adds a single shared batch notifier to this type.
379    fn add_batch_notifier(&mut self, notifier: BatchNotifier);
380}
381
382/// An object that can be finalized.
383pub trait Finalizable {
384    /// Consumes the finalizers of this object.
385    ///
386    /// Typically used for coalescing the finalizers of multiple items, such as when batching finalizable objects where
387    /// all finalizations will be processed when the batch itself is processed.
388    fn take_finalizers(&mut self) -> EventFinalizers;
389}
390
391impl<T: Finalizable> Finalizable for Vec<T> {
392    fn take_finalizers(&mut self) -> EventFinalizers {
393        self.iter_mut()
394            .fold(EventFinalizers::default(), |mut acc, x| {
395                acc.merge(x.take_finalizers());
396                acc
397            })
398    }
399}
400
#[cfg(test)]
mod tests {
    // Tests for finalizer bookkeeping (clone / merge / drop), batch notification through
    // `BatchStatusReceiver`, and the `EventStatus` / `BatchStatus` merge tables.
    use tokio::sync::oneshot::error::TryRecvError::Empty;

    use super::*;

    // A default collection holds no finalizers.
    #[test]
    fn defaults() {
        let finalizer = EventFinalizers::default();
        assert_eq!(finalizer.len(), 0);
    }

    // Dropping the only finalizer of a batch delivers the batch's initial `Delivered` status.
    #[test]
    fn sends_notification() {
        let (fin, mut receiver) = make_finalizer();
        assert_eq!(receiver.try_recv(), Err(Empty));
        drop(fin);
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
    }

    // `update_sources` empties the collection and reports the recorded status without waiting
    // for `fin` itself to be dropped.
    #[test]
    fn early_update() {
        let (mut fin, mut receiver) = make_finalizer();
        fin.update_status(EventStatus::Rejected);
        assert_eq!(receiver.try_recv(), Err(Empty));
        fin.update_sources();
        assert_eq!(fin.len(), 0);
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));
    }

    // Clones share the same finalizer (they compare equal). The batch is notified only after
    // every clone is gone.
    #[test]
    fn clone_events() {
        let (fin1, mut receiver) = make_finalizer();
        let fin2 = fin1.clone();
        assert_eq!(fin1.len(), 1);
        assert_eq!(fin2.len(), 1);
        assert_eq!(fin1, fin2);

        assert_eq!(receiver.try_recv(), Err(Empty));
        drop(fin1);
        assert_eq!(receiver.try_recv(), Err(Empty));
        drop(fin2);
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
    }

    // Merging gathers finalizers from two separate batches. Dropping the merged collection
    // notifies both batches.
    #[test]
    fn merge_events() {
        let mut fin0 = EventFinalizers::default();
        let (fin1, mut receiver1) = make_finalizer();
        let (fin2, mut receiver2) = make_finalizer();

        assert_eq!(fin0.len(), 0);
        fin0.merge(fin1);
        assert_eq!(fin0.len(), 1);
        fin0.merge(fin2);
        assert_eq!(fin0.len(), 2);

        assert_eq!(receiver1.try_recv(), Err(Empty));
        assert_eq!(receiver2.try_recv(), Err(Empty));
        drop(fin0);
        assert_eq!(receiver1.try_recv(), Ok(BatchStatus::Delivered));
        assert_eq!(receiver2.try_recv(), Ok(BatchStatus::Delivered));
    }

    // Describes desired behaviour: merging a collection with its own clone should not duplicate
    // the shared finalizer. `merge` currently just concatenates, so the length check fails.
    #[ignore = "The current implementation does not deduplicate finalizers"]
    #[test]
    fn clone_and_merge_events() {
        let (mut fin1, mut receiver) = make_finalizer();
        let fin2 = fin1.clone();
        fin1.merge(fin2);
        assert_eq!(fin1.len(), 1);

        assert_eq!(receiver.try_recv(), Err(Empty));
        drop(fin1);
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
    }

    // Several events share one batch. The receiver fires only after the last event holding a
    // reference to the batch notifier has been dropped.
    #[test]
    fn multi_event_batch() {
        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let event1 = EventFinalizers::new(EventFinalizer::new(batch.clone()));
        let mut event2 = EventFinalizers::new(EventFinalizer::new(batch.clone()));
        let event3 = EventFinalizers::new(EventFinalizer::new(batch.clone()));
        // Also clone one…
        let event4 = event1.clone();
        // Release the local handle so only the events keep the batch alive.
        drop(batch);
        assert_eq!(event1.len(), 1);
        assert_eq!(event2.len(), 1);
        assert_eq!(event3.len(), 1);
        assert_eq!(event4.len(), 1);
        // Equality is `Arc` identity: only `event1` and its clone `event4` are equal.
        assert_ne!(event1, event2);
        assert_ne!(event1, event3);
        assert_eq!(event1, event4);
        assert_ne!(event2, event3);
        assert_ne!(event2, event4);
        assert_ne!(event3, event4);
        // …and merge another
        event2.merge(event3);
        assert_eq!(event2.len(), 2);

        assert_eq!(receiver.try_recv(), Err(Empty));
        drop(event1);
        assert_eq!(receiver.try_recv(), Err(Empty));
        drop(event2);
        assert_eq!(receiver.try_recv(), Err(Empty));
        // `event4` holds the last reference (shared with the already-dropped `event1`).
        drop(event4);
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
    }

    // Helper: a one-finalizer collection attached to a fresh batch, plus that batch's receiver.
    fn make_finalizer() -> (EventFinalizers, BatchStatusReceiver) {
        let (batch, receiver) = BatchNotifier::new_with_receiver();
        let finalizer = EventFinalizers::new(EventFinalizer::new(batch));
        assert_eq!(finalizer.len(), 1);
        (finalizer, receiver)
    }

    // Table of `EventStatus::update(current, new)` results.
    #[test]
    fn event_status_updates() {
        use EventStatus::{Delivered, Dropped, Errored, Recorded, Rejected};

        assert_eq!(Dropped.update(Dropped), Dropped);
        assert_eq!(Dropped.update(Delivered), Delivered);
        assert_eq!(Dropped.update(Errored), Errored);
        assert_eq!(Dropped.update(Rejected), Rejected);
        assert_eq!(Dropped.update(Recorded), Recorded);

        // Disabled: updating `Delivered`/`Errored`/`Rejected` to `Dropped` trips the
        // `debug_assert!` in `EventStatus::update` in debug/test builds.
        //assert_eq!(Delivered.update(Dropped), Delivered);
        assert_eq!(Delivered.update(Delivered), Delivered);
        assert_eq!(Delivered.update(Errored), Errored);
        assert_eq!(Delivered.update(Rejected), Rejected);
        assert_eq!(Delivered.update(Recorded), Recorded);

        //assert_eq!(Errored.update(Dropped), Errored);
        assert_eq!(Errored.update(Delivered), Errored);
        assert_eq!(Errored.update(Errored), Errored);
        assert_eq!(Errored.update(Rejected), Rejected);
        assert_eq!(Errored.update(Recorded), Recorded);

        //assert_eq!(Rejected.update(Dropped), Rejected);
        assert_eq!(Rejected.update(Delivered), Rejected);
        assert_eq!(Rejected.update(Errored), Rejected);
        assert_eq!(Rejected.update(Rejected), Rejected);
        assert_eq!(Rejected.update(Recorded), Recorded);

        // NOTE(review): `Recorded.update(Dropped)` is matched by the `Recorded` arm before the
        // `debug_assert!` arm, so this case looks safe to re-enable. Confirm and re-enable.
        //assert_eq!(Recorded.update(Dropped), Recorded);
        assert_eq!(Recorded.update(Delivered), Recorded);
        assert_eq!(Recorded.update(Errored), Recorded);
        assert_eq!(Recorded.update(Rejected), Recorded);
        assert_eq!(Recorded.update(Recorded), Recorded);
    }

    // Table of `BatchStatus::update(batch, event)` results. Severity only rises, and `Dropped`,
    // `Delivered`, and `Recorded` events leave the batch status unchanged.
    #[test]
    fn batch_status_update() {
        use BatchStatus::{Delivered, Errored, Rejected};

        assert_eq!(Delivered.update(EventStatus::Dropped), Delivered);
        assert_eq!(Delivered.update(EventStatus::Delivered), Delivered);
        assert_eq!(Delivered.update(EventStatus::Errored), Errored);
        assert_eq!(Delivered.update(EventStatus::Rejected), Rejected);
        assert_eq!(Delivered.update(EventStatus::Recorded), Delivered);

        assert_eq!(Errored.update(EventStatus::Dropped), Errored);
        assert_eq!(Errored.update(EventStatus::Delivered), Errored);
        assert_eq!(Errored.update(EventStatus::Errored), Errored);
        assert_eq!(Errored.update(EventStatus::Rejected), Rejected);
        assert_eq!(Errored.update(EventStatus::Recorded), Errored);

        assert_eq!(Rejected.update(EventStatus::Dropped), Rejected);
        assert_eq!(Rejected.update(EventStatus::Delivered), Rejected);
        assert_eq!(Rejected.update(EventStatus::Errored), Rejected);
        assert_eq!(Rejected.update(EventStatus::Rejected), Rejected);
        assert_eq!(Rejected.update(EventStatus::Recorded), Rejected);
    }
}