codecs/decoding/
mod.rs

1//! A collection of support structures that are used in the process of decoding
2//! bytes into events.
3
4mod error;
5pub mod format;
6pub mod framing;
7
8use std::fmt::Debug;
9
10use bytes::{Bytes, BytesMut};
11pub use error::StreamDecodingError;
12pub use format::{
13    BoxedDeserializer, BytesDeserializer, BytesDeserializerConfig, GelfDeserializer,
14    GelfDeserializerConfig, GelfDeserializerOptions, InfluxdbDeserializer,
15    InfluxdbDeserializerConfig, JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions,
16    NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer,
17    NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, ProtobufDeserializer,
18    ProtobufDeserializerConfig, ProtobufDeserializerOptions,
19};
20#[cfg(feature = "syslog")]
21pub use format::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions};
22pub use framing::{
23    BoxedFramer, BoxedFramingError, BytesDecoder, BytesDecoderConfig, CharacterDelimitedDecoder,
24    CharacterDelimitedDecoderConfig, CharacterDelimitedDecoderOptions, ChunkedGelfDecoder,
25    ChunkedGelfDecoderConfig, ChunkedGelfDecoderOptions, FramingError, LengthDelimitedDecoder,
26    LengthDelimitedDecoderConfig, NewlineDelimitedDecoder, NewlineDelimitedDecoderConfig,
27    NewlineDelimitedDecoderOptions, OctetCountingDecoder, OctetCountingDecoderConfig,
28    OctetCountingDecoderOptions, VarintLengthDelimitedDecoder, VarintLengthDelimitedDecoderConfig,
29};
30use smallvec::SmallVec;
31use vector_config::configurable_component;
32use vector_core::{
33    config::{DataType, LogNamespace},
34    event::Event,
35    schema,
36};
37
38use self::format::{AvroDeserializer, AvroDeserializerConfig, AvroDeserializerOptions};
39use crate::decoding::format::{VrlDeserializer, VrlDeserializerConfig};
40
41/// An error that occurred while decoding structured events from a byte stream /
42/// byte messages.
43#[derive(Debug)]
44pub enum Error {
45    /// The error occurred while producing byte frames from the byte stream /
46    /// byte messages.
47    FramingError(BoxedFramingError),
48    /// The error occurred while parsing structured events from a byte frame.
49    ParsingError(vector_common::Error),
50}
51
52impl std::fmt::Display for Error {
53    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        match self {
55            Self::FramingError(error) => write!(formatter, "FramingError({error})"),
56            Self::ParsingError(error) => write!(formatter, "ParsingError({error})"),
57        }
58    }
59}
60
61impl std::error::Error for Error {}
62
63impl From<std::io::Error> for Error {
64    fn from(error: std::io::Error) -> Self {
65        Self::FramingError(Box::new(error))
66    }
67}
68
69impl StreamDecodingError for Error {
70    fn can_continue(&self) -> bool {
71        match self {
72            Self::FramingError(error) => error.can_continue(),
73            Self::ParsingError(_) => true,
74        }
75    }
76}
77
78/// Framing configuration.
79///
80/// Framing handles how events are separated when encoded in a raw byte form, where each event is
81/// a frame that must be prefixed, or delimited, in a way that marks where an event begins and
82/// ends within the byte stream.
83#[configurable_component]
84#[derive(Clone, Debug)]
85#[serde(tag = "method", rename_all = "snake_case")]
86#[configurable(metadata(docs::enum_tag_description = "The framing method."))]
87pub enum FramingConfig {
88    /// Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments).
89    Bytes,
90
91    /// Byte frames which are delimited by a chosen character.
92    CharacterDelimited(CharacterDelimitedDecoderConfig),
93
94    /// Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length.
95    LengthDelimited(LengthDelimitedDecoderConfig),
96
97    /// Byte frames which are delimited by a newline character.
98    NewlineDelimited(NewlineDelimitedDecoderConfig),
99
100    /// Byte frames according to the [octet counting][octet_counting] format.
101    ///
102    /// [octet_counting]: https://tools.ietf.org/html/rfc6587#section-3.4.1
103    OctetCounting(OctetCountingDecoderConfig),
104
105    /// Byte frames which are chunked GELF messages.
106    ///
107    /// [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html
108    ChunkedGelf(ChunkedGelfDecoderConfig),
109
110    /// Byte frames which are prefixed by a varint indicating the length.
111    /// This is compatible with protobuf's length-delimited encoding.
112    VarintLengthDelimited(VarintLengthDelimitedDecoderConfig),
113}
114
115impl From<BytesDecoderConfig> for FramingConfig {
116    fn from(_: BytesDecoderConfig) -> Self {
117        Self::Bytes
118    }
119}
120
121impl From<CharacterDelimitedDecoderConfig> for FramingConfig {
122    fn from(config: CharacterDelimitedDecoderConfig) -> Self {
123        Self::CharacterDelimited(config)
124    }
125}
126
127impl From<LengthDelimitedDecoderConfig> for FramingConfig {
128    fn from(config: LengthDelimitedDecoderConfig) -> Self {
129        Self::LengthDelimited(config)
130    }
131}
132
133impl From<NewlineDelimitedDecoderConfig> for FramingConfig {
134    fn from(config: NewlineDelimitedDecoderConfig) -> Self {
135        Self::NewlineDelimited(config)
136    }
137}
138
139impl From<OctetCountingDecoderConfig> for FramingConfig {
140    fn from(config: OctetCountingDecoderConfig) -> Self {
141        Self::OctetCounting(config)
142    }
143}
144
145impl From<ChunkedGelfDecoderConfig> for FramingConfig {
146    fn from(config: ChunkedGelfDecoderConfig) -> Self {
147        Self::ChunkedGelf(config)
148    }
149}
150
151impl From<VarintLengthDelimitedDecoderConfig> for FramingConfig {
152    fn from(config: VarintLengthDelimitedDecoderConfig) -> Self {
153        Self::VarintLengthDelimited(config)
154    }
155}
156
157impl FramingConfig {
158    /// Build the `Framer` from this configuration.
159    pub fn build(&self) -> Framer {
160        match self {
161            FramingConfig::Bytes => Framer::Bytes(BytesDecoderConfig.build()),
162            FramingConfig::CharacterDelimited(config) => Framer::CharacterDelimited(config.build()),
163            FramingConfig::LengthDelimited(config) => Framer::LengthDelimited(config.build()),
164            FramingConfig::NewlineDelimited(config) => Framer::NewlineDelimited(config.build()),
165            FramingConfig::OctetCounting(config) => Framer::OctetCounting(config.build()),
166            FramingConfig::ChunkedGelf(config) => Framer::ChunkedGelf(config.build()),
167            FramingConfig::VarintLengthDelimited(config) => {
168                Framer::VarintLengthDelimited(config.build())
169            }
170        }
171    }
172}
173
174/// Produce byte frames from a byte stream / byte message.
175#[derive(Debug, Clone)]
176pub enum Framer {
177    /// Uses a `BytesDecoder` for framing.
178    Bytes(BytesDecoder),
179    /// Uses a `CharacterDelimitedDecoder` for framing.
180    CharacterDelimited(CharacterDelimitedDecoder),
181    /// Uses a `LengthDelimitedDecoder` for framing.
182    LengthDelimited(LengthDelimitedDecoder),
183    /// Uses a `NewlineDelimitedDecoder` for framing.
184    NewlineDelimited(NewlineDelimitedDecoder),
185    /// Uses a `OctetCountingDecoder` for framing.
186    OctetCounting(OctetCountingDecoder),
187    /// Uses an opaque `Framer` implementation for framing.
188    Boxed(BoxedFramer),
189    /// Uses a `ChunkedGelfDecoder` for framing.
190    ChunkedGelf(ChunkedGelfDecoder),
191    /// Uses a `VarintLengthDelimitedDecoder` for framing.
192    VarintLengthDelimited(VarintLengthDelimitedDecoder),
193}
194
195impl tokio_util::codec::Decoder for Framer {
196    type Item = Bytes;
197    type Error = BoxedFramingError;
198
199    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
200        match self {
201            Framer::Bytes(framer) => framer.decode(src),
202            Framer::CharacterDelimited(framer) => framer.decode(src),
203            Framer::LengthDelimited(framer) => framer.decode(src),
204            Framer::NewlineDelimited(framer) => framer.decode(src),
205            Framer::OctetCounting(framer) => framer.decode(src),
206            Framer::Boxed(framer) => framer.decode(src),
207            Framer::ChunkedGelf(framer) => framer.decode(src),
208            Framer::VarintLengthDelimited(framer) => framer.decode(src),
209        }
210    }
211
212    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
213        match self {
214            Framer::Bytes(framer) => framer.decode_eof(src),
215            Framer::CharacterDelimited(framer) => framer.decode_eof(src),
216            Framer::LengthDelimited(framer) => framer.decode_eof(src),
217            Framer::NewlineDelimited(framer) => framer.decode_eof(src),
218            Framer::OctetCounting(framer) => framer.decode_eof(src),
219            Framer::Boxed(framer) => framer.decode_eof(src),
220            Framer::ChunkedGelf(framer) => framer.decode_eof(src),
221            Framer::VarintLengthDelimited(framer) => framer.decode_eof(src),
222        }
223    }
224}
225
226/// Configures how events are decoded from raw bytes. Note some decoders can also determine the event output
227/// type (log, metric, trace).
228#[configurable_component]
229#[derive(Clone, Debug)]
230#[serde(tag = "codec", rename_all = "snake_case")]
231#[configurable(metadata(docs::enum_tag_description = "The codec to use for decoding events."))]
232pub enum DeserializerConfig {
233    /// Uses the raw bytes as-is.
234    Bytes,
235
236    /// Decodes the raw bytes as [JSON][json].
237    ///
238    /// [json]: https://www.json.org/
239    Json(JsonDeserializerConfig),
240
241    /// Decodes the raw bytes as [protobuf][protobuf].
242    ///
243    /// [protobuf]: https://protobuf.dev/
244    Protobuf(ProtobufDeserializerConfig),
245
246    #[cfg(feature = "syslog")]
247    /// Decodes the raw bytes as a Syslog message.
248    ///
249    /// Decodes either as the [RFC 3164][rfc3164]-style format ("old" style) or the
250    /// [RFC 5424][rfc5424]-style format ("new" style, includes structured data).
251    ///
252    /// [rfc3164]: https://www.ietf.org/rfc/rfc3164.txt
253    /// [rfc5424]: https://www.ietf.org/rfc/rfc5424.txt
254    Syslog(SyslogDeserializerConfig),
255
256    /// Decodes the raw bytes as [native Protocol Buffers format][vector_native_protobuf].
257    ///
258    /// This decoder can output all types of events (logs, metrics, traces).
259    ///
260    /// This codec is **[experimental][experimental]**.
261    ///
262    /// [vector_native_protobuf]: https://github.com/vectordotdev/vector/blob/master/lib/vector-core/proto/event.proto
263    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
264    Native,
265
266    /// Decodes the raw bytes as [native JSON format][vector_native_json].
267    ///
268    /// This decoder can output all types of events (logs, metrics, traces).
269    ///
270    /// This codec is **[experimental][experimental]**.
271    ///
272    /// [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue
273    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
274    NativeJson(NativeJsonDeserializerConfig),
275
276    /// Decodes the raw bytes as a [GELF][gelf] message.
277    ///
278    /// This codec is experimental for the following reason:
279    ///
280    /// The GELF specification is more strict than the actual Graylog receiver.
281    /// Vector's decoder adheres more strictly to the GELF spec, with
282    /// the exception that some characters such as `@`  are allowed in field names.
283    ///
284    /// Other GELF codecs such as Loki's, use a [Go SDK][implementation] that is maintained
285    /// by Graylog, and is much more relaxed than the GELF spec.
286    ///
287    /// Going forward, Vector will use that [Go SDK][implementation] as the reference implementation, which means
288    /// the codec may continue to relax the enforcement of specification.
289    ///
290    /// [gelf]: https://docs.graylog.org/docs/gelf
291    /// [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go
292    Gelf(GelfDeserializerConfig),
293
294    /// Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message.
295    ///
296    /// [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol
297    Influxdb(InfluxdbDeserializerConfig),
298
299    /// Decodes the raw bytes as as an [Apache Avro][apache_avro] message.
300    ///
301    /// [apache_avro]: https://avro.apache.org/
302    Avro {
303        /// Apache Avro-specific encoder options.
304        avro: AvroDeserializerOptions,
305    },
306
307    /// Decodes the raw bytes as a string and passes them as input to a [VRL][vrl] program.
308    ///
309    /// [vrl]: https://vector.dev/docs/reference/vrl
310    Vrl(VrlDeserializerConfig),
311}
312
313impl From<BytesDeserializerConfig> for DeserializerConfig {
314    fn from(_: BytesDeserializerConfig) -> Self {
315        Self::Bytes
316    }
317}
318
319impl From<JsonDeserializerConfig> for DeserializerConfig {
320    fn from(config: JsonDeserializerConfig) -> Self {
321        Self::Json(config)
322    }
323}
324
325#[cfg(feature = "syslog")]
326impl From<SyslogDeserializerConfig> for DeserializerConfig {
327    fn from(config: SyslogDeserializerConfig) -> Self {
328        Self::Syslog(config)
329    }
330}
331
332impl From<GelfDeserializerConfig> for DeserializerConfig {
333    fn from(config: GelfDeserializerConfig) -> Self {
334        Self::Gelf(config)
335    }
336}
337
338impl From<NativeDeserializerConfig> for DeserializerConfig {
339    fn from(_: NativeDeserializerConfig) -> Self {
340        Self::Native
341    }
342}
343
344impl From<NativeJsonDeserializerConfig> for DeserializerConfig {
345    fn from(config: NativeJsonDeserializerConfig) -> Self {
346        Self::NativeJson(config)
347    }
348}
349
350impl From<InfluxdbDeserializerConfig> for DeserializerConfig {
351    fn from(config: InfluxdbDeserializerConfig) -> Self {
352        Self::Influxdb(config)
353    }
354}
355
356impl DeserializerConfig {
357    /// Build the `Deserializer` from this configuration.
358    pub fn build(&self) -> vector_common::Result<Deserializer> {
359        match self {
360            DeserializerConfig::Avro { avro } => Ok(Deserializer::Avro(
361                AvroDeserializerConfig {
362                    avro_options: avro.clone(),
363                }
364                .build(),
365            )),
366            DeserializerConfig::Bytes => Ok(Deserializer::Bytes(BytesDeserializerConfig.build())),
367            DeserializerConfig::Json(config) => Ok(Deserializer::Json(config.build())),
368            DeserializerConfig::Protobuf(config) => Ok(Deserializer::Protobuf(config.build()?)),
369            #[cfg(feature = "syslog")]
370            DeserializerConfig::Syslog(config) => Ok(Deserializer::Syslog(config.build())),
371            DeserializerConfig::Native => {
372                Ok(Deserializer::Native(NativeDeserializerConfig.build()))
373            }
374            DeserializerConfig::NativeJson(config) => Ok(Deserializer::NativeJson(config.build())),
375            DeserializerConfig::Gelf(config) => Ok(Deserializer::Gelf(config.build())),
376            DeserializerConfig::Influxdb(config) => Ok(Deserializer::Influxdb(config.build())),
377            DeserializerConfig::Vrl(config) => Ok(Deserializer::Vrl(config.build()?)),
378        }
379    }
380
381    /// Return an appropriate default framer for the given deserializer
382    pub fn default_stream_framing(&self) -> FramingConfig {
383        match self {
384            DeserializerConfig::Avro { .. } => FramingConfig::Bytes,
385            DeserializerConfig::Native => FramingConfig::LengthDelimited(Default::default()),
386            DeserializerConfig::Bytes
387            | DeserializerConfig::Json(_)
388            | DeserializerConfig::Influxdb(_)
389            | DeserializerConfig::NativeJson(_) => {
390                FramingConfig::NewlineDelimited(Default::default())
391            }
392            DeserializerConfig::Protobuf(_) => FramingConfig::Bytes,
393            #[cfg(feature = "syslog")]
394            DeserializerConfig::Syslog(_) => FramingConfig::NewlineDelimited(Default::default()),
395            DeserializerConfig::Vrl(_) => FramingConfig::Bytes,
396            DeserializerConfig::Gelf(_) => {
397                FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig::new(0))
398            }
399        }
400    }
401
402    /// Returns an appropriate default framing config for the given deserializer with message based inputs.
403    pub fn default_message_based_framing(&self) -> FramingConfig {
404        match self {
405            DeserializerConfig::Gelf(_) => FramingConfig::ChunkedGelf(Default::default()),
406            _ => FramingConfig::Bytes,
407        }
408    }
409
410    /// Return the type of event build by this deserializer.
411    pub fn output_type(&self) -> DataType {
412        match self {
413            DeserializerConfig::Avro { avro } => AvroDeserializerConfig {
414                avro_options: avro.clone(),
415            }
416            .output_type(),
417            DeserializerConfig::Bytes => BytesDeserializerConfig.output_type(),
418            DeserializerConfig::Json(config) => config.output_type(),
419            DeserializerConfig::Protobuf(config) => config.output_type(),
420            #[cfg(feature = "syslog")]
421            DeserializerConfig::Syslog(config) => config.output_type(),
422            DeserializerConfig::Native => NativeDeserializerConfig.output_type(),
423            DeserializerConfig::NativeJson(config) => config.output_type(),
424            DeserializerConfig::Gelf(config) => config.output_type(),
425            DeserializerConfig::Vrl(config) => config.output_type(),
426            DeserializerConfig::Influxdb(config) => config.output_type(),
427        }
428    }
429
430    /// The schema produced by the deserializer.
431    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
432        match self {
433            DeserializerConfig::Avro { avro } => AvroDeserializerConfig {
434                avro_options: avro.clone(),
435            }
436            .schema_definition(log_namespace),
437            DeserializerConfig::Bytes => BytesDeserializerConfig.schema_definition(log_namespace),
438            DeserializerConfig::Json(config) => config.schema_definition(log_namespace),
439            DeserializerConfig::Protobuf(config) => config.schema_definition(log_namespace),
440            #[cfg(feature = "syslog")]
441            DeserializerConfig::Syslog(config) => config.schema_definition(log_namespace),
442            DeserializerConfig::Native => NativeDeserializerConfig.schema_definition(log_namespace),
443            DeserializerConfig::NativeJson(config) => config.schema_definition(log_namespace),
444            DeserializerConfig::Gelf(config) => config.schema_definition(log_namespace),
445            DeserializerConfig::Influxdb(config) => config.schema_definition(log_namespace),
446            DeserializerConfig::Vrl(config) => config.schema_definition(log_namespace),
447        }
448    }
449
450    /// Get the HTTP content type.
451    pub const fn content_type(&self, framer: &FramingConfig) -> &'static str {
452        match (&self, framer) {
453            (
454                DeserializerConfig::Json(_) | DeserializerConfig::NativeJson(_),
455                FramingConfig::NewlineDelimited(_),
456            ) => "application/x-ndjson",
457            (
458                DeserializerConfig::Gelf(_)
459                | DeserializerConfig::Json(_)
460                | DeserializerConfig::NativeJson(_),
461                FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
462                    character_delimited:
463                        CharacterDelimitedDecoderOptions {
464                            delimiter: b',',
465                            max_length: Some(usize::MAX),
466                        },
467                }),
468            ) => "application/json",
469            (DeserializerConfig::Native, _) | (DeserializerConfig::Avro { .. }, _) => {
470                "application/octet-stream"
471            }
472            (DeserializerConfig::Protobuf(_), _) => "application/octet-stream",
473            (
474                DeserializerConfig::Json(_)
475                | DeserializerConfig::NativeJson(_)
476                | DeserializerConfig::Bytes
477                | DeserializerConfig::Gelf(_)
478                | DeserializerConfig::Influxdb(_)
479                | DeserializerConfig::Vrl(_),
480                _,
481            ) => "text/plain",
482            #[cfg(feature = "syslog")]
483            (DeserializerConfig::Syslog(_), _) => "text/plain",
484        }
485    }
486}
487
488/// Parse structured events from bytes.
489#[allow(clippy::large_enum_variant)]
490#[derive(Clone)]
491pub enum Deserializer {
492    /// Uses a `AvroDeserializer` for deserialization.
493    Avro(AvroDeserializer),
494    /// Uses a `BytesDeserializer` for deserialization.
495    Bytes(BytesDeserializer),
496    /// Uses a `JsonDeserializer` for deserialization.
497    Json(JsonDeserializer),
498    /// Uses a `ProtobufDeserializer` for deserialization.
499    Protobuf(ProtobufDeserializer),
500    #[cfg(feature = "syslog")]
501    /// Uses a `SyslogDeserializer` for deserialization.
502    Syslog(SyslogDeserializer),
503    /// Uses a `NativeDeserializer` for deserialization.
504    Native(NativeDeserializer),
505    /// Uses a `NativeDeserializer` for deserialization.
506    NativeJson(NativeJsonDeserializer),
507    /// Uses an opaque `Deserializer` implementation for deserialization.
508    Boxed(BoxedDeserializer),
509    /// Uses a `GelfDeserializer` for deserialization.
510    Gelf(GelfDeserializer),
511    /// Uses a `InfluxdbDeserializer` for deserialization.
512    Influxdb(InfluxdbDeserializer),
513    /// Uses a `VrlDeserializer` for deserialization.
514    Vrl(VrlDeserializer),
515}
516
517impl format::Deserializer for Deserializer {
518    fn parse(
519        &self,
520        bytes: Bytes,
521        log_namespace: LogNamespace,
522    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
523        match self {
524            Deserializer::Avro(deserializer) => deserializer.parse(bytes, log_namespace),
525            Deserializer::Bytes(deserializer) => deserializer.parse(bytes, log_namespace),
526            Deserializer::Json(deserializer) => deserializer.parse(bytes, log_namespace),
527            Deserializer::Protobuf(deserializer) => deserializer.parse(bytes, log_namespace),
528            #[cfg(feature = "syslog")]
529            Deserializer::Syslog(deserializer) => deserializer.parse(bytes, log_namespace),
530            Deserializer::Native(deserializer) => deserializer.parse(bytes, log_namespace),
531            Deserializer::NativeJson(deserializer) => deserializer.parse(bytes, log_namespace),
532            Deserializer::Boxed(deserializer) => deserializer.parse(bytes, log_namespace),
533            Deserializer::Gelf(deserializer) => deserializer.parse(bytes, log_namespace),
534            Deserializer::Influxdb(deserializer) => deserializer.parse(bytes, log_namespace),
535            Deserializer::Vrl(deserializer) => deserializer.parse(bytes, log_namespace),
536        }
537    }
538}
539
#[cfg(test)]
mod tests {
    use super::*;

    /// GELF over a stream should default to NUL-delimited frames with no
    /// maximum frame length.
    #[test]
    fn gelf_stream_default_framing_is_null_delimited() {
        let framing =
            DeserializerConfig::from(GelfDeserializerConfig::default()).default_stream_framing();

        let options = match framing {
            FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
                character_delimited,
            }) => Some(character_delimited),
            _ => None,
        };

        assert!(matches!(
            options,
            Some(CharacterDelimitedDecoderOptions {
                delimiter: 0,
                max_length: None,
            })
        ));
    }
}