vector_core/
sink.rs

1use std::{fmt, iter::IntoIterator, pin::Pin};
2
3use futures::{stream, task::Context, task::Poll, Sink, SinkExt, Stream, StreamExt};
4
5use crate::event::{into_event_stream, Event, EventArray, EventContainer};
6
/// A type-erased sink for `EventArray`s, backed by one of two interfaces.
pub enum VectorSink {
    /// A boxed `futures::Sink` that accepts `EventArray`s and reports failure as `()`.
    Sink(Box<dyn Sink<EventArray, Error = ()> + Send + Unpin>),
    /// A boxed `StreamSink` that is given the entire input stream in one `run` call.
    Stream(Box<dyn StreamSink<EventArray> + Send>),
}
11
12impl VectorSink {
13    /// Run the `VectorSink`
14    ///
15    /// # Errors
16    ///
17    /// It is unclear under what conditions this function will error.
18    pub async fn run(self, input: impl Stream<Item = EventArray> + Send) -> Result<(), ()> {
19        match self {
20            Self::Sink(sink) => input.map(Ok).forward(sink).await,
21            Self::Stream(s) => s.run(Box::pin(input)).await,
22        }
23    }
24
25    /// Run the `VectorSink` with a one-time `Vec` of `Event`s, for use in tests
26    ///
27    /// # Errors
28    ///
29    /// See `VectorSink::run` for errors.
30    pub async fn run_events<I>(self, input: I) -> Result<(), ()>
31    where
32        I: IntoIterator<Item = Event> + Send,
33        I::IntoIter: Send,
34    {
35        self.run(stream::iter(input).map(Into::into)).await
36    }
37
38    /// Converts `VectorSink` into a `futures::Sink`
39    ///
40    /// # Panics
41    ///
42    /// This function will panic if the self instance is not `VectorSink::Sink`.
43    pub fn into_sink(self) -> Box<dyn Sink<EventArray, Error = ()> + Send + Unpin> {
44        match self {
45            Self::Sink(sink) => sink,
46            _ => panic!("Failed type coercion, {self:?} is not a Sink"),
47        }
48    }
49
50    /// Converts `VectorSink` into a `StreamSink`
51    ///
52    /// # Panics
53    ///
54    /// This function will panic if the self instance is not `VectorSink::Stream`.
55    pub fn into_stream(self) -> Box<dyn StreamSink<EventArray> + Send> {
56        match self {
57            Self::Stream(stream) => stream,
58            _ => panic!("Failed type coercion, {self:?} is not a Stream"),
59        }
60    }
61
62    /// Converts an event sink into a `VectorSink`
63    ///
64    /// Deprecated in favor of `VectorSink::from_event_streamsink`. See [vector/9261]
65    /// for more info.
66    ///
67    /// [vector/9261]: https://github.com/vectordotdev/vector/issues/9261
68    #[deprecated]
69    pub fn from_event_sink(sink: impl Sink<Event, Error = ()> + Send + Unpin + 'static) -> Self {
70        VectorSink::Sink(Box::new(EventSink::new(sink)))
71    }
72
73    /// Converts an event stream into a `VectorSink`
74    pub fn from_event_streamsink(sink: impl StreamSink<Event> + Send + 'static) -> Self {
75        let sink = Box::new(sink);
76        VectorSink::Stream(Box::new(EventStream { sink }))
77    }
78}
79
80impl fmt::Debug for VectorSink {
81    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82        f.debug_struct("VectorSink").finish()
83    }
84}
85
86// === StreamSink ===
87
/// A sink that is driven by being handed its entire input stream at once.
#[async_trait::async_trait]
pub trait StreamSink<T> {
    /// Consumes the boxed sink together with the `input` stream of `T` items.
    ///
    /// # Errors
    ///
    /// Implementations return `Err(())` to signal failure; the error carries
    /// no further detail.
    async fn run(self: Box<Self>, input: stream::BoxStream<'_, T>) -> Result<(), ()>;
}
92
/// Wrapper for sinks implementing `Sink<Event>` to implement
/// `Sink<EventArray>`. This stores an iterator over the incoming
/// `EventArray` to be pushed into the wrapped sink one at a time.
struct EventSink<S> {
    /// The wrapped sink that receives the individual events.
    sink: S,
    /// Iterator over the array currently being drained into `sink`;
    /// `None` when no array is pending.
    queue: Option<<EventArray as EventContainer>::IntoIter>,
}
100
/// Evaluates a `Poll<Result<T, E>>` expression and yields the `T` of a
/// `Poll::Ready(Ok(_))`. Any other outcome (`Poll::Pending` or a ready
/// error) is returned unchanged from the enclosing function.
macro_rules! poll_ready_ok {
    ( $e:expr ) => {
        match $e {
            Poll::Ready(Ok(value)) => value,
            // Only `Poll::Pending` and `Poll::Ready(Err(_))` can reach this arm.
            not_ok => return not_ok,
        }
    };
}
109
110impl<S: Sink<Event> + Send + Unpin> EventSink<S> {
111    fn new(sink: S) -> Self {
112        Self { sink, queue: None }
113    }
114
115    fn next_event(&mut self) -> Option<Event> {
116        match &mut self.queue {
117            #[allow(clippy::single_match_else)] // No, clippy, this isn't a single pattern
118            Some(queue) => match queue.next() {
119                Some(event) => Some(event),
120                None => {
121                    // Reset the queue to empty after the last event
122                    self.queue = None;
123                    None
124                }
125            },
126            None => None,
127        }
128    }
129
130    fn flush_queue(self: &mut Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
131        while self.queue.is_some() {
132            poll_ready_ok!(self.sink.poll_ready_unpin(cx));
133            let Some(event) = self.next_event() else {
134                break;
135            };
136            if let Err(err) = self.sink.start_send_unpin(event) {
137                return Poll::Ready(Err(err));
138            }
139        }
140        Poll::Ready(Ok(()))
141    }
142}
143
144impl<S: Sink<Event> + Send + Unpin> Sink<EventArray> for EventSink<S> {
145    type Error = S::Error;
146    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
147        poll_ready_ok!(self.flush_queue(cx));
148        self.sink.poll_ready_unpin(cx)
149    }
150
151    fn start_send(mut self: Pin<&mut Self>, events: EventArray) -> Result<(), Self::Error> {
152        assert!(self.queue.is_none()); // Should be guaranteed by `poll_ready`
153        self.queue = Some(events.into_events());
154        self.next_event()
155            .map_or(Ok(()), |event| self.sink.start_send_unpin(event))
156    }
157
158    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
159        poll_ready_ok!(self.flush_queue(cx));
160        self.sink.poll_flush_unpin(cx)
161    }
162
163    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
164        poll_ready_ok!(self.flush_queue(cx));
165        self.sink.poll_close_unpin(cx)
166    }
167}
168
/// Wrapper for sinks implementing `StreamSink<Event>` to implement `StreamSink<EventArray>`
struct EventStream<T> {
    /// The wrapped sink, boxed because `StreamSink::run` takes `self: Box<Self>`.
    sink: Box<T>,
}
173
174#[async_trait::async_trait]
175impl<T: StreamSink<Event> + Send> StreamSink<EventArray> for EventStream<T> {
176    async fn run(self: Box<Self>, input: stream::BoxStream<'_, EventArray>) -> Result<(), ()> {
177        let input = Box::pin(input.flat_map(into_event_stream));
178        self.sink.run(input).await
179    }
180}