codecs/encoding/
mod.rs

1//! A collection of support structures that are used in the process of encoding
2//! events into bytes.
3
4pub mod chunking;
5pub mod format;
6pub mod framing;
7
8use std::fmt::Debug;
9
10use bytes::BytesMut;
11pub use chunking::{Chunker, Chunking, GelfChunker};
12pub use format::{
13    AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
14    CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig,
15    JsonSerializer, JsonSerializerConfig, JsonSerializerOptions, LogfmtSerializer,
16    LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer,
17    NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig,
18    ProtobufSerializerOptions, RawMessageSerializer, RawMessageSerializerConfig, TextSerializer,
19    TextSerializerConfig,
20};
21pub use framing::{
22    BoxedFramer, BoxedFramingError, BytesEncoder, BytesEncoderConfig, CharacterDelimitedEncoder,
23    CharacterDelimitedEncoderConfig, CharacterDelimitedEncoderOptions, LengthDelimitedEncoder,
24    LengthDelimitedEncoderConfig, NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig,
25    VarintLengthDelimitedEncoder, VarintLengthDelimitedEncoderConfig,
26};
27use vector_config::configurable_component;
28use vector_core::{config::DataType, event::Event, schema};
29
/// An error that occurred while building an encoder.
///
/// Type-erased and `Send + Sync`, so any codec's own build failure can be returned from
/// fallible builders (for example the `?` conversions inside `SerializerConfig::build`).
pub type BuildError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// An error that occurred while encoding structured events into byte frames.
///
/// Each variant records which encoding stage failed: framing or serialization.
#[derive(Debug)]
pub enum Error {
    /// The error occurred while encoding the byte frame boundaries.
    FramingError(BoxedFramingError),
    /// The error occurred while serializing a structured event into bytes.
    SerializingError(vector_common::Error),
}
41
42impl std::fmt::Display for Error {
43    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        match self {
45            Self::FramingError(error) => write!(formatter, "FramingError({error})"),
46            Self::SerializingError(error) => write!(formatter, "SerializingError({error})"),
47        }
48    }
49}
50
51impl std::error::Error for Error {}
52
53impl From<std::io::Error> for Error {
54    fn from(error: std::io::Error) -> Self {
55        Self::FramingError(Box::new(error))
56    }
57}
58
/// Framing configuration.
// NOTE(review): `#[configurable_component]` appears to turn the `///` comments on this enum and
// its variants into generated configuration documentation (see the `docs::enum_tag_description`
// metadata), so wording changes to them are user-visible. The variant is selected by the
// `method` field, per the `serde(tag = ...)` attribute.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(tag = "method", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The framing method."))]
pub enum FramingConfig {
    /// Event data is not delimited at all.
    Bytes,

    /// Event data is delimited by a single ASCII (7-bit) character.
    CharacterDelimited(CharacterDelimitedEncoderConfig),

    // NOTE(review): the byte order stated in the docs below is not established anywhere in this
    // module; confirm it against the defaults of `LengthDelimitedEncoderConfig` before relying
    // on it.
    /// Event data is prefixed with its length in bytes.
    ///
    /// The prefix is a 32-bit unsigned integer, little endian.
    LengthDelimited(LengthDelimitedEncoderConfig),

    /// Event data is delimited by a newline (LF) character.
    NewlineDelimited,

    /// Event data is prefixed with its length in bytes as a varint.
    ///
    /// This is compatible with protobuf's length-delimited encoding.
    VarintLengthDelimited(VarintLengthDelimitedEncoderConfig),
}
84
85impl From<BytesEncoderConfig> for FramingConfig {
86    fn from(_: BytesEncoderConfig) -> Self {
87        Self::Bytes
88    }
89}
90
91impl From<CharacterDelimitedEncoderConfig> for FramingConfig {
92    fn from(config: CharacterDelimitedEncoderConfig) -> Self {
93        Self::CharacterDelimited(config)
94    }
95}
96
97impl From<LengthDelimitedEncoderConfig> for FramingConfig {
98    fn from(config: LengthDelimitedEncoderConfig) -> Self {
99        Self::LengthDelimited(config)
100    }
101}
102
103impl From<NewlineDelimitedEncoderConfig> for FramingConfig {
104    fn from(_: NewlineDelimitedEncoderConfig) -> Self {
105        Self::NewlineDelimited
106    }
107}
108
109impl From<VarintLengthDelimitedEncoderConfig> for FramingConfig {
110    fn from(config: VarintLengthDelimitedEncoderConfig) -> Self {
111        Self::VarintLengthDelimited(config)
112    }
113}
114
115impl FramingConfig {
116    /// Build the `Framer` from this configuration.
117    pub fn build(&self) -> Framer {
118        match self {
119            FramingConfig::Bytes => Framer::Bytes(BytesEncoderConfig.build()),
120            FramingConfig::CharacterDelimited(config) => Framer::CharacterDelimited(config.build()),
121            FramingConfig::LengthDelimited(config) => Framer::LengthDelimited(config.build()),
122            FramingConfig::NewlineDelimited => {
123                Framer::NewlineDelimited(NewlineDelimitedEncoderConfig.build())
124            }
125            FramingConfig::VarintLengthDelimited(config) => {
126                Framer::VarintLengthDelimited(config.build())
127            }
128        }
129    }
130}
131
/// Produce a byte stream from byte frames.
///
/// Each built-in variant wraps the concrete encoder produced by the matching
/// [`FramingConfig`] variant, while `Boxed` carries any other, opaque framer. The
/// `tokio_util::codec::Encoder<()>` implementation for this type dispatches to whichever
/// encoder is wrapped.
#[derive(Debug, Clone)]
pub enum Framer {
    /// Uses a `BytesEncoder` for framing.
    Bytes(BytesEncoder),
    /// Uses a `CharacterDelimitedEncoder` for framing.
    CharacterDelimited(CharacterDelimitedEncoder),
    /// Uses a `LengthDelimitedEncoder` for framing.
    LengthDelimited(LengthDelimitedEncoder),
    /// Uses a `NewlineDelimitedEncoder` for framing.
    NewlineDelimited(NewlineDelimitedEncoder),
    /// Uses a `VarintLengthDelimitedEncoder` for framing.
    VarintLengthDelimited(VarintLengthDelimitedEncoder),
    /// Uses an opaque `Encoder` implementation for framing.
    Boxed(BoxedFramer),
}
148
149impl From<BytesEncoder> for Framer {
150    fn from(encoder: BytesEncoder) -> Self {
151        Self::Bytes(encoder)
152    }
153}
154
155impl From<CharacterDelimitedEncoder> for Framer {
156    fn from(encoder: CharacterDelimitedEncoder) -> Self {
157        Self::CharacterDelimited(encoder)
158    }
159}
160
161impl From<LengthDelimitedEncoder> for Framer {
162    fn from(encoder: LengthDelimitedEncoder) -> Self {
163        Self::LengthDelimited(encoder)
164    }
165}
166
167impl From<NewlineDelimitedEncoder> for Framer {
168    fn from(encoder: NewlineDelimitedEncoder) -> Self {
169        Self::NewlineDelimited(encoder)
170    }
171}
172
173impl From<VarintLengthDelimitedEncoder> for Framer {
174    fn from(encoder: VarintLengthDelimitedEncoder) -> Self {
175        Self::VarintLengthDelimited(encoder)
176    }
177}
178
179impl From<BoxedFramer> for Framer {
180    fn from(encoder: BoxedFramer) -> Self {
181        Self::Boxed(encoder)
182    }
183}
184
185impl tokio_util::codec::Encoder<()> for Framer {
186    type Error = BoxedFramingError;
187
188    fn encode(&mut self, _: (), buffer: &mut BytesMut) -> Result<(), Self::Error> {
189        match self {
190            Framer::Bytes(framer) => framer.encode((), buffer),
191            Framer::CharacterDelimited(framer) => framer.encode((), buffer),
192            Framer::LengthDelimited(framer) => framer.encode((), buffer),
193            Framer::NewlineDelimited(framer) => framer.encode((), buffer),
194            Framer::VarintLengthDelimited(framer) => framer.encode((), buffer),
195            Framer::Boxed(framer) => framer.encode((), buffer),
196        }
197    }
198}
199
/// Serializer configuration.
// NOTE(review): `#[configurable_component]` appears to turn the `///` comments on this enum and
// its variants into generated configuration documentation, so wording changes here are
// user-visible. The variant is selected by the `codec` field, per the `serde(tag = ...)`
// attribute.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The codec to use for encoding events."))]
pub enum SerializerConfig {
    /// Encodes an event as an [Apache Avro][apache_avro] message.
    ///
    /// [apache_avro]: https://avro.apache.org/
    Avro {
        /// Apache Avro-specific encoder options.
        avro: AvroSerializerOptions,
    },

    /// Encodes an event as a CEF (Common Event Format) formatted message.
    Cef(
        /// Options for the CEF encoder.
        CefSerializerConfig,
    ),

    /// Encodes an event as a CSV message.
    ///
    /// This codec must be configured with fields to encode.
    Csv(CsvSerializerConfig),

    /// Encodes an event as a [GELF][gelf] message.
    ///
    /// This codec is experimental for the following reason:
    ///
    /// The GELF specification is more strict than the actual Graylog receiver.
    /// Vector's encoder currently adheres more strictly to the GELF spec, with
    /// the exception that some characters such as `@` are allowed in field names.
    ///
    /// Other GELF codecs, such as Loki's, use a [Go SDK][implementation] that is maintained
    /// by Graylog and is much more relaxed than the GELF spec.
    ///
    /// Going forward, Vector will use that [Go SDK][implementation] as the reference implementation, which means
    /// the codec might continue to relax the enforcement of the specification.
    ///
    /// [gelf]: https://docs.graylog.org/docs/gelf
    /// [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go
    Gelf(GelfSerializerConfig),

    /// Encodes an event as [JSON][json].
    ///
    /// [json]: https://www.json.org/
    Json(JsonSerializerConfig),

    /// Encodes an event as a [logfmt][logfmt] message.
    ///
    /// [logfmt]: https://brandur.org/logfmt
    Logfmt,

    /// Encodes an event in the [native Protocol Buffers format][vector_native_protobuf].
    ///
    /// This codec is **[experimental][experimental]**.
    ///
    /// [vector_native_protobuf]: https://github.com/vectordotdev/vector/blob/master/lib/vector-core/proto/event.proto
    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
    Native,

    /// Encodes an event in the [native JSON format][vector_native_json].
    ///
    /// This codec is **[experimental][experimental]**.
    ///
    /// [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue
    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
    NativeJson,

    /// Encodes an event as a [Protobuf][protobuf] message.
    ///
    /// [protobuf]: https://protobuf.dev/
    Protobuf(ProtobufSerializerConfig),

    /// No encoding.
    ///
    /// This encoding uses the `message` field of a log event.
    ///
    /// Be careful if you are modifying your log events (for example, by using a `remap`
    /// transform) and removing the message field while doing additional parsing on it, as this
    /// could lead to the encoding emitting empty strings for the given event.
    RawMessage,

    /// Plain text encoding.
    ///
    /// This encoding uses the `message` field of a log event. For metrics, it uses an
    /// encoding that resembles the Prometheus export format.
    ///
    /// Be careful if you are modifying your log events (for example, by using a `remap`
    /// transform) and removing the message field while doing additional parsing on it, as this
    /// could lead to the encoding emitting empty strings for the given event.
    Text(TextSerializerConfig),
}
295
296impl From<AvroSerializerConfig> for SerializerConfig {
297    fn from(config: AvroSerializerConfig) -> Self {
298        Self::Avro { avro: config.avro }
299    }
300}
301
302impl From<CefSerializerConfig> for SerializerConfig {
303    fn from(config: CefSerializerConfig) -> Self {
304        Self::Cef(config)
305    }
306}
307
308impl From<CsvSerializerConfig> for SerializerConfig {
309    fn from(config: CsvSerializerConfig) -> Self {
310        Self::Csv(config)
311    }
312}
313
314impl From<GelfSerializerConfig> for SerializerConfig {
315    fn from(config: GelfSerializerConfig) -> Self {
316        Self::Gelf(config)
317    }
318}
319
320impl From<JsonSerializerConfig> for SerializerConfig {
321    fn from(config: JsonSerializerConfig) -> Self {
322        Self::Json(config)
323    }
324}
325
326impl From<LogfmtSerializerConfig> for SerializerConfig {
327    fn from(_: LogfmtSerializerConfig) -> Self {
328        Self::Logfmt
329    }
330}
331
332impl From<NativeSerializerConfig> for SerializerConfig {
333    fn from(_: NativeSerializerConfig) -> Self {
334        Self::Native
335    }
336}
337
338impl From<NativeJsonSerializerConfig> for SerializerConfig {
339    fn from(_: NativeJsonSerializerConfig) -> Self {
340        Self::NativeJson
341    }
342}
343
344impl From<ProtobufSerializerConfig> for SerializerConfig {
345    fn from(config: ProtobufSerializerConfig) -> Self {
346        Self::Protobuf(config)
347    }
348}
349
350impl From<RawMessageSerializerConfig> for SerializerConfig {
351    fn from(_: RawMessageSerializerConfig) -> Self {
352        Self::RawMessage
353    }
354}
355
356impl From<TextSerializerConfig> for SerializerConfig {
357    fn from(config: TextSerializerConfig) -> Self {
358        Self::Text(config)
359    }
360}
361
362impl SerializerConfig {
363    /// Build the `Serializer` from this configuration.
364    pub fn build(&self) -> Result<Serializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
365        match self {
366            SerializerConfig::Avro { avro } => Ok(Serializer::Avro(
367                AvroSerializerConfig::new(avro.schema.clone()).build()?,
368            )),
369            SerializerConfig::Cef(config) => Ok(Serializer::Cef(config.build()?)),
370            SerializerConfig::Csv(config) => Ok(Serializer::Csv(config.build()?)),
371            SerializerConfig::Gelf(config) => Ok(Serializer::Gelf(config.build())),
372            SerializerConfig::Json(config) => Ok(Serializer::Json(config.build())),
373            SerializerConfig::Logfmt => Ok(Serializer::Logfmt(LogfmtSerializerConfig.build())),
374            SerializerConfig::Native => Ok(Serializer::Native(NativeSerializerConfig.build())),
375            SerializerConfig::NativeJson => {
376                Ok(Serializer::NativeJson(NativeJsonSerializerConfig.build()))
377            }
378            SerializerConfig::Protobuf(config) => Ok(Serializer::Protobuf(config.build()?)),
379            SerializerConfig::RawMessage => {
380                Ok(Serializer::RawMessage(RawMessageSerializerConfig.build()))
381            }
382            SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())),
383        }
384    }
385
386    /// Return an appropriate default framer for the given serializer.
387    pub fn default_stream_framing(&self) -> FramingConfig {
388        match self {
389            // TODO: Technically, Avro messages are supposed to be framed[1] as a vector of
390            // length-delimited buffers -- `len` as big-endian 32-bit unsigned integer, followed by
391            // `len` bytes -- with a "zero-length buffer" to terminate the overall message... which
392            // our length delimited framer obviously will not do.
393            //
394            // This is OK for now, because the Avro serializer is more ceremonial than anything
395            // else, existing to curry serializer config options to Pulsar's native client, not to
396            // actually serialize the bytes themselves... but we're still exposing this method and
397            // we should do so accurately, even if practically it doesn't need to be.
398            //
399            // [1]: https://avro.apache.org/docs/1.11.1/specification/_print/#message-framing
400            SerializerConfig::Avro { .. } | SerializerConfig::Native => {
401                FramingConfig::LengthDelimited(LengthDelimitedEncoderConfig::default())
402            }
403            SerializerConfig::Protobuf(_) => {
404                FramingConfig::VarintLengthDelimited(VarintLengthDelimitedEncoderConfig::default())
405            }
406            SerializerConfig::Cef(_)
407            | SerializerConfig::Csv(_)
408            | SerializerConfig::Json(_)
409            | SerializerConfig::Logfmt
410            | SerializerConfig::NativeJson
411            | SerializerConfig::RawMessage
412            | SerializerConfig::Text(_) => FramingConfig::NewlineDelimited,
413            SerializerConfig::Gelf(_) => {
414                FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0))
415            }
416        }
417    }
418
419    /// The data type of events that are accepted by this `Serializer`.
420    pub fn input_type(&self) -> DataType {
421        match self {
422            SerializerConfig::Avro { avro } => {
423                AvroSerializerConfig::new(avro.schema.clone()).input_type()
424            }
425            SerializerConfig::Cef(config) => config.input_type(),
426            SerializerConfig::Csv(config) => config.input_type(),
427            SerializerConfig::Gelf(config) => config.input_type(),
428            SerializerConfig::Json(config) => config.input_type(),
429            SerializerConfig::Logfmt => LogfmtSerializerConfig.input_type(),
430            SerializerConfig::Native => NativeSerializerConfig.input_type(),
431            SerializerConfig::NativeJson => NativeJsonSerializerConfig.input_type(),
432            SerializerConfig::Protobuf(config) => config.input_type(),
433            SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(),
434            SerializerConfig::Text(config) => config.input_type(),
435        }
436    }
437
438    /// The schema required by the serializer.
439    pub fn schema_requirement(&self) -> schema::Requirement {
440        match self {
441            SerializerConfig::Avro { avro } => {
442                AvroSerializerConfig::new(avro.schema.clone()).schema_requirement()
443            }
444            SerializerConfig::Cef(config) => config.schema_requirement(),
445            SerializerConfig::Csv(config) => config.schema_requirement(),
446            SerializerConfig::Gelf(config) => config.schema_requirement(),
447            SerializerConfig::Json(config) => config.schema_requirement(),
448            SerializerConfig::Logfmt => LogfmtSerializerConfig.schema_requirement(),
449            SerializerConfig::Native => NativeSerializerConfig.schema_requirement(),
450            SerializerConfig::NativeJson => NativeJsonSerializerConfig.schema_requirement(),
451            SerializerConfig::Protobuf(config) => config.schema_requirement(),
452            SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(),
453            SerializerConfig::Text(config) => config.schema_requirement(),
454        }
455    }
456}
457
/// Serialize structured events as bytes.
///
/// Each variant wraps a concrete serializer, normally produced from the matching
/// [`SerializerConfig`] variant by `SerializerConfig::build`. The
/// `tokio_util::codec::Encoder<Event>` implementation for this type dispatches to it.
#[derive(Debug, Clone)]
pub enum Serializer {
    /// Uses an `AvroSerializer` for serialization.
    Avro(AvroSerializer),
    /// Uses a `CefSerializer` for serialization.
    Cef(CefSerializer),
    /// Uses a `CsvSerializer` for serialization.
    Csv(CsvSerializer),
    /// Uses a `GelfSerializer` for serialization.
    Gelf(GelfSerializer),
    /// Uses a `JsonSerializer` for serialization.
    Json(JsonSerializer),
    /// Uses a `LogfmtSerializer` for serialization.
    Logfmt(LogfmtSerializer),
    /// Uses a `NativeSerializer` for serialization.
    Native(NativeSerializer),
    /// Uses a `NativeJsonSerializer` for serialization.
    NativeJson(NativeJsonSerializer),
    /// Uses a `ProtobufSerializer` for serialization.
    Protobuf(ProtobufSerializer),
    /// Uses a `RawMessageSerializer` for serialization.
    RawMessage(RawMessageSerializer),
    /// Uses a `TextSerializer` for serialization.
    Text(TextSerializer),
}
484
485impl Serializer {
486    /// Check if the serializer supports encoding an event to JSON via `Serializer::to_json_value`.
487    pub fn supports_json(&self) -> bool {
488        match self {
489            Serializer::Json(_) | Serializer::NativeJson(_) | Serializer::Gelf(_) => true,
490            Serializer::Avro(_)
491            | Serializer::Cef(_)
492            | Serializer::Csv(_)
493            | Serializer::Logfmt(_)
494            | Serializer::Text(_)
495            | Serializer::Native(_)
496            | Serializer::Protobuf(_)
497            | Serializer::RawMessage(_) => false,
498        }
499    }
500
501    /// Encode event and represent it as JSON value.
502    ///
503    /// # Panics
504    ///
505    /// Panics if the serializer does not support encoding to JSON. Call `Serializer::supports_json`
506    /// if you need to determine the capability to encode to JSON at runtime.
507    pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
508        match self {
509            Serializer::Gelf(serializer) => serializer.to_json_value(event),
510            Serializer::Json(serializer) => serializer.to_json_value(event),
511            Serializer::NativeJson(serializer) => serializer.to_json_value(event),
512            Serializer::Avro(_)
513            | Serializer::Cef(_)
514            | Serializer::Csv(_)
515            | Serializer::Logfmt(_)
516            | Serializer::Text(_)
517            | Serializer::Native(_)
518            | Serializer::Protobuf(_)
519            | Serializer::RawMessage(_) => {
520                panic!("Serializer does not support JSON")
521            }
522        }
523    }
524
525    /// Returns the chunking implementation for the serializer, if any is supported.
526    pub fn chunker(&self) -> Option<Chunker> {
527        match self {
528            Serializer::Gelf(gelf) => Some(Chunker::Gelf(gelf.chunker())),
529            _ => None,
530        }
531    }
532}
533
534impl From<AvroSerializer> for Serializer {
535    fn from(serializer: AvroSerializer) -> Self {
536        Self::Avro(serializer)
537    }
538}
539
540impl From<CefSerializer> for Serializer {
541    fn from(serializer: CefSerializer) -> Self {
542        Self::Cef(serializer)
543    }
544}
545
546impl From<CsvSerializer> for Serializer {
547    fn from(serializer: CsvSerializer) -> Self {
548        Self::Csv(serializer)
549    }
550}
551
552impl From<GelfSerializer> for Serializer {
553    fn from(serializer: GelfSerializer) -> Self {
554        Self::Gelf(serializer)
555    }
556}
557
558impl From<JsonSerializer> for Serializer {
559    fn from(serializer: JsonSerializer) -> Self {
560        Self::Json(serializer)
561    }
562}
563
564impl From<LogfmtSerializer> for Serializer {
565    fn from(serializer: LogfmtSerializer) -> Self {
566        Self::Logfmt(serializer)
567    }
568}
569
570impl From<NativeSerializer> for Serializer {
571    fn from(serializer: NativeSerializer) -> Self {
572        Self::Native(serializer)
573    }
574}
575
576impl From<NativeJsonSerializer> for Serializer {
577    fn from(serializer: NativeJsonSerializer) -> Self {
578        Self::NativeJson(serializer)
579    }
580}
581
582impl From<ProtobufSerializer> for Serializer {
583    fn from(serializer: ProtobufSerializer) -> Self {
584        Self::Protobuf(serializer)
585    }
586}
587
588impl From<RawMessageSerializer> for Serializer {
589    fn from(serializer: RawMessageSerializer) -> Self {
590        Self::RawMessage(serializer)
591    }
592}
593
594impl From<TextSerializer> for Serializer {
595    fn from(serializer: TextSerializer) -> Self {
596        Self::Text(serializer)
597    }
598}
599
600impl tokio_util::codec::Encoder<Event> for Serializer {
601    type Error = vector_common::Error;
602
603    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
604        match self {
605            Serializer::Avro(serializer) => serializer.encode(event, buffer),
606            Serializer::Cef(serializer) => serializer.encode(event, buffer),
607            Serializer::Csv(serializer) => serializer.encode(event, buffer),
608            Serializer::Gelf(serializer) => serializer.encode(event, buffer),
609            Serializer::Json(serializer) => serializer.encode(event, buffer),
610            Serializer::Logfmt(serializer) => serializer.encode(event, buffer),
611            Serializer::Native(serializer) => serializer.encode(event, buffer),
612            Serializer::NativeJson(serializer) => serializer.encode(event, buffer),
613            Serializer::Protobuf(serializer) => serializer.encode(event, buffer),
614            Serializer::RawMessage(serializer) => serializer.encode(event, buffer),
615            Serializer::Text(serializer) => serializer.encode(event, buffer),
616        }
617    }
618}