vector_buffers/topology/channel/limited_queue.rs

1use std::{
2    cmp,
3    fmt::{self, Debug},
4    num::NonZeroUsize,
5    pin::Pin,
6    sync::{
7        Arc,
8        atomic::{AtomicUsize, Ordering},
9    },
10    time::Instant,
11};
12
13#[cfg(test)]
14use std::sync::Mutex;
15
16use async_stream::stream;
17use crossbeam_queue::{ArrayQueue, SegQueue};
18use futures::Stream;
19use metrics::{Gauge, Histogram, gauge, histogram};
20use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, TryAcquireError};
21use vector_common::stats::TimeEwmaGauge;
22
23use crate::{InMemoryBufferable, config::MemoryBufferSize};
24
/// Default half-life, in seconds, for the EWMA behind the `*_utilization_mean` gauge when no
/// explicit half-life is configured.
pub const DEFAULT_EWMA_HALF_LIFE_SECONDS: f64 = 5.0;
26
/// Error returned by `LimitedSender::send` when the receiver has disconnected.
///
/// Carries back the item that could not be delivered so the caller can recover it.
#[derive(Debug, PartialEq, Eq)]
pub struct SendError<T>(pub T);

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("receiver disconnected")
    }
}

impl<T: fmt::Debug> std::error::Error for SendError<T> {}
38
/// Error returned by `LimitedSender::try_send`.
///
/// Either the channel lacked capacity for the item, or the receiver disconnected; in both
/// cases the item is handed back to the caller.
#[derive(Debug, PartialEq, Eq)]
pub enum TrySendError<T> {
    InsufficientCapacity(T),
    Disconnected(T),
}

impl<T> TrySendError<T> {
    /// Consumes the error and returns the item that could not be sent.
    pub fn into_inner(self) -> T {
        match self {
            Self::InsufficientCapacity(item) => item,
            Self::Disconnected(item) => item,
        }
    }
}

impl<T> fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            Self::InsufficientCapacity(_) => "channel lacks sufficient capacity for send",
            Self::Disconnected(_) => "receiver disconnected",
        };
        f.write_str(message)
    }
}

impl<T: fmt::Debug> std::error::Error for TrySendError<T> {}
66
// Trait over common queue operations so implementation can be chosen at initialization phase
trait QueueImpl<T>: Send + Sync + fmt::Debug {
    /// Pushes an item; implementations must not fail, as capacity is enforced by the caller
    /// (via semaphore permits) before pushing.
    fn push(&self, item: T);
    /// Pops the oldest item, or `None` if the queue is empty.
    fn pop(&self) -> Option<T>;
}
72
73impl<T> QueueImpl<T> for ArrayQueue<T>
74where
75    T: Send + Sync + fmt::Debug,
76{
77    fn push(&self, item: T) {
78        self.push(item)
79            .unwrap_or_else(|_| unreachable!("acquired permits but channel reported being full."));
80    }
81
82    fn pop(&self) -> Option<T> {
83        self.pop()
84    }
85}
86
// Unbounded backend used for byte-size limits: pushes always succeed, and the byte budget is
// enforced by the semaphore in `Inner` rather than by the queue itself.
impl<T> QueueImpl<T> for SegQueue<T>
where
    T: Send + Sync + fmt::Debug,
{
    fn push(&self, item: T) {
        self.push(item);
    }

    fn pop(&self) -> Option<T> {
        self.pop()
    }
}
99
/// Identifying metadata used to name the metrics emitted for a channel.
#[derive(Clone, Debug)]
pub struct ChannelMetricMetadata {
    prefix: &'static str,
    output: Option<String>,
}

impl ChannelMetricMetadata {
    /// Creates metadata with the given metric name `prefix` and optional `output` label value.
    pub fn new(prefix: &'static str, output: Option<String>) -> Self {
        ChannelMetricMetadata { prefix, output }
    }
}
111
/// Handles for the per-channel utilization metrics emitted by this channel.
#[derive(Clone, Debug)]
struct Metrics {
    // Distribution of utilization samples (`<prefix>_utilization`).
    histogram: Histogram,
    // Most recent utilization value (`<prefix>_utilization_level`).
    gauge: Gauge,
    // Time-decayed mean utilization (`<prefix>_utilization_mean`).
    mean_gauge: TimeEwmaGauge,
    // We hold a handle to the max gauge to avoid it being dropped by the metrics collector, but
    // since the value is static, we never need to update it. The compiler detects this as an unused
    // field, so we need to suppress the warning here.
    #[expect(dead_code)]
    max_gauge: Gauge,
    // Deprecated variant of the max gauge; held alive for the same reason as `max_gauge`.
    #[expect(dead_code)]
    legacy_max_gauge: Gauge,
    // Test-only capture of every value passed to `record`, so tests can assert on samples.
    #[cfg(test)]
    recorded_values: Arc<Mutex<Vec<usize>>>,
}
127
impl Metrics {
    /// Registers the utilization metrics for a channel and seeds the static "max" gauges.
    ///
    /// Under `metadata.prefix` this emits: a `_utilization` histogram, a `_utilization_level`
    /// gauge, a `_utilization_mean` EWMA gauge, and static `_max_size_events`/`_max_size_bytes`
    /// (plus deprecated `_max_event_size`/`_max_byte_size`) gauges holding the configured limit.
    /// If `metadata.output` is set, every metric carries it as an `output` label.
    #[expect(clippy::cast_precision_loss)] // We have to convert buffer sizes for a gauge, it's okay to lose precision here.
    fn new(
        limit: MemoryBufferSize,
        metadata: ChannelMetricMetadata,
        ewma_half_life_seconds: Option<f64>,
    ) -> Self {
        let ewma_half_life_seconds =
            ewma_half_life_seconds.unwrap_or(DEFAULT_EWMA_HALF_LIFE_SECONDS);
        let ChannelMetricMetadata { prefix, output } = metadata;
        // Name suffixes and the static limit value depend on whether the buffer is bounded by
        // event count or by byte size.
        let (legacy_suffix, gauge_suffix, max_value) = match limit {
            MemoryBufferSize::MaxEvents(max_events) => (
                "_max_event_size",
                "_max_size_events",
                max_events.get() as f64,
            ),
            MemoryBufferSize::MaxSize(max_bytes) => {
                ("_max_byte_size", "_max_size_bytes", max_bytes.get() as f64)
            }
        };
        let max_gauge_name = format!("{prefix}{gauge_suffix}");
        let legacy_max_gauge_name = format!("{prefix}{legacy_suffix}");
        let histogram_name = format!("{prefix}_utilization");
        let gauge_name = format!("{prefix}_utilization_level");
        let mean_name = format!("{prefix}_utilization_mean");
        #[cfg(test)]
        let recorded_values = Arc::new(Mutex::new(Vec::new()));
        // The two branches are structurally identical; they are duplicated because the
        // `gauge!`/`histogram!` macros take labels syntactically, so the labeled and unlabeled
        // registrations can't share one call site.
        if let Some(label_value) = output {
            let max_gauge = gauge!(max_gauge_name, "output" => label_value.clone());
            max_gauge.set(max_value);
            let mean_gauge_handle = gauge!(mean_name, "output" => label_value.clone());
            // DEPRECATED: buffer-bytes-events-metrics
            let legacy_max_gauge = gauge!(legacy_max_gauge_name, "output" => label_value.clone());
            legacy_max_gauge.set(max_value);
            Self {
                histogram: histogram!(histogram_name, "output" => label_value.clone()),
                gauge: gauge!(gauge_name, "output" => label_value.clone()),
                mean_gauge: TimeEwmaGauge::new(mean_gauge_handle, ewma_half_life_seconds),
                max_gauge,
                legacy_max_gauge,
                #[cfg(test)]
                recorded_values,
            }
        } else {
            let max_gauge = gauge!(max_gauge_name);
            max_gauge.set(max_value);
            let mean_gauge_handle = gauge!(mean_name);
            // DEPRECATED: buffer-bytes-events-metrics
            let legacy_max_gauge = gauge!(legacy_max_gauge_name);
            legacy_max_gauge.set(max_value);
            Self {
                histogram: histogram!(histogram_name),
                gauge: gauge!(gauge_name),
                mean_gauge: TimeEwmaGauge::new(mean_gauge_handle, ewma_half_life_seconds),
                max_gauge,
                legacy_max_gauge,
                #[cfg(test)]
                recorded_values,
            }
        }
    }

    /// Records one utilization sample into the histogram, the level gauge, and the EWMA mean
    /// gauge; `reference` is the sample timestamp used for the EWMA time decay.
    #[expect(clippy::cast_precision_loss)]
    fn record(&self, value: usize, reference: Instant) {
        self.histogram.record(value as f64);
        self.gauge.set(value as f64);
        self.mean_gauge.record(value as f64, reference);
        // In tests, additionally capture the raw sample for assertions.
        #[cfg(test)]
        if let Ok(mut recorded) = self.recorded_values.lock() {
            recorded.push(value);
        }
    }
}
201
/// State shared between all senders and the receiver of a limited channel.
#[derive(Debug)]
struct Inner<T> {
    // The queued items, each paired with the semaphore permits it holds until popped.
    data: Arc<dyn QueueImpl<(OwnedSemaphorePermit, T)>>,
    // The configured limit (events or bytes); also determines the queue implementation.
    limit: MemoryBufferSize,
    // Permits represent available capacity; closed to signal disconnection in either direction.
    limiter: Arc<Semaphore>,
    // Wakes a receiver blocked in `next` when a sender enqueues an item or the channel closes.
    read_waker: Arc<Notify>,
    // Utilization metrics, present only if metric metadata was provided at construction.
    metrics: Option<Metrics>,
    // Total permit budget, used to derive utilization from the semaphore's available permits.
    capacity: NonZeroUsize,
}
211
212impl<T> Clone for Inner<T> {
213    fn clone(&self) -> Self {
214        Self {
215            data: self.data.clone(),
216            limit: self.limit,
217            limiter: self.limiter.clone(),
218            read_waker: self.read_waker.clone(),
219            metrics: self.metrics.clone(),
220            capacity: self.capacity,
221        }
222    }
223}
224
225impl<T: Send + Sync + Debug + 'static> Inner<T> {
226    fn new(
227        limit: MemoryBufferSize,
228        metric_metadata: Option<ChannelMetricMetadata>,
229        ewma_half_life_seconds: Option<f64>,
230    ) -> Self {
231        let read_waker = Arc::new(Notify::new());
232        let metrics =
233            metric_metadata.map(|metadata| Metrics::new(limit, metadata, ewma_half_life_seconds));
234        match limit {
235            MemoryBufferSize::MaxEvents(max_events) => Inner {
236                data: Arc::new(ArrayQueue::new(max_events.get())),
237                limit,
238                limiter: Arc::new(Semaphore::new(max_events.get())),
239                read_waker,
240                metrics,
241                capacity: max_events,
242            },
243            MemoryBufferSize::MaxSize(max_bytes) => Inner {
244                data: Arc::new(SegQueue::new()),
245                limit,
246                limiter: Arc::new(Semaphore::new(max_bytes.get())),
247                read_waker,
248                metrics,
249                capacity: max_bytes,
250            },
251        }
252    }
253
254    /// Records a send after acquiring all required permits.
255    ///
256    /// The `size` value is the true utilization contribution of `item`, which may exceed the number
257    /// of permits acquired for oversized payloads.
258    fn send_with_permits(&mut self, size: usize, permits: OwnedSemaphorePermit, item: T) {
259        if let Some(metrics) = &self.metrics {
260            // For normal items, capacity - available_permits() exactly represents the total queued
261            // utilization (including this item's just-acquired permits). For oversized items that
262            // acquired fewer permits than their true size, `size` is the correct utilization since
263            // the queue must have been empty for the oversized acquire to succeed.
264            let utilization = size.max(self.used_capacity());
265            metrics.record(utilization, Instant::now());
266        }
267        self.data.push((permits, item));
268        self.read_waker.notify_one();
269    }
270}
271
impl<T> Inner<T> {
    /// Returns the number of permits currently consumed by queued (or being-enqueued) items.
    fn used_capacity(&self) -> usize {
        self.capacity.get() - self.limiter.available_permits()
    }

    /// Pops the oldest item, recording the post-pop utilization before releasing its permits.
    fn pop_and_record(&self) -> Option<T> {
        self.data.pop().map(|(permit, item)| {
            if let Some(metrics) = &self.metrics {
                // Compute remaining utilization from the semaphore state. Since our permits haven't
                // been released yet, used_capacity is stable against racing senders acquiring those
                // permits.
                let utilization = self.used_capacity().saturating_sub(permit.num_permits());
                metrics.record(utilization, Instant::now());
            }
            // Release permits after recording so a waiting sender cannot enqueue a new item
            // before this pop's utilization measurement is taken.
            drop(permit);
            item
        })
    }
}
293
/// The sending half of a limited channel; cloneable, with clones sharing the same capacity.
#[derive(Debug)]
pub struct LimitedSender<T> {
    inner: Inner<T>,
    // Number of live `LimitedSender` clones; `Drop` uses it to detect the last sender and
    // close the channel.
    sender_count: Arc<AtomicUsize>,
}
299
impl<T: InMemoryBufferable> LimitedSender<T> {
    /// Computes the item's true size under the configured limit (bytes or events) and the
    /// number of semaphore permits to request for it, capped at the channel's total capacity.
    #[allow(clippy::cast_possible_truncation)]
    fn calc_required_permits(&self, item: &T) -> (usize, u32) {
        // We have to limit the number of permits we ask for to the overall limit since we're always
        // willing to store more items than the limit if the queue is entirely empty, because
        // otherwise we might deadlock ourselves by not being able to send a single item.
        let value = match self.inner.limit {
            MemoryBufferSize::MaxSize(_) => item.allocated_bytes(),
            MemoryBufferSize::MaxEvents(_) => item.event_count(),
        };
        let limit = self.inner.capacity.get();
        (value, cmp::min(limit, value) as u32)
    }

    /// Gets the number of items (or bytes, depending on the limit) that this channel could accept.
    pub fn available_capacity(&self) -> usize {
        self.inner.limiter.available_permits()
    }

    /// Sends an item into the channel, waiting until sufficient capacity is available.
    ///
    /// # Errors
    ///
    /// If the receiver has disconnected (does not exist anymore), then `Err(SendError)` will be
    /// returned with the given `item`.
    pub async fn send(&mut self, item: T) -> Result<(), SendError<T>> {
        // Calculate how many permits we need, and wait until we can acquire all of them.
        let (size, permits_required) = self.calc_required_permits(&item);
        match self
            .inner
            .limiter
            .clone()
            .acquire_many_owned(permits_required)
            .await
        {
            Ok(permits) => {
                self.inner.send_with_permits(size, permits, item);
                trace!("Sent item.");
                Ok(())
            }
            // Acquisition only fails when the semaphore has been closed, i.e. the receiver
            // was dropped.
            Err(_) => Err(SendError(item)),
        }
    }

    /// Attempts to send an item into the channel without waiting.
    ///
    /// # Errors
    ///
    /// If the receiver has disconnected (does not exist anymore), then
    /// `Err(TrySendError::Disconnected)` will be returned with the given `item`. If the channel has
    /// insufficient capacity for the item, then `Err(TrySendError::InsufficientCapacity)` will be
    /// returned with the given `item`.
    ///
    /// # Panics
    ///
    /// Will panic if adding ack amount overflows.
    // NOTE(review): no ack arithmetic is visible in this method; the panic note above looks
    // stale — confirm against the acknowledgement handling this channel feeds into.
    pub fn try_send(&mut self, item: T) -> Result<(), TrySendError<T>> {
        // Calculate how many permits we need, and try to acquire them all without waiting.
        let (size, permits_required) = self.calc_required_permits(&item);
        match self
            .inner
            .limiter
            .clone()
            .try_acquire_many_owned(permits_required)
        {
            Ok(permits) => {
                self.inner.send_with_permits(size, permits, item);
                trace!("Attempt to send item succeeded.");
                Ok(())
            }
            Err(TryAcquireError::NoPermits) => Err(TrySendError::InsufficientCapacity(item)),
            Err(TryAcquireError::Closed) => Err(TrySendError::Disconnected(item)),
        }
    }
}
375
376impl<T> Clone for LimitedSender<T> {
377    fn clone(&self) -> Self {
378        self.sender_count.fetch_add(1, Ordering::SeqCst);
379
380        Self {
381            inner: self.inner.clone(),
382            sender_count: Arc::clone(&self.sender_count),
383        }
384    }
385}
386
impl<T> Drop for LimitedSender<T> {
    fn drop(&mut self) {
        // If we're the last sender to drop, close the semaphore on our way out the door.
        // `fetch_sub` returns the previous count, so a result of 1 means this was the final
        // sender. Closing the semaphore fails pending/future acquires, and the notify wakes a
        // receiver blocked in `next` so it can observe the closure.
        if self.sender_count.fetch_sub(1, Ordering::SeqCst) == 1 {
            self.inner.limiter.close();
            self.inner.read_waker.notify_one();
        }
    }
}
396
/// The receiving half of a limited channel.
///
/// Dropping the receiver closes the underlying semaphore, causing pending and future sends
/// to fail with a disconnect error.
#[derive(Debug)]
pub struct LimitedReceiver<T> {
    inner: Inner<T>,
}
401
impl<T: Send + 'static> LimitedReceiver<T> {
    /// Gets the number of items (or bytes, depending on the limit) that this channel could accept.
    pub fn available_capacity(&self) -> usize {
        self.inner.limiter.available_permits()
    }

    /// Receives the next item, waiting if the channel is currently empty.
    ///
    /// Returns `None` only once all senders have dropped *and* all queued items have been
    /// drained (i.e. every permit has been returned).
    pub async fn next(&mut self) -> Option<T> {
        loop {
            if let Some(item) = self.inner.pop_and_record() {
                return Some(item);
            }

            // There wasn't an item for us to pop, so see if the channel is actually closed.  If so,
            // then it's time for us to close up shop as well.
            if self.inner.limiter.is_closed() {
                if self.available_capacity() < self.inner.capacity.get() {
                    // We only terminate when closed and fully drained. A close can race with queue
                    // visibility while items/in-flight permits still exist.
                    tokio::task::yield_now().await;
                    continue;
                }
                return None;
            }

            // We're not closed, so we need to wait for a writer to tell us they made some
            // progress.  This might end up being a spurious wakeup since `Notify` will
            // store a wake-up if there are no waiters, but oh well.
            self.inner.read_waker.notified().await;
        }
    }

    /// Converts this receiver into a `Stream` of items that ends when the channel closes.
    pub fn into_stream(self) -> Pin<Box<dyn Stream<Item = T> + Send>> {
        let mut receiver = self;
        Box::pin(stream! {
            while let Some(item) = receiver.next().await {
                yield item;
            }
        })
    }
}
442
impl<T> Drop for LimitedReceiver<T> {
    fn drop(&mut self) {
        // Notify senders that the channel is now closed by closing the semaphore.  Any pending
        // acquisitions will be awoken and notified that the semaphore is closed, and further new
        // sends will immediately see the semaphore is closed.  `send`/`try_send` surface this as
        // their disconnected error variants.
        self.inner.limiter.close();
    }
}
451
452pub fn limited<T: InMemoryBufferable + fmt::Debug>(
453    limit: MemoryBufferSize,
454    metric_metadata: Option<ChannelMetricMetadata>,
455    ewma_half_life_seconds: Option<f64>,
456) -> (LimitedSender<T>, LimitedReceiver<T>) {
457    let inner = Inner::new(limit, metric_metadata, ewma_half_life_seconds);
458
459    let sender = LimitedSender {
460        inner: inner.clone(),
461        sender_count: Arc::new(AtomicUsize::new(1)),
462    };
463    let receiver = LimitedReceiver { inner };
464
465    (sender, receiver)
466}
467
468#[cfg(test)]
469mod tests {
470    use std::num::NonZeroUsize;
471
472    use rand::{Rng as _, SeedableRng as _, rngs::SmallRng};
473    use tokio_test::{assert_pending, assert_ready, task::spawn};
474    use vector_common::byte_size_of::ByteSizeOf;
475
476    use super::{ChannelMetricMetadata, LimitedReceiver, LimitedSender, limited};
477    use crate::{
478        MemoryBufferSize,
479        test::MultiEventRecord,
480        topology::{channel::limited_queue::SendError, test_util::Sample},
481    };
482
    // Basic happy path: a send completes immediately with free capacity and wakes the receiver.
    #[tokio::test]
    async fn send_receive() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(2, tx.available_capacity());

        let msg = Sample::new(42);

        // Create our send and receive futures.
        let mut send = spawn(async { tx.send(msg).await });

        let mut recv = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!send.is_woken());
        assert!(!recv.is_woken());

        // Try polling our receive, which should be pending because we haven't sent anything yet.
        assert_pending!(recv.poll());

        // We should immediately be able to complete a send as there is available capacity.
        assert_eq!(Ok(()), assert_ready!(send.poll()));

        // Now our receive should have been woken up, and should immediately be ready.
        assert!(recv.is_woken());
        assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
    }

    // Both send and receive should record one utilization sample each when metrics are enabled.
    #[tokio::test]
    async fn records_utilization() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
        let (mut tx, mut rx) = limited(
            limit,
            Some(ChannelMetricMetadata::new("test_channel", None)),
            None,
        );

        // Grab the test-only capture of recorded utilization samples.
        let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();

        tx.send(Sample::new(1)).await.expect("send should succeed");
        let records = metrics.lock().unwrap().clone();
        assert_eq!(records.len(), 1);
        assert_eq!(records.last().copied(), Some(1));

        assert_eq!(Sample::new(1), rx.next().await.unwrap());
        let records = metrics.lock().unwrap();
        assert_eq!(records.len(), 2);
        assert_eq!(records.last().copied(), Some(0));
    }

    // An oversized item acquires only `limit` permits but must still report its true size.
    #[tokio::test]
    async fn oversized_send_records_true_utilization_via_normal_send_path() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
        let (mut tx, mut rx) = limited(
            limit,
            Some(ChannelMetricMetadata::new("test_channel_oversized", None)),
            None,
        );
        let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();

        // Normal send path: permits are capped to the limit (2), but utilization should reflect
        // the true item contribution (3).
        let oversized = MultiEventRecord::new(3);
        tx.send(oversized.clone())
            .await
            .expect("send should succeed");

        let records = metrics.lock().unwrap().clone();
        assert_eq!(records.len(), 1);
        assert_eq!(records.last().copied(), Some(3));

        assert_eq!(Some(oversized), rx.next().await);
        let records = metrics.lock().unwrap().clone();
        assert_eq!(records.len(), 2);
        assert_eq!(records.last().copied(), Some(0));
    }
560
    // Byte-limited channel: exactly `max_elements` same-sized messages fit, and one more blocks.
    #[test]
    fn test_limiting_by_byte_size() {
        let max_elements = 10;
        let msg = Sample::new_with_heap_allocated_values(50);
        let msg_size = msg.allocated_bytes();
        let max_allowed_bytes = msg_size * max_elements;

        // With this configuration a maximum of exactly 10 messages can fit in the channel
        let limit = MemoryBufferSize::MaxSize(NonZeroUsize::new(max_allowed_bytes).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(max_allowed_bytes, tx.available_capacity());

        // Send 10 messages into the channel, filling it
        for _ in 0..10 {
            let msg_clone = msg.clone();
            let mut f = spawn(async { tx.send(msg_clone).await });
            assert_eq!(Ok(()), assert_ready!(f.poll()));
        }
        // With the 10th message in the channel no space should be left
        assert_eq!(0, tx.available_capacity());

        // Attempting to produce one more than the max capacity should block
        let mut send_final = spawn({
            let msg_clone = msg.clone();
            async { tx.send(msg_clone).await }
        });
        assert_pending!(send_final.poll());

        // Read all data from the channel, assert final states are as expected
        for _ in 0..10 {
            let mut f = spawn(async { rx.next().await });
            let value = assert_ready!(f.poll());
            assert_eq!(value.allocated_bytes(), msg_size);
        }
        // Channel should have no more data
        let mut recv = spawn(async { rx.next().await });
        assert_pending!(recv.poll());
    }

    // A send against a full single-slot channel stays pending until the receiver frees a slot.
    #[test]
    fn sender_waits_for_more_capacity_when_none_available() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(1, tx.available_capacity());

        let msg1 = Sample::new(42);
        let msg2 = Sample::new(43);

        // Create our send and receive futures.
        let mut send1 = spawn(async { tx.send(msg1).await });

        let mut recv1 = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!send1.is_woken());
        assert!(!recv1.is_woken());

        // Try polling our receive, which should be pending because we haven't sent anything yet.
        assert_pending!(recv1.poll());

        // We should immediately be able to complete a send as there is available capacity.
        assert_eq!(Ok(()), assert_ready!(send1.poll()));
        drop(send1);

        assert_eq!(0, tx.available_capacity());

        // Now our receive should have been woken up, and should immediately be ready... but we
        // aren't going to read the value just yet.
        assert!(recv1.is_woken());

        // Now trigger a second send, which should block as there's no available capacity.
        let mut send2 = spawn(async { tx.send(msg2).await });

        assert!(!send2.is_woken());
        assert_pending!(send2.poll());

        // Now if we receive the item, our second send should be woken up and be able to send in.
        assert_eq!(Some(Sample::new(42)), assert_ready!(recv1.poll()));
        drop(recv1);

        // Since the second send was already waiting for permits, the semaphore returns them
        // directly to our waiting send, which should now be woken up and able to complete:
        assert_eq!(0, rx.available_capacity());
        assert!(send2.is_woken());

        let mut recv2 = spawn(async { rx.next().await });
        assert_pending!(recv2.poll());

        assert_eq!(Ok(()), assert_ready!(send2.poll()));
        drop(send2);

        assert_eq!(0, tx.available_capacity());

        // And the final receive to get our second send:
        assert!(recv2.is_woken());
        assert_eq!(Some(Sample::new(43)), assert_ready!(recv2.poll()));

        assert_eq!(1, tx.available_capacity());
    }
662
    // A multi-permit send blocks until enough receives have released its full permit count;
    // partially-released permits transfer directly to the waiter and don't show as available.
    #[test]
    fn sender_waits_for_more_capacity_when_partial_available() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(7).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(7, tx.available_capacity());

        let msgs1 = vec![
            MultiEventRecord::new(1),
            MultiEventRecord::new(2),
            MultiEventRecord::new(3),
        ];
        let msg2 = MultiEventRecord::new(4);

        // Create our send and receive futures.
        let mut small_sends = spawn(async {
            for msg in msgs1.clone() {
                tx.send(msg).await?;
            }

            Ok::<_, SendError<MultiEventRecord>>(())
        });

        let mut recv1 = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!small_sends.is_woken());
        assert!(!recv1.is_woken());

        // Try polling our receive, which should be pending because we haven't sent anything yet.
        assert_pending!(recv1.poll());

        // We should immediately be able to complete our three event sends, which we have
        // available capacity for, but will consume all but one of the available slots.
        assert_eq!(Ok(()), assert_ready!(small_sends.poll()));
        drop(small_sends);

        assert_eq!(1, tx.available_capacity());

        // Now our receive should have been woken up, and should immediately be ready, but we won't
        // receive just yet.
        assert!(recv1.is_woken());

        // Now trigger a second send that has four events, and needs to wait for two receives to happen.
        let mut send2 = spawn(tx.send(msg2.clone()));

        assert!(!send2.is_woken());
        assert_pending!(send2.poll());

        // Now if we receive the first item, our second send should be woken up but still not able
        // to send.
        assert_eq!(Some(&msgs1[0]), assert_ready!(recv1.poll()).as_ref());
        drop(recv1);

        // Callers waiting to acquire permits have the permits immediately transfer to them when one
        // (or more) are released, so we expect this to be zero until we send and then read the
        // third item.
        assert_eq!(0, rx.available_capacity());

        // We don't get woken up until all permits have been acquired.
        assert!(!send2.is_woken());

        // Our second read should unlock enough available capacity for the second send once complete.
        let mut recv2 = spawn(async { rx.next().await });
        assert!(!recv2.is_woken());
        assert_eq!(Some(&msgs1[1]), assert_ready!(recv2.poll()).as_ref());
        drop(recv2);

        assert_eq!(0, rx.available_capacity());

        assert!(send2.is_woken());
        assert_eq!(Ok(()), assert_ready!(send2.poll()));

        // And just make sure we see those last two sends.
        let mut recv3 = spawn(async { rx.next().await });
        assert!(!recv3.is_woken());
        assert_eq!(Some(&msgs1[2]), assert_ready!(recv3.poll()).as_ref());
        drop(recv3);

        assert_eq!(3, rx.available_capacity());

        let mut recv4 = spawn(async { rx.next().await });
        assert!(!recv4.is_woken());
        assert_eq!(Some(msg2), assert_ready!(recv4.poll()));
        drop(recv4);

        assert_eq!(7, rx.available_capacity());
    }
751
    // After the last sender drops, the receiver first drains the queued item, then gets `None`.
    #[test]
    fn empty_receiver_returns_none_when_last_sender_drops() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(1, tx.available_capacity());

        let tx2 = tx.clone();
        let msg = Sample::new(42);

        // Create our send and receive futures.
        let mut send = spawn(async { tx.send(msg).await });

        let mut recv = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!send.is_woken());
        assert!(!recv.is_woken());

        // Try polling our receive, which should be pending because we haven't sent anything yet.
        assert_pending!(recv.poll());

        // Now drop our second sender, which shouldn't do anything yet.
        drop(tx2);
        assert!(!recv.is_woken());
        assert_pending!(recv.poll());

        // Now drop our remaining (original) sender, but not before doing a send, which should
        // trigger closing the semaphore which should let the receiver complete with no further
        // waiting: one item and then `None`.
        assert_eq!(Ok(()), assert_ready!(send.poll()));
        drop(send);
        drop(tx);

        assert!(recv.is_woken());
        assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
        drop(recv);

        let mut recv2 = spawn(async { rx.next().await });
        assert!(!recv2.is_woken());
        assert_eq!(None, assert_ready!(recv2.poll()));
    }

    // With no items ever sent, a pending receive resolves to `None` once all senders drop.
    #[test]
    fn receiver_returns_none_once_empty_when_last_sender_drops() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
        let (tx, mut rx) = limited::<Sample>(limit, None, None);

        assert_eq!(1, tx.available_capacity());

        let tx2 = tx.clone();

        // Create our receive future.
        let mut recv = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!recv.is_woken());

        // Try polling our receive, which should be pending because we haven't sent anything yet.
        assert_pending!(recv.poll());

        // Now drop our first sender, which shouldn't do anything yet.
        drop(tx);
        assert!(!recv.is_woken());
        assert_pending!(recv.poll());

        // Now drop our second sender, which should trigger closing the semaphore which should let
        // the receive complete as there are no items to read.
        drop(tx2);
        assert!(recv.is_woken());
        assert_eq!(None, assert_ready!(recv.poll()));
    }
824
    // An item larger than the whole channel may still be sent when the channel is empty (its
    // permit request is capped at the limit), and popping it restores full capacity.
    #[test]
    fn oversized_send_allowed_when_empty() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(1, tx.available_capacity());

        let msg = MultiEventRecord::new(2);

        // Create our send and receive futures.
        let mut send = spawn(async { tx.send(msg.clone()).await });

        let mut recv = spawn(async { rx.next().await });

        // Nobody should be woken up.
        assert!(!send.is_woken());
        assert!(!recv.is_woken());

        // We should immediately be able to complete our send, which we don't have full
        // available capacity for, but will consume all of the available slots.
        assert_eq!(Ok(()), assert_ready!(send.poll()));
        drop(send);

        assert_eq!(0, tx.available_capacity());

        // Now we should be able to get back the oversized item, but our capacity should not be
        // greater than what we started with.
        assert_eq!(Some(msg), assert_ready!(recv.poll()));
        drop(recv);

        assert_eq!(1, rx.available_capacity());
    }
857
858    #[test]
859    fn oversized_send_allowed_when_partial_capacity() {
860        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
861        let (mut tx, mut rx) = limited(limit, None, None);
862
863        assert_eq!(2, tx.available_capacity());
864
865        let msg1 = MultiEventRecord::new(1);
866        let msg2 = MultiEventRecord::new(3);
867
868        // Create our send future.
869        let mut send = spawn(async { tx.send(msg1.clone()).await });
870
871        // Nobody should be woken up.
872        assert!(!send.is_woken());
873
874        // We should immediately be able to complete our send, which will only use up a single slot.
875        assert_eq!(Ok(()), assert_ready!(send.poll()));
876        drop(send);
877
878        assert_eq!(1, tx.available_capacity());
879
880        // Now we'll trigger another send which has an oversized item.  It shouldn't be able to send
881        // until all permits are available.
882        let mut send2 = spawn(async { tx.send(msg2.clone()).await });
883
884        assert!(!send2.is_woken());
885        assert_pending!(send2.poll());
886
887        assert_eq!(0, rx.available_capacity());
888
889        // Now do a receive which should return the one consumed slot, essentially allowing all
890        // permits to be acquired by the blocked send.
891        let mut recv = spawn(async { rx.next().await });
892        assert!(!recv.is_woken());
893        assert!(!send2.is_woken());
894
895        assert_eq!(Some(msg1), assert_ready!(recv.poll()));
896        drop(recv);
897
898        assert_eq!(0, rx.available_capacity());
899
900        // Now our blocked send should be able to proceed, and we should be able to read back the
901        // item.
902        assert_eq!(Ok(()), assert_ready!(send2.poll()));
903        drop(send2);
904
905        assert_eq!(0, tx.available_capacity());
906
907        let mut recv2 = spawn(async { rx.next().await });
908        assert_eq!(Some(msg2), assert_ready!(recv2.poll()));
909
910        assert_eq!(2, tx.available_capacity());
911    }
912
913    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
914    async fn concurrent_send_receive_metrics_remain_valid() {
915        const ITEM_COUNT: usize = 4_000;
916
917        // Try different sizes of the buffer, from 10 to 1000 events.
918        for size in 1..=100 {
919            let limit = NonZeroUsize::new(size * 10).unwrap();
920            let (tx, rx) = limited(
921                MemoryBufferSize::MaxEvents(limit),
922                Some(ChannelMetricMetadata::new("test_channel_concurrent", None)),
923                None,
924            );
925            let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();
926
927            let sender = tokio::spawn(send_samples(tx, ITEM_COUNT));
928            let receiver = tokio::spawn(receive_samples(rx, ITEM_COUNT));
929
930            sender.await.expect("sender task should not panic");
931            receiver.await.expect("receiver task should not panic");
932
933            let recorded = metrics.lock().unwrap().clone();
934            assert_eq!(
935                recorded.len(),
936                ITEM_COUNT * 2,
937                "expected one metric update per send and per receive"
938            );
939
940            // For MaxEvents with single-event messages, the occupancy counter tracks exact
941            // utilization, so values must stay within [0, limit].
942            let max_allowed = limit.get();
943            let observed_max = recorded.iter().copied().max().unwrap_or_default();
944            assert!(
945                recorded.iter().all(|value| *value <= max_allowed),
946                "observed utilization value above valid bound: max={observed_max}, allowed={max_allowed}"
947            );
948        }
949    }
950
951    async fn send_samples(mut tx: LimitedSender<Sample>, item_count: usize) {
952        let mut rng = SmallRng::from_rng(&mut rand::rng());
953
954        for i in 0..item_count {
955            tx.send(Sample::new(i as u64))
956                .await
957                .expect("send should succeed");
958            if rng.random::<u8>() % 8 == 0 {
959                tokio::task::yield_now().await;
960            }
961        }
962    }
963
964    async fn receive_samples(mut rx: LimitedReceiver<Sample>, item_count: usize) {
965        let mut rng = SmallRng::from_rng(&mut rand::rng());
966
967        for i in 0..item_count {
968            let next = rx
969                .next()
970                .await
971                .expect("receiver should yield all sent items");
972            assert_eq!(Sample::new(i as u64), next);
973            if rng.random::<u8>() % 8 == 0 {
974                tokio::task::yield_now().await;
975            }
976        }
977    }
978}