codecs/decoding/
mod.rs

1//! A collection of support structures that are used in the process of decoding
2//! bytes into events.
3
4mod error;
5pub mod format;
6pub mod framing;
7
8use crate::decoding::format::{VrlDeserializer, VrlDeserializerConfig};
9use bytes::{Bytes, BytesMut};
10pub use error::StreamDecodingError;
11pub use format::{
12    BoxedDeserializer, BytesDeserializer, BytesDeserializerConfig, GelfDeserializer,
13    GelfDeserializerConfig, GelfDeserializerOptions, InfluxdbDeserializer,
14    InfluxdbDeserializerConfig, JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions,
15    NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer,
16    NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, ProtobufDeserializer,
17    ProtobufDeserializerConfig, ProtobufDeserializerOptions,
18};
19#[cfg(feature = "syslog")]
20pub use format::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions};
21pub use framing::{
22    BoxedFramer, BoxedFramingError, BytesDecoder, BytesDecoderConfig, CharacterDelimitedDecoder,
23    CharacterDelimitedDecoderConfig, CharacterDelimitedDecoderOptions, ChunkedGelfDecoder,
24    ChunkedGelfDecoderConfig, ChunkedGelfDecoderOptions, FramingError, LengthDelimitedDecoder,
25    LengthDelimitedDecoderConfig, NewlineDelimitedDecoder, NewlineDelimitedDecoderConfig,
26    NewlineDelimitedDecoderOptions, OctetCountingDecoder, OctetCountingDecoderConfig,
27    OctetCountingDecoderOptions,
28};
29use smallvec::SmallVec;
30use std::fmt::Debug;
31use vector_config::configurable_component;
32use vector_core::{
33    config::{DataType, LogNamespace},
34    event::Event,
35    schema,
36};
37
38use self::format::{AvroDeserializer, AvroDeserializerConfig, AvroDeserializerOptions};
39
40/// An error that occurred while decoding structured events from a byte stream /
41/// byte messages.
42#[derive(Debug)]
43pub enum Error {
44    /// The error occurred while producing byte frames from the byte stream /
45    /// byte messages.
46    FramingError(BoxedFramingError),
47    /// The error occurred while parsing structured events from a byte frame.
48    ParsingError(vector_common::Error),
49}
50
51impl std::fmt::Display for Error {
52    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        match self {
54            Self::FramingError(error) => write!(formatter, "FramingError({error})"),
55            Self::ParsingError(error) => write!(formatter, "ParsingError({error})"),
56        }
57    }
58}
59
60impl std::error::Error for Error {}
61
62impl From<std::io::Error> for Error {
63    fn from(error: std::io::Error) -> Self {
64        Self::FramingError(Box::new(error))
65    }
66}
67
68impl StreamDecodingError for Error {
69    fn can_continue(&self) -> bool {
70        match self {
71            Self::FramingError(error) => error.can_continue(),
72            Self::ParsingError(_) => true,
73        }
74    }
75}
76
77/// Framing configuration.
78///
79/// Framing handles how events are separated when encoded in a raw byte form, where each event is
80/// a frame that must be prefixed, or delimited, in a way that marks where an event begins and
81/// ends within the byte stream.
82#[configurable_component]
83#[derive(Clone, Debug)]
84#[serde(tag = "method", rename_all = "snake_case")]
85#[configurable(metadata(docs::enum_tag_description = "The framing method."))]
86pub enum FramingConfig {
87    /// Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments).
88    Bytes,
89
90    /// Byte frames which are delimited by a chosen character.
91    CharacterDelimited(CharacterDelimitedDecoderConfig),
92
93    /// Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length.
94    LengthDelimited(LengthDelimitedDecoderConfig),
95
96    /// Byte frames which are delimited by a newline character.
97    NewlineDelimited(NewlineDelimitedDecoderConfig),
98
99    /// Byte frames according to the [octet counting][octet_counting] format.
100    ///
101    /// [octet_counting]: https://tools.ietf.org/html/rfc6587#section-3.4.1
102    OctetCounting(OctetCountingDecoderConfig),
103
104    /// Byte frames which are chunked GELF messages.
105    ///
106    /// [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html
107    ChunkedGelf(ChunkedGelfDecoderConfig),
108}
109
110impl From<BytesDecoderConfig> for FramingConfig {
111    fn from(_: BytesDecoderConfig) -> Self {
112        Self::Bytes
113    }
114}
115
116impl From<CharacterDelimitedDecoderConfig> for FramingConfig {
117    fn from(config: CharacterDelimitedDecoderConfig) -> Self {
118        Self::CharacterDelimited(config)
119    }
120}
121
122impl From<LengthDelimitedDecoderConfig> for FramingConfig {
123    fn from(config: LengthDelimitedDecoderConfig) -> Self {
124        Self::LengthDelimited(config)
125    }
126}
127
128impl From<NewlineDelimitedDecoderConfig> for FramingConfig {
129    fn from(config: NewlineDelimitedDecoderConfig) -> Self {
130        Self::NewlineDelimited(config)
131    }
132}
133
134impl From<OctetCountingDecoderConfig> for FramingConfig {
135    fn from(config: OctetCountingDecoderConfig) -> Self {
136        Self::OctetCounting(config)
137    }
138}
139
140impl From<ChunkedGelfDecoderConfig> for FramingConfig {
141    fn from(config: ChunkedGelfDecoderConfig) -> Self {
142        Self::ChunkedGelf(config)
143    }
144}
145
146impl FramingConfig {
147    /// Build the `Framer` from this configuration.
148    pub fn build(&self) -> Framer {
149        match self {
150            FramingConfig::Bytes => Framer::Bytes(BytesDecoderConfig.build()),
151            FramingConfig::CharacterDelimited(config) => Framer::CharacterDelimited(config.build()),
152            FramingConfig::LengthDelimited(config) => Framer::LengthDelimited(config.build()),
153            FramingConfig::NewlineDelimited(config) => Framer::NewlineDelimited(config.build()),
154            FramingConfig::OctetCounting(config) => Framer::OctetCounting(config.build()),
155            FramingConfig::ChunkedGelf(config) => Framer::ChunkedGelf(config.build()),
156        }
157    }
158}
159
160/// Produce byte frames from a byte stream / byte message.
161#[derive(Debug, Clone)]
162pub enum Framer {
163    /// Uses a `BytesDecoder` for framing.
164    Bytes(BytesDecoder),
165    /// Uses a `CharacterDelimitedDecoder` for framing.
166    CharacterDelimited(CharacterDelimitedDecoder),
167    /// Uses a `LengthDelimitedDecoder` for framing.
168    LengthDelimited(LengthDelimitedDecoder),
169    /// Uses a `NewlineDelimitedDecoder` for framing.
170    NewlineDelimited(NewlineDelimitedDecoder),
171    /// Uses a `OctetCountingDecoder` for framing.
172    OctetCounting(OctetCountingDecoder),
173    /// Uses an opaque `Framer` implementation for framing.
174    Boxed(BoxedFramer),
175    /// Uses a `ChunkedGelfDecoder` for framing.
176    ChunkedGelf(ChunkedGelfDecoder),
177}
178
179impl tokio_util::codec::Decoder for Framer {
180    type Item = Bytes;
181    type Error = BoxedFramingError;
182
183    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
184        match self {
185            Framer::Bytes(framer) => framer.decode(src),
186            Framer::CharacterDelimited(framer) => framer.decode(src),
187            Framer::LengthDelimited(framer) => framer.decode(src),
188            Framer::NewlineDelimited(framer) => framer.decode(src),
189            Framer::OctetCounting(framer) => framer.decode(src),
190            Framer::Boxed(framer) => framer.decode(src),
191            Framer::ChunkedGelf(framer) => framer.decode(src),
192        }
193    }
194
195    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
196        match self {
197            Framer::Bytes(framer) => framer.decode_eof(src),
198            Framer::CharacterDelimited(framer) => framer.decode_eof(src),
199            Framer::LengthDelimited(framer) => framer.decode_eof(src),
200            Framer::NewlineDelimited(framer) => framer.decode_eof(src),
201            Framer::OctetCounting(framer) => framer.decode_eof(src),
202            Framer::Boxed(framer) => framer.decode_eof(src),
203            Framer::ChunkedGelf(framer) => framer.decode_eof(src),
204        }
205    }
206}
207
208/// Configures how events are decoded from raw bytes. Note some decoders can also determine the event output
209/// type (log, metric, trace).
210#[configurable_component]
211#[derive(Clone, Debug)]
212#[serde(tag = "codec", rename_all = "snake_case")]
213#[configurable(metadata(docs::enum_tag_description = "The codec to use for decoding events."))]
214pub enum DeserializerConfig {
215    /// Uses the raw bytes as-is.
216    Bytes,
217
218    /// Decodes the raw bytes as [JSON][json].
219    ///
220    /// [json]: https://www.json.org/
221    Json(JsonDeserializerConfig),
222
223    /// Decodes the raw bytes as [protobuf][protobuf].
224    ///
225    /// [protobuf]: https://protobuf.dev/
226    Protobuf(ProtobufDeserializerConfig),
227
228    #[cfg(feature = "syslog")]
229    /// Decodes the raw bytes as a Syslog message.
230    ///
231    /// Decodes either as the [RFC 3164][rfc3164]-style format ("old" style) or the
232    /// [RFC 5424][rfc5424]-style format ("new" style, includes structured data).
233    ///
234    /// [rfc3164]: https://www.ietf.org/rfc/rfc3164.txt
235    /// [rfc5424]: https://www.ietf.org/rfc/rfc5424.txt
236    Syslog(SyslogDeserializerConfig),
237
238    /// Decodes the raw bytes as [native Protocol Buffers format][vector_native_protobuf].
239    ///
240    /// This decoder can output all types of events (logs, metrics, traces).
241    ///
242    /// This codec is **[experimental][experimental]**.
243    ///
244    /// [vector_native_protobuf]: https://github.com/vectordotdev/vector/blob/master/lib/vector-core/proto/event.proto
245    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
246    Native,
247
248    /// Decodes the raw bytes as [native JSON format][vector_native_json].
249    ///
250    /// This decoder can output all types of events (logs, metrics, traces).
251    ///
252    /// This codec is **[experimental][experimental]**.
253    ///
254    /// [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue
255    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
256    NativeJson(NativeJsonDeserializerConfig),
257
258    /// Decodes the raw bytes as a [GELF][gelf] message.
259    ///
260    /// This codec is experimental for the following reason:
261    ///
262    /// The GELF specification is more strict than the actual Graylog receiver.
263    /// Vector's decoder adheres more strictly to the GELF spec, with
264    /// the exception that some characters such as `@`  are allowed in field names.
265    ///
266    /// Other GELF codecs such as Loki's, use a [Go SDK][implementation] that is maintained
267    /// by Graylog, and is much more relaxed than the GELF spec.
268    ///
269    /// Going forward, Vector will use that [Go SDK][implementation] as the reference implementation, which means
270    /// the codec may continue to relax the enforcement of specification.
271    ///
272    /// [gelf]: https://docs.graylog.org/docs/gelf
273    /// [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go
274    Gelf(GelfDeserializerConfig),
275
276    /// Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message.
277    ///
278    /// [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol
279    Influxdb(InfluxdbDeserializerConfig),
280
281    /// Decodes the raw bytes as as an [Apache Avro][apache_avro] message.
282    ///
283    /// [apache_avro]: https://avro.apache.org/
284    Avro {
285        /// Apache Avro-specific encoder options.
286        avro: AvroDeserializerOptions,
287    },
288
289    /// Decodes the raw bytes as a string and passes them as input to a [VRL][vrl] program.
290    ///
291    /// [vrl]: https://vector.dev/docs/reference/vrl
292    Vrl(VrlDeserializerConfig),
293}
294
295impl From<BytesDeserializerConfig> for DeserializerConfig {
296    fn from(_: BytesDeserializerConfig) -> Self {
297        Self::Bytes
298    }
299}
300
301impl From<JsonDeserializerConfig> for DeserializerConfig {
302    fn from(config: JsonDeserializerConfig) -> Self {
303        Self::Json(config)
304    }
305}
306
// Wraps the Syslog deserializer settings in the matching variant. Only
// compiled when the `syslog` feature is enabled.
#[cfg(feature = "syslog")]
impl From<SyslogDeserializerConfig> for DeserializerConfig {
    fn from(config: SyslogDeserializerConfig) -> Self {
        DeserializerConfig::Syslog(config)
    }
}
313
314impl From<GelfDeserializerConfig> for DeserializerConfig {
315    fn from(config: GelfDeserializerConfig) -> Self {
316        Self::Gelf(config)
317    }
318}
319
320impl From<NativeDeserializerConfig> for DeserializerConfig {
321    fn from(_: NativeDeserializerConfig) -> Self {
322        Self::Native
323    }
324}
325
326impl From<NativeJsonDeserializerConfig> for DeserializerConfig {
327    fn from(config: NativeJsonDeserializerConfig) -> Self {
328        Self::NativeJson(config)
329    }
330}
331
332impl From<InfluxdbDeserializerConfig> for DeserializerConfig {
333    fn from(config: InfluxdbDeserializerConfig) -> Self {
334        Self::Influxdb(config)
335    }
336}
337
338impl DeserializerConfig {
339    /// Build the `Deserializer` from this configuration.
340    pub fn build(&self) -> vector_common::Result<Deserializer> {
341        match self {
342            DeserializerConfig::Avro { avro } => Ok(Deserializer::Avro(
343                AvroDeserializerConfig {
344                    avro_options: avro.clone(),
345                }
346                .build(),
347            )),
348            DeserializerConfig::Bytes => Ok(Deserializer::Bytes(BytesDeserializerConfig.build())),
349            DeserializerConfig::Json(config) => Ok(Deserializer::Json(config.build())),
350            DeserializerConfig::Protobuf(config) => Ok(Deserializer::Protobuf(config.build()?)),
351            #[cfg(feature = "syslog")]
352            DeserializerConfig::Syslog(config) => Ok(Deserializer::Syslog(config.build())),
353            DeserializerConfig::Native => {
354                Ok(Deserializer::Native(NativeDeserializerConfig.build()))
355            }
356            DeserializerConfig::NativeJson(config) => Ok(Deserializer::NativeJson(config.build())),
357            DeserializerConfig::Gelf(config) => Ok(Deserializer::Gelf(config.build())),
358            DeserializerConfig::Influxdb(config) => Ok(Deserializer::Influxdb(config.build())),
359            DeserializerConfig::Vrl(config) => Ok(Deserializer::Vrl(config.build()?)),
360        }
361    }
362
363    /// Return an appropriate default framer for the given deserializer
364    pub fn default_stream_framing(&self) -> FramingConfig {
365        match self {
366            DeserializerConfig::Avro { .. } => FramingConfig::Bytes,
367            DeserializerConfig::Native => FramingConfig::LengthDelimited(Default::default()),
368            DeserializerConfig::Bytes
369            | DeserializerConfig::Json(_)
370            | DeserializerConfig::Influxdb(_)
371            | DeserializerConfig::NativeJson(_) => {
372                FramingConfig::NewlineDelimited(Default::default())
373            }
374            DeserializerConfig::Protobuf(_) => FramingConfig::Bytes,
375            #[cfg(feature = "syslog")]
376            DeserializerConfig::Syslog(_) => FramingConfig::NewlineDelimited(Default::default()),
377            DeserializerConfig::Vrl(_) => FramingConfig::Bytes,
378            DeserializerConfig::Gelf(_) => {
379                FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig::new(0))
380            }
381        }
382    }
383
384    /// Returns an appropriate default framing config for the given deserializer with message based inputs.
385    pub fn default_message_based_framing(&self) -> FramingConfig {
386        match self {
387            DeserializerConfig::Gelf(_) => FramingConfig::ChunkedGelf(Default::default()),
388            _ => FramingConfig::Bytes,
389        }
390    }
391
392    /// Return the type of event build by this deserializer.
393    pub fn output_type(&self) -> DataType {
394        match self {
395            DeserializerConfig::Avro { avro } => AvroDeserializerConfig {
396                avro_options: avro.clone(),
397            }
398            .output_type(),
399            DeserializerConfig::Bytes => BytesDeserializerConfig.output_type(),
400            DeserializerConfig::Json(config) => config.output_type(),
401            DeserializerConfig::Protobuf(config) => config.output_type(),
402            #[cfg(feature = "syslog")]
403            DeserializerConfig::Syslog(config) => config.output_type(),
404            DeserializerConfig::Native => NativeDeserializerConfig.output_type(),
405            DeserializerConfig::NativeJson(config) => config.output_type(),
406            DeserializerConfig::Gelf(config) => config.output_type(),
407            DeserializerConfig::Vrl(config) => config.output_type(),
408            DeserializerConfig::Influxdb(config) => config.output_type(),
409        }
410    }
411
412    /// The schema produced by the deserializer.
413    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
414        match self {
415            DeserializerConfig::Avro { avro } => AvroDeserializerConfig {
416                avro_options: avro.clone(),
417            }
418            .schema_definition(log_namespace),
419            DeserializerConfig::Bytes => BytesDeserializerConfig.schema_definition(log_namespace),
420            DeserializerConfig::Json(config) => config.schema_definition(log_namespace),
421            DeserializerConfig::Protobuf(config) => config.schema_definition(log_namespace),
422            #[cfg(feature = "syslog")]
423            DeserializerConfig::Syslog(config) => config.schema_definition(log_namespace),
424            DeserializerConfig::Native => NativeDeserializerConfig.schema_definition(log_namespace),
425            DeserializerConfig::NativeJson(config) => config.schema_definition(log_namespace),
426            DeserializerConfig::Gelf(config) => config.schema_definition(log_namespace),
427            DeserializerConfig::Influxdb(config) => config.schema_definition(log_namespace),
428            DeserializerConfig::Vrl(config) => config.schema_definition(log_namespace),
429        }
430    }
431
432    /// Get the HTTP content type.
433    pub const fn content_type(&self, framer: &FramingConfig) -> &'static str {
434        match (&self, framer) {
435            (
436                DeserializerConfig::Json(_) | DeserializerConfig::NativeJson(_),
437                FramingConfig::NewlineDelimited(_),
438            ) => "application/x-ndjson",
439            (
440                DeserializerConfig::Gelf(_)
441                | DeserializerConfig::Json(_)
442                | DeserializerConfig::NativeJson(_),
443                FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
444                    character_delimited:
445                        CharacterDelimitedDecoderOptions {
446                            delimiter: b',',
447                            max_length: Some(usize::MAX),
448                        },
449                }),
450            ) => "application/json",
451            (DeserializerConfig::Native, _) | (DeserializerConfig::Avro { .. }, _) => {
452                "application/octet-stream"
453            }
454            (DeserializerConfig::Protobuf(_), _) => "application/octet-stream",
455            (
456                DeserializerConfig::Json(_)
457                | DeserializerConfig::NativeJson(_)
458                | DeserializerConfig::Bytes
459                | DeserializerConfig::Gelf(_)
460                | DeserializerConfig::Influxdb(_)
461                | DeserializerConfig::Vrl(_),
462                _,
463            ) => "text/plain",
464            #[cfg(feature = "syslog")]
465            (DeserializerConfig::Syslog(_), _) => "text/plain",
466        }
467    }
468}
469
470/// Parse structured events from bytes.
471#[allow(clippy::large_enum_variant)]
472#[derive(Clone)]
473pub enum Deserializer {
474    /// Uses a `AvroDeserializer` for deserialization.
475    Avro(AvroDeserializer),
476    /// Uses a `BytesDeserializer` for deserialization.
477    Bytes(BytesDeserializer),
478    /// Uses a `JsonDeserializer` for deserialization.
479    Json(JsonDeserializer),
480    /// Uses a `ProtobufDeserializer` for deserialization.
481    Protobuf(ProtobufDeserializer),
482    #[cfg(feature = "syslog")]
483    /// Uses a `SyslogDeserializer` for deserialization.
484    Syslog(SyslogDeserializer),
485    /// Uses a `NativeDeserializer` for deserialization.
486    Native(NativeDeserializer),
487    /// Uses a `NativeDeserializer` for deserialization.
488    NativeJson(NativeJsonDeserializer),
489    /// Uses an opaque `Deserializer` implementation for deserialization.
490    Boxed(BoxedDeserializer),
491    /// Uses a `GelfDeserializer` for deserialization.
492    Gelf(GelfDeserializer),
493    /// Uses a `InfluxdbDeserializer` for deserialization.
494    Influxdb(InfluxdbDeserializer),
495    /// Uses a `VrlDeserializer` for deserialization.
496    Vrl(VrlDeserializer),
497}
498
499impl format::Deserializer for Deserializer {
500    fn parse(
501        &self,
502        bytes: Bytes,
503        log_namespace: LogNamespace,
504    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
505        match self {
506            Deserializer::Avro(deserializer) => deserializer.parse(bytes, log_namespace),
507            Deserializer::Bytes(deserializer) => deserializer.parse(bytes, log_namespace),
508            Deserializer::Json(deserializer) => deserializer.parse(bytes, log_namespace),
509            Deserializer::Protobuf(deserializer) => deserializer.parse(bytes, log_namespace),
510            #[cfg(feature = "syslog")]
511            Deserializer::Syslog(deserializer) => deserializer.parse(bytes, log_namespace),
512            Deserializer::Native(deserializer) => deserializer.parse(bytes, log_namespace),
513            Deserializer::NativeJson(deserializer) => deserializer.parse(bytes, log_namespace),
514            Deserializer::Boxed(deserializer) => deserializer.parse(bytes, log_namespace),
515            Deserializer::Gelf(deserializer) => deserializer.parse(bytes, log_namespace),
516            Deserializer::Influxdb(deserializer) => deserializer.parse(bytes, log_namespace),
517            Deserializer::Vrl(deserializer) => deserializer.parse(bytes, log_namespace),
518        }
519    }
520}
521
#[cfg(test)]
mod tests {
    use super::*;

    /// The default stream framing for GELF must be character-delimited on the
    /// NUL byte (`0`) with `max_length` unset.
    #[test]
    fn gelf_stream_default_framing_is_null_delimited() {
        let gelf_codec: DeserializerConfig = GelfDeserializerConfig::default().into();

        let is_null_delimited = matches!(
            gelf_codec.default_stream_framing(),
            FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
                character_delimited: CharacterDelimitedDecoderOptions {
                    delimiter: 0,
                    max_length: None,
                }
            })
        );

        assert!(is_null_delimited);
    }
}