1pub mod chunking;
5pub mod format;
6pub mod framing;
7
8use std::fmt::Debug;
9
10use bytes::BytesMut;
11pub use chunking::{Chunker, Chunking, GelfChunker};
12pub use format::{
13    AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
14    CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig,
15    JsonSerializer, JsonSerializerConfig, JsonSerializerOptions, LogfmtSerializer,
16    LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer,
17    NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig,
18    ProtobufSerializerOptions, RawMessageSerializer, RawMessageSerializerConfig, TextSerializer,
19    TextSerializerConfig,
20};
21pub use framing::{
22    BoxedFramer, BoxedFramingError, BytesEncoder, BytesEncoderConfig, CharacterDelimitedEncoder,
23    CharacterDelimitedEncoderConfig, CharacterDelimitedEncoderOptions, LengthDelimitedEncoder,
24    LengthDelimitedEncoderConfig, NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig,
25    VarintLengthDelimitedEncoder, VarintLengthDelimitedEncoderConfig,
26};
27use vector_config::configurable_component;
28use vector_core::{config::DataType, event::Event, schema};
29
30pub type BuildError = Box<dyn std::error::Error + Send + Sync + 'static>;
32
33#[derive(Debug)]
35pub enum Error {
36    FramingError(BoxedFramingError),
38    SerializingError(vector_common::Error),
40}
41
42impl std::fmt::Display for Error {
43    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        match self {
45            Self::FramingError(error) => write!(formatter, "FramingError({error})"),
46            Self::SerializingError(error) => write!(formatter, "SerializingError({error})"),
47        }
48    }
49}
50
51impl std::error::Error for Error {}
52
53impl From<std::io::Error> for Error {
54    fn from(error: std::io::Error) -> Self {
55        Self::FramingError(Box::new(error))
56    }
57}
58
59#[configurable_component]
61#[derive(Clone, Debug, Eq, PartialEq)]
62#[serde(tag = "method", rename_all = "snake_case")]
63#[configurable(metadata(docs::enum_tag_description = "The framing method."))]
64pub enum FramingConfig {
65    Bytes,
67
68    CharacterDelimited(CharacterDelimitedEncoderConfig),
70
71    LengthDelimited(LengthDelimitedEncoderConfig),
75
76    NewlineDelimited,
78
79    VarintLengthDelimited(VarintLengthDelimitedEncoderConfig),
83}
84
85impl From<BytesEncoderConfig> for FramingConfig {
86    fn from(_: BytesEncoderConfig) -> Self {
87        Self::Bytes
88    }
89}
90
91impl From<CharacterDelimitedEncoderConfig> for FramingConfig {
92    fn from(config: CharacterDelimitedEncoderConfig) -> Self {
93        Self::CharacterDelimited(config)
94    }
95}
96
97impl From<LengthDelimitedEncoderConfig> for FramingConfig {
98    fn from(config: LengthDelimitedEncoderConfig) -> Self {
99        Self::LengthDelimited(config)
100    }
101}
102
103impl From<NewlineDelimitedEncoderConfig> for FramingConfig {
104    fn from(_: NewlineDelimitedEncoderConfig) -> Self {
105        Self::NewlineDelimited
106    }
107}
108
109impl From<VarintLengthDelimitedEncoderConfig> for FramingConfig {
110    fn from(config: VarintLengthDelimitedEncoderConfig) -> Self {
111        Self::VarintLengthDelimited(config)
112    }
113}
114
115impl FramingConfig {
116    pub fn build(&self) -> Framer {
118        match self {
119            FramingConfig::Bytes => Framer::Bytes(BytesEncoderConfig.build()),
120            FramingConfig::CharacterDelimited(config) => Framer::CharacterDelimited(config.build()),
121            FramingConfig::LengthDelimited(config) => Framer::LengthDelimited(config.build()),
122            FramingConfig::NewlineDelimited => {
123                Framer::NewlineDelimited(NewlineDelimitedEncoderConfig.build())
124            }
125            FramingConfig::VarintLengthDelimited(config) => {
126                Framer::VarintLengthDelimited(config.build())
127            }
128        }
129    }
130}
131
132#[derive(Debug, Clone)]
134pub enum Framer {
135    Bytes(BytesEncoder),
137    CharacterDelimited(CharacterDelimitedEncoder),
139    LengthDelimited(LengthDelimitedEncoder),
141    NewlineDelimited(NewlineDelimitedEncoder),
143    VarintLengthDelimited(VarintLengthDelimitedEncoder),
145    Boxed(BoxedFramer),
147}
148
149impl From<BytesEncoder> for Framer {
150    fn from(encoder: BytesEncoder) -> Self {
151        Self::Bytes(encoder)
152    }
153}
154
155impl From<CharacterDelimitedEncoder> for Framer {
156    fn from(encoder: CharacterDelimitedEncoder) -> Self {
157        Self::CharacterDelimited(encoder)
158    }
159}
160
161impl From<LengthDelimitedEncoder> for Framer {
162    fn from(encoder: LengthDelimitedEncoder) -> Self {
163        Self::LengthDelimited(encoder)
164    }
165}
166
167impl From<NewlineDelimitedEncoder> for Framer {
168    fn from(encoder: NewlineDelimitedEncoder) -> Self {
169        Self::NewlineDelimited(encoder)
170    }
171}
172
173impl From<VarintLengthDelimitedEncoder> for Framer {
174    fn from(encoder: VarintLengthDelimitedEncoder) -> Self {
175        Self::VarintLengthDelimited(encoder)
176    }
177}
178
179impl From<BoxedFramer> for Framer {
180    fn from(encoder: BoxedFramer) -> Self {
181        Self::Boxed(encoder)
182    }
183}
184
185impl tokio_util::codec::Encoder<()> for Framer {
186    type Error = BoxedFramingError;
187
188    fn encode(&mut self, _: (), buffer: &mut BytesMut) -> Result<(), Self::Error> {
189        match self {
190            Framer::Bytes(framer) => framer.encode((), buffer),
191            Framer::CharacterDelimited(framer) => framer.encode((), buffer),
192            Framer::LengthDelimited(framer) => framer.encode((), buffer),
193            Framer::NewlineDelimited(framer) => framer.encode((), buffer),
194            Framer::VarintLengthDelimited(framer) => framer.encode((), buffer),
195            Framer::Boxed(framer) => framer.encode((), buffer),
196        }
197    }
198}
199
200#[configurable_component]
202#[derive(Clone, Debug)]
203#[serde(tag = "codec", rename_all = "snake_case")]
204#[configurable(metadata(docs::enum_tag_description = "The codec to use for encoding events."))]
205pub enum SerializerConfig {
206    Avro {
210        avro: AvroSerializerOptions,
212    },
213
214    Cef(
217        CefSerializerConfig,
219    ),
220
221    Csv(CsvSerializerConfig),
226
227    Gelf(GelfSerializerConfig),
244
245    Json(JsonSerializerConfig),
249
250    Logfmt,
254
255    Native,
262
263    NativeJson,
270
271    Protobuf(ProtobufSerializerConfig),
275
276    RawMessage,
284
285    Text(TextSerializerConfig),
294}
295
296impl From<AvroSerializerConfig> for SerializerConfig {
297    fn from(config: AvroSerializerConfig) -> Self {
298        Self::Avro { avro: config.avro }
299    }
300}
301
302impl From<CefSerializerConfig> for SerializerConfig {
303    fn from(config: CefSerializerConfig) -> Self {
304        Self::Cef(config)
305    }
306}
307
308impl From<CsvSerializerConfig> for SerializerConfig {
309    fn from(config: CsvSerializerConfig) -> Self {
310        Self::Csv(config)
311    }
312}
313
314impl From<GelfSerializerConfig> for SerializerConfig {
315    fn from(config: GelfSerializerConfig) -> Self {
316        Self::Gelf(config)
317    }
318}
319
320impl From<JsonSerializerConfig> for SerializerConfig {
321    fn from(config: JsonSerializerConfig) -> Self {
322        Self::Json(config)
323    }
324}
325
326impl From<LogfmtSerializerConfig> for SerializerConfig {
327    fn from(_: LogfmtSerializerConfig) -> Self {
328        Self::Logfmt
329    }
330}
331
332impl From<NativeSerializerConfig> for SerializerConfig {
333    fn from(_: NativeSerializerConfig) -> Self {
334        Self::Native
335    }
336}
337
338impl From<NativeJsonSerializerConfig> for SerializerConfig {
339    fn from(_: NativeJsonSerializerConfig) -> Self {
340        Self::NativeJson
341    }
342}
343
344impl From<ProtobufSerializerConfig> for SerializerConfig {
345    fn from(config: ProtobufSerializerConfig) -> Self {
346        Self::Protobuf(config)
347    }
348}
349
350impl From<RawMessageSerializerConfig> for SerializerConfig {
351    fn from(_: RawMessageSerializerConfig) -> Self {
352        Self::RawMessage
353    }
354}
355
356impl From<TextSerializerConfig> for SerializerConfig {
357    fn from(config: TextSerializerConfig) -> Self {
358        Self::Text(config)
359    }
360}
361
362impl SerializerConfig {
363    pub fn build(&self) -> Result<Serializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
365        match self {
366            SerializerConfig::Avro { avro } => Ok(Serializer::Avro(
367                AvroSerializerConfig::new(avro.schema.clone()).build()?,
368            )),
369            SerializerConfig::Cef(config) => Ok(Serializer::Cef(config.build()?)),
370            SerializerConfig::Csv(config) => Ok(Serializer::Csv(config.build()?)),
371            SerializerConfig::Gelf(config) => Ok(Serializer::Gelf(config.build())),
372            SerializerConfig::Json(config) => Ok(Serializer::Json(config.build())),
373            SerializerConfig::Logfmt => Ok(Serializer::Logfmt(LogfmtSerializerConfig.build())),
374            SerializerConfig::Native => Ok(Serializer::Native(NativeSerializerConfig.build())),
375            SerializerConfig::NativeJson => {
376                Ok(Serializer::NativeJson(NativeJsonSerializerConfig.build()))
377            }
378            SerializerConfig::Protobuf(config) => Ok(Serializer::Protobuf(config.build()?)),
379            SerializerConfig::RawMessage => {
380                Ok(Serializer::RawMessage(RawMessageSerializerConfig.build()))
381            }
382            SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())),
383        }
384    }
385
386    pub fn default_stream_framing(&self) -> FramingConfig {
388        match self {
389            SerializerConfig::Avro { .. } | SerializerConfig::Native => {
401                FramingConfig::LengthDelimited(LengthDelimitedEncoderConfig::default())
402            }
403            SerializerConfig::Protobuf(_) => {
404                FramingConfig::VarintLengthDelimited(VarintLengthDelimitedEncoderConfig::default())
405            }
406            SerializerConfig::Cef(_)
407            | SerializerConfig::Csv(_)
408            | SerializerConfig::Json(_)
409            | SerializerConfig::Logfmt
410            | SerializerConfig::NativeJson
411            | SerializerConfig::RawMessage
412            | SerializerConfig::Text(_) => FramingConfig::NewlineDelimited,
413            SerializerConfig::Gelf(_) => {
414                FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0))
415            }
416        }
417    }
418
419    pub fn input_type(&self) -> DataType {
421        match self {
422            SerializerConfig::Avro { avro } => {
423                AvroSerializerConfig::new(avro.schema.clone()).input_type()
424            }
425            SerializerConfig::Cef(config) => config.input_type(),
426            SerializerConfig::Csv(config) => config.input_type(),
427            SerializerConfig::Gelf(config) => config.input_type(),
428            SerializerConfig::Json(config) => config.input_type(),
429            SerializerConfig::Logfmt => LogfmtSerializerConfig.input_type(),
430            SerializerConfig::Native => NativeSerializerConfig.input_type(),
431            SerializerConfig::NativeJson => NativeJsonSerializerConfig.input_type(),
432            SerializerConfig::Protobuf(config) => config.input_type(),
433            SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(),
434            SerializerConfig::Text(config) => config.input_type(),
435        }
436    }
437
438    pub fn schema_requirement(&self) -> schema::Requirement {
440        match self {
441            SerializerConfig::Avro { avro } => {
442                AvroSerializerConfig::new(avro.schema.clone()).schema_requirement()
443            }
444            SerializerConfig::Cef(config) => config.schema_requirement(),
445            SerializerConfig::Csv(config) => config.schema_requirement(),
446            SerializerConfig::Gelf(config) => config.schema_requirement(),
447            SerializerConfig::Json(config) => config.schema_requirement(),
448            SerializerConfig::Logfmt => LogfmtSerializerConfig.schema_requirement(),
449            SerializerConfig::Native => NativeSerializerConfig.schema_requirement(),
450            SerializerConfig::NativeJson => NativeJsonSerializerConfig.schema_requirement(),
451            SerializerConfig::Protobuf(config) => config.schema_requirement(),
452            SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(),
453            SerializerConfig::Text(config) => config.schema_requirement(),
454        }
455    }
456}
457
458#[derive(Debug, Clone)]
460pub enum Serializer {
461    Avro(AvroSerializer),
463    Cef(CefSerializer),
465    Csv(CsvSerializer),
467    Gelf(GelfSerializer),
469    Json(JsonSerializer),
471    Logfmt(LogfmtSerializer),
473    Native(NativeSerializer),
475    NativeJson(NativeJsonSerializer),
477    Protobuf(ProtobufSerializer),
479    RawMessage(RawMessageSerializer),
481    Text(TextSerializer),
483}
484
485impl Serializer {
486    pub fn supports_json(&self) -> bool {
488        match self {
489            Serializer::Json(_) | Serializer::NativeJson(_) | Serializer::Gelf(_) => true,
490            Serializer::Avro(_)
491            | Serializer::Cef(_)
492            | Serializer::Csv(_)
493            | Serializer::Logfmt(_)
494            | Serializer::Text(_)
495            | Serializer::Native(_)
496            | Serializer::Protobuf(_)
497            | Serializer::RawMessage(_) => false,
498        }
499    }
500
501    pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
508        match self {
509            Serializer::Gelf(serializer) => serializer.to_json_value(event),
510            Serializer::Json(serializer) => serializer.to_json_value(event),
511            Serializer::NativeJson(serializer) => serializer.to_json_value(event),
512            Serializer::Avro(_)
513            | Serializer::Cef(_)
514            | Serializer::Csv(_)
515            | Serializer::Logfmt(_)
516            | Serializer::Text(_)
517            | Serializer::Native(_)
518            | Serializer::Protobuf(_)
519            | Serializer::RawMessage(_) => {
520                panic!("Serializer does not support JSON")
521            }
522        }
523    }
524
525    pub fn chunker(&self) -> Option<Chunker> {
527        match self {
528            Serializer::Gelf(gelf) => Some(Chunker::Gelf(gelf.chunker())),
529            _ => None,
530        }
531    }
532}
533
534impl From<AvroSerializer> for Serializer {
535    fn from(serializer: AvroSerializer) -> Self {
536        Self::Avro(serializer)
537    }
538}
539
540impl From<CefSerializer> for Serializer {
541    fn from(serializer: CefSerializer) -> Self {
542        Self::Cef(serializer)
543    }
544}
545
546impl From<CsvSerializer> for Serializer {
547    fn from(serializer: CsvSerializer) -> Self {
548        Self::Csv(serializer)
549    }
550}
551
552impl From<GelfSerializer> for Serializer {
553    fn from(serializer: GelfSerializer) -> Self {
554        Self::Gelf(serializer)
555    }
556}
557
558impl From<JsonSerializer> for Serializer {
559    fn from(serializer: JsonSerializer) -> Self {
560        Self::Json(serializer)
561    }
562}
563
564impl From<LogfmtSerializer> for Serializer {
565    fn from(serializer: LogfmtSerializer) -> Self {
566        Self::Logfmt(serializer)
567    }
568}
569
570impl From<NativeSerializer> for Serializer {
571    fn from(serializer: NativeSerializer) -> Self {
572        Self::Native(serializer)
573    }
574}
575
576impl From<NativeJsonSerializer> for Serializer {
577    fn from(serializer: NativeJsonSerializer) -> Self {
578        Self::NativeJson(serializer)
579    }
580}
581
582impl From<ProtobufSerializer> for Serializer {
583    fn from(serializer: ProtobufSerializer) -> Self {
584        Self::Protobuf(serializer)
585    }
586}
587
588impl From<RawMessageSerializer> for Serializer {
589    fn from(serializer: RawMessageSerializer) -> Self {
590        Self::RawMessage(serializer)
591    }
592}
593
594impl From<TextSerializer> for Serializer {
595    fn from(serializer: TextSerializer) -> Self {
596        Self::Text(serializer)
597    }
598}
599
600impl tokio_util::codec::Encoder<Event> for Serializer {
601    type Error = vector_common::Error;
602
603    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
604        match self {
605            Serializer::Avro(serializer) => serializer.encode(event, buffer),
606            Serializer::Cef(serializer) => serializer.encode(event, buffer),
607            Serializer::Csv(serializer) => serializer.encode(event, buffer),
608            Serializer::Gelf(serializer) => serializer.encode(event, buffer),
609            Serializer::Json(serializer) => serializer.encode(event, buffer),
610            Serializer::Logfmt(serializer) => serializer.encode(event, buffer),
611            Serializer::Native(serializer) => serializer.encode(event, buffer),
612            Serializer::NativeJson(serializer) => serializer.encode(event, buffer),
613            Serializer::Protobuf(serializer) => serializer.encode(event, buffer),
614            Serializer::RawMessage(serializer) => serializer.encode(event, buffer),
615            Serializer::Text(serializer) => serializer.encode(event, buffer),
616        }
617    }
618}