codecs/encoding/framing/
length_delimited.rs1use bytes::BytesMut;
2use derivative::Derivative;
3use tokio_util::codec::{Encoder, LengthDelimitedCodec};
4use vector_config::configurable_component;
5
6use super::BoxedFramingError;
7use crate::common::length_delimited::LengthDelimitedCoderOptions;
8
9#[configurable_component]
11#[derive(Debug, Clone, Derivative, Eq, PartialEq)]
12#[derivative(Default)]
13pub struct LengthDelimitedEncoderConfig {
14 #[serde(skip_serializing_if = "vector_core::serde::is_default")]
16 pub length_delimited: LengthDelimitedCoderOptions,
17}
18
19impl LengthDelimitedEncoderConfig {
20 pub fn build(&self) -> LengthDelimitedEncoder {
22 LengthDelimitedEncoder::new(&self.length_delimited)
23 }
24}
25
26#[derive(Debug, Clone)]
28pub struct LengthDelimitedEncoder {
29 codec: LengthDelimitedCodec,
30 inner_buffer: BytesMut,
31}
32
33impl LengthDelimitedEncoder {
34 pub fn new(config: &LengthDelimitedCoderOptions) -> Self {
36 Self {
37 codec: config.build_codec(),
38 inner_buffer: BytesMut::new(),
39 }
40 }
41}
42
43impl Default for LengthDelimitedEncoder {
44 fn default() -> Self {
45 Self {
46 codec: LengthDelimitedCodec::new(),
47 inner_buffer: BytesMut::new(),
48 }
49 }
50}
51
52impl Encoder<()> for LengthDelimitedEncoder {
53 type Error = BoxedFramingError;
54
55 fn encode(&mut self, _: (), buffer: &mut BytesMut) -> Result<(), BoxedFramingError> {
56 self.inner_buffer.clear();
57 self.inner_buffer.extend_from_slice(buffer);
58 buffer.clear();
59 let bytes = self.inner_buffer.split().freeze();
60 self.codec.encode(bytes, buffer)?;
61 Ok(())
62 }
63}
64
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `payload` through `encoder` and returns the resulting frame.
    fn framed(mut encoder: LengthDelimitedEncoder, payload: &str) -> BytesMut {
        let mut frame = BytesMut::from(payload);
        encoder.encode((), &mut frame).unwrap();
        frame
    }

    #[test]
    fn encode() {
        // The default codec writes a 4-byte length header.
        let frame = framed(LengthDelimitedEncoder::default(), "abc");

        assert_eq!(&frame[..], b"\0\0\0\x03abc");
    }

    #[test]
    fn encode_2byte_length() {
        let options = LengthDelimitedCoderOptions {
            length_field_length: 2,
            ..Default::default()
        };
        let frame = framed(LengthDelimitedEncoder::new(&options), "abc");

        assert_eq!(&frame[..], b"\0\x03abc");
    }
}