codecs/encoding/framing/
framer.rs

//! Configuration types for framing methods, and the `Framer` encoder built from them.
2
3use bytes::BytesMut;
4use vector_config::configurable_component;
5
6use super::{
7    BoxedFramer, BoxedFramingError, BytesEncoder, BytesEncoderConfig, CharacterDelimitedEncoder,
8    CharacterDelimitedEncoderConfig, LengthDelimitedEncoder, LengthDelimitedEncoderConfig,
9    NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig, VarintLengthDelimitedEncoder,
10    VarintLengthDelimitedEncoderConfig,
11};
12
13/// Framing configuration.
14#[configurable_component]
15#[derive(Clone, Debug, Eq, PartialEq)]
16#[serde(tag = "method", rename_all = "snake_case")]
17#[configurable(metadata(docs::enum_tag_description = "The framing method."))]
18pub enum FramingConfig {
19    /// Event data is not delimited at all.
20    Bytes,
21
22    /// Event data is delimited by a single ASCII (7-bit) character.
23    CharacterDelimited(CharacterDelimitedEncoderConfig),
24
25    /// Event data is prefixed with its length in bytes.
26    ///
27    /// The prefix is a 32-bit unsigned integer, little endian.
28    LengthDelimited(LengthDelimitedEncoderConfig),
29
30    /// Event data is delimited by a newline (LF) character.
31    NewlineDelimited,
32
33    /// Event data is prefixed with its length in bytes as a varint.
34    ///
35    /// This is compatible with protobuf's length-delimited encoding.
36    VarintLengthDelimited(VarintLengthDelimitedEncoderConfig),
37}
38
39impl From<BytesEncoderConfig> for FramingConfig {
40    fn from(_: BytesEncoderConfig) -> Self {
41        Self::Bytes
42    }
43}
44
45impl From<CharacterDelimitedEncoderConfig> for FramingConfig {
46    fn from(config: CharacterDelimitedEncoderConfig) -> Self {
47        Self::CharacterDelimited(config)
48    }
49}
50
51impl From<LengthDelimitedEncoderConfig> for FramingConfig {
52    fn from(config: LengthDelimitedEncoderConfig) -> Self {
53        Self::LengthDelimited(config)
54    }
55}
56
57impl From<NewlineDelimitedEncoderConfig> for FramingConfig {
58    fn from(_: NewlineDelimitedEncoderConfig) -> Self {
59        Self::NewlineDelimited
60    }
61}
62
63impl From<VarintLengthDelimitedEncoderConfig> for FramingConfig {
64    fn from(config: VarintLengthDelimitedEncoderConfig) -> Self {
65        Self::VarintLengthDelimited(config)
66    }
67}
68
69impl FramingConfig {
70    /// Build the `Framer` from this configuration.
71    pub fn build(&self) -> Framer {
72        match self {
73            FramingConfig::Bytes => Framer::Bytes(BytesEncoderConfig.build()),
74            FramingConfig::CharacterDelimited(config) => Framer::CharacterDelimited(config.build()),
75            FramingConfig::LengthDelimited(config) => Framer::LengthDelimited(config.build()),
76            FramingConfig::NewlineDelimited => {
77                Framer::NewlineDelimited(NewlineDelimitedEncoderConfig.build())
78            }
79            FramingConfig::VarintLengthDelimited(config) => {
80                Framer::VarintLengthDelimited(config.build())
81            }
82        }
83    }
84}
85
86/// Produce a byte stream from byte frames.
87#[derive(Debug, Clone)]
88pub enum Framer {
89    /// Uses a `BytesEncoder` for framing.
90    Bytes(BytesEncoder),
91    /// Uses a `CharacterDelimitedEncoder` for framing.
92    CharacterDelimited(CharacterDelimitedEncoder),
93    /// Uses a `LengthDelimitedEncoder` for framing.
94    LengthDelimited(LengthDelimitedEncoder),
95    /// Uses a `NewlineDelimitedEncoder` for framing.
96    NewlineDelimited(NewlineDelimitedEncoder),
97    /// Uses a `VarintLengthDelimitedEncoder` for framing.
98    VarintLengthDelimited(VarintLengthDelimitedEncoder),
99    /// Uses an opaque `Encoder` implementation for framing.
100    Boxed(BoxedFramer),
101}
102
103impl From<BytesEncoder> for Framer {
104    fn from(encoder: BytesEncoder) -> Self {
105        Self::Bytes(encoder)
106    }
107}
108
109impl From<CharacterDelimitedEncoder> for Framer {
110    fn from(encoder: CharacterDelimitedEncoder) -> Self {
111        Self::CharacterDelimited(encoder)
112    }
113}
114
115impl From<LengthDelimitedEncoder> for Framer {
116    fn from(encoder: LengthDelimitedEncoder) -> Self {
117        Self::LengthDelimited(encoder)
118    }
119}
120
121impl From<NewlineDelimitedEncoder> for Framer {
122    fn from(encoder: NewlineDelimitedEncoder) -> Self {
123        Self::NewlineDelimited(encoder)
124    }
125}
126
127impl From<VarintLengthDelimitedEncoder> for Framer {
128    fn from(encoder: VarintLengthDelimitedEncoder) -> Self {
129        Self::VarintLengthDelimited(encoder)
130    }
131}
132
133impl From<BoxedFramer> for Framer {
134    fn from(encoder: BoxedFramer) -> Self {
135        Self::Boxed(encoder)
136    }
137}
138
139impl tokio_util::codec::Encoder<()> for Framer {
140    type Error = BoxedFramingError;
141
142    fn encode(&mut self, _: (), buffer: &mut BytesMut) -> Result<(), Self::Error> {
143        match self {
144            Framer::Bytes(framer) => framer.encode((), buffer),
145            Framer::CharacterDelimited(framer) => framer.encode((), buffer),
146            Framer::LengthDelimited(framer) => framer.encode((), buffer),
147            Framer::NewlineDelimited(framer) => framer.encode((), buffer),
148            Framer::VarintLengthDelimited(framer) => framer.encode((), buffer),
149            Framer::Boxed(framer) => framer.encode((), buffer),
150        }
151    }
152}
153
#[cfg(test)]
mod tests {
    use super::*;

    /// Each config variant must build the `Framer` variant of the same name.
    #[test]
    fn test_framing_config_build() {
        // Unit variants build from the unit encoder configs.
        let framer = FramingConfig::NewlineDelimited.build();
        assert!(matches!(framer, Framer::NewlineDelimited(_)));

        let framer = FramingConfig::Bytes.build();
        assert!(matches!(framer, Framer::Bytes(_)));

        // A variant that carries its own config builds from that config.
        let config = FramingConfig::LengthDelimited(LengthDelimitedEncoderConfig::default());
        let framer = config.build();
        assert!(matches!(framer, Framer::LengthDelimited(_)));
    }

    /// Encoder configs convert into the matching `FramingConfig` variant.
    #[test]
    fn test_framing_config_from_encoder_config() {
        let framing_config: FramingConfig = BytesEncoderConfig.into();
        assert!(matches!(framing_config, FramingConfig::Bytes));

        let framing_config: FramingConfig = NewlineDelimitedEncoderConfig.into();
        assert!(matches!(framing_config, FramingConfig::NewlineDelimited));

        // Payload-carrying conversion: the config must be kept, not just the variant.
        let framing_config: FramingConfig = LengthDelimitedEncoderConfig::default().into();
        assert_eq!(
            framing_config,
            FramingConfig::LengthDelimited(LengthDelimitedEncoderConfig::default())
        );
    }

    /// Built encoders convert into the matching `Framer` variant.
    #[test]
    fn test_framer_from_encoder() {
        let framer: Framer = BytesEncoderConfig.build().into();
        assert!(matches!(framer, Framer::Bytes(_)));

        let framer: Framer = NewlineDelimitedEncoderConfig.build().into();
        assert!(matches!(framer, Framer::NewlineDelimited(_)));

        let framer: Framer = LengthDelimitedEncoderConfig::default().build().into();
        assert!(matches!(framer, Framer::LengthDelimited(_)));
    }

    /// Equal variants compare equal; different variants compare unequal.
    #[test]
    fn test_framing_config_equality() {
        let config1 = FramingConfig::NewlineDelimited;
        let config2 = FramingConfig::NewlineDelimited;
        assert_eq!(config1, config2);

        let config3 = FramingConfig::Bytes;
        assert_ne!(config1, config3);

        // A payload-carrying variant differs from both unit variants.
        let config4 = FramingConfig::LengthDelimited(LengthDelimitedEncoderConfig::default());
        assert_ne!(config1, config4);
        assert_ne!(config3, config4);
    }

    /// A clone compares equal to the value it was cloned from.
    #[test]
    fn test_framing_config_clone() {
        let config = FramingConfig::LengthDelimited(LengthDelimitedEncoderConfig::default());
        let cloned = config.clone();
        assert_eq!(config, cloned);
    }
}