use std::{fmt, iter::IntoIterator, pin::Pin};
use futures::{stream, task::Context, task::Poll, Sink, SinkExt, Stream, StreamExt};
use crate::event::{into_event_stream, Event, EventArray, EventContainer};
/// A sink over `EventArray` items, held in one of two boxed forms.
///
/// `VectorSink::run` dispatches on the variant to decide how an input stream
/// is fed to the wrapped sink.
pub enum VectorSink {
/// A boxed `futures::Sink` accepting `EventArray` with `()` as its error type.
Sink(Box<dyn Sink<EventArray, Error = ()> + Send + Unpin>),
/// A boxed `StreamSink` that is handed the whole `EventArray` stream at once.
Stream(Box<dyn StreamSink<EventArray> + Send>),
}
impl VectorSink {
    /// Drives this sink with the items of `input`.
    ///
    /// The `Sink` variant has the stream forwarded into it (each item wrapped
    /// in `Ok`), while the `Stream` variant is handed the pinned, boxed stream
    /// through `StreamSink::run`.
    ///
    /// # Errors
    ///
    /// Propagates the `Err(())` reported by the wrapped sink; the conditions
    /// under which that happens are decided by that sink.
    pub async fn run(self, input: impl Stream<Item = EventArray> + Send) -> Result<(), ()> {
        match self {
            Self::Stream(stream_sink) => {
                let boxed: stream::BoxStream<'_, EventArray> = Box::pin(input);
                stream_sink.run(boxed).await
            }
            Self::Sink(sink) => {
                // `forward` expects a fallible stream, so lift every item into `Ok`.
                let fallible = input.map(Ok);
                fallible.forward(sink).await
            }
        }
    }

    /// Runs the sink over a finite collection of individual `Event`s.
    ///
    /// Each event is converted into an `EventArray` via `Into` before being
    /// passed to [`VectorSink::run`].
    ///
    /// # Errors
    ///
    /// Same as [`VectorSink::run`].
    pub async fn run_events<I>(self, input: I) -> Result<(), ()>
    where
        I: IntoIterator<Item = Event> + Send,
        I::IntoIter: Send,
    {
        let arrays = stream::iter(input).map(Into::into);
        self.run(arrays).await
    }

    /// Unwraps the boxed `futures::Sink` held by the `Sink` variant.
    ///
    /// # Panics
    ///
    /// Panics if `self` is the `Stream` variant.
    pub fn into_sink(self) -> Box<dyn Sink<EventArray, Error = ()> + Send + Unpin> {
        match self {
            Self::Stream(_) => panic!("Failed type coercion, {self:?} is not a Sink"),
            Self::Sink(sink) => sink,
        }
    }

    /// Unwraps the boxed `StreamSink` held by the `Stream` variant.
    ///
    /// # Panics
    ///
    /// Panics if `self` is the `Sink` variant.
    pub fn into_stream(self) -> Box<dyn StreamSink<EventArray> + Send> {
        match self {
            Self::Sink(_) => panic!("Failed type coercion, {self:?} is not a Stream"),
            Self::Stream(stream) => stream,
        }
    }

    /// Builds a `VectorSink::Sink` from a sink that accepts single `Event`s.
    ///
    /// The sink is wrapped in an `EventSink` adapter so that it can accept
    /// `EventArray` items. This constructor is marked deprecated.
    #[deprecated]
    pub fn from_event_sink(sink: impl Sink<Event, Error = ()> + Send + Unpin + 'static) -> Self {
        let adapter = EventSink::new(sink);
        Self::Sink(Box::new(adapter))
    }

    /// Builds a `VectorSink::Stream` from a `StreamSink` that consumes single
    /// `Event`s, wrapping it in an `EventStream` adapter.
    pub fn from_event_streamsink(sink: impl StreamSink<Event> + Send + 'static) -> Self {
        let adapter = EventStream {
            sink: Box::new(sink),
        };
        Self::Stream(Box::new(adapter))
    }
}
impl fmt::Debug for VectorSink {
    /// Writes only the type name `VectorSink`; neither the active variant nor
    /// the boxed sink inside it is shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("VectorSink")
    }
}
/// A sink that is handed a whole stream of `T` items to process in one call.
#[async_trait::async_trait]
pub trait StreamSink<T> {
/// Takes ownership of the boxed sink and of `input`, resolving to
/// `Ok(())` or `Err(())`. What either outcome signifies is left to the
/// implementor.
async fn run(self: Box<Self>, input: stream::BoxStream<'_, T>) -> Result<(), ()>;
}
/// Adapter that lets a sink of single `Event`s be used as a sink of
/// `EventArray`s by forwarding the array's events one at a time.
struct EventSink<S> {
/// The wrapped sink that receives individual events.
sink: S,
/// Iterator over the events of the array currently being forwarded;
/// `None` when no array is pending.
queue: Option<<EventArray as EventContainer>::IntoIter>,
}
// Unwraps a `Poll<Result<T, E>>` expression inside a function that itself
// returns that same type: evaluates to the `T` when the poll is
// `Ready(Ok(_))`, otherwise returns the `Pending` / `Ready(Err(_))` value
// from the enclosing function unchanged.
macro_rules! poll_ready_ok {
    ( $poll:expr ) => {
        match $poll {
            Poll::Ready(Ok(value)) => value,
            // Not ready yet, or failed: hand the poll result straight back to the caller.
            early @ (Poll::Pending | Poll::Ready(Err(_))) => return early,
        }
    };
}
impl<S: Sink<Event> + Send + Unpin> EventSink<S> {
    /// Wraps `sink` with no events queued.
    fn new(sink: S) -> Self {
        Self { queue: None, sink }
    }

    /// Takes the next event from the queued iterator, if there is one.
    ///
    /// Returns `None` when nothing is queued. The first call that finds the
    /// iterator exhausted also resets the queue to `None`.
    fn next_event(&mut self) -> Option<Event> {
        let pending = self.queue.as_mut()?;
        let event = pending.next();
        if event.is_none() {
            self.queue = None;
        }
        event
    }

    /// Pushes every queued event into the inner sink.
    ///
    /// The inner sink is polled for readiness *before* each event is taken
    /// off the queue, so a `Pending` or error result from that poll is
    /// returned with the event still queued. An error from `start_send` is
    /// returned immediately. Resolves to `Ready(Ok(()))` once the queue is
    /// empty.
    fn flush_queue(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
        loop {
            if self.queue.is_none() {
                return Poll::Ready(Ok(()));
            }
            poll_ready_ok!(self.sink.poll_ready_unpin(cx));
            match self.next_event() {
                Some(event) => {
                    if let Err(err) = self.sink.start_send_unpin(event) {
                        return Poll::Ready(Err(err));
                    }
                }
                // The iterator was already exhausted; `next_event` cleared the queue.
                None => return Poll::Ready(Ok(())),
            }
        }
    }
}
impl<S: Sink<Event> + Send + Unpin> Sink<EventArray> for EventSink<S> {
    type Error = S::Error;

    /// Drains any queued events into the inner sink, then reports the inner
    /// sink's readiness.
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        match self.flush_queue(cx) {
            Poll::Ready(Ok(())) => self.sink.poll_ready_unpin(cx),
            not_flushed => not_flushed,
        }
    }

    /// Queues the events of `events` and immediately sends the first one (if
    /// any) to the inner sink; the rest are sent by later polls.
    ///
    /// # Panics
    ///
    /// Panics if a previous array is still queued.
    fn start_send(mut self: Pin<&mut Self>, events: EventArray) -> Result<(), Self::Error> {
        assert!(self.queue.is_none());
        self.queue = Some(events.into_events());
        match self.next_event() {
            Some(first) => self.sink.start_send_unpin(first),
            None => Ok(()),
        }
    }

    /// Drains any queued events into the inner sink, then flushes it.
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        match self.flush_queue(cx) {
            Poll::Ready(Ok(())) => self.sink.poll_flush_unpin(cx),
            not_flushed => not_flushed,
        }
    }

    /// Drains any queued events into the inner sink, then closes it.
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        match self.flush_queue(cx) {
            Poll::Ready(Ok(())) => self.sink.poll_close_unpin(cx),
            not_flushed => not_flushed,
        }
    }
}
/// Wrapper around a boxed `T`; its `StreamSink<EventArray>` impl passes each
/// incoming array through `into_event_stream` (via `flat_map`) before
/// delegating to `T`.
struct EventStream<T> {
/// The wrapped sink, boxed so it can be run through `self: Box<Self>`.
sink: Box<T>,
}
#[async_trait::async_trait]
impl<T: StreamSink<Event> + Send> StreamSink<EventArray> for EventStream<T> {
    /// Maps every incoming `EventArray` through `into_event_stream`, flattens
    /// the results into a single stream, and runs the wrapped sink over it.
    async fn run(self: Box<Self>, input: stream::BoxStream<'_, EventArray>) -> Result<(), ()> {
        let events = input.flat_map(into_event_stream);
        self.sink.run(Box::pin(events)).await
    }
}