vector_core/
sink.rs

1use std::{fmt, iter::IntoIterator, pin::Pin};
2
3use futures::{
4    Sink, SinkExt, Stream, StreamExt, stream,
5    task::{Context, Poll},
6};
7
8use crate::event::{Event, EventArray, EventContainer, into_event_stream};
9
/// A sink that consumes batches of events (`EventArray`s).
///
/// Two flavours are supported: a poll-based `futures::Sink`, and a
/// `StreamSink` that is handed the whole input stream and drives it itself.
pub enum VectorSink {
    /// A boxed `futures::Sink` accepting `EventArray`s, with `()` as its error type.
    Sink(Box<dyn Sink<EventArray, Error = ()> + Send + Unpin>),
    /// A boxed `StreamSink` that consumes a stream of `EventArray`s.
    Stream(Box<dyn StreamSink<EventArray> + Send>),
}
14
15impl VectorSink {
16    /// Run the `VectorSink`
17    ///
18    /// # Errors
19    ///
20    /// It is unclear under what conditions this function will error.
21    pub async fn run(self, input: impl Stream<Item = EventArray> + Send) -> Result<(), ()> {
22        match self {
23            Self::Sink(sink) => input.map(Ok).forward(sink).await,
24            Self::Stream(s) => s.run(Box::pin(input)).await,
25        }
26    }
27
28    /// Run the `VectorSink` with a one-time `Vec` of `Event`s, for use in tests
29    ///
30    /// # Errors
31    ///
32    /// See `VectorSink::run` for errors.
33    pub async fn run_events<I>(self, input: I) -> Result<(), ()>
34    where
35        I: IntoIterator<Item = Event> + Send,
36        I::IntoIter: Send,
37    {
38        self.run(stream::iter(input).map(Into::into)).await
39    }
40
41    /// Converts `VectorSink` into a `futures::Sink`
42    ///
43    /// # Panics
44    ///
45    /// This function will panic if the self instance is not `VectorSink::Sink`.
46    pub fn into_sink(self) -> Box<dyn Sink<EventArray, Error = ()> + Send + Unpin> {
47        match self {
48            Self::Sink(sink) => sink,
49            _ => panic!("Failed type coercion, {self:?} is not a Sink"),
50        }
51    }
52
53    /// Converts `VectorSink` into a `StreamSink`
54    ///
55    /// # Panics
56    ///
57    /// This function will panic if the self instance is not `VectorSink::Stream`.
58    pub fn into_stream(self) -> Box<dyn StreamSink<EventArray> + Send> {
59        match self {
60            Self::Stream(stream) => stream,
61            _ => panic!("Failed type coercion, {self:?} is not a Stream"),
62        }
63    }
64
65    /// Converts an event sink into a `VectorSink`
66    ///
67    /// Deprecated in favor of `VectorSink::from_event_streamsink`. See [vector/9261]
68    /// for more info.
69    ///
70    /// [vector/9261]: https://github.com/vectordotdev/vector/issues/9261
71    #[deprecated]
72    pub fn from_event_sink(sink: impl Sink<Event, Error = ()> + Send + Unpin + 'static) -> Self {
73        VectorSink::Sink(Box::new(EventSink::new(sink)))
74    }
75
76    /// Converts an event stream into a `VectorSink`
77    pub fn from_event_streamsink(sink: impl StreamSink<Event> + Send + 'static) -> Self {
78        let sink = Box::new(sink);
79        VectorSink::Stream(Box::new(EventStream { sink }))
80    }
81}
82
83impl fmt::Debug for VectorSink {
84    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85        f.debug_struct("VectorSink").finish()
86    }
87}
88
89// === StreamSink ===
90
/// A sink that is driven by being handed an entire input stream of `T` items.
#[async_trait::async_trait]
pub trait StreamSink<T> {
    /// Consumes the boxed sink and runs it against `input`, a boxed stream of `T`.
    ///
    /// # Errors
    ///
    /// Implementations return `Err(())` to signal failure; the unit error
    /// type carries no further detail.
    async fn run(self: Box<Self>, input: stream::BoxStream<'_, T>) -> Result<(), ()>;
}
95
/// Wrapper for sinks implementing `Sink<Event>` to implement
/// `Sink<EventArray>`. This stores an iterator over the incoming
/// `EventArray` to be pushed into the wrapped sink one at a time.
struct EventSink<S> {
    /// The wrapped per-event sink.
    sink: S,
    /// Iterator over the events of the most recently received array.
    /// `None` until an array is received, and again once that iterator has
    /// been observed to be exhausted.
    queue: Option<<EventArray as EventContainer>::IntoIter>,
}
103
/// Evaluates to the `Ok` payload of a `Poll<Result<_, _>>` expression.
///
/// If the expression is `Poll::Pending` or `Poll::Ready(Err(_))`, the
/// enclosing function returns that value immediately instead.
macro_rules! poll_ready_ok {
    ( $e:expr ) => {
        match $e {
            Poll::Ready(Ok(value)) => value,
            Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
            Poll::Pending => return Poll::Pending,
        }
    };
}
112
113impl<S: Sink<Event> + Send + Unpin> EventSink<S> {
114    fn new(sink: S) -> Self {
115        Self { sink, queue: None }
116    }
117
118    fn next_event(&mut self) -> Option<Event> {
119        match &mut self.queue {
120            #[allow(clippy::single_match_else)] // No, clippy, this isn't a single pattern
121            Some(queue) => match queue.next() {
122                Some(event) => Some(event),
123                None => {
124                    // Reset the queue to empty after the last event
125                    self.queue = None;
126                    None
127                }
128            },
129            None => None,
130        }
131    }
132
133    fn flush_queue(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
134        while self.queue.is_some() {
135            poll_ready_ok!(self.sink.poll_ready_unpin(cx));
136            let Some(event) = self.next_event() else {
137                break;
138            };
139            if let Err(err) = self.sink.start_send_unpin(event) {
140                return Poll::Ready(Err(err));
141            }
142        }
143        Poll::Ready(Ok(()))
144    }
145}
146
147impl<S: Sink<Event> + Send + Unpin> Sink<EventArray> for EventSink<S> {
148    type Error = S::Error;
149    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
150        poll_ready_ok!(self.flush_queue(cx));
151        self.sink.poll_ready_unpin(cx)
152    }
153
154    fn start_send(mut self: Pin<&mut Self>, events: EventArray) -> Result<(), Self::Error> {
155        assert!(self.queue.is_none()); // Should be guaranteed by `poll_ready`
156        self.queue = Some(events.into_events());
157        self.next_event()
158            .map_or(Ok(()), |event| self.sink.start_send_unpin(event))
159    }
160
161    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
162        poll_ready_ok!(self.flush_queue(cx));
163        self.sink.poll_flush_unpin(cx)
164    }
165
166    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
167        poll_ready_ok!(self.flush_queue(cx));
168        self.sink.poll_close_unpin(cx)
169    }
170}
171
/// Wrapper for sinks implementing `StreamSink<Event>` to implement `StreamSink<EventArray>`
struct EventStream<T> {
    /// The wrapped per-event sink, kept boxed because `StreamSink::run`
    /// takes `self: Box<Self>`.
    sink: Box<T>,
}
176
177#[async_trait::async_trait]
178impl<T: StreamSink<Event> + Send> StreamSink<EventArray> for EventStream<T> {
179    async fn run(self: Box<Self>, input: stream::BoxStream<'_, EventArray>) -> Result<(), ()> {
180        let input = Box::pin(input.flat_map(into_event_stream));
181        self.sink.run(input).await
182    }
183}