codecs/decoding/
mod.rs

1//! A collection of support structures that are used in the process of decoding
2//! bytes into events.
3
4mod error;
5pub mod format;
6pub mod framing;
7
8use std::fmt::Debug;
9
10use bytes::{Bytes, BytesMut};
11pub use error::StreamDecodingError;
12pub use format::{
13    BoxedDeserializer, BytesDeserializer, BytesDeserializerConfig, GelfDeserializer,
14    GelfDeserializerConfig, GelfDeserializerOptions, InfluxdbDeserializer,
15    InfluxdbDeserializerConfig, JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions,
16    NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer,
17    NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, ProtobufDeserializer,
18    ProtobufDeserializerConfig, ProtobufDeserializerOptions,
19};
20#[cfg(feature = "opentelemetry")]
21pub use format::{OtlpDeserializer, OtlpDeserializerConfig, OtlpSignalType};
22#[cfg(feature = "syslog")]
23pub use format::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions};
24pub use framing::{
25    BoxedFramer, BoxedFramingError, BytesDecoder, BytesDecoderConfig, CharacterDelimitedDecoder,
26    CharacterDelimitedDecoderConfig, CharacterDelimitedDecoderOptions, ChunkedGelfDecoder,
27    ChunkedGelfDecoderConfig, ChunkedGelfDecoderOptions, FramingError, LengthDelimitedDecoder,
28    LengthDelimitedDecoderConfig, NewlineDelimitedDecoder, NewlineDelimitedDecoderConfig,
29    NewlineDelimitedDecoderOptions, OctetCountingDecoder, OctetCountingDecoderConfig,
30    OctetCountingDecoderOptions, VarintLengthDelimitedDecoder, VarintLengthDelimitedDecoderConfig,
31};
32use smallvec::SmallVec;
33use vector_config::configurable_component;
34use vector_core::{
35    config::{DataType, LogNamespace},
36    event::Event,
37    schema,
38};
39
40use self::format::{AvroDeserializer, AvroDeserializerConfig, AvroDeserializerOptions};
41use crate::decoding::format::{VrlDeserializer, VrlDeserializerConfig};
42
43/// An error that occurred while decoding structured events from a byte stream /
44/// byte messages.
45#[derive(Debug)]
46pub enum Error {
47    /// The error occurred while producing byte frames from the byte stream /
48    /// byte messages.
49    FramingError(BoxedFramingError),
50    /// The error occurred while parsing structured events from a byte frame.
51    ParsingError(vector_common::Error),
52}
53
54impl std::fmt::Display for Error {
55    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        match self {
57            Self::FramingError(error) => write!(formatter, "FramingError({error})"),
58            Self::ParsingError(error) => write!(formatter, "ParsingError({error})"),
59        }
60    }
61}
62
63impl std::error::Error for Error {}
64
65impl From<std::io::Error> for Error {
66    fn from(error: std::io::Error) -> Self {
67        Self::FramingError(Box::new(error))
68    }
69}
70
71impl StreamDecodingError for Error {
72    fn can_continue(&self) -> bool {
73        match self {
74            Self::FramingError(error) => error.can_continue(),
75            Self::ParsingError(_) => true,
76        }
77    }
78}
79
80/// Framing configuration.
81///
82/// Framing handles how events are separated when encoded in a raw byte form, where each event is
83/// a frame that must be prefixed, or delimited, in a way that marks where an event begins and
84/// ends within the byte stream.
85#[configurable_component]
86#[derive(Clone, Debug)]
87#[serde(tag = "method", rename_all = "snake_case")]
88#[configurable(metadata(docs::enum_tag_description = "The framing method."))]
89pub enum FramingConfig {
90    /// Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments).
91    Bytes,
92
93    /// Byte frames which are delimited by a chosen character.
94    CharacterDelimited(CharacterDelimitedDecoderConfig),
95
96    /// Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length.
97    LengthDelimited(LengthDelimitedDecoderConfig),
98
99    /// Byte frames which are delimited by a newline character.
100    NewlineDelimited(NewlineDelimitedDecoderConfig),
101
102    /// Byte frames according to the [octet counting][octet_counting] format.
103    ///
104    /// [octet_counting]: https://tools.ietf.org/html/rfc6587#section-3.4.1
105    OctetCounting(OctetCountingDecoderConfig),
106
107    /// Byte frames which are chunked GELF messages.
108    ///
109    /// [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html
110    ChunkedGelf(ChunkedGelfDecoderConfig),
111
112    /// Byte frames which are prefixed by a varint indicating the length.
113    /// This is compatible with protobuf's length-delimited encoding.
114    VarintLengthDelimited(VarintLengthDelimitedDecoderConfig),
115}
116
117impl From<BytesDecoderConfig> for FramingConfig {
118    fn from(_: BytesDecoderConfig) -> Self {
119        Self::Bytes
120    }
121}
122
123impl From<CharacterDelimitedDecoderConfig> for FramingConfig {
124    fn from(config: CharacterDelimitedDecoderConfig) -> Self {
125        Self::CharacterDelimited(config)
126    }
127}
128
129impl From<LengthDelimitedDecoderConfig> for FramingConfig {
130    fn from(config: LengthDelimitedDecoderConfig) -> Self {
131        Self::LengthDelimited(config)
132    }
133}
134
135impl From<NewlineDelimitedDecoderConfig> for FramingConfig {
136    fn from(config: NewlineDelimitedDecoderConfig) -> Self {
137        Self::NewlineDelimited(config)
138    }
139}
140
141impl From<OctetCountingDecoderConfig> for FramingConfig {
142    fn from(config: OctetCountingDecoderConfig) -> Self {
143        Self::OctetCounting(config)
144    }
145}
146
147impl From<ChunkedGelfDecoderConfig> for FramingConfig {
148    fn from(config: ChunkedGelfDecoderConfig) -> Self {
149        Self::ChunkedGelf(config)
150    }
151}
152
153impl From<VarintLengthDelimitedDecoderConfig> for FramingConfig {
154    fn from(config: VarintLengthDelimitedDecoderConfig) -> Self {
155        Self::VarintLengthDelimited(config)
156    }
157}
158
159impl FramingConfig {
160    /// Build the `Framer` from this configuration.
161    pub fn build(&self) -> Framer {
162        match self {
163            FramingConfig::Bytes => Framer::Bytes(BytesDecoderConfig.build()),
164            FramingConfig::CharacterDelimited(config) => Framer::CharacterDelimited(config.build()),
165            FramingConfig::LengthDelimited(config) => Framer::LengthDelimited(config.build()),
166            FramingConfig::NewlineDelimited(config) => Framer::NewlineDelimited(config.build()),
167            FramingConfig::OctetCounting(config) => Framer::OctetCounting(config.build()),
168            FramingConfig::ChunkedGelf(config) => Framer::ChunkedGelf(config.build()),
169            FramingConfig::VarintLengthDelimited(config) => {
170                Framer::VarintLengthDelimited(config.build())
171            }
172        }
173    }
174}
175
176/// Produce byte frames from a byte stream / byte message.
177#[derive(Debug, Clone)]
178pub enum Framer {
179    /// Uses a `BytesDecoder` for framing.
180    Bytes(BytesDecoder),
181    /// Uses a `CharacterDelimitedDecoder` for framing.
182    CharacterDelimited(CharacterDelimitedDecoder),
183    /// Uses a `LengthDelimitedDecoder` for framing.
184    LengthDelimited(LengthDelimitedDecoder),
185    /// Uses a `NewlineDelimitedDecoder` for framing.
186    NewlineDelimited(NewlineDelimitedDecoder),
187    /// Uses a `OctetCountingDecoder` for framing.
188    OctetCounting(OctetCountingDecoder),
189    /// Uses an opaque `Framer` implementation for framing.
190    Boxed(BoxedFramer),
191    /// Uses a `ChunkedGelfDecoder` for framing.
192    ChunkedGelf(ChunkedGelfDecoder),
193    /// Uses a `VarintLengthDelimitedDecoder` for framing.
194    VarintLengthDelimited(VarintLengthDelimitedDecoder),
195}
196
197impl tokio_util::codec::Decoder for Framer {
198    type Item = Bytes;
199    type Error = BoxedFramingError;
200
201    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
202        match self {
203            Framer::Bytes(framer) => framer.decode(src),
204            Framer::CharacterDelimited(framer) => framer.decode(src),
205            Framer::LengthDelimited(framer) => framer.decode(src),
206            Framer::NewlineDelimited(framer) => framer.decode(src),
207            Framer::OctetCounting(framer) => framer.decode(src),
208            Framer::Boxed(framer) => framer.decode(src),
209            Framer::ChunkedGelf(framer) => framer.decode(src),
210            Framer::VarintLengthDelimited(framer) => framer.decode(src),
211        }
212    }
213
214    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
215        match self {
216            Framer::Bytes(framer) => framer.decode_eof(src),
217            Framer::CharacterDelimited(framer) => framer.decode_eof(src),
218            Framer::LengthDelimited(framer) => framer.decode_eof(src),
219            Framer::NewlineDelimited(framer) => framer.decode_eof(src),
220            Framer::OctetCounting(framer) => framer.decode_eof(src),
221            Framer::Boxed(framer) => framer.decode_eof(src),
222            Framer::ChunkedGelf(framer) => framer.decode_eof(src),
223            Framer::VarintLengthDelimited(framer) => framer.decode_eof(src),
224        }
225    }
226}
227
228/// Configures how events are decoded from raw bytes. Note some decoders can also determine the event output
229/// type (log, metric, trace).
230#[configurable_component]
231#[derive(Clone, Debug)]
232#[serde(tag = "codec", rename_all = "snake_case")]
233#[configurable(metadata(docs::enum_tag_description = "The codec to use for decoding events."))]
234pub enum DeserializerConfig {
235    /// Uses the raw bytes as-is.
236    Bytes,
237
238    /// Decodes the raw bytes as [JSON][json].
239    ///
240    /// [json]: https://www.json.org/
241    Json(JsonDeserializerConfig),
242
243    /// Decodes the raw bytes as [protobuf][protobuf].
244    ///
245    /// [protobuf]: https://protobuf.dev/
246    Protobuf(ProtobufDeserializerConfig),
247
248    #[cfg(feature = "opentelemetry")]
249    /// Decodes the raw bytes as [OTLP (OpenTelemetry Protocol)][otlp] protobuf format.
250    ///
251    /// This decoder handles the three OTLP signal types: logs, metrics, and traces.
252    /// It automatically detects which type of OTLP message is being decoded.
253    ///
254    /// [otlp]: https://opentelemetry.io/docs/specs/otlp/
255    Otlp(OtlpDeserializerConfig),
256
257    #[cfg(feature = "syslog")]
258    /// Decodes the raw bytes as a Syslog message.
259    ///
260    /// Decodes either as the [RFC 3164][rfc3164]-style format ("old" style) or the
261    /// [RFC 5424][rfc5424]-style format ("new" style, includes structured data).
262    ///
263    /// [rfc3164]: https://www.ietf.org/rfc/rfc3164.txt
264    /// [rfc5424]: https://www.ietf.org/rfc/rfc5424.txt
265    Syslog(SyslogDeserializerConfig),
266
267    /// Decodes the raw bytes as [native Protocol Buffers format][vector_native_protobuf].
268    ///
269    /// This decoder can output all types of events (logs, metrics, traces).
270    ///
271    /// This codec is **[experimental][experimental]**.
272    ///
273    /// [vector_native_protobuf]: https://github.com/vectordotdev/vector/blob/master/lib/vector-core/proto/event.proto
274    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
275    Native,
276
277    /// Decodes the raw bytes as [native JSON format][vector_native_json].
278    ///
279    /// This decoder can output all types of events (logs, metrics, traces).
280    ///
281    /// This codec is **[experimental][experimental]**.
282    ///
283    /// [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue
284    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
285    NativeJson(NativeJsonDeserializerConfig),
286
287    /// Decodes the raw bytes as a [GELF][gelf] message.
288    ///
289    /// This codec is experimental for the following reason:
290    ///
291    /// The GELF specification is more strict than the actual Graylog receiver.
292    /// Vector's decoder adheres more strictly to the GELF spec, with
293    /// the exception that some characters such as `@`  are allowed in field names.
294    ///
295    /// Other GELF codecs such as Loki's, use a [Go SDK][implementation] that is maintained
296    /// by Graylog, and is much more relaxed than the GELF spec.
297    ///
298    /// Going forward, Vector will use that [Go SDK][implementation] as the reference implementation, which means
299    /// the codec may continue to relax the enforcement of specification.
300    ///
301    /// [gelf]: https://docs.graylog.org/docs/gelf
302    /// [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go
303    Gelf(GelfDeserializerConfig),
304
305    /// Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message.
306    ///
307    /// [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol
308    Influxdb(InfluxdbDeserializerConfig),
309
310    /// Decodes the raw bytes as as an [Apache Avro][apache_avro] message.
311    ///
312    /// [apache_avro]: https://avro.apache.org/
313    Avro {
314        /// Apache Avro-specific encoder options.
315        avro: AvroDeserializerOptions,
316    },
317
318    /// Decodes the raw bytes as a string and passes them as input to a [VRL][vrl] program.
319    ///
320    /// [vrl]: https://vector.dev/docs/reference/vrl
321    Vrl(VrlDeserializerConfig),
322}
323
324impl From<BytesDeserializerConfig> for DeserializerConfig {
325    fn from(_: BytesDeserializerConfig) -> Self {
326        Self::Bytes
327    }
328}
329
330impl From<JsonDeserializerConfig> for DeserializerConfig {
331    fn from(config: JsonDeserializerConfig) -> Self {
332        Self::Json(config)
333    }
334}
335
336#[cfg(feature = "syslog")]
337impl From<SyslogDeserializerConfig> for DeserializerConfig {
338    fn from(config: SyslogDeserializerConfig) -> Self {
339        Self::Syslog(config)
340    }
341}
342
343impl From<GelfDeserializerConfig> for DeserializerConfig {
344    fn from(config: GelfDeserializerConfig) -> Self {
345        Self::Gelf(config)
346    }
347}
348
349impl From<NativeDeserializerConfig> for DeserializerConfig {
350    fn from(_: NativeDeserializerConfig) -> Self {
351        Self::Native
352    }
353}
354
355impl From<NativeJsonDeserializerConfig> for DeserializerConfig {
356    fn from(config: NativeJsonDeserializerConfig) -> Self {
357        Self::NativeJson(config)
358    }
359}
360
361impl From<InfluxdbDeserializerConfig> for DeserializerConfig {
362    fn from(config: InfluxdbDeserializerConfig) -> Self {
363        Self::Influxdb(config)
364    }
365}
366
367impl DeserializerConfig {
368    /// Build the `Deserializer` from this configuration.
369    pub fn build(&self) -> vector_common::Result<Deserializer> {
370        match self {
371            DeserializerConfig::Avro { avro } => Ok(Deserializer::Avro(
372                AvroDeserializerConfig {
373                    avro_options: avro.clone(),
374                }
375                .build(),
376            )),
377            DeserializerConfig::Bytes => Ok(Deserializer::Bytes(BytesDeserializerConfig.build())),
378            DeserializerConfig::Json(config) => Ok(Deserializer::Json(config.build())),
379            DeserializerConfig::Protobuf(config) => Ok(Deserializer::Protobuf(config.build()?)),
380            #[cfg(feature = "opentelemetry")]
381            DeserializerConfig::Otlp(config) => Ok(Deserializer::Otlp(config.build())),
382            #[cfg(feature = "syslog")]
383            DeserializerConfig::Syslog(config) => Ok(Deserializer::Syslog(config.build())),
384            DeserializerConfig::Native => {
385                Ok(Deserializer::Native(NativeDeserializerConfig.build()))
386            }
387            DeserializerConfig::NativeJson(config) => Ok(Deserializer::NativeJson(config.build())),
388            DeserializerConfig::Gelf(config) => Ok(Deserializer::Gelf(config.build())),
389            DeserializerConfig::Influxdb(config) => Ok(Deserializer::Influxdb(config.build())),
390            DeserializerConfig::Vrl(config) => Ok(Deserializer::Vrl(config.build()?)),
391        }
392    }
393
394    /// Return an appropriate default framer for the given deserializer
395    pub fn default_stream_framing(&self) -> FramingConfig {
396        match self {
397            DeserializerConfig::Avro { .. } => FramingConfig::Bytes,
398            DeserializerConfig::Native => FramingConfig::LengthDelimited(Default::default()),
399            DeserializerConfig::Bytes
400            | DeserializerConfig::Json(_)
401            | DeserializerConfig::Influxdb(_)
402            | DeserializerConfig::NativeJson(_) => {
403                FramingConfig::NewlineDelimited(Default::default())
404            }
405            DeserializerConfig::Protobuf(_) => FramingConfig::Bytes,
406            #[cfg(feature = "opentelemetry")]
407            DeserializerConfig::Otlp(_) => FramingConfig::Bytes,
408            #[cfg(feature = "syslog")]
409            DeserializerConfig::Syslog(_) => FramingConfig::NewlineDelimited(Default::default()),
410            DeserializerConfig::Vrl(_) => FramingConfig::Bytes,
411            DeserializerConfig::Gelf(_) => {
412                FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig::new(0))
413            }
414        }
415    }
416
417    /// Returns an appropriate default framing config for the given deserializer with message based inputs.
418    pub fn default_message_based_framing(&self) -> FramingConfig {
419        match self {
420            DeserializerConfig::Gelf(_) => FramingConfig::ChunkedGelf(Default::default()),
421            _ => FramingConfig::Bytes,
422        }
423    }
424
425    /// Return the type of event build by this deserializer.
426    pub fn output_type(&self) -> DataType {
427        match self {
428            DeserializerConfig::Avro { avro } => AvroDeserializerConfig {
429                avro_options: avro.clone(),
430            }
431            .output_type(),
432            DeserializerConfig::Bytes => BytesDeserializerConfig.output_type(),
433            DeserializerConfig::Json(config) => config.output_type(),
434            DeserializerConfig::Protobuf(config) => config.output_type(),
435            #[cfg(feature = "opentelemetry")]
436            DeserializerConfig::Otlp(config) => config.output_type(),
437            #[cfg(feature = "syslog")]
438            DeserializerConfig::Syslog(config) => config.output_type(),
439            DeserializerConfig::Native => NativeDeserializerConfig.output_type(),
440            DeserializerConfig::NativeJson(config) => config.output_type(),
441            DeserializerConfig::Gelf(config) => config.output_type(),
442            DeserializerConfig::Vrl(config) => config.output_type(),
443            DeserializerConfig::Influxdb(config) => config.output_type(),
444        }
445    }
446
447    /// The schema produced by the deserializer.
448    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
449        match self {
450            DeserializerConfig::Avro { avro } => AvroDeserializerConfig {
451                avro_options: avro.clone(),
452            }
453            .schema_definition(log_namespace),
454            DeserializerConfig::Bytes => BytesDeserializerConfig.schema_definition(log_namespace),
455            DeserializerConfig::Json(config) => config.schema_definition(log_namespace),
456            DeserializerConfig::Protobuf(config) => config.schema_definition(log_namespace),
457            #[cfg(feature = "opentelemetry")]
458            DeserializerConfig::Otlp(config) => config.schema_definition(log_namespace),
459            #[cfg(feature = "syslog")]
460            DeserializerConfig::Syslog(config) => config.schema_definition(log_namespace),
461            DeserializerConfig::Native => NativeDeserializerConfig.schema_definition(log_namespace),
462            DeserializerConfig::NativeJson(config) => config.schema_definition(log_namespace),
463            DeserializerConfig::Gelf(config) => config.schema_definition(log_namespace),
464            DeserializerConfig::Influxdb(config) => config.schema_definition(log_namespace),
465            DeserializerConfig::Vrl(config) => config.schema_definition(log_namespace),
466        }
467    }
468
469    /// Get the HTTP content type.
470    pub const fn content_type(&self, framer: &FramingConfig) -> &'static str {
471        match (&self, framer) {
472            (
473                DeserializerConfig::Json(_) | DeserializerConfig::NativeJson(_),
474                FramingConfig::NewlineDelimited(_),
475            ) => "application/x-ndjson",
476            (
477                DeserializerConfig::Gelf(_)
478                | DeserializerConfig::Json(_)
479                | DeserializerConfig::NativeJson(_),
480                FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
481                    character_delimited:
482                        CharacterDelimitedDecoderOptions {
483                            delimiter: b',',
484                            max_length: Some(usize::MAX),
485                        },
486                }),
487            ) => "application/json",
488            (DeserializerConfig::Native, _) | (DeserializerConfig::Avro { .. }, _) => {
489                "application/octet-stream"
490            }
491            (DeserializerConfig::Protobuf(_), _) => "application/octet-stream",
492            #[cfg(feature = "opentelemetry")]
493            (DeserializerConfig::Otlp(_), _) => "application/x-protobuf",
494            (
495                DeserializerConfig::Json(_)
496                | DeserializerConfig::NativeJson(_)
497                | DeserializerConfig::Bytes
498                | DeserializerConfig::Gelf(_)
499                | DeserializerConfig::Influxdb(_)
500                | DeserializerConfig::Vrl(_),
501                _,
502            ) => "text/plain",
503            #[cfg(feature = "syslog")]
504            (DeserializerConfig::Syslog(_), _) => "text/plain",
505        }
506    }
507}
508
509/// Parse structured events from bytes.
510#[allow(clippy::large_enum_variant)]
511#[derive(Clone)]
512pub enum Deserializer {
513    /// Uses a `AvroDeserializer` for deserialization.
514    Avro(AvroDeserializer),
515    /// Uses a `BytesDeserializer` for deserialization.
516    Bytes(BytesDeserializer),
517    /// Uses a `JsonDeserializer` for deserialization.
518    Json(JsonDeserializer),
519    /// Uses a `ProtobufDeserializer` for deserialization.
520    Protobuf(ProtobufDeserializer),
521    #[cfg(feature = "opentelemetry")]
522    /// Uses an `OtlpDeserializer` for deserialization.
523    Otlp(OtlpDeserializer),
524    #[cfg(feature = "syslog")]
525    /// Uses a `SyslogDeserializer` for deserialization.
526    Syslog(SyslogDeserializer),
527    /// Uses a `NativeDeserializer` for deserialization.
528    Native(NativeDeserializer),
529    /// Uses a `NativeDeserializer` for deserialization.
530    NativeJson(NativeJsonDeserializer),
531    /// Uses an opaque `Deserializer` implementation for deserialization.
532    Boxed(BoxedDeserializer),
533    /// Uses a `GelfDeserializer` for deserialization.
534    Gelf(GelfDeserializer),
535    /// Uses a `InfluxdbDeserializer` for deserialization.
536    Influxdb(InfluxdbDeserializer),
537    /// Uses a `VrlDeserializer` for deserialization.
538    Vrl(VrlDeserializer),
539}
540
541impl format::Deserializer for Deserializer {
542    fn parse(
543        &self,
544        bytes: Bytes,
545        log_namespace: LogNamespace,
546    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
547        match self {
548            Deserializer::Avro(deserializer) => deserializer.parse(bytes, log_namespace),
549            Deserializer::Bytes(deserializer) => deserializer.parse(bytes, log_namespace),
550            Deserializer::Json(deserializer) => deserializer.parse(bytes, log_namespace),
551            Deserializer::Protobuf(deserializer) => deserializer.parse(bytes, log_namespace),
552            #[cfg(feature = "opentelemetry")]
553            Deserializer::Otlp(deserializer) => deserializer.parse(bytes, log_namespace),
554            #[cfg(feature = "syslog")]
555            Deserializer::Syslog(deserializer) => deserializer.parse(bytes, log_namespace),
556            Deserializer::Native(deserializer) => deserializer.parse(bytes, log_namespace),
557            Deserializer::NativeJson(deserializer) => deserializer.parse(bytes, log_namespace),
558            Deserializer::Boxed(deserializer) => deserializer.parse(bytes, log_namespace),
559            Deserializer::Gelf(deserializer) => deserializer.parse(bytes, log_namespace),
560            Deserializer::Influxdb(deserializer) => deserializer.parse(bytes, log_namespace),
561            Deserializer::Vrl(deserializer) => deserializer.parse(bytes, log_namespace),
562        }
563    }
564}
565
566#[cfg(test)]
567mod tests {
568    use super::*;
569
570    #[test]
571    fn gelf_stream_default_framing_is_null_delimited() {
572        let deserializer_config = DeserializerConfig::from(GelfDeserializerConfig::default());
573        let framing_config = deserializer_config.default_stream_framing();
574        assert!(matches!(
575            framing_config,
576            FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
577                character_delimited: CharacterDelimitedDecoderOptions {
578                    delimiter: 0,
579                    max_length: None,
580                }
581            })
582        ));
583    }
584}