// vector_stream/batcher/limiter.rs
use vector_common::byte_size_of::ByteSizeOf;
use crate::batcher::data::BatchData;
/// A policy deciding whether a batch of type `B` can take further items of type `T`.
///
/// The method docs below describe the intended call order: `is_batch_full` is checked first,
/// `item_fits_in_batch` is asked about a candidate item, the metadata it returned is handed to
/// `push_item` when the item is actually added, and `reset` is called once the batch is taken.
/// NOTE(review): the caller is not visible in this file — confirm the order against the batcher.
pub trait BatchLimiter<T, B> {
/// Per-item data produced by `item_fits_in_batch` and later consumed by `push_item`.
type ItemMetadata;
/// Returns `true` if no further item can possibly fit in `batch`.
fn is_batch_full(&self, batch: &B) -> bool;
/// Reports whether `item` fits in `batch`, together with metadata describing the item.
///
/// Implementations may assume that `is_batch_full` would have returned `false` before this is
/// called. The returned metadata is arbitrary and is given back via `push_item` when the item
/// is actually pushed onto the batch, which avoids repeating an expensive "size" calculation.
fn item_fits_in_batch(&self, item: &T, batch: &B) -> (bool, Self::ItemMetadata);
/// Records that a single item was added to the batch, using the metadata that
/// `item_fits_in_batch` calculated for it.
fn push_item(&mut self, metadata: Self::ItemMetadata);
/// Resets internal state after a batch has been taken.
fn reset(&mut self);
}
/// Limiter state that bounds a batch both by accumulated item "size" and by item count.
///
/// `I` is the calculator used to obtain the size of each item.
pub struct SizeLimit<I> {
/// The maximum total "size" of all items in a batch. Size is intentionally
/// vague here since it is user defined, and can vary.
///
/// To ensure any individual event can be placed in a batch, the first element in a batch is not
/// subject to this limit.
pub batch_size_limit: usize,
/// Total number of items that will be placed in a single batch.
///
/// To ensure any individual event can be placed in a batch, the first element in a batch is not
/// subject to this limit.
///
/// NOTE(review): `is_batch_full` compares `batch.len()` against this value directly, so a limit
/// of 0 reports an empty batch as full — confirm the first-item exemption is enforced by the
/// caller.
pub batch_item_limit: usize,
/// Running total of the sizes of the items pushed since the last reset.
pub current_size: usize,
/// Computes the "size" of an individual item.
pub item_size_calculator: I,
}
/// Size- and count-based limiting for any batch type that can report its length.
///
/// The per-item metadata is the item's size, so the (possibly expensive) size calculation is
/// performed once in `item_fits_in_batch` and reused by `push_item`.
impl<T, B, I> BatchLimiter<T, B> for SizeLimit<I>
where
    B: BatchData<T>,
    I: ItemBatchSize<T>,
{
    /// The size of the item as reported by `item_size_calculator`.
    type ItemMetadata = usize;

    /// Returns `true` once either the item-count limit or the accumulated-size limit is reached.
    fn is_batch_full(&self, batch: &B) -> bool {
        batch.len() >= self.batch_item_limit || self.current_size >= self.batch_size_limit
    }

    /// Computes the size of `item` and reports whether adding it keeps the accumulated size
    /// within `batch_size_limit`.
    ///
    /// The first item of an empty batch always fits, regardless of its size. The computed size
    /// is returned in both cases so it can be passed to `push_item`.
    fn item_fits_in_batch(&self, item: &T, batch: &B) -> (bool, Self::ItemMetadata) {
        let item_size = self.item_size_calculator.size(item);
        if batch.len() == 0 {
            // Make sure any individual item can always fit in a batch.
            return (true, item_size);
        }
        // A sum that overflows `usize` necessarily exceeds every representable limit, so it must
        // be reported as "does not fit" instead of panicking (debug) or wrapping (release).
        let fits = matches!(
            self.current_size.checked_add(item_size),
            Some(total) if total <= self.batch_size_limit
        );
        (fits, item_size)
    }

    /// Adds `item_size` to the accumulated size of the current batch.
    fn push_item(&mut self, item_size: usize) {
        // Saturate rather than wrap: a wrapped total would make a huge batch look nearly empty.
        self.current_size = self.current_size.saturating_add(item_size);
    }

    /// Clears the accumulated size, ready for a new batch.
    fn reset(&mut self) {
        self.current_size = 0;
    }
}
/// Strategy for measuring how much an item of type `T` contributes to a batch's size.
pub trait ItemBatchSize<T> {
/// Returns the size of an individual `item` in a batch.
fn size(&self, item: &T) -> usize;
}
/// Item size calculator that defers to the item's own [`ByteSizeOf`] implementation.
pub struct ByteSizeOfItemSize;

impl<T> ItemBatchSize<T> for ByteSizeOfItemSize
where
    T: ByteSizeOf,
{
    /// Returns whatever `ByteSizeOf::size_of` reports for `item`.
    fn size(&self, item: &T) -> usize {
        <T as ByteSizeOf>::size_of(item)
    }
}
/// Lets any `Fn(&T) -> usize` callable act as an item size calculator.
impl<T, F: Fn(&T) -> usize> ItemBatchSize<T> for F {
    /// Invokes the callable on `item` and returns its result as the item's size.
    fn size(&self, item: &T) -> usize {
        self(item)
    }
}