1use vector_config::configurable_component;
2
3use super::{Encoder, EncoderKind, Transformer};
4use crate::encoding::{
5 CharacterDelimitedEncoder, Framer, FramingConfig, LengthDelimitedEncoder,
6 NewlineDelimitedEncoder, Serializer, SerializerConfig,
7};
8
9#[cfg(feature = "opentelemetry")]
10use crate::encoding::BytesEncoder;
11
#[configurable_component]
/// Encoding configuration: a serializer configuration combined with a `Transformer`.
#[derive(Clone, Debug)]
pub struct EncodingConfig {
    // Serializer configuration. Flattened by serde, so its fields are read from the same map
    // as the transformer's fields rather than from a nested key.
    #[serde(flatten)]
    encoding: SerializerConfig,

    // Transformer settings. Also flattened, so they sit alongside the serializer's fields.
    #[serde(flatten)]
    transformer: Transformer,
}
24
25impl EncodingConfig {
26 pub const fn new(encoding: SerializerConfig, transformer: Transformer) -> Self {
28 Self {
29 encoding,
30 transformer,
31 }
32 }
33
34 pub fn transformer(&self) -> Transformer {
36 self.transformer.clone()
37 }
38
39 pub const fn config(&self) -> &SerializerConfig {
41 &self.encoding
42 }
43
44 pub fn build(&self) -> vector_common::Result<Serializer> {
46 self.encoding.build()
47 }
48}
49
50impl<T> From<T> for EncodingConfig
51where
52 T: Into<SerializerConfig>,
53{
54 fn from(encoding: T) -> Self {
55 Self {
56 encoding: encoding.into(),
57 transformer: Default::default(),
58 }
59 }
60}
61
#[configurable_component]
/// Encoding configuration paired with an optional framing configuration.
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct EncodingConfigWithFraming {
    // Optional framing configuration. When absent, `build` picks a default framer based on
    // the serializer and the sink type.
    #[configurable(derived)]
    framing: Option<FramingConfig>,

    // Serializer configuration together with its transformer, read from the `encoding` key.
    #[configurable(derived)]
    encoding: EncodingConfig,
}
73
74impl EncodingConfigWithFraming {
75 pub const fn new(
78 framing: Option<FramingConfig>,
79 encoding: SerializerConfig,
80 transformer: Transformer,
81 ) -> Self {
82 Self {
83 framing,
84 encoding: EncodingConfig {
85 encoding,
86 transformer,
87 },
88 }
89 }
90
91 pub fn transformer(&self) -> Transformer {
93 self.encoding.transformer.clone()
94 }
95
96 pub const fn config(&self) -> (&Option<FramingConfig>, &SerializerConfig) {
98 (&self.framing, &self.encoding.encoding)
99 }
100
101 pub fn build(&self, sink_type: SinkType) -> vector_common::Result<(Framer, Serializer)> {
103 let framer = self.framing.as_ref().map(|framing| framing.build());
104 let serializer = self.encoding.build()?;
105
106 let framer = match (framer, &serializer) {
107 (Some(framer), _) => framer,
108 (None, Serializer::Json(_)) => match sink_type {
109 SinkType::StreamBased => NewlineDelimitedEncoder::default().into(),
110 SinkType::MessageBased => CharacterDelimitedEncoder::new(b',').into(),
111 },
112 (None, Serializer::Avro(_) | Serializer::Native(_)) => {
113 LengthDelimitedEncoder::default().into()
114 }
115 (None, Serializer::Gelf(_)) => {
116 CharacterDelimitedEncoder::new(0).into()
119 }
120 (None, Serializer::Protobuf(_)) => {
121 LengthDelimitedEncoder::default().into()
124 }
125 (
126 None,
127 Serializer::Cef(_)
128 | Serializer::Csv(_)
129 | Serializer::Logfmt(_)
130 | Serializer::NativeJson(_)
131 | Serializer::RawMessage(_)
132 | Serializer::Text(_),
133 ) => NewlineDelimitedEncoder::default().into(),
134 #[cfg(feature = "syslog")]
135 (None, Serializer::Syslog(_)) => NewlineDelimitedEncoder::default().into(),
136 #[cfg(feature = "opentelemetry")]
137 (None, Serializer::Otlp(_)) => BytesEncoder.into(),
138 };
139
140 Ok((framer, serializer))
141 }
142
143 pub fn build_encoder(
145 &self,
146 sink_type: SinkType,
147 ) -> vector_common::Result<(Transformer, EncoderKind)> {
148 let (framer, serializer) = self.build(sink_type)?;
149 let encoder = EncoderKind::Framed(Box::new(Encoder::<Framer>::new(framer, serializer)));
150 Ok((self.transformer(), encoder))
151 }
152}
153
/// The way a sink processes outgoing events.
///
/// `EncodingConfigWithFraming::build` consults this value to choose a default framer
/// when no framing is configured and the serializer is JSON.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SinkType {
    /// Stream-based sink: JSON output defaults to newline-delimited framing.
    StreamBased,
    /// Message-based sink: JSON output defaults to comma-delimited framing.
    MessageBased,
}
161
162impl<F, S> From<(Option<F>, S)> for EncodingConfigWithFraming
163where
164 F: Into<FramingConfig>,
165 S: Into<SerializerConfig>,
166{
167 fn from((framing, encoding): (Option<F>, S)) -> Self {
168 Self {
169 framing: framing.map(Into::into),
170 encoding: encoding.into().into(),
171 }
172 }
173}
174
#[cfg(test)]
mod test {
    use lookup::lookup_v2::{ConfigValuePath, parse_value_path};

    use super::*;
    use crate::encoding::TimestampFormat;

    /// Checks that `transformer` carries the field filters and timestamp format that
    /// every JSON fixture in this module specifies.
    fn assert_fixture_transformer(transformer: &Transformer) {
        assert_eq!(
            transformer.only_fields(),
            &Some(vec![ConfigValuePath(parse_value_path("a.b[0]").unwrap())])
        );
        assert_eq!(transformer.except_fields(), &Some(vec!["ignore_me".into()]));
        assert_eq!(transformer.timestamp_format(), &Some(TimestampFormat::Unix));
    }

    /// A flat map deserializes into both the serializer config and the transformer.
    #[test]
    fn deserialize_encoding_config() {
        let json = r#"
            {
                "codec": "json",
                "only_fields": ["a.b[0]"],
                "except_fields": ["ignore_me"],
                "timestamp_format": "unix"
            }
        "#;

        let config: EncodingConfig = serde_json::from_str(json).unwrap();

        assert!(matches!(config.config(), SerializerConfig::Json(_)));
        assert_fixture_transformer(&config.transformer());
    }

    /// An explicit `framing` section is picked up next to the `encoding` section.
    #[test]
    fn deserialize_encoding_config_with_framing() {
        let json = r#"
            {
                "framing": {
                    "method": "newline_delimited"
                },
                "encoding": {
                    "codec": "json",
                    "only_fields": ["a.b[0]"],
                    "except_fields": ["ignore_me"],
                    "timestamp_format": "unix"
                }
            }
        "#;

        let config: EncodingConfigWithFraming = serde_json::from_str(json).unwrap();
        let (framing, serializer) = config.config();

        assert!(matches!(framing, Some(FramingConfig::NewlineDelimited)));
        assert!(matches!(serializer, SerializerConfig::Json(_)));
        assert_fixture_transformer(&config.transformer());
    }

    /// Leaving out `framing` yields `None` while the encoding is still parsed.
    #[test]
    fn deserialize_encoding_config_without_framing() {
        let json = r#"
            {
                "encoding": {
                    "codec": "json",
                    "only_fields": ["a.b[0]"],
                    "except_fields": ["ignore_me"],
                    "timestamp_format": "unix"
                }
            }
        "#;

        let config: EncodingConfigWithFraming = serde_json::from_str(json).unwrap();
        let (framing, serializer) = config.config();

        assert!(framing.is_none());
        assert!(matches!(serializer, SerializerConfig::Json(_)));
        assert_fixture_transformer(&config.transformer());
    }
}