vector_buffers/topology/channel/limited_queue.rs

use std::{
    cmp, fmt,
    fmt::Debug,
    pin::Pin,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
};

#[cfg(test)]
use std::sync::Mutex;

use async_stream::stream;
use crossbeam_queue::{ArrayQueue, SegQueue};
use futures::Stream;
use metrics::{Gauge, Histogram, gauge, histogram};
use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, TryAcquireError};
use vector_common::stats::AtomicEwma;

use crate::{InMemoryBufferable, config::MemoryBufferSize};

/// The alpha value for the Exponentially Weighted Moving Average (EWMA) calculation. This is a
/// measure of how much weight to give to the current value versus the previous values. A value of
/// 0.9 results in a "half life" of 6-7 measurements.
pub const DEFAULT_EWMA_ALPHA: f64 = 0.9;
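// Back-of-the-envelope arithmetic for the "half life" note above (a sketch, assuming the prior
// average keeps a weight of `DEFAULT_EWMA_ALPHA` on each update): a sample's influence decays by
// 0.9 per update, so it takes ln(0.5) / ln(0.9) ≈ 6.6 updates to halve.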

/// Error returned by `LimitedSender::send` when the receiver has disconnected.
#[derive(Debug, PartialEq, Eq)]
pub struct SendError<T>(pub T);

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(fmt, "receiver disconnected")
    }
}

impl<T: fmt::Debug> std::error::Error for SendError<T> {}

/// Error returned by `LimitedSender::try_send`.
#[derive(Debug, PartialEq, Eq)]
pub enum TrySendError<T> {
    InsufficientCapacity(T),
    Disconnected(T),
}

impl<T> TrySendError<T> {
    pub fn into_inner(self) -> T {
        match self {
            Self::InsufficientCapacity(item) | Self::Disconnected(item) => item,
        }
    }
}

impl<T> fmt::Display for TrySendError<T> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::InsufficientCapacity(_) => {
                write!(fmt, "channel lacks sufficient capacity for send")
            }
            Self::Disconnected(_) => write!(fmt, "receiver disconnected"),
        }
    }
}

impl<T: fmt::Debug> std::error::Error for TrySendError<T> {}

// Trait over common queue operations so that the queue implementation can be chosen at initialization time.
trait QueueImpl<T>: Send + Sync + fmt::Debug {
    fn push(&self, item: T);
    fn pop(&self) -> Option<T>;
}

impl<T> QueueImpl<T> for ArrayQueue<T>
where
    T: Send + Sync + fmt::Debug,
{
    fn push(&self, item: T) {
        self.push(item)
            .unwrap_or_else(|_| unreachable!("acquired permits but channel reported being full."));
    }

    fn pop(&self) -> Option<T> {
        self.pop()
    }
}

impl<T> QueueImpl<T> for SegQueue<T>
where
    T: Send + Sync + fmt::Debug,
{
    fn push(&self, item: T) {
        self.push(item);
    }

    fn pop(&self) -> Option<T> {
        self.pop()
    }
}

#[derive(Clone, Debug)]
pub struct ChannelMetricMetadata {
    prefix: &'static str,
    output: Option<String>,
}

impl ChannelMetricMetadata {
    pub fn new(prefix: &'static str, output: Option<String>) -> Self {
        Self { prefix, output }
    }
}

#[derive(Clone, Debug)]
struct Metrics {
    histogram: Histogram,
    gauge: Gauge,
    mean_gauge: Gauge,
    ewma: Arc<AtomicEwma>,
    // We hold a handle to the max gauge to avoid it being dropped by the metrics collector, but
    // since the value is static, we never need to update it. The compiler detects this as an unused
    // field, so we need to suppress the warning here.
    #[expect(dead_code)]
    max_gauge: Gauge,
    #[expect(dead_code)]
    legacy_max_gauge: Gauge,
    #[cfg(test)]
    recorded_values: Arc<Mutex<Vec<usize>>>,
}

impl Metrics {
    #[expect(clippy::cast_precision_loss)] // We have to convert buffer sizes for a gauge, it's okay to lose precision here.
    fn new(
        limit: MemoryBufferSize,
        metadata: ChannelMetricMetadata,
        ewma_alpha: Option<f64>,
    ) -> Self {
        let ChannelMetricMetadata { prefix, output } = metadata;
        let (legacy_suffix, gauge_suffix, max_value) = match limit {
            MemoryBufferSize::MaxEvents(max_events) => (
                "_max_event_size",
                "_max_size_events",
                max_events.get() as f64,
            ),
            MemoryBufferSize::MaxSize(max_bytes) => {
                ("_max_byte_size", "_max_size_bytes", max_bytes.get() as f64)
            }
        };
        let max_gauge_name = format!("{prefix}{gauge_suffix}");
        let legacy_max_gauge_name = format!("{prefix}{legacy_suffix}");
        let histogram_name = format!("{prefix}_utilization");
        let gauge_name = format!("{prefix}_utilization_level");
        let mean_name = format!("{prefix}_utilization_mean");
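        // For illustration only: with a hypothetical prefix of "buffer" and an event-count limit,
        // the names built above come out as "buffer_max_size_events", "buffer_max_event_size"
        // (deprecated), "buffer_utilization", "buffer_utilization_level", and
        // "buffer_utilization_mean".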
        let ewma = Arc::new(AtomicEwma::new(ewma_alpha.unwrap_or(DEFAULT_EWMA_ALPHA)));
        #[cfg(test)]
        let recorded_values = Arc::new(Mutex::new(Vec::new()));
        if let Some(label_value) = output {
            let max_gauge = gauge!(max_gauge_name, "output" => label_value.clone());
            max_gauge.set(max_value);
            // DEPRECATED: buffer-bytes-events-metrics
            let legacy_max_gauge = gauge!(legacy_max_gauge_name, "output" => label_value.clone());
            legacy_max_gauge.set(max_value);
            Self {
                histogram: histogram!(histogram_name, "output" => label_value.clone()),
                gauge: gauge!(gauge_name, "output" => label_value.clone()),
                mean_gauge: gauge!(mean_name, "output" => label_value.clone()),
                max_gauge,
                ewma,
                legacy_max_gauge,
                #[cfg(test)]
                recorded_values,
            }
        } else {
            let max_gauge = gauge!(max_gauge_name);
            max_gauge.set(max_value);
            // DEPRECATED: buffer-bytes-events-metrics
            let legacy_max_gauge = gauge!(legacy_max_gauge_name);
            legacy_max_gauge.set(max_value);
            Self {
                histogram: histogram!(histogram_name),
                gauge: gauge!(gauge_name),
                mean_gauge: gauge!(mean_name),
                max_gauge,
                ewma,
                legacy_max_gauge,
                #[cfg(test)]
                recorded_values,
            }
        }
    }

    #[expect(clippy::cast_precision_loss)]
    fn record(&self, value: usize) {
        self.histogram.record(value as f64);
        self.gauge.set(value as f64);
        let avg = self.ewma.update(value as f64);
        self.mean_gauge.set(avg);
        #[cfg(test)]
        if let Ok(mut recorded) = self.recorded_values.lock() {
            recorded.push(value);
        }
    }
}

#[derive(Debug)]
struct Inner<T> {
    data: Arc<dyn QueueImpl<(OwnedSemaphorePermit, T)>>,
    limit: MemoryBufferSize,
    limiter: Arc<Semaphore>,
    read_waker: Arc<Notify>,
    metrics: Option<Metrics>,
}

impl<T> Clone for Inner<T> {
    fn clone(&self) -> Self {
        Self {
            data: self.data.clone(),
            limit: self.limit,
            limiter: self.limiter.clone(),
            read_waker: self.read_waker.clone(),
            metrics: self.metrics.clone(),
        }
    }
}

impl<T: InMemoryBufferable> Inner<T> {
    fn new(
        limit: MemoryBufferSize,
        metric_metadata: Option<ChannelMetricMetadata>,
        ewma_alpha: Option<f64>,
    ) -> Self {
        let read_waker = Arc::new(Notify::new());
        let metrics = metric_metadata.map(|metadata| Metrics::new(limit, metadata, ewma_alpha));
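        // A note on the queue choice below: an event-count limit maps directly onto a pre-sized
        // `ArrayQueue`, while a byte-size limit cannot bound the number of items up front, so an
        // unbounded `SegQueue` is used and capacity is enforced solely by the byte-counting
        // semaphore.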
        match limit {
            MemoryBufferSize::MaxEvents(max_events) => Inner {
                data: Arc::new(ArrayQueue::new(max_events.get())),
                limit,
                limiter: Arc::new(Semaphore::new(max_events.get())),
                read_waker,
                metrics,
            },
            MemoryBufferSize::MaxSize(max_bytes) => Inner {
                data: Arc::new(SegQueue::new()),
                limit,
                limiter: Arc::new(Semaphore::new(max_bytes.get())),
                read_waker,
                metrics,
            },
        }
    }

    /// Records a send after acquiring all required permits.
    ///
    /// The `total` value represents the channel utilization after this send completes.  It may be
    /// greater than the configured limit because the channel intentionally allows a single
    /// oversized payload to flow through rather than forcing the sender to split it.
    fn send_with_permit(&mut self, total: usize, permits: OwnedSemaphorePermit, item: T) {
        self.data.push((permits, item));
        self.read_waker.notify_one();
        // Due to the race between getting the available capacity, acquiring the permits, and the
        // above push, the total may be inaccurate. Record it anyways as the histogram totals will
        // _eventually_ converge on a true picture of the buffer utilization.
        if let Some(metrics) = self.metrics.as_ref() {
            metrics.record(total);
        }
    }
}

#[derive(Debug)]
pub struct LimitedSender<T> {
    inner: Inner<T>,
    sender_count: Arc<AtomicUsize>,
}

impl<T: InMemoryBufferable> LimitedSender<T> {
    #[allow(clippy::cast_possible_truncation)]
    fn calc_required_permits(&self, item: &T) -> (usize, usize, u32) {
        // We have to limit the number of permits we ask for to the overall limit since we're always
        // willing to store more items than the limit if the queue is entirely empty, because
        // otherwise we might deadlock ourselves by not being able to send a single item.
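        // Worked example (hypothetical numbers): with `MaxEvents(10)` and a 15-event item, we
        // request min(10, 15) = 10 permits, so the oversized item can still be sent once the
        // channel drains, at the cost of temporarily exceeding the configured limit.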
        let (limit, value) = match self.inner.limit {
            MemoryBufferSize::MaxSize(max_size) => (max_size, item.allocated_bytes()),
            MemoryBufferSize::MaxEvents(max_events) => (max_events, item.event_count()),
        };
        let limit = limit.get();
        (limit, value, cmp::min(limit, value) as u32)
    }

    /// Gets the remaining capacity of this channel, measured in events or bytes depending on the
    /// configured limit.
    pub fn available_capacity(&self) -> usize {
        self.inner.limiter.available_permits()
    }

    /// Sends an item into the channel.
    ///
    /// # Errors
    ///
    /// If the receiver has disconnected (does not exist anymore), then `Err(SendError)` will be
    /// returned with the given `item`.
    pub async fn send(&mut self, item: T) -> Result<(), SendError<T>> {
        // Calculate how many permits we need, and wait until we can acquire all of them.
        let (limit, count, permits_required) = self.calc_required_permits(&item);
        let in_use = limit.saturating_sub(self.available_capacity());
        match self
            .inner
            .limiter
            .clone()
            .acquire_many_owned(permits_required)
            .await
        {
            Ok(permits) => {
                self.inner.send_with_permit(in_use + count, permits, item);
                trace!("Sent item.");
                Ok(())
            }
            Err(_) => Err(SendError(item)),
        }
    }

    /// Attempts to send an item into the channel.
    ///
    /// # Errors
    ///
    /// If the receiver has disconnected (does not exist anymore), then
    /// `Err(TrySendError::Disconnected)` will be returned with the given `item`. If the channel
    /// has insufficient capacity for the item, then `Err(TrySendError::InsufficientCapacity)`
    /// will be returned with the given `item`.
    ///
    /// # Panics
    ///
    /// Will panic if adding ack amount overflows.
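    ///
    /// A rough calling pattern (a sketch; `retry_later` stands in for whatever backpressure
    /// handling the caller wants and is not part of this module):
    ///
    /// ```ignore
    /// match tx.try_send(item) {
    ///     Ok(()) => {}
    ///     Err(TrySendError::InsufficientCapacity(item)) => retry_later(item),
    ///     Err(TrySendError::Disconnected(item)) => drop(item),
    /// }
    /// ```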
    pub fn try_send(&mut self, item: T) -> Result<(), TrySendError<T>> {
        // Calculate how many permits we need, and try to acquire them all without waiting.
        let (limit, count, permits_required) = self.calc_required_permits(&item);
        let in_use = limit.saturating_sub(self.available_capacity());
        match self
            .inner
            .limiter
            .clone()
            .try_acquire_many_owned(permits_required)
        {
            Ok(permits) => {
                self.inner.send_with_permit(in_use + count, permits, item);
                trace!("Attempt to send item succeeded.");
                Ok(())
            }
            Err(TryAcquireError::NoPermits) => Err(TrySendError::InsufficientCapacity(item)),
            Err(TryAcquireError::Closed) => Err(TrySendError::Disconnected(item)),
        }
    }
}

impl<T> Clone for LimitedSender<T> {
    fn clone(&self) -> Self {
        self.sender_count.fetch_add(1, Ordering::SeqCst);

        Self {
            inner: self.inner.clone(),
            sender_count: Arc::clone(&self.sender_count),
        }
    }
}

impl<T> Drop for LimitedSender<T> {
    fn drop(&mut self) {
        // If we're the last sender to drop, close the semaphore on our way out the door.
        if self.sender_count.fetch_sub(1, Ordering::SeqCst) == 1 {
            self.inner.limiter.close();
            self.inner.read_waker.notify_one();
        }
    }
}

#[derive(Debug)]
pub struct LimitedReceiver<T> {
    inner: Inner<T>,
}

impl<T: Send + 'static> LimitedReceiver<T> {
    /// Gets the remaining capacity of this channel, measured in events or bytes depending on the
    /// configured limit.
    pub fn available_capacity(&self) -> usize {
        self.inner.limiter.available_permits()
    }

    pub async fn next(&mut self) -> Option<T> {
        loop {
            if let Some((_permit, item)) = self.inner.data.pop() {
                return Some(item);
            }

            // There wasn't an item for us to pop, so see if the channel is actually closed.  If so,
            // then it's time for us to close up shop as well.
            if self.inner.limiter.is_closed() {
                return None;
            }

            // We're not closed, so we need to wait for a writer to tell us they made some
            // progress.  This might end up being a spurious wakeup since `Notify` will
            // store a wake-up if there are no waiters, but oh well.
            self.inner.read_waker.notified().await;
        }
    }

    pub fn into_stream(self) -> Pin<Box<dyn Stream<Item = T> + Send>> {
        let mut receiver = self;
        Box::pin(stream! {
            while let Some(item) = receiver.next().await {
                yield item;
            }
        })
    }
}

impl<T> Drop for LimitedReceiver<T> {
    fn drop(&mut self) {
        // Notify senders that the channel is now closed by closing the semaphore.  Any pending
        // acquisitions will be awoken and notified that the semaphore is closed, and further new
        // sends will immediately see the semaphore is closed.
        self.inner.limiter.close();
    }
}

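/// Creates a connected `LimitedSender`/`LimitedReceiver` pair bounded by `limit`, optionally
/// emitting utilization metrics when `metric_metadata` is provided.
///
/// A minimal usage sketch (`Sample` is the test helper from `topology::test_util`, shown purely
/// for illustration):
///
/// ```ignore
/// let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(8).unwrap());
/// let (mut tx, mut rx) = limited::<Sample>(limit, None, None);
/// tx.send(Sample::new(1)).await.expect("receiver should still be alive");
/// assert_eq!(Some(Sample::new(1)), rx.next().await);
/// ```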
pub fn limited<T: InMemoryBufferable + fmt::Debug>(
    limit: MemoryBufferSize,
    metric_metadata: Option<ChannelMetricMetadata>,
    ewma_alpha: Option<f64>,
) -> (LimitedSender<T>, LimitedReceiver<T>) {
    let inner = Inner::new(limit, metric_metadata, ewma_alpha);

    let sender = LimitedSender {
        inner: inner.clone(),
        sender_count: Arc::new(AtomicUsize::new(1)),
    };
    let receiver = LimitedReceiver { inner };

    (sender, receiver)
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use tokio_test::{assert_pending, assert_ready, task::spawn};
    use vector_common::byte_size_of::ByteSizeOf;

    use super::{ChannelMetricMetadata, limited};
    use crate::{
        MemoryBufferSize,
        test::MultiEventRecord,
        topology::{channel::limited_queue::SendError, test_util::Sample},
    };

    #[tokio::test]
    async fn send_receive() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(2, tx.available_capacity());

        let msg = Sample::new(42);

        // Create our send and receive futures.
        let mut send = spawn(async { tx.send(msg).await });

        let mut recv = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!send.is_woken());
        assert!(!recv.is_woken());

        // Try polling our receive, which should be pending because we haven't sent anything yet.
        assert_pending!(recv.poll());

        // We should immediately be able to complete a send as there is available capacity.
        assert_eq!(Ok(()), assert_ready!(send.poll()));

        // Now our receive should have been woken up, and should immediately be ready.
        assert!(recv.is_woken());
        assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
    }

    #[tokio::test]
    async fn records_utilization_on_send() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
        let (mut tx, mut rx) = limited(
            limit,
            Some(ChannelMetricMetadata::new("test_channel", None)),
            None,
        );

        let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();

        tx.send(Sample::new(1)).await.expect("send should succeed");
        assert_eq!(metrics.lock().unwrap().last().copied(), Some(1));

        let _ = rx.next().await;
    }

    #[test]
    fn test_limiting_by_byte_size() {
        let max_elements = 10;
        let msg = Sample::new_with_heap_allocated_values(50);
        let msg_size = msg.allocated_bytes();
        let max_allowed_bytes = msg_size * max_elements;

        // With this configuration a maximum of exactly 10 messages can fit in the channel
        let limit = MemoryBufferSize::MaxSize(NonZeroUsize::new(max_allowed_bytes).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(max_allowed_bytes, tx.available_capacity());

        // Send 10 messages into the channel, filling it
        for _ in 0..10 {
            let msg_clone = msg.clone();
            let mut f = spawn(async { tx.send(msg_clone).await });
            assert_eq!(Ok(()), assert_ready!(f.poll()));
        }
        // With the 10th message in the channel no space should be left
        assert_eq!(0, tx.available_capacity());

        // Attempting to produce one more than the max capacity should block
        let mut send_final = spawn({
            let msg_clone = msg.clone();
            async { tx.send(msg_clone).await }
        });
        assert_pending!(send_final.poll());

        // Read all data from the channel, assert final states are as expected
        for _ in 0..10 {
            let mut f = spawn(async { rx.next().await });
            let value = assert_ready!(f.poll());
            assert_eq!(value.allocated_bytes(), msg_size);
        }
        // Channel should have no more data
        let mut recv = spawn(async { rx.next().await });
        assert_pending!(recv.poll());
    }

    #[test]
    fn sender_waits_for_more_capacity_when_none_available() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(1, tx.available_capacity());

        let msg1 = Sample::new(42);
        let msg2 = Sample::new(43);

        // Create our send and receive futures.
        let mut send1 = spawn(async { tx.send(msg1).await });

        let mut recv1 = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!send1.is_woken());
        assert!(!recv1.is_woken());

        // Try polling our receive, which should be pending because we haven't sent anything yet.
        assert_pending!(recv1.poll());

        // We should immediately be able to complete a send as there is available capacity.
        assert_eq!(Ok(()), assert_ready!(send1.poll()));
        drop(send1);

        assert_eq!(0, tx.available_capacity());

        // Now our receive should have been woken up, and should immediately be ready... but we
        // aren't going to read the value just yet.
        assert!(recv1.is_woken());

        // Now trigger a second send, which should block as there's no available capacity.
        let mut send2 = spawn(async { tx.send(msg2).await });

        assert!(!send2.is_woken());
        assert_pending!(send2.poll());

        // Now if we receive the item, our second send should be woken up and be able to send in.
        assert_eq!(Some(Sample::new(42)), assert_ready!(recv1.poll()));
        drop(recv1);

        // Since the second send was already waiting for permits, the semaphore returns them
        // directly to our waiting send, which should now be woken up and able to complete:
        assert_eq!(0, rx.available_capacity());
        assert!(send2.is_woken());

        let mut recv2 = spawn(async { rx.next().await });
        assert_pending!(recv2.poll());

        assert_eq!(Ok(()), assert_ready!(send2.poll()));
        drop(send2);

        assert_eq!(0, tx.available_capacity());

        // And the final receive to get our second send:
        assert!(recv2.is_woken());
        assert_eq!(Some(Sample::new(43)), assert_ready!(recv2.poll()));

        assert_eq!(1, tx.available_capacity());
    }

    #[test]
    fn sender_waits_for_more_capacity_when_partial_available() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(7).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(7, tx.available_capacity());

        let msgs1 = vec![
            MultiEventRecord::new(1),
            MultiEventRecord::new(2),
            MultiEventRecord::new(3),
        ];
        let msg2 = MultiEventRecord::new(4);

        // Create our send and receive futures.
        let mut small_sends = spawn(async {
            for msg in msgs1.clone() {
                tx.send(msg).await?;
            }

            Ok::<_, SendError<MultiEventRecord>>(())
        });

        let mut recv1 = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!small_sends.is_woken());
        assert!(!recv1.is_woken());

        // Try polling our receive, which should be pending because we haven't sent anything yet.
        assert_pending!(recv1.poll());

        // We should immediately be able to complete our three event sends, which we have
        // available capacity for, but will consume all but one of the available slots.
        assert_eq!(Ok(()), assert_ready!(small_sends.poll()));
        drop(small_sends);

        assert_eq!(1, tx.available_capacity());

        // Now our receive should have been woken up, and should immediately be ready, but we won't
        // receive just yet.
        assert!(recv1.is_woken());

        // Now trigger a second send that has four events, and needs to wait for two receives to happen.
        let mut send2 = spawn(tx.send(msg2.clone()));

        assert!(!send2.is_woken());
        assert_pending!(send2.poll());

        // Now if we receive the first item, our second send should be woken up but still not able
        // to send.
        assert_eq!(Some(&msgs1[0]), assert_ready!(recv1.poll()).as_ref());
        drop(recv1);

        // Callers waiting to acquire permits have the permits immediately transfer to them when one
        // (or more) are released, so we expect this to be zero until we send and then read the
        // third item.
        assert_eq!(0, rx.available_capacity());

        // We don't get woken up until all permits have been acquired.
        assert!(!send2.is_woken());

        // Our second read should unlock enough available capacity for the second send once complete.
        let mut recv2 = spawn(async { rx.next().await });
        assert!(!recv2.is_woken());
        assert_eq!(Some(&msgs1[1]), assert_ready!(recv2.poll()).as_ref());
        drop(recv2);

        assert_eq!(0, rx.available_capacity());

        assert!(send2.is_woken());
        assert_eq!(Ok(()), assert_ready!(send2.poll()));

        // And just make sure we see those last two sends.
        let mut recv3 = spawn(async { rx.next().await });
        assert!(!recv3.is_woken());
        assert_eq!(Some(&msgs1[2]), assert_ready!(recv3.poll()).as_ref());
        drop(recv3);

        assert_eq!(3, rx.available_capacity());

        let mut recv4 = spawn(async { rx.next().await });
        assert!(!recv4.is_woken());
        assert_eq!(Some(msg2), assert_ready!(recv4.poll()));
        drop(recv4);

        assert_eq!(7, rx.available_capacity());
    }

    #[test]
    fn empty_receiver_returns_none_when_last_sender_drops() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(1, tx.available_capacity());

        let tx2 = tx.clone();
        let msg = Sample::new(42);

        // Create our send and receive futures.
        let mut send = spawn(async { tx.send(msg).await });

        let mut recv = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!send.is_woken());
        assert!(!recv.is_woken());

        // Try polling our receive, which should be pending because we haven't sent anything yet.
        assert_pending!(recv.poll());

        // Now drop our second sender, which shouldn't do anything yet.
        drop(tx2);
        assert!(!recv.is_woken());
        assert_pending!(recv.poll());

        // Now drop our original sender, but not before completing its send. That triggers closing
        // the semaphore, which should let the receiver complete with no further waiting: one item
        // and then `None`.
        assert_eq!(Ok(()), assert_ready!(send.poll()));
        drop(send);
        drop(tx);

        assert!(recv.is_woken());
        assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
        drop(recv);

        let mut recv2 = spawn(async { rx.next().await });
        assert!(!recv2.is_woken());
        assert_eq!(None, assert_ready!(recv2.poll()));
    }

    #[test]
    fn receiver_returns_none_once_empty_when_last_sender_drops() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
        let (tx, mut rx) = limited::<Sample>(limit, None, None);

        assert_eq!(1, tx.available_capacity());

        let tx2 = tx.clone();

        // Create our receive future.
        let mut recv = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!recv.is_woken());

        // Try polling our receive, which should be pending because we haven't sent anything yet.
        assert_pending!(recv.poll());

        // Now drop our first sender, which shouldn't do anything yet.
        drop(tx);
        assert!(!recv.is_woken());
        assert_pending!(recv.poll());

        // Now drop our second sender, which should trigger closing the semaphore and let the
        // receive complete as there are no items to read.
        drop(tx2);
        assert!(recv.is_woken());
        assert_eq!(None, assert_ready!(recv.poll()));
    }

    #[test]
    fn oversized_send_allowed_when_empty() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(1, tx.available_capacity());

        let msg = MultiEventRecord::new(2);

        // Create our send and receive futures.
        let mut send = spawn(async { tx.send(msg.clone()).await });

        let mut recv = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!send.is_woken());
        assert!(!recv.is_woken());

        // We should immediately be able to complete our send even though we don't have full
        // capacity for it; it will consume all of the available slots.
        assert_eq!(Ok(()), assert_ready!(send.poll()));
        drop(send);

        assert_eq!(0, tx.available_capacity());

        // Now we should be able to get back the oversized item, but our capacity should not be
        // greater than what we started with.
        assert_eq!(Some(msg), assert_ready!(recv.poll()));
        drop(recv);

        assert_eq!(1, rx.available_capacity());
    }

    #[test]
    fn oversized_send_allowed_when_partial_capacity() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(2, tx.available_capacity());

        let msg1 = MultiEventRecord::new(1);
        let msg2 = MultiEventRecord::new(3);

        // Create our send future.
        let mut send = spawn(async { tx.send(msg1.clone()).await });

        // Nobody should be woken up.
        assert!(!send.is_woken());

        // We should immediately be able to complete our send, which will only use up a single slot.
        assert_eq!(Ok(()), assert_ready!(send.poll()));
        drop(send);

        assert_eq!(1, tx.available_capacity());

        // Now we'll trigger another send which has an oversized item.  It shouldn't be able to send
        // until all permits are available.
        let mut send2 = spawn(async { tx.send(msg2.clone()).await });

        assert!(!send2.is_woken());
        assert_pending!(send2.poll());

        assert_eq!(0, rx.available_capacity());

        // Now do a receive which should return the one consumed slot, essentially allowing all
        // permits to be acquired by the blocked send.
        let mut recv = spawn(async { rx.next().await });
        assert!(!recv.is_woken());
        assert!(!send2.is_woken());

        assert_eq!(Some(msg1), assert_ready!(recv.poll()));
        drop(recv);

        assert_eq!(0, rx.available_capacity());

        // Now our blocked send should be able to proceed, and we should be able to read back the
        // item.
        assert_eq!(Ok(()), assert_ready!(send2.poll()));
        drop(send2);

        assert_eq!(0, tx.available_capacity());

        let mut recv2 = spawn(async { rx.next().await });
        assert_eq!(Some(msg2), assert_ready!(recv2.poll()));

        assert_eq!(2, tx.available_capacity());
    }
}