vector/codecs/encoding/
config.rs1use crate::codecs::Transformer;
2use vector_lib::codecs::{
3 encoding::{Framer, FramingConfig, Serializer, SerializerConfig},
4 CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder,
5};
6use vector_lib::configurable::configurable_component;
7
8#[configurable_component]
10#[derive(Clone, Debug)]
11pub struct EncodingConfig {
14 #[serde(flatten)]
15 encoding: SerializerConfig,
16
17 #[serde(flatten)]
18 transformer: Transformer,
19}
20
21impl EncodingConfig {
22 pub const fn new(encoding: SerializerConfig, transformer: Transformer) -> Self {
24 Self {
25 encoding,
26 transformer,
27 }
28 }
29
30 pub fn transformer(&self) -> Transformer {
32 self.transformer.clone()
33 }
34
35 pub const fn config(&self) -> &SerializerConfig {
37 &self.encoding
38 }
39
40 pub fn build(&self) -> crate::Result<Serializer> {
42 self.encoding.build()
43 }
44}
45
46impl<T> From<T> for EncodingConfig
47where
48 T: Into<SerializerConfig>,
49{
50 fn from(encoding: T) -> Self {
51 Self {
52 encoding: encoding.into(),
53 transformer: Default::default(),
54 }
55 }
56}
57
58#[configurable_component]
60#[derive(Clone, Debug)]
61#[serde(deny_unknown_fields)]
62pub struct EncodingConfigWithFraming {
63 #[configurable(derived)]
64 framing: Option<FramingConfig>,
65
66 #[configurable(derived)]
67 encoding: EncodingConfig,
68}
69
70impl EncodingConfigWithFraming {
71 pub const fn new(
74 framing: Option<FramingConfig>,
75 encoding: SerializerConfig,
76 transformer: Transformer,
77 ) -> Self {
78 Self {
79 framing,
80 encoding: EncodingConfig {
81 encoding,
82 transformer,
83 },
84 }
85 }
86
87 pub fn transformer(&self) -> Transformer {
89 self.encoding.transformer.clone()
90 }
91
92 pub const fn config(&self) -> (&Option<FramingConfig>, &SerializerConfig) {
94 (&self.framing, &self.encoding.encoding)
95 }
96
97 pub fn build(&self, sink_type: SinkType) -> crate::Result<(Framer, Serializer)> {
99 let framer = self.framing.as_ref().map(|framing| framing.build());
100 let serializer = self.encoding.build()?;
101
102 let framer = match (framer, &serializer) {
103 (Some(framer), _) => framer,
104 (None, Serializer::Json(_)) => match sink_type {
105 SinkType::StreamBased => NewlineDelimitedEncoder::default().into(),
106 SinkType::MessageBased => CharacterDelimitedEncoder::new(b',').into(),
107 },
108 (None, Serializer::Avro(_) | Serializer::Native(_)) => {
109 LengthDelimitedEncoder::default().into()
110 }
111 (None, Serializer::Gelf(_)) => {
112 CharacterDelimitedEncoder::new(0).into()
115 }
116 (None, Serializer::Protobuf(_)) => {
117 LengthDelimitedEncoder::default().into()
120 }
121 (
122 None,
123 Serializer::Cef(_)
124 | Serializer::Csv(_)
125 | Serializer::Logfmt(_)
126 | Serializer::NativeJson(_)
127 | Serializer::RawMessage(_)
128 | Serializer::Text(_),
129 ) => NewlineDelimitedEncoder::default().into(),
130 };
131
132 Ok((framer, serializer))
133 }
134}
135
/// The way a sink hands encoded events to its destination.
///
/// Used to choose the default JSON framing when none is configured.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SinkType {
    /// JSON events default to newline-delimited framing.
    StreamBased,
    /// JSON events default to comma-delimited framing.
    MessageBased,
}
143
144impl<F, S> From<(Option<F>, S)> for EncodingConfigWithFraming
145where
146 F: Into<FramingConfig>,
147 S: Into<SerializerConfig>,
148{
149 fn from((framing, encoding): (Option<F>, S)) -> Self {
150 Self {
151 framing: framing.map(Into::into),
152 encoding: encoding.into().into(),
153 }
154 }
155}
156
#[cfg(test)]
mod test {
    use vector_lib::lookup::lookup_v2::{parse_value_path, ConfigValuePath};

    use super::*;
    use crate::codecs::encoding::TimestampFormat;

    /// Checks the transformer options that every fixture in this module sets.
    fn assert_expected_transformer(transformer: &Transformer) {
        let expected_only = Some(vec![ConfigValuePath(
            parse_value_path("a.b[0]").unwrap(),
        )]);

        assert_eq!(transformer.only_fields(), &expected_only);
        assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
        assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
    }

    /// A bare `EncodingConfig` reads codec and transformer keys from one table.
    #[test]
    fn deserialize_encoding_config() {
        let string = r#"
            {
                "codec": "json",
                "only_fields": ["a.b[0]"],
                "except_fields": ["ignore_me"],
                "timestamp_format": "unix"
            }
        "#;

        let encoding: EncodingConfig = serde_json::from_str(string).unwrap();

        assert!(matches!(encoding.config(), SerializerConfig::Json(_)));
        assert_expected_transformer(&encoding.transformer());
    }

    /// Both `framing` and `encoding` keys are present.
    #[test]
    fn deserialize_encoding_config_with_framing() {
        let string = r#"
            {
                "framing": {
                    "method": "newline_delimited"
                },
                "encoding": {
                    "codec": "json",
                    "only_fields": ["a.b[0]"],
                    "except_fields": ["ignore_me"],
                    "timestamp_format": "unix"
                }
            }
        "#;

        let encoding: EncodingConfigWithFraming = serde_json::from_str(string).unwrap();
        let (framing, serializer) = encoding.config();

        assert!(matches!(framing, Some(FramingConfig::NewlineDelimited)));
        assert!(matches!(serializer, SerializerConfig::Json(_)));
        assert_expected_transformer(&encoding.transformer());
    }

    /// Omitting `framing` leaves it as `None`.
    #[test]
    fn deserialize_encoding_config_without_framing() {
        let string = r#"
            {
                "encoding": {
                    "codec": "json",
                    "only_fields": ["a.b[0]"],
                    "except_fields": ["ignore_me"],
                    "timestamp_format": "unix"
                }
            }
        "#;

        let encoding: EncodingConfigWithFraming = serde_json::from_str(string).unwrap();
        let (framing, serializer) = encoding.config();

        assert!(framing.is_none());
        assert!(matches!(serializer, SerializerConfig::Json(_)));
        assert_expected_transformer(&encoding.transformer());
    }
}