#![deny(missing_docs)]
use std::{cmp, future::Future, mem, pin::Pin, sync::Arc, task::Poll};
use crossbeam_utils::atomic::AtomicCell;
use futures::future::FutureExt;
use tokio::sync::oneshot;
#[cfg(feature = "byte_size_of")]
use crate::byte_size_of::ByteSizeOf;
/// A collection of shared [`EventFinalizer`] handles.
///
/// Every finalizer is stored behind an `Arc`, so cloning the collection
/// yields additional references to the same finalizers rather than copies
/// of them.
#[derive(Clone, Debug, Default)]
pub struct EventFinalizers(Vec<Arc<EventFinalizer>>);
// Equality (defined below) is based on pointer identity, which is reflexive,
// so the marker trait `Eq` is sound.
impl Eq for EventFinalizers {}

impl PartialEq for EventFinalizers {
    /// Two collections are equal when they have the same length and, position
    /// by position, point at the very same finalizer allocation
    /// (`Arc::ptr_eq`). The contents of the finalizers are never compared.
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.0, &other.0);
        // `zip` stops at the shorter side, so differing lengths must be
        // rejected up front.
        if lhs.len() != rhs.len() {
            return false;
        }
        lhs.iter()
            .zip(rhs)
            .all(|(ours, theirs)| Arc::ptr_eq(ours, theirs))
    }
}
impl PartialOrd for EventFinalizers {
    /// Orders collections purely by the number of finalizers they hold.
    ///
    /// The finalizers themselves are not inspected, so two collections of the
    /// same length always compare as `Equal` here, even when `==` (which
    /// checks pointer identity) reports them as different.
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        let (ours, theirs) = (self.0.len(), other.0.len());
        Some(ours.cmp(&theirs))
    }
}
// Size accounting for `EventFinalizers`; only compiled when the
// `byte_size_of` feature is enabled.
#[cfg(feature = "byte_size_of")]
impl ByteSizeOf for EventFinalizers {
// Always reports zero: the vector of finalizers is not counted toward the
// allocated size.
// NOTE(review): presumably because finalizers are tracking metadata rather
// than event payload — confirm against the `ByteSizeOf` contract.
fn allocated_bytes(&self) -> usize {
0
}
}
impl EventFinalizers {
pub const DEFAULT: Self = Self(Vec::new());
#[must_use]
pub fn new(finalizer: EventFinalizer) -> Self {
Self(vec![Arc::new(finalizer)])
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
#[must_use]
pub fn len(&self) -> usize {
self.0.len()
}
pub fn add(&mut self, finalizer: EventFinalizer) {
self.0.push(Arc::new(finalizer));
}
pub fn merge(&mut self, other: Self) {
self.0.extend(other.0);
}
pub fn update_status(&self, status: EventStatus) {
for finalizer in &self.0 {
finalizer.update_status(status);
}
}
pub fn update_sources(&mut self) {
let finalizers = mem::take(&mut self.0);
for finalizer in &finalizers {
finalizer.update_batch();
}
}
}
impl Finalizable for EventFinalizers {
    /// Hands the current finalizers to the caller and leaves `self` empty.
    fn take_finalizers(&mut self) -> EventFinalizers {
        mem::replace(self, Self::default())
    }
}
impl std::iter::FromIterator<EventFinalizers> for EventFinalizers {
    /// Concatenates the finalizers of every collection yielded by `iter`,
    /// preserving iteration order. No de-duplication is performed.
    fn from_iter<T: IntoIterator<Item = EventFinalizers>>(iter: T) -> Self {
        let mut combined = Vec::new();
        for group in iter {
            combined.extend(group.0);
        }
        Self(combined)
    }
}
/// Holds a delivery status together with the [`BatchNotifier`] that status is
/// reported to.
///
/// The status is reported when `update_batch` is called, which also happens
/// when the finalizer is dropped.
#[derive(Debug)]
pub struct EventFinalizer {
/// Current status; atomically updatable through a shared reference.
status: AtomicCell<EventStatus>,
/// The batch that receives this finalizer's status.
batch: BatchNotifier,
}
// Size accounting for `EventFinalizer`; only compiled when the
// `byte_size_of` feature is enabled.
#[cfg(feature = "byte_size_of")]
impl ByteSizeOf for EventFinalizer {
// Always reports zero allocated bytes for a finalizer.
// NOTE(review): the shared batch notifier behind `batch` is not counted —
// confirm that this is intended.
fn allocated_bytes(&self) -> usize {
0
}
}
impl EventFinalizer {
    /// Creates a finalizer attached to `batch`, starting out in the
    /// [`EventStatus::Dropped`] state.
    #[must_use]
    pub fn new(batch: BatchNotifier) -> Self {
        Self {
            status: AtomicCell::new(EventStatus::Dropped),
            batch,
        }
    }

    /// Folds `status` into the stored status according to
    /// [`EventStatus::update`].
    pub fn update_status(&self, status: EventStatus) {
        let merge = |current: EventStatus| Some(current.update(status));
        // The closure never yields `None`, so `fetch_update` cannot fail.
        self.status
            .fetch_update(merge)
            .unwrap_or_else(|_| unreachable!());
    }

    /// Marks this finalizer as [`EventStatus::Recorded`] and forwards the
    /// status it held before that to the batch.
    pub fn update_batch(&self) {
        // The closure never yields `None`, so the result is always the
        // previously stored status.
        let previous = self
            .status
            .fetch_update(|_| Some(EventStatus::Recorded))
            .unwrap_or_else(|_| unreachable!());
        self.batch.update_status(previous);
    }
}
impl Drop for EventFinalizer {
// Dropping a finalizer reports its status to the batch. If `update_batch`
// already ran, the stored status is `Recorded`, which
// `BatchStatus::update` treats as "no change".
fn drop(&mut self) {
self.update_batch();
}
}
/// The receiving end for the final [`BatchStatus`] of a batch.
///
/// Wraps a oneshot receiver. It can be awaited as a `Future` or checked
/// without waiting through `try_recv`.
#[pin_project::pin_project]
pub struct BatchStatusReceiver(oneshot::Receiver<BatchStatus>);
impl Future for BatchStatusReceiver {
    type Output = BatchStatus;

    /// Resolves to the status sent for the batch. If the channel closes
    /// without a status ever being sent, an error is logged and the future
    /// resolves to [`BatchStatus::Errored`].
    fn poll(mut self: Pin<&mut Self>, ctx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        // `Poll::map` leaves `Pending` untouched and only transforms a ready
        // result.
        self.0.poll_unpin(ctx).map(|outcome| match outcome {
            Ok(status) => status,
            Err(error) => {
                error!(%error, "Batch status receiver dropped before sending.");
                BatchStatus::Errored
            }
        })
    }
}
impl BatchStatusReceiver {
/// Checks for the batch status without waiting, delegating to the wrapped
/// oneshot receiver.
///
/// # Errors
///
/// Returns the receiver's `TryRecvError` when no status can be returned,
/// for example `Empty` while the status has not been sent yet.
pub fn try_recv(&mut self) -> Result<BatchStatus, oneshot::error::TryRecvError> {
self.0.try_recv()
}
}
/// A cloneable handle to the shared status of a batch.
///
/// All clones refer to the same [`OwnedBatchNotifier`]. Once the last clone
/// is dropped, the accumulated [`BatchStatus`] is sent to the paired
/// [`BatchStatusReceiver`].
#[derive(Clone, Debug)]
pub struct BatchNotifier(Arc<OwnedBatchNotifier>);
impl BatchNotifier {
    /// Creates a notifier together with the receiver that will eventually
    /// be handed the batch's final status. The batch starts out as
    /// [`BatchStatus::Delivered`].
    #[must_use]
    pub fn new_with_receiver() -> (Self, BatchStatusReceiver) {
        let (sender, receiver) = oneshot::channel();
        let shared = Arc::new(OwnedBatchNotifier {
            status: AtomicCell::new(BatchStatus::Delivered),
            notifier: Some(sender),
        });
        (Self(shared), BatchStatusReceiver(receiver))
    }

    /// Like [`BatchNotifier::new_with_receiver`], but only creates the pair
    /// when `enabled` is `true`; otherwise both halves are `None`.
    #[must_use]
    pub fn maybe_new_with_receiver(enabled: bool) -> (Option<Self>, Option<BatchStatusReceiver>) {
        if !enabled {
            return (None, None);
        }
        let (batch, receiver) = Self::new_with_receiver();
        (Some(batch), Some(receiver))
    }

    /// Creates a new notifier, attaches a clone of it to every element of
    /// `items`, and returns the matching receiver.
    pub fn apply_to<T: AddBatchNotifier>(items: &mut [T]) -> BatchStatusReceiver {
        let (batch, receiver) = Self::new_with_receiver();
        items
            .iter_mut()
            .for_each(|item| item.add_batch_notifier(batch.clone()));
        // The local `batch` handle is released on return, so only the clones
        // handed to the items keep the batch open.
        receiver
    }

    /// Like [`BatchNotifier::apply_to`], but does nothing and returns `None`
    /// when `enabled` is `false`.
    pub fn maybe_apply_to<T: AddBatchNotifier>(
        enabled: bool,
        items: &mut [T],
    ) -> Option<BatchStatusReceiver> {
        if enabled {
            Some(Self::apply_to(items))
        } else {
            None
        }
    }

    /// Folds an event status into the batch status.
    fn update_status(&self, status: EventStatus) {
        // `Delivered` and `Dropped` never change a batch status, so the
        // atomic update is skipped for them.
        if matches!(status, EventStatus::Delivered | EventStatus::Dropped) {
            return;
        }
        // The closure never yields `None`, so `fetch_update` cannot fail.
        self.0
            .status
            .fetch_update(|current| Some(current.update(status)))
            .unwrap_or_else(|_| unreachable!());
    }
}
/// The shared state behind a [`BatchNotifier`]: the accumulated batch status
/// and the channel used to deliver it. The status is sent when this value is
/// dropped.
#[derive(Debug)]
pub struct OwnedBatchNotifier {
/// Accumulated status of the batch.
status: AtomicCell<BatchStatus>,
/// Sender for the final status; `None` once the status has been sent.
notifier: Option<oneshot::Sender<BatchStatus>>,
}
impl OwnedBatchNotifier {
    /// Sends the current batch status through the channel.
    ///
    /// The sender is taken out of `self`, so the status is sent at most once;
    /// later calls do nothing.
    fn send_status(&mut self) {
        if let Some(sender) = self.notifier.take() {
            // A failed send is deliberately ignored.
            let _ = sender.send(self.status.load());
        }
    }
}
impl Drop for OwnedBatchNotifier {
// Dropping the shared state delivers the accumulated batch status to the
// receiver.
fn drop(&mut self) {
self.send_status();
}
}
/// The aggregate status of a batch.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[repr(u8)]
pub enum BatchStatus {
    /// No `Errored` or `Rejected` status was folded into the batch.
    ///
    /// This is the default.
    Delivered,
    /// At least one `Errored` status was folded in, and no `Rejected` one.
    Errored,
    /// At least one `Rejected` status was folded in.
    Rejected,
}

impl Default for BatchStatus {
    fn default() -> Self {
        BatchStatus::Delivered
    }
}

impl BatchStatus {
    /// Folds an event status into this batch status.
    ///
    /// `Rejected` wins over `Errored`, which wins over `Delivered`; every
    /// other event status leaves the batch status as it is.
    #[allow(clippy::match_same_arms)]
    fn update(self, status: EventStatus) -> Self {
        match status {
            // These carry no failure information for the batch.
            EventStatus::Dropped | EventStatus::Delivered | EventStatus::Recorded => self,
            // A rejection overrides whatever the batch held before.
            EventStatus::Rejected => Self::Rejected,
            // An error overrides `Delivered` but never `Rejected`.
            EventStatus::Errored => match self {
                Self::Rejected => Self::Rejected,
                Self::Delivered | Self::Errored => Self::Errored,
            },
        }
    }
}
/// The status tracked by an [`EventFinalizer`].
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[repr(u8)]
pub enum EventStatus {
    /// No other status has been set yet; any update replaces it.
    ///
    /// This is the default.
    Dropped,
    /// Delivery succeeded.
    Delivered,
    /// Delivery failed with an error; takes precedence over `Delivered`.
    Errored,
    /// Delivery was rejected; takes precedence over `Errored` and
    /// `Delivered`.
    Rejected,
    /// The status has already been reported to the batch. This state is
    /// final: no update changes it.
    Recorded,
}

impl Default for EventStatus {
    fn default() -> Self {
        EventStatus::Dropped
    }
}

impl EventStatus {
    /// Combines the current status with an incoming `status` and returns the
    /// result.
    ///
    /// Precedence, highest first: `Recorded`, `Rejected`, `Errored`,
    /// `Delivered`. A current `Dropped` is replaced by whatever comes in.
    ///
    /// # Panics
    ///
    /// In builds with debug assertions, panics when `status` is `Dropped`
    /// while the current status is neither `Dropped` nor `Recorded`.
    #[allow(clippy::match_same_arms)]
    #[must_use]
    pub fn update(self, status: Self) -> Self {
        match (self, status) {
            // `Recorded` is sticky in both directions.
            (Self::Recorded, _) | (_, Self::Recorded) => Self::Recorded,
            // Nothing was set yet, so the incoming status is taken as is.
            (Self::Dropped, incoming) => incoming,
            // Moving back to `Dropped` is a caller bug; release builds keep
            // the current status.
            (current, Self::Dropped) => {
                debug_assert!(false, "Updating EventStatus to Dropped is nonsense");
                current
            }
            // `Rejected` beats `Errored` and `Delivered`.
            (Self::Rejected, _) | (_, Self::Rejected) => Self::Rejected,
            // `Errored` beats `Delivered`.
            (Self::Errored, _) | (_, Self::Errored) => Self::Errored,
            (Self::Delivered, Self::Delivered) => Self::Delivered,
        }
    }
}
/// Implemented by types that can have a [`BatchNotifier`] attached to them.
pub trait AddBatchNotifier {
/// Attaches `notifier` to this item.
fn add_batch_notifier(&mut self, notifier: BatchNotifier);
}
/// Implemented by types that carry [`EventFinalizers`] which can be taken
/// out of them.
pub trait Finalizable {
/// Removes the finalizers from this value and returns them.
fn take_finalizers(&mut self) -> EventFinalizers;
}
impl<T: Finalizable> Finalizable for Vec<T> {
    /// Takes the finalizers out of every element, in order, and merges them
    /// into a single collection. The elements themselves stay in the vector.
    fn take_finalizers(&mut self) -> EventFinalizers {
        let mut merged = EventFinalizers::default();
        for item in self.iter_mut() {
            merged.merge(item.take_finalizers());
        }
        merged
    }
}
// Unit tests for finalizer collections, batch notification, and the two
// status transition tables.
#[cfg(test)]
mod tests {
use tokio::sync::oneshot::error::TryRecvError::Empty;
use super::*;
// A default collection holds no finalizers.
#[test]
fn defaults() {
let finalizer = EventFinalizers::default();
assert_eq!(finalizer.len(), 0);
}
// Nothing is sent while the finalizer is alive; dropping it without any
// status update resolves the batch as `Delivered`.
#[test]
fn sends_notification() {
let (fin, mut receiver) = make_finalizer();
assert_eq!(receiver.try_recv(), Err(Empty));
drop(fin);
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
}
// Setting a status does not notify by itself; `update_sources` empties the
// collection and the batch resolves with the status that was set.
#[test]
fn early_update() {
let (mut fin, mut receiver) = make_finalizer();
fin.update_status(EventStatus::Rejected);
assert_eq!(receiver.try_recv(), Err(Empty));
fin.update_sources();
assert_eq!(fin.len(), 0);
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));
}
// A clone shares the same finalizer, so the clones compare equal and the
// batch resolves only after both have been dropped.
#[test]
fn clone_events() {
let (fin1, mut receiver) = make_finalizer();
let fin2 = fin1.clone();
assert_eq!(fin1.len(), 1);
assert_eq!(fin2.len(), 1);
assert_eq!(fin1, fin2);
assert_eq!(receiver.try_recv(), Err(Empty));
drop(fin1);
assert_eq!(receiver.try_recv(), Err(Empty));
drop(fin2);
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
}
// Merging collects finalizers from independent batches; dropping the merged
// collection resolves every batch.
#[test]
fn merge_events() {
let mut fin0 = EventFinalizers::default();
let (fin1, mut receiver1) = make_finalizer();
let (fin2, mut receiver2) = make_finalizer();
assert_eq!(fin0.len(), 0);
fin0.merge(fin1);
assert_eq!(fin0.len(), 1);
fin0.merge(fin2);
assert_eq!(fin0.len(), 2);
assert_eq!(receiver1.try_recv(), Err(Empty));
assert_eq!(receiver2.try_recv(), Err(Empty));
drop(fin0);
assert_eq!(receiver1.try_recv(), Ok(BatchStatus::Delivered));
assert_eq!(receiver2.try_recv(), Ok(BatchStatus::Delivered));
}
// Ignored: `merge` appends without de-duplicating, so merging a clone into
// its original yields a length of 2 rather than the 1 asserted here.
#[ignore] #[test]
fn clone_and_merge_events() {
let (mut fin1, mut receiver) = make_finalizer();
let fin2 = fin1.clone();
fin1.merge(fin2);
assert_eq!(fin1.len(), 1);
assert_eq!(receiver.try_recv(), Err(Empty));
drop(fin1);
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
}
// Several finalizers (and a clone of one of them) share a single batch; the
// batch resolves only once the last reference to any of them is dropped.
#[test]
fn multi_event_batch() {
let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let event1 = EventFinalizers::new(EventFinalizer::new(batch.clone()));
let mut event2 = EventFinalizers::new(EventFinalizer::new(batch.clone()));
let event3 = EventFinalizers::new(EventFinalizer::new(batch.clone()));
let event4 = event1.clone();
// Release the local handle so that only the finalizers keep the batch open.
drop(batch);
assert_eq!(event1.len(), 1);
assert_eq!(event2.len(), 1);
assert_eq!(event3.len(), 1);
assert_eq!(event4.len(), 1);
// Equality is pointer identity: only the clone equals its original.
assert_ne!(event1, event2);
assert_ne!(event1, event3);
assert_eq!(event1, event4);
assert_ne!(event2, event3);
assert_ne!(event2, event4);
assert_ne!(event3, event4);
event2.merge(event3);
assert_eq!(event2.len(), 2);
assert_eq!(receiver.try_recv(), Err(Empty));
drop(event1);
assert_eq!(receiver.try_recv(), Err(Empty));
drop(event2);
assert_eq!(receiver.try_recv(), Err(Empty));
drop(event4);
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
}
// Helper: builds a collection with one finalizer bound to a fresh batch and
// returns it together with that batch's receiver.
fn make_finalizer() -> (EventFinalizers, BatchStatusReceiver) {
let (batch, receiver) = BatchNotifier::new_with_receiver();
let finalizer = EventFinalizers::new(EventFinalizer::new(batch));
assert_eq!(finalizer.len(), 1);
(finalizer, receiver)
}
// Transition table for `EventStatus::update`. Updates *to* `Dropped` from
// `Delivered`, `Errored` or `Rejected` are left out because they trip the
// `debug_assert!` in `update`.
#[test]
fn event_status_updates() {
use EventStatus::{Delivered, Dropped, Errored, Recorded, Rejected};
assert_eq!(Dropped.update(Dropped), Dropped);
assert_eq!(Dropped.update(Delivered), Delivered);
assert_eq!(Dropped.update(Errored), Errored);
assert_eq!(Dropped.update(Rejected), Rejected);
assert_eq!(Dropped.update(Recorded), Recorded);
assert_eq!(Delivered.update(Delivered), Delivered);
assert_eq!(Delivered.update(Errored), Errored);
assert_eq!(Delivered.update(Rejected), Rejected);
assert_eq!(Delivered.update(Recorded), Recorded);
assert_eq!(Errored.update(Delivered), Errored);
assert_eq!(Errored.update(Errored), Errored);
assert_eq!(Errored.update(Rejected), Rejected);
assert_eq!(Errored.update(Recorded), Recorded);
assert_eq!(Rejected.update(Delivered), Rejected);
assert_eq!(Rejected.update(Errored), Rejected);
assert_eq!(Rejected.update(Rejected), Rejected);
assert_eq!(Rejected.update(Recorded), Recorded);
assert_eq!(Recorded.update(Delivered), Recorded);
assert_eq!(Recorded.update(Errored), Recorded);
assert_eq!(Recorded.update(Rejected), Recorded);
assert_eq!(Recorded.update(Recorded), Recorded);
}
// Full transition table for `BatchStatus::update`: every batch status
// combined with every event status.
#[test]
fn batch_status_update() {
use BatchStatus::{Delivered, Errored, Rejected};
assert_eq!(Delivered.update(EventStatus::Dropped), Delivered);
assert_eq!(Delivered.update(EventStatus::Delivered), Delivered);
assert_eq!(Delivered.update(EventStatus::Errored), Errored);
assert_eq!(Delivered.update(EventStatus::Rejected), Rejected);
assert_eq!(Delivered.update(EventStatus::Recorded), Delivered);
assert_eq!(Errored.update(EventStatus::Dropped), Errored);
assert_eq!(Errored.update(EventStatus::Delivered), Errored);
assert_eq!(Errored.update(EventStatus::Errored), Errored);
assert_eq!(Errored.update(EventStatus::Rejected), Rejected);
assert_eq!(Errored.update(EventStatus::Recorded), Errored);
assert_eq!(Rejected.update(EventStatus::Dropped), Rejected);
assert_eq!(Rejected.update(EventStatus::Delivered), Rejected);
assert_eq!(Rejected.update(EventStatus::Errored), Rejected);
assert_eq!(Rejected.update(EventStatus::Rejected), Rejected);
assert_eq!(Rejected.update(EventStatus::Recorded), Rejected);
}
}