codecs/encoding/framing/
framer.rs1use bytes::BytesMut;
4use vector_config::configurable_component;
5
6use super::{
7 BoxedFramer, BoxedFramingError, BytesEncoder, BytesEncoderConfig, CharacterDelimitedEncoder,
8 CharacterDelimitedEncoderConfig, LengthDelimitedEncoder, LengthDelimitedEncoderConfig,
9 NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig, VarintLengthDelimitedEncoder,
10 VarintLengthDelimitedEncoderConfig,
11};
12
13#[configurable_component]
15#[derive(Clone, Debug, Eq, PartialEq)]
16#[serde(tag = "method", rename_all = "snake_case")]
17#[configurable(metadata(docs::enum_tag_description = "The framing method."))]
18pub enum FramingConfig {
19 Bytes,
21
22 CharacterDelimited(CharacterDelimitedEncoderConfig),
24
25 LengthDelimited(LengthDelimitedEncoderConfig),
29
30 NewlineDelimited,
32
33 VarintLengthDelimited(VarintLengthDelimitedEncoderConfig),
37}
38
39impl From<BytesEncoderConfig> for FramingConfig {
40 fn from(_: BytesEncoderConfig) -> Self {
41 Self::Bytes
42 }
43}
44
45impl From<CharacterDelimitedEncoderConfig> for FramingConfig {
46 fn from(config: CharacterDelimitedEncoderConfig) -> Self {
47 Self::CharacterDelimited(config)
48 }
49}
50
51impl From<LengthDelimitedEncoderConfig> for FramingConfig {
52 fn from(config: LengthDelimitedEncoderConfig) -> Self {
53 Self::LengthDelimited(config)
54 }
55}
56
57impl From<NewlineDelimitedEncoderConfig> for FramingConfig {
58 fn from(_: NewlineDelimitedEncoderConfig) -> Self {
59 Self::NewlineDelimited
60 }
61}
62
63impl From<VarintLengthDelimitedEncoderConfig> for FramingConfig {
64 fn from(config: VarintLengthDelimitedEncoderConfig) -> Self {
65 Self::VarintLengthDelimited(config)
66 }
67}
68
69impl FramingConfig {
70 pub fn build(&self) -> Framer {
72 match self {
73 FramingConfig::Bytes => Framer::Bytes(BytesEncoderConfig.build()),
74 FramingConfig::CharacterDelimited(config) => Framer::CharacterDelimited(config.build()),
75 FramingConfig::LengthDelimited(config) => Framer::LengthDelimited(config.build()),
76 FramingConfig::NewlineDelimited => {
77 Framer::NewlineDelimited(NewlineDelimitedEncoderConfig.build())
78 }
79 FramingConfig::VarintLengthDelimited(config) => {
80 Framer::VarintLengthDelimited(config.build())
81 }
82 }
83 }
84}
85
86#[derive(Debug, Clone)]
88pub enum Framer {
89 Bytes(BytesEncoder),
91 CharacterDelimited(CharacterDelimitedEncoder),
93 LengthDelimited(LengthDelimitedEncoder),
95 NewlineDelimited(NewlineDelimitedEncoder),
97 VarintLengthDelimited(VarintLengthDelimitedEncoder),
99 Boxed(BoxedFramer),
101}
102
103impl From<BytesEncoder> for Framer {
104 fn from(encoder: BytesEncoder) -> Self {
105 Self::Bytes(encoder)
106 }
107}
108
109impl From<CharacterDelimitedEncoder> for Framer {
110 fn from(encoder: CharacterDelimitedEncoder) -> Self {
111 Self::CharacterDelimited(encoder)
112 }
113}
114
115impl From<LengthDelimitedEncoder> for Framer {
116 fn from(encoder: LengthDelimitedEncoder) -> Self {
117 Self::LengthDelimited(encoder)
118 }
119}
120
121impl From<NewlineDelimitedEncoder> for Framer {
122 fn from(encoder: NewlineDelimitedEncoder) -> Self {
123 Self::NewlineDelimited(encoder)
124 }
125}
126
127impl From<VarintLengthDelimitedEncoder> for Framer {
128 fn from(encoder: VarintLengthDelimitedEncoder) -> Self {
129 Self::VarintLengthDelimited(encoder)
130 }
131}
132
133impl From<BoxedFramer> for Framer {
134 fn from(encoder: BoxedFramer) -> Self {
135 Self::Boxed(encoder)
136 }
137}
138
139impl tokio_util::codec::Encoder<()> for Framer {
140 type Error = BoxedFramingError;
141
142 fn encode(&mut self, _: (), buffer: &mut BytesMut) -> Result<(), Self::Error> {
143 match self {
144 Framer::Bytes(framer) => framer.encode((), buffer),
145 Framer::CharacterDelimited(framer) => framer.encode((), buffer),
146 Framer::LengthDelimited(framer) => framer.encode((), buffer),
147 Framer::NewlineDelimited(framer) => framer.encode((), buffer),
148 Framer::VarintLengthDelimited(framer) => framer.encode((), buffer),
149 Framer::Boxed(framer) => framer.encode((), buffer),
150 }
151 }
152}
153
#[cfg(test)]
mod tests {
    use super::*;

    /// `build` turns an option-less config into the matching `Framer` variant.
    #[test]
    fn test_framing_config_build() {
        let newline = FramingConfig::NewlineDelimited.build();
        assert!(matches!(newline, Framer::NewlineDelimited(_)));

        let bytes = FramingConfig::Bytes.build();
        assert!(matches!(bytes, Framer::Bytes(_)));
    }

    /// Encoder configs convert into the matching `FramingConfig` variant.
    #[test]
    fn test_framing_config_from_encoder_config() {
        let from_bytes = FramingConfig::from(BytesEncoderConfig);
        assert!(matches!(from_bytes, FramingConfig::Bytes));

        let from_newline = FramingConfig::from(NewlineDelimitedEncoderConfig);
        assert!(matches!(from_newline, FramingConfig::NewlineDelimited));
    }

    /// Built encoders convert into the matching `Framer` variant.
    #[test]
    fn test_framer_from_encoder() {
        let from_bytes = Framer::from(BytesEncoderConfig.build());
        assert!(matches!(from_bytes, Framer::Bytes(_)));

        let from_newline = Framer::from(NewlineDelimitedEncoderConfig.build());
        assert!(matches!(from_newline, Framer::NewlineDelimited(_)));
    }

    /// Identical variants compare equal; different variants do not.
    #[test]
    fn test_framing_config_equality() {
        let newline_a = FramingConfig::NewlineDelimited;
        let newline_b = FramingConfig::NewlineDelimited;
        assert_eq!(newline_a, newline_b);

        let bytes = FramingConfig::Bytes;
        assert_ne!(newline_a, bytes);
    }

    /// A clone compares equal to the value it was cloned from.
    #[test]
    fn test_framing_config_clone() {
        let original = FramingConfig::LengthDelimited(LengthDelimitedEncoderConfig::default());
        let duplicate = original.clone();
        assert_eq!(original, duplicate);
    }
}