1use bytes::BytesMut;
4use vector_config::configurable_component;
5use vector_core::{config::DataType, event::Event, schema};
6
7#[cfg(feature = "arrow")]
8use super::format::{ArrowStreamSerializer, ArrowStreamSerializerConfig};
9#[cfg(feature = "opentelemetry")]
10use super::format::{OtlpSerializer, OtlpSerializerConfig};
11#[cfg(feature = "syslog")]
12use super::format::{SyslogSerializer, SyslogSerializerConfig};
13use super::{
14 chunking::Chunker,
15 format::{
16 AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
17 CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer,
18 GelfSerializerConfig, JsonSerializer, JsonSerializerConfig, LogfmtSerializer,
19 LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer,
20 NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig, RawMessageSerializer,
21 RawMessageSerializerConfig, TextSerializer, TextSerializerConfig,
22 },
23 framing::{
24 CharacterDelimitedEncoderConfig, FramingConfig, LengthDelimitedEncoderConfig,
25 VarintLengthDelimitedEncoderConfig,
26 },
27};
28
#[configurable_component]
/// Configuration selecting which serializer is used to encode events.
///
/// The variant is chosen by the `codec` tag; variant names are written in
/// `snake_case` in configuration (for example `native_json`, `raw_message`).
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The codec to use for encoding events."))]
pub enum SerializerConfig {
    /// Selects the Avro serializer.
    Avro {
        /// Avro-specific options; the serializer is built from `avro.schema`.
        avro: AvroSerializerOptions,
    },

    /// Selects the CEF serializer.
    Cef(
        /// Options passed to the CEF serializer.
        CefSerializerConfig,
    ),

    /// Selects the CSV serializer with the given options.
    Csv(CsvSerializerConfig),

    /// Selects the GELF serializer with the given options.
    Gelf(GelfSerializerConfig),

    /// Selects the JSON serializer with the given options.
    ///
    /// This is the variant returned by `SerializerConfig::default()`.
    Json(JsonSerializerConfig),

    /// Selects the logfmt serializer; it takes no options.
    Logfmt,

    /// Selects the native serializer; it takes no options.
    Native,

    /// Selects the native JSON serializer; it takes no options.
    NativeJson,

    /// Selects the OTLP serializer, built from its default configuration.
    ///
    /// Only available when the `opentelemetry` feature is enabled.
    #[cfg(feature = "opentelemetry")]
    Otlp,

    /// Selects the Protobuf serializer with the given options.
    Protobuf(ProtobufSerializerConfig),

    /// Selects the raw message serializer; it takes no options.
    RawMessage,

    /// Selects the text serializer with the given options.
    Text(TextSerializerConfig),

    /// Selects the syslog serializer with the given options.
    ///
    /// Only available when the `syslog` feature is enabled.
    #[cfg(feature = "syslog")]
    Syslog(SyslogSerializerConfig),
}
139
140impl Default for SerializerConfig {
141 fn default() -> Self {
142 Self::Json(JsonSerializerConfig::default())
143 }
144}
145
#[configurable_component]
/// Configuration selecting which batch serializer is used to encode events.
///
/// The variant is chosen by the `codec` tag. The only variant is gated behind
/// the `arrow` feature, so the enum has no variants when that feature is off.
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The codec to use for batch encoding events."
))]
pub enum BatchSerializerConfig {
    /// Selects the Arrow stream serializer; spelled `arrow_stream` in configuration.
    #[cfg(feature = "arrow")]
    #[serde(rename = "arrow_stream")]
    ArrowStream(ArrowStreamSerializerConfig),
}
164
#[cfg(feature = "arrow")]
impl BatchSerializerConfig {
    /// Constructs the batch serializer described by this configuration.
    ///
    /// # Errors
    ///
    /// Returns whatever error `ArrowStreamSerializer::new` reports.
    pub fn build(
        &self,
    ) -> Result<ArrowStreamSerializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
        match self {
            Self::ArrowStream(stream_config) => {
                // The constructor takes the configuration by value, so hand it a copy.
                let owned_config = stream_config.clone();
                ArrowStreamSerializer::new(owned_config)
            }
        }
    }

    /// Reports the event data type accepted by the selected batch serializer.
    pub fn input_type(&self) -> DataType {
        match self {
            Self::ArrowStream(stream_config) => stream_config.input_type(),
        }
    }

    /// Reports the schema requirement of the selected batch serializer.
    pub fn schema_requirement(&self) -> schema::Requirement {
        match self {
            Self::ArrowStream(stream_config) => stream_config.schema_requirement(),
        }
    }
}
192
193impl From<AvroSerializerConfig> for SerializerConfig {
194 fn from(config: AvroSerializerConfig) -> Self {
195 Self::Avro { avro: config.avro }
196 }
197}
198
199impl From<CefSerializerConfig> for SerializerConfig {
200 fn from(config: CefSerializerConfig) -> Self {
201 Self::Cef(config)
202 }
203}
204
205impl From<CsvSerializerConfig> for SerializerConfig {
206 fn from(config: CsvSerializerConfig) -> Self {
207 Self::Csv(config)
208 }
209}
210
211impl From<GelfSerializerConfig> for SerializerConfig {
212 fn from(config: GelfSerializerConfig) -> Self {
213 Self::Gelf(config)
214 }
215}
216
217impl From<JsonSerializerConfig> for SerializerConfig {
218 fn from(config: JsonSerializerConfig) -> Self {
219 Self::Json(config)
220 }
221}
222
223impl From<LogfmtSerializerConfig> for SerializerConfig {
224 fn from(_: LogfmtSerializerConfig) -> Self {
225 Self::Logfmt
226 }
227}
228
229impl From<NativeSerializerConfig> for SerializerConfig {
230 fn from(_: NativeSerializerConfig) -> Self {
231 Self::Native
232 }
233}
234
235impl From<NativeJsonSerializerConfig> for SerializerConfig {
236 fn from(_: NativeJsonSerializerConfig) -> Self {
237 Self::NativeJson
238 }
239}
240
/// Maps an OTLP serializer configuration to the option-less `Otlp` variant.
///
/// The incoming configuration is discarded; the variant stores nothing.
#[cfg(feature = "opentelemetry")]
impl From<OtlpSerializerConfig> for SerializerConfig {
    fn from(_otlp_config: OtlpSerializerConfig) -> Self {
        SerializerConfig::Otlp
    }
}
247
248impl From<ProtobufSerializerConfig> for SerializerConfig {
249 fn from(config: ProtobufSerializerConfig) -> Self {
250 Self::Protobuf(config)
251 }
252}
253
254impl From<RawMessageSerializerConfig> for SerializerConfig {
255 fn from(_: RawMessageSerializerConfig) -> Self {
256 Self::RawMessage
257 }
258}
259
260impl From<TextSerializerConfig> for SerializerConfig {
261 fn from(config: TextSerializerConfig) -> Self {
262 Self::Text(config)
263 }
264}
265
266impl SerializerConfig {
267 pub fn build(&self) -> Result<Serializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
269 match self {
270 SerializerConfig::Avro { avro } => Ok(Serializer::Avro(
271 AvroSerializerConfig::new(avro.schema.clone()).build()?,
272 )),
273 SerializerConfig::Cef(config) => Ok(Serializer::Cef(config.build()?)),
274 SerializerConfig::Csv(config) => Ok(Serializer::Csv(config.build()?)),
275 SerializerConfig::Gelf(config) => Ok(Serializer::Gelf(config.build())),
276 SerializerConfig::Json(config) => Ok(Serializer::Json(config.build())),
277 SerializerConfig::Logfmt => Ok(Serializer::Logfmt(LogfmtSerializerConfig.build())),
278 SerializerConfig::Native => Ok(Serializer::Native(NativeSerializerConfig.build())),
279 SerializerConfig::NativeJson => {
280 Ok(Serializer::NativeJson(NativeJsonSerializerConfig.build()))
281 }
282 #[cfg(feature = "opentelemetry")]
283 SerializerConfig::Otlp => {
284 Ok(Serializer::Otlp(OtlpSerializerConfig::default().build()?))
285 }
286 SerializerConfig::Protobuf(config) => Ok(Serializer::Protobuf(config.build()?)),
287 SerializerConfig::RawMessage => {
288 Ok(Serializer::RawMessage(RawMessageSerializerConfig.build()))
289 }
290 SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())),
291 #[cfg(feature = "syslog")]
292 SerializerConfig::Syslog(config) => Ok(Serializer::Syslog(config.build())),
293 }
294 }
295
296 pub fn default_stream_framing(&self) -> FramingConfig {
298 match self {
299 SerializerConfig::Avro { .. } | SerializerConfig::Native => {
311 FramingConfig::LengthDelimited(LengthDelimitedEncoderConfig::default())
312 }
313 #[cfg(feature = "opentelemetry")]
314 SerializerConfig::Otlp => FramingConfig::Bytes,
315 SerializerConfig::Protobuf(_) => {
316 FramingConfig::VarintLengthDelimited(VarintLengthDelimitedEncoderConfig::default())
317 }
318 SerializerConfig::Cef(_)
319 | SerializerConfig::Csv(_)
320 | SerializerConfig::Json(_)
321 | SerializerConfig::Logfmt
322 | SerializerConfig::NativeJson
323 | SerializerConfig::RawMessage
324 | SerializerConfig::Text(_) => FramingConfig::NewlineDelimited,
325 #[cfg(feature = "syslog")]
326 SerializerConfig::Syslog(_) => FramingConfig::NewlineDelimited,
327 SerializerConfig::Gelf(_) => {
328 FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0))
329 }
330 }
331 }
332
333 pub fn input_type(&self) -> DataType {
335 match self {
336 SerializerConfig::Avro { avro } => {
337 AvroSerializerConfig::new(avro.schema.clone()).input_type()
338 }
339 SerializerConfig::Cef(config) => config.input_type(),
340 SerializerConfig::Csv(config) => config.input_type(),
341 SerializerConfig::Gelf(config) => config.input_type(),
342 SerializerConfig::Json(config) => config.input_type(),
343 SerializerConfig::Logfmt => LogfmtSerializerConfig.input_type(),
344 SerializerConfig::Native => NativeSerializerConfig.input_type(),
345 SerializerConfig::NativeJson => NativeJsonSerializerConfig.input_type(),
346 #[cfg(feature = "opentelemetry")]
347 SerializerConfig::Otlp => OtlpSerializerConfig::default().input_type(),
348 SerializerConfig::Protobuf(config) => config.input_type(),
349 SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(),
350 SerializerConfig::Text(config) => config.input_type(),
351 #[cfg(feature = "syslog")]
352 SerializerConfig::Syslog(config) => config.input_type(),
353 }
354 }
355
356 pub fn schema_requirement(&self) -> schema::Requirement {
358 match self {
359 SerializerConfig::Avro { avro } => {
360 AvroSerializerConfig::new(avro.schema.clone()).schema_requirement()
361 }
362 SerializerConfig::Cef(config) => config.schema_requirement(),
363 SerializerConfig::Csv(config) => config.schema_requirement(),
364 SerializerConfig::Gelf(config) => config.schema_requirement(),
365 SerializerConfig::Json(config) => config.schema_requirement(),
366 SerializerConfig::Logfmt => LogfmtSerializerConfig.schema_requirement(),
367 SerializerConfig::Native => NativeSerializerConfig.schema_requirement(),
368 SerializerConfig::NativeJson => NativeJsonSerializerConfig.schema_requirement(),
369 #[cfg(feature = "opentelemetry")]
370 SerializerConfig::Otlp => OtlpSerializerConfig::default().schema_requirement(),
371 SerializerConfig::Protobuf(config) => config.schema_requirement(),
372 SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(),
373 SerializerConfig::Text(config) => config.schema_requirement(),
374 #[cfg(feature = "syslog")]
375 SerializerConfig::Syslog(config) => config.schema_requirement(),
376 }
377 }
378}
379
/// A built serializer: one variant per supported codec, each wrapping the
/// concrete serializer that performs the encoding.
#[derive(Debug, Clone)]
pub enum Serializer {
    /// Wraps an [`AvroSerializer`].
    Avro(AvroSerializer),
    /// Wraps a [`CefSerializer`].
    Cef(CefSerializer),
    /// Wraps a [`CsvSerializer`].
    Csv(CsvSerializer),
    /// Wraps a [`GelfSerializer`].
    Gelf(GelfSerializer),
    /// Wraps a [`JsonSerializer`].
    Json(JsonSerializer),
    /// Wraps a [`LogfmtSerializer`].
    Logfmt(LogfmtSerializer),
    /// Wraps a [`NativeSerializer`].
    Native(NativeSerializer),
    /// Wraps a [`NativeJsonSerializer`].
    NativeJson(NativeJsonSerializer),
    /// Wraps an [`OtlpSerializer`]; only present with the `opentelemetry` feature.
    #[cfg(feature = "opentelemetry")]
    Otlp(OtlpSerializer),
    /// Wraps a [`ProtobufSerializer`].
    Protobuf(ProtobufSerializer),
    /// Wraps a [`RawMessageSerializer`].
    RawMessage(RawMessageSerializer),
    /// Wraps a [`TextSerializer`].
    Text(TextSerializer),
    /// Wraps a [`SyslogSerializer`]; only present with the `syslog` feature.
    #[cfg(feature = "syslog")]
    Syslog(SyslogSerializer),
}
412
413impl Serializer {
414 pub fn supports_json(&self) -> bool {
416 match self {
417 Serializer::Json(_) | Serializer::NativeJson(_) | Serializer::Gelf(_) => true,
418 Serializer::Avro(_)
419 | Serializer::Cef(_)
420 | Serializer::Csv(_)
421 | Serializer::Logfmt(_)
422 | Serializer::Text(_)
423 | Serializer::Native(_)
424 | Serializer::Protobuf(_)
425 | Serializer::RawMessage(_) => false,
426 #[cfg(feature = "syslog")]
427 Serializer::Syslog(_) => false,
428 #[cfg(feature = "opentelemetry")]
429 Serializer::Otlp(_) => false,
430 }
431 }
432
433 pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
440 match self {
441 Serializer::Gelf(serializer) => serializer.to_json_value(event),
442 Serializer::Json(serializer) => serializer.to_json_value(event),
443 Serializer::NativeJson(serializer) => serializer.to_json_value(event),
444 Serializer::Avro(_)
445 | Serializer::Cef(_)
446 | Serializer::Csv(_)
447 | Serializer::Logfmt(_)
448 | Serializer::Text(_)
449 | Serializer::Native(_)
450 | Serializer::Protobuf(_)
451 | Serializer::RawMessage(_) => {
452 panic!("Serializer does not support JSON")
453 }
454 #[cfg(feature = "syslog")]
455 Serializer::Syslog(_) => {
456 panic!("Serializer does not support JSON")
457 }
458 #[cfg(feature = "opentelemetry")]
459 Serializer::Otlp(_) => {
460 panic!("Serializer does not support JSON")
461 }
462 }
463 }
464
465 pub fn chunker(&self) -> Option<Chunker> {
467 match self {
468 Serializer::Gelf(gelf) => Some(Chunker::Gelf(gelf.chunker())),
469 _ => None,
470 }
471 }
472
473 pub const fn is_binary(&self) -> bool {
478 match self {
479 Serializer::RawMessage(_)
480 | Serializer::Avro(_)
481 | Serializer::Native(_)
482 | Serializer::Protobuf(_) => true,
483 #[cfg(feature = "opentelemetry")]
484 Serializer::Otlp(_) => true,
485 #[cfg(feature = "syslog")]
486 Serializer::Syslog(_) => false,
487 Serializer::Cef(_)
488 | Serializer::Csv(_)
489 | Serializer::Logfmt(_)
490 | Serializer::Gelf(_)
491 | Serializer::Json(_)
492 | Serializer::Text(_)
493 | Serializer::NativeJson(_) => false,
494 }
495 }
496}
497
498impl From<AvroSerializer> for Serializer {
499 fn from(serializer: AvroSerializer) -> Self {
500 Self::Avro(serializer)
501 }
502}
503
504impl From<CefSerializer> for Serializer {
505 fn from(serializer: CefSerializer) -> Self {
506 Self::Cef(serializer)
507 }
508}
509
510impl From<CsvSerializer> for Serializer {
511 fn from(serializer: CsvSerializer) -> Self {
512 Self::Csv(serializer)
513 }
514}
515
516impl From<GelfSerializer> for Serializer {
517 fn from(serializer: GelfSerializer) -> Self {
518 Self::Gelf(serializer)
519 }
520}
521
522impl From<JsonSerializer> for Serializer {
523 fn from(serializer: JsonSerializer) -> Self {
524 Self::Json(serializer)
525 }
526}
527
528impl From<LogfmtSerializer> for Serializer {
529 fn from(serializer: LogfmtSerializer) -> Self {
530 Self::Logfmt(serializer)
531 }
532}
533
534impl From<NativeSerializer> for Serializer {
535 fn from(serializer: NativeSerializer) -> Self {
536 Self::Native(serializer)
537 }
538}
539
540impl From<NativeJsonSerializer> for Serializer {
541 fn from(serializer: NativeJsonSerializer) -> Self {
542 Self::NativeJson(serializer)
543 }
544}
545
/// Wraps an OTLP serializer in the `Otlp` variant.
#[cfg(feature = "opentelemetry")]
impl From<OtlpSerializer> for Serializer {
    fn from(otlp: OtlpSerializer) -> Self {
        Serializer::Otlp(otlp)
    }
}
552
553impl From<ProtobufSerializer> for Serializer {
554 fn from(serializer: ProtobufSerializer) -> Self {
555 Self::Protobuf(serializer)
556 }
557}
558
559impl From<RawMessageSerializer> for Serializer {
560 fn from(serializer: RawMessageSerializer) -> Self {
561 Self::RawMessage(serializer)
562 }
563}
564
565impl From<TextSerializer> for Serializer {
566 fn from(serializer: TextSerializer) -> Self {
567 Self::Text(serializer)
568 }
569}
/// Wraps a syslog serializer in the `Syslog` variant.
#[cfg(feature = "syslog")]
impl From<SyslogSerializer> for Serializer {
    fn from(syslog: SyslogSerializer) -> Self {
        Serializer::Syslog(syslog)
    }
}
576
577impl tokio_util::codec::Encoder<Event> for Serializer {
578 type Error = vector_common::Error;
579
580 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
581 match self {
582 Serializer::Avro(serializer) => serializer.encode(event, buffer),
583 Serializer::Cef(serializer) => serializer.encode(event, buffer),
584 Serializer::Csv(serializer) => serializer.encode(event, buffer),
585 Serializer::Gelf(serializer) => serializer.encode(event, buffer),
586 Serializer::Json(serializer) => serializer.encode(event, buffer),
587 Serializer::Logfmt(serializer) => serializer.encode(event, buffer),
588 Serializer::Native(serializer) => serializer.encode(event, buffer),
589 Serializer::NativeJson(serializer) => serializer.encode(event, buffer),
590 #[cfg(feature = "opentelemetry")]
591 Serializer::Otlp(serializer) => serializer.encode(event, buffer),
592 Serializer::Protobuf(serializer) => serializer.encode(event, buffer),
593 Serializer::RawMessage(serializer) => serializer.encode(event, buffer),
594 Serializer::Text(serializer) => serializer.encode(event, buffer),
595 #[cfg(feature = "syslog")]
596 Serializer::Syslog(serializer) => serializer.encode(event, buffer),
597 }
598 }
599}
600
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration selects the JSON codec.
    #[test]
    fn test_serializer_config_default() {
        assert!(matches!(
            SerializerConfig::default(),
            SerializerConfig::Json(_)
        ));
    }

    /// JSON is not binary; native and raw message are.
    #[test]
    fn test_serializer_is_binary() {
        let json = Serializer::Json(JsonSerializerConfig::default().build());
        assert!(!json.is_binary());

        let native = Serializer::Native(NativeSerializerConfig.build());
        assert!(native.is_binary());

        let raw_message = Serializer::RawMessage(RawMessageSerializerConfig.build());
        assert!(raw_message.is_binary());
    }

    /// JSON supports conversion to a JSON value; text does not.
    #[test]
    fn test_serializer_supports_json() {
        let json = Serializer::Json(JsonSerializerConfig::default().build());
        assert!(json.supports_json());

        let text = Serializer::Text(TextSerializerConfig::default().build());
        assert!(!text.supports_json());
    }

    /// Building a JSON configuration succeeds and yields the JSON serializer.
    #[test]
    fn test_serializer_config_build() {
        let build_result = SerializerConfig::Json(JsonSerializerConfig::default()).build();
        assert!(build_result.is_ok());

        let built = build_result.unwrap();
        assert!(matches!(built, Serializer::Json(_)));
    }

    /// JSON defaults to newline-delimited framing, native to length-delimited.
    #[test]
    fn test_serializer_config_default_framing() {
        let json_framing =
            SerializerConfig::Json(JsonSerializerConfig::default()).default_stream_framing();
        assert!(matches!(json_framing, FramingConfig::NewlineDelimited));

        let native_framing = SerializerConfig::Native.default_stream_framing();
        assert!(matches!(native_framing, FramingConfig::LengthDelimited(_)));
    }
}