vector_buffers/topology/channel/
receiver.rs

1use std::{
2    mem,
3    pin::Pin,
4    task::{ready, Context, Poll},
5};
6
7use async_recursion::async_recursion;
8use futures::Stream;
9use tokio::select;
10use tokio_util::sync::ReusableBoxFuture;
11use vector_common::internal_event::emit;
12
13use super::limited_queue::LimitedReceiver;
14use crate::{
15    buffer_usage_data::BufferUsageHandle,
16    variants::disk_v2::{self, ProductionFilesystem},
17    Bufferable,
18};
19
20/// Adapter for papering over various receiver backends.
21#[allow(clippy::large_enum_variant)]
22#[derive(Debug)]
23pub enum ReceiverAdapter<T: Bufferable> {
24    /// The in-memory channel buffer.
25    InMemory(LimitedReceiver<T>),
26
27    /// The disk v2 buffer.
28    DiskV2(disk_v2::BufferReader<T, ProductionFilesystem>),
29}
30
31impl<T: Bufferable> From<LimitedReceiver<T>> for ReceiverAdapter<T> {
32    fn from(v: LimitedReceiver<T>) -> Self {
33        Self::InMemory(v)
34    }
35}
36
37impl<T: Bufferable> From<disk_v2::BufferReader<T, ProductionFilesystem>> for ReceiverAdapter<T> {
38    fn from(v: disk_v2::BufferReader<T, ProductionFilesystem>) -> Self {
39        Self::DiskV2(v)
40    }
41}
42
43impl<T> ReceiverAdapter<T>
44where
45    T: Bufferable,
46{
47    pub(crate) async fn next(&mut self) -> Option<T> {
48        match self {
49            ReceiverAdapter::InMemory(rx) => rx.next().await,
50            ReceiverAdapter::DiskV2(reader) => loop {
51                match reader.next().await {
52                    Ok(result) => break result,
53                    Err(e) => match e.as_recoverable_error() {
54                        Some(re) => {
55                            // If we've hit a recoverable error, we'll emit an event to indicate as much but we'll still
56                            // keep trying to read the next available record.
57                            emit(re);
58                        }
59                        None => panic!("Reader encountered unrecoverable error: {e:?}"),
60                    },
61                }
62            },
63        }
64    }
65}
66
67/// A buffer receiver.
68///
69/// The receiver handles retrieving events from the buffer, regardless of the overall buffer configuration.
70///
71/// If a buffer was configured to operate in "overflow" mode, then the receiver will be responsible
72/// for querying the overflow buffer as well.  The ordering of events when operating in "overflow"
73/// is undefined, as the receiver will try to manage polling both its own buffer, as well as the
74/// overflow buffer, in order to fairly balance throughput.
75#[derive(Debug)]
76pub struct BufferReceiver<T: Bufferable> {
77    base: ReceiverAdapter<T>,
78    overflow: Option<Box<BufferReceiver<T>>>,
79    instrumentation: Option<BufferUsageHandle>,
80}
81
82impl<T: Bufferable> BufferReceiver<T> {
83    /// Creates a new [`BufferReceiver`] wrapping the given channel receiver.
84    pub fn new(base: ReceiverAdapter<T>) -> Self {
85        Self {
86            base,
87            overflow: None,
88            instrumentation: None,
89        }
90    }
91
92    /// Creates a new [`BufferReceiver`] wrapping the given channel receiver and overflow receiver.
93    pub fn with_overflow(base: ReceiverAdapter<T>, overflow: BufferReceiver<T>) -> Self {
94        Self {
95            base,
96            overflow: Some(Box::new(overflow)),
97            instrumentation: None,
98        }
99    }
100
101    /// Converts this receiver into an overflowing receiver using the given `BufferSender<T>`.
102    ///
103    /// Note: this resets the internal state of this sender, and so this should not be called except
104    /// when initially constructing `BufferSender<T>`.
105    #[cfg(test)]
106    pub fn switch_to_overflow(&mut self, overflow: BufferReceiver<T>) {
107        self.overflow = Some(Box::new(overflow));
108    }
109
110    /// Configures this receiver to instrument the items passing through it.
111    pub fn with_usage_instrumentation(&mut self, handle: BufferUsageHandle) {
112        self.instrumentation = Some(handle);
113    }
114
115    #[async_recursion]
116    pub async fn next(&mut self) -> Option<T> {
117        // We want to poll both our base and overflow receivers without waiting for one or the
118        // other to entirely drain before checking the other.  This ensures that we're fairly
119        // servicing both receivers, and avoiding stalls in one or the other.
120        //
121        // This is primarily important in situations where an overflow-triggering event has
122        // occurred, and is over, and items are flowing through the base receiver.  If we waited to
123        // entirely drain the overflow receiver, we might cause another small stall of the pipeline
124        // attached to the base receiver.
125        let overflow = self.overflow.as_mut().map(Pin::new);
126
127        let (item, from_base) = match overflow {
128            None => match self.base.next().await {
129                Some(item) => (item, true),
130                None => return None,
131            },
132            Some(mut overflow) => {
133                select! {
134                    Some(item) = overflow.next() => (item, false),
135                    Some(item) = self.base.next() => (item, true),
136                    else => return None,
137                }
138            }
139        };
140
141        // If instrumentation is enabled, and we got the item from the base receiver, then and only
142        // then do we track sending the event out.
143        if let Some(handle) = self.instrumentation.as_ref() {
144            if from_base {
145                handle.increment_sent_event_count_and_byte_size(
146                    item.event_count() as u64,
147                    item.size_of() as u64,
148                );
149            }
150        }
151
152        Some(item)
153    }
154
155    pub fn into_stream(self) -> BufferReceiverStream<T> {
156        BufferReceiverStream::new(self)
157    }
158}
159
160#[allow(clippy::large_enum_variant)]
161enum StreamState<T: Bufferable> {
162    Idle(BufferReceiver<T>),
163    Polling,
164    Closed,
165}
166
167pub struct BufferReceiverStream<T: Bufferable> {
168    state: StreamState<T>,
169    recv_fut: ReusableBoxFuture<'static, (Option<T>, BufferReceiver<T>)>,
170}
171
172impl<T: Bufferable> BufferReceiverStream<T> {
173    pub fn new(receiver: BufferReceiver<T>) -> Self {
174        Self {
175            state: StreamState::Idle(receiver),
176            recv_fut: ReusableBoxFuture::new(make_recv_future(None)),
177        }
178    }
179}
180
181impl<T: Bufferable> Stream for BufferReceiverStream<T> {
182    type Item = T;
183
184    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
185        loop {
186            match mem::replace(&mut self.state, StreamState::Polling) {
187                s @ StreamState::Closed => {
188                    self.state = s;
189                    return Poll::Ready(None);
190                }
191                StreamState::Idle(receiver) => {
192                    self.recv_fut.set(make_recv_future(Some(receiver)));
193                }
194                StreamState::Polling => {
195                    let (result, receiver) = ready!(self.recv_fut.poll(cx));
196                    self.state = if result.is_none() {
197                        StreamState::Closed
198                    } else {
199                        StreamState::Idle(receiver)
200                    };
201
202                    return Poll::Ready(result);
203                }
204            }
205        }
206    }
207}
208
209async fn make_recv_future<T: Bufferable>(
210    receiver: Option<BufferReceiver<T>>,
211) -> (Option<T>, BufferReceiver<T>) {
212    match receiver {
213        None => panic!("invalid to poll future in uninitialized state"),
214        Some(mut receiver) => {
215            let result = receiver.next().await;
216            (result, receiver)
217        }
218    }
219}