codecs/encoding/framing/
length_delimited.rs1use bytes::BytesMut;
2use derivative::Derivative;
3use tokio_util::codec::{Encoder, LengthDelimitedCodec};
4use vector_config::configurable_component;
5
6use crate::common::length_delimited::LengthDelimitedCoderOptions;
7
8use super::BoxedFramingError;
9
10#[configurable_component]
12#[derive(Debug, Clone, Derivative, Eq, PartialEq)]
13#[derivative(Default)]
14pub struct LengthDelimitedEncoderConfig {
15 #[serde(skip_serializing_if = "vector_core::serde::is_default")]
17 pub length_delimited: LengthDelimitedCoderOptions,
18}
19
20impl LengthDelimitedEncoderConfig {
21 pub fn build(&self) -> LengthDelimitedEncoder {
23 LengthDelimitedEncoder::new(&self.length_delimited)
24 }
25}
26
27#[derive(Debug, Clone)]
29pub struct LengthDelimitedEncoder {
30 codec: LengthDelimitedCodec,
31 inner_buffer: BytesMut,
32}
33
34impl LengthDelimitedEncoder {
35 pub fn new(config: &LengthDelimitedCoderOptions) -> Self {
37 Self {
38 codec: config.build_codec(),
39 inner_buffer: BytesMut::new(),
40 }
41 }
42}
43
44impl Default for LengthDelimitedEncoder {
45 fn default() -> Self {
46 Self {
47 codec: LengthDelimitedCodec::new(),
48 inner_buffer: BytesMut::new(),
49 }
50 }
51}
52
53impl Encoder<()> for LengthDelimitedEncoder {
54 type Error = BoxedFramingError;
55
56 fn encode(&mut self, _: (), buffer: &mut BytesMut) -> Result<(), BoxedFramingError> {
57 self.inner_buffer.clear();
58 self.inner_buffer.extend_from_slice(buffer);
59 buffer.clear();
60 let bytes = self.inner_buffer.split().freeze();
61 self.codec.encode(bytes, buffer)?;
62 Ok(())
63 }
64}
65
66#[cfg(test)]
67mod tests {
68 use super::*;
69
70 #[test]
71 fn encode() {
72 let mut codec = LengthDelimitedEncoder::default();
73
74 let mut buffer = BytesMut::from("abc");
75 codec.encode((), &mut buffer).unwrap();
76
77 assert_eq!(&buffer[..], b"\0\0\0\x03abc");
78 }
79
80 #[test]
81 fn encode_2byte_length() {
82 let mut codec = LengthDelimitedEncoder::new(&LengthDelimitedCoderOptions {
83 length_field_length: 2,
84 ..Default::default()
85 });
86
87 let mut buffer = BytesMut::from("abc");
88 codec.encode((), &mut buffer).unwrap();
89
90 assert_eq!(&buffer[..], b"\0\x03abc");
91 }
92}