codecs/decoding/framing/
length_delimited.rs

1use bytes::{Bytes, BytesMut};
2use derivative::Derivative;
3use tokio_util::codec::Decoder;
4use vector_config::configurable_component;
5
6use super::BoxedFramingError;
7use crate::common::length_delimited::LengthDelimitedCoderOptions;
8
9/// Config used to build a `LengthDelimitedDecoder`.
10#[configurable_component]
11#[derive(Debug, Clone, Derivative)]
12#[derivative(Default)]
13pub struct LengthDelimitedDecoderConfig {
14    /// Options for the length delimited decoder.
15    #[serde(skip_serializing_if = "vector_core::serde::is_default")]
16    pub length_delimited: LengthDelimitedCoderOptions,
17}
18
19impl LengthDelimitedDecoderConfig {
20    /// Build the `LengthDelimitedDecoder` from this configuration.
21    pub fn build(&self) -> LengthDelimitedDecoder {
22        LengthDelimitedDecoder::new(&self.length_delimited)
23    }
24}
25
26/// A codec for handling bytes sequences whose length is encoded in a frame head.
27#[derive(Debug, Clone)]
28pub struct LengthDelimitedDecoder(tokio_util::codec::LengthDelimitedCodec);
29
30impl LengthDelimitedDecoder {
31    /// Creates a new `LengthDelimitedDecoder`.
32    pub fn new(config: &LengthDelimitedCoderOptions) -> Self {
33        Self(config.build_codec())
34    }
35}
36
37impl Default for LengthDelimitedDecoder {
38    fn default() -> Self {
39        Self(tokio_util::codec::LengthDelimitedCodec::new())
40    }
41}
42
43impl Decoder for LengthDelimitedDecoder {
44    type Item = Bytes;
45    type Error = BoxedFramingError;
46
47    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
48        self.0
49            .decode(src)
50            .map(|bytes| bytes.map(BytesMut::freeze))
51            .map_err(Into::into)
52    }
53
54    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
55        self.0
56            .decode_eof(src)
57            .map(|bytes| bytes.map(BytesMut::freeze))
58            .map_err(Into::into)
59    }
60}
61
#[cfg(test)]
mod tests {
    use super::*;

    // Mid-stream decoding (`decode`): incomplete input is reported as `None`.

    /// A single complete frame behind the default header is decoded.
    #[test]
    fn decode_frame() {
        let mut framer = LengthDelimitedDecoder::default();
        let mut buf = BytesMut::from("\x00\x00\x00\x03foo");

        assert_eq!(framer.decode(&mut buf).unwrap().unwrap(), "foo");
        assert_eq!(framer.decode(&mut buf).unwrap(), None);
    }

    /// The length header can be narrowed to two bytes.
    #[test]
    fn decode_frame_2byte_length() {
        let options = LengthDelimitedCoderOptions {
            length_field_length: 2,
            ..Default::default()
        };
        let mut framer = LengthDelimitedDecoder::new(&options);
        let mut buf = BytesMut::from("\x00\x03foo");

        assert_eq!(framer.decode(&mut buf).unwrap().unwrap(), "foo");
        assert_eq!(framer.decode(&mut buf).unwrap(), None);
    }

    /// The length header can be read as little endian.
    #[test]
    fn decode_frame_little_endian() {
        let options = LengthDelimitedCoderOptions {
            length_field_is_big_endian: false,
            ..Default::default()
        };
        let mut framer = LengthDelimitedDecoder::new(&options);
        let mut buf = BytesMut::from("\x03\x00\x00\x00foo");

        assert_eq!(framer.decode(&mut buf).unwrap().unwrap(), "foo");
        assert_eq!(framer.decode(&mut buf).unwrap(), None);
    }

    /// A two-byte length header placed after a two-byte offset.
    #[test]
    fn decode_frame_2byte_length_with_offset() {
        let options = LengthDelimitedCoderOptions {
            length_field_length: 2,
            length_field_offset: 2,
            ..Default::default()
        };
        let mut framer = LengthDelimitedDecoder::new(&options);
        let mut buf = BytesMut::from("\x00\x00\x00\x03foo");

        assert_eq!(framer.decode(&mut buf).unwrap().unwrap(), "foo");
        assert_eq!(framer.decode(&mut buf).unwrap(), None);
    }

    /// A payload shorter than its announced length yields no frame and no error.
    #[test]
    fn decode_frame_ignore_unexpected_eof() {
        let mut framer = LengthDelimitedDecoder::default();
        let mut buf = BytesMut::from("\x00\x00\x00\x03fo");

        assert_eq!(framer.decode(&mut buf).unwrap(), None);
    }

    /// A trailing byte after a complete frame yields no second frame and no error.
    #[test]
    fn decode_frame_ignore_exceeding_bytes_without_header() {
        let mut framer = LengthDelimitedDecoder::default();
        let mut buf = BytesMut::from("\x00\x00\x00\x03fooo");

        assert_eq!(framer.decode(&mut buf).unwrap().unwrap(), "foo");
        assert_eq!(framer.decode(&mut buf).unwrap(), None);
    }

    /// Input too short to contain a header yields no frame and no error.
    #[test]
    fn decode_frame_ignore_missing_header() {
        let mut framer = LengthDelimitedDecoder::default();
        let mut buf = BytesMut::from("foo");

        assert_eq!(framer.decode(&mut buf).unwrap(), None);
    }

    /// Back-to-back frames are decoded one per call, in order.
    #[test]
    fn decode_frames() {
        let mut framer = LengthDelimitedDecoder::default();
        let mut buf = BytesMut::from("\x00\x00\x00\x03foo\x00\x00\x00\x03bar");

        assert_eq!(framer.decode(&mut buf).unwrap().unwrap(), "foo");
        assert_eq!(framer.decode(&mut buf).unwrap().unwrap(), "bar");
        assert_eq!(framer.decode(&mut buf).unwrap(), None);
    }

    // End-of-stream decoding (`decode_eof`): leftover bytes are an error.

    /// A single complete frame is decoded, after which the stream is cleanly empty.
    #[test]
    fn decode_eof_frame() {
        let mut framer = LengthDelimitedDecoder::default();
        let mut buf = BytesMut::from("\x00\x00\x00\x03foo");

        assert_eq!(framer.decode_eof(&mut buf).unwrap().unwrap(), "foo");
        assert_eq!(framer.decode_eof(&mut buf).unwrap(), None);
    }

    /// A payload shorter than its announced length is an error at end of stream.
    #[test]
    fn decode_eof_frame_unexpected_eof() {
        let mut framer = LengthDelimitedDecoder::default();
        let mut buf = BytesMut::from("\x00\x00\x00\x03fo");

        assert!(framer.decode_eof(&mut buf).is_err());
    }

    /// A trailing byte after a complete frame is an error at end of stream.
    #[test]
    fn decode_eof_frame_exceeding_bytes_without_header() {
        let mut framer = LengthDelimitedDecoder::default();
        let mut buf = BytesMut::from("\x00\x00\x00\x03fooo");

        assert_eq!(framer.decode_eof(&mut buf).unwrap().unwrap(), "foo");
        assert!(framer.decode_eof(&mut buf).is_err());
    }

    /// Input too short to contain a header is an error at end of stream.
    #[test]
    fn decode_eof_frame_missing_header() {
        let mut framer = LengthDelimitedDecoder::default();
        let mut buf = BytesMut::from("foo");

        assert!(framer.decode_eof(&mut buf).is_err());
    }

    /// Back-to-back frames are decoded in order, then the stream is cleanly empty.
    #[test]
    fn decode_eof_frames() {
        let mut framer = LengthDelimitedDecoder::default();
        let mut buf = BytesMut::from("\x00\x00\x00\x03foo\x00\x00\x00\x03bar");

        assert_eq!(framer.decode_eof(&mut buf).unwrap().unwrap(), "foo");
        assert_eq!(framer.decode_eof(&mut buf).unwrap().unwrap(), "bar");
        assert_eq!(framer.decode_eof(&mut buf).unwrap(), None);
    }
}