vector/sinks/util/buffer/
vec.rs1use bytes::Bytes;
2
3use super::{err_event_too_large, Batch, BatchSize, PushResult};
4
5pub trait EncodedLength {
6 fn encoded_length(&self) -> usize;
7}
8
9#[derive(Clone)]
11pub struct VecBuffer<T> {
12 batch: Option<Vec<T>>,
13 bytes: usize,
14 settings: BatchSize<Self>,
15}
16
17impl<T> VecBuffer<T> {
18 pub const fn new(settings: BatchSize<Self>) -> Self {
19 Self::new_with_settings(settings)
20 }
21
22 const fn new_with_settings(settings: BatchSize<Self>) -> Self {
23 Self {
24 batch: None,
25 bytes: 0,
26 settings,
27 }
28 }
29}
30
31impl<T: EncodedLength> Batch for VecBuffer<T> {
32 type Input = T;
33 type Output = Vec<T>;
34
35 fn push(&mut self, item: Self::Input) -> PushResult<Self::Input> {
36 let new_bytes = self.bytes + item.encoded_length();
37 if self.is_empty() && item.encoded_length() > self.settings.bytes {
38 err_event_too_large(item.encoded_length(), self.settings.bytes)
39 } else if self.num_items() >= self.settings.events || new_bytes > self.settings.bytes {
40 PushResult::Overflow(item)
41 } else {
42 let events = self.settings.events;
43 let batch = self.batch.get_or_insert_with(|| Vec::with_capacity(events));
44 batch.push(item);
45 self.bytes = new_bytes;
46 PushResult::Ok(batch.len() >= self.settings.events || new_bytes >= self.settings.bytes)
47 }
48 }
49
50 fn is_empty(&self) -> bool {
51 self.batch.as_ref().map(Vec::is_empty).unwrap_or(true)
52 }
53
54 fn fresh(&self) -> Self {
55 Self::new_with_settings(self.settings)
56 }
57
58 fn finish(self) -> Self::Output {
59 self.batch.unwrap_or_default()
60 }
61
62 fn num_items(&self) -> usize {
63 self.batch.as_ref().map(Vec::len).unwrap_or(0)
64 }
65}
66
67impl EncodedLength for Bytes {
68 fn encoded_length(&self) -> usize {
69 self.len()
70 }
71}
72
73#[cfg(test)]
74mod tests {
75 use super::*;
76 use crate::sinks::util::BatchSettings;
77
78 impl EncodedLength for String {
79 fn encoded_length(&self) -> usize {
80 self.len() + 1
81 }
82 }
83
84 #[test]
85 fn obeys_max_events() {
86 let mut batch_settings = BatchSettings::default();
87 batch_settings.size.events = 2;
88
89 let mut buffer = VecBuffer::new(batch_settings.size);
90 let data = "dummy".to_string();
91
92 assert!(buffer.is_empty());
93 assert_eq!(buffer.num_items(), 0);
94
95 assert_eq!(buffer.push(data.clone()), PushResult::Ok(false));
96 assert!(!buffer.is_empty());
97 assert_eq!(buffer.num_items(), 1);
98
99 assert_eq!(buffer.push(data.clone()), PushResult::Ok(true));
100 assert!(!buffer.is_empty());
101 assert_eq!(buffer.num_items(), 2);
102
103 assert_eq!(buffer.push(data.clone()), PushResult::Overflow(data));
104 assert!(!buffer.is_empty());
105 assert_eq!(buffer.num_items(), 2);
106
107 assert_eq!(buffer.finish().len(), 2);
108 }
109
110 #[test]
111 fn obeys_max_bytes() {
112 let mut batch_settings = BatchSettings::default();
113 batch_settings.size.bytes = 22;
114 batch_settings.size.events = 99;
115
116 let mut buffer = VecBuffer::new(batch_settings.size);
117 let data = "some bytes".to_string();
118
119 assert!(buffer.is_empty());
120 assert_eq!(buffer.num_items(), 0);
121
122 assert_eq!(
123 buffer.push("this record is just too long to be inserted".into()),
124 PushResult::Ok(false)
125 );
126 assert!(buffer.is_empty());
127 assert_eq!(buffer.num_items(), 0);
128
129 assert_eq!(buffer.push(data.clone()), PushResult::Ok(false));
130 assert!(!buffer.is_empty());
131 assert_eq!(buffer.num_items(), 1);
132
133 assert_eq!(buffer.push(data.clone()), PushResult::Ok(true));
134 assert!(!buffer.is_empty());
135 assert_eq!(buffer.num_items(), 2);
136
137 assert_eq!(buffer.push(data.clone()), PushResult::Overflow(data));
138 assert!(!buffer.is_empty());
139 assert_eq!(buffer.num_items(), 2);
140
141 assert_eq!(buffer.finish().len(), 2);
142 }
143}