//! file_source_common/buffer.rs
1use crate::FilePosition;
2use std::{cmp::min, io};
3
4use bstr::Finder;
5use bytes::BytesMut;
6use tokio::io::{AsyncBufRead, AsyncBufReadExt};
7
/// Outcome of a single `read_until_with_max_size` call.
pub struct ReadResult {
    /// Total number of bytes consumed from the reader (including the
    /// delimiter) when a complete, delimited record was read. `None` means
    /// the reader hit EOF before a delimiter was seen and the caller should
    /// retry later (e.g. a sheered/incomplete write).
    pub successfully_read: Option<usize>,
    /// Truncated prefixes (at most `min(1000, max_size)` bytes each) of
    /// records that were dropped for exceeding `max_size`, kept so a useful
    /// error/log message can still be emitted for each discarded record.
    pub discarded_for_size_and_truncated: Vec<BytesMut>,
}
12
13/// Read up to `max_size` bytes from `reader`, splitting by `delim`
14///
15/// The function reads up to `max_size` bytes from `reader`, splitting the input
16/// by `delim`. If a `delim` is not found in `reader` before `max_size` bytes
17/// are read then the reader is polled until `delim` is found and the results
18/// are discarded. Else, the result is written into `buf`.
19///
20/// The return is unusual. In the Err case this function has not written into
21/// `buf` and the caller should not examine its contents. In the Ok case if the
22/// inner value is None the caller should retry the call as the buffering read
23/// hit the end of the buffer but did not find a `delim` yet, indicating that
24/// we've sheered a write or that there were no bytes available in the `reader`
25/// and the `reader` was very sure about it. If the inner value is Some the
26/// interior `usize` is the number of bytes written into `buf`.
27///
28/// Tweak of
29/// <https://github.com/rust-lang/rust/blob/bf843eb9c2d48a80a5992a5d60858e27269f9575/src/libstd/io/mod.rs#L1471>.
30///
31/// # Performance
32///
33/// Benchmarks indicate that this function processes in the high single-digit
34/// GiB/s range for buffers of length 1KiB. For buffers any smaller than this
35/// the overhead of setup dominates our benchmarks.
36pub async fn read_until_with_max_size<'a, R: AsyncBufRead + ?Sized + Unpin>(
37 reader: &'a mut R,
38 position: &'a mut FilePosition,
39 delim: &'a [u8],
40 buf: &'a mut BytesMut,
41 max_size: usize,
42) -> io::Result<ReadResult> {
43 let mut total_read = 0;
44 let mut discarding = false;
45 let delim_finder = Finder::new(delim);
46 let delim_len = delim.len();
47 let mut discarded_for_size_and_truncated = Vec::new();
48
49 // Used to track partial delimiter matches across buffer boundaries.
50 // Data is read in chunks from the reader (see `fill_buf` below).
51 // A multi-byte delimiter may be split across the "old" and "new" buffers.
52 // Any potential partial delimiter that was found in the "old" buffer is stored in this variable.
53 let mut partial_delim: BytesMut = BytesMut::with_capacity(delim_len);
54
55 loop {
56 // Read the next chunk of data
57 let available: &[u8] = match reader.fill_buf().await {
58 Ok(n) => n,
59 Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
60 Err(e) => return Err(e),
61 };
62
63 // First, check if we have a partial delimiter from the previous iteration/buffer
64 if !partial_delim.is_empty() {
65 let expected_suffix = &delim[partial_delim.len()..];
66 let expected_suffix_len = expected_suffix.len();
67
68 // We already know that we have a partial delimiter match from the previous buffer.
69 // Here we check what part of the delimiter is missing and whether the new buffer
70 // contains the remaining part.
71 if available.len() >= expected_suffix_len
72 && &available[..expected_suffix_len] == expected_suffix
73 {
74 // Complete delimiter found! Consume the remainder of the delimiter so we can start
75 // processing data after the delimiter.
76 reader.consume(expected_suffix_len);
77 *position += expected_suffix_len as u64;
78 total_read += expected_suffix_len;
79 partial_delim.clear();
80
81 // Found a complete delimiter, return the current buffer so we can proceed with the
82 // next record after this delimiter in the next call.
83 return Ok(ReadResult {
84 successfully_read: Some(total_read),
85 discarded_for_size_and_truncated,
86 });
87 } else {
88 // Not a complete delimiter after all.
89 // Add partial_delim to output buffer as it is actual data.
90 if !discarding {
91 buf.extend_from_slice(&partial_delim);
92 }
93 partial_delim.clear();
94 // Continue processing current available buffer
95 }
96 }
97
98 let (done, used) = {
99 match delim_finder.find(available) {
100 Some(i) => {
101 if !discarding {
102 buf.extend_from_slice(&available[..i]);
103 }
104 (true, i + delim_len)
105 }
106 None => {
107 // No delimiter found in current buffer. But there could be a partial delimiter
108 // at the end of this buffer. For multi-byte delimiters like \r\n, we need
109 // to handle the case where the delimiter is split across buffer boundaries
110 // (e.g. \r in the "old" buffer, then we read new data and find \n in the new
111 // buffer).
112 let mut partial_match_len = 0;
113
114 // We only need to check if we're not already at the end of the buffer and if we
115 // have a delimiter that has more than one byte.
116 if !available.is_empty() && delim_len > 1 {
117 // Check if the end of the current buffer matches a prefix of the delimiter
118 // by testing from longest to shortest possible prefix.
119 //
120 // This loop runs at most (delim_len - 1) iterations:
121 // - 2-byte delimiter (\r\n): 1 iteration max
122 // - 5-byte delimiter: 4 iterations max
123 //
124 // This part of the code is only called if all of these are true:
125 //
126 // - We have a new buffer (e.g. every 8kB, i.e. only called once per buffer)
127 // - We have a multi-byte delimiter
128 // - This delimiter could not be found in the current buffer
129 //
130 // Even for longer delimiters the performance impact is negligible.
131 //
132 // Example 1:
133 // Delimiter: \r\n
134 // Iteration 1: It checks if the current buffer ends with "\r",
135 // if it does we have a potential partial delimiter.
136 // The next chunk will confirm whether this is truly part of a delimiter.
137
138 // Example 2:
139 // Delimiter: ABCDE
140 // Iteration 1: It checks if the current buffer ends with "ABCD" (we don't
141 // need to check "ABCDE" because that would have been caught by
142 // `delim_finder.find` earlier)
143 // Iteration 2: It checks if the current buffer ends with "ABC"
144 // Iterations 3-4: Same for "AB" and "A"
145 for prefix_len in (1..delim_len).rev() {
146 if available.len() >= prefix_len
147 && available.ends_with(&delim[..prefix_len])
148 {
149 partial_match_len = prefix_len;
150 break;
151 }
152 }
153 }
154
155 let bytes_to_copy = available.len() - partial_match_len;
156
157 if !discarding && bytes_to_copy > 0 {
158 buf.extend_from_slice(&available[..bytes_to_copy]);
159 }
160
161 // If we found a potential partial delimiter, save it for the next iteration
162 if partial_match_len > 0 {
163 partial_delim.clear();
164 partial_delim.extend_from_slice(&available[bytes_to_copy..]);
165 }
166
167 (false, available.len())
168 }
169 }
170 };
171
172 // Check if we're at EOF before we start processing
173 // (for borrow checker, has to come before `consume`)
174 let at_eof = available.is_empty();
175
176 reader.consume(used);
177 *position += used as u64; // do this at exactly same time
178 total_read += used;
179
180 if !discarding && buf.len() > max_size {
181 // keep only the first <1k bytes to make sure we can actually emit a usable error
182 let length_to_keep = min(1000, max_size);
183 let mut truncated: BytesMut = BytesMut::zeroed(length_to_keep);
184 truncated.copy_from_slice(&buf[0..length_to_keep]);
185 discarded_for_size_and_truncated.push(truncated);
186 discarding = true;
187 }
188
189 if done {
190 if !discarding {
191 return Ok(ReadResult {
192 successfully_read: Some(total_read),
193 discarded_for_size_and_truncated,
194 });
195 } else {
196 discarding = false;
197 buf.clear();
198 }
199 } else if used == 0 && at_eof {
200 // We've hit EOF but haven't seen a delimiter. This can happen when:
201 // 1. The file ends without a trailing delimiter
202 // 2. We're observing an incomplete write
203 //
204 // Return None to signal the caller to retry later.
205 return Ok(ReadResult {
206 successfully_read: None,
207 discarded_for_size_and_truncated,
208 });
209 }
210 }
211}
212
#[cfg(test)]
mod test {
    // Model-based quickcheck test plus deterministic boundary-split tests for
    // `read_until_with_max_size`.
    use std::{io::Cursor, num::NonZeroU8, ops::Range};

    use bytes::{BufMut, BytesMut};
    use quickcheck::{QuickCheck, TestResult};
    use tokio::io::BufReader;

    use super::read_until_with_max_size;
    use crate::buffer::ReadResult;

    /// Model check: feed `chunks` one at a time into the subject and compare
    /// its result against a brute-force scan of the concatenated data.
    async fn qc_inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
        // The `global_data` is the view of `chunks` as a single contiguous
        // block of memory. Where `chunks` simulates what happens when bytes are
        // fitfully available `global_data` is the view of all chunks assembled
        // after every byte is available.
        let mut global_data = BytesMut::new();

        // `DelimDetails` describes the nature of each delimiter found in the
        // `chunks`.
        #[derive(Clone)]
        struct DelimDetails {
            /// Index in `global_data`, absolute offset
            global_index: usize,
            /// Index in each `chunk`, relative offset
            interior_index: usize,
            /// Whether this delimiter was within `max_size` of its previous
            /// peer
            within_max_size: bool,
            /// Which chunk in `chunks` this delimiter was found in
            chunk_index: usize,
            /// The range of bytes that this delimiter bounds with its previous
            /// peer
            byte_range: Range<usize>,
        }

        // Move through the `chunks` and discover at what positions an instance
        // of `delim` exists in the chunk stream and whether that `delim` is
        // more than `max_size` bytes away from the previous `delim`. This loop
        // constructs the `facts` vector that holds `DelimDetails` instances and
        // also populates `global_data`.
        let mut facts: Vec<DelimDetails> = Vec::new();
        let mut global_index: usize = 0;
        let mut previous_offset: usize = 0;
        for (i, chunk) in chunks.iter().enumerate() {
            for (interior_index, byte) in chunk.iter().enumerate() {
                global_data.put_u8(*byte);
                if *byte == delim {
                    let span = global_index - previous_offset;
                    let within_max_size = span <= max_size.get() as usize;
                    facts.push(DelimDetails {
                        global_index,
                        within_max_size,
                        chunk_index: i,
                        interior_index,
                        byte_range: (previous_offset..global_index),
                    });
                    previous_offset = global_index + 1;
                }
                global_index += 1;
            }
        }

        // Our model is only concerned with the first valid delimiter in the
        // chunk stream. As such, discover that first valid delimiter and record
        // it specially.
        let mut first_delim: Option<DelimDetails> = None;
        for fact in &facts {
            if fact.within_max_size {
                first_delim = Some(fact.clone());
                break;
            }
        }

        let mut position = 0;
        let mut buffer = BytesMut::with_capacity(max_size.get() as usize);
        // NOTE: The delimiter may be multiple bytes wide but for the purpose of
        // this model a single byte does well enough.
        let delimiter: [u8; 1] = [delim];
        for (idx, chunk) in chunks.iter().enumerate() {
            // A fresh reader per chunk simulates bytes becoming available in
            // bursts between calls.
            let mut reader = BufReader::new(Cursor::new(&chunk));

            match read_until_with_max_size(
                &mut reader,
                &mut position,
                &delimiter,
                &mut buffer,
                max_size.get() as usize,
            )
            .await
            .unwrap()
            {
                ReadResult {
                    successfully_read: None,
                    discarded_for_size_and_truncated: _,
                } => {
                    // Subject only returns None if this is the last chunk _and_
                    // the chunk did not contain a delimiter _or_ the delimiter
                    // was outside the max_size range _or_ the current chunk is empty.
                    let has_valid_delimiter = facts
                        .iter()
                        .any(|details| (details.chunk_index == idx) && details.within_max_size);
                    assert!(chunk.is_empty() || !has_valid_delimiter)
                }
                ReadResult {
                    successfully_read: Some(total_read),
                    discarded_for_size_and_truncated: _,
                } => {
                    // Now that the function has returned we confirm that the
                    // returned details match our `first_delim` and also that
                    // the `buffer` is populated correctly.
                    assert!(first_delim.is_some());
                    assert_eq!(
                        first_delim.clone().unwrap().global_index + 1,
                        position as usize
                    );
                    assert_eq!(first_delim.clone().unwrap().interior_index + 1, total_read);
                    assert_eq!(
                        buffer.get(..),
                        global_data.get(first_delim.unwrap().byte_range)
                    );
                    break;
                }
            }
        }

        TestResult::passed()
    }

    #[tokio::test]
    async fn qc_read_until_with_max_size() {
        // The `read_until_with_max` function is intended to be called
        // multiple times until it returns Ok(Some(usize)). The function
        // should never return error in this test. If the return is None we
        // will call until it is not. Once return is Some the interior value
        // should be the position of the first delim in the buffer, unless
        // that delim is past the max_size barrier in which case it will be
        // the position of the first delim that is within some multiple of
        // max_size.
        //
        // I think I will adjust the function to have a richer return
        // type. This will help in the transition to async.
        fn inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
            // QuickCheck drives synchronous closures; bridge into the async
            // model via the ambient runtime handle (valid here because this
            // runs on a `spawn_blocking` thread inside the runtime).
            let handle = tokio::runtime::Handle::current();
            handle.block_on(qc_inner(chunks, delim, max_size));
            TestResult::passed()
        }

        tokio::task::spawn_blocking(|| {
            QuickCheck::new()
                .tests(1_000)
                .max_tests(2_000)
                .quickcheck(inner as fn(Vec<Vec<u8>>, u8, NonZeroU8) -> TestResult);
        })
        .await
        .unwrap()
    }

    /// Generic test helper that tests delimiter splits across buffer boundaries
    /// for any delimiter length. This function:
    /// 1. Creates test data with delimiters positioned to split at buffer boundaries
    /// 2. Tests multiple iterations to ensure state tracking works correctly
    /// 3. Verifies all lines are correctly separated without merging
    async fn test_delimiter_boundary_split_helper(delimiter: &[u8], num_lines: usize) {
        let delimiter_len = delimiter.len();

        // Use a buffer capacity that will force splits
        // We'll position delimiters to split at this boundary
        let buffer_capacity = 10;

        // Build test data where each delimiter is positioned to split across buffer boundary
        // Strategy: For each line, calculate position so delimiter starts at boundary - (delimiter_len - 1)
        let mut data = Vec::new();
        let mut expected_lines = Vec::new();

        for i in 0..num_lines {
            // Create line content that positions the delimiter to split at buffer boundary
            // We want the delimiter to straddle a buffer_capacity boundary

            // Calculate how many bytes until the next buffer boundary
            let current_pos = data.len();
            let bytes_until_boundary = buffer_capacity - (current_pos % buffer_capacity);

            // Create line content that will position delimiter to split
            // We want (delimiter_len - 1) bytes before boundary, then 1 byte after
            let line_content = if bytes_until_boundary > delimiter_len {
                let content_len = bytes_until_boundary - (delimiter_len - 1);
                format!("line{:0width$}", i, width = content_len.saturating_sub(4)).into_bytes()
            } else {
                // Not enough room in this buffer, pad to next boundary
                let padding = bytes_until_boundary;
                let extra_content = buffer_capacity - (delimiter_len - 1);
                let mut content = vec![b'X'; padding];
                content.extend_from_slice(
                    format!("L{:0width$}", i, width = extra_content.saturating_sub(1)).as_bytes(),
                );
                content
            };

            expected_lines.push(line_content.clone());
            data.extend_from_slice(&line_content);
            data.extend_from_slice(delimiter);
        }

        // Now test reading this data
        // The small reader capacity forces `fill_buf` to return 10-byte
        // chunks, exercising the cross-buffer partial-delimiter logic.
        let cursor = Cursor::new(data);
        let mut reader = BufReader::with_capacity(buffer_capacity, cursor);
        let mut position = 0;
        let max_size = 1024;

        // Read each line and verify it matches expected
        for (i, expected_line) in expected_lines.iter().enumerate() {
            let mut buffer = BytesMut::new();
            let result = read_until_with_max_size(
                &mut reader,
                &mut position,
                delimiter,
                &mut buffer,
                max_size,
            )
            .await
            .unwrap();

            assert_eq!(
                buffer.as_ref(),
                expected_line.as_slice(),
                "Line {} should match expected content. Got: {:?}, Expected: {:?}",
                i,
                String::from_utf8_lossy(&buffer),
                String::from_utf8_lossy(expected_line)
            );

            assert!(
                result.successfully_read.is_some(),
                "Should find delimiter for line {}",
                i
            );
        }
    }

    #[tokio::test]
    async fn test_single_byte_delimiter_boundary() {
        // Test single-byte delimiter (should work without any special handling)
        test_delimiter_boundary_split_helper(b"\n", 5).await;
    }

    #[tokio::test]
    async fn test_two_byte_delimiter_boundary() {
        // Test two-byte delimiter (CRLF case)
        test_delimiter_boundary_split_helper(b"\r\n", 5).await;
    }

    #[tokio::test]
    async fn test_three_byte_delimiter_boundary() {
        test_delimiter_boundary_split_helper(b"|||", 5).await;
    }

    #[tokio::test]
    async fn test_four_byte_delimiter_boundary() {
        test_delimiter_boundary_split_helper(b"<|>|", 5).await;
    }

    #[tokio::test]
    async fn test_five_byte_delimiter_boundary() {
        test_delimiter_boundary_split_helper(b"<<>>>", 5).await;
    }
}
479}