// codecs/decoding/framing/varint_length_delimited.rs

1use bytes::{Buf, Bytes, BytesMut};
2use derivative::Derivative;
3use snafu::Snafu;
4use tokio_util::codec::Decoder;
5use vector_config::configurable_component;
6
7use super::{BoxedFramingError, FramingError, StreamDecodingError};
8
/// Errors that can occur during varint length delimited framing.
///
/// None of these errors are recoverable; see the `StreamDecodingError` impl.
#[derive(Debug, Snafu)]
pub enum VarintFramingError {
    /// The varint length prefix is too long to represent a `u64` value.
    #[snafu(display("Varint too large"))]
    VarintOverflow,

    /// The decoded frame length (`length`) exceeds the decoder's configured maximum (`max`).
    #[snafu(display("Frame too large: {length} bytes (max: {max})"))]
    FrameTooLarge { length: usize, max: usize },

    /// At end of stream, buffered bytes remain that do not form a complete frame.
    #[snafu(display("Trailing data at EOF"))]
    TrailingData,
}
21
22impl StreamDecodingError for VarintFramingError {
23    fn can_continue(&self) -> bool {
24        match self {
25            // Varint overflow and frame too large are not recoverable
26            Self::VarintOverflow | Self::FrameTooLarge { .. } => false,
27            // Trailing data at EOF is not recoverable
28            Self::TrailingData => false,
29        }
30    }
31}
32
33impl FramingError for VarintFramingError {
34    fn as_any(&self) -> &dyn std::any::Any {
35        self as &dyn std::any::Any
36    }
37}
38
39/// Config used to build a `VarintLengthDelimitedDecoder`.
40#[configurable_component]
41#[derive(Debug, Clone, Derivative)]
42#[derivative(Default)]
43pub struct VarintLengthDelimitedDecoderConfig {
44    /// Maximum frame length
45    #[serde(default = "default_max_frame_length")]
46    pub max_frame_length: usize,
47}
48
/// Default value for `max_frame_length`: 8 MiB.
const fn default_max_frame_length() -> usize {
    const MIB: usize = 1 << 20;
    8 * MIB
}
52
53impl VarintLengthDelimitedDecoderConfig {
54    /// Build the `VarintLengthDelimitedDecoder` from this configuration.
55    pub fn build(&self) -> VarintLengthDelimitedDecoder {
56        VarintLengthDelimitedDecoder::new(self.max_frame_length)
57    }
58}
59
/// A codec for handling bytes sequences whose length is encoded as a varint prefix.
/// This is compatible with protobuf's length-delimited encoding.
#[derive(Debug, Clone)]
pub struct VarintLengthDelimitedDecoder {
    /// Largest accepted frame payload, in bytes; longer frames produce a
    /// `VarintFramingError::FrameTooLarge` error.
    max_frame_length: usize,
}
66
67impl VarintLengthDelimitedDecoder {
68    /// Creates a new `VarintLengthDelimitedDecoder`.
69    pub fn new(max_frame_length: usize) -> Self {
70        Self { max_frame_length }
71    }
72
73    /// Decode a varint from the buffer
74    fn decode_varint(&self, buf: &mut BytesMut) -> Result<Option<u64>, BoxedFramingError> {
75        if buf.is_empty() {
76            return Ok(None);
77        }
78
79        let mut value: u64 = 0;
80        let mut shift: u8 = 0;
81        let mut bytes_read = 0;
82
83        for byte in buf.iter() {
84            bytes_read += 1;
85            let byte_value = (*byte & 0x7F) as u64;
86            value |= byte_value << shift;
87
88            if *byte & 0x80 == 0 {
89                // Last byte of varint
90                buf.advance(bytes_read);
91                return Ok(Some(value));
92            }
93
94            shift += 7;
95            if shift >= 64 {
96                return Err(VarintFramingError::VarintOverflow.into());
97            }
98        }
99
100        // Incomplete varint
101        Ok(None)
102    }
103}
104
105impl Default for VarintLengthDelimitedDecoder {
106    fn default() -> Self {
107        Self::new(default_max_frame_length())
108    }
109}
110
111impl Decoder for VarintLengthDelimitedDecoder {
112    type Item = Bytes;
113    type Error = BoxedFramingError;
114
115    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
116        // First, try to decode the varint length
117        let length = match self.decode_varint(src)? {
118            Some(len) => len as usize,
119            None => return Ok(None), // Incomplete varint
120        };
121
122        // Check if the length is reasonable
123        if length > self.max_frame_length {
124            return Err(VarintFramingError::FrameTooLarge {
125                length,
126                max: self.max_frame_length,
127            }
128            .into());
129        }
130
131        // Check if we have enough data for the complete frame
132        if src.len() < length {
133            return Ok(None); // Incomplete frame
134        }
135
136        // Extract the frame
137        let frame = src.split_to(length).freeze();
138        Ok(Some(frame))
139    }
140
141    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
142        if src.is_empty() {
143            Ok(None)
144        } else {
145            // Try to decode what we have, even if incomplete
146            match self.decode(src)? {
147                Some(frame) => Ok(Some(frame)),
148                None => {
149                    // If we have data but couldn't decode it, it's trailing data
150                    if !src.is_empty() {
151                        Err(VarintFramingError::TrailingData.into())
152                    } else {
153                        Ok(None)
154                    }
155                }
156            }
157        }
158    }
159}
160
161#[cfg(test)]
162mod tests {
163    use super::*;
164
165    #[test]
166    fn decode_single_byte_varint() {
167        let mut input = BytesMut::from(&[0x03, b'f', b'o', b'o'][..]);
168        let mut decoder = VarintLengthDelimitedDecoder::default();
169
170        assert_eq!(
171            decoder.decode(&mut input).unwrap().unwrap(),
172            Bytes::from("foo")
173        );
174        assert_eq!(decoder.decode(&mut input).unwrap(), None);
175    }
176
177    #[test]
178    fn decode_multi_byte_varint() {
179        // 300 in varint encoding: 0xAC 0x02
180        let mut input = BytesMut::from(&[0xAC, 0x02][..]);
181        // Add 300 bytes of data
182        input.extend_from_slice(&vec![b'x'; 300]);
183        let mut decoder = VarintLengthDelimitedDecoder::default();
184
185        let result = decoder.decode(&mut input).unwrap().unwrap();
186        assert_eq!(result.len(), 300);
187        assert_eq!(decoder.decode(&mut input).unwrap(), None);
188    }
189
190    #[test]
191    fn decode_incomplete_varint() {
192        let mut input = BytesMut::from(&[0x80][..]); // Incomplete varint
193        let mut decoder = VarintLengthDelimitedDecoder::default();
194
195        assert_eq!(decoder.decode(&mut input).unwrap(), None);
196    }
197
198    #[test]
199    fn decode_incomplete_frame() {
200        let mut input = BytesMut::from(&[0x05, b'f', b'o'][..]); // Length 5, but only 2 bytes
201        let mut decoder = VarintLengthDelimitedDecoder::default();
202
203        assert_eq!(decoder.decode(&mut input).unwrap(), None);
204    }
205
206    #[test]
207    fn decode_frame_too_large() {
208        let mut input =
209            BytesMut::from(&[0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01][..]);
210        let mut decoder = VarintLengthDelimitedDecoder::new(1000);
211
212        assert!(decoder.decode(&mut input).is_err());
213    }
214
215    #[test]
216    fn decode_trailing_data_at_eof() {
217        let mut input = BytesMut::from(&[0x03, b'f', b'o', b'o', b'e', b'x', b't', b'r', b'a'][..]);
218        let mut decoder = VarintLengthDelimitedDecoder::default();
219
220        // First decode should succeed
221        assert_eq!(
222            decoder.decode(&mut input).unwrap().unwrap(),
223            Bytes::from("foo")
224        );
225
226        // Second decode should fail with trailing data
227        assert!(decoder.decode_eof(&mut input).is_err());
228    }
229}