// file_source_common/buffer.rs
1use crate::FilePosition;
2use std::{cmp::min, io, pin::Pin};
3
4use bstr::Finder;
5use bytes::BytesMut;
6use tokio::io::{AsyncBufRead, AsyncBufReadExt};
7
/// Outcome of a single `read_until_with_max_size` call.
pub struct ReadResult {
    /// Total bytes consumed from the reader for a completed frame, delimiter
    /// included. `None` means the reader ran dry before a delimiter was seen
    /// and the caller should retry once more bytes are available.
    pub successfully_read: Option<usize>,
    /// Truncated head (at most `min(1000, max_size)` bytes) of each frame that
    /// was discarded for exceeding `max_size`, kept so callers can emit a
    /// usable error about the dropped data.
    pub discarded_for_size_and_truncated: Vec<BytesMut>,
}
12
13/// Read up to `max_size` bytes from `reader`, splitting by `delim`
14///
15/// The function reads up to `max_size` bytes from `reader`, splitting the input
16/// by `delim`. If a `delim` is not found in `reader` before `max_size` bytes
17/// are read then the reader is polled until `delim` is found and the results
18/// are discarded. Else, the result is written into `buf`.
19///
20/// The return is unusual. In the Err case this function has not written into
21/// `buf` and the caller should not examine its contents. In the Ok case if the
22/// inner value is None the caller should retry the call as the buffering read
23/// hit the end of the buffer but did not find a `delim` yet, indicating that
24/// we've sheered a write or that there were no bytes available in the `reader`
25/// and the `reader` was very sure about it. If the inner value is Some the
26/// interior `usize` is the number of bytes written into `buf`.
27///
28/// Tweak of
29/// <https://github.com/rust-lang/rust/blob/bf843eb9c2d48a80a5992a5d60858e27269f9575/src/libstd/io/mod.rs#L1471>.
30///
31/// # Performance
32///
33/// Benchmarks indicate that this function processes in the high single-digit
34/// GiB/s range for buffers of length 1KiB. For buffers any smaller than this
35/// the overhead of setup dominates our benchmarks.
36pub async fn read_until_with_max_size<'a, R: AsyncBufRead + ?Sized + Unpin>(
37 reader: Pin<Box<&'a mut R>>,
38 position: &'a mut FilePosition,
39 delim: &'a [u8],
40 buf: &'a mut BytesMut,
41 max_size: usize,
42) -> io::Result<ReadResult> {
43 let mut total_read = 0;
44 let mut discarding = false;
45 let delim_finder = Finder::new(delim);
46 let delim_len = delim.len();
47 let mut discarded_for_size_and_truncated = Vec::new();
48 let mut reader = Box::new(reader);
49 loop {
50 let available: &[u8] = match reader.fill_buf().await {
51 Ok(n) => n,
52 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
53 Err(e) => return Err(e),
54 };
55
56 let (done, used) = {
57 match delim_finder.find(available) {
58 Some(i) => {
59 if !discarding {
60 buf.extend_from_slice(&available[..i]);
61 }
62 (true, i + delim_len)
63 }
64 None => {
65 if !discarding {
66 buf.extend_from_slice(available);
67 }
68 (false, available.len())
69 }
70 }
71 };
72 reader.consume(used);
73 *position += used as u64; // do this at exactly same time
74 total_read += used;
75
76 if !discarding && buf.len() > max_size {
77 // keep only the first <1k bytes to make sure we can actually emit a usable error
78 let length_to_keep = min(1000, max_size);
79 let mut truncated: BytesMut = BytesMut::zeroed(length_to_keep);
80 truncated.copy_from_slice(&buf[0..length_to_keep]);
81 discarded_for_size_and_truncated.push(truncated);
82 discarding = true;
83 }
84
85 if done {
86 if !discarding {
87 return Ok(ReadResult {
88 successfully_read: Some(total_read),
89 discarded_for_size_and_truncated,
90 });
91 } else {
92 discarding = false;
93 buf.clear();
94 }
95 } else if used == 0 {
96 // We've hit EOF but not yet seen a newline. This can happen when unlucky timing causes
97 // us to observe an incomplete write. We return None here and let the loop continue
98 // next time the method is called. This is safe because the buffer is specific to this
99 // FileWatcher.
100 return Ok(ReadResult {
101 successfully_read: None,
102 discarded_for_size_and_truncated,
103 });
104 }
105 }
106}
107
#[cfg(test)]
mod test {
    use std::{io::Cursor, num::NonZeroU8, ops::Range};

    use bytes::{BufMut, BytesMut};
    use quickcheck::{QuickCheck, TestResult};
    use tokio::io::BufReader;

    use super::read_until_with_max_size;
    use crate::buffer::ReadResult;

    /// Model-based property check: independently computes where every
    /// delimiter falls in the chunk stream, then feeds the chunks through
    /// `read_until_with_max_size` one at a time and compares the subject's
    /// results against that model.
    async fn qc_inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
        // The `global_data` is the view of `chunks` as a single contiguous
        // block of memory. Where `chunks` simulates what happens when bytes are
        // fitfully available `global_data` is the view of all chunks assembled
        // after every byte is available.
        let mut global_data = BytesMut::new();

        // `DelimDetails` describes the nature of each delimiter found in the
        // `chunks`.
        #[derive(Clone)]
        struct DelimDetails {
            /// Index in `global_data`, absolute offset
            global_index: usize,
            /// Index in each `chunk`, relative offset
            interior_index: usize,
            /// Whether this delimiter was within `max_size` of its previous
            /// peer
            within_max_size: bool,
            /// Which chunk in `chunks` this delimiter was found in
            chunk_index: usize,
            /// The range of bytes that this delimiter bounds with its previous
            /// peer
            byte_range: Range<usize>,
        }

        // Move through the `chunks` and discover at what positions an instance
        // of `delim` exists in the chunk stream and whether that `delim` is
        // more than `max_size` bytes away from the previous `delim`. This loop
        // constructs the `facts` vector that holds `DelimDetails` instances and
        // also populates `global_data`.
        let mut facts: Vec<DelimDetails> = Vec::new();
        let mut global_index: usize = 0;
        let mut previous_offset: usize = 0;
        for (i, chunk) in chunks.iter().enumerate() {
            for (interior_index, byte) in chunk.iter().enumerate() {
                global_data.put_u8(*byte);
                if *byte == delim {
                    let span = global_index - previous_offset;
                    let within_max_size = span <= max_size.get() as usize;
                    facts.push(DelimDetails {
                        global_index,
                        within_max_size,
                        chunk_index: i,
                        interior_index,
                        byte_range: (previous_offset..global_index),
                    });
                    // The next frame begins one byte past this delimiter.
                    previous_offset = global_index + 1;
                }
                global_index += 1;
            }
        }

        // Our model is only concerned with the first valid delimiter in the
        // chunk stream. As such, discover that first valid delimiter and record
        // it specially.
        let mut first_delim: Option<DelimDetails> = None;
        for fact in &facts {
            if fact.within_max_size {
                first_delim = Some(fact.clone());
                break;
            }
        }

        let mut position = 0;
        let mut buffer = BytesMut::with_capacity(max_size.get() as usize);
        // NOTE: The delimiter may be multiple bytes wide but for the purpose of
        // this model a single byte does well enough.
        let delimiter: [u8; 1] = [delim];
        // A fresh reader is built per chunk to simulate bytes arriving
        // fitfully, while `position` and `buffer` persist across calls — the
        // same shape as production use.
        for (idx, chunk) in chunks.iter().enumerate() {
            let mut reader = BufReader::new(Cursor::new(&chunk));

            match read_until_with_max_size(
                Box::pin(&mut reader),
                &mut position,
                &delimiter,
                &mut buffer,
                max_size.get() as usize,
            )
            .await
            .unwrap()
            {
                ReadResult {
                    successfully_read: None,
                    discarded_for_size_and_truncated: _,
                } => {
                    // Subject only returns None if this is the last chunk _and_
                    // the chunk did not contain a delimiter _or_ the delimiter
                    // was outside the max_size range _or_ the current chunk is empty.
                    let has_valid_delimiter = facts
                        .iter()
                        .any(|details| ((details.chunk_index == idx) && details.within_max_size));
                    assert!(chunk.is_empty() || !has_valid_delimiter)
                }
                ReadResult {
                    successfully_read: Some(total_read),
                    discarded_for_size_and_truncated: _,
                } => {
                    // Now that the function has returned we confirm that the
                    // returned details match our `first_delim` and also that
                    // the `buffer` is populated correctly.
                    assert!(first_delim.is_some());
                    // `position` advances by consumed bytes (delimiter
                    // included), so it must sit one past the delimiter's
                    // absolute index.
                    assert_eq!(
                        first_delim.clone().unwrap().global_index + 1,
                        position as usize
                    );
                    assert_eq!(first_delim.clone().unwrap().interior_index + 1, total_read);
                    assert_eq!(
                        buffer.get(..),
                        global_data.get(first_delim.unwrap().byte_range)
                    );
                    break;
                }
            }
        }

        TestResult::passed()
    }

    #[tokio::test]
    async fn qc_read_until_with_max_size() {
        // The `read_until_with_max` function is intended to be called
        // multiple times until it returns Ok(Some(usize)). The function
        // should never return error in this test. If the return is None we
        // will call until it is not. Once return is Some the interior value
        // should be the position of the first delim in the buffer, unless
        // that delim is past the max_size barrier in which case it will be
        // the position of the first delim that is within some multiple of
        // max_size.
        //
        // I think I will adjust the function to have a richer return
        // type. This will help in the transition to async.
        //
        // QuickCheck drives a synchronous closure, so `inner` hops back onto
        // the runtime with `block_on` to await the async property body.
        // NOTE(review): the TestResult from `qc_inner` is dropped here —
        // `qc_inner` signals failure via its internal asserts, which panic
        // through `block_on`.
        fn inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
            let handle = tokio::runtime::Handle::current();
            handle.block_on(qc_inner(chunks, delim, max_size));
            TestResult::passed()
        }

        // The full QuickCheck run is synchronous and long; `spawn_blocking`
        // keeps it off the async runtime's core threads.
        tokio::task::spawn_blocking(|| {
            QuickCheck::new()
                .tests(1_000)
                .max_tests(2_000)
                .quickcheck(inner as fn(Vec<Vec<u8>>, u8, NonZeroU8) -> TestResult);
        })
        .await
        .unwrap()
    }
}
265}