codecs/decoding/framing/
length_delimited.rs

1use bytes::{Bytes, BytesMut};
2use tokio_util::codec::Decoder;
3use vector_config::configurable_component;
4
5use super::BoxedFramingError;
6use crate::common::length_delimited::LengthDelimitedCoderOptions;
7
8/// Config used to build a `LengthDelimitedDecoder`.
9#[configurable_component]
10#[derive(Debug, Clone, Default)]
11pub struct LengthDelimitedDecoderConfig {
12    /// Options for the length delimited decoder.
13    #[serde(skip_serializing_if = "vector_core::serde::is_default")]
14    pub length_delimited: LengthDelimitedCoderOptions,
15}
16
17impl LengthDelimitedDecoderConfig {
18    /// Build the `LengthDelimitedDecoder` from this configuration.
19    pub fn build(&self) -> LengthDelimitedDecoder {
20        LengthDelimitedDecoder::new(&self.length_delimited)
21    }
22}
23
24/// A codec for handling bytes sequences whose length is encoded in a frame head.
25#[derive(Debug, Clone)]
26pub struct LengthDelimitedDecoder(tokio_util::codec::LengthDelimitedCodec);
27
28impl LengthDelimitedDecoder {
29    /// Creates a new `LengthDelimitedDecoder`.
30    pub fn new(config: &LengthDelimitedCoderOptions) -> Self {
31        Self(config.build_codec())
32    }
33}
34
35impl Default for LengthDelimitedDecoder {
36    fn default() -> Self {
37        Self(tokio_util::codec::LengthDelimitedCodec::new())
38    }
39}
40
41impl Decoder for LengthDelimitedDecoder {
42    type Item = Bytes;
43    type Error = BoxedFramingError;
44
45    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
46        self.0
47            .decode(src)
48            .map(|bytes| bytes.map(BytesMut::freeze))
49            .map_err(Into::into)
50    }
51
52    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
53        self.0
54            .decode_eof(src)
55            .map(|bytes| bytes.map(BytesMut::freeze))
56            .map_err(Into::into)
57    }
58}
59
60#[cfg(test)]
61mod tests {
62    use super::*;
63
64    #[test]
65    fn decode_frame() {
66        let mut input = BytesMut::from("\x00\x00\x00\x03foo");
67        let mut decoder = LengthDelimitedDecoder::default();
68
69        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
70        assert_eq!(decoder.decode(&mut input).unwrap(), None);
71    }
72
73    #[test]
74    fn decode_frame_2byte_length() {
75        let mut input = BytesMut::from("\x00\x03foo");
76        let mut decoder = LengthDelimitedDecoder::new(&LengthDelimitedCoderOptions {
77            length_field_length: 2,
78            ..Default::default()
79        });
80
81        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
82        assert_eq!(decoder.decode(&mut input).unwrap(), None);
83    }
84
85    #[test]
86    fn decode_frame_little_endian() {
87        let mut input = BytesMut::from("\x03\x00\x00\x00foo");
88        let mut decoder = LengthDelimitedDecoder::new(&LengthDelimitedCoderOptions {
89            length_field_is_big_endian: false,
90            ..Default::default()
91        });
92
93        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
94        assert_eq!(decoder.decode(&mut input).unwrap(), None);
95    }
96
97    #[test]
98    fn decode_frame_2byte_length_with_offset() {
99        let mut input = BytesMut::from("\x00\x00\x00\x03foo");
100        let mut decoder = LengthDelimitedDecoder::new(&LengthDelimitedCoderOptions {
101            length_field_length: 2,
102            length_field_offset: 2,
103            ..Default::default()
104        });
105
106        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
107        assert_eq!(decoder.decode(&mut input).unwrap(), None);
108    }
109
110    #[test]
111    fn decode_frame_ignore_unexpected_eof() {
112        let mut input = BytesMut::from("\x00\x00\x00\x03fo");
113        let mut decoder = LengthDelimitedDecoder::default();
114
115        assert_eq!(decoder.decode(&mut input).unwrap(), None);
116    }
117
118    #[test]
119    fn decode_frame_ignore_exceeding_bytes_without_header() {
120        let mut input = BytesMut::from("\x00\x00\x00\x03fooo");
121        let mut decoder = LengthDelimitedDecoder::default();
122
123        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
124        assert_eq!(decoder.decode(&mut input).unwrap(), None);
125    }
126
127    #[test]
128    fn decode_frame_ignore_missing_header() {
129        let mut input = BytesMut::from("foo");
130        let mut decoder = LengthDelimitedDecoder::default();
131
132        assert_eq!(decoder.decode(&mut input).unwrap(), None);
133    }
134
135    #[test]
136    fn decode_frames() {
137        let mut input = BytesMut::from("\x00\x00\x00\x03foo\x00\x00\x00\x03bar");
138        let mut decoder = LengthDelimitedDecoder::default();
139
140        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
141        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "bar");
142        assert_eq!(decoder.decode(&mut input).unwrap(), None);
143    }
144
145    #[test]
146    fn decode_eof_frame() {
147        let mut input = BytesMut::from("\x00\x00\x00\x03foo");
148        let mut decoder = LengthDelimitedDecoder::default();
149
150        assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "foo");
151        assert_eq!(decoder.decode_eof(&mut input).unwrap(), None);
152    }
153
154    #[test]
155    fn decode_eof_frame_unexpected_eof() {
156        let mut input = BytesMut::from("\x00\x00\x00\x03fo");
157        let mut decoder = LengthDelimitedDecoder::default();
158
159        assert!(decoder.decode_eof(&mut input).is_err());
160    }
161
162    #[test]
163    fn decode_eof_frame_exceeding_bytes_without_header() {
164        let mut input = BytesMut::from("\x00\x00\x00\x03fooo");
165        let mut decoder = LengthDelimitedDecoder::default();
166
167        assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "foo");
168        assert!(decoder.decode_eof(&mut input).is_err());
169    }
170
171    #[test]
172    fn decode_eof_frame_missing_header() {
173        let mut input = BytesMut::from("foo");
174        let mut decoder = LengthDelimitedDecoder::default();
175
176        assert!(decoder.decode_eof(&mut input).is_err());
177    }
178
179    #[test]
180    fn decode_eof_frames() {
181        let mut input = BytesMut::from("\x00\x00\x00\x03foo\x00\x00\x00\x03bar");
182        let mut decoder = LengthDelimitedDecoder::default();
183
184        assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "foo");
185        assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "bar");
186        assert_eq!(decoder.decode_eof(&mut input).unwrap(), None);
187    }
188}