// vector_stream/batcher/config.rs

use std::time::Duration;

use data::BatchData;
use limiter::BatchLimiter;

use super::{data, limiter};

/// The individual pieces from which a batch configuration is assembled.
///
/// When `L` implements `BatchLimiter<T, D>` and `D` implements `BatchData<T>`,
/// this struct implements `BatchConfig<T>` by delegating to its fields.
pub struct BatchConfigParts<L, D> {
    /// Limiter consulted to decide whether the batch is full and whether a
    /// given item still fits; it is reset whenever the batch is taken.
    pub batch_limiter: L,
    /// Container that stores pushed items and yields the finished batch.
    pub batch_data: D,
    /// Value reported by `BatchConfig::timeout` for this configuration.
    pub timeout: Duration,
}

/// Contract describing how items of type `T` are accumulated into batches.
///
/// The expected call sequence for one item is: check `is_batch_full`, ask
/// `item_fits_in_batch` whether the item can be added, then hand the item and
/// the returned metadata to `push`. `take_batch` yields the accumulated batch
/// and starts over.
pub trait BatchConfig<T> {
    /// The finished batch produced by `take_batch`.
    type Batch;

    /// Arbitrary per-item data computed by `item_fits_in_batch` and handed
    /// back to `push` together with the item.
    type ItemMetadata;

    /// Reports whether the batch has no room for any further item.
    fn is_batch_full(&self) -> bool;

    /// Checks whether `item` can be added to the current batch.
    ///
    /// Implementations may assume `is_batch_full` would have returned `false`
    /// before this is called. The second tuple element is arbitrary metadata
    /// that is given back when the item is actually pushed; this is useful
    /// when determining the "size" of the item is expensive.
    fn item_fits_in_batch(&self, item: &T) -> (bool, Self::ItemMetadata);

    /// Appends one item to the batch, along with the metadata that
    /// `item_fits_in_batch` calculated for it.
    fn push(&mut self, item: T, metadata: Self::ItemMetadata);

    /// Hands out the current batch and resets any internal state.
    fn take_batch(&mut self) -> Self::Batch;

    /// Number of elements currently in the batch.
    fn len(&self) -> usize;

    /// Whether the batch currently holds no elements.
    fn is_empty(&self) -> bool {
        matches!(self.len(), 0)
    }

    /// Maximum amount of time to wait for inputs to a single batch.
    ///
    /// The timer starts when the first item is received for a batch.
    fn timeout(&self) -> Duration;
}

/// Builds a `BatchConfig` out of separate parts: item storage is delegated to
/// `batch_data`, capacity decisions to `batch_limiter`, and the timeout is the
/// stored `timeout` field.
impl<T, L, B> BatchConfig<T> for BatchConfigParts<L, B>
where
    B: BatchData<T>,
    L: BatchLimiter<T, B>,
{
    type Batch = <B as BatchData<T>>::Batch;
    type ItemMetadata = <L as BatchLimiter<T, B>>::ItemMetadata;

    /// Item count as reported by the batch data.
    fn len(&self) -> usize {
        let Self { batch_data, .. } = self;
        batch_data.len()
    }

    /// Resets the limiter, then takes the batch out of the batch data.
    fn take_batch(&mut self) -> Self::Batch {
        let Self {
            batch_limiter,
            batch_data,
            ..
        } = self;
        batch_limiter.reset();
        batch_data.take_batch()
    }

    /// Stores the item in the batch data, then passes its metadata to the limiter.
    fn push(&mut self, item: T, metadata: Self::ItemMetadata) {
        let Self {
            batch_limiter,
            batch_data,
            ..
        } = self;
        batch_data.push_item(item);
        batch_limiter.push_item(metadata);
    }

    /// Asks the limiter whether the current batch data is full.
    fn is_batch_full(&self) -> bool {
        let Self {
            batch_limiter,
            batch_data,
            ..
        } = self;
        batch_limiter.is_batch_full(batch_data)
    }

    /// Asks the limiter whether `item` fits alongside the current batch data,
    /// returning the limiter's verdict and metadata unchanged.
    fn item_fits_in_batch(&self, item: &T) -> (bool, Self::ItemMetadata) {
        let Self {
            batch_limiter,
            batch_data,
            ..
        } = self;
        batch_limiter.item_fits_in_batch(item, batch_data)
    }

    /// Returns the configured timeout.
    fn timeout(&self) -> Duration {
        let Self { timeout, .. } = self;
        *timeout
    }
}