codecs/decoding/framing/
length_delimited.rs1use bytes::{Bytes, BytesMut};
2use derivative::Derivative;
3use tokio_util::codec::Decoder;
4use vector_config::configurable_component;
5
6use crate::common::length_delimited::LengthDelimitedCoderOptions;
7
8use super::BoxedFramingError;
9
10#[configurable_component]
12#[derive(Debug, Clone, Derivative)]
13#[derivative(Default)]
14pub struct LengthDelimitedDecoderConfig {
15 #[serde(skip_serializing_if = "vector_core::serde::is_default")]
17 pub length_delimited: LengthDelimitedCoderOptions,
18}
19
20impl LengthDelimitedDecoderConfig {
21 pub fn build(&self) -> LengthDelimitedDecoder {
23 LengthDelimitedDecoder::new(&self.length_delimited)
24 }
25}
26
27#[derive(Debug, Clone)]
29pub struct LengthDelimitedDecoder(tokio_util::codec::LengthDelimitedCodec);
30
31impl LengthDelimitedDecoder {
32 pub fn new(config: &LengthDelimitedCoderOptions) -> Self {
34 Self(config.build_codec())
35 }
36}
37
38impl Default for LengthDelimitedDecoder {
39 fn default() -> Self {
40 Self(tokio_util::codec::LengthDelimitedCodec::new())
41 }
42}
43
44impl Decoder for LengthDelimitedDecoder {
45 type Item = Bytes;
46 type Error = BoxedFramingError;
47
48 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
49 self.0
50 .decode(src)
51 .map(|bytes| bytes.map(BytesMut::freeze))
52 .map_err(Into::into)
53 }
54
55 fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
56 self.0
57 .decode_eof(src)
58 .map(|bytes| bytes.map(BytesMut::freeze))
59 .map_err(Into::into)
60 }
61}
62
63#[cfg(test)]
64mod tests {
65 use super::*;
66
67 #[test]
68 fn decode_frame() {
69 let mut input = BytesMut::from("\x00\x00\x00\x03foo");
70 let mut decoder = LengthDelimitedDecoder::default();
71
72 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
73 assert_eq!(decoder.decode(&mut input).unwrap(), None);
74 }
75
76 #[test]
77 fn decode_frame_2byte_length() {
78 let mut input = BytesMut::from("\x00\x03foo");
79 let mut decoder = LengthDelimitedDecoder::new(&LengthDelimitedCoderOptions {
80 length_field_length: 2,
81 ..Default::default()
82 });
83
84 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
85 assert_eq!(decoder.decode(&mut input).unwrap(), None);
86 }
87
88 #[test]
89 fn decode_frame_little_endian() {
90 let mut input = BytesMut::from("\x03\x00\x00\x00foo");
91 let mut decoder = LengthDelimitedDecoder::new(&LengthDelimitedCoderOptions {
92 length_field_is_big_endian: false,
93 ..Default::default()
94 });
95
96 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
97 assert_eq!(decoder.decode(&mut input).unwrap(), None);
98 }
99
100 #[test]
101 fn decode_frame_2byte_length_with_offset() {
102 let mut input = BytesMut::from("\x00\x00\x00\x03foo");
103 let mut decoder = LengthDelimitedDecoder::new(&LengthDelimitedCoderOptions {
104 length_field_length: 2,
105 length_field_offset: 2,
106 ..Default::default()
107 });
108
109 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
110 assert_eq!(decoder.decode(&mut input).unwrap(), None);
111 }
112
113 #[test]
114 fn decode_frame_ignore_unexpected_eof() {
115 let mut input = BytesMut::from("\x00\x00\x00\x03fo");
116 let mut decoder = LengthDelimitedDecoder::default();
117
118 assert_eq!(decoder.decode(&mut input).unwrap(), None);
119 }
120
121 #[test]
122 fn decode_frame_ignore_exceeding_bytes_without_header() {
123 let mut input = BytesMut::from("\x00\x00\x00\x03fooo");
124 let mut decoder = LengthDelimitedDecoder::default();
125
126 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
127 assert_eq!(decoder.decode(&mut input).unwrap(), None);
128 }
129
130 #[test]
131 fn decode_frame_ignore_missing_header() {
132 let mut input = BytesMut::from("foo");
133 let mut decoder = LengthDelimitedDecoder::default();
134
135 assert_eq!(decoder.decode(&mut input).unwrap(), None);
136 }
137
138 #[test]
139 fn decode_frames() {
140 let mut input = BytesMut::from("\x00\x00\x00\x03foo\x00\x00\x00\x03bar");
141 let mut decoder = LengthDelimitedDecoder::default();
142
143 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
144 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "bar");
145 assert_eq!(decoder.decode(&mut input).unwrap(), None);
146 }
147
148 #[test]
149 fn decode_eof_frame() {
150 let mut input = BytesMut::from("\x00\x00\x00\x03foo");
151 let mut decoder = LengthDelimitedDecoder::default();
152
153 assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "foo");
154 assert_eq!(decoder.decode_eof(&mut input).unwrap(), None);
155 }
156
157 #[test]
158 fn decode_eof_frame_unexpected_eof() {
159 let mut input = BytesMut::from("\x00\x00\x00\x03fo");
160 let mut decoder = LengthDelimitedDecoder::default();
161
162 assert!(decoder.decode_eof(&mut input).is_err());
163 }
164
165 #[test]
166 fn decode_eof_frame_exceeding_bytes_without_header() {
167 let mut input = BytesMut::from("\x00\x00\x00\x03fooo");
168 let mut decoder = LengthDelimitedDecoder::default();
169
170 assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "foo");
171 assert!(decoder.decode_eof(&mut input).is_err());
172 }
173
174 #[test]
175 fn decode_eof_frame_missing_header() {
176 let mut input = BytesMut::from("foo");
177 let mut decoder = LengthDelimitedDecoder::default();
178
179 assert!(decoder.decode_eof(&mut input).is_err());
180 }
181
182 #[test]
183 fn decode_eof_frames() {
184 let mut input = BytesMut::from("\x00\x00\x00\x03foo\x00\x00\x00\x03bar");
185 let mut decoder = LengthDelimitedDecoder::default();
186
187 assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "foo");
188 assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "bar");
189 assert_eq!(decoder.decode_eof(&mut input).unwrap(), None);
190 }
191}