vector_stream/batcher/
config.rs

1use std::time::Duration;
2
3use data::BatchData;
4use limiter::BatchLimiter;
5
6use super::{data, limiter};
7
/// The three pieces that together make up a batch configuration.
///
/// When `L` implements `BatchLimiter<T, B>` and `B` implements `BatchData<T>`,
/// this struct implements [`BatchConfig<T>`] by delegating to its fields.
pub struct BatchConfigParts<L, B> {
    /// Decides whether the batch is full and whether a given item still fits;
    /// it is reset every time a batch is taken.
    pub batch_limiter: L,
    /// Accumulates the pushed items and yields them when a batch is taken.
    pub batch_data: B,
    /// Value reported by `BatchConfig::timeout`.
    pub timeout: Duration,
}
13
/// Contract for accumulating items of type `T` into a batch: tracking its
/// size, deciding when it is full, handing the finished batch over, and
/// reporting how long a single batch may wait for input.
pub trait BatchConfig<T> {
    /// Per-item data produced by `item_fits_in_batch` and handed back to `push`.
    type ItemMetadata;
    /// The value produced by `take_batch`.
    type Batch;

    /// Number of elements currently held in the batch.
    fn len(&self) -> usize;

    /// Whether the batch currently holds no elements (i.e. `len()` is zero).
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Hands back the current batch and resets any internal state.
    fn take_batch(&mut self) -> Self::Batch;

    /// Appends one item to the batch.
    ///
    /// `metadata` is the value that `item_fits_in_batch` calculated for this item.
    fn push(&mut self, item: T, metadata: Self::ItemMetadata);

    /// Returns `true` when no further item could possibly fit in the batch.
    fn is_batch_full(&self) -> bool;

    /// Checks whether `item` fits in the current batch, returning the verdict
    /// together with metadata for the item.
    ///
    /// Implementations may assume `is_batch_full` would have returned `false`
    /// before this is called. The metadata is arbitrary and is given back when
    /// the item is actually pushed onto the batch, which is useful when working
    /// out the "size" of the item is an expensive calculation.
    fn item_fits_in_batch(&self, item: &T) -> (bool, Self::ItemMetadata);

    /// Maximum amount of time to wait for inputs to a single batch.
    ///
    /// The timer starts when the first item is received for a batch.
    fn timeout(&self) -> Duration;
}
45
46impl<T, L, B> BatchConfig<T> for BatchConfigParts<L, B>
47where
48    L: BatchLimiter<T, B>,
49    B: BatchData<T>,
50{
51    type ItemMetadata = L::ItemMetadata;
52    type Batch = B::Batch;
53
54    fn len(&self) -> usize {
55        self.batch_data.len()
56    }
57
58    fn take_batch(&mut self) -> Self::Batch {
59        self.batch_limiter.reset();
60        self.batch_data.take_batch()
61    }
62
63    fn push(&mut self, item: T, metadata: Self::ItemMetadata) {
64        self.batch_data.push_item(item);
65        self.batch_limiter.push_item(metadata);
66    }
67
68    fn is_batch_full(&self) -> bool {
69        self.batch_limiter.is_batch_full(&self.batch_data)
70    }
71
72    fn item_fits_in_batch(&self, item: &T) -> (bool, Self::ItemMetadata) {
73        self.batch_limiter
74            .item_fits_in_batch(item, &self.batch_data)
75    }
76
77    fn timeout(&self) -> Duration {
78        self.timeout
79    }
80}