vector_buffers/topology/channel/
receiver.rs1use std::{
2 mem,
3 pin::Pin,
4 task::{ready, Context, Poll},
5};
6
7use async_recursion::async_recursion;
8use futures::Stream;
9use tokio::select;
10use tokio_util::sync::ReusableBoxFuture;
11use vector_common::internal_event::emit;
12
13use super::limited_queue::LimitedReceiver;
14use crate::{
15 buffer_usage_data::BufferUsageHandle,
16 variants::disk_v2::{self, ProductionFilesystem},
17 Bufferable,
18};
19
20#[allow(clippy::large_enum_variant)]
22#[derive(Debug)]
23pub enum ReceiverAdapter<T: Bufferable> {
24 InMemory(LimitedReceiver<T>),
26
27 DiskV2(disk_v2::BufferReader<T, ProductionFilesystem>),
29}
30
31impl<T: Bufferable> From<LimitedReceiver<T>> for ReceiverAdapter<T> {
32 fn from(v: LimitedReceiver<T>) -> Self {
33 Self::InMemory(v)
34 }
35}
36
37impl<T: Bufferable> From<disk_v2::BufferReader<T, ProductionFilesystem>> for ReceiverAdapter<T> {
38 fn from(v: disk_v2::BufferReader<T, ProductionFilesystem>) -> Self {
39 Self::DiskV2(v)
40 }
41}
42
43impl<T> ReceiverAdapter<T>
44where
45 T: Bufferable,
46{
47 pub(crate) async fn next(&mut self) -> Option<T> {
48 match self {
49 ReceiverAdapter::InMemory(rx) => rx.next().await,
50 ReceiverAdapter::DiskV2(reader) => loop {
51 match reader.next().await {
52 Ok(result) => break result,
53 Err(e) => match e.as_recoverable_error() {
54 Some(re) => {
55 emit(re);
58 }
59 None => panic!("Reader encountered unrecoverable error: {e:?}"),
60 },
61 }
62 },
63 }
64 }
65}
66
67#[derive(Debug)]
76pub struct BufferReceiver<T: Bufferable> {
77 base: ReceiverAdapter<T>,
78 overflow: Option<Box<BufferReceiver<T>>>,
79 instrumentation: Option<BufferUsageHandle>,
80}
81
82impl<T: Bufferable> BufferReceiver<T> {
83 pub fn new(base: ReceiverAdapter<T>) -> Self {
85 Self {
86 base,
87 overflow: None,
88 instrumentation: None,
89 }
90 }
91
92 pub fn with_overflow(base: ReceiverAdapter<T>, overflow: BufferReceiver<T>) -> Self {
94 Self {
95 base,
96 overflow: Some(Box::new(overflow)),
97 instrumentation: None,
98 }
99 }
100
101 #[cfg(test)]
106 pub fn switch_to_overflow(&mut self, overflow: BufferReceiver<T>) {
107 self.overflow = Some(Box::new(overflow));
108 }
109
110 pub fn with_usage_instrumentation(&mut self, handle: BufferUsageHandle) {
112 self.instrumentation = Some(handle);
113 }
114
115 #[async_recursion]
116 pub async fn next(&mut self) -> Option<T> {
117 let overflow = self.overflow.as_mut().map(Pin::new);
126
127 let (item, from_base) = match overflow {
128 None => match self.base.next().await {
129 Some(item) => (item, true),
130 None => return None,
131 },
132 Some(mut overflow) => {
133 select! {
134 Some(item) = overflow.next() => (item, false),
135 Some(item) = self.base.next() => (item, true),
136 else => return None,
137 }
138 }
139 };
140
141 if let Some(handle) = self.instrumentation.as_ref() {
144 if from_base {
145 handle.increment_sent_event_count_and_byte_size(
146 item.event_count() as u64,
147 item.size_of() as u64,
148 );
149 }
150 }
151
152 Some(item)
153 }
154
155 pub fn into_stream(self) -> BufferReceiverStream<T> {
156 BufferReceiverStream::new(self)
157 }
158}
159
160#[allow(clippy::large_enum_variant)]
161enum StreamState<T: Bufferable> {
162 Idle(BufferReceiver<T>),
163 Polling,
164 Closed,
165}
166
167pub struct BufferReceiverStream<T: Bufferable> {
168 state: StreamState<T>,
169 recv_fut: ReusableBoxFuture<'static, (Option<T>, BufferReceiver<T>)>,
170}
171
172impl<T: Bufferable> BufferReceiverStream<T> {
173 pub fn new(receiver: BufferReceiver<T>) -> Self {
174 Self {
175 state: StreamState::Idle(receiver),
176 recv_fut: ReusableBoxFuture::new(make_recv_future(None)),
177 }
178 }
179}
180
181impl<T: Bufferable> Stream for BufferReceiverStream<T> {
182 type Item = T;
183
184 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
185 loop {
186 match mem::replace(&mut self.state, StreamState::Polling) {
187 s @ StreamState::Closed => {
188 self.state = s;
189 return Poll::Ready(None);
190 }
191 StreamState::Idle(receiver) => {
192 self.recv_fut.set(make_recv_future(Some(receiver)));
193 }
194 StreamState::Polling => {
195 let (result, receiver) = ready!(self.recv_fut.poll(cx));
196 self.state = if result.is_none() {
197 StreamState::Closed
198 } else {
199 StreamState::Idle(receiver)
200 };
201
202 return Poll::Ready(result);
203 }
204 }
205 }
206 }
207}
208
209async fn make_recv_future<T: Bufferable>(
210 receiver: Option<BufferReceiver<T>>,
211) -> (Option<T>, BufferReceiver<T>) {
212 match receiver {
213 None => panic!("invalid to poll future in uninitialized state"),
214 Some(mut receiver) => {
215 let result = receiver.next().await;
216 (result, receiver)
217 }
218 }
219}