codecs/encoding/framing/
length_delimited.rs

1use bytes::BytesMut;
2use derivative::Derivative;
3use tokio_util::codec::{Encoder, LengthDelimitedCodec};
4use vector_config::configurable_component;
5
6use super::BoxedFramingError;
7use crate::common::length_delimited::LengthDelimitedCoderOptions;
8
9/// Config used to build a `LengthDelimitedEncoder`.
10#[configurable_component]
11#[derive(Debug, Clone, Derivative, Eq, PartialEq)]
12#[derivative(Default)]
13pub struct LengthDelimitedEncoderConfig {
14    /// Options for the length delimited decoder.
15    #[serde(skip_serializing_if = "vector_core::serde::is_default")]
16    pub length_delimited: LengthDelimitedCoderOptions,
17}
18
19impl LengthDelimitedEncoderConfig {
20    /// Build the `LengthDelimitedEncoder` from this configuration.
21    pub fn build(&self) -> LengthDelimitedEncoder {
22        LengthDelimitedEncoder::new(&self.length_delimited)
23    }
24}
25
/// An encoder for handling bytes that are delimited by a length header.
///
/// The `Encoder<()>` implementation frames the bytes already present in the
/// caller's buffer, rewriting that buffer in place.
#[derive(Debug, Clone)]
pub struct LengthDelimitedEncoder {
    /// Underlying codec that `encode` delegates to for writing the framed output.
    codec: LengthDelimitedCodec,
    /// Scratch buffer that `encode` copies the payload into before the caller's
    /// buffer is cleared and rewritten with the frame.
    inner_buffer: BytesMut,
}
32
33impl LengthDelimitedEncoder {
34    /// Creates a new `LengthDelimitedEncoder`.
35    pub fn new(config: &LengthDelimitedCoderOptions) -> Self {
36        Self {
37            codec: config.build_codec(),
38            inner_buffer: BytesMut::new(),
39        }
40    }
41}
42
43impl Default for LengthDelimitedEncoder {
44    fn default() -> Self {
45        Self {
46            codec: LengthDelimitedCodec::new(),
47            inner_buffer: BytesMut::new(),
48        }
49    }
50}
51
52impl Encoder<()> for LengthDelimitedEncoder {
53    type Error = BoxedFramingError;
54
55    fn encode(&mut self, _: (), buffer: &mut BytesMut) -> Result<(), BoxedFramingError> {
56        self.inner_buffer.clear();
57        self.inner_buffer.extend_from_slice(buffer);
58        buffer.clear();
59        let bytes = self.inner_buffer.split().freeze();
60        self.codec.encode(bytes, buffer)?;
61        Ok(())
62    }
63}
64
#[cfg(test)]
mod tests {
    use super::*;

    /// The default encoder prefixes the payload with a 4-byte length header.
    #[test]
    fn encode() {
        let mut encoder = LengthDelimitedEncoder::default();
        let mut frame = BytesMut::from("abc");

        encoder.encode((), &mut frame).unwrap();

        assert_eq!(&frame[..], b"\0\0\0\x03abc");
    }

    /// Setting `length_field_length` to 2 shrinks the header to two bytes.
    #[test]
    fn encode_2byte_length() {
        let options = LengthDelimitedCoderOptions {
            length_field_length: 2,
            ..Default::default()
        };
        let mut encoder = LengthDelimitedEncoder::new(&options);
        let mut frame = BytesMut::from("abc");

        encoder.encode((), &mut frame).unwrap();

        assert_eq!(&frame[..], b"\0\x03abc");
    }
}