vector/sinks/util/buffer/
partition.rs1use vector_lib::ByteSizeOf;
2
3use super::super::{
4 batch::{Batch, BatchConfig, BatchError, PushResult},
5 ElementCount,
6};
7use crate::sinks::util::{Merged, SinkBatchSettings};
8
9pub trait Partition<K> {
10 fn partition(&self) -> K;
11}
12#[derive(Debug)]
13pub struct PartitionBuffer<T, K> {
14 inner: T,
15 key: Option<K>,
16}
17
18#[derive(Debug, Clone)]
19pub struct PartitionInnerBuffer<T, K> {
20 pub(self) inner: T,
21 key: K,
22}
23
24impl<T, K> PartitionBuffer<T, K> {
25 pub const fn new(inner: T) -> Self {
26 Self { inner, key: None }
27 }
28}
29
30impl<T, K> Batch for PartitionBuffer<T, K>
31where
32 T: Batch,
33 K: Clone,
34{
35 type Input = PartitionInnerBuffer<T::Input, K>;
36 type Output = PartitionInnerBuffer<T::Output, K>;
37
38 fn get_settings_defaults<D: SinkBatchSettings + Clone>(
39 config: BatchConfig<D, Merged>,
40 ) -> Result<BatchConfig<D, Merged>, BatchError> {
41 T::get_settings_defaults(config)
42 }
43
44 fn push(&mut self, item: Self::Input) -> PushResult<Self::Input> {
45 let key = item.key;
46 match self.inner.push(item.inner) {
47 PushResult::Ok(full) => {
48 self.key = Some(key);
49 PushResult::Ok(full)
50 }
51 PushResult::Overflow(inner) => PushResult::Overflow(Self::Input { inner, key }),
52 }
53 }
54
55 fn is_empty(&self) -> bool {
56 self.inner.is_empty()
57 }
58
59 fn fresh(&self) -> Self {
60 Self::new(self.inner.fresh())
61 }
62
63 fn finish(mut self) -> Self::Output {
64 let key = self.key.take().unwrap();
65 let inner = self.inner.finish();
66 PartitionInnerBuffer { inner, key }
67 }
68
69 fn num_items(&self) -> usize {
70 self.inner.num_items()
71 }
72}
73
74impl<T, K> PartitionInnerBuffer<T, K> {
75 pub const fn new(inner: T, key: K) -> Self {
76 Self { inner, key }
77 }
78
79 #[allow(clippy::missing_const_for_fn)] pub fn into_parts(self) -> (T, K) {
81 (self.inner, self.key)
82 }
83}
84
85impl<T, K> Partition<K> for PartitionInnerBuffer<T, K>
86where
87 K: Clone,
88{
89 fn partition(&self) -> K {
90 self.key.clone()
91 }
92}
93
94impl<T: ByteSizeOf, K> ByteSizeOf for PartitionInnerBuffer<T, K> {
95 fn size_of(&self) -> usize {
97 self.inner.size_of()
98 }
99
100 fn allocated_bytes(&self) -> usize {
101 self.inner.allocated_bytes()
102 }
103}
104
105impl<T: ElementCount, K> ElementCount for PartitionInnerBuffer<T, K> {
106 fn element_count(&self) -> usize {
107 self.inner.element_count()
108 }
109}