file_source_common/
buffer.rs

1use crate::FilePosition;
2use std::{cmp::min, io};
3
4use bstr::Finder;
5use bytes::BytesMut;
6use tokio::io::{AsyncBufRead, AsyncBufReadExt};
7
8pub struct ReadResult {
9    pub successfully_read: Option<usize>,
10    pub discarded_for_size_and_truncated: Vec<BytesMut>,
11}
12
13/// Read up to `max_size` bytes from `reader`, splitting by `delim`
14///
15/// The function reads up to `max_size` bytes from `reader`, splitting the input
16/// by `delim`. If a `delim` is not found in `reader` before `max_size` bytes
17/// are read then the reader is polled until `delim` is found and the results
18/// are discarded. Else, the result is written into `buf`.
19///
20/// The return is unusual. In the Err case this function has not written into
21/// `buf` and the caller should not examine its contents. In the Ok case if the
22/// inner value is None the caller should retry the call as the buffering read
23/// hit the end of the buffer but did not find a `delim` yet, indicating that
24/// we've sheered a write or that there were no bytes available in the `reader`
25/// and the `reader` was very sure about it. If the inner value is Some the
26/// interior `usize` is the number of bytes written into `buf`.
27///
28/// Tweak of
29/// <https://github.com/rust-lang/rust/blob/bf843eb9c2d48a80a5992a5d60858e27269f9575/src/libstd/io/mod.rs#L1471>.
30///
31/// # Performance
32///
33/// Benchmarks indicate that this function processes in the high single-digit
34/// GiB/s range for buffers of length 1KiB. For buffers any smaller than this
35/// the overhead of setup dominates our benchmarks.
36pub async fn read_until_with_max_size<'a, R: AsyncBufRead + ?Sized + Unpin>(
37    reader: &'a mut R,
38    position: &'a mut FilePosition,
39    delim: &'a [u8],
40    buf: &'a mut BytesMut,
41    max_size: usize,
42) -> io::Result<ReadResult> {
43    let mut total_read = 0;
44    let mut discarding = false;
45    let delim_finder = Finder::new(delim);
46    let delim_len = delim.len();
47    let mut discarded_for_size_and_truncated = Vec::new();
48
49    // Used to track partial delimiter matches across buffer boundaries.
50    // Data is read in chunks from the reader (see `fill_buf` below).
51    // A multi-byte delimiter may be split across the "old" and "new" buffers.
52    // Any potential partial delimiter that was found in the "old" buffer is stored in this variable.
53    let mut partial_delim: BytesMut = BytesMut::with_capacity(delim_len);
54
55    loop {
56        // Read the next chunk of data
57        let available: &[u8] = match reader.fill_buf().await {
58            Ok(n) => n,
59            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
60            Err(e) => return Err(e),
61        };
62
63        // First, check if we have a partial delimiter from the previous iteration/buffer
64        if !partial_delim.is_empty() {
65            let expected_suffix = &delim[partial_delim.len()..];
66            let expected_suffix_len = expected_suffix.len();
67
68            // We already know that we have a partial delimiter match from the previous buffer.
69            // Here we check what part of the delimiter is missing and whether the new buffer
70            // contains the remaining part.
71            if available.len() >= expected_suffix_len
72                && &available[..expected_suffix_len] == expected_suffix
73            {
74                // Complete delimiter found! Consume the remainder of the delimiter so we can start
75                // processing data after the delimiter.
76                reader.consume(expected_suffix_len);
77                *position += expected_suffix_len as u64;
78                total_read += expected_suffix_len;
79                partial_delim.clear();
80
81                // Found a complete delimiter, return the current buffer so we can proceed with the
82                // next record after this delimiter in the next call.
83                return Ok(ReadResult {
84                    successfully_read: Some(total_read),
85                    discarded_for_size_and_truncated,
86                });
87            } else {
88                // Not a complete delimiter after all.
89                // Add partial_delim to output buffer as it is actual data.
90                if !discarding {
91                    buf.extend_from_slice(&partial_delim);
92                }
93                partial_delim.clear();
94                // Continue processing current available buffer
95            }
96        }
97
98        let (done, used) = {
99            match delim_finder.find(available) {
100                Some(i) => {
101                    if !discarding {
102                        buf.extend_from_slice(&available[..i]);
103                    }
104                    (true, i + delim_len)
105                }
106                None => {
107                    // No delimiter found in current buffer. But there could be a partial delimiter
108                    // at the end of this buffer. For multi-byte delimiters like \r\n, we need
109                    // to handle the case where the delimiter is split across buffer boundaries
110                    // (e.g. \r in the "old" buffer, then we read new data and find \n in the new
111                    // buffer).
112                    let mut partial_match_len = 0;
113
114                    // We only need to check if we're not already at the end of the buffer and if we
115                    // have a delimiter that has more than one byte.
116                    if !available.is_empty() && delim_len > 1 {
117                        // Check if the end of the current buffer matches a prefix of the delimiter
118                        // by testing from longest to shortest possible prefix.
119                        //
120                        // This loop runs at most (delim_len - 1) iterations:
121                        //   - 2-byte delimiter (\r\n): 1 iteration max
122                        //   - 5-byte delimiter: 4 iterations max
123                        //
124                        // This part of the code is only called if all of these are true:
125                        //
126                        // - We have a new buffer (e.g. every 8kB, i.e. only called once per buffer)
127                        // - We have a multi-byte delimiter
128                        // - This delimiter could not be found in the current buffer
129                        //
130                        // Even for longer delimiters the performance impact is negligible.
131                        //
132                        // Example 1:
133                        //   Delimiter: \r\n
134                        //   Iteration 1: It checks if the current buffer ends with "\r",
135                        //     if it does we have a potential partial delimiter.
136                        //   The next chunk will confirm whether this is truly part of a delimiter.
137
138                        // Example 2:
139                        //   Delimiter: ABCDE
140                        //   Iteration 1: It checks if the current buffer ends with "ABCD" (we don't
141                        //     need to check "ABCDE" because that would have been caught by
142                        //     `delim_finder.find` earlier)
143                        //   Iteration 2: It checks if the current buffer ends with "ABC"
144                        //   Iterations 3-4: Same for "AB" and "A"
145                        for prefix_len in (1..delim_len).rev() {
146                            if available.len() >= prefix_len
147                                && available.ends_with(&delim[..prefix_len])
148                            {
149                                partial_match_len = prefix_len;
150                                break;
151                            }
152                        }
153                    }
154
155                    let bytes_to_copy = available.len() - partial_match_len;
156
157                    if !discarding && bytes_to_copy > 0 {
158                        buf.extend_from_slice(&available[..bytes_to_copy]);
159                    }
160
161                    // If we found a potential partial delimiter, save it for the next iteration
162                    if partial_match_len > 0 {
163                        partial_delim.clear();
164                        partial_delim.extend_from_slice(&available[bytes_to_copy..]);
165                    }
166
167                    (false, available.len())
168                }
169            }
170        };
171
172        // Check if we're at EOF before we start processing
173        // (for borrow checker, has to come before `consume`)
174        let at_eof = available.is_empty();
175
176        reader.consume(used);
177        *position += used as u64; // do this at exactly same time
178        total_read += used;
179
180        if !discarding && buf.len() > max_size {
181            // keep only the first <1k bytes to make sure we can actually emit a usable error
182            let length_to_keep = min(1000, max_size);
183            let mut truncated: BytesMut = BytesMut::zeroed(length_to_keep);
184            truncated.copy_from_slice(&buf[0..length_to_keep]);
185            discarded_for_size_and_truncated.push(truncated);
186            discarding = true;
187        }
188
189        if done {
190            if !discarding {
191                return Ok(ReadResult {
192                    successfully_read: Some(total_read),
193                    discarded_for_size_and_truncated,
194                });
195            } else {
196                discarding = false;
197                buf.clear();
198            }
199        } else if used == 0 && at_eof {
200            // We've hit EOF but haven't seen a delimiter. This can happen when:
201            // 1. The file ends without a trailing delimiter
202            // 2. We're observing an incomplete write
203            //
204            // Return None to signal the caller to retry later.
205            return Ok(ReadResult {
206                successfully_read: None,
207                discarded_for_size_and_truncated,
208            });
209        }
210    }
211}
212
#[cfg(test)]
mod test {
    use std::{io::Cursor, num::NonZeroU8, ops::Range};

    use bytes::{BufMut, BytesMut};
    use quickcheck::{QuickCheck, TestResult};
    use tokio::io::BufReader;

    use super::read_until_with_max_size;
    use crate::buffer::ReadResult;

    /// Model check for a single-byte delimiter: feeds `chunks` to the subject
    /// one reader at a time and compares the outcome against facts computed
    /// from the concatenated byte stream.
    async fn qc_inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
        let limit = max_size.get() as usize;

        // Everything known about one occurrence of `delim` in the chunk
        // stream.
        #[derive(Clone)]
        struct DelimDetails {
            /// Absolute offset of the delimiter in `global_data`
            global_index: usize,
            /// Offset of the delimiter relative to the start of its chunk
            interior_index: usize,
            /// Whether the record ending at this delimiter is at most
            /// `max_size` bytes long
            within_max_size: bool,
            /// Index into `chunks` of the chunk holding this delimiter
            chunk_index: usize,
            /// Bytes of `global_data` between the previous delimiter and this
            /// one
            byte_range: Range<usize>,
        }

        // `global_data` is all chunks glued together, i.e. what the stream
        // looks like once every byte has arrived. While assembling it, note
        // down every delimiter together with the record it terminates.
        let mut global_data = BytesMut::new();
        let mut facts: Vec<DelimDetails> = Vec::new();
        let mut record_start: usize = 0;
        for (chunk_index, chunk) in chunks.iter().enumerate() {
            for (interior_index, &byte) in chunk.iter().enumerate() {
                let global_index = global_data.len();
                global_data.put_u8(byte);
                if byte != delim {
                    continue;
                }
                facts.push(DelimDetails {
                    global_index,
                    interior_index,
                    within_max_size: global_index - record_start <= limit,
                    chunk_index,
                    byte_range: record_start..global_index,
                });
                record_start = global_index + 1;
            }
        }

        // The model only makes predictions about the first delimiter whose
        // record fits into `max_size`.
        let first_delim: Option<DelimDetails> =
            facts.iter().find(|fact| fact.within_max_size).cloned();

        let mut position = 0;
        let mut buffer = BytesMut::with_capacity(limit);
        // NOTE: Real delimiters may be several bytes wide; one byte is enough
        // for what this model checks.
        let delimiter: [u8; 1] = [delim];
        for (idx, chunk) in chunks.iter().enumerate() {
            let mut reader = BufReader::new(Cursor::new(&chunk));

            let outcome: ReadResult = read_until_with_max_size(
                &mut reader,
                &mut position,
                &delimiter,
                &mut buffer,
                limit,
            )
            .await
            .unwrap();

            match outcome.successfully_read {
                None => {
                    // The subject reports None only when the chunk is empty
                    // or holds no delimiter that terminates a record within
                    // the `max_size` limit.
                    let has_valid_delimiter = facts
                        .iter()
                        .any(|details| (details.chunk_index == idx) && details.within_max_size);
                    assert!(chunk.is_empty() || !has_valid_delimiter)
                }
                Some(total_read) => {
                    // A record was produced: it has to be the one ending at
                    // `first_delim`, and `buffer` has to hold exactly its
                    // bytes.
                    assert!(first_delim.is_some());
                    let expected = first_delim.clone().unwrap();
                    assert_eq!(expected.global_index + 1, position as usize);
                    assert_eq!(expected.interior_index + 1, total_read);
                    assert_eq!(buffer.get(..), global_data.get(expected.byte_range));
                    break;
                }
            }
        }

        TestResult::passed()
    }

    #[tokio::test]
    async fn qc_read_until_with_max_size() {
        // `read_until_with_max_size` is meant to be called repeatedly until
        // it yields `Some`. It must never fail in this test. Once it yields
        // `Some`, the reported position is that of the first delimiter in the
        // stream, unless that delimiter lies beyond the `max_size` barrier,
        // in which case it is the first delimiter that terminates a record
        // within `max_size`.
        fn inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
            // quickcheck is synchronous, so the async model is driven to
            // completion on the surrounding runtime. The model either panics
            // or reports a pass.
            tokio::runtime::Handle::current().block_on(qc_inner(chunks, delim, max_size))
        }

        let property = inner as fn(Vec<Vec<u8>>, u8, NonZeroU8) -> TestResult;
        tokio::task::spawn_blocking(move || {
            QuickCheck::new()
                .tests(1_000)
                .max_tests(2_000)
                .quickcheck(property);
        })
        .await
        .unwrap()
    }

    /// Shared driver for the boundary tests below, usable with a delimiter of
    /// any length:
    /// 1. builds `num_lines` records laid out so that delimiters fall onto
    ///    the reader's buffer boundaries,
    /// 2. reads the records back one call at a time through a reader with
    ///    that buffer capacity,
    /// 3. checks that every record comes back intact and unmerged.
    async fn test_delimiter_boundary_split_helper(delimiter: &[u8], num_lines: usize) {
        // Small reader capacity so that boundaries come up every few bytes.
        let buffer_capacity = 10;

        let mut data = Vec::new();
        let mut expected_lines = Vec::new();

        for i in 0..num_lines {
            // Number of delimiter bytes that should sit in front of the
            // boundary; the rest of the delimiter lands behind it.
            let leading_delim_bytes = delimiter.len() - 1;
            // Distance from the current end of `data` to the next boundary.
            let bytes_until_boundary = buffer_capacity - (data.len() % buffer_capacity);

            let line_content = if bytes_until_boundary > delimiter.len() {
                // Enough room left: size the content so that the delimiter
                // starts `leading_delim_bytes` before the boundary.
                let content_len = bytes_until_boundary - leading_delim_bytes;
                format!("line{:0width$}", i, width = content_len.saturating_sub(4)).into_bytes()
            } else {
                // Too close to the boundary: pad up to it and aim for the
                // boundary after that instead.
                let extra_content = buffer_capacity - leading_delim_bytes;
                let mut content = vec![b'X'; bytes_until_boundary];
                content.extend_from_slice(
                    format!("L{:0width$}", i, width = extra_content.saturating_sub(1)).as_bytes(),
                );
                content
            };

            data.extend_from_slice(&line_content);
            data.extend_from_slice(delimiter);
            expected_lines.push(line_content);
        }

        let mut reader = BufReader::with_capacity(buffer_capacity, Cursor::new(data));
        let mut position = 0;
        let max_size = 1024;

        // One call per expected record, each with a fresh output buffer.
        for (i, expected_line) in expected_lines.iter().enumerate() {
            let mut buffer = BytesMut::new();
            let result = read_until_with_max_size(
                &mut reader,
                &mut position,
                delimiter,
                &mut buffer,
                max_size,
            )
            .await
            .unwrap();

            assert_eq!(
                buffer.as_ref(),
                expected_line.as_slice(),
                "Line {} should match expected content. Got: {:?}, Expected: {:?}",
                i,
                String::from_utf8_lossy(&buffer),
                String::from_utf8_lossy(expected_line)
            );

            assert!(
                result.successfully_read.is_some(),
                "Should find delimiter for line {}",
                i
            );
        }
    }

    #[tokio::test]
    async fn test_single_byte_delimiter_boundary() {
        // A one-byte delimiter cannot be split; this is the baseline case.
        test_delimiter_boundary_split_helper(b"\n", 5).await;
    }

    #[tokio::test]
    async fn test_two_byte_delimiter_boundary() {
        // CRLF, the most common multi-byte delimiter.
        test_delimiter_boundary_split_helper(b"\r\n", 5).await;
    }

    #[tokio::test]
    async fn test_three_byte_delimiter_boundary() {
        test_delimiter_boundary_split_helper(b"|||", 5).await;
    }

    #[tokio::test]
    async fn test_four_byte_delimiter_boundary() {
        test_delimiter_boundary_split_helper(b"<|>|", 5).await;
    }

    #[tokio::test]
    async fn test_five_byte_delimiter_boundary() {
        test_delimiter_boundary_split_helper(b"<<>>>", 5).await;
    }
}