1#![deny(missing_docs)]
2use std::{cmp, future::Future, mem, pin::Pin, sync::Arc, task::Poll};
7
8use crossbeam_utils::atomic::AtomicCell;
9use futures::future::FutureExt;
10use tokio::sync::oneshot;
11
12#[cfg(feature = "byte_size_of")]
13use crate::byte_size_of::ByteSizeOf;
14
15#[derive(Clone, Debug, Default)]
17pub struct EventFinalizers(Vec<Arc<EventFinalizer>>);
18
19impl Eq for EventFinalizers {}
20
21impl PartialEq for EventFinalizers {
22 fn eq(&self, other: &Self) -> bool {
23 self.0.len() == other.0.len()
24 && (self.0.iter())
25 .zip(other.0.iter())
26 .all(|(a, b)| Arc::ptr_eq(a, b))
27 }
28}
29
30impl PartialOrd for EventFinalizers {
31 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
32 self.0.len().partial_cmp(&other.0.len())
37 }
38}
39
#[cfg(feature = "byte_size_of")]
impl ByteSizeOf for EventFinalizers {
    /// Always reports zero allocated bytes for the finalizer list.
    fn allocated_bytes(&self) -> usize {
        // NOTE(review): the heap storage behind the list is deliberately left
        // out of the tally, presumably because it is bookkeeping rather than
        // event payload -- confirm against the `ByteSizeOf` contract.
        0
    }
}
49
50impl EventFinalizers {
51 pub const DEFAULT: Self = Self(Vec::new());
53
54 #[must_use]
56 pub fn new(finalizer: EventFinalizer) -> Self {
57 Self(vec![Arc::new(finalizer)])
58 }
59
60 #[must_use]
62 pub fn is_empty(&self) -> bool {
63 self.0.is_empty()
64 }
65
66 #[must_use]
68 pub fn len(&self) -> usize {
69 self.0.len()
70 }
71
72 pub fn add(&mut self, finalizer: EventFinalizer) {
74 self.0.push(Arc::new(finalizer));
75 }
76
77 pub fn merge(&mut self, other: Self) {
79 self.0.extend(other.0);
80 }
81
82 pub fn update_status(&self, status: EventStatus) {
84 for finalizer in &self.0 {
85 finalizer.update_status(status);
86 }
87 }
88
89 pub fn update_sources(&mut self) {
91 let finalizers = mem::take(&mut self.0);
92 for finalizer in &finalizers {
93 finalizer.update_batch();
94 }
95 }
96}
97
98impl Finalizable for EventFinalizers {
99 fn take_finalizers(&mut self) -> EventFinalizers {
100 mem::take(self)
101 }
102}
103
104impl std::iter::FromIterator<EventFinalizers> for EventFinalizers {
105 fn from_iter<T: IntoIterator<Item = EventFinalizers>>(iter: T) -> Self {
106 Self(iter.into_iter().flat_map(|f| f.0.into_iter()).collect())
107 }
108}
109
110#[derive(Debug)]
113pub struct EventFinalizer {
114 status: AtomicCell<EventStatus>,
115 batch: BatchNotifier,
116}
117
#[cfg(feature = "byte_size_of")]
impl ByteSizeOf for EventFinalizer {
    /// Always reports zero allocated bytes for a finalizer.
    fn allocated_bytes(&self) -> usize {
        // NOTE(review): the batch handle behind this finalizer is not
        // counted, presumably because it is shared bookkeeping rather than
        // event payload -- confirm.
        0
    }
}
126
127impl EventFinalizer {
128 #[must_use]
130 pub fn new(batch: BatchNotifier) -> Self {
131 let status = AtomicCell::new(EventStatus::Dropped);
132 Self { status, batch }
133 }
134
135 pub fn update_status(&self, status: EventStatus) {
137 self.status
138 .fetch_update(|old_status| Some(old_status.update(status)))
139 .unwrap_or_else(|_| unreachable!());
140 }
141
142 pub fn update_batch(&self) {
146 let status = self
147 .status
148 .fetch_update(|_| Some(EventStatus::Recorded))
149 .unwrap_or_else(|_| unreachable!());
150 self.batch.update_status(status);
151 }
152}
153
154impl Drop for EventFinalizer {
155 fn drop(&mut self) {
156 self.update_batch();
157 }
158}
159
160#[pin_project::pin_project]
163pub struct BatchStatusReceiver(oneshot::Receiver<BatchStatus>);
164
165impl Future for BatchStatusReceiver {
166 type Output = BatchStatus;
167 fn poll(mut self: Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
168 match self.0.poll_unpin(ctx) {
169 Poll::Pending => Poll::Pending,
170 Poll::Ready(Ok(status)) => Poll::Ready(status),
171 Poll::Ready(Err(error)) => {
172 error!(%error, "Batch status receiver dropped before sending.");
173 Poll::Ready(BatchStatus::Errored)
174 }
175 }
176 }
177}
178
179impl BatchStatusReceiver {
180 pub fn try_recv(&mut self) -> Result<BatchStatus, oneshot::error::TryRecvError> {
187 self.0.try_recv()
188 }
189}
190
191#[derive(Clone, Debug)]
195pub struct BatchNotifier(Arc<OwnedBatchNotifier>);
196
197impl BatchNotifier {
198 #[must_use]
200 pub fn new_with_receiver() -> (Self, BatchStatusReceiver) {
201 let (sender, receiver) = oneshot::channel();
202 let notifier = OwnedBatchNotifier {
203 status: AtomicCell::new(BatchStatus::Delivered),
204 notifier: Some(sender),
205 };
206 (Self(Arc::new(notifier)), BatchStatusReceiver(receiver))
207 }
208
209 #[must_use]
211 pub fn maybe_new_with_receiver(enabled: bool) -> (Option<Self>, Option<BatchStatusReceiver>) {
212 if enabled {
213 let (batch, receiver) = Self::new_with_receiver();
214 (Some(batch), Some(receiver))
215 } else {
216 (None, None)
217 }
218 }
219
220 pub fn apply_to<T: AddBatchNotifier>(items: &mut [T]) -> BatchStatusReceiver {
224 let (batch, receiver) = Self::new_with_receiver();
225 for item in items {
226 item.add_batch_notifier(batch.clone());
227 }
228 receiver
229 }
230
231 pub fn maybe_apply_to<T: AddBatchNotifier>(
236 enabled: bool,
237 items: &mut [T],
238 ) -> Option<BatchStatusReceiver> {
239 enabled.then(|| Self::apply_to(items))
240 }
241
242 fn update_status(&self, status: EventStatus) {
244 if status != EventStatus::Delivered && status != EventStatus::Dropped {
247 self.0
248 .status
249 .fetch_update(|old_status| Some(old_status.update(status)))
250 .unwrap_or_else(|_| unreachable!());
251 }
252 }
253}
254
255#[derive(Debug)]
257pub struct OwnedBatchNotifier {
258 status: AtomicCell<BatchStatus>,
259 notifier: Option<oneshot::Sender<BatchStatus>>,
260}
261
262impl OwnedBatchNotifier {
263 fn send_status(&mut self) {
265 if let Some(notifier) = self.notifier.take() {
266 let status = self.status.load();
267 _ = notifier.send(status);
270 }
271 }
272}
273
274impl Drop for OwnedBatchNotifier {
275 fn drop(&mut self) {
276 self.send_status();
277 }
278}
279
280#[derive(Copy, Clone, Debug, Eq, PartialEq)]
282#[repr(u8)]
283pub enum BatchStatus {
284 Delivered,
288 Errored,
290 Rejected,
292}
293
294impl Default for BatchStatus {
295 fn default() -> Self {
296 Self::Delivered
297 }
298}
299
300impl BatchStatus {
301 #[allow(clippy::match_same_arms)] fn update(self, status: EventStatus) -> Self {
307 match (self, status) {
308 (_, EventStatus::Dropped | EventStatus::Delivered) => self,
310 (Self::Rejected, _) | (_, EventStatus::Rejected) => Self::Rejected,
312 (Self::Errored, _) | (_, EventStatus::Errored) => Self::Errored,
314 _ => self,
316 }
317 }
318}
319
/// The status of an individual event, as tracked by its finalizer.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
pub enum EventStatus {
    /// Nothing has been reported for the event yet. This is the initial and
    /// default status.
    Dropped,
    /// The event was reported as delivered.
    Delivered,
    /// The event was reported as errored.
    Errored,
    /// The event was reported as rejected.
    Rejected,
    /// The status has already been handed over to the batch. Once reached,
    /// no update changes it any more.
    Recorded,
}

impl Default for EventStatus {
    /// The default event status is [`EventStatus::Dropped`].
    fn default() -> Self {
        EventStatus::Dropped
    }
}

impl EventStatus {
    /// Combines the current status with an incoming `status` and returns the
    /// result.
    ///
    /// The rules, in order of precedence:
    ///
    /// 1. `Recorded` on either side yields `Recorded`.
    /// 2. A current status of `Dropped` is replaced by the incoming one.
    /// 3. An incoming `Dropped` leaves the current status as it is.
    /// 4. `Rejected` on either side yields `Rejected`.
    /// 5. `Errored` on either side yields `Errored`.
    /// 6. Otherwise both sides are `Delivered`, which yields `Delivered`.
    ///
    /// # Panics
    ///
    /// In builds with debug assertions enabled, panics when rule 3 applies,
    /// since moving a reported status back to `Dropped` is considered a
    /// programming error.
    #[must_use]
    pub fn update(self, status: Self) -> Self {
        match status {
            Self::Recorded => Self::Recorded,
            _ if self == Self::Recorded => Self::Recorded,
            _ if self == Self::Dropped => status,
            // From here on `self` is `Delivered`, `Errored` or `Rejected`.
            Self::Dropped => {
                debug_assert!(false, "Updating EventStatus to Dropped is nonsense");
                self
            }
            Self::Rejected => Self::Rejected,
            Self::Errored => {
                if self == Self::Rejected {
                    Self::Rejected
                } else {
                    Self::Errored
                }
            }
            // An incoming `Delivered` never downgrades what is already there.
            Self::Delivered => self,
        }
    }
}
375
376pub trait AddBatchNotifier {
378 fn add_batch_notifier(&mut self, notifier: BatchNotifier);
380}
381
382pub trait Finalizable {
384 fn take_finalizers(&mut self) -> EventFinalizers;
389}
390
391impl<T: Finalizable> Finalizable for Vec<T> {
392 fn take_finalizers(&mut self) -> EventFinalizers {
393 self.iter_mut()
394 .fold(EventFinalizers::default(), |mut acc, x| {
395 acc.merge(x.take_finalizers());
396 acc
397 })
398 }
399}
400
#[cfg(test)]
mod tests {
    use tokio::sync::oneshot::error::TryRecvError::Empty;

    use super::*;

    /// Builds a one-element finalizer collection wired to a fresh batch.
    fn make_finalizer() -> (EventFinalizers, BatchStatusReceiver) {
        let (batch, receiver) = BatchNotifier::new_with_receiver();
        let finalizer = EventFinalizers::new(EventFinalizer::new(batch));
        assert_eq!(finalizer.len(), 1);
        (finalizer, receiver)
    }

    /// Asserts that no batch status has been sent yet.
    #[track_caller]
    fn assert_pending(receiver: &mut BatchStatusReceiver) {
        assert_eq!(receiver.try_recv(), Err(Empty));
    }

    /// Asserts that the batch status has been sent and equals `expected`.
    #[track_caller]
    fn assert_resolved(receiver: &mut BatchStatusReceiver, expected: BatchStatus) {
        assert_eq!(receiver.try_recv(), Ok(expected));
    }

    #[test]
    fn defaults() {
        assert_eq!(EventFinalizers::default().len(), 0);
    }

    #[test]
    fn sends_notification() {
        let (fin, mut receiver) = make_finalizer();
        assert_pending(&mut receiver);
        drop(fin);
        assert_resolved(&mut receiver, BatchStatus::Delivered);
    }

    #[test]
    fn early_update() {
        let (mut fin, mut receiver) = make_finalizer();
        fin.update_status(EventStatus::Rejected);
        assert_pending(&mut receiver);
        fin.update_sources();
        assert_eq!(fin.len(), 0);
        assert_resolved(&mut receiver, BatchStatus::Rejected);
    }

    #[test]
    fn clone_events() {
        let (fin1, mut receiver) = make_finalizer();
        let fin2 = fin1.clone();
        assert_eq!(fin1.len(), 1);
        assert_eq!(fin2.len(), 1);
        assert_eq!(fin1, fin2);

        // The status is only sent once the last clone is gone.
        assert_pending(&mut receiver);
        drop(fin1);
        assert_pending(&mut receiver);
        drop(fin2);
        assert_resolved(&mut receiver, BatchStatus::Delivered);
    }

    #[test]
    fn merge_events() {
        let mut fin0 = EventFinalizers::default();
        let (fin1, mut receiver1) = make_finalizer();
        let (fin2, mut receiver2) = make_finalizer();

        assert_eq!(fin0.len(), 0);
        fin0.merge(fin1);
        assert_eq!(fin0.len(), 1);
        fin0.merge(fin2);
        assert_eq!(fin0.len(), 2);

        assert_pending(&mut receiver1);
        assert_pending(&mut receiver2);
        drop(fin0);
        assert_resolved(&mut receiver1, BatchStatus::Delivered);
        assert_resolved(&mut receiver2, BatchStatus::Delivered);
    }

    #[ignore = "The current implementation does not deduplicate finalizers"]
    #[test]
    fn clone_and_merge_events() {
        let (mut fin1, mut receiver) = make_finalizer();
        let fin2 = fin1.clone();
        fin1.merge(fin2);
        assert_eq!(fin1.len(), 1);

        assert_pending(&mut receiver);
        drop(fin1);
        assert_resolved(&mut receiver, BatchStatus::Delivered);
    }

    #[test]
    fn multi_event_batch() {
        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let new_event = || EventFinalizers::new(EventFinalizer::new(batch.clone()));
        let event1 = new_event();
        let mut event2 = new_event();
        let event3 = new_event();
        let event4 = event1.clone();
        // Release the local handle so only the events keep the batch alive.
        drop(batch);

        for event in [&event1, &event2, &event3, &event4].iter() {
            assert_eq!(event.len(), 1);
        }
        assert_ne!(event1, event2);
        assert_ne!(event1, event3);
        assert_eq!(event1, event4);
        assert_ne!(event2, event3);
        assert_ne!(event2, event4);
        assert_ne!(event3, event4);

        event2.merge(event3);
        assert_eq!(event2.len(), 2);

        assert_pending(&mut receiver);
        drop(event1);
        assert_pending(&mut receiver);
        drop(event2);
        assert_pending(&mut receiver);
        drop(event4);
        assert_resolved(&mut receiver, BatchStatus::Delivered);
    }

    #[test]
    fn event_status_updates() {
        use EventStatus::{Delivered, Dropped, Errored, Recorded, Rejected};

        // (current, incoming, expected). Updates of a non-`Dropped` status
        // to `Dropped` are left out on purpose: they trip a debug assertion.
        let cases = [
            (Dropped, Dropped, Dropped),
            (Dropped, Delivered, Delivered),
            (Dropped, Errored, Errored),
            (Dropped, Rejected, Rejected),
            (Dropped, Recorded, Recorded),
            (Delivered, Delivered, Delivered),
            (Delivered, Errored, Errored),
            (Delivered, Rejected, Rejected),
            (Delivered, Recorded, Recorded),
            (Errored, Delivered, Errored),
            (Errored, Errored, Errored),
            (Errored, Rejected, Rejected),
            (Errored, Recorded, Recorded),
            (Rejected, Delivered, Rejected),
            (Rejected, Errored, Rejected),
            (Rejected, Rejected, Rejected),
            (Rejected, Recorded, Recorded),
            (Recorded, Delivered, Recorded),
            (Recorded, Errored, Recorded),
            (Recorded, Rejected, Recorded),
            (Recorded, Recorded, Recorded),
        ];

        for &(current, incoming, expected) in &cases {
            assert_eq!(current.update(incoming), expected);
        }
    }

    #[test]
    fn batch_status_update() {
        use BatchStatus::{Delivered, Errored, Rejected};

        // (current, incoming event status, expected).
        let cases = [
            (Delivered, EventStatus::Dropped, Delivered),
            (Delivered, EventStatus::Delivered, Delivered),
            (Delivered, EventStatus::Errored, Errored),
            (Delivered, EventStatus::Rejected, Rejected),
            (Delivered, EventStatus::Recorded, Delivered),
            (Errored, EventStatus::Dropped, Errored),
            (Errored, EventStatus::Delivered, Errored),
            (Errored, EventStatus::Errored, Errored),
            (Errored, EventStatus::Rejected, Rejected),
            (Errored, EventStatus::Recorded, Errored),
            (Rejected, EventStatus::Dropped, Rejected),
            (Rejected, EventStatus::Delivered, Rejected),
            (Rejected, EventStatus::Errored, Rejected),
            (Rejected, EventStatus::Rejected, Rejected),
            (Rejected, EventStatus::Recorded, Rejected),
        ];

        for &(current, incoming, expected) in &cases {
            assert_eq!(current.update(incoming), expected);
        }
    }
}