codecs/decoding/framing/
length_delimited.rs

1use bytes::{Bytes, BytesMut};
2use derivative::Derivative;
3use tokio_util::codec::Decoder;
4use vector_config::configurable_component;
5
6use crate::common::length_delimited::LengthDelimitedCoderOptions;
7
8use super::BoxedFramingError;
9
10/// Config used to build a `LengthDelimitedDecoder`.
11#[configurable_component]
12#[derive(Debug, Clone, Derivative)]
13#[derivative(Default)]
14pub struct LengthDelimitedDecoderConfig {
15    /// Options for the length delimited decoder.
16    #[serde(skip_serializing_if = "vector_core::serde::is_default")]
17    pub length_delimited: LengthDelimitedCoderOptions,
18}
19
20impl LengthDelimitedDecoderConfig {
21    /// Build the `LengthDelimitedDecoder` from this configuration.
22    pub fn build(&self) -> LengthDelimitedDecoder {
23        LengthDelimitedDecoder::new(&self.length_delimited)
24    }
25}
26
27/// A codec for handling bytes sequences whose length is encoded in a frame head.
28#[derive(Debug, Clone)]
29pub struct LengthDelimitedDecoder(tokio_util::codec::LengthDelimitedCodec);
30
31impl LengthDelimitedDecoder {
32    /// Creates a new `LengthDelimitedDecoder`.
33    pub fn new(config: &LengthDelimitedCoderOptions) -> Self {
34        Self(config.build_codec())
35    }
36}
37
38impl Default for LengthDelimitedDecoder {
39    fn default() -> Self {
40        Self(tokio_util::codec::LengthDelimitedCodec::new())
41    }
42}
43
44impl Decoder for LengthDelimitedDecoder {
45    type Item = Bytes;
46    type Error = BoxedFramingError;
47
48    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
49        self.0
50            .decode(src)
51            .map(|bytes| bytes.map(BytesMut::freeze))
52            .map_err(Into::into)
53    }
54
55    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
56        self.0
57            .decode_eof(src)
58            .map(|bytes| bytes.map(BytesMut::freeze))
59            .map_err(Into::into)
60    }
61}
62
63#[cfg(test)]
64mod tests {
65    use super::*;
66
67    #[test]
68    fn decode_frame() {
69        let mut input = BytesMut::from("\x00\x00\x00\x03foo");
70        let mut decoder = LengthDelimitedDecoder::default();
71
72        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
73        assert_eq!(decoder.decode(&mut input).unwrap(), None);
74    }
75
76    #[test]
77    fn decode_frame_2byte_length() {
78        let mut input = BytesMut::from("\x00\x03foo");
79        let mut decoder = LengthDelimitedDecoder::new(&LengthDelimitedCoderOptions {
80            length_field_length: 2,
81            ..Default::default()
82        });
83
84        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
85        assert_eq!(decoder.decode(&mut input).unwrap(), None);
86    }
87
88    #[test]
89    fn decode_frame_little_endian() {
90        let mut input = BytesMut::from("\x03\x00\x00\x00foo");
91        let mut decoder = LengthDelimitedDecoder::new(&LengthDelimitedCoderOptions {
92            length_field_is_big_endian: false,
93            ..Default::default()
94        });
95
96        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
97        assert_eq!(decoder.decode(&mut input).unwrap(), None);
98    }
99
100    #[test]
101    fn decode_frame_2byte_length_with_offset() {
102        let mut input = BytesMut::from("\x00\x00\x00\x03foo");
103        let mut decoder = LengthDelimitedDecoder::new(&LengthDelimitedCoderOptions {
104            length_field_length: 2,
105            length_field_offset: 2,
106            ..Default::default()
107        });
108
109        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
110        assert_eq!(decoder.decode(&mut input).unwrap(), None);
111    }
112
113    #[test]
114    fn decode_frame_ignore_unexpected_eof() {
115        let mut input = BytesMut::from("\x00\x00\x00\x03fo");
116        let mut decoder = LengthDelimitedDecoder::default();
117
118        assert_eq!(decoder.decode(&mut input).unwrap(), None);
119    }
120
121    #[test]
122    fn decode_frame_ignore_exceeding_bytes_without_header() {
123        let mut input = BytesMut::from("\x00\x00\x00\x03fooo");
124        let mut decoder = LengthDelimitedDecoder::default();
125
126        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
127        assert_eq!(decoder.decode(&mut input).unwrap(), None);
128    }
129
130    #[test]
131    fn decode_frame_ignore_missing_header() {
132        let mut input = BytesMut::from("foo");
133        let mut decoder = LengthDelimitedDecoder::default();
134
135        assert_eq!(decoder.decode(&mut input).unwrap(), None);
136    }
137
138    #[test]
139    fn decode_frames() {
140        let mut input = BytesMut::from("\x00\x00\x00\x03foo\x00\x00\x00\x03bar");
141        let mut decoder = LengthDelimitedDecoder::default();
142
143        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
144        assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "bar");
145        assert_eq!(decoder.decode(&mut input).unwrap(), None);
146    }
147
148    #[test]
149    fn decode_eof_frame() {
150        let mut input = BytesMut::from("\x00\x00\x00\x03foo");
151        let mut decoder = LengthDelimitedDecoder::default();
152
153        assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "foo");
154        assert_eq!(decoder.decode_eof(&mut input).unwrap(), None);
155    }
156
157    #[test]
158    fn decode_eof_frame_unexpected_eof() {
159        let mut input = BytesMut::from("\x00\x00\x00\x03fo");
160        let mut decoder = LengthDelimitedDecoder::default();
161
162        assert!(decoder.decode_eof(&mut input).is_err());
163    }
164
165    #[test]
166    fn decode_eof_frame_exceeding_bytes_without_header() {
167        let mut input = BytesMut::from("\x00\x00\x00\x03fooo");
168        let mut decoder = LengthDelimitedDecoder::default();
169
170        assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "foo");
171        assert!(decoder.decode_eof(&mut input).is_err());
172    }
173
174    #[test]
175    fn decode_eof_frame_missing_header() {
176        let mut input = BytesMut::from("foo");
177        let mut decoder = LengthDelimitedDecoder::default();
178
179        assert!(decoder.decode_eof(&mut input).is_err());
180    }
181
182    #[test]
183    fn decode_eof_frames() {
184        let mut input = BytesMut::from("\x00\x00\x00\x03foo\x00\x00\x00\x03bar");
185        let mut decoder = LengthDelimitedDecoder::default();
186
187        assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "foo");
188        assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "bar");
189        assert_eq!(decoder.decode_eof(&mut input).unwrap(), None);
190    }
191}