1mod error;
5pub mod format;
6pub mod framing;
7
8use std::fmt::Debug;
9
10use bytes::{Bytes, BytesMut};
11pub use error::StreamDecodingError;
12pub use format::{
13 BoxedDeserializer, BytesDeserializer, BytesDeserializerConfig, GelfDeserializer,
14 GelfDeserializerConfig, GelfDeserializerOptions, InfluxdbDeserializer,
15 InfluxdbDeserializerConfig, JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions,
16 NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer,
17 NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, ProtobufDeserializer,
18 ProtobufDeserializerConfig, ProtobufDeserializerOptions,
19};
20#[cfg(feature = "syslog")]
21pub use format::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions};
22pub use framing::{
23 BoxedFramer, BoxedFramingError, BytesDecoder, BytesDecoderConfig, CharacterDelimitedDecoder,
24 CharacterDelimitedDecoderConfig, CharacterDelimitedDecoderOptions, ChunkedGelfDecoder,
25 ChunkedGelfDecoderConfig, ChunkedGelfDecoderOptions, FramingError, LengthDelimitedDecoder,
26 LengthDelimitedDecoderConfig, NewlineDelimitedDecoder, NewlineDelimitedDecoderConfig,
27 NewlineDelimitedDecoderOptions, OctetCountingDecoder, OctetCountingDecoderConfig,
28 OctetCountingDecoderOptions, VarintLengthDelimitedDecoder, VarintLengthDelimitedDecoderConfig,
29};
30use smallvec::SmallVec;
31use vector_config::configurable_component;
32use vector_core::{
33 config::{DataType, LogNamespace},
34 event::Event,
35 schema,
36};
37
38use self::format::{AvroDeserializer, AvroDeserializerConfig, AvroDeserializerOptions};
39use crate::decoding::format::{VrlDeserializer, VrlDeserializerConfig};
40
/// An error produced by this decoding module.
///
/// It is either a failure reported while framing or a failure reported while
/// parsing, as distinguished by the two variants.
#[derive(Debug)]
pub enum Error {
    /// A framing failure, carrying the boxed framing error.
    FramingError(BoxedFramingError),
    /// A parsing failure, carrying a `vector_common::Error`.
    ParsingError(vector_common::Error),
}
51
52impl std::fmt::Display for Error {
53 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54 match self {
55 Self::FramingError(error) => write!(formatter, "FramingError({error})"),
56 Self::ParsingError(error) => write!(formatter, "ParsingError({error})"),
57 }
58 }
59}
60
// Marker implementation: every provided method of `std::error::Error` keeps
// its default. The wrapped error's message is embedded by the `Display` impl.
impl std::error::Error for Error {}
62
63impl From<std::io::Error> for Error {
64 fn from(error: std::io::Error) -> Self {
65 Self::FramingError(Box::new(error))
66 }
67}
68
69impl StreamDecodingError for Error {
70 fn can_continue(&self) -> bool {
71 match self {
72 Self::FramingError(error) => error.can_continue(),
73 Self::ParsingError(_) => true,
74 }
75 }
76}
77
/// Framing configuration.
///
/// Selects which framer is built by [`FramingConfig::build`]. When
/// (de)serialized, the variant is chosen through the `method` tag, with
/// variant names written in `snake_case`.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "method", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The framing method."))]
pub enum FramingConfig {
    /// Byte framing; takes no options.
    Bytes,

    /// Character-delimited framing, with its decoder options.
    CharacterDelimited(CharacterDelimitedDecoderConfig),

    /// Length-delimited framing, with its decoder options.
    LengthDelimited(LengthDelimitedDecoderConfig),

    /// Newline-delimited framing, with its decoder options.
    NewlineDelimited(NewlineDelimitedDecoderConfig),

    /// Octet-counting framing, with its decoder options.
    OctetCounting(OctetCountingDecoderConfig),

    /// Chunked GELF framing, with its decoder options.
    ChunkedGelf(ChunkedGelfDecoderConfig),

    /// Varint length-delimited framing, with its decoder options.
    VarintLengthDelimited(VarintLengthDelimitedDecoderConfig),
}
114
115impl From<BytesDecoderConfig> for FramingConfig {
116 fn from(_: BytesDecoderConfig) -> Self {
117 Self::Bytes
118 }
119}
120
121impl From<CharacterDelimitedDecoderConfig> for FramingConfig {
122 fn from(config: CharacterDelimitedDecoderConfig) -> Self {
123 Self::CharacterDelimited(config)
124 }
125}
126
127impl From<LengthDelimitedDecoderConfig> for FramingConfig {
128 fn from(config: LengthDelimitedDecoderConfig) -> Self {
129 Self::LengthDelimited(config)
130 }
131}
132
133impl From<NewlineDelimitedDecoderConfig> for FramingConfig {
134 fn from(config: NewlineDelimitedDecoderConfig) -> Self {
135 Self::NewlineDelimited(config)
136 }
137}
138
139impl From<OctetCountingDecoderConfig> for FramingConfig {
140 fn from(config: OctetCountingDecoderConfig) -> Self {
141 Self::OctetCounting(config)
142 }
143}
144
145impl From<ChunkedGelfDecoderConfig> for FramingConfig {
146 fn from(config: ChunkedGelfDecoderConfig) -> Self {
147 Self::ChunkedGelf(config)
148 }
149}
150
151impl From<VarintLengthDelimitedDecoderConfig> for FramingConfig {
152 fn from(config: VarintLengthDelimitedDecoderConfig) -> Self {
153 Self::VarintLengthDelimited(config)
154 }
155}
156
157impl FramingConfig {
158 pub fn build(&self) -> Framer {
160 match self {
161 FramingConfig::Bytes => Framer::Bytes(BytesDecoderConfig.build()),
162 FramingConfig::CharacterDelimited(config) => Framer::CharacterDelimited(config.build()),
163 FramingConfig::LengthDelimited(config) => Framer::LengthDelimited(config.build()),
164 FramingConfig::NewlineDelimited(config) => Framer::NewlineDelimited(config.build()),
165 FramingConfig::OctetCounting(config) => Framer::OctetCounting(config.build()),
166 FramingConfig::ChunkedGelf(config) => Framer::ChunkedGelf(config.build()),
167 FramingConfig::VarintLengthDelimited(config) => {
168 Framer::VarintLengthDelimited(config.build())
169 }
170 }
171 }
172}
173
/// A framer that splits a byte buffer into `Bytes` frames.
///
/// Every variant wraps a concrete decoder; the `tokio_util::codec::Decoder`
/// implementation on this enum forwards to whichever one is held.
#[derive(Debug, Clone)]
pub enum Framer {
    /// Uses a `BytesDecoder` for framing.
    Bytes(BytesDecoder),
    /// Uses a `CharacterDelimitedDecoder` for framing.
    CharacterDelimited(CharacterDelimitedDecoder),
    /// Uses a `LengthDelimitedDecoder` for framing.
    LengthDelimited(LengthDelimitedDecoder),
    /// Uses a `NewlineDelimitedDecoder` for framing.
    NewlineDelimited(NewlineDelimitedDecoder),
    /// Uses an `OctetCountingDecoder` for framing.
    OctetCounting(OctetCountingDecoder),
    /// Uses an opaque `BoxedFramer` for framing. `FramingConfig::build`
    /// never produces this variant.
    Boxed(BoxedFramer),
    /// Uses a `ChunkedGelfDecoder` for framing.
    ChunkedGelf(ChunkedGelfDecoder),
    /// Uses a `VarintLengthDelimitedDecoder` for framing.
    VarintLengthDelimited(VarintLengthDelimitedDecoder),
}
194
195impl tokio_util::codec::Decoder for Framer {
196 type Item = Bytes;
197 type Error = BoxedFramingError;
198
199 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
200 match self {
201 Framer::Bytes(framer) => framer.decode(src),
202 Framer::CharacterDelimited(framer) => framer.decode(src),
203 Framer::LengthDelimited(framer) => framer.decode(src),
204 Framer::NewlineDelimited(framer) => framer.decode(src),
205 Framer::OctetCounting(framer) => framer.decode(src),
206 Framer::Boxed(framer) => framer.decode(src),
207 Framer::ChunkedGelf(framer) => framer.decode(src),
208 Framer::VarintLengthDelimited(framer) => framer.decode(src),
209 }
210 }
211
212 fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
213 match self {
214 Framer::Bytes(framer) => framer.decode_eof(src),
215 Framer::CharacterDelimited(framer) => framer.decode_eof(src),
216 Framer::LengthDelimited(framer) => framer.decode_eof(src),
217 Framer::NewlineDelimited(framer) => framer.decode_eof(src),
218 Framer::OctetCounting(framer) => framer.decode_eof(src),
219 Framer::Boxed(framer) => framer.decode_eof(src),
220 Framer::ChunkedGelf(framer) => framer.decode_eof(src),
221 Framer::VarintLengthDelimited(framer) => framer.decode_eof(src),
222 }
223 }
224}
225
/// Deserializer configuration.
///
/// Selects which deserializer is built by [`DeserializerConfig::build`].
/// When (de)serialized, the variant is chosen through the `codec` tag, with
/// variant names written in `snake_case`.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The codec to use for decoding events."))]
pub enum DeserializerConfig {
    /// Bytes codec; takes no options.
    Bytes,

    /// JSON codec, with its deserializer options.
    Json(JsonDeserializerConfig),

    /// Protobuf codec, with its deserializer options.
    Protobuf(ProtobufDeserializerConfig),

    #[cfg(feature = "syslog")]
    /// Syslog codec, with its deserializer options.
    Syslog(SyslogDeserializerConfig),

    /// Native codec; takes no options.
    Native,

    /// Native JSON codec, with its deserializer options.
    NativeJson(NativeJsonDeserializerConfig),

    /// GELF codec, with its deserializer options.
    Gelf(GelfDeserializerConfig),

    /// InfluxDB codec, with its deserializer options.
    Influxdb(InfluxdbDeserializerConfig),

    /// Avro codec.
    Avro {
        /// Options for the Avro deserializer.
        avro: AvroDeserializerOptions,
    },

    /// VRL codec, with its deserializer options.
    Vrl(VrlDeserializerConfig),
}
312
313impl From<BytesDeserializerConfig> for DeserializerConfig {
314 fn from(_: BytesDeserializerConfig) -> Self {
315 Self::Bytes
316 }
317}
318
319impl From<JsonDeserializerConfig> for DeserializerConfig {
320 fn from(config: JsonDeserializerConfig) -> Self {
321 Self::Json(config)
322 }
323}
324
#[cfg(feature = "syslog")]
impl From<SyslogDeserializerConfig> for DeserializerConfig {
    /// Wraps the deserializer config in the `Syslog` variant.
    fn from(inner: SyslogDeserializerConfig) -> Self {
        DeserializerConfig::Syslog(inner)
    }
}
331
332impl From<GelfDeserializerConfig> for DeserializerConfig {
333 fn from(config: GelfDeserializerConfig) -> Self {
334 Self::Gelf(config)
335 }
336}
337
338impl From<NativeDeserializerConfig> for DeserializerConfig {
339 fn from(_: NativeDeserializerConfig) -> Self {
340 Self::Native
341 }
342}
343
344impl From<NativeJsonDeserializerConfig> for DeserializerConfig {
345 fn from(config: NativeJsonDeserializerConfig) -> Self {
346 Self::NativeJson(config)
347 }
348}
349
350impl From<InfluxdbDeserializerConfig> for DeserializerConfig {
351 fn from(config: InfluxdbDeserializerConfig) -> Self {
352 Self::Influxdb(config)
353 }
354}
355
356impl DeserializerConfig {
357 pub fn build(&self) -> vector_common::Result<Deserializer> {
359 match self {
360 DeserializerConfig::Avro { avro } => Ok(Deserializer::Avro(
361 AvroDeserializerConfig {
362 avro_options: avro.clone(),
363 }
364 .build(),
365 )),
366 DeserializerConfig::Bytes => Ok(Deserializer::Bytes(BytesDeserializerConfig.build())),
367 DeserializerConfig::Json(config) => Ok(Deserializer::Json(config.build())),
368 DeserializerConfig::Protobuf(config) => Ok(Deserializer::Protobuf(config.build()?)),
369 #[cfg(feature = "syslog")]
370 DeserializerConfig::Syslog(config) => Ok(Deserializer::Syslog(config.build())),
371 DeserializerConfig::Native => {
372 Ok(Deserializer::Native(NativeDeserializerConfig.build()))
373 }
374 DeserializerConfig::NativeJson(config) => Ok(Deserializer::NativeJson(config.build())),
375 DeserializerConfig::Gelf(config) => Ok(Deserializer::Gelf(config.build())),
376 DeserializerConfig::Influxdb(config) => Ok(Deserializer::Influxdb(config.build())),
377 DeserializerConfig::Vrl(config) => Ok(Deserializer::Vrl(config.build()?)),
378 }
379 }
380
381 pub fn default_stream_framing(&self) -> FramingConfig {
383 match self {
384 DeserializerConfig::Avro { .. } => FramingConfig::Bytes,
385 DeserializerConfig::Native => FramingConfig::LengthDelimited(Default::default()),
386 DeserializerConfig::Bytes
387 | DeserializerConfig::Json(_)
388 | DeserializerConfig::Influxdb(_)
389 | DeserializerConfig::NativeJson(_) => {
390 FramingConfig::NewlineDelimited(Default::default())
391 }
392 DeserializerConfig::Protobuf(_) => FramingConfig::Bytes,
393 #[cfg(feature = "syslog")]
394 DeserializerConfig::Syslog(_) => FramingConfig::NewlineDelimited(Default::default()),
395 DeserializerConfig::Vrl(_) => FramingConfig::Bytes,
396 DeserializerConfig::Gelf(_) => {
397 FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig::new(0))
398 }
399 }
400 }
401
402 pub fn default_message_based_framing(&self) -> FramingConfig {
404 match self {
405 DeserializerConfig::Gelf(_) => FramingConfig::ChunkedGelf(Default::default()),
406 _ => FramingConfig::Bytes,
407 }
408 }
409
410 pub fn output_type(&self) -> DataType {
412 match self {
413 DeserializerConfig::Avro { avro } => AvroDeserializerConfig {
414 avro_options: avro.clone(),
415 }
416 .output_type(),
417 DeserializerConfig::Bytes => BytesDeserializerConfig.output_type(),
418 DeserializerConfig::Json(config) => config.output_type(),
419 DeserializerConfig::Protobuf(config) => config.output_type(),
420 #[cfg(feature = "syslog")]
421 DeserializerConfig::Syslog(config) => config.output_type(),
422 DeserializerConfig::Native => NativeDeserializerConfig.output_type(),
423 DeserializerConfig::NativeJson(config) => config.output_type(),
424 DeserializerConfig::Gelf(config) => config.output_type(),
425 DeserializerConfig::Vrl(config) => config.output_type(),
426 DeserializerConfig::Influxdb(config) => config.output_type(),
427 }
428 }
429
430 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
432 match self {
433 DeserializerConfig::Avro { avro } => AvroDeserializerConfig {
434 avro_options: avro.clone(),
435 }
436 .schema_definition(log_namespace),
437 DeserializerConfig::Bytes => BytesDeserializerConfig.schema_definition(log_namespace),
438 DeserializerConfig::Json(config) => config.schema_definition(log_namespace),
439 DeserializerConfig::Protobuf(config) => config.schema_definition(log_namespace),
440 #[cfg(feature = "syslog")]
441 DeserializerConfig::Syslog(config) => config.schema_definition(log_namespace),
442 DeserializerConfig::Native => NativeDeserializerConfig.schema_definition(log_namespace),
443 DeserializerConfig::NativeJson(config) => config.schema_definition(log_namespace),
444 DeserializerConfig::Gelf(config) => config.schema_definition(log_namespace),
445 DeserializerConfig::Influxdb(config) => config.schema_definition(log_namespace),
446 DeserializerConfig::Vrl(config) => config.schema_definition(log_namespace),
447 }
448 }
449
450 pub const fn content_type(&self, framer: &FramingConfig) -> &'static str {
452 match (&self, framer) {
453 (
454 DeserializerConfig::Json(_) | DeserializerConfig::NativeJson(_),
455 FramingConfig::NewlineDelimited(_),
456 ) => "application/x-ndjson",
457 (
458 DeserializerConfig::Gelf(_)
459 | DeserializerConfig::Json(_)
460 | DeserializerConfig::NativeJson(_),
461 FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
462 character_delimited:
463 CharacterDelimitedDecoderOptions {
464 delimiter: b',',
465 max_length: Some(usize::MAX),
466 },
467 }),
468 ) => "application/json",
469 (DeserializerConfig::Native, _) | (DeserializerConfig::Avro { .. }, _) => {
470 "application/octet-stream"
471 }
472 (DeserializerConfig::Protobuf(_), _) => "application/octet-stream",
473 (
474 DeserializerConfig::Json(_)
475 | DeserializerConfig::NativeJson(_)
476 | DeserializerConfig::Bytes
477 | DeserializerConfig::Gelf(_)
478 | DeserializerConfig::Influxdb(_)
479 | DeserializerConfig::Vrl(_),
480 _,
481 ) => "text/plain",
482 #[cfg(feature = "syslog")]
483 (DeserializerConfig::Syslog(_), _) => "text/plain",
484 }
485 }
486}
487
/// A deserializer that parses bytes into events.
///
/// Every variant wraps a concrete deserializer; the `format::Deserializer`
/// implementation on this enum forwards `parse` to whichever one is held.
#[allow(clippy::large_enum_variant)]
#[derive(Clone)]
pub enum Deserializer {
    /// Uses an `AvroDeserializer` for deserialization.
    Avro(AvroDeserializer),
    /// Uses a `BytesDeserializer` for deserialization.
    Bytes(BytesDeserializer),
    /// Uses a `JsonDeserializer` for deserialization.
    Json(JsonDeserializer),
    /// Uses a `ProtobufDeserializer` for deserialization.
    Protobuf(ProtobufDeserializer),
    #[cfg(feature = "syslog")]
    /// Uses a `SyslogDeserializer` for deserialization.
    Syslog(SyslogDeserializer),
    /// Uses a `NativeDeserializer` for deserialization.
    Native(NativeDeserializer),
    /// Uses a `NativeJsonDeserializer` for deserialization.
    NativeJson(NativeJsonDeserializer),
    /// Uses an opaque `BoxedDeserializer` for deserialization.
    /// `DeserializerConfig::build` never produces this variant.
    Boxed(BoxedDeserializer),
    /// Uses a `GelfDeserializer` for deserialization.
    Gelf(GelfDeserializer),
    /// Uses an `InfluxdbDeserializer` for deserialization.
    Influxdb(InfluxdbDeserializer),
    /// Uses a `VrlDeserializer` for deserialization.
    Vrl(VrlDeserializer),
}
516
517impl format::Deserializer for Deserializer {
518 fn parse(
519 &self,
520 bytes: Bytes,
521 log_namespace: LogNamespace,
522 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
523 match self {
524 Deserializer::Avro(deserializer) => deserializer.parse(bytes, log_namespace),
525 Deserializer::Bytes(deserializer) => deserializer.parse(bytes, log_namespace),
526 Deserializer::Json(deserializer) => deserializer.parse(bytes, log_namespace),
527 Deserializer::Protobuf(deserializer) => deserializer.parse(bytes, log_namespace),
528 #[cfg(feature = "syslog")]
529 Deserializer::Syslog(deserializer) => deserializer.parse(bytes, log_namespace),
530 Deserializer::Native(deserializer) => deserializer.parse(bytes, log_namespace),
531 Deserializer::NativeJson(deserializer) => deserializer.parse(bytes, log_namespace),
532 Deserializer::Boxed(deserializer) => deserializer.parse(bytes, log_namespace),
533 Deserializer::Gelf(deserializer) => deserializer.parse(bytes, log_namespace),
534 Deserializer::Influxdb(deserializer) => deserializer.parse(bytes, log_namespace),
535 Deserializer::Vrl(deserializer) => deserializer.parse(bytes, log_namespace),
536 }
537 }
538}
539
#[cfg(test)]
mod tests {
    use super::*;

    /// The default stream framing for the GELF codec must be
    /// character-delimited on byte `0` with no maximum length.
    #[test]
    fn gelf_stream_default_framing_is_null_delimited() {
        let codec: DeserializerConfig = GelfDeserializerConfig::default().into();

        match codec.default_stream_framing() {
            FramingConfig::CharacterDelimited(framing) => {
                let options = framing.character_delimited;
                assert_eq!(options.delimiter, 0);
                assert!(options.max_length.is_none());
            }
            other => panic!("unexpected default stream framing: {other:?}"),
        }
    }
}