file_source/
buffer.rs

1use std::{
2    cmp::min,
3    io::{self, BufRead},
4};
5
6use bstr::Finder;
7use bytes::BytesMut;
8
9use crate::FilePosition;
10
11pub struct ReadResult {
12    pub successfully_read: Option<usize>,
13    pub discarded_for_size_and_truncated: Vec<BytesMut>,
14}
15
16/// Read up to `max_size` bytes from `reader`, splitting by `delim`
17///
18/// The function reads up to `max_size` bytes from `reader`, splitting the input
19/// by `delim`. If a `delim` is not found in `reader` before `max_size` bytes
20/// are read then the reader is polled until `delim` is found and the results
21/// are discarded. Else, the result is written into `buf`.
22///
23/// The return is unusual. In the Err case this function has not written into
24/// `buf` and the caller should not examine its contents. In the Ok case if the
25/// inner value is None the caller should retry the call as the buffering read
26/// hit the end of the buffer but did not find a `delim` yet, indicating that
27/// we've sheered a write or that there were no bytes available in the `reader`
28/// and the `reader` was very sure about it. If the inner value is Some the
29/// interior `usize` is the number of bytes written into `buf`.
30///
31/// Tweak of
32/// <https://github.com/rust-lang/rust/blob/bf843eb9c2d48a80a5992a5d60858e27269f9575/src/libstd/io/mod.rs#L1471>.
33///
34/// # Performance
35///
36/// Benchmarks indicate that this function processes in the high single-digit
37/// GiB/s range for buffers of length 1KiB. For buffers any smaller than this
38/// the overhead of setup dominates our benchmarks.
39pub fn read_until_with_max_size<'a, R: BufRead + ?Sized>(
40    reader: &'a mut R,
41    position: &'a mut FilePosition,
42    delim: &'a [u8],
43    buf: &'a mut BytesMut,
44    max_size: usize,
45) -> io::Result<ReadResult> {
46    let mut total_read = 0;
47    let mut discarding = false;
48    let delim_finder = Finder::new(delim);
49    let delim_len = delim.len();
50    let mut discarded_for_size_and_truncated = Vec::new();
51    loop {
52        let available: &[u8] = match reader.fill_buf() {
53            Ok(n) => n,
54            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
55            Err(e) => return Err(e),
56        };
57
58        let (done, used) = {
59            match delim_finder.find(available) {
60                Some(i) => {
61                    if !discarding {
62                        buf.extend_from_slice(&available[..i]);
63                    }
64                    (true, i + delim_len)
65                }
66                None => {
67                    if !discarding {
68                        buf.extend_from_slice(available);
69                    }
70                    (false, available.len())
71                }
72            }
73        };
74        reader.consume(used);
75        *position += used as u64; // do this at exactly same time
76        total_read += used;
77
78        if !discarding && buf.len() > max_size {
79            // keep only the first <1k bytes to make sure we can actually emit a usable error
80            let length_to_keep = min(1000, max_size);
81            let mut truncated: BytesMut = BytesMut::zeroed(length_to_keep);
82            truncated.copy_from_slice(&buf[0..length_to_keep]);
83            discarded_for_size_and_truncated.push(truncated);
84            discarding = true;
85        }
86
87        if done {
88            if !discarding {
89                return Ok(ReadResult {
90                    successfully_read: Some(total_read),
91                    discarded_for_size_and_truncated,
92                });
93            } else {
94                discarding = false;
95                buf.clear();
96            }
97        } else if used == 0 {
98            // We've hit EOF but not yet seen a newline. This can happen when unlucky timing causes
99            // us to observe an incomplete write. We return None here and let the loop continue
100            // next time the method is called. This is safe because the buffer is specific to this
101            // FileWatcher.
102            return Ok(ReadResult {
103                successfully_read: None,
104                discarded_for_size_and_truncated,
105            });
106        }
107    }
108}
109
#[cfg(test)]
mod test {
    use std::{io::Cursor, num::NonZeroU8, ops::Range};

    use bytes::{BufMut, BytesMut};
    use quickcheck::{QuickCheck, TestResult};

    use crate::buffer::ReadResult;

    use super::read_until_with_max_size;

    /// Property body: feed `chunks` to `read_until_with_max_size` one chunk at
    /// a time and compare what it reports against a simple model computed
    /// from the concatenation of all chunks.
    fn qc_inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
        // Everything the model knows about one delimiter occurrence.
        #[derive(Clone)]
        struct DelimDetails {
            /// Absolute offset of the delimiter in `global_data`
            global_index: usize,
            /// Offset of the delimiter relative to the start of its chunk
            interior_index: usize,
            /// Whether the record ended by this delimiter is no longer than
            /// `max_size`
            within_max_size: bool,
            /// Index into `chunks` of the chunk holding this delimiter
            chunk_index: usize,
            /// Bytes of the record ended by this delimiter, i.e. everything
            /// between the previous delimiter and this one
            byte_range: Range<usize>,
        }

        let limit = usize::from(max_size.get());

        // `global_data` is every chunk laid end to end: where `chunks` models
        // bytes trickling in, this is the stream once all bytes have arrived.
        let mut global_data = BytesMut::new();

        // Walk the chunk stream once, filling `global_data` and recording in
        // `facts` where each delimiter sits and whether its record fits within
        // `max_size`.
        let mut facts: Vec<DelimDetails> = Vec::new();
        let mut record_start: usize = 0;
        let mut offset: usize = 0;
        for (chunk_index, chunk) in chunks.iter().enumerate() {
            for (interior_index, &byte) in chunk.iter().enumerate() {
                global_data.put_u8(byte);
                if byte == delim {
                    facts.push(DelimDetails {
                        global_index: offset,
                        interior_index,
                        within_max_size: offset - record_start <= limit,
                        chunk_index,
                        byte_range: record_start..offset,
                    });
                    record_start = offset + 1;
                }
                offset += 1;
            }
        }

        // The model only cares about the first delimiter that closes a record
        // of acceptable size.
        let first_delim: Option<DelimDetails> =
            facts.iter().find(|fact| fact.within_max_size).cloned();

        let mut position = 0;
        let mut buffer = BytesMut::with_capacity(limit);
        // NOTE: real delimiters may be several bytes wide; a one-byte
        // delimiter is enough for this model.
        let delimiter: [u8; 1] = [delim];
        for (idx, chunk) in chunks.iter().enumerate() {
            let mut reader = Cursor::new(&chunk);

            let outcome: ReadResult = read_until_with_max_size(
                &mut reader,
                &mut position,
                &delimiter,
                &mut buffer,
                limit,
            )
            .unwrap();

            match outcome.successfully_read {
                None => {
                    // `None` is only expected when the chunk is empty or when
                    // it holds no delimiter that closes an in-range record.
                    let has_valid_delimiter = facts
                        .iter()
                        .filter(|details| details.chunk_index == idx)
                        .any(|details| details.within_max_size);
                    assert!(chunk.is_empty() || !has_valid_delimiter)
                }
                Some(total_read) => {
                    // A record was returned: it must be the one the model
                    // predicted, the position must sit just past its
                    // delimiter, and `buffer` must hold exactly its bytes.
                    assert!(first_delim.is_some());
                    let expected = first_delim.as_ref().unwrap();
                    assert_eq!(expected.global_index + 1, position as usize);
                    assert_eq!(expected.interior_index + 1, total_read);
                    assert_eq!(
                        buffer.get(..),
                        global_data.get(expected.byte_range.clone())
                    );
                    break;
                }
            }
        }

        TestResult::passed()
    }

    #[test]
    fn qc_read_until_with_max_size() {
        // `read_until_with_max_size` is meant to be called repeatedly until
        // its result carries `successfully_read: Some(_)`; it should never
        // return an error in this test. While the result is `None` we keep
        // calling with the next chunk. Once it is `Some`, the value should
        // correspond to the first delimiter in the stream, unless that
        // delimiter lies beyond the `max_size` barrier, in which case it
        // corresponds to the first delimiter that closes a record within
        // `max_size`.
        let property: fn(Vec<Vec<u8>>, u8, NonZeroU8) -> TestResult = qc_inner;
        QuickCheck::new()
            .tests(1_000)
            .max_tests(2_000)
            .quickcheck(property);
    }
}