codecs/decoding/
mod.rs

1//! A collection of support structures that are used in the process of decoding
2//! bytes into events.
3
4mod config;
5mod decoder;
6mod error;
7pub mod format;
8pub mod framing;
9
10use std::fmt::Debug;
11
12use bytes::{Bytes, BytesMut};
13pub use config::DecodingConfig;
14pub use decoder::Decoder;
15pub use error::StreamDecodingError;
16pub use format::{
17    BoxedDeserializer, BytesDeserializer, BytesDeserializerConfig, GelfDeserializer,
18    GelfDeserializerConfig, GelfDeserializerOptions, InfluxdbDeserializer,
19    InfluxdbDeserializerConfig, JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions,
20    NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer,
21    NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, ProtobufDeserializer,
22    ProtobufDeserializerConfig, ProtobufDeserializerOptions,
23};
24#[cfg(feature = "opentelemetry")]
25pub use format::{OtlpDeserializer, OtlpDeserializerConfig, OtlpSignalType};
26#[cfg(feature = "syslog")]
27pub use format::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions};
28pub use framing::{
29    BoxedFramer, BoxedFramingError, BytesDecoder, BytesDecoderConfig, CharacterDelimitedDecoder,
30    CharacterDelimitedDecoderConfig, CharacterDelimitedDecoderOptions, ChunkedGelfDecoder,
31    ChunkedGelfDecoderConfig, ChunkedGelfDecoderOptions, FramingError, LengthDelimitedDecoder,
32    LengthDelimitedDecoderConfig, NewlineDelimitedDecoder, NewlineDelimitedDecoderConfig,
33    NewlineDelimitedDecoderOptions, OctetCountingDecoder, OctetCountingDecoderConfig,
34    OctetCountingDecoderOptions, VarintLengthDelimitedDecoder, VarintLengthDelimitedDecoderConfig,
35};
36use smallvec::SmallVec;
37use vector_config::configurable_component;
38use vector_core::{
39    config::{DataType, LogNamespace},
40    event::Event,
41    schema,
42};
43
44use self::format::{AvroDeserializer, AvroDeserializerConfig, AvroDeserializerOptions};
45use crate::decoding::format::{VrlDeserializer, VrlDeserializerConfig};
46
/// An error that occurred while decoding structured events from a byte stream /
/// byte messages.
///
/// The variant records which decoding stage failed: framing or parsing.
#[derive(Debug)]
pub enum Error {
    /// The error occurred while producing byte frames from the byte stream /
    /// byte messages.
    FramingError(BoxedFramingError),
    /// The error occurred while parsing structured events from a byte frame.
    ParsingError(vector_common::Error),
}
57
58impl std::fmt::Display for Error {
59    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        match self {
61            Self::FramingError(error) => write!(formatter, "FramingError({error})"),
62            Self::ParsingError(error) => write!(formatter, "ParsingError({error})"),
63        }
64    }
65}
66
// Empty impl: every `std::error::Error` method keeps its default implementation.
impl std::error::Error for Error {}
68
69impl From<std::io::Error> for Error {
70    fn from(error: std::io::Error) -> Self {
71        Self::FramingError(Box::new(error))
72    }
73}
74
75impl StreamDecodingError for Error {
76    fn can_continue(&self) -> bool {
77        match self {
78            Self::FramingError(error) => error.can_continue(),
79            Self::ParsingError(_) => true,
80        }
81    }
82}
83
/// Framing configuration.
///
/// Framing handles how events are separated when encoded in a raw byte form, where each event is
/// a frame that must be prefixed, or delimited, in a way that marks where an event begins and
/// ends within the byte stream.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "method", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The framing method."))]
pub enum FramingConfig {
    /// Byte frames are passed through as-is according to the underlying I/O boundaries (for example, split between messages or stream segments).
    Bytes,

    /// Byte frames which are delimited by a chosen character.
    CharacterDelimited(CharacterDelimitedDecoderConfig),

    /// Byte frames which are prefixed by an unsigned big-endian 32-bit integer indicating the length.
    LengthDelimited(LengthDelimitedDecoderConfig),

    /// Byte frames which are delimited by a newline character.
    NewlineDelimited(NewlineDelimitedDecoderConfig),

    /// Byte frames according to the [octet counting][octet_counting] format.
    ///
    /// [octet_counting]: https://tools.ietf.org/html/rfc6587#section-3.4.1
    OctetCounting(OctetCountingDecoderConfig),

    /// Byte frames which are [chunked GELF][chunked_gelf] messages.
    ///
    /// [chunked_gelf]: https://go2docs.graylog.org/current/getting_in_log_data/gelf.html
    ChunkedGelf(ChunkedGelfDecoderConfig),

    /// Byte frames which are prefixed by a varint indicating the length.
    /// This is compatible with protobuf's length-delimited encoding.
    VarintLengthDelimited(VarintLengthDelimitedDecoderConfig),
}
120
121impl From<BytesDecoderConfig> for FramingConfig {
122    fn from(_: BytesDecoderConfig) -> Self {
123        Self::Bytes
124    }
125}
126
127impl From<CharacterDelimitedDecoderConfig> for FramingConfig {
128    fn from(config: CharacterDelimitedDecoderConfig) -> Self {
129        Self::CharacterDelimited(config)
130    }
131}
132
133impl From<LengthDelimitedDecoderConfig> for FramingConfig {
134    fn from(config: LengthDelimitedDecoderConfig) -> Self {
135        Self::LengthDelimited(config)
136    }
137}
138
139impl From<NewlineDelimitedDecoderConfig> for FramingConfig {
140    fn from(config: NewlineDelimitedDecoderConfig) -> Self {
141        Self::NewlineDelimited(config)
142    }
143}
144
145impl From<OctetCountingDecoderConfig> for FramingConfig {
146    fn from(config: OctetCountingDecoderConfig) -> Self {
147        Self::OctetCounting(config)
148    }
149}
150
151impl From<ChunkedGelfDecoderConfig> for FramingConfig {
152    fn from(config: ChunkedGelfDecoderConfig) -> Self {
153        Self::ChunkedGelf(config)
154    }
155}
156
157impl From<VarintLengthDelimitedDecoderConfig> for FramingConfig {
158    fn from(config: VarintLengthDelimitedDecoderConfig) -> Self {
159        Self::VarintLengthDelimited(config)
160    }
161}
162
163impl FramingConfig {
164    /// Build the `Framer` from this configuration.
165    pub fn build(&self) -> Framer {
166        match self {
167            FramingConfig::Bytes => Framer::Bytes(BytesDecoderConfig.build()),
168            FramingConfig::CharacterDelimited(config) => Framer::CharacterDelimited(config.build()),
169            FramingConfig::LengthDelimited(config) => Framer::LengthDelimited(config.build()),
170            FramingConfig::NewlineDelimited(config) => Framer::NewlineDelimited(config.build()),
171            FramingConfig::OctetCounting(config) => Framer::OctetCounting(config.build()),
172            FramingConfig::ChunkedGelf(config) => Framer::ChunkedGelf(config.build()),
173            FramingConfig::VarintLengthDelimited(config) => {
174                Framer::VarintLengthDelimited(config.build())
175            }
176        }
177    }
178}
179
/// Produce byte frames from a byte stream / byte message.
#[derive(Debug, Clone)]
pub enum Framer {
    /// Uses a `BytesDecoder` for framing.
    Bytes(BytesDecoder),
    /// Uses a `CharacterDelimitedDecoder` for framing.
    CharacterDelimited(CharacterDelimitedDecoder),
    /// Uses a `LengthDelimitedDecoder` for framing.
    LengthDelimited(LengthDelimitedDecoder),
    /// Uses a `NewlineDelimitedDecoder` for framing.
    NewlineDelimited(NewlineDelimitedDecoder),
    /// Uses an `OctetCountingDecoder` for framing.
    OctetCounting(OctetCountingDecoder),
    /// Uses an opaque `Framer` implementation for framing.
    Boxed(BoxedFramer),
    /// Uses a `ChunkedGelfDecoder` for framing.
    ChunkedGelf(ChunkedGelfDecoder),
    /// Uses a `VarintLengthDelimitedDecoder` for framing.
    VarintLengthDelimited(VarintLengthDelimitedDecoder),
}
200
201impl tokio_util::codec::Decoder for Framer {
202    type Item = Bytes;
203    type Error = BoxedFramingError;
204
205    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
206        match self {
207            Framer::Bytes(framer) => framer.decode(src),
208            Framer::CharacterDelimited(framer) => framer.decode(src),
209            Framer::LengthDelimited(framer) => framer.decode(src),
210            Framer::NewlineDelimited(framer) => framer.decode(src),
211            Framer::OctetCounting(framer) => framer.decode(src),
212            Framer::Boxed(framer) => framer.decode(src),
213            Framer::ChunkedGelf(framer) => framer.decode(src),
214            Framer::VarintLengthDelimited(framer) => framer.decode(src),
215        }
216    }
217
218    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
219        match self {
220            Framer::Bytes(framer) => framer.decode_eof(src),
221            Framer::CharacterDelimited(framer) => framer.decode_eof(src),
222            Framer::LengthDelimited(framer) => framer.decode_eof(src),
223            Framer::NewlineDelimited(framer) => framer.decode_eof(src),
224            Framer::OctetCounting(framer) => framer.decode_eof(src),
225            Framer::Boxed(framer) => framer.decode_eof(src),
226            Framer::ChunkedGelf(framer) => framer.decode_eof(src),
227            Framer::VarintLengthDelimited(framer) => framer.decode_eof(src),
228        }
229    }
230}
231
/// Configures how events are decoded from raw bytes. Note some decoders can also determine the event output
/// type (log, metric, trace).
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The codec to use for decoding events."))]
pub enum DeserializerConfig {
    /// Uses the raw bytes as-is.
    Bytes,

    /// Decodes the raw bytes as [JSON][json].
    ///
    /// [json]: https://www.json.org/
    Json(JsonDeserializerConfig),

    /// Decodes the raw bytes as [protobuf][protobuf].
    ///
    /// [protobuf]: https://protobuf.dev/
    Protobuf(ProtobufDeserializerConfig),

    #[cfg(feature = "opentelemetry")]
    /// Decodes the raw bytes as [OTLP (OpenTelemetry Protocol)][otlp] protobuf format.
    ///
    /// This decoder handles the three OTLP signal types: logs, metrics, and traces.
    /// It automatically detects which type of OTLP message is being decoded.
    ///
    /// [otlp]: https://opentelemetry.io/docs/specs/otlp/
    Otlp(OtlpDeserializerConfig),

    #[cfg(feature = "syslog")]
    /// Decodes the raw bytes as a Syslog message.
    ///
    /// Decodes either as the [RFC 3164][rfc3164]-style format ("old" style) or the
    /// [RFC 5424][rfc5424]-style format ("new" style, includes structured data).
    ///
    /// [rfc3164]: https://www.ietf.org/rfc/rfc3164.txt
    /// [rfc5424]: https://www.ietf.org/rfc/rfc5424.txt
    Syslog(SyslogDeserializerConfig),

    /// Decodes the raw bytes as [native Protocol Buffers format][vector_native_protobuf].
    ///
    /// This decoder can output all types of events (logs, metrics, traces).
    ///
    /// This codec is **[experimental][experimental]**.
    ///
    /// [vector_native_protobuf]: https://github.com/vectordotdev/vector/blob/master/lib/vector-core/proto/event.proto
    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
    Native,

    /// Decodes the raw bytes as [native JSON format][vector_native_json].
    ///
    /// This decoder can output all types of events (logs, metrics, traces).
    ///
    /// This codec is **[experimental][experimental]**.
    ///
    /// [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue
    /// [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs
    NativeJson(NativeJsonDeserializerConfig),

    /// Decodes the raw bytes as a [GELF][gelf] message.
    ///
    /// This codec is experimental for the following reason:
    ///
    /// The GELF specification is more strict than the actual Graylog receiver.
    /// Vector's decoder adheres more strictly to the GELF spec, with
    /// the exception that some characters such as `@` are allowed in field names.
    ///
    /// Other GELF codecs such as Loki's, use a [Go SDK][implementation] that is maintained
    /// by Graylog, and is much more relaxed than the GELF spec.
    ///
    /// Going forward, Vector will use that [Go SDK][implementation] as the reference implementation, which means
    /// the codec may continue to relax the enforcement of specification.
    ///
    /// [gelf]: https://docs.graylog.org/docs/gelf
    /// [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go
    Gelf(GelfDeserializerConfig),

    /// Decodes the raw bytes as an [Influxdb Line Protocol][influxdb] message.
    ///
    /// [influxdb]: https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol
    Influxdb(InfluxdbDeserializerConfig),

    /// Decodes the raw bytes as an [Apache Avro][apache_avro] message.
    ///
    /// [apache_avro]: https://avro.apache.org/
    Avro {
        /// Apache Avro-specific decoder options.
        avro: AvroDeserializerOptions,
    },

    /// Decodes the raw bytes as a string and passes them as input to a [VRL][vrl] program.
    ///
    /// [vrl]: https://vector.dev/docs/reference/vrl
    Vrl(VrlDeserializerConfig),
}
327
328impl From<BytesDeserializerConfig> for DeserializerConfig {
329    fn from(_: BytesDeserializerConfig) -> Self {
330        Self::Bytes
331    }
332}
333
334impl From<JsonDeserializerConfig> for DeserializerConfig {
335    fn from(config: JsonDeserializerConfig) -> Self {
336        Self::Json(config)
337    }
338}
339
340#[cfg(feature = "syslog")]
341impl From<SyslogDeserializerConfig> for DeserializerConfig {
342    fn from(config: SyslogDeserializerConfig) -> Self {
343        Self::Syslog(config)
344    }
345}
346
347impl From<GelfDeserializerConfig> for DeserializerConfig {
348    fn from(config: GelfDeserializerConfig) -> Self {
349        Self::Gelf(config)
350    }
351}
352
353impl From<NativeDeserializerConfig> for DeserializerConfig {
354    fn from(_: NativeDeserializerConfig) -> Self {
355        Self::Native
356    }
357}
358
359impl From<NativeJsonDeserializerConfig> for DeserializerConfig {
360    fn from(config: NativeJsonDeserializerConfig) -> Self {
361        Self::NativeJson(config)
362    }
363}
364
365impl From<InfluxdbDeserializerConfig> for DeserializerConfig {
366    fn from(config: InfluxdbDeserializerConfig) -> Self {
367        Self::Influxdb(config)
368    }
369}
370
371impl DeserializerConfig {
372    /// Build the `Deserializer` from this configuration.
373    pub fn build(&self) -> vector_common::Result<Deserializer> {
374        match self {
375            DeserializerConfig::Avro { avro } => Ok(Deserializer::Avro(
376                AvroDeserializerConfig {
377                    avro_options: avro.clone(),
378                }
379                .build()?,
380            )),
381            DeserializerConfig::Bytes => Ok(Deserializer::Bytes(BytesDeserializerConfig.build())),
382            DeserializerConfig::Json(config) => Ok(Deserializer::Json(config.build())),
383            DeserializerConfig::Protobuf(config) => Ok(Deserializer::Protobuf(config.build()?)),
384            #[cfg(feature = "opentelemetry")]
385            DeserializerConfig::Otlp(config) => Ok(Deserializer::Otlp(config.build())),
386            #[cfg(feature = "syslog")]
387            DeserializerConfig::Syslog(config) => Ok(Deserializer::Syslog(config.build())),
388            DeserializerConfig::Native => {
389                Ok(Deserializer::Native(NativeDeserializerConfig.build()))
390            }
391            DeserializerConfig::NativeJson(config) => Ok(Deserializer::NativeJson(config.build())),
392            DeserializerConfig::Gelf(config) => Ok(Deserializer::Gelf(config.build())),
393            DeserializerConfig::Influxdb(config) => Ok(Deserializer::Influxdb(config.build())),
394            DeserializerConfig::Vrl(config) => Ok(Deserializer::Vrl(config.build()?)),
395        }
396    }
397
398    /// Return an appropriate default framer for the given deserializer
399    pub fn default_stream_framing(&self) -> FramingConfig {
400        match self {
401            DeserializerConfig::Avro { .. } => FramingConfig::Bytes,
402            DeserializerConfig::Native => FramingConfig::LengthDelimited(Default::default()),
403            DeserializerConfig::Bytes
404            | DeserializerConfig::Json(_)
405            | DeserializerConfig::Influxdb(_)
406            | DeserializerConfig::NativeJson(_) => {
407                FramingConfig::NewlineDelimited(Default::default())
408            }
409            DeserializerConfig::Protobuf(_) => FramingConfig::Bytes,
410            #[cfg(feature = "opentelemetry")]
411            DeserializerConfig::Otlp(_) => FramingConfig::Bytes,
412            #[cfg(feature = "syslog")]
413            DeserializerConfig::Syslog(_) => FramingConfig::NewlineDelimited(Default::default()),
414            DeserializerConfig::Vrl(_) => FramingConfig::Bytes,
415            DeserializerConfig::Gelf(_) => {
416                FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig::new(0))
417            }
418        }
419    }
420
421    /// Returns an appropriate default framing config for the given deserializer with message based inputs.
422    pub fn default_message_based_framing(&self) -> FramingConfig {
423        match self {
424            DeserializerConfig::Gelf(_) => FramingConfig::ChunkedGelf(Default::default()),
425            _ => FramingConfig::Bytes,
426        }
427    }
428
429    /// Return the type of event build by this deserializer.
430    pub fn output_type(&self) -> DataType {
431        match self {
432            DeserializerConfig::Avro { avro } => AvroDeserializerConfig {
433                avro_options: avro.clone(),
434            }
435            .output_type(),
436            DeserializerConfig::Bytes => BytesDeserializerConfig.output_type(),
437            DeserializerConfig::Json(config) => config.output_type(),
438            DeserializerConfig::Protobuf(config) => config.output_type(),
439            #[cfg(feature = "opentelemetry")]
440            DeserializerConfig::Otlp(config) => config.output_type(),
441            #[cfg(feature = "syslog")]
442            DeserializerConfig::Syslog(config) => config.output_type(),
443            DeserializerConfig::Native => NativeDeserializerConfig.output_type(),
444            DeserializerConfig::NativeJson(config) => config.output_type(),
445            DeserializerConfig::Gelf(config) => config.output_type(),
446            DeserializerConfig::Vrl(config) => config.output_type(),
447            DeserializerConfig::Influxdb(config) => config.output_type(),
448        }
449    }
450
451    /// The schema produced by the deserializer.
452    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
453        match self {
454            DeserializerConfig::Avro { avro } => AvroDeserializerConfig {
455                avro_options: avro.clone(),
456            }
457            .schema_definition(log_namespace),
458            DeserializerConfig::Bytes => BytesDeserializerConfig.schema_definition(log_namespace),
459            DeserializerConfig::Json(config) => config.schema_definition(log_namespace),
460            DeserializerConfig::Protobuf(config) => config.schema_definition(log_namespace),
461            #[cfg(feature = "opentelemetry")]
462            DeserializerConfig::Otlp(config) => config.schema_definition(log_namespace),
463            #[cfg(feature = "syslog")]
464            DeserializerConfig::Syslog(config) => config.schema_definition(log_namespace),
465            DeserializerConfig::Native => NativeDeserializerConfig.schema_definition(log_namespace),
466            DeserializerConfig::NativeJson(config) => config.schema_definition(log_namespace),
467            DeserializerConfig::Gelf(config) => config.schema_definition(log_namespace),
468            DeserializerConfig::Influxdb(config) => config.schema_definition(log_namespace),
469            DeserializerConfig::Vrl(config) => config.schema_definition(log_namespace),
470        }
471    }
472
473    /// Get the HTTP content type.
474    pub const fn content_type(&self, framer: &FramingConfig) -> &'static str {
475        match (&self, framer) {
476            (
477                DeserializerConfig::Json(_) | DeserializerConfig::NativeJson(_),
478                FramingConfig::NewlineDelimited(_),
479            ) => "application/x-ndjson",
480            (
481                DeserializerConfig::Gelf(_)
482                | DeserializerConfig::Json(_)
483                | DeserializerConfig::NativeJson(_),
484                FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
485                    character_delimited:
486                        CharacterDelimitedDecoderOptions {
487                            delimiter: b',',
488                            max_length: Some(usize::MAX),
489                        },
490                }),
491            ) => "application/json",
492            (DeserializerConfig::Native, _) | (DeserializerConfig::Avro { .. }, _) => {
493                "application/octet-stream"
494            }
495            (DeserializerConfig::Protobuf(_), _) => "application/octet-stream",
496            #[cfg(feature = "opentelemetry")]
497            (DeserializerConfig::Otlp(_), _) => "application/x-protobuf",
498            (
499                DeserializerConfig::Json(_)
500                | DeserializerConfig::NativeJson(_)
501                | DeserializerConfig::Bytes
502                | DeserializerConfig::Gelf(_)
503                | DeserializerConfig::Influxdb(_)
504                | DeserializerConfig::Vrl(_),
505                _,
506            ) => "text/plain",
507            #[cfg(feature = "syslog")]
508            (DeserializerConfig::Syslog(_), _) => "text/plain",
509        }
510    }
511}
512
/// Parse structured events from bytes.
#[allow(clippy::large_enum_variant)]
#[derive(Clone)]
pub enum Deserializer {
    /// Uses an `AvroDeserializer` for deserialization.
    Avro(AvroDeserializer),
    /// Uses a `BytesDeserializer` for deserialization.
    Bytes(BytesDeserializer),
    /// Uses a `JsonDeserializer` for deserialization.
    Json(JsonDeserializer),
    /// Uses a `ProtobufDeserializer` for deserialization.
    Protobuf(ProtobufDeserializer),
    #[cfg(feature = "opentelemetry")]
    /// Uses an `OtlpDeserializer` for deserialization.
    Otlp(OtlpDeserializer),
    #[cfg(feature = "syslog")]
    /// Uses a `SyslogDeserializer` for deserialization.
    Syslog(SyslogDeserializer),
    /// Uses a `NativeDeserializer` for deserialization.
    Native(NativeDeserializer),
    /// Uses a `NativeJsonDeserializer` for deserialization.
    NativeJson(NativeJsonDeserializer),
    /// Uses an opaque `Deserializer` implementation for deserialization.
    Boxed(BoxedDeserializer),
    /// Uses a `GelfDeserializer` for deserialization.
    Gelf(GelfDeserializer),
    /// Uses an `InfluxdbDeserializer` for deserialization.
    Influxdb(InfluxdbDeserializer),
    /// Uses a `VrlDeserializer` for deserialization.
    Vrl(VrlDeserializer),
}
544
545impl format::Deserializer for Deserializer {
546    fn parse(
547        &self,
548        bytes: Bytes,
549        log_namespace: LogNamespace,
550    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
551        match self {
552            Deserializer::Avro(deserializer) => deserializer.parse(bytes, log_namespace),
553            Deserializer::Bytes(deserializer) => deserializer.parse(bytes, log_namespace),
554            Deserializer::Json(deserializer) => deserializer.parse(bytes, log_namespace),
555            Deserializer::Protobuf(deserializer) => deserializer.parse(bytes, log_namespace),
556            #[cfg(feature = "opentelemetry")]
557            Deserializer::Otlp(deserializer) => deserializer.parse(bytes, log_namespace),
558            #[cfg(feature = "syslog")]
559            Deserializer::Syslog(deserializer) => deserializer.parse(bytes, log_namespace),
560            Deserializer::Native(deserializer) => deserializer.parse(bytes, log_namespace),
561            Deserializer::NativeJson(deserializer) => deserializer.parse(bytes, log_namespace),
562            Deserializer::Boxed(deserializer) => deserializer.parse(bytes, log_namespace),
563            Deserializer::Gelf(deserializer) => deserializer.parse(bytes, log_namespace),
564            Deserializer::Influxdb(deserializer) => deserializer.parse(bytes, log_namespace),
565            Deserializer::Vrl(deserializer) => deserializer.parse(bytes, log_namespace),
566        }
567    }
568}
569
#[cfg(test)]
mod tests {
    use super::*;

    /// The default stream framing for GELF must be character-delimited on the
    /// `0` byte with no maximum length.
    #[test]
    fn gelf_stream_default_framing_is_null_delimited() {
        let gelf_config: DeserializerConfig = GelfDeserializerConfig::default().into();
        let is_null_delimited = matches!(
            gelf_config.default_stream_framing(),
            FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
                character_delimited: CharacterDelimitedDecoderOptions {
                    delimiter: 0,
                    max_length: None,
                }
            })
        );
        assert!(is_null_delimited);
    }
}