1use std::{fmt, iter::IntoIterator, pin::Pin};
2
3use futures::{stream, task::Context, task::Poll, Sink, SinkExt, Stream, StreamExt};
4
5use crate::event::{into_event_stream, Event, EventArray, EventContainer};
6
7pub enum VectorSink {
8 Sink(Box<dyn Sink<EventArray, Error = ()> + Send + Unpin>),
9 Stream(Box<dyn StreamSink<EventArray> + Send>),
10}
11
12impl VectorSink {
13 pub async fn run(self, input: impl Stream<Item = EventArray> + Send) -> Result<(), ()> {
19 match self {
20 Self::Sink(sink) => input.map(Ok).forward(sink).await,
21 Self::Stream(s) => s.run(Box::pin(input)).await,
22 }
23 }
24
25 pub async fn run_events<I>(self, input: I) -> Result<(), ()>
31 where
32 I: IntoIterator<Item = Event> + Send,
33 I::IntoIter: Send,
34 {
35 self.run(stream::iter(input).map(Into::into)).await
36 }
37
38 pub fn into_sink(self) -> Box<dyn Sink<EventArray, Error = ()> + Send + Unpin> {
44 match self {
45 Self::Sink(sink) => sink,
46 _ => panic!("Failed type coercion, {self:?} is not a Sink"),
47 }
48 }
49
50 pub fn into_stream(self) -> Box<dyn StreamSink<EventArray> + Send> {
56 match self {
57 Self::Stream(stream) => stream,
58 _ => panic!("Failed type coercion, {self:?} is not a Stream"),
59 }
60 }
61
62 #[deprecated]
69 pub fn from_event_sink(sink: impl Sink<Event, Error = ()> + Send + Unpin + 'static) -> Self {
70 VectorSink::Sink(Box::new(EventSink::new(sink)))
71 }
72
73 pub fn from_event_streamsink(sink: impl StreamSink<Event> + Send + 'static) -> Self {
75 let sink = Box::new(sink);
76 VectorSink::Stream(Box::new(EventStream { sink }))
77 }
78}
79
80impl fmt::Debug for VectorSink {
81 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82 f.debug_struct("VectorSink").finish()
83 }
84}
85
86#[async_trait::async_trait]
89pub trait StreamSink<T> {
90 async fn run(self: Box<Self>, input: stream::BoxStream<'_, T>) -> Result<(), ()>;
91}
92
93struct EventSink<S> {
97 sink: S,
98 queue: Option<<EventArray as EventContainer>::IntoIter>,
99}
100
101macro_rules! poll_ready_ok {
102 ( $e:expr ) => {
103 match $e {
104 r @ (Poll::Pending | Poll::Ready(Err(_))) => return r,
105 Poll::Ready(Ok(ok)) => ok,
106 }
107 };
108}
109
110impl<S: Sink<Event> + Send + Unpin> EventSink<S> {
111 fn new(sink: S) -> Self {
112 Self { sink, queue: None }
113 }
114
115 fn next_event(&mut self) -> Option<Event> {
116 match &mut self.queue {
117 #[allow(clippy::single_match_else)] Some(queue) => match queue.next() {
119 Some(event) => Some(event),
120 None => {
121 self.queue = None;
123 None
124 }
125 },
126 None => None,
127 }
128 }
129
130 fn flush_queue(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
131 while self.queue.is_some() {
132 poll_ready_ok!(self.sink.poll_ready_unpin(cx));
133 let Some(event) = self.next_event() else {
134 break;
135 };
136 if let Err(err) = self.sink.start_send_unpin(event) {
137 return Poll::Ready(Err(err));
138 }
139 }
140 Poll::Ready(Ok(()))
141 }
142}
143
144impl<S: Sink<Event> + Send + Unpin> Sink<EventArray> for EventSink<S> {
145 type Error = S::Error;
146 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
147 poll_ready_ok!(self.flush_queue(cx));
148 self.sink.poll_ready_unpin(cx)
149 }
150
151 fn start_send(mut self: Pin<&mut Self>, events: EventArray) -> Result<(), Self::Error> {
152 assert!(self.queue.is_none()); self.queue = Some(events.into_events());
154 self.next_event()
155 .map_or(Ok(()), |event| self.sink.start_send_unpin(event))
156 }
157
158 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
159 poll_ready_ok!(self.flush_queue(cx));
160 self.sink.poll_flush_unpin(cx)
161 }
162
163 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
164 poll_ready_ok!(self.flush_queue(cx));
165 self.sink.poll_close_unpin(cx)
166 }
167}
168
169struct EventStream<T> {
171 sink: Box<T>,
172}
173
174#[async_trait::async_trait]
175impl<T: StreamSink<Event> + Send> StreamSink<EventArray> for EventStream<T> {
176 async fn run(self: Box<Self>, input: stream::BoxStream<'_, EventArray>) -> Result<(), ()> {
177 let input = Box::pin(input.flat_map(into_event_stream));
178 self.sink.run(input).await
179 }
180}