codecs/decoding/framing/
bytes.rs1use bytes::{Bytes, BytesMut};
2use serde::{Deserialize, Serialize};
3use tokio_util::codec::Decoder;
4
5use super::BoxedFramingError;
6
7#[derive(Debug, Clone, Default, Deserialize, Serialize)]
9pub struct BytesDecoderConfig;
10
11impl BytesDecoderConfig {
12 pub const fn new() -> Self {
14 Self
15 }
16
17 pub const fn build(&self) -> BytesDecoder {
19 BytesDecoder::new()
20 }
21}
22
23#[derive(Debug, Clone)]
27pub struct BytesDecoder {
28 flushed: bool,
31}
32
33impl BytesDecoder {
34 pub const fn new() -> Self {
36 Self { flushed: false }
37 }
38}
39
40impl Default for BytesDecoder {
41 fn default() -> Self {
42 Self::new()
43 }
44}
45
46impl Decoder for BytesDecoder {
47 type Item = Bytes;
48 type Error = BoxedFramingError;
49
50 fn decode(&mut self, _src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
51 self.flushed = false;
52 Ok(None)
53 }
54
55 fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
56 if self.flushed && src.is_empty() {
57 Ok(None)
58 } else {
59 self.flushed = true;
60 let frame = src.split();
61 Ok(Some(frame.freeze()))
62 }
63 }
64}
65
66#[cfg(test)]
67mod tests {
68 use futures::StreamExt;
69 use tokio_util::codec::FramedRead;
70
71 use super::*;
72
73 #[test]
74 fn decode_frame() {
75 let mut input = BytesMut::from("some bytes");
76 let mut decoder = BytesDecoder::new();
77
78 assert_eq!(decoder.decode(&mut input).unwrap(), None);
79 assert_eq!(
80 decoder.decode_eof(&mut input).unwrap().unwrap(),
81 "some bytes"
82 );
83 assert_eq!(decoder.decode(&mut input).unwrap(), None);
84 }
85
86 #[tokio::test]
87 async fn decode_frame_reader() {
88 let input: &[u8] = b"foo";
89 let decoder = BytesDecoder::new();
90
91 let mut reader = FramedRead::new(input, decoder);
92
93 assert_eq!(reader.next().await.unwrap().unwrap(), "foo");
94 assert!(reader.next().await.is_none());
95 }
96
97 #[tokio::test]
98 async fn decode_frame_reader_empty() {
99 let input: &[u8] = b"";
100 let decoder = BytesDecoder::new();
101
102 let mut reader = FramedRead::new(input, decoder);
103
104 assert_eq!(reader.next().await.unwrap().unwrap(), "");
105 assert!(reader.next().await.is_none());
106 }
107}