codecs/decoding/framing/
bytes.rs

1use bytes::{Bytes, BytesMut};
2use serde::{Deserialize, Serialize};
3use tokio_util::codec::Decoder;
4
5use super::BoxedFramingError;
6
7/// Config used to build a `BytesDecoderConfig`.
8#[derive(Debug, Clone, Default, Deserialize, Serialize)]
9pub struct BytesDecoderConfig;
10
11impl BytesDecoderConfig {
12    /// Creates a new `BytesDecoderConfig`.
13    pub const fn new() -> Self {
14        Self
15    }
16
17    /// Build the `ByteDecoder` from this configuration.
18    pub const fn build(&self) -> BytesDecoder {
19        BytesDecoder::new()
20    }
21}
22
23/// A decoder for passing through bytes as-is.
24///
25/// This is basically a no-op and is used to convert from `BytesMut` to `Bytes`.
26#[derive(Debug, Clone)]
27pub struct BytesDecoder {
28    /// Whether the empty buffer has been flushed. This is important to
29    /// propagate empty frames in message based transports.
30    flushed: bool,
31}
32
33impl BytesDecoder {
34    /// Creates a new `BytesDecoder`.
35    pub const fn new() -> Self {
36        Self { flushed: false }
37    }
38}
39
40impl Default for BytesDecoder {
41    fn default() -> Self {
42        Self::new()
43    }
44}
45
46impl Decoder for BytesDecoder {
47    type Item = Bytes;
48    type Error = BoxedFramingError;
49
50    fn decode(&mut self, _src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
51        self.flushed = false;
52        Ok(None)
53    }
54
55    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
56        if self.flushed && src.is_empty() {
57            Ok(None)
58        } else {
59            self.flushed = true;
60            let frame = src.split();
61            Ok(Some(frame.freeze()))
62        }
63    }
64}
65
66#[cfg(test)]
67mod tests {
68    use futures::StreamExt;
69    use tokio_util::codec::FramedRead;
70
71    use super::*;
72
73    #[test]
74    fn decode_frame() {
75        let mut input = BytesMut::from("some bytes");
76        let mut decoder = BytesDecoder::new();
77
78        assert_eq!(decoder.decode(&mut input).unwrap(), None);
79        assert_eq!(
80            decoder.decode_eof(&mut input).unwrap().unwrap(),
81            "some bytes"
82        );
83        assert_eq!(decoder.decode(&mut input).unwrap(), None);
84    }
85
86    #[tokio::test]
87    async fn decode_frame_reader() {
88        let input: &[u8] = b"foo";
89        let decoder = BytesDecoder::new();
90
91        let mut reader = FramedRead::new(input, decoder);
92
93        assert_eq!(reader.next().await.unwrap().unwrap(), "foo");
94        assert!(reader.next().await.is_none());
95    }
96
97    #[tokio::test]
98    async fn decode_frame_reader_empty() {
99        let input: &[u8] = b"";
100        let decoder = BytesDecoder::new();
101
102        let mut reader = FramedRead::new(input, decoder);
103
104        assert_eq!(reader.next().await.unwrap().unwrap(), "");
105        assert!(reader.next().await.is_none());
106    }
107}