1use crate::codecs::{Encoder, EncoderKind, Transformer};
2use vector_lib::{
3 codecs::{
4 CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder,
5 encoding::{Framer, FramingConfig, Serializer, SerializerConfig},
6 },
7 configurable::configurable_component,
8};
9
10#[cfg(feature = "codecs-opentelemetry")]
11use vector_lib::codecs::BytesEncoder;
12
13#[configurable_component]
15#[derive(Clone, Debug)]
16pub struct EncodingConfig {
19 #[serde(flatten)]
20 encoding: SerializerConfig,
21
22 #[serde(flatten)]
23 transformer: Transformer,
24}
25
26impl EncodingConfig {
27 pub const fn new(encoding: SerializerConfig, transformer: Transformer) -> Self {
29 Self {
30 encoding,
31 transformer,
32 }
33 }
34
35 pub fn transformer(&self) -> Transformer {
37 self.transformer.clone()
38 }
39
40 pub const fn config(&self) -> &SerializerConfig {
42 &self.encoding
43 }
44
45 pub fn build(&self) -> crate::Result<Serializer> {
47 self.encoding.build()
48 }
49}
50
51impl<T> From<T> for EncodingConfig
52where
53 T: Into<SerializerConfig>,
54{
55 fn from(encoding: T) -> Self {
56 Self {
57 encoding: encoding.into(),
58 transformer: Default::default(),
59 }
60 }
61}
62
63#[configurable_component]
65#[derive(Clone, Debug)]
66#[serde(deny_unknown_fields)]
67pub struct EncodingConfigWithFraming {
68 #[configurable(derived)]
69 framing: Option<FramingConfig>,
70
71 #[configurable(derived)]
72 encoding: EncodingConfig,
73}
74
75impl EncodingConfigWithFraming {
76 pub const fn new(
79 framing: Option<FramingConfig>,
80 encoding: SerializerConfig,
81 transformer: Transformer,
82 ) -> Self {
83 Self {
84 framing,
85 encoding: EncodingConfig {
86 encoding,
87 transformer,
88 },
89 }
90 }
91
92 pub fn transformer(&self) -> Transformer {
94 self.encoding.transformer.clone()
95 }
96
97 pub const fn config(&self) -> (&Option<FramingConfig>, &SerializerConfig) {
99 (&self.framing, &self.encoding.encoding)
100 }
101
102 pub fn build(&self, sink_type: SinkType) -> crate::Result<(Framer, Serializer)> {
104 let framer = self.framing.as_ref().map(|framing| framing.build());
105 let serializer = self.encoding.build()?;
106
107 let framer = match (framer, &serializer) {
108 (Some(framer), _) => framer,
109 (None, Serializer::Json(_)) => match sink_type {
110 SinkType::StreamBased => NewlineDelimitedEncoder::default().into(),
111 SinkType::MessageBased => CharacterDelimitedEncoder::new(b',').into(),
112 },
113 (None, Serializer::Avro(_) | Serializer::Native(_)) => {
114 LengthDelimitedEncoder::default().into()
115 }
116 (None, Serializer::Gelf(_)) => {
117 CharacterDelimitedEncoder::new(0).into()
120 }
121 (None, Serializer::Protobuf(_)) => {
122 LengthDelimitedEncoder::default().into()
125 }
126 (
127 None,
128 Serializer::Cef(_)
129 | Serializer::Csv(_)
130 | Serializer::Logfmt(_)
131 | Serializer::NativeJson(_)
132 | Serializer::RawMessage(_)
133 | Serializer::Text(_),
134 ) => NewlineDelimitedEncoder::default().into(),
135 #[cfg(feature = "codecs-opentelemetry")]
136 (None, Serializer::Otlp(_)) => BytesEncoder.into(),
137 };
138
139 Ok((framer, serializer))
140 }
141
142 pub fn build_encoder(&self, sink_type: SinkType) -> crate::Result<(Transformer, EncoderKind)> {
144 let (framer, serializer) = self.build(sink_type)?;
145 let encoder = EncoderKind::Framed(Box::new(Encoder::<Framer>::new(framer, serializer)));
146 Ok((self.transformer(), encoder))
147 }
148}
149
/// The way a sink hands encoded events to its destination.
///
/// `EncodingConfigWithFraming::build` consults this to pick a default framer for JSON when no
/// framing is configured explicitly.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SinkType {
    /// Stream-based sink; JSON defaults to newline-delimited framing.
    StreamBased,
    /// Message-based sink; JSON defaults to comma-delimited framing.
    MessageBased,
}
157
158impl<F, S> From<(Option<F>, S)> for EncodingConfigWithFraming
159where
160 F: Into<FramingConfig>,
161 S: Into<SerializerConfig>,
162{
163 fn from((framing, encoding): (Option<F>, S)) -> Self {
164 Self {
165 framing: framing.map(Into::into),
166 encoding: encoding.into().into(),
167 }
168 }
169}
170
#[cfg(test)]
mod test {
    use vector_lib::lookup::lookup_v2::{ConfigValuePath, parse_value_path};

    use super::*;
    use crate::codecs::encoding::TimestampFormat;

    /// Asserts the transformer settings that every JSON fixture in this module declares:
    /// `only_fields`, `except_fields` and `timestamp_format`.
    #[track_caller]
    fn assert_fixture_transformer(transformer: &Transformer) {
        assert_eq!(
            transformer.only_fields(),
            &Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())])
        );
        assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
        assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
    }

    /// A flattened `EncodingConfig` keeps codec and transformer keys at the same level.
    #[test]
    fn deserialize_encoding_config() {
        let json = r#"
            {
                "codec": "json",
                "only_fields": ["a.b[0]"],
                "except_fields": ["ignore_me"],
                "timestamp_format": "unix"
            }
        "#;

        let config: EncodingConfig = serde_json::from_str(json).unwrap();

        assert!(matches!(config.config(), SerializerConfig::Json(_)));
        assert_fixture_transformer(&config.transformer());
    }

    /// An explicit `framing` section is deserialized alongside the `encoding` section.
    #[test]
    fn deserialize_encoding_config_with_framing() {
        let json = r#"
            {
                "framing": {
                    "method": "newline_delimited"
                },
                "encoding": {
                    "codec": "json",
                    "only_fields": ["a.b[0]"],
                    "except_fields": ["ignore_me"],
                    "timestamp_format": "unix"
                }
            }
        "#;

        let config: EncodingConfigWithFraming = serde_json::from_str(json).unwrap();
        let (framing, serializer) = config.config();

        assert!(matches!(framing, Some(FramingConfig::NewlineDelimited)));
        assert!(matches!(serializer, SerializerConfig::Json(_)));
        assert_fixture_transformer(&config.transformer());
    }

    /// Leaving out `framing` yields `None` rather than a deserialization error.
    #[test]
    fn deserialize_encoding_config_without_framing() {
        let json = r#"
            {
                "encoding": {
                    "codec": "json",
                    "only_fields": ["a.b[0]"],
                    "except_fields": ["ignore_me"],
                    "timestamp_format": "unix"
                }
            }
        "#;

        let config: EncodingConfigWithFraming = serde_json::from_str(json).unwrap();
        let (framing, serializer) = config.config();

        assert!(framing.is_none());
        assert!(matches!(serializer, SerializerConfig::Json(_)));
        assert_fixture_transformer(&config.transformer());
    }
}