1#![deny(missing_docs)]
2use std::{cmp, future::Future, mem, pin::Pin, sync::Arc, task::Poll};
7
8use crossbeam_utils::atomic::AtomicCell;
9use futures::future::FutureExt;
10use tokio::sync::oneshot;
11
12#[cfg(feature = "byte_size_of")]
13use crate::byte_size_of::ByteSizeOf;
14
15#[derive(Clone, Debug, Default)]
17pub struct EventFinalizers(Vec<Arc<EventFinalizer>>);
18
19impl Eq for EventFinalizers {}
20
21impl PartialEq for EventFinalizers {
22 fn eq(&self, other: &Self) -> bool {
23 self.0.len() == other.0.len()
24 && (self.0.iter())
25 .zip(other.0.iter())
26 .all(|(a, b)| Arc::ptr_eq(a, b))
27 }
28}
29
30impl PartialOrd for EventFinalizers {
31 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
32 self.0.len().partial_cmp(&other.0.len())
37 }
38}
39
40#[cfg(feature = "byte_size_of")]
41impl ByteSizeOf for EventFinalizers {
42 fn allocated_bytes(&self) -> usize {
43 0
47 }
48}
49
50impl EventFinalizers {
51 pub const DEFAULT: Self = Self(Vec::new());
53
54 #[must_use]
56 pub fn new(finalizer: EventFinalizer) -> Self {
57 Self(vec![Arc::new(finalizer)])
58 }
59
60 #[must_use]
62 pub fn is_empty(&self) -> bool {
63 self.0.is_empty()
64 }
65
66 #[must_use]
68 pub fn len(&self) -> usize {
69 self.0.len()
70 }
71
72 pub fn add(&mut self, finalizer: EventFinalizer) {
74 self.0.push(Arc::new(finalizer));
75 }
76
77 pub fn merge(&mut self, other: Self) {
79 self.0.extend(other.0);
80 }
81
82 pub fn update_status(&self, status: EventStatus) {
84 for finalizer in &self.0 {
85 finalizer.update_status(status);
86 }
87 }
88
89 pub fn update_sources(&mut self) {
91 let finalizers = mem::take(&mut self.0);
92 for finalizer in &finalizers {
93 finalizer.update_batch();
94 }
95 }
96}
97
98impl Finalizable for EventFinalizers {
99 fn take_finalizers(&mut self) -> EventFinalizers {
100 mem::take(self)
101 }
102}
103
104impl std::iter::FromIterator<EventFinalizers> for EventFinalizers {
105 fn from_iter<T: IntoIterator<Item = EventFinalizers>>(iter: T) -> Self {
106 Self(iter.into_iter().flat_map(|f| f.0.into_iter()).collect())
107 }
108}
109
110#[derive(Debug)]
113pub struct EventFinalizer {
114 status: AtomicCell<EventStatus>,
115 batch: BatchNotifier,
116}
117
118#[cfg(feature = "byte_size_of")]
119impl ByteSizeOf for EventFinalizer {
120 fn allocated_bytes(&self) -> usize {
121 0
124 }
125}
126
127impl EventFinalizer {
128 #[must_use]
130 pub fn new(batch: BatchNotifier) -> Self {
131 let status = AtomicCell::new(EventStatus::Dropped);
132 Self { status, batch }
133 }
134
135 pub fn update_status(&self, status: EventStatus) {
137 self.status
138 .fetch_update(|old_status| Some(old_status.update(status)))
139 .unwrap_or_else(|_| unreachable!());
140 }
141
142 pub fn update_batch(&self) {
146 let status = self
147 .status
148 .fetch_update(|_| Some(EventStatus::Recorded))
149 .unwrap_or_else(|_| unreachable!());
150 self.batch.update_status(status);
151 }
152}
153
154impl Drop for EventFinalizer {
155 fn drop(&mut self) {
156 self.update_batch();
157 }
158}
159
160#[pin_project::pin_project]
163pub struct BatchStatusReceiver(oneshot::Receiver<BatchStatus>);
164
165impl Future for BatchStatusReceiver {
166 type Output = BatchStatus;
167 fn poll(mut self: Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
168 match self.0.poll_unpin(ctx) {
169 Poll::Pending => Poll::Pending,
170 Poll::Ready(Ok(status)) => Poll::Ready(status),
171 Poll::Ready(Err(error)) => {
172 error!(%error, "Batch status receiver dropped before sending.");
173 Poll::Ready(BatchStatus::Errored)
174 }
175 }
176 }
177}
178
179impl BatchStatusReceiver {
180 pub fn try_recv(&mut self) -> Result<BatchStatus, oneshot::error::TryRecvError> {
187 self.0.try_recv()
188 }
189}
190
191#[derive(Clone, Debug)]
195pub struct BatchNotifier(Arc<OwnedBatchNotifier>);
196
197impl BatchNotifier {
198 #[must_use]
200 pub fn new_with_receiver() -> (Self, BatchStatusReceiver) {
201 let (sender, receiver) = oneshot::channel();
202 let notifier = OwnedBatchNotifier {
203 status: AtomicCell::new(BatchStatus::Delivered),
204 notifier: Some(sender),
205 };
206 (Self(Arc::new(notifier)), BatchStatusReceiver(receiver))
207 }
208
209 #[must_use]
211 pub fn maybe_new_with_receiver(enabled: bool) -> (Option<Self>, Option<BatchStatusReceiver>) {
212 if enabled {
213 let (batch, receiver) = Self::new_with_receiver();
214 (Some(batch), Some(receiver))
215 } else {
216 (None, None)
217 }
218 }
219
220 pub fn apply_to<T: AddBatchNotifier>(items: &mut [T]) -> BatchStatusReceiver {
224 let (batch, receiver) = Self::new_with_receiver();
225 for item in items {
226 item.add_batch_notifier(batch.clone());
227 }
228 receiver
229 }
230
231 pub fn maybe_apply_to<T: AddBatchNotifier>(
236 enabled: bool,
237 items: &mut [T],
238 ) -> Option<BatchStatusReceiver> {
239 enabled.then(|| Self::apply_to(items))
240 }
241
242 fn update_status(&self, status: EventStatus) {
244 if status != EventStatus::Delivered && status != EventStatus::Dropped {
247 self.0
248 .status
249 .fetch_update(|old_status| Some(old_status.update(status)))
250 .unwrap_or_else(|_| unreachable!());
251 }
252 }
253}
254
255#[derive(Debug)]
257pub struct OwnedBatchNotifier {
258 status: AtomicCell<BatchStatus>,
259 notifier: Option<oneshot::Sender<BatchStatus>>,
260}
261
262impl OwnedBatchNotifier {
263 fn send_status(&mut self) {
265 if let Some(notifier) = self.notifier.take() {
266 let status = self.status.load();
267 _ = notifier.send(status);
270 }
271 }
272}
273
274impl Drop for OwnedBatchNotifier {
275 fn drop(&mut self) {
276 self.send_status();
277 }
278}
279
280#[derive(Copy, Clone, Debug, Eq, PartialEq)]
282#[repr(u8)]
283#[derive(Default)]
284pub enum BatchStatus {
285 #[default]
289 Delivered,
290 Errored,
292 Rejected,
294}
295
296impl BatchStatus {
297 #[allow(clippy::match_same_arms)] fn update(self, status: EventStatus) -> Self {
303 match (self, status) {
304 (_, EventStatus::Dropped | EventStatus::Delivered) => self,
306 (Self::Rejected, _) | (_, EventStatus::Rejected) => Self::Rejected,
308 (Self::Errored, _) | (_, EventStatus::Errored) => Self::Errored,
310 _ => self,
312 }
313 }
314}
315
316#[derive(Copy, Clone, Debug, Eq, PartialEq)]
318#[repr(u8)]
319#[derive(Default)]
320pub enum EventStatus {
321 #[default]
325 Dropped,
326 Delivered,
328 Errored,
330 Rejected,
332 Recorded,
334}
335
336impl EventStatus {
337 #[allow(clippy::match_same_arms)] #[must_use]
347 pub fn update(self, status: Self) -> Self {
348 match (self, status) {
349 (_, Self::Recorded) | (Self::Recorded, _) => Self::Recorded,
351 (Self::Dropped, _) => status,
353 (_, Self::Dropped) => {
355 debug_assert!(false, "Updating EventStatus to Dropped is nonsense");
356 self
357 }
358 (Self::Rejected, _) | (_, Self::Rejected) => Self::Rejected,
360 (Self::Errored, _) | (_, Self::Errored) => Self::Errored,
362 (Self::Delivered, Self::Delivered) => Self::Delivered,
364 }
365 }
366}
367
368pub trait AddBatchNotifier {
370 fn add_batch_notifier(&mut self, notifier: BatchNotifier);
372}
373
374pub trait Finalizable {
376 fn take_finalizers(&mut self) -> EventFinalizers;
381}
382
383impl<T: Finalizable> Finalizable for Vec<T> {
384 fn take_finalizers(&mut self) -> EventFinalizers {
385 self.iter_mut()
386 .fold(EventFinalizers::default(), |mut acc, x| {
387 acc.merge(x.take_finalizers());
388 acc
389 })
390 }
391}
392
393#[cfg(test)]
394mod tests {
395 use tokio::sync::oneshot::error::TryRecvError::Empty;
396
397 use super::*;
398
399 #[test]
400 fn defaults() {
401 let finalizer = EventFinalizers::default();
402 assert_eq!(finalizer.len(), 0);
403 }
404
405 #[test]
406 fn sends_notification() {
407 let (fin, mut receiver) = make_finalizer();
408 assert_eq!(receiver.try_recv(), Err(Empty));
409 drop(fin);
410 assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
411 }
412
413 #[test]
414 fn early_update() {
415 let (mut fin, mut receiver) = make_finalizer();
416 fin.update_status(EventStatus::Rejected);
417 assert_eq!(receiver.try_recv(), Err(Empty));
418 fin.update_sources();
419 assert_eq!(fin.len(), 0);
420 assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));
421 }
422
423 #[test]
424 fn clone_events() {
425 let (fin1, mut receiver) = make_finalizer();
426 let fin2 = fin1.clone();
427 assert_eq!(fin1.len(), 1);
428 assert_eq!(fin2.len(), 1);
429 assert_eq!(fin1, fin2);
430
431 assert_eq!(receiver.try_recv(), Err(Empty));
432 drop(fin1);
433 assert_eq!(receiver.try_recv(), Err(Empty));
434 drop(fin2);
435 assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
436 }
437
438 #[test]
439 fn merge_events() {
440 let mut fin0 = EventFinalizers::default();
441 let (fin1, mut receiver1) = make_finalizer();
442 let (fin2, mut receiver2) = make_finalizer();
443
444 assert_eq!(fin0.len(), 0);
445 fin0.merge(fin1);
446 assert_eq!(fin0.len(), 1);
447 fin0.merge(fin2);
448 assert_eq!(fin0.len(), 2);
449
450 assert_eq!(receiver1.try_recv(), Err(Empty));
451 assert_eq!(receiver2.try_recv(), Err(Empty));
452 drop(fin0);
453 assert_eq!(receiver1.try_recv(), Ok(BatchStatus::Delivered));
454 assert_eq!(receiver2.try_recv(), Ok(BatchStatus::Delivered));
455 }
456
457 #[ignore = "The current implementation does not deduplicate finalizers"]
458 #[test]
459 fn clone_and_merge_events() {
460 let (mut fin1, mut receiver) = make_finalizer();
461 let fin2 = fin1.clone();
462 fin1.merge(fin2);
463 assert_eq!(fin1.len(), 1);
464
465 assert_eq!(receiver.try_recv(), Err(Empty));
466 drop(fin1);
467 assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
468 }
469
470 #[test]
471 fn multi_event_batch() {
472 let (batch, mut receiver) = BatchNotifier::new_with_receiver();
473 let event1 = EventFinalizers::new(EventFinalizer::new(batch.clone()));
474 let mut event2 = EventFinalizers::new(EventFinalizer::new(batch.clone()));
475 let event3 = EventFinalizers::new(EventFinalizer::new(batch.clone()));
476 let event4 = event1.clone();
478 drop(batch);
479 assert_eq!(event1.len(), 1);
480 assert_eq!(event2.len(), 1);
481 assert_eq!(event3.len(), 1);
482 assert_eq!(event4.len(), 1);
483 assert_ne!(event1, event2);
484 assert_ne!(event1, event3);
485 assert_eq!(event1, event4);
486 assert_ne!(event2, event3);
487 assert_ne!(event2, event4);
488 assert_ne!(event3, event4);
489 event2.merge(event3);
491 assert_eq!(event2.len(), 2);
492
493 assert_eq!(receiver.try_recv(), Err(Empty));
494 drop(event1);
495 assert_eq!(receiver.try_recv(), Err(Empty));
496 drop(event2);
497 assert_eq!(receiver.try_recv(), Err(Empty));
498 drop(event4);
499 assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
500 }
501
502 fn make_finalizer() -> (EventFinalizers, BatchStatusReceiver) {
503 let (batch, receiver) = BatchNotifier::new_with_receiver();
504 let finalizer = EventFinalizers::new(EventFinalizer::new(batch));
505 assert_eq!(finalizer.len(), 1);
506 (finalizer, receiver)
507 }
508
509 #[test]
510 fn event_status_updates() {
511 use EventStatus::{Delivered, Dropped, Errored, Recorded, Rejected};
512
513 assert_eq!(Dropped.update(Dropped), Dropped);
514 assert_eq!(Dropped.update(Delivered), Delivered);
515 assert_eq!(Dropped.update(Errored), Errored);
516 assert_eq!(Dropped.update(Rejected), Rejected);
517 assert_eq!(Dropped.update(Recorded), Recorded);
518
519 assert_eq!(Delivered.update(Delivered), Delivered);
521 assert_eq!(Delivered.update(Errored), Errored);
522 assert_eq!(Delivered.update(Rejected), Rejected);
523 assert_eq!(Delivered.update(Recorded), Recorded);
524
525 assert_eq!(Errored.update(Delivered), Errored);
527 assert_eq!(Errored.update(Errored), Errored);
528 assert_eq!(Errored.update(Rejected), Rejected);
529 assert_eq!(Errored.update(Recorded), Recorded);
530
531 assert_eq!(Rejected.update(Delivered), Rejected);
533 assert_eq!(Rejected.update(Errored), Rejected);
534 assert_eq!(Rejected.update(Rejected), Rejected);
535 assert_eq!(Rejected.update(Recorded), Recorded);
536
537 assert_eq!(Recorded.update(Delivered), Recorded);
539 assert_eq!(Recorded.update(Errored), Recorded);
540 assert_eq!(Recorded.update(Rejected), Recorded);
541 assert_eq!(Recorded.update(Recorded), Recorded);
542 }
543
544 #[test]
545 fn batch_status_update() {
546 use BatchStatus::{Delivered, Errored, Rejected};
547
548 assert_eq!(Delivered.update(EventStatus::Dropped), Delivered);
549 assert_eq!(Delivered.update(EventStatus::Delivered), Delivered);
550 assert_eq!(Delivered.update(EventStatus::Errored), Errored);
551 assert_eq!(Delivered.update(EventStatus::Rejected), Rejected);
552 assert_eq!(Delivered.update(EventStatus::Recorded), Delivered);
553
554 assert_eq!(Errored.update(EventStatus::Dropped), Errored);
555 assert_eq!(Errored.update(EventStatus::Delivered), Errored);
556 assert_eq!(Errored.update(EventStatus::Errored), Errored);
557 assert_eq!(Errored.update(EventStatus::Rejected), Rejected);
558 assert_eq!(Errored.update(EventStatus::Recorded), Errored);
559
560 assert_eq!(Rejected.update(EventStatus::Dropped), Rejected);
561 assert_eq!(Rejected.update(EventStatus::Delivered), Rejected);
562 assert_eq!(Rejected.update(EventStatus::Errored), Rejected);
563 assert_eq!(Rejected.update(EventStatus::Rejected), Rejected);
564 assert_eq!(Rejected.update(EventStatus::Recorded), Rejected);
565 }
566}