vector/sinks/util/buffer/
vec.rs

1use bytes::Bytes;
2
3use super::{err_event_too_large, Batch, BatchSize, PushResult};
4
/// Reports how many bytes an item counts for in a batch's size accounting.
///
/// [`VecBuffer`] adds this value to its running byte total on every push and
/// compares it against the configured byte limit.
pub trait EncodedLength {
    /// Returns the number of bytes this item is counted as.
    fn encoded_length(&self) -> usize;
}
8
/// A batch buffer that collects items into a `Vec` while tracking the sum of
/// their encoded lengths.
///
/// Note: This has been deprecated, please do not use when creating new Sinks.
#[derive(Clone)]
pub struct VecBuffer<T> {
    /// Items accepted so far; stays `None` until the first item is accepted.
    batch: Option<Vec<T>>,
    /// Running sum of `encoded_length()` over the accepted items.
    bytes: usize,
    /// Event-count and byte limits that `push` enforces.
    settings: BatchSize<Self>,
}
16
17impl<T> VecBuffer<T> {
18    pub const fn new(settings: BatchSize<Self>) -> Self {
19        Self::new_with_settings(settings)
20    }
21
22    const fn new_with_settings(settings: BatchSize<Self>) -> Self {
23        Self {
24            batch: None,
25            bytes: 0,
26            settings,
27        }
28    }
29}
30
31impl<T: EncodedLength> Batch for VecBuffer<T> {
32    type Input = T;
33    type Output = Vec<T>;
34
35    fn push(&mut self, item: Self::Input) -> PushResult<Self::Input> {
36        let new_bytes = self.bytes + item.encoded_length();
37        if self.is_empty() && item.encoded_length() > self.settings.bytes {
38            err_event_too_large(item.encoded_length(), self.settings.bytes)
39        } else if self.num_items() >= self.settings.events || new_bytes > self.settings.bytes {
40            PushResult::Overflow(item)
41        } else {
42            let events = self.settings.events;
43            let batch = self.batch.get_or_insert_with(|| Vec::with_capacity(events));
44            batch.push(item);
45            self.bytes = new_bytes;
46            PushResult::Ok(batch.len() >= self.settings.events || new_bytes >= self.settings.bytes)
47        }
48    }
49
50    fn is_empty(&self) -> bool {
51        self.batch.as_ref().map(Vec::is_empty).unwrap_or(true)
52    }
53
54    fn fresh(&self) -> Self {
55        Self::new_with_settings(self.settings)
56    }
57
58    fn finish(self) -> Self::Output {
59        self.batch.unwrap_or_default()
60    }
61
62    fn num_items(&self) -> usize {
63        self.batch.as_ref().map(Vec::len).unwrap_or(0)
64    }
65}
66
67impl EncodedLength for Bytes {
68    fn encoded_length(&self) -> usize {
69        self.len()
70    }
71}
72
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sinks::util::BatchSettings;

    // Test-only implementation: a `String` counts as its byte length plus one.
    impl EncodedLength for String {
        fn encoded_length(&self) -> usize {
            self.len() + 1
        }
    }

    /// With an event limit of 2, the second push reports the batch as full and
    /// the third push is rejected as an overflow.
    #[test]
    fn obeys_max_events() {
        let mut batch_settings = BatchSettings::default();
        batch_settings.size.events = 2;

        let mut buffer = VecBuffer::new(batch_settings.size);
        let data = "dummy".to_string();

        assert!(buffer.is_empty());
        assert_eq!(buffer.num_items(), 0);

        // First item fits and the batch is not yet full.
        assert_eq!(buffer.push(data.clone()), PushResult::Ok(false));
        assert!(!buffer.is_empty());
        assert_eq!(buffer.num_items(), 1);

        // Second item reaches the event limit, so the batch reports full.
        assert_eq!(buffer.push(data.clone()), PushResult::Ok(true));
        assert!(!buffer.is_empty());
        assert_eq!(buffer.num_items(), 2);

        // Third item is handed back untouched; the buffer is unchanged.
        assert_eq!(buffer.push(data.clone()), PushResult::Overflow(data));
        assert!(!buffer.is_empty());
        assert_eq!(buffer.num_items(), 2);

        assert_eq!(buffer.finish().len(), 2);
    }

    /// With a byte limit of 22 and 11-byte items ("some bytes" is 10 bytes
    /// plus 1), two items exactly fill the batch and a third overflows.
    #[test]
    fn obeys_max_bytes() {
        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 22;
        // Event limit set high enough that only the byte limit is hit here.
        batch_settings.size.events = 99;

        let mut buffer = VecBuffer::new(batch_settings.size);
        let data = "some bytes".to_string();

        assert!(buffer.is_empty());
        assert_eq!(buffer.num_items(), 0);

        // A single record larger than the byte limit is not stored: the push
        // is expected to return `Ok(false)` and leave the buffer empty.
        assert_eq!(
            buffer.push("this record is just too long to be inserted".into()),
            PushResult::Ok(false)
        );
        assert!(buffer.is_empty());
        assert_eq!(buffer.num_items(), 0);

        // 11 of 22 bytes used: accepted, not full.
        assert_eq!(buffer.push(data.clone()), PushResult::Ok(false));
        assert!(!buffer.is_empty());
        assert_eq!(buffer.num_items(), 1);

        // 22 of 22 bytes used: accepted, and the batch reports full.
        assert_eq!(buffer.push(data.clone()), PushResult::Ok(true));
        assert!(!buffer.is_empty());
        assert_eq!(buffer.num_items(), 2);

        // 33 bytes would exceed the limit: the item is handed back.
        assert_eq!(buffer.push(data.clone()), PushResult::Overflow(data));
        assert!(!buffer.is_empty());
        assert_eq!(buffer.num_items(), 2);

        assert_eq!(buffer.finish().len(), 2);
    }
}