vector_stream/batcher/
limiter.rs

1use vector_common::byte_size_of::ByteSizeOf;
2
3use crate::batcher::data::BatchData;
4
/// Decides when a batch of type `B`, assembled from items of type `T`, has to be closed.
pub trait BatchLimiter<T, B> {
    /// Per-item value produced by `item_fits_in_batch` and handed back to `push_item`.
    type ItemMetadata;

    /// Returns `true` when no further item could possibly be added to `batch`.
    fn is_batch_full(&self, batch: &B) -> bool;

    /// Reports whether `item` can be added to `batch`, along with metadata for that item.
    ///
    /// Implementations may assume that `is_batch_full` would have returned `false` right
    /// before this call. The metadata is arbitrary; it is given back through `push_item`
    /// once the item is actually pushed onto the batch, which lets an expensive "size"
    /// computation be performed only once per item.
    fn item_fits_in_batch(&self, item: &T, batch: &B) -> (bool, Self::ItemMetadata);

    /// Records a single item as added to the batch, using the metadata that
    /// `item_fits_in_batch` calculated for it.
    fn push_item(&mut self, metadata: Self::ItemMetadata);

    /// Clears the internal state after a batch has been taken.
    fn reset(&mut self);
}
23
/// Limiter state that bounds a batch by accumulated item "size" and by item count.
pub struct SizeLimit<I> {
    /// Upper bound on the summed "size" of the items in one batch. What "size" means is
    /// deliberately left open: it is whatever `item_size_calculator` reports.
    ///
    /// So that every individual event can be placed in some batch, the first element of a
    /// batch is exempt from this limit.
    pub batch_size_limit: usize,

    /// Upper bound on the number of items placed in a single batch.
    ///
    /// So that every individual event can be placed in some batch, the first element of a
    /// batch is exempt from this limit.
    pub batch_item_limit: usize,

    /// Sum of the item sizes recorded through `push_item` since the last `reset`.
    pub current_size: usize,

    /// Strategy used to compute the "size" of each candidate item.
    pub item_size_calculator: I,
}
41
42impl<T, B, I> BatchLimiter<T, B> for SizeLimit<I>
43where
44    B: BatchData<T>,
45    I: ItemBatchSize<T>,
46{
47    type ItemMetadata = usize;
48
49    fn is_batch_full(&self, batch: &B) -> bool {
50        batch.len() >= self.batch_item_limit || self.current_size >= self.batch_size_limit
51    }
52
53    fn item_fits_in_batch(&self, item: &T, batch: &B) -> (bool, Self::ItemMetadata) {
54        let item_size = self.item_size_calculator.size(item);
55        if batch.len() == 0 {
56            // make sure any individual item can always fit in a batch
57            return (true, item_size);
58        }
59        let fits = self.current_size + item_size <= self.batch_size_limit;
60        (fits, item_size)
61    }
62
63    fn push_item(&mut self, item_size: usize) {
64        self.current_size += item_size;
65    }
66
67    fn reset(&mut self) {
68        self.current_size = 0;
69    }
70}
71
/// Strategy for measuring how much a single item of type `T` contributes to a batch.
pub trait ItemBatchSize<T> {
    /// Returns the size of one individual `item` in a batch.
    fn size(&self, item: &T) -> usize;
}
76
77pub struct ByteSizeOfItemSize;
78
79impl<T: ByteSizeOf> ItemBatchSize<T> for ByteSizeOfItemSize {
80    fn size(&self, item: &T) -> usize {
81        item.size_of()
82    }
83}
84
85impl<T, F> ItemBatchSize<T> for F
86where
87    F: Fn(&T) -> usize,
88{
89    fn size(&self, item: &T) -> usize {
90        (self)(item)
91    }
92}