vector/sinks/util/buffer/
partition.rs

1use vector_lib::ByteSizeOf;
2
3use super::super::{
4    batch::{Batch, BatchConfig, BatchError, PushResult},
5    ElementCount,
6};
7use crate::sinks::util::{Merged, SinkBatchSettings};
8
9pub trait Partition<K> {
10    fn partition(&self) -> K;
11}
12#[derive(Debug)]
13pub struct PartitionBuffer<T, K> {
14    inner: T,
15    key: Option<K>,
16}
17
18#[derive(Debug, Clone)]
19pub struct PartitionInnerBuffer<T, K> {
20    pub(self) inner: T,
21    key: K,
22}
23
24impl<T, K> PartitionBuffer<T, K> {
25    pub const fn new(inner: T) -> Self {
26        Self { inner, key: None }
27    }
28}
29
30impl<T, K> Batch for PartitionBuffer<T, K>
31where
32    T: Batch,
33    K: Clone,
34{
35    type Input = PartitionInnerBuffer<T::Input, K>;
36    type Output = PartitionInnerBuffer<T::Output, K>;
37
38    fn get_settings_defaults<D: SinkBatchSettings + Clone>(
39        config: BatchConfig<D, Merged>,
40    ) -> Result<BatchConfig<D, Merged>, BatchError> {
41        T::get_settings_defaults(config)
42    }
43
44    fn push(&mut self, item: Self::Input) -> PushResult<Self::Input> {
45        let key = item.key;
46        match self.inner.push(item.inner) {
47            PushResult::Ok(full) => {
48                self.key = Some(key);
49                PushResult::Ok(full)
50            }
51            PushResult::Overflow(inner) => PushResult::Overflow(Self::Input { inner, key }),
52        }
53    }
54
55    fn is_empty(&self) -> bool {
56        self.inner.is_empty()
57    }
58
59    fn fresh(&self) -> Self {
60        Self::new(self.inner.fresh())
61    }
62
63    fn finish(mut self) -> Self::Output {
64        let key = self.key.take().unwrap();
65        let inner = self.inner.finish();
66        PartitionInnerBuffer { inner, key }
67    }
68
69    fn num_items(&self) -> usize {
70        self.inner.num_items()
71    }
72}
73
74impl<T, K> PartitionInnerBuffer<T, K> {
75    pub const fn new(inner: T, key: K) -> Self {
76        Self { inner, key }
77    }
78
79    #[allow(clippy::missing_const_for_fn)] // const cannot run destructor
80    pub fn into_parts(self) -> (T, K) {
81        (self.inner, self.key)
82    }
83}
84
85impl<T, K> Partition<K> for PartitionInnerBuffer<T, K>
86where
87    K: Clone,
88{
89    fn partition(&self) -> K {
90        self.key.clone()
91    }
92}
93
94impl<T: ByteSizeOf, K> ByteSizeOf for PartitionInnerBuffer<T, K> {
95    // This ignores the size of the key, as it does not represent actual data size.
96    fn size_of(&self) -> usize {
97        self.inner.size_of()
98    }
99
100    fn allocated_bytes(&self) -> usize {
101        self.inner.allocated_bytes()
102    }
103}
104
105impl<T: ElementCount, K> ElementCount for PartitionInnerBuffer<T, K> {
106    fn element_count(&self) -> usize {
107        self.inner.element_count()
108    }
109}