codecs/decoding/framing/
length_delimited.rs1use bytes::{Bytes, BytesMut};
2use tokio_util::codec::Decoder;
3use vector_config::configurable_component;
4
5use super::BoxedFramingError;
6use crate::common::length_delimited::LengthDelimitedCoderOptions;
7
8#[configurable_component]
10#[derive(Debug, Clone, Default)]
11pub struct LengthDelimitedDecoderConfig {
12 #[serde(skip_serializing_if = "vector_core::serde::is_default")]
14 pub length_delimited: LengthDelimitedCoderOptions,
15}
16
17impl LengthDelimitedDecoderConfig {
18 pub fn build(&self) -> LengthDelimitedDecoder {
20 LengthDelimitedDecoder::new(&self.length_delimited)
21 }
22}
23
24#[derive(Debug, Clone)]
26pub struct LengthDelimitedDecoder(tokio_util::codec::LengthDelimitedCodec);
27
28impl LengthDelimitedDecoder {
29 pub fn new(config: &LengthDelimitedCoderOptions) -> Self {
31 Self(config.build_codec())
32 }
33}
34
35impl Default for LengthDelimitedDecoder {
36 fn default() -> Self {
37 Self(tokio_util::codec::LengthDelimitedCodec::new())
38 }
39}
40
41impl Decoder for LengthDelimitedDecoder {
42 type Item = Bytes;
43 type Error = BoxedFramingError;
44
45 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
46 self.0
47 .decode(src)
48 .map(|bytes| bytes.map(BytesMut::freeze))
49 .map_err(Into::into)
50 }
51
52 fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
53 self.0
54 .decode_eof(src)
55 .map(|bytes| bytes.map(BytesMut::freeze))
56 .map_err(Into::into)
57 }
58}
59
60#[cfg(test)]
61mod tests {
62 use super::*;
63
64 #[test]
65 fn decode_frame() {
66 let mut input = BytesMut::from("\x00\x00\x00\x03foo");
67 let mut decoder = LengthDelimitedDecoder::default();
68
69 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
70 assert_eq!(decoder.decode(&mut input).unwrap(), None);
71 }
72
73 #[test]
74 fn decode_frame_2byte_length() {
75 let mut input = BytesMut::from("\x00\x03foo");
76 let mut decoder = LengthDelimitedDecoder::new(&LengthDelimitedCoderOptions {
77 length_field_length: 2,
78 ..Default::default()
79 });
80
81 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
82 assert_eq!(decoder.decode(&mut input).unwrap(), None);
83 }
84
85 #[test]
86 fn decode_frame_little_endian() {
87 let mut input = BytesMut::from("\x03\x00\x00\x00foo");
88 let mut decoder = LengthDelimitedDecoder::new(&LengthDelimitedCoderOptions {
89 length_field_is_big_endian: false,
90 ..Default::default()
91 });
92
93 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
94 assert_eq!(decoder.decode(&mut input).unwrap(), None);
95 }
96
97 #[test]
98 fn decode_frame_2byte_length_with_offset() {
99 let mut input = BytesMut::from("\x00\x00\x00\x03foo");
100 let mut decoder = LengthDelimitedDecoder::new(&LengthDelimitedCoderOptions {
101 length_field_length: 2,
102 length_field_offset: 2,
103 ..Default::default()
104 });
105
106 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
107 assert_eq!(decoder.decode(&mut input).unwrap(), None);
108 }
109
110 #[test]
111 fn decode_frame_ignore_unexpected_eof() {
112 let mut input = BytesMut::from("\x00\x00\x00\x03fo");
113 let mut decoder = LengthDelimitedDecoder::default();
114
115 assert_eq!(decoder.decode(&mut input).unwrap(), None);
116 }
117
118 #[test]
119 fn decode_frame_ignore_exceeding_bytes_without_header() {
120 let mut input = BytesMut::from("\x00\x00\x00\x03fooo");
121 let mut decoder = LengthDelimitedDecoder::default();
122
123 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
124 assert_eq!(decoder.decode(&mut input).unwrap(), None);
125 }
126
127 #[test]
128 fn decode_frame_ignore_missing_header() {
129 let mut input = BytesMut::from("foo");
130 let mut decoder = LengthDelimitedDecoder::default();
131
132 assert_eq!(decoder.decode(&mut input).unwrap(), None);
133 }
134
135 #[test]
136 fn decode_frames() {
137 let mut input = BytesMut::from("\x00\x00\x00\x03foo\x00\x00\x00\x03bar");
138 let mut decoder = LengthDelimitedDecoder::default();
139
140 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo");
141 assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "bar");
142 assert_eq!(decoder.decode(&mut input).unwrap(), None);
143 }
144
145 #[test]
146 fn decode_eof_frame() {
147 let mut input = BytesMut::from("\x00\x00\x00\x03foo");
148 let mut decoder = LengthDelimitedDecoder::default();
149
150 assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "foo");
151 assert_eq!(decoder.decode_eof(&mut input).unwrap(), None);
152 }
153
154 #[test]
155 fn decode_eof_frame_unexpected_eof() {
156 let mut input = BytesMut::from("\x00\x00\x00\x03fo");
157 let mut decoder = LengthDelimitedDecoder::default();
158
159 assert!(decoder.decode_eof(&mut input).is_err());
160 }
161
162 #[test]
163 fn decode_eof_frame_exceeding_bytes_without_header() {
164 let mut input = BytesMut::from("\x00\x00\x00\x03fooo");
165 let mut decoder = LengthDelimitedDecoder::default();
166
167 assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "foo");
168 assert!(decoder.decode_eof(&mut input).is_err());
169 }
170
171 #[test]
172 fn decode_eof_frame_missing_header() {
173 let mut input = BytesMut::from("foo");
174 let mut decoder = LengthDelimitedDecoder::default();
175
176 assert!(decoder.decode_eof(&mut input).is_err());
177 }
178
179 #[test]
180 fn decode_eof_frames() {
181 let mut input = BytesMut::from("\x00\x00\x00\x03foo\x00\x00\x00\x03bar");
182 let mut decoder = LengthDelimitedDecoder::default();
183
184 assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "foo");
185 assert_eq!(decoder.decode_eof(&mut input).unwrap().unwrap(), "bar");
186 assert_eq!(decoder.decode_eof(&mut input).unwrap(), None);
187 }
188}