file_source/buffer.rs
1use std::{
2 cmp::min,
3 io::{self, BufRead},
4};
5
6use bstr::Finder;
7use bytes::BytesMut;
8
9use crate::FilePosition;
10
/// Outcome of a single call to `read_until_with_max_size`.
pub struct ReadResult {
    /// Total bytes consumed from the reader during the call when a complete,
    /// delimited line was found; `None` means the caller should retry (EOF was
    /// hit before a delimiter, e.g. a sheared write).
    pub successfully_read: Option<usize>,
    /// Truncated prefixes (at most `min(1000, max_size)` bytes each) of lines
    /// that were discarded for exceeding `max_size`, kept so callers can emit
    /// a usable error.
    pub discarded_for_size_and_truncated: Vec<BytesMut>,
}
15
16/// Read up to `max_size` bytes from `reader`, splitting by `delim`
17///
18/// The function reads up to `max_size` bytes from `reader`, splitting the input
19/// by `delim`. If a `delim` is not found in `reader` before `max_size` bytes
20/// are read then the reader is polled until `delim` is found and the results
21/// are discarded. Else, the result is written into `buf`.
22///
23/// The return is unusual. In the Err case this function has not written into
24/// `buf` and the caller should not examine its contents. In the Ok case if the
25/// inner value is None the caller should retry the call as the buffering read
26/// hit the end of the buffer but did not find a `delim` yet, indicating that
27/// we've sheered a write or that there were no bytes available in the `reader`
28/// and the `reader` was very sure about it. If the inner value is Some the
29/// interior `usize` is the number of bytes written into `buf`.
30///
31/// Tweak of
32/// <https://github.com/rust-lang/rust/blob/bf843eb9c2d48a80a5992a5d60858e27269f9575/src/libstd/io/mod.rs#L1471>.
33///
34/// # Performance
35///
36/// Benchmarks indicate that this function processes in the high single-digit
37/// GiB/s range for buffers of length 1KiB. For buffers any smaller than this
38/// the overhead of setup dominates our benchmarks.
39pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>(
40 reader: &'a mut R,
41 position: &'a mut FilePosition,
42 delim: &'a [u8],
43 buf: &'a mut BytesMut,
44 max_size: usize,
45) -> io::Result<ReadResult> {
46 let mut total_read = 0;
47 let mut discarding = false;
48 let delim_finder = Finder::new(delim);
49 let delim_len = delim.len();
50 let mut discarded_for_size_and_truncated = Vec::new();
51 loop {
52 let available: &[u8] = match reader.fill_buf() {
53 Ok(n) => n,
54 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
55 Err(e) => return Err(e),
56 };
57
58 let (done, used) = {
59 match delim_finder.find(available) {
60 Some(i) => {
61 if !discarding {
62 buf.extend_from_slice(&available[..i]);
63 }
64 (true, i + delim_len)
65 }
66 None => {
67 if !discarding {
68 buf.extend_from_slice(available);
69 }
70 (false, available.len())
71 }
72 }
73 };
74 reader.consume(used);
75 *position += used as u64; // do this at exactly same time
76 total_read += used;
77
78 if !discarding && buf.len() > max_size {
79 // keep only the first <1k bytes to make sure we can actually emit a usable error
80 let length_to_keep = min(1000, max_size);
81 let mut truncated: BytesMut = BytesMut::zeroed(length_to_keep);
82 truncated.copy_from_slice(&buf[0..length_to_keep]);
83 discarded_for_size_and_truncated.push(truncated);
84 discarding = true;
85 }
86
87 if done {
88 if !discarding {
89 return Ok(ReadResult {
90 successfully_read: Some(total_read),
91 discarded_for_size_and_truncated,
92 });
93 } else {
94 discarding = false;
95 buf.clear();
96 }
97 } else if used == 0 {
98 // We've hit EOF but not yet seen a newline. This can happen when unlucky timing causes
99 // us to observe an incomplete write. We return None here and let the loop continue
100 // next time the method is called. This is safe because the buffer is specific to this
101 // FileWatcher.
102 return Ok(ReadResult {
103 successfully_read: None,
104 discarded_for_size_and_truncated,
105 });
106 }
107 }
108}
109
#[cfg(test)]
mod test {
    use std::{io::Cursor, num::NonZeroU8, ops::Range};

    use bytes::{BufMut, BytesMut};
    use quickcheck::{QuickCheck, TestResult};

    use crate::buffer::ReadResult;

    use super::read_until_with_max_size;

    /// Model-based property check for `read_until_with_max_size`.
    ///
    /// Scans `chunks` up front to build a model of where every delimiter
    /// lies, then drives the subject function chunk-by-chunk (simulating
    /// fitful reads) and compares its results against that model.
    fn qc_inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
        // The `global_data` is the view of `chunks` as a single contiguous
        // block of memory. Where `chunks` simulates what happens when bytes are
        // fitfully available `global_data` is the view of all chunks assembled
        // after every byte is available.
        let mut global_data = BytesMut::new();

        // `DelimDetails` describes the nature of each delimiter found in the
        // `chunks`.
        #[derive(Clone)]
        struct DelimDetails {
            /// Index in `global_data`, absolute offset
            global_index: usize,
            /// Index in each `chunk`, relative offset
            interior_index: usize,
            /// Whether this delimiter was within `max_size` of its previous
            /// peer
            within_max_size: bool,
            /// Which chunk in `chunks` this delimiter was found in
            chunk_index: usize,
            /// The range of bytes that this delimiter bounds with its previous
            /// peer
            byte_range: Range<usize>,
        }

        // Move through the `chunks` and discover at what positions an instance
        // of `delim` exists in the chunk stream and whether that `delim` is
        // more than `max_size` bytes away from the previous `delim`. This loop
        // constructs the `facts` vector that holds `DelimDetails` instances and
        // also populates `global_data`.
        let mut facts: Vec<DelimDetails> = Vec::new();
        let mut global_index: usize = 0;
        let mut previous_offset: usize = 0;
        for (i, chunk) in chunks.iter().enumerate() {
            for (interior_index, byte) in chunk.iter().enumerate() {
                global_data.put_u8(*byte);
                if *byte == delim {
                    // Length of the line bounded by this delimiter and its
                    // predecessor, exclusive of the delimiters themselves.
                    let span = global_index - previous_offset;
                    let within_max_size = span <= max_size.get() as usize;
                    facts.push(DelimDetails {
                        global_index,
                        within_max_size,
                        chunk_index: i,
                        interior_index,
                        byte_range: (previous_offset..global_index),
                    });
                    previous_offset = global_index + 1;
                }
                global_index += 1;
            }
        }

        // Our model is only concerned with the first valid delimiter in the
        // chunk stream. As such, discover that first valid delimiter and record
        // it specially.
        let mut first_delim: Option<DelimDetails> = None;
        for fact in &facts {
            if fact.within_max_size {
                first_delim = Some(fact.clone());
                break;
            }
        }

        let mut position = 0;
        let mut buffer = BytesMut::with_capacity(max_size.get() as usize);
        // NOTE: The delimiter may be multiple bytes wide but for the purpose of
        // this model a single byte does well enough.
        let delimiter: [u8; 1] = [delim];
        for (idx, chunk) in chunks.iter().enumerate() {
            // Each chunk gets a fresh reader; `position` and `buffer` carry
            // state across calls exactly as the production caller would.
            let mut reader = Cursor::new(&chunk);

            match read_until_with_max_size(
                &mut reader,
                &mut position,
                &delimiter,
                &mut buffer,
                max_size.get() as usize,
            )
            .unwrap()
            {
                ReadResult {
                    successfully_read: None,
                    discarded_for_size_and_truncated: _,
                } => {
                    // Subject only returns None if this is the last chunk _and_
                    // the chunk did not contain a delimiter _or_ the delimiter
                    // was outside the max_size range _or_ the current chunk is empty.
                    let has_valid_delimiter = facts
                        .iter()
                        .any(|details| ((details.chunk_index == idx) && details.within_max_size));
                    assert!(chunk.is_empty() || !has_valid_delimiter)
                }
                ReadResult {
                    successfully_read: Some(total_read),
                    discarded_for_size_and_truncated: _,
                } => {
                    // Now that the function has returned we confirm that the
                    // returned details match our `first_delim` and also that
                    // the `buffer` is populated correctly.
                    assert!(first_delim.is_some());
                    // `position` advances past the delimiter, hence the +1.
                    assert_eq!(
                        first_delim.clone().unwrap().global_index + 1,
                        position as usize
                    );
                    // `total_read` counts bytes consumed including the
                    // delimiter, so it is the chunk-relative index +1.
                    assert_eq!(first_delim.clone().unwrap().interior_index + 1, total_read);
                    assert_eq!(
                        buffer.get(..),
                        global_data.get(first_delim.unwrap().byte_range)
                    );
                    break;
                }
            }
        }

        TestResult::passed()
    }

    #[test]
    fn qc_read_until_with_max_size() {
        // The `read_until_with_max` function is intended to be called
        // multiple times until it returns Ok(Some(usize)). The function
        // should never return error in this test. If the return is None we
        // will call until it is not. Once return is Some the interior value
        // should be the position of the first delim in the buffer, unless
        // that delim is past the max_size barrier in which case it will be
        // the position of the first delim that is within some multiple of
        // max_size.
        //
        // I think I will adjust the function to have a richer return
        // type. This will help in the transition to async.
        QuickCheck::new()
            .tests(1_000)
            .max_tests(2_000)
            .quickcheck(qc_inner as fn(Vec<Vec<u8>>, u8, NonZeroU8) -> TestResult);
    }
}