1use std::{
2 cmp, fmt,
3 fmt::Debug,
4 pin::Pin,
5 sync::{
6 Arc,
7 atomic::{AtomicUsize, Ordering},
8 },
9};
10
11#[cfg(test)]
12use std::sync::Mutex;
13
14use async_stream::stream;
15use crossbeam_queue::{ArrayQueue, SegQueue};
16use futures::Stream;
17use metrics::{Gauge, Histogram, gauge, histogram};
18use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore, TryAcquireError};
19use vector_common::stats::AtomicEwma;
20
21use crate::{InMemoryBufferable, config::MemoryBufferSize};
22
/// Default EWMA smoothing factor, used by `Metrics::new` when the caller passes `None` for
/// `ewma_alpha`.
pub const DEFAULT_EWMA_ALPHA: f64 = 0.9;
27
/// Error returned by `LimitedSender::send` when the permits for an item could not be acquired.
///
/// Wraps the item that was not sent so the caller can recover it.
#[derive(Debug, PartialEq, Eq)]
pub struct SendError<T>(pub T);
31
32impl<T> fmt::Display for SendError<T> {
33 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
34 write!(fmt, "receiver disconnected")
35 }
36}
37
// Relies entirely on the default `Error` methods; `T: Debug` is needed because `Error` requires
// the implementing type to be `Debug`.
impl<T: fmt::Debug> std::error::Error for SendError<T> {}
39
/// Error returned by `LimitedSender::try_send`.
///
/// Each variant carries the item that was not sent.
#[derive(Debug, PartialEq, Eq)]
pub enum TrySendError<T> {
    /// The semaphore did not have enough permits available for the item at the time of the call.
    InsufficientCapacity(T),
    /// The semaphore was closed, so the channel no longer accepts items.
    Disconnected(T),
}
46
47impl<T> TrySendError<T> {
48 pub fn into_inner(self) -> T {
49 match self {
50 Self::InsufficientCapacity(item) | Self::Disconnected(item) => item,
51 }
52 }
53}
54
55impl<T> fmt::Display for TrySendError<T> {
56 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
57 match self {
58 Self::InsufficientCapacity(_) => {
59 write!(fmt, "channel lacks sufficient capacity for send")
60 }
61 Self::Disconnected(_) => write!(fmt, "receiver disconnected"),
62 }
63 }
64}
65
// Relies entirely on the default `Error` methods; `T: Debug` is needed because `Error` requires
// the implementing type to be `Debug`.
impl<T: fmt::Debug> std::error::Error for TrySendError<T> {}
67
/// Minimal queue interface that `Inner` stores behind a trait object, so that either an
/// `ArrayQueue` or a `SegQueue` can back the channel.
trait QueueImpl<T>: Send + Sync + fmt::Debug {
    /// Adds `item` to the queue.
    fn push(&self, item: T);
    /// Removes and returns an item; the receiver treats `None` as "nothing queued right now".
    fn pop(&self) -> Option<T>;
}
73
74impl<T> QueueImpl<T> for ArrayQueue<T>
75where
76 T: Send + Sync + fmt::Debug,
77{
78 fn push(&self, item: T) {
79 self.push(item)
80 .unwrap_or_else(|_| unreachable!("acquired permits but channel reported being full."));
81 }
82
83 fn pop(&self) -> Option<T> {
84 self.pop()
85 }
86}
87
88impl<T> QueueImpl<T> for SegQueue<T>
89where
90 T: Send + Sync + fmt::Debug,
91{
92 fn push(&self, item: T) {
93 self.push(item);
94 }
95
96 fn pop(&self) -> Option<T> {
97 self.pop()
98 }
99}
100
/// Naming information for the metrics emitted by a channel.
#[derive(Clone, Debug)]
pub struct ChannelMetricMetadata {
    /// Prefix prepended to every metric name built in `Metrics::new`.
    prefix: &'static str,
    /// When `Some`, the value attached to each metric under the `output` label.
    output: Option<String>,
}
106
107impl ChannelMetricMetadata {
108 pub fn new(prefix: &'static str, output: Option<String>) -> Self {
109 Self { prefix, output }
110 }
111}
112
/// Handles to the utilization metrics recorded for a channel.
#[derive(Clone, Debug)]
struct Metrics {
    /// Histogram of the utilization values passed to `record`.
    histogram: Histogram,
    /// Gauge set to the most recently recorded utilization value.
    gauge: Gauge,
    /// Gauge set to the moving average returned by `ewma` on each `record`.
    mean_gauge: Gauge,
    /// Moving-average state; behind an `Arc`, so clones of this struct share it.
    ewma: Arc<AtomicEwma>,
    // Set once in `new` and never read afterwards, hence the `dead_code` expectation.
    // NOTE(review): presumably the handle is retained to keep the gauge alive — confirm.
    #[expect(dead_code)]
    max_gauge: Gauge,
    // Same value as `max_gauge`, registered under the legacy metric name; also never read.
    #[expect(dead_code)]
    legacy_max_gauge: Gauge,
    /// Test-only log of every value passed to `record`.
    #[cfg(test)]
    recorded_values: Arc<Mutex<Vec<usize>>>,
}
129
130impl Metrics {
131 #[expect(clippy::cast_precision_loss)] fn new(
133 limit: MemoryBufferSize,
134 metadata: ChannelMetricMetadata,
135 ewma_alpha: Option<f64>,
136 ) -> Self {
137 let ChannelMetricMetadata { prefix, output } = metadata;
138 let (legacy_suffix, gauge_suffix, max_value) = match limit {
139 MemoryBufferSize::MaxEvents(max_events) => (
140 "_max_event_size",
141 "_max_size_events",
142 max_events.get() as f64,
143 ),
144 MemoryBufferSize::MaxSize(max_bytes) => {
145 ("_max_byte_size", "_max_size_bytes", max_bytes.get() as f64)
146 }
147 };
148 let max_gauge_name = format!("{prefix}{gauge_suffix}");
149 let legacy_max_gauge_name = format!("{prefix}{legacy_suffix}");
150 let histogram_name = format!("{prefix}_utilization");
151 let gauge_name = format!("{prefix}_utilization_level");
152 let mean_name = format!("{prefix}_utilization_mean");
153 let ewma = Arc::new(AtomicEwma::new(ewma_alpha.unwrap_or(DEFAULT_EWMA_ALPHA)));
154 #[cfg(test)]
155 let recorded_values = Arc::new(Mutex::new(Vec::new()));
156 if let Some(label_value) = output {
157 let max_gauge = gauge!(max_gauge_name, "output" => label_value.clone());
158 max_gauge.set(max_value);
159 let legacy_max_gauge = gauge!(legacy_max_gauge_name, "output" => label_value.clone());
161 legacy_max_gauge.set(max_value);
162 Self {
163 histogram: histogram!(histogram_name, "output" => label_value.clone()),
164 gauge: gauge!(gauge_name, "output" => label_value.clone()),
165 mean_gauge: gauge!(mean_name, "output" => label_value.clone()),
166 max_gauge,
167 ewma,
168 legacy_max_gauge,
169 #[cfg(test)]
170 recorded_values,
171 }
172 } else {
173 let max_gauge = gauge!(max_gauge_name);
174 max_gauge.set(max_value);
175 let legacy_max_gauge = gauge!(legacy_max_gauge_name);
177 legacy_max_gauge.set(max_value);
178 Self {
179 histogram: histogram!(histogram_name),
180 gauge: gauge!(gauge_name),
181 mean_gauge: gauge!(mean_name),
182 max_gauge,
183 ewma,
184 legacy_max_gauge,
185 #[cfg(test)]
186 recorded_values,
187 }
188 }
189 }
190
191 #[expect(clippy::cast_precision_loss)]
192 fn record(&self, value: usize) {
193 self.histogram.record(value as f64);
194 self.gauge.set(value as f64);
195 let avg = self.ewma.update(value as f64);
196 self.mean_gauge.set(avg);
197 #[cfg(test)]
198 if let Ok(mut recorded) = self.recorded_values.lock() {
199 recorded.push(value);
200 }
201 }
202}
203
/// State shared between the sender and receiver halves of a limited channel.
#[derive(Debug)]
struct Inner<T> {
    /// Queue of items, each stored together with the semaphore permit acquired for it.
    data: Arc<dyn QueueImpl<(OwnedSemaphorePermit, T)>>,
    /// The configured limit, counted either in events or in bytes.
    limit: MemoryBufferSize,
    /// Semaphore created with as many permits as the numeric value of `limit`.
    limiter: Arc<Semaphore>,
    /// Notified after every push and when the last sender drops, to wake a waiting receiver.
    read_waker: Arc<Notify>,
    /// Utilization metrics; `None` when no metric metadata was supplied.
    metrics: Option<Metrics>,
}
212
213impl<T> Clone for Inner<T> {
214 fn clone(&self) -> Self {
215 Self {
216 data: self.data.clone(),
217 limit: self.limit,
218 limiter: self.limiter.clone(),
219 read_waker: self.read_waker.clone(),
220 metrics: self.metrics.clone(),
221 }
222 }
223}
224
225impl<T: InMemoryBufferable> Inner<T> {
226 fn new(
227 limit: MemoryBufferSize,
228 metric_metadata: Option<ChannelMetricMetadata>,
229 ewma_alpha: Option<f64>,
230 ) -> Self {
231 let read_waker = Arc::new(Notify::new());
232 let metrics = metric_metadata.map(|metadata| Metrics::new(limit, metadata, ewma_alpha));
233 match limit {
234 MemoryBufferSize::MaxEvents(max_events) => Inner {
235 data: Arc::new(ArrayQueue::new(max_events.get())),
236 limit,
237 limiter: Arc::new(Semaphore::new(max_events.get())),
238 read_waker,
239 metrics,
240 },
241 MemoryBufferSize::MaxSize(max_bytes) => Inner {
242 data: Arc::new(SegQueue::new()),
243 limit,
244 limiter: Arc::new(Semaphore::new(max_bytes.get())),
245 read_waker,
246 metrics,
247 },
248 }
249 }
250
251 fn send_with_permit(&mut self, total: usize, permits: OwnedSemaphorePermit, item: T) {
257 self.data.push((permits, item));
258 self.read_waker.notify_one();
259 if let Some(metrics) = self.metrics.as_ref() {
263 metrics.record(total);
264 }
265 }
266}
267
/// Sending half of a limited channel.
///
/// Cloning produces another sender for the same channel.
#[derive(Debug)]
pub struct LimitedSender<T> {
    /// State shared with the receiver and with every other sender.
    inner: Inner<T>,
    /// Number of live senders: starts at 1 in `limited`, incremented on clone, decremented on
    /// drop.
    sender_count: Arc<AtomicUsize>,
}
273
274impl<T: InMemoryBufferable> LimitedSender<T> {
275 #[allow(clippy::cast_possible_truncation)]
276 fn calc_required_permits(&self, item: &T) -> (usize, usize, u32) {
277 let (limit, value) = match self.inner.limit {
281 MemoryBufferSize::MaxSize(max_size) => (max_size, item.allocated_bytes()),
282 MemoryBufferSize::MaxEvents(max_events) => (max_events, item.event_count()),
283 };
284 let limit = limit.get();
285 (limit, value, cmp::min(limit, value) as u32)
286 }
287
288 pub fn available_capacity(&self) -> usize {
290 self.inner.limiter.available_permits()
291 }
292
293 pub async fn send(&mut self, item: T) -> Result<(), SendError<T>> {
300 let (limit, count, permits_required) = self.calc_required_permits(&item);
302 let in_use = limit.saturating_sub(self.available_capacity());
303 match self
304 .inner
305 .limiter
306 .clone()
307 .acquire_many_owned(permits_required)
308 .await
309 {
310 Ok(permits) => {
311 self.inner.send_with_permit(in_use + count, permits, item);
312 trace!("Sent item.");
313 Ok(())
314 }
315 Err(_) => Err(SendError(item)),
316 }
317 }
318
319 pub fn try_send(&mut self, item: T) -> Result<(), TrySendError<T>> {
332 let (limit, count, permits_required) = self.calc_required_permits(&item);
334 let in_use = limit.saturating_sub(self.available_capacity());
335 match self
336 .inner
337 .limiter
338 .clone()
339 .try_acquire_many_owned(permits_required)
340 {
341 Ok(permits) => {
342 self.inner.send_with_permit(in_use + count, permits, item);
343 trace!("Attempt to send item succeeded.");
344 Ok(())
345 }
346 Err(TryAcquireError::NoPermits) => Err(TrySendError::InsufficientCapacity(item)),
347 Err(TryAcquireError::Closed) => Err(TrySendError::Disconnected(item)),
348 }
349 }
350}
351
352impl<T> Clone for LimitedSender<T> {
353 fn clone(&self) -> Self {
354 self.sender_count.fetch_add(1, Ordering::SeqCst);
355
356 Self {
357 inner: self.inner.clone(),
358 sender_count: Arc::clone(&self.sender_count),
359 }
360 }
361}
362
363impl<T> Drop for LimitedSender<T> {
364 fn drop(&mut self) {
365 if self.sender_count.fetch_sub(1, Ordering::SeqCst) == 1 {
367 self.inner.limiter.close();
368 self.inner.read_waker.notify_one();
369 }
370 }
371}
372
/// Receiving half of a limited channel.
#[derive(Debug)]
pub struct LimitedReceiver<T> {
    /// State shared with the senders.
    inner: Inner<T>,
}
377
378impl<T: Send + 'static> LimitedReceiver<T> {
379 pub fn available_capacity(&self) -> usize {
381 self.inner.limiter.available_permits()
382 }
383
384 pub async fn next(&mut self) -> Option<T> {
385 loop {
386 if let Some((_permit, item)) = self.inner.data.pop() {
387 return Some(item);
388 }
389
390 if self.inner.limiter.is_closed() {
393 return None;
394 }
395
396 self.inner.read_waker.notified().await;
400 }
401 }
402
403 pub fn into_stream(self) -> Pin<Box<dyn Stream<Item = T> + Send>> {
404 let mut receiver = self;
405 Box::pin(stream! {
406 while let Some(item) = receiver.next().await {
407 yield item;
408 }
409 })
410 }
411}
412
impl<T> Drop for LimitedReceiver<T> {
    fn drop(&mut self) {
        // Close the semaphore so senders learn the receiver is gone: `send` turns a failed
        // acquisition into `SendError`, and `try_send` maps `TryAcquireError::Closed` to
        // `TrySendError::Disconnected`.
        self.inner.limiter.close();
    }
}
421
422pub fn limited<T: InMemoryBufferable + fmt::Debug>(
423 limit: MemoryBufferSize,
424 metric_metadata: Option<ChannelMetricMetadata>,
425 ewma_alpha: Option<f64>,
426) -> (LimitedSender<T>, LimitedReceiver<T>) {
427 let inner = Inner::new(limit, metric_metadata, ewma_alpha);
428
429 let sender = LimitedSender {
430 inner: inner.clone(),
431 sender_count: Arc::new(AtomicUsize::new(1)),
432 };
433 let receiver = LimitedReceiver { inner };
434
435 (sender, receiver)
436}
437
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use tokio_test::{assert_pending, assert_ready, task::spawn};
    use vector_common::byte_size_of::ByteSizeOf;

    use super::{ChannelMetricMetadata, limited};
    use crate::{
        MemoryBufferSize,
        test::MultiEventRecord,
        topology::{channel::limited_queue::SendError, test_util::Sample},
    };

    // A single item sent into an empty channel wakes a pending receive, which then yields it.
    #[tokio::test]
    async fn send_receive() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(2, tx.available_capacity());

        let msg = Sample::new(42);

        // Create the send and receive futures without polling them yet.
        let mut send = spawn(async { tx.send(msg).await });

        let mut recv = spawn(async { rx.next().await });

        // Nothing has been polled, so nobody should have been woken.
        assert!(!send.is_woken());
        assert!(!recv.is_woken());

        // The queue is empty, so the receive must be pending.
        assert_pending!(recv.poll());

        // There is spare capacity, so the send completes on its first poll.
        assert_eq!(Ok(()), assert_ready!(send.poll()));

        // The send notified the receiver, which can now return the item.
        assert!(recv.is_woken());
        assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
    }

    // With metrics enabled, a send records a utilization sample.
    #[tokio::test]
    async fn records_utilization_on_send() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
        let (mut tx, mut rx) = limited(
            limit,
            Some(ChannelMetricMetadata::new("test_channel", None)),
            None,
        );

        // Grab the test-only log of recorded values before sending.
        let metrics = tx.inner.metrics.as_ref().unwrap().recorded_values.clone();

        tx.send(Sample::new(1)).await.expect("send should succeed");
        // The most recent sample should be 1 after sending one item into the empty channel.
        assert_eq!(metrics.lock().unwrap().last().copied(), Some(1));

        let _ = rx.next().await;
    }

    // A byte-size limit admits exactly as many items as fit and makes the next send wait.
    #[test]
    fn test_limiting_by_byte_size() {
        let max_elements = 10;
        let msg = Sample::new_with_heap_allocated_values(50);
        let msg_size = msg.allocated_bytes();
        let max_allowed_bytes = msg_size * max_elements;

        // Size the channel to hold exactly `max_elements` copies of `msg`.
        let limit = MemoryBufferSize::MaxSize(NonZeroUsize::new(max_allowed_bytes).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(max_allowed_bytes, tx.available_capacity());

        // Fill the channel; every one of these sends completes immediately.
        for _ in 0..10 {
            let msg_clone = msg.clone();
            let mut f = spawn(async { tx.send(msg_clone).await });
            assert_eq!(Ok(()), assert_ready!(f.poll()));
        }
        // All permits are now in use.
        assert_eq!(0, tx.available_capacity());

        // One more send has to wait for capacity.
        let mut send_final = spawn({
            let msg_clone = msg.clone();
            async { tx.send(msg_clone).await }
        });
        assert_pending!(send_final.poll());

        // Drain the ten queued items, checking each has the expected size.
        for _ in 0..10 {
            let mut f = spawn(async { rx.next().await });
            let value = assert_ready!(f.poll());
            assert_eq!(value.allocated_bytes(), msg_size);
        }
        // The waiting send has not been polled again, so nothing else is queued yet.
        let mut recv = spawn(async { rx.next().await });
        assert_pending!(recv.poll());
    }

    // With a limit of one event, a second send waits until the first item has been received.
    #[test]
    fn sender_waits_for_more_capacity_when_none_available() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(1, tx.available_capacity());

        let msg1 = Sample::new(42);
        let msg2 = Sample::new(43);

        // Create the first send and receive futures without polling them yet.
        let mut send1 = spawn(async { tx.send(msg1).await });

        let mut recv1 = spawn(async { rx.next().await });

        // Nothing has been polled, so nobody should have been woken.
        assert!(!send1.is_woken());
        assert!(!recv1.is_woken());

        // The queue is empty, so the receive must be pending.
        assert_pending!(recv1.poll());

        // The first send fits and completes immediately.
        assert_eq!(Ok(()), assert_ready!(send1.poll()));
        drop(send1);

        assert_eq!(0, tx.available_capacity());

        // The receive was woken by the send, but is deliberately not polled yet so that the
        // channel stays full for the second send.
        assert!(recv1.is_woken());

        // The second send finds no capacity and has to wait.
        let mut send2 = spawn(async { tx.send(msg2).await });

        assert!(!send2.is_woken());
        assert_pending!(send2.poll());

        // Receiving the first item releases its permit.
        assert_eq!(Some(Sample::new(42)), assert_ready!(recv1.poll()));
        drop(recv1);

        // The released permit does not show up as available capacity; instead the waiting
        // send has been woken. Presumably the semaphore hands the permit straight to the waiter.
        assert_eq!(0, rx.available_capacity());
        assert!(send2.is_woken());

        // The second send has not been polled again, so there is nothing to receive yet.
        let mut recv2 = spawn(async { rx.next().await });
        assert_pending!(recv2.poll());

        assert_eq!(Ok(()), assert_ready!(send2.poll()));
        drop(send2);

        assert_eq!(0, tx.available_capacity());

        // The second item is now queued and the receive can return it.
        assert!(recv2.is_woken());
        assert_eq!(Some(Sample::new(43)), assert_ready!(recv2.poll()));

        // With both items received, the single permit is available again.
        assert_eq!(1, tx.available_capacity());
    }

    // A send that needs more permits than are free waits until enough have been released.
    // NOTE(review): the arithmetic below assumes `MultiEventRecord::new(n)` counts as `n`
    // events — confirm against its definition.
    #[test]
    fn sender_waits_for_more_capacity_when_partial_available() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(7).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(7, tx.available_capacity());

        let msgs1 = vec![
            MultiEventRecord::new(1),
            MultiEventRecord::new(2),
            MultiEventRecord::new(3),
        ];
        let msg2 = MultiEventRecord::new(4);

        // Send the three small records back to back from a single future.
        let mut small_sends = spawn(async {
            for msg in msgs1.clone() {
                tx.send(msg).await?;
            }

            Ok::<_, SendError<MultiEventRecord>>(())
        });

        let mut recv1 = spawn(async { rx.next().await });

        // Nothing has been polled, so nobody should have been woken.
        assert!(!small_sends.is_woken());
        assert!(!recv1.is_woken());

        // The queue is empty, so the receive must be pending.
        assert_pending!(recv1.poll());

        // All three small sends fit within the limit and complete in one poll.
        assert_eq!(Ok(()), assert_ready!(small_sends.poll()));
        drop(small_sends);

        // One permit is left over.
        assert_eq!(1, tx.available_capacity());

        // The receive was woken by the sends, but is not polled yet.
        assert!(recv1.is_woken());

        // The larger record needs more than the one free permit, so its send has to wait.
        let mut send2 = spawn(tx.send(msg2.clone()));

        assert!(!send2.is_woken());
        assert_pending!(send2.poll());

        // Receive the first small record, releasing its permit.
        assert_eq!(Some(&msgs1[0]), assert_ready!(recv1.poll()).as_ref());
        drop(recv1);

        // No permits show as available even though one was just released; presumably they
        // are being reserved for the waiting send.
        assert_eq!(0, rx.available_capacity());

        // The waiting send still does not have everything it needs.
        assert!(!send2.is_woken());

        // Receive the second small record, releasing more permits.
        let mut recv2 = spawn(async { rx.next().await });
        assert!(!recv2.is_woken());
        assert_eq!(Some(&msgs1[1]), assert_ready!(recv2.poll()).as_ref());
        drop(recv2);

        assert_eq!(0, rx.available_capacity());

        // Now the waiting send has been woken and can complete.
        assert!(send2.is_woken());
        assert_eq!(Ok(()), assert_ready!(send2.poll()));

        // Receive the third small record; its permits become available again.
        let mut recv3 = spawn(async { rx.next().await });
        assert!(!recv3.is_woken());
        assert_eq!(Some(&msgs1[2]), assert_ready!(recv3.poll()).as_ref());
        drop(recv3);

        assert_eq!(3, rx.available_capacity());

        // Receive the large record; the channel is back to full capacity.
        let mut recv4 = spawn(async { rx.next().await });
        assert!(!recv4.is_woken());
        assert_eq!(Some(msg2), assert_ready!(recv4.poll()));
        drop(recv4);

        assert_eq!(7, rx.available_capacity());
    }

    // Dropping one of two senders does not end the channel; once the last sender is gone the
    // receiver still yields the queued item and only then returns `None`.
    #[test]
    fn empty_receiver_returns_none_when_last_sender_drops() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(1, tx.available_capacity());

        let tx2 = tx.clone();
        let msg = Sample::new(42);

        // Create the send and receive futures without polling them yet.
        let mut send = spawn(async { tx.send(msg).await });

        let mut recv = spawn(async { rx.next().await });

        // Nothing has been polled, so nobody should have been woken.
        assert!(!send.is_woken());
        assert!(!recv.is_woken());

        // The queue is empty, so the receive must be pending.
        assert_pending!(recv.poll());

        // Dropping one sender while another is alive must not wake or finish the receiver.
        drop(tx2);
        assert!(!recv.is_woken());
        assert_pending!(recv.poll());

        // Complete the send, then drop the last sender.
        assert_eq!(Ok(()), assert_ready!(send.poll()));
        drop(send);
        drop(tx);

        // The queued item is still delivered.
        assert!(recv.is_woken());
        assert_eq!(Some(Sample::new(42)), assert_ready!(recv.poll()));
        drop(recv);

        // With the queue drained and no senders left, the receiver reports the end.
        let mut recv2 = spawn(async { rx.next().await });
        assert!(!recv2.is_woken());
        assert_eq!(None, assert_ready!(recv2.poll()));
    }

    // A receiver waiting on an empty channel is woken and returns `None` only when the last
    // sender drops.
    #[test]
    fn receiver_returns_none_once_empty_when_last_sender_drops() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
        let (tx, mut rx) = limited::<Sample>(limit, None, None);

        assert_eq!(1, tx.available_capacity());

        let tx2 = tx.clone();

        // Create the receive future without polling it yet.
        let mut recv = spawn(async { rx.next().await });

        // Nothing has been polled, so the receive should not have been woken.
        assert!(!recv.is_woken());

        // The queue is empty, so the receive must be pending.
        assert_pending!(recv.poll());

        // Dropping the first sender leaves `tx2` alive, so the receiver keeps waiting.
        drop(tx);
        assert!(!recv.is_woken());
        assert_pending!(recv.poll());

        // Dropping the last sender wakes the receiver, which reports the end of the channel.
        drop(tx2);
        assert!(recv.is_woken());
        assert_eq!(None, assert_ready!(recv.poll()));
    }

    // An item larger than the whole limit can still be sent when the channel is empty.
    #[test]
    fn oversized_send_allowed_when_empty() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(1).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(1, tx.available_capacity());

        let msg = MultiEventRecord::new(2);

        // Create the send and receive futures without polling them yet.
        let mut send = spawn(async { tx.send(msg.clone()).await });

        let mut recv = spawn(async { rx.next().await });

        // Nothing has been polled, so nobody should have been woken.
        assert!(!send.is_woken());
        assert!(!recv.is_woken());

        // The send completes immediately despite the record exceeding the limit.
        assert_eq!(Ok(()), assert_ready!(send.poll()));
        drop(send);

        // The oversized item took every available permit.
        assert_eq!(0, tx.available_capacity());

        // Receiving the item hands the permits back.
        assert_eq!(Some(msg), assert_ready!(recv.poll()));
        drop(recv);

        assert_eq!(1, rx.available_capacity());
    }

    // An oversized item sent into a partly filled channel waits until the channel has drained.
    #[test]
    fn oversized_send_allowed_when_partial_capacity() {
        let limit = MemoryBufferSize::MaxEvents(NonZeroUsize::new(2).unwrap());
        let (mut tx, mut rx) = limited(limit, None, None);

        assert_eq!(2, tx.available_capacity());

        let msg1 = MultiEventRecord::new(1);
        let msg2 = MultiEventRecord::new(3);

        // Send the small record first.
        let mut send = spawn(async { tx.send(msg1.clone()).await });

        // Nothing has been polled, so the send should not have been woken.
        assert!(!send.is_woken());

        // The small record fits and is sent immediately.
        assert_eq!(Ok(()), assert_ready!(send.poll()));
        drop(send);

        assert_eq!(1, tx.available_capacity());

        // The oversized record cannot get enough permits while the small one is queued.
        let mut send2 = spawn(async { tx.send(msg2.clone()).await });

        assert!(!send2.is_woken());
        assert_pending!(send2.poll());

        // The one free permit no longer shows as available; presumably it is being held for
        // the waiting send.
        assert_eq!(0, rx.available_capacity());

        // Create the receive; neither future has been woken at this point.
        let mut recv = spawn(async { rx.next().await });
        assert!(!recv.is_woken());
        assert!(!send2.is_woken());

        // Receive the small record, releasing its permit.
        assert_eq!(Some(msg1), assert_ready!(recv.poll()));
        drop(recv);

        assert_eq!(0, rx.available_capacity());

        // The oversized send can now complete.
        assert_eq!(Ok(()), assert_ready!(send2.poll()));
        drop(send2);

        assert_eq!(0, tx.available_capacity());

        // Receiving the oversized record returns the channel to full capacity.
        let mut recv2 = spawn(async { rx.next().await });
        assert_eq!(Some(msg2), assert_ready!(recv2.poll()));

        assert_eq!(2, tx.available_capacity());
    }
}