codecs/encoding/framing/
varint_length_delimited.rs1use bytes::{BufMut, BytesMut};
2use derivative::Derivative;
3use snafu::Snafu;
4use tokio_util::codec::Encoder;
5use vector_config::configurable_component;
6
7use super::{BoxedFramingError, FramingError};
8
9#[derive(Debug, Snafu)]
11pub enum VarintFramingError {
12 #[snafu(display("Frame too large: {length} bytes (max: {max})"))]
13 FrameTooLarge { length: usize, max: usize },
14}
15
16impl FramingError for VarintFramingError {}
17
18#[configurable_component]
20#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
21#[derivative(Default)]
22pub struct VarintLengthDelimitedEncoderConfig {
23 #[serde(default = "default_max_frame_length")]
25 pub max_frame_length: usize,
26}
27
28const fn default_max_frame_length() -> usize {
29 8 * 1_024 * 1_024
30}
31
32impl VarintLengthDelimitedEncoderConfig {
33 pub fn build(&self) -> VarintLengthDelimitedEncoder {
35 VarintLengthDelimitedEncoder::new(self.max_frame_length)
36 }
37}
38
39#[derive(Debug, Clone)]
42pub struct VarintLengthDelimitedEncoder {
43 max_frame_length: usize,
44}
45
46impl VarintLengthDelimitedEncoder {
47 pub fn new(max_frame_length: usize) -> Self {
49 Self { max_frame_length }
50 }
51
52 fn encode_varint(&self, value: usize, buf: &mut BytesMut) -> Result<(), BoxedFramingError> {
54 if value > self.max_frame_length {
55 return Err(VarintFramingError::FrameTooLarge {
56 length: value,
57 max: self.max_frame_length,
58 }
59 .into());
60 }
61
62 let mut val = value;
63 while val >= 0x80 {
64 buf.put_u8((val as u8) | 0x80);
65 val >>= 7;
66 }
67 buf.put_u8(val as u8);
68 Ok(())
69 }
70}
71
72impl Default for VarintLengthDelimitedEncoder {
73 fn default() -> Self {
74 Self::new(default_max_frame_length())
75 }
76}
77
78impl Encoder<()> for VarintLengthDelimitedEncoder {
79 type Error = BoxedFramingError;
80
81 fn encode(&mut self, _: (), buffer: &mut BytesMut) -> Result<(), Self::Error> {
82 let data_length = buffer.len();
85 if data_length == 0 {
86 return Ok(());
87 }
88
89 let mut varint_buffer = BytesMut::new();
91 self.encode_varint(data_length, &mut varint_buffer)?;
92
93 let varint_bytes = varint_buffer.freeze();
95 let data_bytes = buffer.split_to(buffer.len());
96 buffer.extend_from_slice(&varint_bytes);
97 buffer.extend_from_slice(&data_bytes);
98 Ok(())
99 }
100}
101
102#[cfg(test)]
103mod tests {
104 use super::*;
105
106 #[test]
107 fn encode_single_byte_varint() {
108 let mut buffer = BytesMut::from(&b"foo"[..]);
109 let mut encoder = VarintLengthDelimitedEncoder::default();
110
111 encoder.encode((), &mut buffer).unwrap();
112 assert_eq!(buffer, &[0x03, b'f', b'o', b'o'][..]);
113 }
114
115 #[test]
116 fn encode_multi_byte_varint() {
117 let mut buffer = BytesMut::from(&b"foo"[..]);
118 let mut encoder = VarintLengthDelimitedEncoder::new(1000);
119
120 buffer.clear();
122 buffer.extend_from_slice(&vec![b'x'; 300]);
123 encoder.encode((), &mut buffer).unwrap();
124
125 assert_eq!(buffer[0..2], [0xAC, 0x02]);
127 assert_eq!(buffer.len(), 302); }
129
130 #[test]
131 fn encode_frame_too_large() {
132 let large_data = vec![b'x'; 1001];
133 let mut buffer = BytesMut::from(&large_data[..]);
134 let mut encoder = VarintLengthDelimitedEncoder::new(1000);
135
136 assert!(encoder.encode((), &mut buffer).is_err());
137 }
138
139 #[test]
140 fn encode_empty_buffer() {
141 let mut buffer = BytesMut::new();
142 let mut encoder = VarintLengthDelimitedEncoder::default();
143
144 encoder.encode((), &mut buffer).unwrap();
145 assert_eq!(buffer.len(), 0);
146 }
147}