vector/codecs/encoding/
config.rs1use vector_lib::{
2 codecs::{
3 CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder,
4 encoding::{Framer, FramingConfig, Serializer, SerializerConfig},
5 },
6 configurable::configurable_component,
7};
8
9use crate::codecs::Transformer;
10
#[configurable_component]
/// Encoding configuration: a serializer selection paired with a [`Transformer`].
#[derive(Clone, Debug)]
pub struct EncodingConfig {
    // Flattened by serde, so the serializer's own keys (for example `codec`) are read
    // directly from this object rather than from a nested one.
    #[serde(flatten)]
    encoding: SerializerConfig,

    // Also flattened: transformer keys such as `only_fields`, `except_fields`, and
    // `timestamp_format` are read from the same object as the serializer keys.
    #[serde(flatten)]
    transformer: Transformer,
}
23
24impl EncodingConfig {
25 pub const fn new(encoding: SerializerConfig, transformer: Transformer) -> Self {
27 Self {
28 encoding,
29 transformer,
30 }
31 }
32
33 pub fn transformer(&self) -> Transformer {
35 self.transformer.clone()
36 }
37
38 pub const fn config(&self) -> &SerializerConfig {
40 &self.encoding
41 }
42
43 pub fn build(&self) -> crate::Result<Serializer> {
45 self.encoding.build()
46 }
47}
48
49impl<T> From<T> for EncodingConfig
50where
51 T: Into<SerializerConfig>,
52{
53 fn from(encoding: T) -> Self {
54 Self {
55 encoding: encoding.into(),
56 transformer: Default::default(),
57 }
58 }
59}
60
#[configurable_component]
/// Encoding configuration combined with an optional, explicitly configured framing.
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct EncodingConfigWithFraming {
    // Optional framing. When this is `None`, `EncodingConfigWithFraming::build` picks a
    // default framer based on the serializer (and, for JSON, on the sink type).
    #[configurable(derived)]
    framing: Option<FramingConfig>,

    // Serializer selection plus transformer settings, read from the `encoding` key.
    #[configurable(derived)]
    encoding: EncodingConfig,
}
72
73impl EncodingConfigWithFraming {
74 pub const fn new(
77 framing: Option<FramingConfig>,
78 encoding: SerializerConfig,
79 transformer: Transformer,
80 ) -> Self {
81 Self {
82 framing,
83 encoding: EncodingConfig {
84 encoding,
85 transformer,
86 },
87 }
88 }
89
90 pub fn transformer(&self) -> Transformer {
92 self.encoding.transformer.clone()
93 }
94
95 pub const fn config(&self) -> (&Option<FramingConfig>, &SerializerConfig) {
97 (&self.framing, &self.encoding.encoding)
98 }
99
100 pub fn build(&self, sink_type: SinkType) -> crate::Result<(Framer, Serializer)> {
102 let framer = self.framing.as_ref().map(|framing| framing.build());
103 let serializer = self.encoding.build()?;
104
105 let framer = match (framer, &serializer) {
106 (Some(framer), _) => framer,
107 (None, Serializer::Json(_)) => match sink_type {
108 SinkType::StreamBased => NewlineDelimitedEncoder::default().into(),
109 SinkType::MessageBased => CharacterDelimitedEncoder::new(b',').into(),
110 },
111 (None, Serializer::Avro(_) | Serializer::Native(_)) => {
112 LengthDelimitedEncoder::default().into()
113 }
114 (None, Serializer::Gelf(_)) => {
115 CharacterDelimitedEncoder::new(0).into()
118 }
119 (None, Serializer::Protobuf(_)) => {
120 LengthDelimitedEncoder::default().into()
123 }
124 (
125 None,
126 Serializer::Cef(_)
127 | Serializer::Csv(_)
128 | Serializer::Logfmt(_)
129 | Serializer::NativeJson(_)
130 | Serializer::RawMessage(_)
131 | Serializer::Text(_),
132 ) => NewlineDelimitedEncoder::default().into(),
133 };
134
135 Ok((framer, serializer))
136 }
137}
138
/// Selects the default framing that `EncodingConfigWithFraming::build` uses for the JSON
/// serializer when no framing is configured explicitly.
///
/// The type is a plain, field-less enum, so it is `Copy`: callers can pass the same value
/// to several `build` calls, compare values, and print them for diagnostics.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SinkType {
    /// JSON defaults to newline-delimited framing.
    StreamBased,
    /// JSON defaults to comma-delimited framing.
    MessageBased,
}
146
147impl<F, S> From<(Option<F>, S)> for EncodingConfigWithFraming
148where
149 F: Into<FramingConfig>,
150 S: Into<SerializerConfig>,
151{
152 fn from((framing, encoding): (Option<F>, S)) -> Self {
153 Self {
154 framing: framing.map(Into::into),
155 encoding: encoding.into().into(),
156 }
157 }
158}
159
#[cfg(test)]
mod test {
    use vector_lib::lookup::lookup_v2::{ConfigValuePath, parse_value_path};

    use super::*;
    use crate::codecs::encoding::TimestampFormat;

    /// Asserts that `transformer` carries the field filters and timestamp format that
    /// every JSON fixture in this module specifies.
    fn assert_fixture_transformer(transformer: &Transformer) {
        let expected_only = vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())];

        assert_eq!(transformer.only_fields(), &Some(expected_only));
        assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
        assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
    }

    /// A flat object deserializes into both the serializer and the transformer.
    #[test]
    fn deserialize_encoding_config() {
        let string = r#"
            {
                "codec": "json",
                "only_fields": ["a.b[0]"],
                "except_fields": ["ignore_me"],
                "timestamp_format": "unix"
            }
        "#;

        let encoding: EncodingConfig = serde_json::from_str(string).unwrap();

        assert!(matches!(encoding.config(), SerializerConfig::Json(_)));
        assert_fixture_transformer(&encoding.transformer());
    }

    /// An explicit `framing` object is picked up next to the nested `encoding` object.
    #[test]
    fn deserialize_encoding_config_with_framing() {
        let string = r#"
            {
                "framing": {
                    "method": "newline_delimited"
                },
                "encoding": {
                    "codec": "json",
                    "only_fields": ["a.b[0]"],
                    "except_fields": ["ignore_me"],
                    "timestamp_format": "unix"
                }
            }
        "#;

        let encoding: EncodingConfigWithFraming = serde_json::from_str(string).unwrap();
        let (framing, serializer) = encoding.config();

        assert!(matches!(framing, Some(FramingConfig::NewlineDelimited)));
        assert!(matches!(serializer, SerializerConfig::Json(_)));
        assert_fixture_transformer(&encoding.transformer());
    }

    /// Omitting `framing` leaves it unset while `encoding` still deserializes fully.
    #[test]
    fn deserialize_encoding_config_without_framing() {
        let string = r#"
            {
                "encoding": {
                    "codec": "json",
                    "only_fields": ["a.b[0]"],
                    "except_fields": ["ignore_me"],
                    "timestamp_format": "unix"
                }
            }
        "#;

        let encoding: EncodingConfigWithFraming = serde_json::from_str(string).unwrap();
        let (framing, serializer) = encoding.config();

        assert!(framing.is_none());
        assert!(matches!(serializer, SerializerConfig::Json(_)));
        assert_fixture_transformer(&encoding.transformer());
    }
}