1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
use std::{
task::{ready, Context, Poll},
use async_recursion::async_recursion;
use futures::Stream;
use tokio::select;
use tokio_util::sync::ReusableBoxFuture;
use vector_common::internal_event::emit;
use super::limited_queue::LimitedReceiver;
use crate::{
variants::disk_v2::{self, ProductionFilesystem},
/// Adapter for papering over various receiver backends.
pub enum ReceiverAdapter<T: Bufferable> {
/// The in-memory channel buffer.
/// The disk v2 buffer.
DiskV2(disk_v2::BufferReader<T, ProductionFilesystem>),
impl<T: Bufferable> From<LimitedReceiver<T>> for ReceiverAdapter<T> {
fn from(v: LimitedReceiver<T>) -> Self {
impl<T: Bufferable> From<disk_v2::BufferReader<T, ProductionFilesystem>> for ReceiverAdapter<T> {
fn from(v: disk_v2::BufferReader<T, ProductionFilesystem>) -> Self {
impl<T> ReceiverAdapter<T>
T: Bufferable,
pub(crate) async fn next(&mut self) -> Option<T> {
match self {
ReceiverAdapter::InMemory(rx) =>,
ReceiverAdapter::DiskV2(reader) => loop {
match {
Ok(result) => break result,
Err(e) => match e.as_recoverable_error() {
Some(re) => {
// If we've hit a recoverable error, we'll emit an event to indicate as much but we'll still
// keep trying to read the next available record.
None => panic!("Reader encountered unrecoverable error: {e:?}"),
/// A buffer receiver.
/// The receiver handles retrieving events from the buffer, regardless of the overall buffer configuration.
/// If a buffer was configured to operate in "overflow" mode, then the receiver will be responsible
/// for querying the overflow buffer as well. The ordering of events when operating in "overflow"
/// is undefined, as the receiver will try to manage polling both its own buffer, as well as the
/// overflow buffer, in order to fairly balance throughput.
pub struct BufferReceiver<T: Bufferable> {
base: ReceiverAdapter<T>,
overflow: Option<Box<BufferReceiver<T>>>,
instrumentation: Option<BufferUsageHandle>,
impl<T: Bufferable> BufferReceiver<T> {
/// Creates a new [`BufferReceiver`] wrapping the given channel receiver.
pub fn new(base: ReceiverAdapter<T>) -> Self {
Self {
overflow: None,
instrumentation: None,
/// Creates a new [`BufferReceiver`] wrapping the given channel receiver and overflow receiver.
pub fn with_overflow(base: ReceiverAdapter<T>, overflow: BufferReceiver<T>) -> Self {
Self {
overflow: Some(Box::new(overflow)),
instrumentation: None,
/// Converts this receiver into an overflowing receiver using the given `BufferSender<T>`.
/// Note: this resets the internal state of this sender, and so this should not be called except
/// when initially constructing `BufferSender<T>`.
pub fn switch_to_overflow(&mut self, overflow: BufferReceiver<T>) {
self.overflow = Some(Box::new(overflow));
/// Configures this receiver to instrument the items passing through it.
pub fn with_usage_instrumentation(&mut self, handle: BufferUsageHandle) {
self.instrumentation = Some(handle);
pub async fn next(&mut self) -> Option<T> {
// We want to poll both our base and overflow receivers without waiting for one or the
// other to entirely drain before checking the other. This ensures that we're fairly
// servicing both receivers, and avoiding stalls in one or the other.
// This is primarily important in situations where an overflow-triggering event has
// occurred, and is over, and items are flowing through the base receiver. If we waited to
// entirely drain the overflow receiver, we might cause another small stall of the pipeline
// attached to the base receiver.
let overflow = self.overflow.as_mut().map(Pin::new);
let (item, from_base) = match overflow {
None => match {
Some(item) => (item, true),
None => return None,
Some(mut overflow) => {
select! {
Some(item) = => (item, false),
Some(item) = => (item, true),
else => return None,
// If instrumentation is enabled, and we got the item from the base receiver, then and only
// then do we track sending the event out.
if let Some(handle) = self.instrumentation.as_ref() {
if from_base {
item.event_count() as u64,
item.size_of() as u64,
pub fn into_stream(self) -> BufferReceiverStream<T> {
enum StreamState<T: Bufferable> {
pub struct BufferReceiverStream<T: Bufferable> {
state: StreamState<T>,
recv_fut: ReusableBoxFuture<'static, (Option<T>, BufferReceiver<T>)>,
impl<T: Bufferable> BufferReceiverStream<T> {
pub fn new(receiver: BufferReceiver<T>) -> Self {
Self {
state: StreamState::Idle(receiver),
recv_fut: ReusableBoxFuture::new(make_recv_future(None)),
impl<T: Bufferable> Stream for BufferReceiverStream<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match mem::replace(&mut self.state, StreamState::Polling) {
s @ StreamState::Closed => {
self.state = s;
return Poll::Ready(None);
StreamState::Idle(receiver) => {
StreamState::Polling => {
let (result, receiver) = ready!(self.recv_fut.poll(cx));
self.state = if result.is_none() {
} else {
return Poll::Ready(result);
async fn make_recv_future<T: Bufferable>(
receiver: Option<BufferReceiver<T>>,
) -> (Option<T>, BufferReceiver<T>) {
match receiver {
None => panic!("invalid to poll future in uninitialized state"),
Some(mut receiver) => {
let result =;
(result, receiver)