1use std::{
2 cmp,
3 fmt::{self, Debug},
4 num::NonZeroUsize,
5 pin::Pin,
6 sync::{
7 Arc,
8 atomic::{AtomicUsize, Ordering},
9 },
10 time::Instant,
11};
12
13#[cfg(test)]
14use std::sync::Mutex;
15
16use async_stream::stream;
17use crossbeam_queue::{ArrayQueue, SegQueue};
18use futures::Stream;
19use metrics::{Gauge, Histogram, gauge, histogram};
20use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, TryAcquireError};
21use vector_common::stats::TimeEwmaGauge;
22
23use crate::{InMemoryBufferable, config::MemoryBufferSize};
24
25pub const DEFAULT_EWMA_HALF_LIFE_SECONDS: f64 = 5.0;
26
27#[derive(Debug, PartialEq, Eq)]
29pub struct SendError<T>(pub T);
30
31impl<T> fmt::Display for SendError<T> {
32 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
33 write!(fmt, "receiver disconnected")
34 }
35}
36
37impl<T: fmt::Debug> std::error::Error for SendError<T> {}
38
39#[derive(Debug, PartialEq, Eq)]
41pub enum TrySendError<T> {
42 InsufficientCapacity(T),
43 Disconnected(T),
44}
45
46impl<T> TrySendError<T> {
47 pub fn into_inner(self) -> T {
48 match self {
49 Self::InsufficientCapacity(item) | Self::Disconnected(item) => item,
50 }
51 }
52}
53
54impl<T> fmt::Display for TrySendError<T> {
55 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
56 match self {
57 Self::InsufficientCapacity(_) => {
58 write!(fmt, "channel lacks sufficient capacity for send")
59 }
60 Self::Disconnected(_) => write!(fmt, "receiver disconnected"),
61 }
62 }
63}
64
65impl<T: fmt::Debug> std::error::Error for TrySendError<T> {}
66
67trait QueueImpl<T>: Send + Sync + fmt::Debug {
69 fn push(&self, item: T);
70 fn pop(&self) -> Option<T>;
71}
72
73impl<T> QueueImpl<T> for ArrayQueue<T>
74where
75 T: Send + Sync + fmt::Debug,
76{
77 fn push(&self, item: T) {
78 self.push(item)
79 .unwrap_or_else(|_| unreachable!("acquired permits but channel reported being full."));
80 }
81
82 fn pop(&self) -> Option<T> {
83 self.pop()
84 }
85}
86
87impl<T> QueueImpl<T> for SegQueue<T>
88where
89 T: Send + Sync + fmt::Debug,
90{
91 fn push(&self, item: T) {
92 self.push(item);
93 }
94
95 fn pop(&self) -> Option<T> {
96 self.pop()
97 }
98}
99
100#[derive(Clone, Debug)]
101pub struct ChannelMetricMetadata {
102 prefix: &'static str,
103 output: Option<String>,
104}
105
106impl ChannelMetricMetadata {
107 pub fn new(prefix: &'static str, output: Option<String>) -> Self {
108 Self { prefix, output }
109 }
110}
111
112#[derive(Clone, Debug)]
113struct Metrics {
114 histogram: Histogram,
115 gauge: Gauge,
116 mean_gauge: TimeEwmaGauge,
117 #[expect(dead_code)]
121 max_gauge: Gauge,
122 #[expect(dead_code)]
123 legacy_max_gauge: Gauge,
124 #[cfg(test)]
125 recorded_values: Arc<Mutex<Vec<usize>>>,
126}
127
128impl Metrics {
129 #[expect(clippy::cast_precision_loss)] fn new(
131 limit: MemoryBufferSize,
132 metadata: ChannelMetricMetadata,
133 ewma_half_life_seconds: Option<f64>,
134 ) -> Self {
135 let ewma_half_life_seconds =
136 ewma_half_life_seconds.unwrap_or(DEFAULT_EWMA_HALF_LIFE_SECONDS);
137 let ChannelMetricMetadata { prefix, output } = metadata;
138 let (legacy_suffix, gauge_suffix, max_value) = match limit {
139 MemoryBufferSize::MaxEvents(max_events) => (
140 "_max_event_size",
141 "_max_size_events",
142 max_events.get() as f64,
143 ),
144 MemoryBufferSize::MaxSize(max_bytes) => {
145 ("_max_byte_size", "_max_size_bytes", max_bytes.get() as f64)
146 }
147 };
148 let max_gauge_name = format!("{prefix}{gauge_suffix}");
149 let legacy_max_gauge_name = format!("{prefix}{legacy_suffix}");
150 let histogram_name = format!("{prefix}_utilization");
151 let gauge_name = format!("{prefix}_utilization_level");
152 let mean_name = format!("{prefix}_utilization_mean");
153 #[cfg(test)]
154 let recorded_values = Arc::new(Mutex::new(Vec::new()));
155 if let Some(label_value) = output {
156 let max_gauge = gauge!(max_gauge_name, "output" => label_value.clone());
157 max_gauge.set(max_value);
158 let mean_gauge_handle = gauge!(mean_name, "output" => label_value.clone());
159 let legacy_max_gauge = gauge!(legacy_max_gauge_name, "output" => label_value.clone());
161 legacy_max_gauge.set(max_value);
162 Self {
163 histogram: histogram!(histogram_name, "output" => label_value.clone()),
164 gauge: gauge!(gauge_name, "output" => label_value.clone()),
165 mean_gauge: TimeEwmaGauge::new(mean_gauge_handle, ewma_half_life_seconds),
166 max_gauge,
167 legacy_max_gauge,
168 #[cfg(test)]
169 recorded_values,
170 }
171 } else {
172 let max_gauge = gauge!(max_gauge_name);
173 max_gauge.set(max_value);
174 let mean_gauge_handle = gauge!(mean_name);
175 let legacy_max_gauge = gauge!(legacy_max_gauge_name);
177 legacy_max_gauge.set(max_value);
178 Self {
179 histogram: histogram!(histogram_name),
180 gauge: gauge!(gauge_name),
181 mean_gauge: TimeEwmaGauge::new(mean_gauge_handle, ewma_half_life_seconds),
182 max_gauge,
183 legacy_max_gauge,
184 #[cfg(test)]
185 recorded_values,
186 }
187 }
188 }
189
190 #[expect(clippy::cast_precision_loss)]
191 fn record(&self, value: usize, reference: Instant) {
192 self.histogram.record(value as f64);
193 self.gauge.set(value as f64);
194 self.mean_gauge.record(value as f64, reference);
195 #[cfg(test)]
196 if let Ok(mut recorded) = self.recorded_values.lock() {
197 recorded.push(value);
198 }
199 }
200}
201
202#[derive(Debug)]
203struct Inner<T> {
204 data: Arc<dyn QueueImpl<(OwnedSemaphorePermit, T)>>,
205 limit: MemoryBufferSize,
206 limiter: Arc<Semaphore>,
207 read_waker: Arc<Notify>,
208 metrics: Option<Metrics>,
209 capacity: NonZeroUsize,
210}
211
212impl<T> Clone for Inner<T> {
213 fn clone(&self) -> Self {
214 Self {
215 data: self.data.clone(),
216 limit: self.limit,
217 limiter: self.limiter.clone(),
218 read_waker: self.read_waker.clone(),
219 metrics: self.metrics.clone(),
220 capacity: self.capacity,
221 }
222 }
223}
224
225impl<T: Send + Sync + Debug + 'static> Inner<T> {
226 fn new(
227 limit: MemoryBufferSize,
228 metric_metadata: Option<ChannelMetricMetadata>,
229 ewma_half_life_seconds: Option<f64>,
230 ) -> Self {
231 let read_waker = Arc::new(Notify::new());
232 let metrics =
233 metric_metadata.map(|metadata| Metrics::new(limit, metadata, ewma_half_life_seconds));
234 match limit {
235 MemoryBufferSize::MaxEvents(max_events) => Inner {
236 data: Arc::new(ArrayQueue::new(max_events.get())),
237 limit,
238 limiter: Arc::new(Semaphore::new(max_events.get())),
239 read_waker,
240 metrics,
241 capacity: max_events,
242 },
243 MemoryBufferSize::MaxSize(max_bytes) => Inner {
244 data: Arc::new(SegQueue::new()),
245 limit,
246 limiter: Arc::new(Semaphore::new(max_bytes.get())),
247 read_waker,
248 metrics,
249 capacity: max_bytes,
250 },
251 }
252 }
253
254 fn send_with_permits(&mut self, size: usize, permits: OwnedSemaphorePermit, item: T) {
259 if let Some(metrics) = &self.metrics {
260 let utilization = size.max(self.used_capacity());
265 metrics.record(utilization, Instant::now());
266 }
267 self.data.push((permits, item));
268 self.read_waker.notify_one();
269 }
270}
271
272impl<T> Inner<T> {
273 fn used_capacity(&self) -> usize {
274 self.capacity.get() - self.limiter.available_permits()
275 }
276
277 fn pop_and_record(&self) -> Option<T> {
278 self.data.pop().map(|(permit, item)| {
279 if let Some(metrics) = &self.metrics {
280 let utilization = self.used_capacity().saturating_sub(permit.num_permits());
284 metrics.record(utilization, Instant::now());
285 }
286 drop(permit);
289 item
290 })
291 }
292}
293
294#[derive(Debug)]
295pub struct LimitedSender<T> {
296 inner: Inner<T>,
297 sender_count: Arc<AtomicUsize>,
298}
299
300impl<T: InMemoryBufferable> LimitedSender<T> {
301 #[allow(clippy::cast_possible_truncation)]
302 fn calc_required_permits(&self, item: &T) -> (usize, u32) {
303 let value = match self.inner.limit {
307 MemoryBufferSize::MaxSize(_) => item.allocated_bytes(),
308 MemoryBufferSize::MaxEvents(_) => item.event_count(),
309 };
310 let limit = self.inner.capacity.get();
311 (value, cmp::min(limit, value) as u32)
312 }
313
314 pub fn available_capacity(&self) -> usize {
316 self.inner.limiter.available_permits()
317 }
318
319 pub async fn send(&mut self, item: T) -> Result<(), SendError<T>> {
326 let (size, permits_required) = self.calc_required_permits(&item);
328 match self
329 .inner
330 .limiter
331 .clone()
332 .acquire_many_owned(permits_required)
333 .await
334 {
335 Ok(permits) => {
336 self.inner.send_with_permits(size, permits, item);
337 trace!("Sent item.");
338 Ok(())
339 }
340 Err(_) => Err(SendError(item)),
341 }
342 }
343
344 pub fn try_send(&mut self, item: T) -> Result<(), TrySendError<T>> {
357 let (size, permits_required) = self.calc_required_permits(&item);
359 match self
360 .inner
361 .limiter
362 .clone()
363 .try_acquire_many_owned(permits_required)
364 {
365 Ok(permits) => {
366 self.inner.send_with_permits(size, permits, item);
367 trace!("Attempt to send item succeeded.");
368 Ok(())
369 }
370 Err(TryAcquireError::NoPermits) => Err(TrySendError::InsufficientCapacity(item)),
371 Err(TryAcquireError::Closed) => Err(TrySendError::Disconnected(item)),
372 }
373 }
374}
375
376impl<T> Clone for LimitedSender<T> {
377 fn clone(&self) -> Self {
378 self.sender_count.fetch_add(1, Ordering::SeqCst);
379
380 Self {
381 inner: self.inner.clone(),
382 sender_count: Arc::clone(&self.sender_count),
383 }
384 }
385}
386
387impl<T> Drop for LimitedSender<T> {
388 fn drop(&mut self) {
389 if self.sender_count.fetch_sub(1, Ordering::SeqCst) == 1 {
391 self.inner.limiter.close();
392 self.inner.read_waker.notify_one();
393 }
394 }
395}
396
397#[derive(Debug)]
398pub struct LimitedReceiver<T> {
399 inner: Inner<T>,
400}
401
402impl<T: Send + 'static> LimitedReceiver<T> {
403 pub fn available_capacity(&self) -> usize {
405 self.inner.limiter.available_permits()
406 }
407
408 pub async fn next(&mut self) -> Option<T> {
409 loop {
410 if let Some(item) = self.inner.pop_and_record() {
411 return Some(item);
412 }
413
414 if self.inner.limiter.is_closed() {
417 if self.available_capacity() < self.inner.capacity.get() {
418 tokio::task::yield_now().await;
421 continue;
422 }
423 return None;
424 }
425
426 self.inner.read_waker.notified().await;
430 }
431 }
432
433 pub fn into_stream(self) -> Pin<Box<dyn Stream<Item = T> + Send>> {
434 let mut receiver = self;
435 Box::pin(stream! {
436 while let Some(item) = receiver.next().await {
437 yield item;
438 }
439 })
440 }
441}
442
443impl<T> Drop for LimitedReceiver<T> {
444 fn drop(&mut self) {
445 self.inner.limiter.close();
449 }
450}
451
452pub fn limited<T: InMemoryBufferable + fmt::Debug>(
453 limit: MemoryBufferSize,
454 metric_metadata: Option<ChannelMetricMetadata>,
455 ewma_half_life_seconds: Option<f64>,
456) -> (LimitedSender<T>, LimitedReceiver<T>) {
457 let inner = Inner::new(limit, metric_metadata, ewma_half_life_seconds);
458
459 let sender = LimitedSender {
460 inner: inner.clone(),
461 sender_count: Arc::new(AtomicUsize::new(1)),
462 };
463 let receiver = LimitedReceiver { inner };
464
465 (sender, receiver)
466}
467
468#[cfg(test)]
469mod tests {
470 use std::num::NonZeroUsize;
471
472 use rand::{Rng as _, SeedableRng as _, rngs::SmallRng};
473 use tokio_test::{assert_pending, assert_ready, task::spawn};
474 use vector_common::byte_size_of::ByteSizeOf;
475
476 use super::{ChannelMetricMetadata, LimitedReceiver, LimitedSender, limited};
477 use crate::{
478 MemoryBufferSize,
479 test::MultiEventRecord,
480 topology::{channel::limited_queue::SendError, test_util::Sample},
481 };
482
483 #[tokio::test]
484 async fn send_receive() {
485 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
486 let (mut tx, mut rx) = limited(limit, None, None);
487
488 assert_eq!(2, tx.available_capacity());
489
490 let msg = Sample::new(42);
491
492 let mut send = spawn(async { tx.send(msg).await });
494
495 let mut recv = spawn(async { rx.next().await });
496
497 assert!(!send.is_woken());
499 assert!(!recv.is_woken());
500
501 assert_pending!(recv.poll());
503
504 assert_eq!(Ok(()), assert_ready!(send.poll()));
506
507 assert!(recv.is_woken());
509 assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
510 }
511
512 #[tokio::test]
513 async fn records_utilization() {
514 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
515 let (mut tx, mut rx) = limited(
516 limit,
517 Some(ChannelMetricMetadata::new("test_channel", None)),
518 None,
519 );
520
521 let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();
522
523 tx.send(Sample::new(1)).await.expect("send should succeed");
524 let records = metrics.lock().unwrap().clone();
525 assert_eq!(records.len(), 1);
526 assert_eq!(records.last().copied(), Some(1));
527
528 assert_eq!(Sample::new(1), rx.next().await.unwrap());
529 let records = metrics.lock().unwrap();
530 assert_eq!(records.len(), 2);
531 assert_eq!(records.last().copied(), Some(0));
532 }
533
534 #[tokio::test]
535 async fn oversized_send_records_true_utilization_via_normal_send_path() {
536 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
537 let (mut tx, mut rx) = limited(
538 limit,
539 Some(ChannelMetricMetadata::new("test_channel_oversized", None)),
540 None,
541 );
542 let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();
543
544 let oversized = MultiEventRecord::new(3);
547 tx.send(oversized.clone())
548 .await
549 .expect("send should succeed");
550
551 let records = metrics.lock().unwrap().clone();
552 assert_eq!(records.len(), 1);
553 assert_eq!(records.last().copied(), Some(3));
554
555 assert_eq!(Some(oversized), rx.next().await);
556 let records = metrics.lock().unwrap().clone();
557 assert_eq!(records.len(), 2);
558 assert_eq!(records.last().copied(), Some(0));
559 }
560
561 #[test]
562 fn test_limiting_by_byte_size() {
563 let max_elements = 10;
564 let msg = Sample::new_with_heap_allocated_values(50);
565 let msg_size = msg.allocated_bytes();
566 let max_allowed_bytes = msg_size * max_elements;
567
568 let limit = MemoryBufferSize::MaxSize(NonZeroUsize::new(max_allowed_bytes).unwrap());
570 let (mut tx, mut rx) = limited(limit, None, None);
571
572 assert_eq!(max_allowed_bytes, tx.available_capacity());
573
574 for _ in 0..10 {
576 let msg_clone = msg.clone();
577 let mut f = spawn(async { tx.send(msg_clone).await });
578 assert_eq!(Ok(()), assert_ready!(f.poll()));
579 }
580 assert_eq!(0, tx.available_capacity());
582
583 let mut send_final = spawn({
585 let msg_clone = msg.clone();
586 async { tx.send(msg_clone).await }
587 });
588 assert_pending!(send_final.poll());
589
590 for _ in 0..10 {
592 let mut f = spawn(async { rx.next().await });
593 let value = assert_ready!(f.poll());
594 assert_eq!(value.allocated_bytes(), msg_size);
595 }
596 let mut recv = spawn(async { rx.next().await });
598 assert_pending!(recv.poll());
599 }
600
601 #[test]
602 fn sender_waits_for_more_capacity_when_none_available() {
603 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
604 let (mut tx, mut rx) = limited(limit, None, None);
605
606 assert_eq!(1, tx.available_capacity());
607
608 let msg1 = Sample::new(42);
609 let msg2 = Sample::new(43);
610
611 let mut send1 = spawn(async { tx.send(msg1).await });
613
614 let mut recv1 = spawn(async { rx.next().await });
615
616 assert!(!send1.is_woken());
618 assert!(!recv1.is_woken());
619
620 assert_pending!(recv1.poll());
622
623 assert_eq!(Ok(()), assert_ready!(send1.poll()));
625 drop(send1);
626
627 assert_eq!(0, tx.available_capacity());
628
629 assert!(recv1.is_woken());
632
633 let mut send2 = spawn(async { tx.send(msg2).await });
635
636 assert!(!send2.is_woken());
637 assert_pending!(send2.poll());
638
639 assert_eq!(Some(Sample::new(42)), assert_ready!(recv1.poll()));
641 drop(recv1);
642
643 assert_eq!(0, rx.available_capacity());
646 assert!(send2.is_woken());
647
648 let mut recv2 = spawn(async { rx.next().await });
649 assert_pending!(recv2.poll());
650
651 assert_eq!(Ok(()), assert_ready!(send2.poll()));
652 drop(send2);
653
654 assert_eq!(0, tx.available_capacity());
655
656 assert!(recv2.is_woken());
658 assert_eq!(Some(Sample::new(43)), assert_ready!(recv2.poll()));
659
660 assert_eq!(1, tx.available_capacity());
661 }
662
663 #[test]
664 fn sender_waits_for_more_capacity_when_partial_available() {
665 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(7).unwrap());
666 let (mut tx, mut rx) = limited(limit, None, None);
667
668 assert_eq!(7, tx.available_capacity());
669
670 let msgs1 = vec![
671 MultiEventRecord::new(1),
672 MultiEventRecord::new(2),
673 MultiEventRecord::new(3),
674 ];
675 let msg2 = MultiEventRecord::new(4);
676
677 let mut small_sends = spawn(async {
679 for msg in msgs1.clone() {
680 tx.send(msg).await?;
681 }
682
683 Ok::<_, SendError<MultiEventRecord>>(())
684 });
685
686 let mut recv1 = spawn(async { rx.next().await });
687
688 assert!(!small_sends.is_woken());
690 assert!(!recv1.is_woken());
691
692 assert_pending!(recv1.poll());
694
695 assert_eq!(Ok(()), assert_ready!(small_sends.poll()));
698 drop(small_sends);
699
700 assert_eq!(1, tx.available_capacity());
701
702 assert!(recv1.is_woken());
705
706 let mut send2 = spawn(tx.send(msg2.clone()));
708
709 assert!(!send2.is_woken());
710 assert_pending!(send2.poll());
711
712 assert_eq!(Some(&msgs1[0]), assert_ready!(recv1.poll()).as_ref());
715 drop(recv1);
716
717 assert_eq!(0, rx.available_capacity());
721
722 assert!(!send2.is_woken());
724
725 let mut recv2 = spawn(async { rx.next().await });
727 assert!(!recv2.is_woken());
728 assert_eq!(Some(&msgs1[1]), assert_ready!(recv2.poll()).as_ref());
729 drop(recv2);
730
731 assert_eq!(0, rx.available_capacity());
732
733 assert!(send2.is_woken());
734 assert_eq!(Ok(()), assert_ready!(send2.poll()));
735
736 let mut recv3 = spawn(async { rx.next().await });
738 assert!(!recv3.is_woken());
739 assert_eq!(Some(&msgs1[2]), assert_ready!(recv3.poll()).as_ref());
740 drop(recv3);
741
742 assert_eq!(3, rx.available_capacity());
743
744 let mut recv4 = spawn(async { rx.next().await });
745 assert!(!recv4.is_woken());
746 assert_eq!(Some(msg2), assert_ready!(recv4.poll()));
747 drop(recv4);
748
749 assert_eq!(7, rx.available_capacity());
750 }
751
752 #[test]
753 fn empty_receiver_returns_none_when_last_sender_drops() {
754 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
755 let (mut tx, mut rx) = limited(limit, None, None);
756
757 assert_eq!(1, tx.available_capacity());
758
759 let tx2 = tx.clone();
760 let msg = Sample::new(42);
761
762 let mut send = spawn(async { tx.send(msg).await });
764
765 let mut recv = spawn(async { rx.next().await });
766
767 assert!(!send.is_woken());
769 assert!(!recv.is_woken());
770
771 assert_pending!(recv.poll());
773
774 drop(tx2);
776 assert!(!recv.is_woken());
777 assert_pending!(recv.poll());
778
779 assert_eq!(Ok(()), assert_ready!(send.poll()));
783 drop(send);
784 drop(tx);
785
786 assert!(recv.is_woken());
787 assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
788 drop(recv);
789
790 let mut recv2 = spawn(async { rx.next().await });
791 assert!(!recv2.is_woken());
792 assert_eq!(None, assert_ready!(recv2.poll()));
793 }
794
795 #[test]
796 fn receiver_returns_none_once_empty_when_last_sender_drops() {
797 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
798 let (tx, mut rx) = limited::<Sample>(limit, None, None);
799
800 assert_eq!(1, tx.available_capacity());
801
802 let tx2 = tx.clone();
803
804 let mut recv = spawn(async { rx.next().await });
806
807 assert!(!recv.is_woken());
809
810 assert_pending!(recv.poll());
812
813 drop(tx);
815 assert!(!recv.is_woken());
816 assert_pending!(recv.poll());
817
818 drop(tx2);
821 assert!(recv.is_woken());
822 assert_eq!(None, assert_ready!(recv.poll()));
823 }
824
825 #[test]
826 fn oversized_send_allowed_when_empty() {
827 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
828 let (mut tx, mut rx) = limited(limit, None, None);
829
830 assert_eq!(1, tx.available_capacity());
831
832 let msg = MultiEventRecord::new(2);
833
834 let mut send = spawn(async { tx.send(msg.clone()).await });
836
837 let mut recv = spawn(async { rx.next().await });
838
839 assert!(!send.is_woken());
841 assert!(!recv.is_woken());
842
843 assert_eq!(Ok(()), assert_ready!(send.poll()));
846 drop(send);
847
848 assert_eq!(0, tx.available_capacity());
849
850 assert_eq!(Some(msg), assert_ready!(recv.poll()));
853 drop(recv);
854
855 assert_eq!(1, rx.available_capacity());
856 }
857
858 #[test]
859 fn oversized_send_allowed_when_partial_capacity() {
860 let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
861 let (mut tx, mut rx) = limited(limit, None, None);
862
863 assert_eq!(2, tx.available_capacity());
864
865 let msg1 = MultiEventRecord::new(1);
866 let msg2 = MultiEventRecord::new(3);
867
868 let mut send = spawn(async { tx.send(msg1.clone()).await });
870
871 assert!(!send.is_woken());
873
874 assert_eq!(Ok(()), assert_ready!(send.poll()));
876 drop(send);
877
878 assert_eq!(1, tx.available_capacity());
879
880 let mut send2 = spawn(async { tx.send(msg2.clone()).await });
883
884 assert!(!send2.is_woken());
885 assert_pending!(send2.poll());
886
887 assert_eq!(0, rx.available_capacity());
888
889 let mut recv = spawn(async { rx.next().await });
892 assert!(!recv.is_woken());
893 assert!(!send2.is_woken());
894
895 assert_eq!(Some(msg1), assert_ready!(recv.poll()));
896 drop(recv);
897
898 assert_eq!(0, rx.available_capacity());
899
900 assert_eq!(Ok(()), assert_ready!(send2.poll()));
903 drop(send2);
904
905 assert_eq!(0, tx.available_capacity());
906
907 let mut recv2 = spawn(async { rx.next().await });
908 assert_eq!(Some(msg2), assert_ready!(recv2.poll()));
909
910 assert_eq!(2, tx.available_capacity());
911 }
912
913 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
914 async fn concurrent_send_receive_metrics_remain_valid() {
915 const ITEM_COUNT: usize = 4_000;
916
917 for size in 1..=100 {
919 let limit = NonZeroUsize::new(size * 10).unwrap();
920 let (tx, rx) = limited(
921 MemoryBufferSize::MaxEvents(limit),
922 Some(ChannelMetricMetadata::new("test_channel_concurrent", None)),
923 None,
924 );
925 let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();
926
927 let sender = tokio::spawn(send_samples(tx, ITEM_COUNT));
928 let receiver = tokio::spawn(receive_samples(rx, ITEM_COUNT));
929
930 sender.await.expect("sender task should not panic");
931 receiver.await.expect("receiver task should not panic");
932
933 let recorded = metrics.lock().unwrap().clone();
934 assert_eq!(
935 recorded.len(),
936 ITEM_COUNT * 2,
937 "expected one metric update per send and per receive"
938 );
939
940 let max_allowed = limit.get();
943 let observed_max = recorded.iter().copied().max().unwrap_or_default();
944 assert!(
945 recorded.iter().all(|value| *value <= max_allowed),
946 "observed utilization value above valid bound: max={observed_max}, allowed={max_allowed}"
947 );
948 }
949 }
950
951 async fn send_samples(mut tx: LimitedSender<Sample>, item_count: usize) {
952 let mut rng = SmallRng::from_rng(&mut rand::rng());
953
954 for i in 0..item_count {
955 tx.send(Sample::new(i as u64))
956 .await
957 .expect("send should succeed");
958 if rng.random::<u8>() % 8 == 0 {
959 tokio::task::yield_now().await;
960 }
961 }
962 }
963
964 async fn receive_samples(mut rx: LimitedReceiver<Sample>, item_count: usize) {
965 let mut rng = SmallRng::from_rng(&mut rand::rng());
966
967 for i in 0..item_count {
968 let next = rx
969 .next()
970 .await
971 .expect("receiver should yield all sent items");
972 assert_eq!(Sample::new(i as u64), next);
973 if rng.random::<u8>() % 8 == 0 {
974 tokio::task::yield_now().await;
975 }
976 }
977 }
978}