codecs/encoding/framing/
length_delimited.rs

1use bytes::BytesMut;
2use derivative::Derivative;
3use tokio_util::codec::{Encoder, LengthDelimitedCodec};
4use vector_config::configurable_component;
5
6use crate::common::length_delimited::LengthDelimitedCoderOptions;
7
8use super::BoxedFramingError;
9
10/// Config used to build a `LengthDelimitedEncoder`.
11#[configurable_component]
12#[derive(Debug, Clone, Derivative, Eq, PartialEq)]
13#[derivative(Default)]
14pub struct LengthDelimitedEncoderConfig {
15    /// Options for the length delimited decoder.
16    #[serde(skip_serializing_if = "vector_core::serde::is_default")]
17    pub length_delimited: LengthDelimitedCoderOptions,
18}
19
20impl LengthDelimitedEncoderConfig {
21    /// Build the `LengthDelimitedEncoder` from this configuration.
22    pub fn build(&self) -> LengthDelimitedEncoder {
23        LengthDelimitedEncoder::new(&self.length_delimited)
24    }
25}
26
27/// An encoder for handling bytes that are delimited by a length header.
28#[derive(Debug, Clone)]
29pub struct LengthDelimitedEncoder {
30    codec: LengthDelimitedCodec,
31    inner_buffer: BytesMut,
32}
33
34impl LengthDelimitedEncoder {
35    /// Creates a new `LengthDelimitedEncoder`.
36    pub fn new(config: &LengthDelimitedCoderOptions) -> Self {
37        Self {
38            codec: config.build_codec(),
39            inner_buffer: BytesMut::new(),
40        }
41    }
42}
43
44impl Default for LengthDelimitedEncoder {
45    fn default() -> Self {
46        Self {
47            codec: LengthDelimitedCodec::new(),
48            inner_buffer: BytesMut::new(),
49        }
50    }
51}
52
53impl Encoder<()> for LengthDelimitedEncoder {
54    type Error = BoxedFramingError;
55
56    fn encode(&mut self, _: (), buffer: &mut BytesMut) -> Result<(), BoxedFramingError> {
57        self.inner_buffer.clear();
58        self.inner_buffer.extend_from_slice(buffer);
59        buffer.clear();
60        let bytes = self.inner_buffer.split().freeze();
61        self.codec.encode(bytes, buffer)?;
62        Ok(())
63    }
64}
65
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `payload` through `encoder` and returns the framed bytes.
    fn frame(mut encoder: LengthDelimitedEncoder, payload: &str) -> BytesMut {
        let mut framed = BytesMut::from(payload);
        encoder.encode((), &mut framed).unwrap();
        framed
    }

    #[test]
    fn encode() {
        // The default encoder writes a 4-byte length header.
        let framed = frame(LengthDelimitedEncoder::default(), "abc");

        assert_eq!(&framed[..], b"\0\0\0\x03abc");
    }

    #[test]
    fn encode_2byte_length() {
        // Shrinking the length field to two bytes shortens the header.
        let options = LengthDelimitedCoderOptions {
            length_field_length: 2,
            ..Default::default()
        };
        let framed = frame(LengthDelimitedEncoder::new(&options), "abc");

        assert_eq!(&framed[..], b"\0\x03abc");
    }
}