1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
use bytes::BytesMut;
use derivative::Derivative;
use tokio_util::codec::{Encoder, LengthDelimitedCodec};
use vector_config::configurable_component;

use crate::common::length_delimited::LengthDelimitedCoderOptions;

use super::BoxedFramingError;

/// Config used to build a `LengthDelimitedEncoder`.
#[configurable_component]
#[derive(Debug, Clone, Derivative, Eq, PartialEq)]
#[derivative(Default)]
pub struct LengthDelimitedEncoderConfig {
    /// Options for the length delimited encoder.
    // `skip_serializing_if` makes serde omit this field whenever the named predicate
    // returns true; presumably that is when the options equal their default value —
    // confirm against `vector_core::serde::is_default`.
    #[serde(skip_serializing_if = "vector_core::serde::is_default")]
    pub length_delimited: LengthDelimitedCoderOptions,
}

impl LengthDelimitedEncoderConfig {
    /// Creates a `LengthDelimitedEncoder` from the options stored in this configuration.
    ///
    /// The configuration is only borrowed, so the same value can be used to build
    /// further encoders afterwards.
    pub fn build(&self) -> LengthDelimitedEncoder {
        let options = &self.length_delimited;
        LengthDelimitedEncoder::new(options)
    }
}

/// An encoder for handling bytes that are delimited by a length header.
///
/// This is a thin newtype around `tokio_util`'s [`LengthDelimitedCodec`]; the wrapped
/// codec is what actually writes the length header and the payload.
#[derive(Debug, Clone)]
pub struct LengthDelimitedEncoder(LengthDelimitedCodec);

impl LengthDelimitedEncoder {
    /// Constructs a `LengthDelimitedEncoder` whose inner codec is built from `config`.
    ///
    /// The codec itself is produced by `LengthDelimitedCoderOptions::build_codec`; this
    /// constructor only wraps the result.
    pub fn new(config: &LengthDelimitedCoderOptions) -> Self {
        let codec = config.build_codec();
        Self(codec)
    }
}

impl Default for LengthDelimitedEncoder {
    /// Wraps a codec created with `LengthDelimitedCodec::new()`, i.e. the codec's own
    /// defaults rather than options taken from a configuration.
    fn default() -> Self {
        let codec = LengthDelimitedCodec::new();
        Self(codec)
    }
}

impl Encoder<()> for LengthDelimitedEncoder {
    type Error = BoxedFramingError;

    /// Frames the current contents of `buffer` in place.
    ///
    /// Everything currently in `buffer` is treated as the payload: it is split off
    /// (leaving `buffer` empty) and handed to the wrapped codec, which writes the
    /// framed output back into `buffer`. The unit item carries no data.
    ///
    /// # Errors
    ///
    /// Returns any error reported by the wrapped codec, converted into a
    /// `BoxedFramingError`.
    fn encode(&mut self, _: (), buffer: &mut BytesMut) -> Result<(), BoxedFramingError> {
        let Self(codec) = self;
        // Take ownership of the pending bytes so the same buffer can receive the frame.
        let payload = buffer.split().freeze();
        codec.encode(payload, buffer).map_err(Into::into)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `input` through `encoder` and returns the buffer holding the framed bytes.
    fn frame(mut encoder: LengthDelimitedEncoder, input: &str) -> BytesMut {
        let mut buffer = BytesMut::from(input);
        encoder.encode((), &mut buffer).unwrap();
        buffer
    }

    /// The default encoder prefixes the payload with a 4-byte length header.
    #[test]
    fn encode() {
        let framed = frame(LengthDelimitedEncoder::default(), "abc");

        assert_eq!(&framed[..], b"\0\0\0\x03abc");
    }

    /// A `length_field_length` of 2 shrinks the header to two bytes.
    #[test]
    fn encode_2byte_length() {
        let options = LengthDelimitedCoderOptions {
            length_field_length: 2,
            ..Default::default()
        };
        let framed = frame(LengthDelimitedEncoder::new(&options), "abc");

        assert_eq!(&framed[..], b"\0\x03abc");
    }
}