codecs/encoding/framing/varint_length_delimited.rs

1use bytes::{BufMut, BytesMut};
2use derivative::Derivative;
3use snafu::Snafu;
4use tokio_util::codec::Encoder;
5use vector_config::configurable_component;
6
7use super::{BoxedFramingError, FramingError};
8
9/// Errors that can occur during varint length delimited framing.
10#[derive(Debug, Snafu)]
11pub enum VarintFramingError {
12    #[snafu(display("Frame too large: {length} bytes (max: {max})"))]
13    FrameTooLarge { length: usize, max: usize },
14}
15
16impl FramingError for VarintFramingError {}
17
18/// Config used to build a `VarintLengthDelimitedEncoder`.
19#[configurable_component]
20#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
21#[derivative(Default)]
22pub struct VarintLengthDelimitedEncoderConfig {
23    /// Maximum frame length
24    #[serde(default = "default_max_frame_length")]
25    pub max_frame_length: usize,
26}
27
/// Default upper bound on a frame's payload size: 8 MiB.
const fn default_max_frame_length() -> usize {
    const MIB: usize = 1 << 20;
    8 * MIB
}
31
32impl VarintLengthDelimitedEncoderConfig {
33    /// Build the `VarintLengthDelimitedEncoder` from this configuration.
34    pub fn build(&self) -> VarintLengthDelimitedEncoder {
35        VarintLengthDelimitedEncoder::new(self.max_frame_length)
36    }
37}
38
39/// A codec for handling bytes sequences whose length is encoded as a varint prefix.
40/// This is compatible with protobuf's length-delimited encoding.
41#[derive(Debug, Clone)]
42pub struct VarintLengthDelimitedEncoder {
43    max_frame_length: usize,
44}
45
46impl VarintLengthDelimitedEncoder {
47    /// Creates a new `VarintLengthDelimitedEncoder`.
48    pub fn new(max_frame_length: usize) -> Self {
49        Self { max_frame_length }
50    }
51
52    /// Encode a varint into the buffer
53    fn encode_varint(&self, value: usize, buf: &mut BytesMut) -> Result<(), BoxedFramingError> {
54        if value > self.max_frame_length {
55            return Err(VarintFramingError::FrameTooLarge {
56                length: value,
57                max: self.max_frame_length,
58            }
59            .into());
60        }
61
62        let mut val = value;
63        while val >= 0x80 {
64            buf.put_u8((val as u8) | 0x80);
65            val >>= 7;
66        }
67        buf.put_u8(val as u8);
68        Ok(())
69    }
70}
71
72impl Default for VarintLengthDelimitedEncoder {
73    fn default() -> Self {
74        Self::new(default_max_frame_length())
75    }
76}
77
78impl Encoder<()> for VarintLengthDelimitedEncoder {
79    type Error = BoxedFramingError;
80
81    fn encode(&mut self, _: (), buffer: &mut BytesMut) -> Result<(), Self::Error> {
82        // This encoder expects the data to already be in the buffer
83        // We just need to prepend the varint length
84        let data_length = buffer.len();
85        if data_length == 0 {
86            return Ok(());
87        }
88
89        // Create a temporary buffer to hold the varint
90        let mut varint_buffer = BytesMut::new();
91        self.encode_varint(data_length, &mut varint_buffer)?;
92
93        // Prepend the varint to the buffer
94        let varint_bytes = varint_buffer.freeze();
95        let data_bytes = buffer.split_to(buffer.len());
96        buffer.extend_from_slice(&varint_bytes);
97        buffer.extend_from_slice(&data_bytes);
98        Ok(())
99    }
100}
101
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn encode_single_byte_varint() {
        let mut buffer = BytesMut::from(&b"foo"[..]);
        let mut encoder = VarintLengthDelimitedEncoder::default();

        encoder.encode((), &mut buffer).unwrap();
        assert_eq!(buffer, &[0x03, b'f', b'o', b'o'][..]);
    }

    #[test]
    fn encode_largest_single_byte_varint() {
        // 127 is the largest length that fits in a single varint byte.
        let mut buffer = BytesMut::from(&[b'x'; 127][..]);
        let mut encoder = VarintLengthDelimitedEncoder::default();

        encoder.encode((), &mut buffer).unwrap();
        assert_eq!(buffer[0], 0x7F);
        assert_eq!(buffer.len(), 128);
    }

    #[test]
    fn encode_smallest_two_byte_varint() {
        // 128 is the smallest length that needs a continuation byte: 0x80 0x01.
        let mut buffer = BytesMut::from(&[b'x'; 128][..]);
        let mut encoder = VarintLengthDelimitedEncoder::default();

        encoder.encode((), &mut buffer).unwrap();
        assert_eq!(buffer[0..2], [0x80, 0x01]);
        assert_eq!(buffer.len(), 130);
        assert!(buffer[2..].iter().all(|&b| b == b'x'));
    }

    #[test]
    fn encode_multi_byte_varint() {
        let mut buffer = BytesMut::from(&[b'x'; 300][..]);
        let mut encoder = VarintLengthDelimitedEncoder::new(1000);

        encoder.encode((), &mut buffer).unwrap();

        // 300 in varint encoding: 0xAC 0x02
        assert_eq!(buffer[0..2], [0xAC, 0x02]);
        assert_eq!(buffer.len(), 302); // 2 bytes varint + 300 bytes data
    }

    #[test]
    fn encode_frame_at_max_length() {
        // The limit is inclusive: exactly `max_frame_length` bytes is accepted.
        let mut buffer = BytesMut::from(&[b'x'; 1000][..]);
        let mut encoder = VarintLengthDelimitedEncoder::new(1000);

        encoder.encode((), &mut buffer).unwrap();

        // 1000 in varint encoding: 0xE8 0x07
        assert_eq!(buffer[0..2], [0xE8, 0x07]);
        assert_eq!(buffer.len(), 1002);
    }

    #[test]
    fn encode_frame_too_large() {
        let large_data = [b'x'; 1001];
        let mut buffer = BytesMut::from(&large_data[..]);
        let mut encoder = VarintLengthDelimitedEncoder::new(1000);

        assert!(encoder.encode((), &mut buffer).is_err());
        // A rejected frame must leave the payload untouched.
        assert_eq!(buffer, &large_data[..]);
    }

    #[test]
    fn encode_empty_buffer() {
        let mut buffer = BytesMut::new();
        let mut encoder = VarintLengthDelimitedEncoder::default();

        encoder.encode((), &mut buffer).unwrap();
        assert_eq!(buffer.len(), 0);
    }
}