1use bytes::BytesMut;
4use vector_config::configurable_component;
5use vector_core::{config::DataType, event::Event, schema};
6
7use super::chunking::Chunker;
8use super::format::{
9 AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
10 CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig,
11 JsonSerializer, JsonSerializerConfig, LogfmtSerializer, LogfmtSerializerConfig,
12 NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer, NativeSerializerConfig,
13 ProtobufSerializer, ProtobufSerializerConfig, RawMessageSerializer, RawMessageSerializerConfig,
14 TextSerializer, TextSerializerConfig,
15};
16#[cfg(feature = "opentelemetry")]
17use super::format::{OtlpSerializer, OtlpSerializerConfig};
18use super::framing::{
19 CharacterDelimitedEncoderConfig, FramingConfig, LengthDelimitedEncoderConfig,
20 VarintLengthDelimitedEncoderConfig,
21};
22
/// Configuration that selects which serializer (codec) is used to encode events.
///
/// The variant is chosen by the `codec` tag, using the snake_case form of the variant name
/// (for example `json`, `native_json`, or `raw_message`).
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The codec to use for encoding events."))]
pub enum SerializerConfig {
    /// Encodes events with the Avro serializer.
    Avro {
        /// Avro-specific encoder options, including the schema the serializer is built from.
        avro: AvroSerializerOptions,
    },

    /// Encodes events with the CEF serializer.
    Cef(
        /// Options for the CEF encoder.
        CefSerializerConfig,
    ),

    /// Encodes events with the CSV serializer.
    Csv(CsvSerializerConfig),

    /// Encodes events with the GELF serializer.
    Gelf(GelfSerializerConfig),

    /// Encodes events with the JSON serializer.
    ///
    /// This is the default codec.
    Json(JsonSerializerConfig),

    /// Encodes events with the logfmt serializer.
    Logfmt,

    /// Encodes events with the native serializer.
    Native,

    /// Encodes events with the native JSON serializer.
    NativeJson,

    /// Encodes events with the OTLP serializer.
    ///
    /// Only available when the `opentelemetry` feature is enabled.
    #[cfg(feature = "opentelemetry")]
    Otlp,

    /// Encodes events with the Protobuf serializer.
    Protobuf(ProtobufSerializerConfig),

    /// Encodes events with the raw message serializer.
    RawMessage,

    /// Encodes events with the text serializer.
    Text(TextSerializerConfig),
}
128
129impl Default for SerializerConfig {
130 fn default() -> Self {
131 Self::Json(JsonSerializerConfig::default())
132 }
133}
134
135impl From<AvroSerializerConfig> for SerializerConfig {
136 fn from(config: AvroSerializerConfig) -> Self {
137 Self::Avro { avro: config.avro }
138 }
139}
140
141impl From<CefSerializerConfig> for SerializerConfig {
142 fn from(config: CefSerializerConfig) -> Self {
143 Self::Cef(config)
144 }
145}
146
147impl From<CsvSerializerConfig> for SerializerConfig {
148 fn from(config: CsvSerializerConfig) -> Self {
149 Self::Csv(config)
150 }
151}
152
153impl From<GelfSerializerConfig> for SerializerConfig {
154 fn from(config: GelfSerializerConfig) -> Self {
155 Self::Gelf(config)
156 }
157}
158
159impl From<JsonSerializerConfig> for SerializerConfig {
160 fn from(config: JsonSerializerConfig) -> Self {
161 Self::Json(config)
162 }
163}
164
165impl From<LogfmtSerializerConfig> for SerializerConfig {
166 fn from(_: LogfmtSerializerConfig) -> Self {
167 Self::Logfmt
168 }
169}
170
171impl From<NativeSerializerConfig> for SerializerConfig {
172 fn from(_: NativeSerializerConfig) -> Self {
173 Self::Native
174 }
175}
176
177impl From<NativeJsonSerializerConfig> for SerializerConfig {
178 fn from(_: NativeJsonSerializerConfig) -> Self {
179 Self::NativeJson
180 }
181}
182
/// Maps an [`OtlpSerializerConfig`] to [`SerializerConfig::Otlp`]; the argument itself is
/// not used. Only compiled with the `opentelemetry` feature.
#[cfg(feature = "opentelemetry")]
impl From<OtlpSerializerConfig> for SerializerConfig {
    fn from(_config: OtlpSerializerConfig) -> SerializerConfig {
        SerializerConfig::Otlp
    }
}
189
190impl From<ProtobufSerializerConfig> for SerializerConfig {
191 fn from(config: ProtobufSerializerConfig) -> Self {
192 Self::Protobuf(config)
193 }
194}
195
196impl From<RawMessageSerializerConfig> for SerializerConfig {
197 fn from(_: RawMessageSerializerConfig) -> Self {
198 Self::RawMessage
199 }
200}
201
202impl From<TextSerializerConfig> for SerializerConfig {
203 fn from(config: TextSerializerConfig) -> Self {
204 Self::Text(config)
205 }
206}
207
208impl SerializerConfig {
209 pub fn build(&self) -> Result<Serializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
211 match self {
212 SerializerConfig::Avro { avro } => Ok(Serializer::Avro(
213 AvroSerializerConfig::new(avro.schema.clone()).build()?,
214 )),
215 SerializerConfig::Cef(config) => Ok(Serializer::Cef(config.build()?)),
216 SerializerConfig::Csv(config) => Ok(Serializer::Csv(config.build()?)),
217 SerializerConfig::Gelf(config) => Ok(Serializer::Gelf(config.build())),
218 SerializerConfig::Json(config) => Ok(Serializer::Json(config.build())),
219 SerializerConfig::Logfmt => Ok(Serializer::Logfmt(LogfmtSerializerConfig.build())),
220 SerializerConfig::Native => Ok(Serializer::Native(NativeSerializerConfig.build())),
221 SerializerConfig::NativeJson => {
222 Ok(Serializer::NativeJson(NativeJsonSerializerConfig.build()))
223 }
224 #[cfg(feature = "opentelemetry")]
225 SerializerConfig::Otlp => {
226 Ok(Serializer::Otlp(OtlpSerializerConfig::default().build()?))
227 }
228 SerializerConfig::Protobuf(config) => Ok(Serializer::Protobuf(config.build()?)),
229 SerializerConfig::RawMessage => {
230 Ok(Serializer::RawMessage(RawMessageSerializerConfig.build()))
231 }
232 SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())),
233 }
234 }
235
236 pub fn default_stream_framing(&self) -> FramingConfig {
238 match self {
239 SerializerConfig::Avro { .. } | SerializerConfig::Native => {
251 FramingConfig::LengthDelimited(LengthDelimitedEncoderConfig::default())
252 }
253 #[cfg(feature = "opentelemetry")]
254 SerializerConfig::Otlp => FramingConfig::Bytes,
255 SerializerConfig::Protobuf(_) => {
256 FramingConfig::VarintLengthDelimited(VarintLengthDelimitedEncoderConfig::default())
257 }
258 SerializerConfig::Cef(_)
259 | SerializerConfig::Csv(_)
260 | SerializerConfig::Json(_)
261 | SerializerConfig::Logfmt
262 | SerializerConfig::NativeJson
263 | SerializerConfig::RawMessage
264 | SerializerConfig::Text(_) => FramingConfig::NewlineDelimited,
265 SerializerConfig::Gelf(_) => {
266 FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0))
267 }
268 }
269 }
270
271 pub fn input_type(&self) -> DataType {
273 match self {
274 SerializerConfig::Avro { avro } => {
275 AvroSerializerConfig::new(avro.schema.clone()).input_type()
276 }
277 SerializerConfig::Cef(config) => config.input_type(),
278 SerializerConfig::Csv(config) => config.input_type(),
279 SerializerConfig::Gelf(config) => config.input_type(),
280 SerializerConfig::Json(config) => config.input_type(),
281 SerializerConfig::Logfmt => LogfmtSerializerConfig.input_type(),
282 SerializerConfig::Native => NativeSerializerConfig.input_type(),
283 SerializerConfig::NativeJson => NativeJsonSerializerConfig.input_type(),
284 #[cfg(feature = "opentelemetry")]
285 SerializerConfig::Otlp => OtlpSerializerConfig::default().input_type(),
286 SerializerConfig::Protobuf(config) => config.input_type(),
287 SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(),
288 SerializerConfig::Text(config) => config.input_type(),
289 }
290 }
291
292 pub fn schema_requirement(&self) -> schema::Requirement {
294 match self {
295 SerializerConfig::Avro { avro } => {
296 AvroSerializerConfig::new(avro.schema.clone()).schema_requirement()
297 }
298 SerializerConfig::Cef(config) => config.schema_requirement(),
299 SerializerConfig::Csv(config) => config.schema_requirement(),
300 SerializerConfig::Gelf(config) => config.schema_requirement(),
301 SerializerConfig::Json(config) => config.schema_requirement(),
302 SerializerConfig::Logfmt => LogfmtSerializerConfig.schema_requirement(),
303 SerializerConfig::Native => NativeSerializerConfig.schema_requirement(),
304 SerializerConfig::NativeJson => NativeJsonSerializerConfig.schema_requirement(),
305 #[cfg(feature = "opentelemetry")]
306 SerializerConfig::Otlp => OtlpSerializerConfig::default().schema_requirement(),
307 SerializerConfig::Protobuf(config) => config.schema_requirement(),
308 SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(),
309 SerializerConfig::Text(config) => config.schema_requirement(),
310 }
311 }
312}
313
/// A built serializer: one variant per supported codec, each wrapping the concrete
/// serializer that performs the encoding.
#[derive(Debug, Clone)]
pub enum Serializer {
    /// Uses an `AvroSerializer` for serialization.
    Avro(AvroSerializer),
    /// Uses a `CefSerializer` for serialization.
    Cef(CefSerializer),
    /// Uses a `CsvSerializer` for serialization.
    Csv(CsvSerializer),
    /// Uses a `GelfSerializer` for serialization.
    Gelf(GelfSerializer),
    /// Uses a `JsonSerializer` for serialization.
    Json(JsonSerializer),
    /// Uses a `LogfmtSerializer` for serialization.
    Logfmt(LogfmtSerializer),
    /// Uses a `NativeSerializer` for serialization.
    Native(NativeSerializer),
    /// Uses a `NativeJsonSerializer` for serialization.
    NativeJson(NativeJsonSerializer),
    /// Uses an `OtlpSerializer` for serialization (requires the `opentelemetry` feature).
    #[cfg(feature = "opentelemetry")]
    Otlp(OtlpSerializer),
    /// Uses a `ProtobufSerializer` for serialization.
    Protobuf(ProtobufSerializer),
    /// Uses a `RawMessageSerializer` for serialization.
    RawMessage(RawMessageSerializer),
    /// Uses a `TextSerializer` for serialization.
    Text(TextSerializer),
}
343
344impl Serializer {
345 pub fn supports_json(&self) -> bool {
347 match self {
348 Serializer::Json(_) | Serializer::NativeJson(_) | Serializer::Gelf(_) => true,
349 Serializer::Avro(_)
350 | Serializer::Cef(_)
351 | Serializer::Csv(_)
352 | Serializer::Logfmt(_)
353 | Serializer::Text(_)
354 | Serializer::Native(_)
355 | Serializer::Protobuf(_)
356 | Serializer::RawMessage(_) => false,
357 #[cfg(feature = "opentelemetry")]
358 Serializer::Otlp(_) => false,
359 }
360 }
361
362 pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
369 match self {
370 Serializer::Gelf(serializer) => serializer.to_json_value(event),
371 Serializer::Json(serializer) => serializer.to_json_value(event),
372 Serializer::NativeJson(serializer) => serializer.to_json_value(event),
373 Serializer::Avro(_)
374 | Serializer::Cef(_)
375 | Serializer::Csv(_)
376 | Serializer::Logfmt(_)
377 | Serializer::Text(_)
378 | Serializer::Native(_)
379 | Serializer::Protobuf(_)
380 | Serializer::RawMessage(_) => {
381 panic!("Serializer does not support JSON")
382 }
383 #[cfg(feature = "opentelemetry")]
384 Serializer::Otlp(_) => {
385 panic!("Serializer does not support JSON")
386 }
387 }
388 }
389
390 pub fn chunker(&self) -> Option<Chunker> {
392 match self {
393 Serializer::Gelf(gelf) => Some(Chunker::Gelf(gelf.chunker())),
394 _ => None,
395 }
396 }
397
398 pub const fn is_binary(&self) -> bool {
403 match self {
404 Serializer::RawMessage(_)
405 | Serializer::Avro(_)
406 | Serializer::Native(_)
407 | Serializer::Protobuf(_) => true,
408 #[cfg(feature = "opentelemetry")]
409 Serializer::Otlp(_) => true,
410 Serializer::Cef(_)
411 | Serializer::Csv(_)
412 | Serializer::Logfmt(_)
413 | Serializer::Gelf(_)
414 | Serializer::Json(_)
415 | Serializer::Text(_)
416 | Serializer::NativeJson(_) => false,
417 }
418 }
419}
420
421impl From<AvroSerializer> for Serializer {
422 fn from(serializer: AvroSerializer) -> Self {
423 Self::Avro(serializer)
424 }
425}
426
427impl From<CefSerializer> for Serializer {
428 fn from(serializer: CefSerializer) -> Self {
429 Self::Cef(serializer)
430 }
431}
432
433impl From<CsvSerializer> for Serializer {
434 fn from(serializer: CsvSerializer) -> Self {
435 Self::Csv(serializer)
436 }
437}
438
439impl From<GelfSerializer> for Serializer {
440 fn from(serializer: GelfSerializer) -> Self {
441 Self::Gelf(serializer)
442 }
443}
444
445impl From<JsonSerializer> for Serializer {
446 fn from(serializer: JsonSerializer) -> Self {
447 Self::Json(serializer)
448 }
449}
450
451impl From<LogfmtSerializer> for Serializer {
452 fn from(serializer: LogfmtSerializer) -> Self {
453 Self::Logfmt(serializer)
454 }
455}
456
457impl From<NativeSerializer> for Serializer {
458 fn from(serializer: NativeSerializer) -> Self {
459 Self::Native(serializer)
460 }
461}
462
463impl From<NativeJsonSerializer> for Serializer {
464 fn from(serializer: NativeJsonSerializer) -> Self {
465 Self::NativeJson(serializer)
466 }
467}
468
/// Wraps an [`OtlpSerializer`] in [`Serializer::Otlp`]. Only compiled with the
/// `opentelemetry` feature.
#[cfg(feature = "opentelemetry")]
impl From<OtlpSerializer> for Serializer {
    fn from(otlp: OtlpSerializer) -> Serializer {
        Serializer::Otlp(otlp)
    }
}
475
476impl From<ProtobufSerializer> for Serializer {
477 fn from(serializer: ProtobufSerializer) -> Self {
478 Self::Protobuf(serializer)
479 }
480}
481
482impl From<RawMessageSerializer> for Serializer {
483 fn from(serializer: RawMessageSerializer) -> Self {
484 Self::RawMessage(serializer)
485 }
486}
487
488impl From<TextSerializer> for Serializer {
489 fn from(serializer: TextSerializer) -> Self {
490 Self::Text(serializer)
491 }
492}
493
494impl tokio_util::codec::Encoder<Event> for Serializer {
495 type Error = vector_common::Error;
496
497 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
498 match self {
499 Serializer::Avro(serializer) => serializer.encode(event, buffer),
500 Serializer::Cef(serializer) => serializer.encode(event, buffer),
501 Serializer::Csv(serializer) => serializer.encode(event, buffer),
502 Serializer::Gelf(serializer) => serializer.encode(event, buffer),
503 Serializer::Json(serializer) => serializer.encode(event, buffer),
504 Serializer::Logfmt(serializer) => serializer.encode(event, buffer),
505 Serializer::Native(serializer) => serializer.encode(event, buffer),
506 Serializer::NativeJson(serializer) => serializer.encode(event, buffer),
507 #[cfg(feature = "opentelemetry")]
508 Serializer::Otlp(serializer) => serializer.encode(event, buffer),
509 Serializer::Protobuf(serializer) => serializer.encode(event, buffer),
510 Serializer::RawMessage(serializer) => serializer.encode(event, buffer),
511 Serializer::Text(serializer) => serializer.encode(event, buffer),
512 }
513 }
514}
515
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration selects the JSON codec.
    #[test]
    fn test_serializer_config_default() {
        assert!(matches!(
            SerializerConfig::default(),
            SerializerConfig::Json(_)
        ));
    }

    /// JSON is not binary; the native and raw message serializers are.
    #[test]
    fn test_serializer_is_binary() {
        let json = Serializer::Json(JsonSerializerConfig::default().build());
        assert!(!json.is_binary());

        let native = Serializer::Native(NativeSerializerConfig.build());
        assert!(native.is_binary());

        let raw_message = Serializer::RawMessage(RawMessageSerializerConfig.build());
        assert!(raw_message.is_binary());
    }

    /// JSON supports JSON values; text does not.
    #[test]
    fn test_serializer_supports_json() {
        let json = Serializer::Json(JsonSerializerConfig::default().build());
        assert!(json.supports_json());

        let text = Serializer::Text(TextSerializerConfig::default().build());
        assert!(!text.supports_json());
    }

    /// Building a JSON configuration succeeds and yields the JSON serializer.
    #[test]
    fn test_serializer_config_build() {
        let built = SerializerConfig::Json(JsonSerializerConfig::default()).build();
        assert!(built.is_ok());
        assert!(matches!(built.unwrap(), Serializer::Json(_)));
    }

    /// JSON defaults to newline-delimited framing, native to length-delimited framing.
    #[test]
    fn test_serializer_config_default_framing() {
        let json_framing =
            SerializerConfig::Json(JsonSerializerConfig::default()).default_stream_framing();
        assert!(matches!(json_framing, FramingConfig::NewlineDelimited));

        let native_framing = SerializerConfig::Native.default_stream_framing();
        assert!(matches!(native_framing, FramingConfig::LengthDelimited(_)));
    }
}