vector_buffers/topology/channel/
limited_queue.rs1use std::{
2 cmp, fmt,
3 fmt::Debug,
4 pin::Pin,
5 sync::{
6 atomic::{AtomicUsize, Ordering},
7 Arc,
8 },
9};
10
11use async_stream::stream;
12use crossbeam_queue::{ArrayQueue, SegQueue};
13use futures::Stream;
14use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, TryAcquireError};
15
16use crate::{config::MemoryBufferSize, InMemoryBufferable};
17
/// Error returned by `LimitedSender::send` when the item could not be sent.
///
/// The unsent item is handed back to the caller as the tuple field.
#[derive(Debug, PartialEq, Eq)]
pub struct SendError<T>(pub T);

impl<T> fmt::Display for SendError<T> {
    /// Writes a fixed message; the wrapped item is intentionally not formatted, so `T` needs no
    /// `Display` implementation.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.write_str("receiver disconnected")
    }
}

impl<T: fmt::Debug> std::error::Error for SendError<T> {}
29
/// Error returned by `LimitedSender::try_send`.
///
/// Both variants hand the unsent item back to the caller.
#[derive(Debug, PartialEq, Eq)]
pub enum TrySendError<T> {
    /// The channel did not have enough free capacity to accept the item right now.
    InsufficientCapacity(T),
    /// The channel has been closed, so the item can never be delivered.
    Disconnected(T),
}

impl<T> TrySendError<T> {
    /// Consumes the error and returns the item that could not be sent, whatever the cause.
    pub fn into_inner(self) -> T {
        match self {
            Self::InsufficientCapacity(item) | Self::Disconnected(item) => item,
        }
    }
}

impl<T> fmt::Display for TrySendError<T> {
    /// Writes a fixed, variant-specific message; the wrapped item is not formatted.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            Self::InsufficientCapacity(_) => "channel lacks sufficient capacity for send",
            Self::Disconnected(_) => "receiver disconnected",
        };
        fmt.write_str(message)
    }
}

impl<T: fmt::Debug> std::error::Error for TrySendError<T> {}
57
/// Minimal queue interface used by the channel's shared state.
///
/// It exists so that different concrete queue types (this file implements it for `ArrayQueue`
/// and `SegQueue`) can be stored behind a single `dyn QueueImpl` trait object.
trait QueueImpl<T>: Send + Sync + fmt::Debug {
    /// Adds `item` to the queue. The signature is infallible: implementations must either
    /// succeed or panic.
    fn push(&self, item: T);

    /// Removes an item from the queue, returning `None` when the queue is empty.
    fn pop(&self) -> Option<T>;
}
63
64impl<T> QueueImpl<T> for ArrayQueue<T>
65where
66 T: Send + Sync + fmt::Debug,
67{
68 fn push(&self, item: T) {
69 self.push(item)
70 .unwrap_or_else(|_| unreachable!("acquired permits but channel reported being full."));
71 }
72
73 fn pop(&self) -> Option<T> {
74 self.pop()
75 }
76}
77
78impl<T> QueueImpl<T> for SegQueue<T>
79where
80 T: Send + Sync + fmt::Debug,
81{
82 fn push(&self, item: T) {
83 self.push(item);
84 }
85
86 fn pop(&self) -> Option<T> {
87 self.pop()
88 }
89}
90
91#[derive(Debug)]
92struct Inner<T> {
93 data: Arc<dyn QueueImpl<(OwnedSemaphorePermit, T)>>,
94 limit: MemoryBufferSize,
95 limiter: Arc<Semaphore>,
96 read_waker: Arc<Notify>,
97}
98
99impl<T> Clone for Inner<T> {
100 fn clone(&self) -> Self {
101 Self {
102 data: self.data.clone(),
103 limit: self.limit,
104 limiter: self.limiter.clone(),
105 read_waker: self.read_waker.clone(),
106 }
107 }
108}
109
110#[derive(Debug)]
111pub struct LimitedSender<T> {
112 inner: Inner<T>,
113 sender_count: Arc<AtomicUsize>,
114}
115
116impl<T: InMemoryBufferable> LimitedSender<T> {
117 #[allow(clippy::cast_possible_truncation)]
118 fn get_required_permits_for_item(&self, item: &T) -> u32 {
119 let (limit, value) = match self.inner.limit {
123 MemoryBufferSize::MaxSize(max_size) => (max_size, item.allocated_bytes()),
124 MemoryBufferSize::MaxEvents(max_events) => (max_events, item.event_count()),
125 };
126 cmp::min(limit.get(), value) as u32
127 }
128
129 pub fn available_capacity(&self) -> usize {
131 self.inner.limiter.available_permits()
132 }
133
134 pub async fn send(&mut self, item: T) -> Result<(), SendError<T>> {
141 let permits_required = self.get_required_permits_for_item(&item);
143 let Ok(permits) = self
144 .inner
145 .limiter
146 .clone()
147 .acquire_many_owned(permits_required)
148 .await
149 else {
150 return Err(SendError(item));
151 };
152
153 self.inner.data.push((permits, item));
154 self.inner.read_waker.notify_one();
155
156 trace!("Sent item.");
157
158 Ok(())
159 }
160
161 pub fn try_send(&mut self, item: T) -> Result<(), TrySendError<T>> {
174 let permits_required = self.get_required_permits_for_item(&item);
176 let permits = match self
177 .inner
178 .limiter
179 .clone()
180 .try_acquire_many_owned(permits_required)
181 {
182 Ok(permits) => permits,
183 Err(ae) => {
184 return match ae {
185 TryAcquireError::NoPermits => Err(TrySendError::InsufficientCapacity(item)),
186 TryAcquireError::Closed => Err(TrySendError::Disconnected(item)),
187 }
188 }
189 };
190
191 self.inner.data.push((permits, item));
192 self.inner.read_waker.notify_one();
193
194 trace!("Attempt to send item succeeded.");
195
196 Ok(())
197 }
198}
199
200impl<T> Clone for LimitedSender<T> {
201 fn clone(&self) -> Self {
202 self.sender_count.fetch_add(1, Ordering::SeqCst);
203
204 Self {
205 inner: self.inner.clone(),
206 sender_count: Arc::clone(&self.sender_count),
207 }
208 }
209}
210
211impl<T> Drop for LimitedSender<T> {
212 fn drop(&mut self) {
213 if self.sender_count.fetch_sub(1, Ordering::SeqCst) == 1 {
215 self.inner.limiter.close();
216 self.inner.read_waker.notify_one();
217 }
218 }
219}
220
221#[derive(Debug)]
222pub struct LimitedReceiver<T> {
223 inner: Inner<T>,
224}
225
226impl<T: Send + 'static> LimitedReceiver<T> {
227 pub fn available_capacity(&self) -> usize {
229 self.inner.limiter.available_permits()
230 }
231
232 pub async fn next(&mut self) -> Option<T> {
233 loop {
234 if let Some((_permit, item)) = self.inner.data.pop() {
235 return Some(item);
236 }
237
238 if self.inner.limiter.is_closed() {
241 return None;
242 }
243
244 self.inner.read_waker.notified().await;
248 }
249 }
250
251 pub fn into_stream(self) -> Pin<Box<dyn Stream<Item = T> + Send>> {
252 let mut receiver = self;
253 Box::pin(stream! {
254 while let Some(item) = receiver.next().await {
255 yield item;
256 }
257 })
258 }
259}
260
261impl<T> Drop for LimitedReceiver<T> {
262 fn drop(&mut self) {
263 self.inner.limiter.close();
267 }
268}
269
270pub fn limited<T: InMemoryBufferable + fmt::Debug>(
271 limit: MemoryBufferSize,
272) -> (LimitedSender<T>, LimitedReceiver<T>) {
273 let inner = match limit {
274 MemoryBufferSize::MaxEvents(max_events) => Inner {
275 data: Arc::new(ArrayQueue::new(max_events.get())),
276 limit,
277 limiter: Arc::new(Semaphore::new(max_events.get())),
278 read_waker: Arc::new(Notify::new()),
279 },
280 MemoryBufferSize::MaxSize(max_size) => Inner {
281 data: Arc::new(SegQueue::new()),
282 limit,
283 limiter: Arc::new(Semaphore::new(max_size.get())),
284 read_waker: Arc::new(Notify::new()),
285 },
286 };
287
288 let sender = LimitedSender {
289 inner: inner.clone(),
290 sender_count: Arc::new(AtomicUsize::new(1)),
291 };
292 let receiver = LimitedReceiver { inner };
293
294 (sender, receiver)
295}
296
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use tokio_test::{assert_pending, assert_ready, task::spawn};
    use vector_common::byte_size_of::ByteSizeOf;

    use super::limited;
    use crate::{
        test::MultiEventRecord,
        topology::{channel::limited_queue::SendError, test_util::Sample},
        MemoryBufferSize,
    };

    /// A single item sent into an empty channel is delivered to a receiver that was already
    /// waiting for it.
    #[tokio::test]
    async fn send_receive() {
        let (mut tx, mut rx) = limited(MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap()));

        assert_eq!(2, tx.available_capacity());

        let msg = Sample::new(42);

        // Create the send and receive futures without polling them yet.
        let mut send = spawn(async { tx.send(msg).await });

        let mut recv = spawn(async { rx.next().await });

        // Neither task has been woken before anything was polled.
        assert!(!send.is_woken());
        assert!(!recv.is_woken());

        // Nothing has been sent yet, so the receive must be pending.
        assert_pending!(recv.poll());

        // There is free capacity, so the send completes on its first poll.
        assert_eq!(Ok(()), assert_ready!(send.poll()));

        // The send woke the receiver, which can now return the item.
        assert!(recv.is_woken());
        assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
    }

    /// With a byte-size limit, exactly as many messages fit as their combined allocated size
    /// allows, and one more send has to wait.
    #[test]
    fn test_limiting_by_byte_size() {
        let max_elements = 10;
        let msg = Sample::new_with_heap_allocated_values(50);
        let msg_size = msg.allocated_bytes();
        let max_allowed_bytes = msg_size * max_elements;

        // The limit is sized to hold exactly `max_elements` copies of `msg`.
        let (mut tx, mut rx) = limited(MemoryBufferSize::MaxSize(
            NonZeroUsize::new(max_allowed_bytes).unwrap(),
        ));

        assert_eq!(max_allowed_bytes, tx.available_capacity());

        // Fill the channel; every one of these sends completes immediately.
        for _ in 0..max_elements {
            let msg_clone = msg.clone();
            let mut f = spawn(async { tx.send(msg_clone).await });
            assert_eq!(Ok(()), assert_ready!(f.poll()));
        }
        // The channel is now completely full.
        assert_eq!(0, tx.available_capacity());

        // One message beyond the limit has to wait for capacity.
        let mut send_final = spawn({
            let msg_clone = msg.clone();
            async { tx.send(msg_clone).await }
        });
        assert_pending!(send_final.poll());

        // Drain everything that was sent and check each item's size.
        for _ in 0..max_elements {
            let mut f = spawn(async { rx.next().await });
            let value = assert_ready!(f.poll());
            assert_eq!(value.allocated_bytes(), msg_size);
        }
        // `send_final` has not been polled again, so the queue holds no further items.
        let mut recv = spawn(async { rx.next().await });
        assert_pending!(recv.poll());
    }

    /// With a one-event limit, a second send has to wait until the first item is received.
    #[test]
    fn sender_waits_for_more_capacity_when_none_available() {
        let (mut tx, mut rx) = limited(MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap()));

        assert_eq!(1, tx.available_capacity());

        let msg1 = Sample::new(42);
        let msg2 = Sample::new(43);

        // Create the send and receive futures without polling them yet.
        let mut send1 = spawn(async { tx.send(msg1).await });

        let mut recv1 = spawn(async { rx.next().await });

        // Neither task has been woken before anything was polled.
        assert!(!send1.is_woken());
        assert!(!recv1.is_woken());

        // Nothing has been sent yet, so the receive must be pending.
        assert_pending!(recv1.poll());

        // There is free capacity, so the first send completes immediately.
        assert_eq!(Ok(()), assert_ready!(send1.poll()));
        drop(send1);

        assert_eq!(0, tx.available_capacity());

        // The receiver has been woken, but the item is deliberately not read yet.
        assert!(recv1.is_woken());

        // A second send must wait because no capacity is left.
        let mut send2 = spawn(async { tx.send(msg2).await });

        assert!(!send2.is_woken());
        assert_pending!(send2.poll());

        // Receiving the first item frees its capacity.
        assert_eq!(Some(Sample::new(42)), assert_ready!(recv1.poll()));
        drop(recv1);

        // The freed permit went straight to the waiting send, so the available capacity is
        // still zero and the send has been woken.
        assert_eq!(0, rx.available_capacity());
        assert!(send2.is_woken());

        let mut recv2 = spawn(async { rx.next().await });
        assert_pending!(recv2.poll());

        assert_eq!(Ok(()), assert_ready!(send2.poll()));
        drop(send2);

        assert_eq!(0, tx.available_capacity());

        // The final receive returns the second item and frees the capacity again.
        assert!(recv2.is_woken());
        assert_eq!(Some(Sample::new(43)), assert_ready!(recv2.poll()));

        assert_eq!(1, tx.available_capacity());
    }

    /// A send that needs more capacity than is currently free waits until enough items have
    /// been received, collecting freed capacity along the way.
    #[test]
    fn sender_waits_for_more_capacity_when_partial_available() {
        let (mut tx, mut rx) = limited(MemoryBufferSize::MaxEvents(NonZeroUsize::new(7).unwrap()));

        assert_eq!(7, tx.available_capacity());

        let msgs1 = vec![
            MultiEventRecord::new(1),
            MultiEventRecord::new(2),
            MultiEventRecord::new(3),
        ];
        let msg2 = MultiEventRecord::new(4);

        // Create the send and receive futures without polling them yet.
        let mut small_sends = spawn(async {
            for msg in msgs1.clone() {
                tx.send(msg).await?;
            }

            Ok::<_, SendError<MultiEventRecord>>(())
        });

        let mut recv1 = spawn(async { rx.next().await });

        // Neither task has been woken before anything was polled.
        assert!(!small_sends.is_woken());
        assert!(!recv1.is_woken());

        // Nothing has been sent yet, so the receive must be pending.
        assert_pending!(recv1.poll());

        // The three small records (1 + 2 + 3 events) fit, leaving one unit of capacity.
        assert_eq!(Ok(()), assert_ready!(small_sends.poll()));
        drop(small_sends);

        assert_eq!(1, tx.available_capacity());

        // The receiver has been woken, but nothing is read yet.
        assert!(recv1.is_woken());

        // A four-event record does not fit into the single remaining unit, so it must wait.
        let mut send2 = spawn(tx.send(msg2.clone()));

        assert!(!send2.is_woken());
        assert_pending!(send2.poll());

        // Receiving the first (one-event) record frees some capacity, but not enough.
        assert_eq!(Some(&msgs1[0]), assert_ready!(recv1.poll()).as_ref());
        drop(recv1);

        // Freed capacity is handed to the waiting send rather than becoming available, so
        // this stays at zero.
        assert_eq!(0, rx.available_capacity());

        // The waiting send is not woken until it has all the capacity it needs.
        assert!(!send2.is_woken());

        // Receiving the second (two-event) record frees enough for the waiting send.
        let mut recv2 = spawn(async { rx.next().await });
        assert!(!recv2.is_woken());
        assert_eq!(Some(&msgs1[1]), assert_ready!(recv2.poll()).as_ref());
        drop(recv2);

        assert_eq!(0, rx.available_capacity());

        assert!(send2.is_woken());
        assert_eq!(Ok(()), assert_ready!(send2.poll()));

        // Drain the remaining two records and watch the capacity come back.
        let mut recv3 = spawn(async { rx.next().await });
        assert!(!recv3.is_woken());
        assert_eq!(Some(&msgs1[2]), assert_ready!(recv3.poll()).as_ref());
        drop(recv3);

        assert_eq!(3, rx.available_capacity());

        let mut recv4 = spawn(async { rx.next().await });
        assert!(!recv4.is_woken());
        assert_eq!(Some(msg2), assert_ready!(recv4.poll()));
        drop(recv4);

        assert_eq!(7, rx.available_capacity());
    }

    /// After the last sender is dropped, the receiver still returns the queued item and only
    /// then reports the end of the channel.
    #[test]
    fn empty_receiver_returns_none_when_last_sender_drops() {
        let (mut tx, mut rx) = limited(MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap()));

        assert_eq!(1, tx.available_capacity());

        let tx2 = tx.clone();
        let msg = Sample::new(42);

        // Create the send and receive futures without polling them yet.
        let mut send = spawn(async { tx.send(msg).await });

        let mut recv = spawn(async { rx.next().await });

        // Neither task has been woken before anything was polled.
        assert!(!send.is_woken());
        assert!(!recv.is_woken());

        // Nothing has been sent yet, so the receive must be pending.
        assert_pending!(recv.poll());

        // Dropping one of two senders must not close the channel or wake the receiver.
        drop(tx2);
        assert!(!recv.is_woken());
        assert_pending!(recv.poll());

        // Complete the send, then drop the last sender, which closes the channel.
        assert_eq!(Ok(()), assert_ready!(send.poll()));
        drop(send);
        drop(tx);

        // The queued item is still delivered after the channel has been closed.
        assert!(recv.is_woken());
        assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
        drop(recv);

        // With the queue empty and the channel closed, the receiver reports the end.
        let mut recv2 = spawn(async { rx.next().await });
        assert!(!recv2.is_woken());
        assert_eq!(None, assert_ready!(recv2.poll()));
    }

    /// A receiver waiting on an empty channel is woken and returns `None` once the last
    /// sender is dropped, but not before.
    #[test]
    fn receiver_returns_none_once_empty_when_last_sender_drops() {
        let (tx, mut rx) =
            limited::<Sample>(MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap()));

        assert_eq!(1, tx.available_capacity());

        let tx2 = tx.clone();

        // Create the receive future without polling it yet.
        let mut recv = spawn(async { rx.next().await });

        // The task has not been woken before anything was polled.
        assert!(!recv.is_woken());

        // Nothing has been sent, so the receive must be pending.
        assert_pending!(recv.poll());

        // Dropping the first sender leaves one alive, so the receiver keeps waiting.
        drop(tx);
        assert!(!recv.is_woken());
        assert_pending!(recv.poll());

        // Dropping the last sender closes the channel and wakes the receiver, which then
        // reports the end of the channel.
        drop(tx2);
        assert!(recv.is_woken());
        assert_eq!(None, assert_ready!(recv.poll()));
    }

    /// An item larger than the whole limit can still be sent when the channel is empty.
    #[test]
    fn oversized_send_allowed_when_empty() {
        let (mut tx, mut rx) = limited(MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap()));

        assert_eq!(1, tx.available_capacity());

        let msg = MultiEventRecord::new(2);

        // Create the send and receive futures without polling them yet.
        let mut send = spawn(async { tx.send(msg.clone()).await });

        let mut recv = spawn(async { rx.next().await });

        // Neither task has been woken before anything was polled.
        assert!(!send.is_woken());
        assert!(!recv.is_woken());

        // The two-event record exceeds the one-event limit, yet the send completes at once
        // and takes all of the capacity.
        assert_eq!(Ok(()), assert_ready!(send.poll()));
        drop(send);

        assert_eq!(0, tx.available_capacity());

        // Receiving the item returns the capacity to its full value.
        assert_eq!(Some(msg), assert_ready!(recv.poll()));
        drop(recv);

        assert_eq!(1, rx.available_capacity());
    }

    /// An item larger than the whole limit waits while the channel is partially occupied and
    /// goes through once the earlier item has been received.
    #[test]
    fn oversized_send_allowed_when_partial_capacity() {
        let (mut tx, mut rx) = limited(MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap()));

        assert_eq!(2, tx.available_capacity());

        let msg1 = MultiEventRecord::new(1);
        let msg2 = MultiEventRecord::new(3);

        // Create the first send future without polling it yet.
        let mut send = spawn(async { tx.send(msg1.clone()).await });

        // The task has not been woken before anything was polled.
        assert!(!send.is_woken());

        // The one-event record fits and uses half of the capacity.
        assert_eq!(Ok(()), assert_ready!(send.poll()));
        drop(send);

        assert_eq!(1, tx.available_capacity());

        // The three-event record is larger than the limit. Its cost is capped at the limit of
        // two, but only one unit is free right now, so the send has to wait.
        let mut send2 = spawn(async { tx.send(msg2.clone()).await });

        assert!(!send2.is_woken());
        assert_pending!(send2.poll());

        // The waiting send has already claimed the one free unit.
        assert_eq!(0, rx.available_capacity());

        // Creating the receive future alone changes nothing for either task.
        let mut recv = spawn(async { rx.next().await });
        assert!(!recv.is_woken());
        assert!(!send2.is_woken());

        assert_eq!(Some(msg1), assert_ready!(recv.poll()));
        drop(recv);

        // The capacity freed by the receive went to the waiting send, not back to the pool.
        assert_eq!(0, rx.available_capacity());

        // The waiting send now holds the full capacity and can complete.
        assert_eq!(Ok(()), assert_ready!(send2.poll()));
        drop(send2);

        assert_eq!(0, tx.available_capacity());

        // Receiving the oversized record returns the full capacity.
        let mut recv2 = spawn(async { rx.next().await });
        assert_eq!(Some(msg2), assert_ready!(recv2.poll()));

        assert_eq!(2, tx.available_capacity());
    }
}