file_source_common/
buffer.rs

1use crate::FilePosition;
2use std::{cmp::min, io, pin::Pin};
3
4use bstr::Finder;
5use bytes::BytesMut;
6use tokio::io::{AsyncBufRead, AsyncBufReadExt};
7
8pub struct ReadResult {
9    pub successfully_read: Option<usize>,
10    pub discarded_for_size_and_truncated: Vec<BytesMut>,
11}
12
13/// Read up to `max_size` bytes from `reader`, splitting by `delim`
14///
15/// The function reads up to `max_size` bytes from `reader`, splitting the input
16/// by `delim`. If a `delim` is not found in `reader` before `max_size` bytes
17/// are read then the reader is polled until `delim` is found and the results
18/// are discarded. Else, the result is written into `buf`.
19///
20/// The return is unusual. In the Err case this function has not written into
21/// `buf` and the caller should not examine its contents. In the Ok case if the
22/// inner value is None the caller should retry the call as the buffering read
23/// hit the end of the buffer but did not find a `delim` yet, indicating that
24/// we've sheered a write or that there were no bytes available in the `reader`
25/// and the `reader` was very sure about it. If the inner value is Some the
26/// interior `usize` is the number of bytes written into `buf`.
27///
28/// Tweak of
29/// <https://github.com/rust-lang/rust/blob/bf843eb9c2d48a80a5992a5d60858e27269f9575/src/libstd/io/mod.rs#L1471>.
30///
31/// # Performance
32///
33/// Benchmarks indicate that this function processes in the high single-digit
34/// GiB/s range for buffers of length 1KiB. For buffers any smaller than this
35/// the overhead of setup dominates our benchmarks.
36pub async fn read_until_with_max_size<'a, R: AsyncBufRead + ?Sized + Unpin>(
37    reader: Pin<Box<&'a mut R>>,
38    position: &'a mut FilePosition,
39    delim: &'a [u8],
40    buf: &'a mut BytesMut,
41    max_size: usize,
42) -> io::Result<ReadResult> {
43    let mut total_read = 0;
44    let mut discarding = false;
45    let delim_finder = Finder::new(delim);
46    let delim_len = delim.len();
47    let mut discarded_for_size_and_truncated = Vec::new();
48    let mut reader = Box::new(reader);
49    loop {
50        let available: &[u8] = match reader.fill_buf().await {
51            Ok(n) => n,
52            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
53            Err(e) => return Err(e),
54        };
55
56        let (done, used) = {
57            match delim_finder.find(available) {
58                Some(i) => {
59                    if !discarding {
60                        buf.extend_from_slice(&available[..i]);
61                    }
62                    (true, i + delim_len)
63                }
64                None => {
65                    if !discarding {
66                        buf.extend_from_slice(available);
67                    }
68                    (false, available.len())
69                }
70            }
71        };
72        reader.consume(used);
73        *position += used as u64; // do this at exactly same time
74        total_read += used;
75
76        if !discarding && buf.len() > max_size {
77            // keep only the first <1k bytes to make sure we can actually emit a usable error
78            let length_to_keep = min(1000, max_size);
79            let mut truncated: BytesMut = BytesMut::zeroed(length_to_keep);
80            truncated.copy_from_slice(&buf[0..length_to_keep]);
81            discarded_for_size_and_truncated.push(truncated);
82            discarding = true;
83        }
84
85        if done {
86            if !discarding {
87                return Ok(ReadResult {
88                    successfully_read: Some(total_read),
89                    discarded_for_size_and_truncated,
90                });
91            } else {
92                discarding = false;
93                buf.clear();
94            }
95        } else if used == 0 {
96            // We've hit EOF but not yet seen a newline. This can happen when unlucky timing causes
97            // us to observe an incomplete write. We return None here and let the loop continue
98            // next time the method is called. This is safe because the buffer is specific to this
99            // FileWatcher.
100            return Ok(ReadResult {
101                successfully_read: None,
102                discarded_for_size_and_truncated,
103            });
104        }
105    }
106}
107
108#[cfg(test)]
109mod test {
110    use std::{io::Cursor, num::NonZeroU8, ops::Range};
111
112    use bytes::{BufMut, BytesMut};
113    use quickcheck::{QuickCheck, TestResult};
114    use tokio::io::BufReader;
115
116    use super::read_until_with_max_size;
117    use crate::buffer::ReadResult;
118
119    async fn qc_inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
120        // The `global_data` is the view of `chunks` as a single contiguous
121        // block of memory. Where `chunks` simulates what happens when bytes are
122        // fitfully available `global_data` is the view of all chunks assembled
123        // after every byte is available.
124        let mut global_data = BytesMut::new();
125
126        // `DelimDetails` describes the nature of each delimiter found in the
127        // `chunks`.
128        #[derive(Clone)]
129        struct DelimDetails {
130            /// Index in `global_data`, absolute offset
131            global_index: usize,
132            /// Index in each `chunk`, relative offset
133            interior_index: usize,
134            /// Whether this delimiter was within `max_size` of its previous
135            /// peer
136            within_max_size: bool,
137            /// Which chunk in `chunks` this delimiter was found in
138            chunk_index: usize,
139            /// The range of bytes that this delimiter bounds with its previous
140            /// peer
141            byte_range: Range<usize>,
142        }
143
144        // Move through the `chunks` and discover at what positions an instance
145        // of `delim` exists in the chunk stream and whether that `delim` is
146        // more than `max_size` bytes away from the previous `delim`. This loop
147        // constructs the `facts` vector that holds `DelimDetails` instances and
148        // also populates `global_data`.
149        let mut facts: Vec<DelimDetails> = Vec::new();
150        let mut global_index: usize = 0;
151        let mut previous_offset: usize = 0;
152        for (i, chunk) in chunks.iter().enumerate() {
153            for (interior_index, byte) in chunk.iter().enumerate() {
154                global_data.put_u8(*byte);
155                if *byte == delim {
156                    let span = global_index - previous_offset;
157                    let within_max_size = span <= max_size.get() as usize;
158                    facts.push(DelimDetails {
159                        global_index,
160                        within_max_size,
161                        chunk_index: i,
162                        interior_index,
163                        byte_range: (previous_offset..global_index),
164                    });
165                    previous_offset = global_index + 1;
166                }
167                global_index += 1;
168            }
169        }
170
171        // Our model is only concerned with the first valid delimiter in the
172        // chunk stream. As such, discover that first valid delimiter and record
173        // it specially.
174        let mut first_delim: Option<DelimDetails> = None;
175        for fact in &facts {
176            if fact.within_max_size {
177                first_delim = Some(fact.clone());
178                break;
179            }
180        }
181
182        let mut position = 0;
183        let mut buffer = BytesMut::with_capacity(max_size.get() as usize);
184        // NOTE: The delimiter may be multiple bytes wide but for the purpose of
185        // this model a single byte does well enough.
186        let delimiter: [u8; 1] = [delim];
187        for (idx, chunk) in chunks.iter().enumerate() {
188            let mut reader = BufReader::new(Cursor::new(&chunk));
189
190            match read_until_with_max_size(
191                Box::pin(&mut reader),
192                &mut position,
193                &delimiter,
194                &mut buffer,
195                max_size.get() as usize,
196            )
197            .await
198            .unwrap()
199            {
200                ReadResult {
201                    successfully_read: None,
202                    discarded_for_size_and_truncated: _,
203                } => {
204                    // Subject only returns None if this is the last chunk _and_
205                    // the chunk did not contain a delimiter _or_ the delimiter
206                    // was outside the max_size range _or_ the current chunk is empty.
207                    let has_valid_delimiter = facts
208                        .iter()
209                        .any(|details| ((details.chunk_index == idx) && details.within_max_size));
210                    assert!(chunk.is_empty() || !has_valid_delimiter)
211                }
212                ReadResult {
213                    successfully_read: Some(total_read),
214                    discarded_for_size_and_truncated: _,
215                } => {
216                    // Now that the function has returned we confirm that the
217                    // returned details match our `first_delim` and also that
218                    // the `buffer` is populated correctly.
219                    assert!(first_delim.is_some());
220                    assert_eq!(
221                        first_delim.clone().unwrap().global_index + 1,
222                        position as usize
223                    );
224                    assert_eq!(first_delim.clone().unwrap().interior_index + 1, total_read);
225                    assert_eq!(
226                        buffer.get(..),
227                        global_data.get(first_delim.unwrap().byte_range)
228                    );
229                    break;
230                }
231            }
232        }
233
234        TestResult::passed()
235    }
236
237    #[tokio::test]
238    async fn qc_read_until_with_max_size() {
239        // The `read_until_with_max` function is intended to be called
240        // multiple times until it returns Ok(Some(usize)). The function
241        // should never return error in this test. If the return is None we
242        // will call until it is not. Once return is Some the interior value
243        // should be the position of the first delim in the buffer, unless
244        // that delim is past the max_size barrier in which case it will be
245        // the position of the first delim that is within some multiple of
246        // max_size.
247        //
248        // I think I will adjust the function to have a richer return
249        // type. This will help in the transition to async.
250        fn inner(chunks: Vec<Vec<u8>>, delim: u8, max_size: NonZeroU8) -> TestResult {
251            let handle = tokio::runtime::Handle::current();
252            handle.block_on(qc_inner(chunks, delim, max_size));
253            TestResult::passed()
254        }
255
256        tokio::task::spawn_blocking(|| {
257            QuickCheck::new()
258                .tests(1_000)
259                .max_tests(2_000)
260                .quickcheck(inner as fn(Vec<Vec<u8>>, u8, NonZeroU8) -> TestResult);
261        })
262        .await
263        .unwrap()
264    }
265}