codecs/encoding/
mod.rs

1//! A collection of support structures that are used in the process of encoding
2//! events into bytes.
3
4pub mod format;
5pub mod framing;
6
7use std::fmt::Debug;
8
9use bytes::BytesMut;
10pub use format::{
11    AvroSerializer, AvroSerializerConfig, AvroSerializerOptions, CefSerializer,
12    CefSerializerConfig, CsvSerializer, CsvSerializerConfig, GelfSerializer, GelfSerializerConfig,
13    JsonSerializer, JsonSerializerConfig, JsonSerializerOptions, LogfmtSerializer,
14    LogfmtSerializerConfig, NativeJsonSerializer, NativeJsonSerializerConfig, NativeSerializer,
15    NativeSerializerConfig, ProtobufSerializer, ProtobufSerializerConfig,
16    ProtobufSerializerOptions, RawMessageSerializer, RawMessageSerializerConfig, TextSerializer,
17    TextSerializerConfig,
18};
19pub use framing::{
20    BoxedFramer, BoxedFramingError, BytesEncoder, BytesEncoderConfig, CharacterDelimitedEncoder,
21    CharacterDelimitedEncoderConfig, CharacterDelimitedEncoderOptions, LengthDelimitedEncoder,
22    LengthDelimitedEncoderConfig, NewlineDelimitedEncoder, NewlineDelimitedEncoderConfig,
23};
24use vector_config::configurable_component;
25use vector_core::{config::DataType, event::Event, schema};
26
/// An error that occurred while building an encoder.
///
/// This is a boxed `std::error::Error` trait object that is `Send + Sync + 'static`, so any
/// error type meeting those bounds can be converted into it (for example with `?`).
pub type BuildError = Box<dyn std::error::Error + Send + Sync + 'static>;
29
/// An error that occurred while encoding structured events into byte frames.
///
/// The variant identifies which stage failed: framing or serialization. The `Display`
/// implementation renders the variant name followed by the wrapped error in parentheses.
#[derive(Debug)]
pub enum Error {
    /// The error occurred while encoding the byte frame boundaries.
    ///
    /// This is also the variant produced when converting from a `std::io::Error`.
    FramingError(BoxedFramingError),
    /// The error occurred while serializing a structured event into bytes.
    SerializingError(vector_common::Error),
}
38
39impl std::fmt::Display for Error {
40    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41        match self {
42            Self::FramingError(error) => write!(formatter, "FramingError({error})"),
43            Self::SerializingError(error) => write!(formatter, "SerializingError({error})"),
44        }
45    }
46}
47
// No `source()` override: the trait's default (`None`) applies, so the wrapped error is exposed
// through the `Display` output and the public variants rather than through the source chain.
impl std::error::Error for Error {}
49
50impl From<std::io::Error> for Error {
51    fn from(error: std::io::Error) -> Self {
52        Self::FramingError(Box::new(error))
53    }
54}
55
/// Framing configuration.
// Serialized as an internally tagged enum: the `method` field selects the variant, and variant
// names are written in snake_case (see the `serde` attribute below).
// NOTE(review): the `///` text on this type presumably feeds generated configuration docs via
// `configurable_component` -- confirm before rewording it.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(tag = "method", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The framing method."))]
pub enum FramingConfig {
    /// Event data is not delimited at all.
    Bytes,

    /// Event data is delimited by a single ASCII (7-bit) character.
    CharacterDelimited(CharacterDelimitedEncoderConfig),

    /// Event data is prefixed with its length in bytes.
    ///
    /// The prefix is a 32-bit unsigned integer, little endian.
    // NOTE(review): the prefix width and byte order stated above cannot be confirmed from this
    // file; they are determined by the encoder in `framing` -- verify they still match.
    LengthDelimited(LengthDelimitedEncoderConfig),

    /// Event data is delimited by a newline (LF) character.
    NewlineDelimited,
}
76
77impl From<BytesEncoderConfig> for FramingConfig {
78    fn from(_: BytesEncoderConfig) -> Self {
79        Self::Bytes
80    }
81}
82
83impl From<CharacterDelimitedEncoderConfig> for FramingConfig {
84    fn from(config: CharacterDelimitedEncoderConfig) -> Self {
85        Self::CharacterDelimited(config)
86    }
87}
88
89impl From<LengthDelimitedEncoderConfig> for FramingConfig {
90    fn from(config: LengthDelimitedEncoderConfig) -> Self {
91        Self::LengthDelimited(config)
92    }
93}
94
95impl From<NewlineDelimitedEncoderConfig> for FramingConfig {
96    fn from(_: NewlineDelimitedEncoderConfig) -> Self {
97        Self::NewlineDelimited
98    }
99}
100
101impl FramingConfig {
102    /// Build the `Framer` from this configuration.
103    pub fn build(&self) -> Framer {
104        match self {
105            FramingConfig::Bytes => Framer::Bytes(BytesEncoderConfig.build()),
106            FramingConfig::CharacterDelimited(config) => Framer::CharacterDelimited(config.build()),
107            FramingConfig::LengthDelimited(config) => Framer::LengthDelimited(config.build()),
108            FramingConfig::NewlineDelimited => {
109                Framer::NewlineDelimited(NewlineDelimitedEncoderConfig.build())
110            }
111        }
112    }
113}
114
/// Produce a byte stream from byte frames.
///
/// Each variant wraps one framing encoder. The `tokio_util::codec::Encoder<()>` implementation
/// for this type forwards every call to whichever encoder is wrapped.
#[derive(Debug, Clone)]
pub enum Framer {
    /// Uses a `BytesEncoder` for framing.
    Bytes(BytesEncoder),
    /// Uses a `CharacterDelimitedEncoder` for framing.
    CharacterDelimited(CharacterDelimitedEncoder),
    /// Uses a `LengthDelimitedEncoder` for framing.
    LengthDelimited(LengthDelimitedEncoder),
    /// Uses a `NewlineDelimitedEncoder` for framing.
    NewlineDelimited(NewlineDelimitedEncoder),
    /// Uses an opaque `Encoder` implementation for framing.
    ///
    /// Unlike the other variants, no `FramingConfig` variant in this file builds this one.
    Boxed(BoxedFramer),
}
129
130impl From<BytesEncoder> for Framer {
131    fn from(encoder: BytesEncoder) -> Self {
132        Self::Bytes(encoder)
133    }
134}
135
136impl From<CharacterDelimitedEncoder> for Framer {
137    fn from(encoder: CharacterDelimitedEncoder) -> Self {
138        Self::CharacterDelimited(encoder)
139    }
140}
141
142impl From<LengthDelimitedEncoder> for Framer {
143    fn from(encoder: LengthDelimitedEncoder) -> Self {
144        Self::LengthDelimited(encoder)
145    }
146}
147
148impl From<NewlineDelimitedEncoder> for Framer {
149    fn from(encoder: NewlineDelimitedEncoder) -> Self {
150        Self::NewlineDelimited(encoder)
151    }
152}
153
154impl From<BoxedFramer> for Framer {
155    fn from(encoder: BoxedFramer) -> Self {
156        Self::Boxed(encoder)
157    }
158}
159
160impl tokio_util::codec::Encoder<()> for Framer {
161    type Error = BoxedFramingError;
162
163    fn encode(&mut self, _: (), buffer: &mut BytesMut) -> Result<(), Self::Error> {
164        match self {
165            Framer::Bytes(framer) => framer.encode((), buffer),
166            Framer::CharacterDelimited(framer) => framer.encode((), buffer),
167            Framer::LengthDelimited(framer) => framer.encode((), buffer),
168            Framer::NewlineDelimited(framer) => framer.encode((), buffer),
169            Framer::Boxed(framer) => framer.encode((), buffer),
170        }
171    }
172}
173
/// Serializer configuration.
// Serialized as an internally tagged enum: the `codec` field selects the variant, and variant
// names are written in snake_case (see the `serde` attribute below).
// NOTE(review): the `///` text on this type presumably feeds generated configuration docs via
// `configurable_component` -- confirm before rewording it.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The codec to use for encoding events."))]
pub enum SerializerConfig {
    /// Encodes an event as an [Apache Avro][apache_avro] message.
    ///
    /// [apache_avro]: https://avro.apache.org/
    Avro {
        /// Apache Avro-specific encoder options.
        avro: AvroSerializerOptions,
    },

    /// Encodes an event as a CEF (Common Event Format) formatted message.
    Cef(
        /// Options for the CEF encoder.
        CefSerializerConfig,
    ),

    /// Encodes an event as a CSV message.
    ///
    /// This codec must be configured with fields to encode.
    Csv(CsvSerializerConfig),

    /// Encodes an event as a [GELF][gelf] message.
    ///
    /// This codec is experimental for the following reason:
    ///
    /// The GELF specification is more strict than the actual Graylog receiver.
    /// Vector's encoder currently adheres more strictly to the GELF spec, with
    /// the exception that some characters such as `@` are allowed in field names.
    ///
    /// Other GELF codecs, such as Loki's, use a [Go SDK][implementation] that is maintained
    /// by Graylog and is much more relaxed than the GELF spec.
    ///
    /// Going forward, Vector will use that [Go SDK][implementation] as the reference implementation, which means
    /// the codec might continue to relax the enforcement of the specification.
    ///
    /// [gelf]: https://docs.graylog.org/docs/gelf
    /// [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go
    Gelf,

    /// Encodes an event as [JSON][json].
    ///
    /// [json]: https://www.json.org/
    Json(JsonSerializerConfig),

    /// Encodes an event as a [logfmt][logfmt] message.
    ///
    /// [logfmt]: https://brandur.org/logfmt
    Logfmt,

    /// Encodes an event in the [native Protocol Buffers format][vector_native_protobuf].
    ///
    /// This codec is **[experimental][experimental]**.
    ///
    /// [vector_native_protobuf]: https://github.com/vectordotdev/vector/blob/master/lib/vector-core/proto/event.proto
    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
    Native,

    /// Encodes an event in the [native JSON format][vector_native_json].
    ///
    /// This codec is **[experimental][experimental]**.
    ///
    /// [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue
    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
    NativeJson,

    /// Encodes an event as a [Protobuf][protobuf] message.
    ///
    /// [protobuf]: https://protobuf.dev/
    Protobuf(ProtobufSerializerConfig),

    /// No encoding.
    ///
    /// This encoding uses the `message` field of a log event.
    ///
    /// Be careful if you are modifying your log events (for example, by using a `remap`
    /// transform) and removing the message field while doing additional parsing on it, as this
    /// could lead to the encoding emitting empty strings for the given event.
    RawMessage,

    /// Plain text encoding.
    ///
    /// This encoding uses the `message` field of a log event. For metrics, it uses an
    /// encoding that resembles the Prometheus export format.
    ///
    /// Be careful if you are modifying your log events (for example, by using a `remap`
    /// transform) and removing the message field while doing additional parsing on it, as this
    /// could lead to the encoding emitting empty strings for the given event.
    Text(TextSerializerConfig),
}
269
270impl From<AvroSerializerConfig> for SerializerConfig {
271    fn from(config: AvroSerializerConfig) -> Self {
272        Self::Avro { avro: config.avro }
273    }
274}
275
276impl From<CefSerializerConfig> for SerializerConfig {
277    fn from(config: CefSerializerConfig) -> Self {
278        Self::Cef(config)
279    }
280}
281
282impl From<CsvSerializerConfig> for SerializerConfig {
283    fn from(config: CsvSerializerConfig) -> Self {
284        Self::Csv(config)
285    }
286}
287
288impl From<GelfSerializerConfig> for SerializerConfig {
289    fn from(_: GelfSerializerConfig) -> Self {
290        Self::Gelf
291    }
292}
293
294impl From<JsonSerializerConfig> for SerializerConfig {
295    fn from(config: JsonSerializerConfig) -> Self {
296        Self::Json(config)
297    }
298}
299
300impl From<LogfmtSerializerConfig> for SerializerConfig {
301    fn from(_: LogfmtSerializerConfig) -> Self {
302        Self::Logfmt
303    }
304}
305
306impl From<NativeSerializerConfig> for SerializerConfig {
307    fn from(_: NativeSerializerConfig) -> Self {
308        Self::Native
309    }
310}
311
312impl From<NativeJsonSerializerConfig> for SerializerConfig {
313    fn from(_: NativeJsonSerializerConfig) -> Self {
314        Self::NativeJson
315    }
316}
317
318impl From<ProtobufSerializerConfig> for SerializerConfig {
319    fn from(config: ProtobufSerializerConfig) -> Self {
320        Self::Protobuf(config)
321    }
322}
323
324impl From<RawMessageSerializerConfig> for SerializerConfig {
325    fn from(_: RawMessageSerializerConfig) -> Self {
326        Self::RawMessage
327    }
328}
329
330impl From<TextSerializerConfig> for SerializerConfig {
331    fn from(config: TextSerializerConfig) -> Self {
332        Self::Text(config)
333    }
334}
335
336impl SerializerConfig {
337    /// Build the `Serializer` from this configuration.
338    pub fn build(&self) -> Result<Serializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
339        match self {
340            SerializerConfig::Avro { avro } => Ok(Serializer::Avro(
341                AvroSerializerConfig::new(avro.schema.clone()).build()?,
342            )),
343            SerializerConfig::Cef(config) => Ok(Serializer::Cef(config.build()?)),
344            SerializerConfig::Csv(config) => Ok(Serializer::Csv(config.build()?)),
345            SerializerConfig::Gelf => Ok(Serializer::Gelf(GelfSerializerConfig::new().build())),
346            SerializerConfig::Json(config) => Ok(Serializer::Json(config.build())),
347            SerializerConfig::Logfmt => Ok(Serializer::Logfmt(LogfmtSerializerConfig.build())),
348            SerializerConfig::Native => Ok(Serializer::Native(NativeSerializerConfig.build())),
349            SerializerConfig::NativeJson => {
350                Ok(Serializer::NativeJson(NativeJsonSerializerConfig.build()))
351            }
352            SerializerConfig::Protobuf(config) => Ok(Serializer::Protobuf(config.build()?)),
353            SerializerConfig::RawMessage => {
354                Ok(Serializer::RawMessage(RawMessageSerializerConfig.build()))
355            }
356            SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())),
357        }
358    }
359
360    /// Return an appropriate default framer for the given serializer.
361    pub fn default_stream_framing(&self) -> FramingConfig {
362        match self {
363            // TODO: Technically, Avro messages are supposed to be framed[1] as a vector of
364            // length-delimited buffers -- `len` as big-endian 32-bit unsigned integer, followed by
365            // `len` bytes -- with a "zero-length buffer" to terminate the overall message... which
366            // our length delimited framer obviously will not do.
367            //
368            // This is OK for now, because the Avro serializer is more ceremonial than anything
369            // else, existing to curry serializer config options to Pulsar's native client, not to
370            // actually serialize the bytes themselves... but we're still exposing this method and
371            // we should do so accurately, even if practically it doesn't need to be.
372            //
373            // [1]: https://avro.apache.org/docs/1.11.1/specification/_print/#message-framing
374            SerializerConfig::Avro { .. }
375            | SerializerConfig::Native
376            | SerializerConfig::Protobuf(_) => {
377                FramingConfig::LengthDelimited(LengthDelimitedEncoderConfig::default())
378            }
379            SerializerConfig::Cef(_)
380            | SerializerConfig::Csv(_)
381            | SerializerConfig::Json(_)
382            | SerializerConfig::Logfmt
383            | SerializerConfig::NativeJson
384            | SerializerConfig::RawMessage
385            | SerializerConfig::Text(_) => FramingConfig::NewlineDelimited,
386            SerializerConfig::Gelf => {
387                FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0))
388            }
389        }
390    }
391
392    /// The data type of events that are accepted by this `Serializer`.
393    pub fn input_type(&self) -> DataType {
394        match self {
395            SerializerConfig::Avro { avro } => {
396                AvroSerializerConfig::new(avro.schema.clone()).input_type()
397            }
398            SerializerConfig::Cef(config) => config.input_type(),
399            SerializerConfig::Csv(config) => config.input_type(),
400            SerializerConfig::Gelf => GelfSerializerConfig::input_type(),
401            SerializerConfig::Json(config) => config.input_type(),
402            SerializerConfig::Logfmt => LogfmtSerializerConfig.input_type(),
403            SerializerConfig::Native => NativeSerializerConfig.input_type(),
404            SerializerConfig::NativeJson => NativeJsonSerializerConfig.input_type(),
405            SerializerConfig::Protobuf(config) => config.input_type(),
406            SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(),
407            SerializerConfig::Text(config) => config.input_type(),
408        }
409    }
410
411    /// The schema required by the serializer.
412    pub fn schema_requirement(&self) -> schema::Requirement {
413        match self {
414            SerializerConfig::Avro { avro } => {
415                AvroSerializerConfig::new(avro.schema.clone()).schema_requirement()
416            }
417            SerializerConfig::Cef(config) => config.schema_requirement(),
418            SerializerConfig::Csv(config) => config.schema_requirement(),
419            SerializerConfig::Gelf => GelfSerializerConfig::schema_requirement(),
420            SerializerConfig::Json(config) => config.schema_requirement(),
421            SerializerConfig::Logfmt => LogfmtSerializerConfig.schema_requirement(),
422            SerializerConfig::Native => NativeSerializerConfig.schema_requirement(),
423            SerializerConfig::NativeJson => NativeJsonSerializerConfig.schema_requirement(),
424            SerializerConfig::Protobuf(config) => config.schema_requirement(),
425            SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(),
426            SerializerConfig::Text(config) => config.schema_requirement(),
427        }
428    }
429}
430
/// Serialize structured events as bytes.
///
/// Each variant wraps one serializer. The `tokio_util::codec::Encoder<Event>` implementation
/// for this type forwards every call to whichever serializer is wrapped.
#[derive(Debug, Clone)]
pub enum Serializer {
    /// Uses an `AvroSerializer` for serialization.
    Avro(AvroSerializer),
    /// Uses a `CefSerializer` for serialization.
    Cef(CefSerializer),
    /// Uses a `CsvSerializer` for serialization.
    Csv(CsvSerializer),
    /// Uses a `GelfSerializer` for serialization.
    Gelf(GelfSerializer),
    /// Uses a `JsonSerializer` for serialization.
    Json(JsonSerializer),
    /// Uses a `LogfmtSerializer` for serialization.
    Logfmt(LogfmtSerializer),
    /// Uses a `NativeSerializer` for serialization.
    Native(NativeSerializer),
    /// Uses a `NativeJsonSerializer` for serialization.
    NativeJson(NativeJsonSerializer),
    /// Uses a `ProtobufSerializer` for serialization.
    Protobuf(ProtobufSerializer),
    /// Uses a `RawMessageSerializer` for serialization.
    RawMessage(RawMessageSerializer),
    /// Uses a `TextSerializer` for serialization.
    Text(TextSerializer),
}
457
458impl Serializer {
459    /// Check if the serializer supports encoding an event to JSON via `Serializer::to_json_value`.
460    pub fn supports_json(&self) -> bool {
461        match self {
462            Serializer::Json(_) | Serializer::NativeJson(_) | Serializer::Gelf(_) => true,
463            Serializer::Avro(_)
464            | Serializer::Cef(_)
465            | Serializer::Csv(_)
466            | Serializer::Logfmt(_)
467            | Serializer::Text(_)
468            | Serializer::Native(_)
469            | Serializer::Protobuf(_)
470            | Serializer::RawMessage(_) => false,
471        }
472    }
473
474    /// Encode event and represent it as JSON value.
475    ///
476    /// # Panics
477    ///
478    /// Panics if the serializer does not support encoding to JSON. Call `Serializer::supports_json`
479    /// if you need to determine the capability to encode to JSON at runtime.
480    pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
481        match self {
482            Serializer::Gelf(serializer) => serializer.to_json_value(event),
483            Serializer::Json(serializer) => serializer.to_json_value(event),
484            Serializer::NativeJson(serializer) => serializer.to_json_value(event),
485            Serializer::Avro(_)
486            | Serializer::Cef(_)
487            | Serializer::Csv(_)
488            | Serializer::Logfmt(_)
489            | Serializer::Text(_)
490            | Serializer::Native(_)
491            | Serializer::Protobuf(_)
492            | Serializer::RawMessage(_) => {
493                panic!("Serializer does not support JSON")
494            }
495        }
496    }
497}
498
499impl From<AvroSerializer> for Serializer {
500    fn from(serializer: AvroSerializer) -> Self {
501        Self::Avro(serializer)
502    }
503}
504
505impl From<CefSerializer> for Serializer {
506    fn from(serializer: CefSerializer) -> Self {
507        Self::Cef(serializer)
508    }
509}
510
511impl From<CsvSerializer> for Serializer {
512    fn from(serializer: CsvSerializer) -> Self {
513        Self::Csv(serializer)
514    }
515}
516
517impl From<GelfSerializer> for Serializer {
518    fn from(serializer: GelfSerializer) -> Self {
519        Self::Gelf(serializer)
520    }
521}
522
523impl From<JsonSerializer> for Serializer {
524    fn from(serializer: JsonSerializer) -> Self {
525        Self::Json(serializer)
526    }
527}
528
529impl From<LogfmtSerializer> for Serializer {
530    fn from(serializer: LogfmtSerializer) -> Self {
531        Self::Logfmt(serializer)
532    }
533}
534
535impl From<NativeSerializer> for Serializer {
536    fn from(serializer: NativeSerializer) -> Self {
537        Self::Native(serializer)
538    }
539}
540
541impl From<NativeJsonSerializer> for Serializer {
542    fn from(serializer: NativeJsonSerializer) -> Self {
543        Self::NativeJson(serializer)
544    }
545}
546
547impl From<ProtobufSerializer> for Serializer {
548    fn from(serializer: ProtobufSerializer) -> Self {
549        Self::Protobuf(serializer)
550    }
551}
552
553impl From<RawMessageSerializer> for Serializer {
554    fn from(serializer: RawMessageSerializer) -> Self {
555        Self::RawMessage(serializer)
556    }
557}
558
559impl From<TextSerializer> for Serializer {
560    fn from(serializer: TextSerializer) -> Self {
561        Self::Text(serializer)
562    }
563}
564
565impl tokio_util::codec::Encoder<Event> for Serializer {
566    type Error = vector_common::Error;
567
568    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
569        match self {
570            Serializer::Avro(serializer) => serializer.encode(event, buffer),
571            Serializer::Cef(serializer) => serializer.encode(event, buffer),
572            Serializer::Csv(serializer) => serializer.encode(event, buffer),
573            Serializer::Gelf(serializer) => serializer.encode(event, buffer),
574            Serializer::Json(serializer) => serializer.encode(event, buffer),
575            Serializer::Logfmt(serializer) => serializer.encode(event, buffer),
576            Serializer::Native(serializer) => serializer.encode(event, buffer),
577            Serializer::NativeJson(serializer) => serializer.encode(event, buffer),
578            Serializer::Protobuf(serializer) => serializer.encode(event, buffer),
579            Serializer::RawMessage(serializer) => serializer.encode(event, buffer),
580            Serializer::Text(serializer) => serializer.encode(event, buffer),
581        }
582    }
583}