1use std::{fmt, iter::IntoIterator, pin::Pin};
2
3use futures::{
4 Sink, SinkExt, Stream, StreamExt, stream,
5 task::{Context, Poll},
6};
7
8use crate::event::{Event, EventArray, EventContainer, into_event_stream};
9
10pub enum VectorSink {
11 Sink(Box<dyn Sink<EventArray, Error = ()> + Send + Unpin>),
12 Stream(Box<dyn StreamSink<EventArray> + Send>),
13}
14
15impl VectorSink {
16 pub async fn run(self, input: impl Stream<Item = EventArray> + Send) -> Result<(), ()> {
22 match self {
23 Self::Sink(sink) => input.map(Ok).forward(sink).await,
24 Self::Stream(s) => s.run(Box::pin(input)).await,
25 }
26 }
27
28 pub async fn run_events<I>(self, input: I) -> Result<(), ()>
34 where
35 I: IntoIterator<Item = Event> + Send,
36 I::IntoIter: Send,
37 {
38 self.run(stream::iter(input).map(Into::into)).await
39 }
40
41 pub fn into_sink(self) -> Box<dyn Sink<EventArray, Error = ()> + Send + Unpin> {
47 match self {
48 Self::Sink(sink) => sink,
49 _ => panic!("Failed type coercion, {self:?} is not a Sink"),
50 }
51 }
52
53 pub fn into_stream(self) -> Box<dyn StreamSink<EventArray> + Send> {
59 match self {
60 Self::Stream(stream) => stream,
61 _ => panic!("Failed type coercion, {self:?} is not a Stream"),
62 }
63 }
64
65 #[deprecated]
72 pub fn from_event_sink(sink: impl Sink<Event, Error = ()> + Send + Unpin + 'static) -> Self {
73 VectorSink::Sink(Box::new(EventSink::new(sink)))
74 }
75
76 pub fn from_event_streamsink(sink: impl StreamSink<Event> + Send + 'static) -> Self {
78 let sink = Box::new(sink);
79 VectorSink::Stream(Box::new(EventStream { sink }))
80 }
81}
82
83impl fmt::Debug for VectorSink {
84 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85 f.debug_struct("VectorSink").finish()
86 }
87}
88
89#[async_trait::async_trait]
92pub trait StreamSink<T> {
93 async fn run(self: Box<Self>, input: stream::BoxStream<'_, T>) -> Result<(), ()>;
94}
95
96struct EventSink<S> {
100 sink: S,
101 queue: Option<<EventArray as EventContainer>::IntoIter>,
102}
103
// Unwraps a `Poll<Result<T, E>>`: evaluates to the inner `T` when the poll is
// `Ready(Ok(_))`, and otherwise returns the `Pending` / `Ready(Err(_))` value
// unchanged from the enclosing function.
macro_rules! poll_ready_ok {
    ( $poll:expr ) => {
        match $poll {
            Poll::Ready(Ok(value)) => value,
            not_ok @ (Poll::Pending | Poll::Ready(Err(_))) => return not_ok,
        }
    };
}
112
113impl<S: Sink<Event> + Send + Unpin> EventSink<S> {
114 fn new(sink: S) -> Self {
115 Self { sink, queue: None }
116 }
117
118 fn next_event(&mut self) -> Option<Event> {
119 match &mut self.queue {
120 #[allow(clippy::single_match_else)] Some(queue) => match queue.next() {
122 Some(event) => Some(event),
123 None => {
124 self.queue = None;
126 None
127 }
128 },
129 None => None,
130 }
131 }
132
133 fn flush_queue(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
134 while self.queue.is_some() {
135 poll_ready_ok!(self.sink.poll_ready_unpin(cx));
136 let Some(event) = self.next_event() else {
137 break;
138 };
139 if let Err(err) = self.sink.start_send_unpin(event) {
140 return Poll::Ready(Err(err));
141 }
142 }
143 Poll::Ready(Ok(()))
144 }
145}
146
147impl<S: Sink<Event> + Send + Unpin> Sink<EventArray> for EventSink<S> {
148 type Error = S::Error;
149 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
150 poll_ready_ok!(self.flush_queue(cx));
151 self.sink.poll_ready_unpin(cx)
152 }
153
154 fn start_send(mut self: Pin<&mut Self>, events: EventArray) -> Result<(), Self::Error> {
155 assert!(self.queue.is_none()); self.queue = Some(events.into_events());
157 self.next_event()
158 .map_or(Ok(()), |event| self.sink.start_send_unpin(event))
159 }
160
161 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
162 poll_ready_ok!(self.flush_queue(cx));
163 self.sink.poll_flush_unpin(cx)
164 }
165
166 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
167 poll_ready_ok!(self.flush_queue(cx));
168 self.sink.poll_close_unpin(cx)
169 }
170}
171
/// Adapter that lets a `StreamSink<Event>` be used as a
/// `StreamSink<EventArray>`.
struct EventStream<T> {
    /// The wrapped per-event stream sink.
    sink: Box<T>,
}
176
177#[async_trait::async_trait]
178impl<T: StreamSink<Event> + Send> StreamSink<EventArray> for EventStream<T> {
179 async fn run(self: Box<Self>, input: stream::BoxStream<'_, EventArray>) -> Result<(), ()> {
180 let input = Box::pin(input.flat_map(into_event_stream));
181 self.sink.run(input).await
182 }
183}