vector_buffers/topology/channel/limited_queue.rs

1use std::{
2    cmp, fmt,
3    fmt::Debug,
4    pin::Pin,
5    sync::{
6        atomic::{AtomicUsize, Ordering},
7        Arc,
8    },
9};
10
11use async_stream::stream;
12use crossbeam_queue::{ArrayQueue, SegQueue};
13use futures::Stream;
14use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, TryAcquireError};
15
16use crate::{config::MemoryBufferSize, InMemoryBufferable};
17
/// Error returned by `LimitedSender::send` when the receiver has disconnected.
///
/// The item that could not be sent is handed back to the caller in the public tuple field.
#[derive(Debug, PartialEq, Eq)]
pub struct SendError<T>(pub T);

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("receiver disconnected")
    }
}

// `std::error::Error` requires `Debug`, hence the bound; the message itself never includes the
// rejected item, so no `Display` bound on `T` is needed.
impl<T: fmt::Debug> std::error::Error for SendError<T> {}
29
/// Error returned by `LimitedSender::try_send`.
///
/// Both variants hand back the item that could not be sent.
#[derive(Debug, PartialEq, Eq)]
pub enum TrySendError<T> {
    /// The channel did not have enough free capacity for the item at the time of the call.
    InsufficientCapacity(T),
    /// The receiver has disconnected.
    Disconnected(T),
}

impl<T> TrySendError<T> {
    /// Consumes the error, returning the item that could not be sent.
    pub fn into_inner(self) -> T {
        let (Self::InsufficientCapacity(item) | Self::Disconnected(item)) = self;
        item
    }
}

impl<T> fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let reason = match self {
            Self::InsufficientCapacity(_) => "channel lacks sufficient capacity for send",
            Self::Disconnected(_) => "receiver disconnected",
        };
        f.write_str(reason)
    }
}

// As with `SendError`, only `Debug` is required on the payload.
impl<T: fmt::Debug> std::error::Error for TrySendError<T> {}
57
58// Trait over common queue operations so implementation can be chosen at initialization phase
59trait QueueImpl<T>: Send + Sync + fmt::Debug {
60    fn push(&self, item: T);
61    fn pop(&self) -> Option<T>;
62}
63
64impl<T> QueueImpl<T> for ArrayQueue<T>
65where
66    T: Send + Sync + fmt::Debug,
67{
68    fn push(&self, item: T) {
69        self.push(item)
70            .unwrap_or_else(|_| unreachable!("acquired permits but channel reported being full."));
71    }
72
73    fn pop(&self) -> Option<T> {
74        self.pop()
75    }
76}
77
78impl<T> QueueImpl<T> for SegQueue<T>
79where
80    T: Send + Sync + fmt::Debug,
81{
82    fn push(&self, item: T) {
83        self.push(item);
84    }
85
86    fn pop(&self) -> Option<T> {
87        self.pop()
88    }
89}
90
91#[derive(Debug)]
92struct Inner<T> {
93    data: Arc<dyn QueueImpl<(OwnedSemaphorePermit, T)>>,
94    limit: MemoryBufferSize,
95    limiter: Arc<Semaphore>,
96    read_waker: Arc<Notify>,
97}
98
99impl<T> Clone for Inner<T> {
100    fn clone(&self) -> Self {
101        Self {
102            data: self.data.clone(),
103            limit: self.limit,
104            limiter: self.limiter.clone(),
105            read_waker: self.read_waker.clone(),
106        }
107    }
108}
109
110#[derive(Debug)]
111pub struct LimitedSender<T> {
112    inner: Inner<T>,
113    sender_count: Arc<AtomicUsize>,
114}
115
116impl<T: InMemoryBufferable> LimitedSender<T> {
117    #[allow(clippy::cast_possible_truncation)]
118    fn get_required_permits_for_item(&self, item: &T) -> u32 {
119        // We have to limit the number of permits we ask for to the overall limit since we're always
120        // willing to store more items than the limit if the queue is entirely empty, because
121        // otherwise we might deadlock ourselves by not being able to send a single item.
122        let (limit, value) = match self.inner.limit {
123            MemoryBufferSize::MaxSize(max_size) => (max_size, item.allocated_bytes()),
124            MemoryBufferSize::MaxEvents(max_events) => (max_events, item.event_count()),
125        };
126        cmp::min(limit.get(), value) as u32
127    }
128
129    /// Gets the number of items that this channel could accept.
130    pub fn available_capacity(&self) -> usize {
131        self.inner.limiter.available_permits()
132    }
133
134    /// Sends an item into the channel.
135    ///
136    /// # Errors
137    ///
138    /// If the receiver has disconnected (does not exist anymore), then `Err(SendError)` be returned
139    /// with the given `item`.
140    pub async fn send(&mut self, item: T) -> Result<(), SendError<T>> {
141        // Calculate how many permits we need, and wait until we can acquire all of them.
142        let permits_required = self.get_required_permits_for_item(&item);
143        let Ok(permits) = self
144            .inner
145            .limiter
146            .clone()
147            .acquire_many_owned(permits_required)
148            .await
149        else {
150            return Err(SendError(item));
151        };
152
153        self.inner.data.push((permits, item));
154        self.inner.read_waker.notify_one();
155
156        trace!("Sent item.");
157
158        Ok(())
159    }
160
161    /// Attempts to send an item into the channel.
162    ///
163    /// # Errors
164    ///
165    /// If the receiver has disconnected (does not exist anymore), then
166    /// `Err(TrySendError::Disconnected)` be returned with the given `item`. If the channel has
167    /// insufficient capacity for the item, then `Err(TrySendError::InsufficientCapacity)` will be
168    /// returned with the given `item`.
169    ///
170    /// # Panics
171    ///
172    /// Will panic if adding ack amount overflows.
173    pub fn try_send(&mut self, item: T) -> Result<(), TrySendError<T>> {
174        // Calculate how many permits we need, and try to acquire them all without waiting.
175        let permits_required = self.get_required_permits_for_item(&item);
176        let permits = match self
177            .inner
178            .limiter
179            .clone()
180            .try_acquire_many_owned(permits_required)
181        {
182            Ok(permits) => permits,
183            Err(ae) => {
184                return match ae {
185                    TryAcquireError::NoPermits => Err(TrySendError::InsufficientCapacity(item)),
186                    TryAcquireError::Closed => Err(TrySendError::Disconnected(item)),
187                }
188            }
189        };
190
191        self.inner.data.push((permits, item));
192        self.inner.read_waker.notify_one();
193
194        trace!("Attempt to send item succeeded.");
195
196        Ok(())
197    }
198}
199
200impl<T> Clone for LimitedSender<T> {
201    fn clone(&self) -> Self {
202        self.sender_count.fetch_add(1, Ordering::SeqCst);
203
204        Self {
205            inner: self.inner.clone(),
206            sender_count: Arc::clone(&self.sender_count),
207        }
208    }
209}
210
211impl<T> Drop for LimitedSender<T> {
212    fn drop(&mut self) {
213        // If we're the last sender to drop, close the semaphore on our way out the door.
214        if self.sender_count.fetch_sub(1, Ordering::SeqCst) == 1 {
215            self.inner.limiter.close();
216            self.inner.read_waker.notify_one();
217        }
218    }
219}
220
221#[derive(Debug)]
222pub struct LimitedReceiver<T> {
223    inner: Inner<T>,
224}
225
226impl<T: Send + 'static> LimitedReceiver<T> {
227    /// Gets the number of items that this channel could accept.
228    pub fn available_capacity(&self) -> usize {
229        self.inner.limiter.available_permits()
230    }
231
232    pub async fn next(&mut self) -> Option<T> {
233        loop {
234            if let Some((_permit, item)) = self.inner.data.pop() {
235                return Some(item);
236            }
237
238            // There wasn't an item for us to pop, so see if the channel is actually closed.  If so,
239            // then it's time for us to close up shop as well.
240            if self.inner.limiter.is_closed() {
241                return None;
242            }
243
244            // We're not closed, so we need to wait for a writer to tell us they made some
245            // progress.  This might end up being a spurious wakeup since `Notify` will
246            // store a wake-up if there are no waiters, but oh well.
247            self.inner.read_waker.notified().await;
248        }
249    }
250
251    pub fn into_stream(self) -> Pin<Box<dyn Stream<Item = T> + Send>> {
252        let mut receiver = self;
253        Box::pin(stream! {
254            while let Some(item) = receiver.next().await {
255                yield item;
256            }
257        })
258    }
259}
260
261impl<T> Drop for LimitedReceiver<T> {
262    fn drop(&mut self) {
263        // Notify senders that the channel is now closed by closing the semaphore.  Any pending
264        // acquisitions will be awoken and notified that the semaphore is closed, and further new
265        // sends will immediately see the semaphore is closed.
266        self.inner.limiter.close();
267    }
268}
269
270pub fn limited<T: InMemoryBufferable + fmt::Debug>(
271    limit: MemoryBufferSize,
272) -> (LimitedSender<T>, LimitedReceiver<T>) {
273    let inner = match limit {
274        MemoryBufferSize::MaxEvents(max_events) => Inner {
275            data: Arc::new(ArrayQueue::new(max_events.get())),
276            limit,
277            limiter: Arc::new(Semaphore::new(max_events.get())),
278            read_waker: Arc::new(Notify::new()),
279        },
280        MemoryBufferSize::MaxSize(max_size) => Inner {
281            data: Arc::new(SegQueue::new()),
282            limit,
283            limiter: Arc::new(Semaphore::new(max_size.get())),
284            read_waker: Arc::new(Notify::new()),
285        },
286    };
287
288    let sender = LimitedSender {
289        inner: inner.clone(),
290        sender_count: Arc::new(AtomicUsize::new(1)),
291    };
292    let receiver = LimitedReceiver { inner };
293
294    (sender, receiver)
295}
296
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use tokio_test::{assert_pending, assert_ready, task::spawn};
    use vector_common::byte_size_of::ByteSizeOf;

    use super::limited;
    use crate::{
        test::MultiEventRecord,
        topology::{channel::limited_queue::SendError, test_util::Sample},
        MemoryBufferSize,
    };

    #[tokio::test]
    async fn send_receive() {
        let (mut tx, mut rx) = limited(MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap()));

        assert_eq!(2, tx.available_capacity());

        let msg = Sample::new(42);

        // Create our send and receive futures.
        let mut send = spawn(async { tx.send(msg).await });

        let mut recv = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!send.is_woken());
        assert!(!recv.is_woken());

        // Try polling our receive, which should be pending because we haven't anything yet.
        assert_pending!(recv.poll());

        // We should immediately be able to complete a send as there is available capacity.
        assert_eq!(Ok(()), assert_ready!(send.poll()));

        // Now our receive should have been woken up, and should immediately be ready.
        assert!(recv.is_woken());
        assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
    }

    #[test]
    fn test_limiting_by_byte_size() {
        let max_elements = 10;
        let msg = Sample::new_with_heap_allocated_values(50);
        let msg_size = msg.allocated_bytes();
        let max_allowed_bytes = msg_size * max_elements;

        // With this configuration a maximum of exactly `max_elements` messages can fit in the
        // channel.
        let (mut tx, mut rx) = limited(MemoryBufferSize::MaxSize(
            NonZeroUsize::new(max_allowed_bytes).unwrap(),
        ));

        assert_eq!(max_allowed_bytes, tx.available_capacity());

        // Send `max_elements` messages into the channel, filling it.
        for _ in 0..max_elements {
            let msg_clone = msg.clone();
            let mut f = spawn(async { tx.send(msg_clone).await });
            assert_eq!(Ok(()), assert_ready!(f.poll()));
        }
        // With the last message in the channel no space should be left.
        assert_eq!(0, tx.available_capacity());

        // Attempting to send one more message than the max capacity should block.
        let mut send_final = spawn({
            let msg_clone = msg.clone();
            async { tx.send(msg_clone).await }
        });
        assert_pending!(send_final.poll());

        // Read all data from the channel, assert final states are as expected.
        for _ in 0..max_elements {
            let mut f = spawn(async { rx.next().await });
            let value = assert_ready!(f.poll());
            assert_eq!(value.allocated_bytes(), msg_size);
        }
        // Channel should have no more data.
        let mut recv = spawn(async { rx.next().await });
        assert_pending!(recv.poll());
    }

    #[test]
    fn sender_waits_for_more_capacity_when_none_available() {
        let (mut tx, mut rx) = limited(MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap()));

        assert_eq!(1, tx.available_capacity());

        let msg1 = Sample::new(42);
        let msg2 = Sample::new(43);

        // Create our send and receive futures.
        let mut send1 = spawn(async { tx.send(msg1).await });

        let mut recv1 = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!send1.is_woken());
        assert!(!recv1.is_woken());

        // Try polling our receive, which should be pending because we haven't anything yet.
        assert_pending!(recv1.poll());

        // We should immediately be able to complete a send as there is available capacity.
        assert_eq!(Ok(()), assert_ready!(send1.poll()));
        drop(send1);

        assert_eq!(0, tx.available_capacity());

        // Now our receive should have been woken up, and should immediately be ready... but we
        // aren't going to read the value just yet.
        assert!(recv1.is_woken());

        // Now trigger a second send, which should block as there's no available capacity.
        let mut send2 = spawn(async { tx.send(msg2).await });

        assert!(!send2.is_woken());
        assert_pending!(send2.poll());

        // Now if we receive the item, our second send should be woken up and be able to send in.
        assert_eq!(Some(Sample::new(42)), assert_ready!(recv1.poll()));
        drop(recv1);

        // Since the second send was already waiting for permits, the semaphore returns them
        // directly to our waiting send, which should now be woken up and able to complete:
        assert_eq!(0, rx.available_capacity());
        assert!(send2.is_woken());

        let mut recv2 = spawn(async { rx.next().await });
        assert_pending!(recv2.poll());

        assert_eq!(Ok(()), assert_ready!(send2.poll()));
        drop(send2);

        assert_eq!(0, tx.available_capacity());

        // And the final receive to get our second send:
        assert!(recv2.is_woken());
        assert_eq!(Some(Sample::new(43)), assert_ready!(recv2.poll()));

        assert_eq!(1, tx.available_capacity());
    }

    #[test]
    fn sender_waits_for_more_capacity_when_partial_available() {
        let (mut tx, mut rx) = limited(MemoryBufferSize::MaxEvents(NonZeroUsize::new(7).unwrap()));

        assert_eq!(7, tx.available_capacity());

        let msgs1 = vec![
            MultiEventRecord::new(1),
            MultiEventRecord::new(2),
            MultiEventRecord::new(3),
        ];
        let msg2 = MultiEventRecord::new(4);

        // Create our send and receive futures.
        let mut small_sends = spawn(async {
            for msg in msgs1.clone() {
                tx.send(msg).await?;
            }

            Ok::<_, SendError<MultiEventRecord>>(())
        });

        let mut recv1 = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!small_sends.is_woken());
        assert!(!recv1.is_woken());

        // Try polling our receive, which should be pending because we haven't anything yet.
        assert_pending!(recv1.poll());

        // We should immediately be able to complete our three event sends, which we have
        // available capacity for, but will consume all but one of the available slots.
        assert_eq!(Ok(()), assert_ready!(small_sends.poll()));
        drop(small_sends);

        assert_eq!(1, tx.available_capacity());

        // Now our receive should have been woken up, and should immediately be ready, but we won't
        // receive just yet.
        assert!(recv1.is_woken());

        // Now trigger a second send that has four events, and needs to wait for two receives to happen.
        let mut send2 = spawn(tx.send(msg2.clone()));

        assert!(!send2.is_woken());
        assert_pending!(send2.poll());

        // Now if we receive the first item, our second send should be woken up but still not able
        // to send.
        assert_eq!(Some(&msgs1[0]), assert_ready!(recv1.poll()).as_ref());
        drop(recv1);

        // Callers waiting to acquire permits have the permits immediately transfer to them when one
        // (or more) are released, so we expect this to be zero until we send and then read the
        // third item.
        assert_eq!(0, rx.available_capacity());

        // We don't get woken up until all permits have been acquired.
        assert!(!send2.is_woken());

        // Our second read should unlock enough available capacity for the second send once complete.
        let mut recv2 = spawn(async { rx.next().await });
        assert!(!recv2.is_woken());
        assert_eq!(Some(&msgs1[1]), assert_ready!(recv2.poll()).as_ref());
        drop(recv2);

        assert_eq!(0, rx.available_capacity());

        assert!(send2.is_woken());
        assert_eq!(Ok(()), assert_ready!(send2.poll()));

        // And just make sure we see those last two sends.
        let mut recv3 = spawn(async { rx.next().await });
        assert!(!recv3.is_woken());
        assert_eq!(Some(&msgs1[2]), assert_ready!(recv3.poll()).as_ref());
        drop(recv3);

        assert_eq!(3, rx.available_capacity());

        let mut recv4 = spawn(async { rx.next().await });
        assert!(!recv4.is_woken());
        assert_eq!(Some(msg2), assert_ready!(recv4.poll()));
        drop(recv4);

        assert_eq!(7, rx.available_capacity());
    }

    #[test]
    fn empty_receiver_returns_none_when_last_sender_drops() {
        let (mut tx, mut rx) = limited(MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap()));

        assert_eq!(1, tx.available_capacity());

        let tx2 = tx.clone();
        let msg = Sample::new(42);

        // Create our send and receive futures.
        let mut send = spawn(async { tx.send(msg).await });

        let mut recv = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!send.is_woken());
        assert!(!recv.is_woken());

        // Try polling our receive, which should be pending because we haven't anything yet.
        assert_pending!(recv.poll());

        // Now drop our second sender, which shouldn't do anything yet.
        drop(tx2);
        assert!(!recv.is_woken());
        assert_pending!(recv.poll());

        // Now drop our first (and last remaining) sender, but not before doing a send. Dropping it
        // should close the semaphore, which lets the receiver complete with no further waiting:
        // one item and then `None`.
        assert_eq!(Ok(()), assert_ready!(send.poll()));
        drop(send);
        drop(tx);

        assert!(recv.is_woken());
        assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
        drop(recv);

        let mut recv2 = spawn(async { rx.next().await });
        assert!(!recv2.is_woken());
        assert_eq!(None, assert_ready!(recv2.poll()));
    }

    #[test]
    fn receiver_returns_none_once_empty_when_last_sender_drops() {
        let (tx, mut rx) =
            limited::<Sample>(MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap()));

        assert_eq!(1, tx.available_capacity());

        let tx2 = tx.clone();

        // Create our receive future.
        let mut recv = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!recv.is_woken());

        // Try polling our receive, which should be pending because we haven't anything yet.
        assert_pending!(recv.poll());

        // Now drop our first sender, which shouldn't do anything yet.
        drop(tx);
        assert!(!recv.is_woken());
        assert_pending!(recv.poll());

        // Now drop our second sender, which should trigger closing the semaphore which should let
        // the receive complete as there are no items to read.
        drop(tx2);
        assert!(recv.is_woken());
        assert_eq!(None, assert_ready!(recv.poll()));
    }

    #[test]
    fn oversized_send_allowed_when_empty() {
        let (mut tx, mut rx) = limited(MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap()));

        assert_eq!(1, tx.available_capacity());

        let msg = MultiEventRecord::new(2);

        // Create our send and receive futures.
        let mut send = spawn(async { tx.send(msg.clone()).await });

        let mut recv = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!send.is_woken());
        assert!(!recv.is_woken());

        // We should immediately be able to complete our send, which we don't have full
        // available capacity for, but will consume all of the available slots.
        assert_eq!(Ok(()), assert_ready!(send.poll()));
        drop(send);

        assert_eq!(0, tx.available_capacity());

        // Now we should be able to get back the oversized item, but our capacity should not be
        // greater than what we started with.
        assert_eq!(Some(msg), assert_ready!(recv.poll()));
        drop(recv);

        assert_eq!(1, rx.available_capacity());
    }

    #[test]
    fn oversized_send_allowed_when_partial_capacity() {
        let (mut tx, mut rx) = limited(MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap()));

        assert_eq!(2, tx.available_capacity());

        let msg1 = MultiEventRecord::new(1);
        let msg2 = MultiEventRecord::new(3);

        // Create our send future.
        let mut send = spawn(async { tx.send(msg1.clone()).await });

        // Nobody should be woken up.
        assert!(!send.is_woken());

        // We should immediately be able to complete our send, which will only use up a single slot.
        assert_eq!(Ok(()), assert_ready!(send.poll()));
        drop(send);

        assert_eq!(1, tx.available_capacity());

        // Now we'll trigger another send which has an oversized item.  It shouldn't be able to send
        // until all permits are available.
        let mut send2 = spawn(async { tx.send(msg2.clone()).await });

        assert!(!send2.is_woken());
        assert_pending!(send2.poll());

        assert_eq!(0, rx.available_capacity());

        // Now do a receive which should return the one consumed slot, essentially allowing all
        // permits to be acquired by the blocked send.
        let mut recv = spawn(async { rx.next().await });
        assert!(!recv.is_woken());
        assert!(!send2.is_woken());

        assert_eq!(Some(msg1), assert_ready!(recv.poll()));
        drop(recv);

        assert_eq!(0, rx.available_capacity());

        // Now our blocked send should be able to proceed, and we should be able to read back the
        // item.
        assert_eq!(Ok(()), assert_ready!(send2.poll()));
        drop(send2);

        assert_eq!(0, tx.available_capacity());

        let mut recv2 = spawn(async { rx.next().await });
        assert_eq!(Some(msg2), assert_ready!(recv2.poll()));

        assert_eq!(2, tx.available_capacity());
    }
}