vector_stream/batcher/
limiter.rs1use vector_common::byte_size_of::ByteSizeOf;
2
3use crate::batcher::data::BatchData;
4
/// Decides whether a batch of type `B`, built from items of type `T`, can take
/// more items.
///
/// The batch itself is only ever passed in by reference; any running totals the
/// limiter needs are kept on the implementor and updated through
/// [`BatchLimiter::push_item`] and [`BatchLimiter::reset`].
pub trait BatchLimiter<T, B> {
    /// Per-item value returned by [`BatchLimiter::item_fits_in_batch`] and
    /// accepted by [`BatchLimiter::push_item`].
    type ItemMetadata;

    /// Returns `true` when `batch` should not receive any further items.
    fn is_batch_full(&self, batch: &B) -> bool;

    /// Reports whether `item` may be added to `batch`.
    ///
    /// The second tuple element is metadata computed for `item`; it is
    /// returned regardless of whether the item fits.
    fn item_fits_in_batch(&self, item: &T, batch: &B) -> (bool, Self::ItemMetadata);

    /// Updates the limiter's bookkeeping for one item described by `metadata`.
    ///
    /// NOTE(review): presumably `metadata` is the value `item_fits_in_batch`
    /// returned for the item being pushed — confirm against the caller.
    fn push_item(&mut self, metadata: Self::ItemMetadata);

    /// Clears the limiter's bookkeeping.
    ///
    /// NOTE(review): presumably invoked when the current batch is handed off
    /// and a new one is started — confirm against the caller.
    fn reset(&mut self);
}
23
/// Limiter state that bounds a batch both by accumulated item size and by
/// item count.
///
/// `I` is the strategy used to measure a single item.
pub struct SizeLimit<I> {
    /// Maximum accumulated size of a batch, in whatever unit
    /// `item_size_calculator` reports.
    ///
    /// The first item placed in an empty batch is accepted even if it alone
    /// exceeds this value.
    pub batch_size_limit: usize,

    /// Number of items at which a batch is considered full.
    pub batch_item_limit: usize,

    /// Sum of the sizes pushed so far; set back to zero on reset.
    pub current_size: usize,

    /// Measures the size of each candidate item.
    pub item_size_calculator: I,
}
41
42impl<T, B, I> BatchLimiter<T, B> for SizeLimit<I>
43where
44 B: BatchData<T>,
45 I: ItemBatchSize<T>,
46{
47 type ItemMetadata = usize;
48
49 fn is_batch_full(&self, batch: &B) -> bool {
50 batch.len() >= self.batch_item_limit || self.current_size >= self.batch_size_limit
51 }
52
53 fn item_fits_in_batch(&self, item: &T, batch: &B) -> (bool, Self::ItemMetadata) {
54 let item_size = self.item_size_calculator.size(item);
55 if batch.len() == 0 {
56 return (true, item_size);
58 }
59 let fits = self.current_size + item_size <= self.batch_size_limit;
60 (fits, item_size)
61 }
62
63 fn push_item(&mut self, item_size: usize) {
64 self.current_size += item_size;
65 }
66
67 fn reset(&mut self) {
68 self.current_size = 0;
69 }
70}
71
/// Strategy for measuring how much a single item of type `T` counts toward a
/// batch's size.
pub trait ItemBatchSize<T> {
    /// Returns the size of `item`; the unit is whatever the implementor
    /// chooses.
    fn size(&self, item: &T) -> usize;
}
76
/// Field-less [`ItemBatchSize`] strategy that measures an item through its
/// [`ByteSizeOf`] implementation.
pub struct ByteSizeOfItemSize;
78
79impl<T: ByteSizeOf> ItemBatchSize<T> for ByteSizeOfItemSize {
80 fn size(&self, item: &T) -> usize {
81 item.size_of()
82 }
83}
84
85impl<T, F> ItemBatchSize<T> for F
86where
87 F: Fn(&T) -> usize,
88{
89 fn size(&self, item: &T) -> usize {
90 (self)(item)
91 }
92}