codecs/decoding/framing/
length_delimited.rs1use bytes::{Bytes, BytesMut};
2use derivative::Derivative;
3use tokio_util::codec::Decoder;
4use vector_config::configurable_component;
5
6use super::BoxedFramingError;
7use crate::common::length_delimited::LengthDelimitedCoderOptions;
8
9#[configurable_component]
11#[derive(Debug, Clone, Derivative)]
12#[derivative(Default)]
13pub struct LengthDelimitedDecoderConfig {
14 #[serde(skip_serializing_if = "vector_core::serde::is_default")]
16 pub length_delimited: LengthDelimitedCoderOptions,
17}
18
19impl LengthDelimitedDecoderConfig {
20 pub fn build(&self) -> LengthDelimitedDecoder {
22 LengthDelimitedDecoder::new(&self.length_delimited)
23 }
24}
25
26#[derive(Debug, Clone)]
28pub struct LengthDelimitedDecoder(tokio_util::codec::LengthDelimitedCodec);
29
30impl LengthDelimitedDecoder {
31 pub fn new(config: &LengthDelimitedCoderOptions) -> Self {
33 Self(config.build_codec())
34 }
35}
36
37impl Default for LengthDelimitedDecoder {
38 fn default() -> Self {
39 Self(tokio_util::codec::LengthDelimitedCodec::new())
40 }
41}
42
43impl Decoder for LengthDelimitedDecoder {
44 type Item = Bytes;
45 type Error = BoxedFramingError;
46
47 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
48 self.0
49 .decode(src)
50 .map(|bytes| bytes.map(BytesMut::freeze))
51 .map_err(Into::into)
52 }
53
54 fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
55 self.0
56 .decode_eof(src)
57 .map(|bytes| bytes.map(BytesMut::freeze))
58 .map_err(Into::into)
59 }
60}
61
62#[cfg(test)]
63mod tests {
64 use super::*;
65
66 #[test]
67 fn decode_frame() {
68 let mut input = BytesMut::from("\x00\x00\x00\x03foo");
69 let mut decoder = LengthDelimitedDecoder::default();
70
71 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
72 assert_eq!(decoder.decode(&mut input).unwrap(), None);
73 }
74
75 #[test]
76 fn decode_frame_2byte_length() {
77 let mut input = BytesMut::from("\x00\x03foo");
78 let mut decoder = LengthDelimitedDecoder::new(&LengthDelimitedCoderOptions {
79 length_field_length: 2,
80 ..Default::default()
81 });
82
83 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
84 assert_eq!(decoder.decode(&mut input).unwrap(), None);
85 }
86
87 #[test]
88 fn decode_frame_little_endian() {
89 let mut input = BytesMut::from("\x03\x00\x00\x00foo");
90 let mut decoder = LengthDelimitedDecoder::new(&LengthDelimitedCoderOptions {
91 length_field_is_big_endian: false,
92 ..Default::default()
93 });
94
95 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
96 assert_eq!(decoder.decode(&mut input).unwrap(), None);
97 }
98
99 #[test]
100 fn decode_frame_2byte_length_with_offset() {
101 let mut input = BytesMut::from("\x00\x00\x00\x03foo");
102 let mut decoder = LengthDelimitedDecoder::new(&LengthDelimitedCoderOptions {
103 length_field_length: 2,
104 length_field_offset: 2,
105 ..Default::default()
106 });
107
108 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
109 assert_eq!(decoder.decode(&mut input).unwrap(), None);
110 }
111
112 #[test]
113 fn decode_frame_ignore_unexpected_eof() {
114 let mut input = BytesMut::from("\x00\x00\x00\x03fo");
115 let mut decoder = LengthDelimitedDecoder::default();
116
117 assert_eq!(decoder.decode(&mut input).unwrap(), None);
118 }
119
120 #[test]
121 fn decode_frame_ignore_exceeding_bytes_without_header() {
122 let mut input = BytesMut::from("\x00\x00\x00\x03fooo");
123 let mut decoder = LengthDelimitedDecoder::default();
124
125 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
126 assert_eq!(decoder.decode(&mut input).unwrap(), None);
127 }
128
129 #[test]
130 fn decode_frame_ignore_missing_header() {
131 let mut input = BytesMut::from("foo");
132 let mut decoder = LengthDelimitedDecoder::default();
133
134 assert_eq!(decoder.decode(&mut input).unwrap(), None);
135 }
136
137 #[test]
138 fn decode_frames() {
139 let mut input = BytesMut::from("\x00\x00\x00\x03foo\x00\x00\x00\x03bar");
140 let mut decoder = LengthDelimitedDecoder::default();
141
142 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
143 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "bar");
144 assert_eq!(decoder.decode(&mut input).unwrap(), None);
145 }
146
147 #[test]
148 fn decode_eof_frame() {
149 let mut input = BytesMut::from("\x00\x00\x00\x03foo");
150 let mut decoder = LengthDelimitedDecoder::default();
151
152 assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "foo");
153 assert_eq!(decoder.decode_eof(&mut input).unwrap(), None);
154 }
155
156 #[test]
157 fn decode_eof_frame_unexpected_eof() {
158 let mut input = BytesMut::from("\x00\x00\x00\x03fo");
159 let mut decoder = LengthDelimitedDecoder::default();
160
161 assert!(decoder.decode_eof(&mut input).is_err());
162 }
163
164 #[test]
165 fn decode_eof_frame_exceeding_bytes_without_header() {
166 let mut input = BytesMut::from("\x00\x00\x00\x03fooo");
167 let mut decoder = LengthDelimitedDecoder::default();
168
169 assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "foo");
170 assert!(decoder.decode_eof(&mut input).is_err());
171 }
172
173 #[test]
174 fn decode_eof_frame_missing_header() {
175 let mut input = BytesMut::from("foo");
176 let mut decoder = LengthDelimitedDecoder::default();
177
178 assert!(decoder.decode_eof(&mut input).is_err());
179 }
180
181 #[test]
182 fn decode_eof_frames() {
183 let mut input = BytesMut::from("\x00\x00\x00\x03foo\x00\x00\x00\x03bar");
184 let mut decoder = LengthDelimitedDecoder::default();
185
186 assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "foo");
187 assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "bar");
188 assert_eq!(decoder.decode_eof(&mut input).unwrap(), None);
189 }
190}