codecs/decoding/framing/
varint_length_delimited.rs1use bytes::{Buf, Bytes, BytesMut};
2use derivative::Derivative;
3use snafu::Snafu;
4use tokio_util::codec::Decoder;
5use vector_config::configurable_component;
6
7use super::{BoxedFramingError, FramingError, StreamDecodingError};
8
9#[derive(Debug, Snafu)]
11pub enum VarintFramingError {
12 #[snafu(display("Varint too large"))]
13 VarintOverflow,
14
15 #[snafu(display("Frame too large: {length} bytes (max: {max})"))]
16 FrameTooLarge { length: usize, max: usize },
17
18 #[snafu(display("Trailing data at EOF"))]
19 TrailingData,
20}
21
22impl StreamDecodingError for VarintFramingError {
23 fn can_continue(&self) -> bool {
24 match self {
25 Self::VarintOverflow | Self::FrameTooLarge { .. } => false,
27 Self::TrailingData => false,
29 }
30 }
31}
32
33impl FramingError for VarintFramingError {
34 fn as_any(&self) -> &dyn std::any::Any {
35 self as &dyn std::any::Any
36 }
37}
38
39#[configurable_component]
41#[derive(Debug, Clone, Derivative)]
42#[derivative(Default)]
43pub struct VarintLengthDelimitedDecoderConfig {
44 #[serde(default = "default_max_frame_length")]
46 pub max_frame_length: usize,
47}
48
49const fn default_max_frame_length() -> usize {
50 8 * 1_024 * 1_024
51}
52
53impl VarintLengthDelimitedDecoderConfig {
54 pub fn build(&self) -> VarintLengthDelimitedDecoder {
56 VarintLengthDelimitedDecoder::new(self.max_frame_length)
57 }
58}
59
60#[derive(Debug, Clone)]
63pub struct VarintLengthDelimitedDecoder {
64 max_frame_length: usize,
65}
66
67impl VarintLengthDelimitedDecoder {
68 pub fn new(max_frame_length: usize) -> Self {
70 Self { max_frame_length }
71 }
72
73 fn decode_varint(&self, buf: &mut BytesMut) -> Result<Option<u64>, BoxedFramingError> {
75 if buf.is_empty() {
76 return Ok(None);
77 }
78
79 let mut value: u64 = 0;
80 let mut shift: u8 = 0;
81 let mut bytes_read = 0;
82
83 for byte in buf.iter() {
84 bytes_read += 1;
85 let byte_value = (*byte & 0x7F) as u64;
86 value |= byte_value << shift;
87
88 if *byte & 0x80 == 0 {
89 buf.advance(bytes_read);
91 return Ok(Some(value));
92 }
93
94 shift += 7;
95 if shift >= 64 {
96 return Err(VarintFramingError::VarintOverflow.into());
97 }
98 }
99
100 Ok(None)
102 }
103}
104
105impl Default for VarintLengthDelimitedDecoder {
106 fn default() -> Self {
107 Self::new(default_max_frame_length())
108 }
109}
110
111impl Decoder for VarintLengthDelimitedDecoder {
112 type Item = Bytes;
113 type Error = BoxedFramingError;
114
115 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
116 let length = match self.decode_varint(src)? {
118 Some(len) => len as usize,
119 None => return Ok(None), };
121
122 if length > self.max_frame_length {
124 return Err(VarintFramingError::FrameTooLarge {
125 length,
126 max: self.max_frame_length,
127 }
128 .into());
129 }
130
131 if src.len() < length {
133 return Ok(None); }
135
136 let frame = src.split_to(length).freeze();
138 Ok(Some(frame))
139 }
140
141 fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
142 if src.is_empty() {
143 Ok(None)
144 } else {
145 match self.decode(src)? {
147 Some(frame) => Ok(Some(frame)),
148 None => {
149 if !src.is_empty() {
151 Err(VarintFramingError::TrailingData.into())
152 } else {
153 Ok(None)
154 }
155 }
156 }
157 }
158 }
159}
160
161#[cfg(test)]
162mod tests {
163 use super::*;
164
165 #[test]
166 fn decode_single_byte_varint() {
167 let mut input = BytesMut::from(&[0x03, b'f', b'o', b'o'][..]);
168 let mut decoder = VarintLengthDelimitedDecoder::default();
169
170 assert_eq!(
171 decoder.decode(&mut input).unwrap().unwrap(),
172 Bytes::from("foo")
173 );
174 assert_eq!(decoder.decode(&mut input).unwrap(), None);
175 }
176
177 #[test]
178 fn decode_multi_byte_varint() {
179 let mut input = BytesMut::from(&[0xAC, 0x02][..]);
181 input.extend_from_slice(&vec![b'x'; 300]);
183 let mut decoder = VarintLengthDelimitedDecoder::default();
184
185 let result = decoder.decode(&mut input).unwrap().unwrap();
186 assert_eq!(result.len(), 300);
187 assert_eq!(decoder.decode(&mut input).unwrap(), None);
188 }
189
190 #[test]
191 fn decode_incomplete_varint() {
192 let mut input = BytesMut::from(&[0x80][..]); let mut decoder = VarintLengthDelimitedDecoder::default();
194
195 assert_eq!(decoder.decode(&mut input).unwrap(), None);
196 }
197
198 #[test]
199 fn decode_incomplete_frame() {
200 let mut input = BytesMut::from(&[0x05, b'f', b'o'][..]); let mut decoder = VarintLengthDelimitedDecoder::default();
202
203 assert_eq!(decoder.decode(&mut input).unwrap(), None);
204 }
205
206 #[test]
207 fn decode_frame_too_large() {
208 let mut input =
209 BytesMut::from(&[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01][..]);
210 let mut decoder = VarintLengthDelimitedDecoder::new(1000);
211
212 assert!(decoder.decode(&mut input).is_err());
213 }
214
215 #[test]
216 fn decode_trailing_data_at_eof() {
217 let mut input = BytesMut::from(&[0x03, b'f', b'o', b'o', b'e', b'x', b't', b'r', b'a'][..]);
218 let mut decoder = VarintLengthDelimitedDecoder::default();
219
220 assert_eq!(
222 decoder.decode(&mut input).unwrap().unwrap(),
223 Bytes::from("foo")
224 );
225
226 assert!(decoder.decode_eof(&mut input).is_err());
228 }
229}