vector/codecs/encoding/
config.rs1use crate::codecs::Transformer;
2use vector_lib::{
3 codecs::{
4 CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder,
5 encoding::{Framer, FramingConfig, Serializer, SerializerConfig},
6 },
7 configurable::configurable_component,
8};
9
10#[cfg(feature = "codecs-opentelemetry")]
11use vector_lib::codecs::BytesEncoder;
12
/// Encoding configuration.
///
/// Pairs a serializer configuration with a [`Transformer`]. Both members are
/// flattened by serde, so their keys (for example `codec`, `only_fields`,
/// `except_fields` and `timestamp_format`) are read from the same object.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct EncodingConfig {
    // Serializer selection and its codec-specific options; flattened into the
    // parent object.
    #[serde(flatten)]
    encoding: SerializerConfig,

    // Transformer options, flattened into the same object as the serializer
    // options.
    #[serde(flatten)]
    transformer: Transformer,
}
25
26impl EncodingConfig {
27 pub const fn new(encoding: SerializerConfig, transformer: Transformer) -> Self {
29 Self {
30 encoding,
31 transformer,
32 }
33 }
34
35 pub fn transformer(&self) -> Transformer {
37 self.transformer.clone()
38 }
39
40 pub const fn config(&self) -> &SerializerConfig {
42 &self.encoding
43 }
44
45 pub fn build(&self) -> crate::Result<Serializer> {
47 self.encoding.build()
48 }
49}
50
51impl<T> From<T> for EncodingConfig
52where
53 T: Into<SerializerConfig>,
54{
55 fn from(encoding: T) -> Self {
56 Self {
57 encoding: encoding.into(),
58 transformer: Default::default(),
59 }
60 }
61}
62
/// Encoding configuration together with an optional framing configuration.
///
/// Deserialized from an object with an optional `framing` key and an
/// `encoding` key; any other key is rejected (`deny_unknown_fields`).
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct EncodingConfigWithFraming {
    // Explicit framing. When `None`, `build` chooses a default framer from the
    // serializer kind and the sink type.
    #[configurable(derived)]
    framing: Option<FramingConfig>,

    // Serializer and transformer settings.
    #[configurable(derived)]
    encoding: EncodingConfig,
}
74
75impl EncodingConfigWithFraming {
76 pub const fn new(
79 framing: Option<FramingConfig>,
80 encoding: SerializerConfig,
81 transformer: Transformer,
82 ) -> Self {
83 Self {
84 framing,
85 encoding: EncodingConfig {
86 encoding,
87 transformer,
88 },
89 }
90 }
91
92 pub fn transformer(&self) -> Transformer {
94 self.encoding.transformer.clone()
95 }
96
97 pub const fn config(&self) -> (&Option<FramingConfig>, &SerializerConfig) {
99 (&self.framing, &self.encoding.encoding)
100 }
101
102 pub fn build(&self, sink_type: SinkType) -> crate::Result<(Framer, Serializer)> {
104 let framer = self.framing.as_ref().map(|framing| framing.build());
105 let serializer = self.encoding.build()?;
106
107 let framer = match (framer, &serializer) {
108 (Some(framer), _) => framer,
109 (None, Serializer::Json(_)) => match sink_type {
110 SinkType::StreamBased => NewlineDelimitedEncoder::default().into(),
111 SinkType::MessageBased => CharacterDelimitedEncoder::new(b',').into(),
112 },
113 (None, Serializer::Avro(_) | Serializer::Native(_)) => {
114 LengthDelimitedEncoder::default().into()
115 }
116 (None, Serializer::Gelf(_)) => {
117 CharacterDelimitedEncoder::new(0).into()
120 }
121 (None, Serializer::Protobuf(_)) => {
122 LengthDelimitedEncoder::default().into()
125 }
126 (
127 None,
128 Serializer::Cef(_)
129 | Serializer::Csv(_)
130 | Serializer::Logfmt(_)
131 | Serializer::NativeJson(_)
132 | Serializer::RawMessage(_)
133 | Serializer::Text(_),
134 ) => NewlineDelimitedEncoder::default().into(),
135 #[cfg(feature = "codecs-opentelemetry")]
136 (None, Serializer::Otlp(_)) => BytesEncoder.into(),
137 };
138
139 Ok((framer, serializer))
140 }
141}
142
/// The way a sink processes outgoing events.
///
/// `EncodingConfigWithFraming::build` consults this value to pick the default
/// framing for JSON when no framing is configured explicitly.
///
/// The type is a plain fieldless enum, so it derives `Copy`, `Debug` and
/// equality: callers can reuse a value after passing it by value, log it, and
/// compare it in tests.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SinkType {
    /// Stream-oriented sink; JSON defaults to newline-delimited framing.
    StreamBased,
    /// Message-oriented sink; JSON defaults to comma-delimited framing.
    MessageBased,
}
150
151impl<F, S> From<(Option<F>, S)> for EncodingConfigWithFraming
152where
153 F: Into<FramingConfig>,
154 S: Into<SerializerConfig>,
155{
156 fn from((framing, encoding): (Option<F>, S)) -> Self {
157 Self {
158 framing: framing.map(Into::into),
159 encoding: encoding.into().into(),
160 }
161 }
162}
163
#[cfg(test)]
mod test {
    use vector_lib::lookup::lookup_v2::{ConfigValuePath, parse_value_path};

    use super::*;
    use crate::codecs::encoding::TimestampFormat;

    /// Checks that the transformer carries the field filters and timestamp
    /// format that every JSON fixture in this module specifies.
    #[track_caller]
    fn assert_expected_transformer(transformer: &Transformer) {
        let expected_only_fields = Some(vec![ConfigValuePath(
            parse_value_path("a.b[0]").unwrap(),
        )]);

        assert_eq!(transformer.only_fields(), &expected_only_fields);
        assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
        assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
    }

    /// A bare `EncodingConfig` reads codec and transformer keys from one object.
    #[test]
    fn deserialize_encoding_config() {
        let input = r#"
            {
                "codec": "json",
                "only_fields": ["a.b[0]"],
                "except_fields": ["ignore_me"],
                "timestamp_format": "unix"
            }
        "#;

        let parsed = serde_json::from_str::<EncodingConfig>(input).unwrap();

        assert!(matches!(parsed.config(), SerializerConfig::Json(_)));
        assert_expected_transformer(&parsed.transformer());
    }

    /// An explicit `framing` section is deserialized alongside `encoding`.
    #[test]
    fn deserialize_encoding_config_with_framing() {
        let input = r#"
            {
                "framing": {
                    "method": "newline_delimited"
                },
                "encoding": {
                    "codec": "json",
                    "only_fields": ["a.b[0]"],
                    "except_fields": ["ignore_me"],
                    "timestamp_format": "unix"
                }
            }
        "#;

        let parsed = serde_json::from_str::<EncodingConfigWithFraming>(input).unwrap();
        let (framing, serializer) = parsed.config();

        assert!(matches!(framing, Some(FramingConfig::NewlineDelimited)));
        assert!(matches!(serializer, SerializerConfig::Json(_)));
        assert_expected_transformer(&parsed.transformer());
    }

    /// Omitting `framing` leaves it unset while `encoding` is still parsed.
    #[test]
    fn deserialize_encoding_config_without_framing() {
        let input = r#"
            {
                "encoding": {
                    "codec": "json",
                    "only_fields": ["a.b[0]"],
                    "except_fields": ["ignore_me"],
                    "timestamp_format": "unix"
                }
            }
        "#;

        let parsed = serde_json::from_str::<EncodingConfigWithFraming>(input).unwrap();
        let (framing, serializer) = parsed.config();

        assert!(framing.is_none());
        assert!(matches!(serializer, SerializerConfig::Json(_)));
        assert_expected_transformer(&parsed.transformer());
    }
}