file_source_common/buffer.rs
1use crate::FilePosition;
2use std::{cmp::min, io, pin::Pin};
3
4use bstr::Finder;
5use bytes::BytesMut;
6use tokio::io::{AsyncBufRead, AsyncBufReadExt};
7
8pub struct ReadResult {
9 pub successfully_read: Option<usize>,
10 pub discarded_for_size_and_truncated: Vec<BytesMut>,
11}
12
13/// Read up to `max_size` bytes from `reader`, splitting by `delim`
14///
15/// The function reads up to `max_size` bytes from `reader`, splitting the input
16/// by `delim`. If a `delim` is not found in `reader` before `max_size` bytes
17/// are read then the reader is polled until `delim` is found and the results
18/// are discarded. Else, the result is written into `buf`.
19///
20/// The return is unusual. In the Err case this function has not written into
21/// `buf` and the caller should not examine its contents. In the Ok case if the
22/// inner value is None the caller should retry the call as the buffering read
23/// hit the end of the buffer but did not find a `delim` yet, indicating that
24/// we've sheered a write or that there were no bytes available in the `reader`
25/// and the `reader` was very sure about it. If the inner value is Some the
26/// interior `usize` is the number of bytes written into `buf`.
27///
28/// Tweak of
29/// <https://github.com/rust-lang/rust/blob/bf843eb9c2d48a80a5992a5d60858e27269f9575/src/libstd/io/mod.rs#L1471>.
30///
31/// # Performance
32///
33/// Benchmarks indicate that this function processes in the high single-digit
34/// GiB/s range for buffers of length 1KiB. For buffers any smaller than this
35/// the overhead of setup dominates our benchmarks.
36pub async fn read_until_with_max_size<'a, R: AsyncBufRead + ?Sized + Unpin>(
37 reader: Pin<Box<&'a mut R>>,
38 position: &'a mut FilePosition,
39 delim: &'a [u8],
40 buf: &'a mut BytesMut,
41 max_size: usize,
42) -> io::Result<ReadResult> {
43 let mut total_read = 0;
44 let mut discarding = false;
45 let delim_finder = Finder::new(delim);
46 let delim_len = delim.len();
47 let mut discarded_for_size_and_truncated = Vec::new();
48 let mut reader = Box::new(reader);
49
50 // Used to track partial delimiter matches across buffer boundaries.
51 // Data is read in chunks from the reader (see `fill_buf` below).
52 // A multi-byte delimiter may be split across the "old" and "new" buffers.
53 // Any potential partial delimiter that was found in the "old" buffer is stored in this variable.
54 let mut partial_delim: BytesMut = BytesMut::with_capacity(delim_len);
55
56 loop {
57 // Read the next chunk of data
58 let available: &[u8] = match reader.fill_buf().await {
59 Ok(n) => n,
60 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
61 Err(e) => return Err(e),
62 };
63
64 // First, check if we have a partial delimiter from the previous iteration/buffer
65 if !partial_delim.is_empty() {
66 let expected_suffix = &delim[partial_delim.len()..];
67 let expected_suffix_len = expected_suffix.len();
68
69 // We already know that we have a partial delimiter match from the previous buffer.
70 // Here we check what part of the delimiter is missing and whether the new buffer
71 // contains the remaining part.
72 if available.len() >= expected_suffix_len
73 && &available[..expected_suffix_len] == expected_suffix
74 {
75 // Complete delimiter found! Consume the remainder of the delimiter so we can start
76 // processing data after the delimiter.
77 reader.consume(expected_suffix_len);
78 *position += expected_suffix_len as u64;
79 total_read += expected_suffix_len;
80 partial_delim.clear();
81
82 // Found a complete delimiter, return the current buffer so we can proceed with the
83 // next record after this delimiter in the next call.
84 return Ok(ReadResult {
85 successfully_read: Some(total_read),
86 discarded_for_size_and_truncated,
87 });
88 } else {
89 // Not a complete delimiter after all.
90 // Add partial_delim to output buffer as it is actual data.
91 if !discarding {
92 buf.extend_from_slice(&partial_delim);
93 }
94 partial_delim.clear();
95 // Continue processing current available buffer
96 }
97 }
98
99 let (done, used) = {
100 match delim_finder.find(available) {
101 Some(i) => {
102 if !discarding {
103 buf.extend_from_slice(&available[..i]);
104 }
105 (true, i + delim_len)
106 }
107 None => {
108 // No delimiter found in current buffer. But there could be a partial delimiter
109 // at the end of this buffer. For multi-byte delimiters like \r\n, we need
110 // to handle the case where the delimiter is split across buffer boundaries
111 // (e.g. \r in the "old" buffer, then we read new data and find \n in the new
112 // buffer).
113 let mut partial_match_len = 0;
114
115 // We only need to check if we're not already at the end of the buffer and if we
116 // have a delimiter that has more than one byte.
117 if !available.is_empty() && delim_len > 1 {
118 // Check if the end of the current buffer matches a prefix of the delimiter
119 // by testing from longest to shortest possible prefix.
120 //
121 // This loop runs at most (delim_len - 1) iterations:
122 // - 2-byte delimiter (\r\n): 1 iteration max
123 // - 5-byte delimiter: 4 iterations max
124 //
125 // This part of the code is only called if all of these are true:
126 //
127 // - We have a new buffer (e.g. every 8kB, i.e. only called once per buffer)
128 // - We have a multi-byte delimiter
129 // - This delimiter could not be found in the current buffer
130 //
131 // Even for longer delimiters the performance impact is negligible.
132 //
133 // Example 1:
134 // Delimiter: \r\n
135 // Iteration 1: It checks if the current buffer ends with "\r",
136 // if it does we have a potential partial delimiter.
137 // The next chunk will confirm whether this is truly part of a delimiter.
138
139 // Example 2:
140 // Delimiter: ABCDE
141 // Iteration 1: It checks if the current buffer ends with "ABCD" (we don't
142 // need to check "ABCDE" because that would have been caught by
143 // `delim_finder.find` earlier)
144 // Iteration 2: It checks if the current buffer ends with "ABC"
145 // Iterations 3-4: Same for "AB" and "A"
146 for prefix_len in (1..delim_len).rev() {
147 if available.len() >= prefix_len
148 && available.ends_with(&delim[..prefix_len])
149 {
150 partial_match_len = prefix_len;
151 break;
152 }
153 }
154 }
155
156 let bytes_to_copy = available.len() - partial_match_len;
157
158 if !discarding && bytes_to_copy > 0 {
159 buf.extend_from_slice(&available[..bytes_to_copy]);
160 }
161
162 // If we found a potential partial delimiter, save it for the next iteration
163 if partial_match_len > 0 {
164 partial_delim.clear();
165 partial_delim.extend_from_slice(&available[bytes_to_copy..]);
166 }
167
168 (false, available.len())
169 }
170 }
171 };
172
173 // Check if we're at EOF before we start processing
174 // (for borrow checker, has to come before `consume`)
175 let at_eof = available.is_empty();
176
177 reader.consume(used);
178 *position += used as u64; // do this at exactly same time
179 total_read += used;
180
181 if !discarding && buf.len() > max_size {
182 // keep only the first <1k bytes to make sure we can actually emit a usable error
183 let length_to_keep = min(1000, max_size);
184 let mut truncated: BytesMut = BytesMut::zeroed(length_to_keep);
185 truncated.copy_from_slice(&buf[0..length_to_keep]);
186 discarded_for_size_and_truncated.push(truncated);
187 discarding = true;
188 }
189
190 if done {
191 if !discarding {
192 return Ok(ReadResult {
193 successfully_read: Some(total_read),
194 discarded_for_size_and_truncated,
195 });
196 } else {
197 discarding = false;
198 buf.clear();
199 }
200 } else if used == 0 && at_eof {
201 // We've hit EOF but haven't seen a delimiter. This can happen when:
202 // 1. The file ends without a trailing delimiter
203 // 2. We're observing an incomplete write
204 //
205 // Return None to signal the caller to retry later.
206 return Ok(ReadResult {
207 successfully_read: None,
208 discarded_for_size_and_truncated,
209 });
210 }
211 }
212}
213
#[cfg(test)]
mod test {
    use std::{io::Cursor, num::NonZeroU8, ops::Range};

    use bytes::{BufMut, BytesMut};
    use quickcheck::{QuickCheck, TestResult};
    use tokio::io::BufReader;

    use super::{read_until_with_max_size, ReadResult};

    /// Model check: feeds `chunks` to `read_until_with_max_size` one chunk per
    /// call, the way a caller retries after `successfully_read: None`, and checks
    /// the first returned record against a model built from all chunks.
    async fn qc_inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
        // The `global_data` is the view of `chunks` as a single contiguous
        // block of memory. Where `chunks` simulates what happens when bytes are
        // fitfully available `global_data` is the view of all chunks assembled
        // after every byte is available.
        let mut global_data = BytesMut::new();

        // `DelimDetails` describes the nature of each delimiter found in the
        // `chunks`.
        #[derive(Clone)]
        struct DelimDetails {
            /// Index in `global_data`, absolute offset
            global_index: usize,
            /// Index in each `chunk`, relative offset
            interior_index: usize,
            /// Whether this delimiter was within `max_size` of its previous
            /// peer
            within_max_size: bool,
            /// Which chunk in `chunks` this delimiter was found in
            chunk_index: usize,
            /// The range of bytes that this delimiter bounds with its previous
            /// peer
            byte_range: Range<usize>,
        }

        // Move through the `chunks` and discover at what positions an instance
        // of `delim` exists in the chunk stream and whether that `delim` is
        // more than `max_size` bytes away from the previous `delim`. This loop
        // constructs the `facts` vector that holds `DelimDetails` instances and
        // also populates `global_data`.
        let mut facts: Vec<DelimDetails> = Vec::new();
        let mut global_index: usize = 0;
        let mut previous_offset: usize = 0;
        for (i, chunk) in chunks.iter().enumerate() {
            for (interior_index, byte) in chunk.iter().enumerate() {
                global_data.put_u8(*byte);
                if *byte == delim {
                    let span = global_index - previous_offset;
                    let within_max_size = span <= max_size.get() as usize;
                    facts.push(DelimDetails {
                        global_index,
                        within_max_size,
                        chunk_index: i,
                        interior_index,
                        byte_range: (previous_offset..global_index),
                    });
                    previous_offset = global_index + 1;
                }
                global_index += 1;
            }
        }

        // Our model is only concerned with the first valid delimiter in the
        // chunk stream, so record that one specially.
        let first_delim: Option<DelimDetails> =
            facts.iter().find(|fact| fact.within_max_size).cloned();

        let mut position = 0;
        let mut buffer = BytesMut::with_capacity(max_size.get() as usize);
        // NOTE: The delimiter may be multiple bytes wide but for the purpose of
        // this model a single byte does well enough.
        let delimiter: [u8; 1] = [delim];
        for (idx, chunk) in chunks.iter().enumerate() {
            let mut reader = BufReader::new(Cursor::new(&chunk));

            match read_until_with_max_size(
                Box::pin(&mut reader),
                &mut position,
                &delimiter,
                &mut buffer,
                max_size.get() as usize,
            )
            .await
            .unwrap()
            {
                ReadResult {
                    successfully_read: None,
                    ..
                } => {
                    // Subject only returns None if this is the last chunk _and_
                    // the chunk did not contain a delimiter _or_ the delimiter
                    // was outside the max_size range _or_ the current chunk is empty.
                    let has_valid_delimiter = facts
                        .iter()
                        .any(|details| details.chunk_index == idx && details.within_max_size);
                    assert!(chunk.is_empty() || !has_valid_delimiter)
                }
                ReadResult {
                    successfully_read: Some(total_read),
                    ..
                } => {
                    // Now that the function has returned we confirm that the
                    // returned details match our `first_delim` and also that
                    // the `buffer` is populated correctly.
                    let first = first_delim
                        .as_ref()
                        .expect("a record was returned, so a delimiter within max_size exists");
                    assert_eq!(first.global_index + 1, position as usize);
                    assert_eq!(first.interior_index + 1, total_read);
                    assert_eq!(buffer.get(..), global_data.get(first.byte_range.clone()));
                    break;
                }
            }
        }

        TestResult::passed()
    }

    #[tokio::test]
    async fn qc_read_until_with_max_size() {
        // `read_until_with_max_size` is meant to be called repeatedly until it
        // returns `successfully_read: Some(_)`. `qc_inner` drives it one chunk per
        // call and checks the first record it returns against a model: the record
        // ends at the first delimiter that lies within `max_size` of its
        // predecessor. The function must never return an error in this test.
        fn inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
            let handle = tokio::runtime::Handle::current();
            handle.block_on(qc_inner(chunks, delim, max_size))
        }

        tokio::task::spawn_blocking(|| {
            QuickCheck::new()
                .tests(1_000)
                .max_tests(2_000)
                .quickcheck(inner as fn(Vec<Vec<u8>>, u8, NonZeroU8) -> TestResult);
        })
        .await
        .unwrap()
    }

    /// Generic test helper that tests delimiter splits across buffer boundaries
    /// for any delimiter length. This function:
    /// 1. Creates test data with delimiters positioned to split at buffer boundaries
    /// 2. Tests multiple iterations to ensure state tracking works correctly
    /// 3. Verifies all lines are correctly separated without merging
    async fn test_delimiter_boundary_split_helper(delimiter: &[u8], num_lines: usize) {
        let delimiter_len = delimiter.len();

        // Use a buffer capacity that will force splits
        // We'll position delimiters to split at this boundary
        let buffer_capacity = 10;

        // Build test data where each delimiter is positioned to split across buffer boundary
        // Strategy: For each line, calculate position so delimiter starts at boundary - (delimiter_len - 1)
        let mut data = Vec::new();
        let mut expected_lines = Vec::new();

        for i in 0..num_lines {
            // Calculate how many bytes until the next buffer boundary
            let current_pos = data.len();
            let bytes_until_boundary = buffer_capacity - (current_pos % buffer_capacity);

            // Create line content that will position delimiter to split:
            // (delimiter_len - 1) bytes before the boundary, then 1 byte after
            let line_content = if bytes_until_boundary > delimiter_len {
                let content_len = bytes_until_boundary - (delimiter_len - 1);
                format!("line{:0width$}", i, width = content_len.saturating_sub(4)).into_bytes()
            } else {
                // Not enough room in this buffer, pad to next boundary
                let padding = bytes_until_boundary;
                let extra_content = buffer_capacity - (delimiter_len - 1);
                let mut content = vec![b'X'; padding];
                content.extend_from_slice(
                    format!("L{:0width$}", i, width = extra_content.saturating_sub(1)).as_bytes(),
                );
                content
            };

            expected_lines.push(line_content.clone());
            data.extend_from_slice(&line_content);
            data.extend_from_slice(delimiter);
        }

        // Now test reading this data
        let cursor = Cursor::new(data);
        let mut reader = BufReader::with_capacity(buffer_capacity, cursor);
        let mut position = 0;
        let max_size = 1024;

        // Read each line and verify it matches expected
        for (i, expected_line) in expected_lines.iter().enumerate() {
            let mut buffer = BytesMut::new();
            let result = read_until_with_max_size(
                Box::pin(&mut reader),
                &mut position,
                delimiter,
                &mut buffer,
                max_size,
            )
            .await
            .unwrap();

            assert_eq!(
                buffer.as_ref(),
                expected_line.as_slice(),
                "Line {} should match expected content. Got: {:?}, Expected: {:?}",
                i,
                String::from_utf8_lossy(&buffer),
                String::from_utf8_lossy(expected_line)
            );

            assert!(
                result.successfully_read.is_some(),
                "Should find delimiter for line {}",
                i
            );
        }
    }

    #[tokio::test]
    async fn test_single_byte_delimiter_boundary() {
        // Test single-byte delimiter (should work without any special handling)
        test_delimiter_boundary_split_helper(b"\n", 5).await;
    }

    #[tokio::test]
    async fn test_two_byte_delimiter_boundary() {
        // Test two-byte delimiter (CRLF case)
        test_delimiter_boundary_split_helper(b"\r\n", 5).await;
    }

    #[tokio::test]
    async fn test_three_byte_delimiter_boundary() {
        test_delimiter_boundary_split_helper(b"|||", 5).await;
    }

    #[tokio::test]
    async fn test_four_byte_delimiter_boundary() {
        test_delimiter_boundary_split_helper(b"<|>|", 5).await;
    }

    #[tokio::test]
    async fn test_five_byte_delimiter_boundary() {
        test_delimiter_boundary_split_helper(b"<<>>>", 5).await;
    }

    /// An over-long record is dropped (a truncated copy is reported) and the
    /// following record is returned by the same call.
    #[tokio::test]
    async fn test_oversized_record_is_discarded_and_reported() {
        let mut reader = BufReader::new(Cursor::new(b"abcdefgh\nok\n".to_vec()));
        let mut position = 0;
        let mut buffer = BytesMut::new();

        let result = read_until_with_max_size(
            Box::pin(&mut reader),
            &mut position,
            b"\n",
            &mut buffer,
            4,
        )
        .await
        .unwrap();

        assert_eq!(result.successfully_read, Some(12));
        assert_eq!(&buffer[..], b"ok");
        assert_eq!(position, 12);
        assert_eq!(
            result.discarded_for_size_and_truncated,
            vec![BytesMut::from(&b"abcd"[..])]
        );
    }

    /// Hitting EOF while a delimiter prefix is held back keeps those bytes in the
    /// buffer and asks the caller to retry.
    #[tokio::test]
    async fn test_trailing_partial_delimiter_at_eof_is_kept() {
        let mut reader = BufReader::new(Cursor::new(b"abc\r".to_vec()));
        let mut position = 0;
        let mut buffer = BytesMut::new();

        let result = read_until_with_max_size(
            Box::pin(&mut reader),
            &mut position,
            b"\r\n",
            &mut buffer,
            1024,
        )
        .await
        .unwrap();

        assert_eq!(result.successfully_read, None);
        assert_eq!(&buffer[..], b"abc\r");
        assert_eq!(position, 4);
        assert!(result.discarded_for_size_and_truncated.is_empty());
    }
}