// file_source_common/buffer.rs

1use crate::FilePosition;
2use std::{cmp::min, io, pin::Pin};
3
4use bstr::Finder;
5use bytes::BytesMut;
6use tokio::io::{AsyncBufRead, AsyncBufReadExt};
7
/// Outcome of a single [`read_until_with_max_size`] call.
pub struct ReadResult {
    /// Total number of bytes consumed from the reader during this call,
    /// including delimiter bytes. `None` means no complete delimiter was
    /// found yet and the caller should retry.
    pub successfully_read: Option<usize>,
    /// Truncated prefixes (first `min(1000, max_size)` bytes each) of records
    /// that were discarded because they exceeded `max_size`; kept so a usable
    /// error can still be emitted for them.
    pub discarded_for_size_and_truncated: Vec<BytesMut>,
}
12
13/// Read up to `max_size` bytes from `reader`, splitting by `delim`
14///
15/// The function reads up to `max_size` bytes from `reader`, splitting the input
16/// by `delim`. If a `delim` is not found in `reader` before `max_size` bytes
17/// are read then the reader is polled until `delim` is found and the results
18/// are discarded. Else, the result is written into `buf`.
19///
20/// The return is unusual. In the Err case this function has not written into
21/// `buf` and the caller should not examine its contents. In the Ok case if the
22/// inner value is None the caller should retry the call as the buffering read
23/// hit the end of the buffer but did not find a `delim` yet, indicating that
24/// we've sheered a write or that there were no bytes available in the `reader`
25/// and the `reader` was very sure about it. If the inner value is Some the
26/// interior `usize` is the number of bytes written into `buf`.
27///
28/// Tweak of
29/// <https://github.com/rust-lang/rust/blob/bf843eb9c2d48a80a5992a5d60858e27269f9575/src/libstd/io/mod.rs#L1471>.
30///
31/// # Performance
32///
33/// Benchmarks indicate that this function processes in the high single-digit
34/// GiB/s range for buffers of length 1KiB. For buffers any smaller than this
35/// the overhead of setup dominates our benchmarks.
36pub async fn read_until_with_max_size<'a, R: AsyncBufRead + ?Sized + Unpin>(
37    reader: Pin<Box<&'a mut R>>,
38    position: &'a mut FilePosition,
39    delim: &'a [u8],
40    buf: &'a mut BytesMut,
41    max_size: usize,
42) -> io::Result<ReadResult> {
43    let mut total_read = 0;
44    let mut discarding = false;
45    let delim_finder = Finder::new(delim);
46    let delim_len = delim.len();
47    let mut discarded_for_size_and_truncated = Vec::new();
48    let mut reader = Box::new(reader);
49
50    // Used to track partial delimiter matches across buffer boundaries.
51    // Data is read in chunks from the reader (see `fill_buf` below).
52    // A multi-byte delimiter may be split across the "old" and "new" buffers.
53    // Any potential partial delimiter that was found in the "old" buffer is stored in this variable.
54    let mut partial_delim: BytesMut = BytesMut::with_capacity(delim_len);
55
56    loop {
57        // Read the next chunk of data
58        let available: &[u8] = match reader.fill_buf().await {
59            Ok(n) => n,
60            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
61            Err(e) => return Err(e),
62        };
63
64        // First, check if we have a partial delimiter from the previous iteration/buffer
65        if !partial_delim.is_empty() {
66            let expected_suffix = &delim[partial_delim.len()..];
67            let expected_suffix_len = expected_suffix.len();
68
69            // We already know that we have a partial delimiter match from the previous buffer.
70            // Here we check what part of the delimiter is missing and whether the new buffer
71            // contains the remaining part.
72            if available.len() >= expected_suffix_len
73                && &available[..expected_suffix_len] == expected_suffix
74            {
75                // Complete delimiter found! Consume the remainder of the delimiter so we can start
76                // processing data after the delimiter.
77                reader.consume(expected_suffix_len);
78                *position += expected_suffix_len as u64;
79                total_read += expected_suffix_len;
80                partial_delim.clear();
81
82                // Found a complete delimiter, return the current buffer so we can proceed with the
83                // next record after this delimiter in the next call.
84                return Ok(ReadResult {
85                    successfully_read: Some(total_read),
86                    discarded_for_size_and_truncated,
87                });
88            } else {
89                // Not a complete delimiter after all.
90                // Add partial_delim to output buffer as it is actual data.
91                if !discarding {
92                    buf.extend_from_slice(&partial_delim);
93                }
94                partial_delim.clear();
95                // Continue processing current available buffer
96            }
97        }
98
99        let (done, used) = {
100            match delim_finder.find(available) {
101                Some(i) => {
102                    if !discarding {
103                        buf.extend_from_slice(&available[..i]);
104                    }
105                    (true, i + delim_len)
106                }
107                None => {
108                    // No delimiter found in current buffer. But there could be a partial delimiter
109                    // at the end of this buffer. For multi-byte delimiters like \r\n, we need
110                    // to handle the case where the delimiter is split across buffer boundaries
111                    // (e.g. \r in the "old" buffer, then we read new data and find \n in the new
112                    // buffer).
113                    let mut partial_match_len = 0;
114
115                    // We only need to check if we're not already at the end of the buffer and if we
116                    // have a delimiter that has more than one byte.
117                    if !available.is_empty() && delim_len > 1 {
118                        // Check if the end of the current buffer matches a prefix of the delimiter
119                        // by testing from longest to shortest possible prefix.
120                        //
121                        // This loop runs at most (delim_len - 1) iterations:
122                        //   - 2-byte delimiter (\r\n): 1 iteration max
123                        //   - 5-byte delimiter: 4 iterations max
124                        //
125                        // This part of the code is only called if all of these are true:
126                        //
127                        // - We have a new buffer (e.g. every 8kB, i.e. only called once per buffer)
128                        // - We have a multi-byte delimiter
129                        // - This delimiter could not be found in the current buffer
130                        //
131                        // Even for longer delimiters the performance impact is negligible.
132                        //
133                        // Example 1:
134                        //   Delimiter: \r\n
135                        //   Iteration 1: It checks if the current buffer ends with "\r",
136                        //     if it does we have a potential partial delimiter.
137                        //   The next chunk will confirm whether this is truly part of a delimiter.
138
139                        // Example 2:
140                        //   Delimiter: ABCDE
141                        //   Iteration 1: It checks if the current buffer ends with "ABCD" (we don't
142                        //     need to check "ABCDE" because that would have been caught by
143                        //     `delim_finder.find` earlier)
144                        //   Iteration 2: It checks if the current buffer ends with "ABC"
145                        //   Iterations 3-4: Same for "AB" and "A"
146                        for prefix_len in (1..delim_len).rev() {
147                            if available.len() >= prefix_len
148                                && available.ends_with(&delim[..prefix_len])
149                            {
150                                partial_match_len = prefix_len;
151                                break;
152                            }
153                        }
154                    }
155
156                    let bytes_to_copy = available.len() - partial_match_len;
157
158                    if !discarding && bytes_to_copy > 0 {
159                        buf.extend_from_slice(&available[..bytes_to_copy]);
160                    }
161
162                    // If we found a potential partial delimiter, save it for the next iteration
163                    if partial_match_len > 0 {
164                        partial_delim.clear();
165                        partial_delim.extend_from_slice(&available[bytes_to_copy..]);
166                    }
167
168                    (false, available.len())
169                }
170            }
171        };
172
173        // Check if we're at EOF before we start processing
174        // (for borrow checker, has to come before `consume`)
175        let at_eof = available.is_empty();
176
177        reader.consume(used);
178        *position += used as u64; // do this at exactly same time
179        total_read += used;
180
181        if !discarding && buf.len() > max_size {
182            // keep only the first <1k bytes to make sure we can actually emit a usable error
183            let length_to_keep = min(1000, max_size);
184            let mut truncated: BytesMut = BytesMut::zeroed(length_to_keep);
185            truncated.copy_from_slice(&buf[0..length_to_keep]);
186            discarded_for_size_and_truncated.push(truncated);
187            discarding = true;
188        }
189
190        if done {
191            if !discarding {
192                return Ok(ReadResult {
193                    successfully_read: Some(total_read),
194                    discarded_for_size_and_truncated,
195                });
196            } else {
197                discarding = false;
198                buf.clear();
199            }
200        } else if used == 0 && at_eof {
201            // We've hit EOF but haven't seen a delimiter. This can happen when:
202            // 1. The file ends without a trailing delimiter
203            // 2. We're observing an incomplete write
204            //
205            // Return None to signal the caller to retry later.
206            return Ok(ReadResult {
207                successfully_read: None,
208                discarded_for_size_and_truncated,
209            });
210        }
211    }
212}
213
#[cfg(test)]
mod test {
    use std::{io::Cursor, num::NonZeroU8, ops::Range};

    use bytes::{BufMut, BytesMut};
    use quickcheck::{QuickCheck, TestResult};
    use tokio::io::BufReader;

    use super::read_until_with_max_size;
    use crate::buffer::ReadResult;

    /// Property-test body: feeds `chunks` one at a time into
    /// `read_until_with_max_size` and checks the outcome against a simple
    /// model built from the flattened chunk data. Assertion failures panic,
    /// so the returned `TestResult` is always `passed()`.
    async fn qc_inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
        // The `global_data` is the view of `chunks` as a single contiguous
        // block of memory. Where `chunks` simulates what happens when bytes are
        // fitfully available `global_data` is the view of all chunks assembled
        // after every byte is available.
        let mut global_data = BytesMut::new();

        // `DelimDetails` describes the nature of each delimiter found in the
        // `chunks`.
        #[derive(Clone)]
        struct DelimDetails {
            /// Index in `global_data`, absolute offset
            global_index: usize,
            /// Index in each `chunk`, relative offset
            interior_index: usize,
            /// Whether this delimiter was within `max_size` of its previous
            /// peer
            within_max_size: bool,
            /// Which chunk in `chunks` this delimiter was found in
            chunk_index: usize,
            /// The range of bytes that this delimiter bounds with its previous
            /// peer
            byte_range: Range<usize>,
        }

        // Move through the `chunks` and discover at what positions an instance
        // of `delim` exists in the chunk stream and whether that `delim` is
        // more than `max_size` bytes away from the previous `delim`. This loop
        // constructs the `facts` vector that holds `DelimDetails` instances and
        // also populates `global_data`.
        let mut facts: Vec<DelimDetails> = Vec::new();
        let mut global_index: usize = 0;
        let mut previous_offset: usize = 0;
        for (i, chunk) in chunks.iter().enumerate() {
            for (interior_index, byte) in chunk.iter().enumerate() {
                global_data.put_u8(*byte);
                if *byte == delim {
                    // Span is the record length between this delimiter and the
                    // previous one (exclusive of delimiters).
                    let span = global_index - previous_offset;
                    let within_max_size = span <= max_size.get() as usize;
                    facts.push(DelimDetails {
                        global_index,
                        within_max_size,
                        chunk_index: i,
                        interior_index,
                        byte_range: (previous_offset..global_index),
                    });
                    previous_offset = global_index + 1;
                }
                global_index += 1;
            }
        }

        // Our model is only concerned with the first valid delimiter in the
        // chunk stream. As such, discover that first valid delimiter and record
        // it specially.
        let mut first_delim: Option<DelimDetails> = None;
        for fact in &facts {
            if fact.within_max_size {
                first_delim = Some(fact.clone());
                break;
            }
        }

        let mut position = 0;
        let mut buffer = BytesMut::with_capacity(max_size.get() as usize);
        // NOTE: The delimiter may be multiple bytes wide but for the purpose of
        // this model a single byte does well enough.
        let delimiter: [u8; 1] = [delim];
        for (idx, chunk) in chunks.iter().enumerate() {
            // Each chunk gets a fresh reader, simulating data becoming
            // available to the subject function piecemeal.
            let mut reader = BufReader::new(Cursor::new(&chunk));

            match read_until_with_max_size(
                Box::pin(&mut reader),
                &mut position,
                &delimiter,
                &mut buffer,
                max_size.get() as usize,
            )
            .await
            .unwrap()
            {
                ReadResult {
                    successfully_read: None,
                    discarded_for_size_and_truncated: _,
                } => {
                    // Subject only returns None if this is the last chunk _and_
                    // the chunk did not contain a delimiter _or_ the delimiter
                    // was outside the max_size range _or_ the current chunk is empty.
                    let has_valid_delimiter = facts
                        .iter()
                        .any(|details| (details.chunk_index == idx) && details.within_max_size);
                    assert!(chunk.is_empty() || !has_valid_delimiter)
                }
                ReadResult {
                    successfully_read: Some(total_read),
                    discarded_for_size_and_truncated: _,
                } => {
                    // Now that the function has returned we confirm that the
                    // returned details match our `first_delim` and also that
                    // the `buffer` is populated correctly.
                    assert!(first_delim.is_some());
                    // `position` advanced past the delimiter byte itself,
                    // hence the `+ 1`.
                    assert_eq!(
                        first_delim.clone().unwrap().global_index + 1,
                        position as usize
                    );
                    // `total_read` counts bytes consumed from this chunk's
                    // reader, including the delimiter.
                    assert_eq!(first_delim.clone().unwrap().interior_index + 1, total_read);
                    assert_eq!(
                        buffer.get(..),
                        global_data.get(first_delim.unwrap().byte_range)
                    );
                    break;
                }
            }
        }

        TestResult::passed()
    }

    #[tokio::test]
    async fn qc_read_until_with_max_size() {
        // The `read_until_with_max` function is intended to be called
        // multiple times until it returns Ok(Some(usize)). The function
        // should never return error in this test. If the return is None we
        // will call until it is not. Once return is Some the interior value
        // should be the position of the first delim in the buffer, unless
        // that delim is past the max_size barrier in which case it will be
        // the position of the first delim that is within some multiple of
        // max_size.
        //
        // I think I will adjust the function to have a richer return
        // type. This will help in the transition to async.
        fn inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
            // quickcheck's API is synchronous, so drive the async test body
            // with `Handle::block_on`. This is safe here because `inner` runs
            // on a `spawn_blocking` thread, not inside the async runtime.
            let handle = tokio::runtime::Handle::current();
            // The returned TestResult is redundant: `qc_inner` panics on any
            // assertion failure and always returns `passed()`.
            handle.block_on(qc_inner(chunks, delim, max_size));
            TestResult::passed()
        }

        tokio::task::spawn_blocking(|| {
            QuickCheck::new()
                .tests(1_000)
                .max_tests(2_000)
                .quickcheck(inner as fn(Vec<Vec<u8>>, u8, NonZeroU8) -> TestResult);
        })
        .await
        .unwrap()
    }

    /// Generic test helper that tests delimiter splits across buffer boundaries
    /// for any delimiter length. This function:
    /// 1. Creates test data with delimiters positioned to split at buffer boundaries
    /// 2. Tests multiple iterations to ensure state tracking works correctly
    /// 3. Verifies all lines are correctly separated without merging
    async fn test_delimiter_boundary_split_helper(delimiter: &[u8], num_lines: usize) {
        let delimiter_len = delimiter.len();

        // Use a buffer capacity that will force splits
        // We'll position delimiters to split at this boundary
        let buffer_capacity = 10;

        // Build test data where each delimiter is positioned to split across buffer boundary
        // Strategy: For each line, calculate position so delimiter starts at boundary - (delimiter_len - 1)
        let mut data = Vec::new();
        let mut expected_lines = Vec::new();

        for i in 0..num_lines {
            // Create line content that positions the delimiter to split at buffer boundary
            // We want the delimiter to straddle a buffer_capacity boundary

            // Calculate how many bytes until the next buffer boundary
            let current_pos = data.len();
            let bytes_until_boundary = buffer_capacity - (current_pos % buffer_capacity);

            // Create line content that will position delimiter to split
            // We want (delimiter_len - 1) bytes before boundary, then 1 byte after
            let line_content = if bytes_until_boundary > delimiter_len {
                let content_len = bytes_until_boundary - (delimiter_len - 1);
                // Zero-pad the line number so the line has exactly
                // `content_len` bytes ("line" contributes the other 4).
                format!("line{:0width$}", i, width = content_len.saturating_sub(4)).into_bytes()
            } else {
                // Not enough room in this buffer, pad to next boundary
                let padding = bytes_until_boundary;
                let extra_content = buffer_capacity - (delimiter_len - 1);
                let mut content = vec![b'X'; padding];
                content.extend_from_slice(
                    format!("L{:0width$}", i, width = extra_content.saturating_sub(1)).as_bytes(),
                );
                content
            };

            expected_lines.push(line_content.clone());
            data.extend_from_slice(&line_content);
            data.extend_from_slice(delimiter);
        }

        // Now test reading this data
        let cursor = Cursor::new(data);
        // Small BufReader capacity forces `fill_buf` chunks of
        // `buffer_capacity` bytes, so delimiters straddle chunk boundaries.
        let mut reader = BufReader::with_capacity(buffer_capacity, cursor);
        let mut position = 0;
        let max_size = 1024;

        // Read each line and verify it matches expected
        for (i, expected_line) in expected_lines.iter().enumerate() {
            let mut buffer = BytesMut::new();
            let result = read_until_with_max_size(
                Box::pin(&mut reader),
                &mut position,
                delimiter,
                &mut buffer,
                max_size,
            )
            .await
            .unwrap();

            assert_eq!(
                buffer.as_ref(),
                expected_line.as_slice(),
                "Line {} should match expected content. Got: {:?}, Expected: {:?}",
                i,
                String::from_utf8_lossy(&buffer),
                String::from_utf8_lossy(expected_line)
            );

            assert!(
                result.successfully_read.is_some(),
                "Should find delimiter for line {}",
                i
            );
        }
    }

    #[tokio::test]
    async fn test_single_byte_delimiter_boundary() {
        // Test single-byte delimiter (should work without any special handling)
        test_delimiter_boundary_split_helper(b"\n", 5).await;
    }

    #[tokio::test]
    async fn test_two_byte_delimiter_boundary() {
        // Test two-byte delimiter (CRLF case)
        test_delimiter_boundary_split_helper(b"\r\n", 5).await;
    }

    #[tokio::test]
    async fn test_three_byte_delimiter_boundary() {
        test_delimiter_boundary_split_helper(b"|||", 5).await;
    }

    #[tokio::test]
    async fn test_four_byte_delimiter_boundary() {
        test_delimiter_boundary_split_helper(b"<|>|", 5).await;
    }

    #[tokio::test]
    async fn test_five_byte_delimiter_boundary() {
        test_delimiter_boundary_split_helper(b"<<>>>", 5).await;
    }
}
480}