1mod config;
5mod decoder;
6mod error;
7pub mod format;
8pub mod framing;
9
10use std::fmt::Debug;
11
12use bytes::{Bytes, BytesMut};
13pub use config::DecodingConfig;
14pub use decoder::Decoder;
15pub use error::StreamDecodingError;
16pub use format::{
17 BoxedDeserializer, BytesDeserializer, BytesDeserializerConfig, GelfDeserializer,
18 GelfDeserializerConfig, GelfDeserializerOptions, InfluxdbDeserializer,
19 InfluxdbDeserializerConfig, JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions,
20 NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer,
21 NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, ProtobufDeserializer,
22 ProtobufDeserializerConfig, ProtobufDeserializerOptions,
23};
24#[cfg(feature = "opentelemetry")]
25pub use format::{OtlpDeserializer, OtlpDeserializerConfig, OtlpSignalType};
26#[cfg(feature = "syslog")]
27pub use format::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions};
28pub use framing::{
29 BoxedFramer, BoxedFramingError, BytesDecoder, BytesDecoderConfig, CharacterDelimitedDecoder,
30 CharacterDelimitedDecoderConfig, CharacterDelimitedDecoderOptions, ChunkedGelfDecoder,
31 ChunkedGelfDecoderConfig, ChunkedGelfDecoderOptions, FramingError, LengthDelimitedDecoder,
32 LengthDelimitedDecoderConfig, NewlineDelimitedDecoder, NewlineDelimitedDecoderConfig,
33 NewlineDelimitedDecoderOptions, OctetCountingDecoder, OctetCountingDecoderConfig,
34 OctetCountingDecoderOptions, VarintLengthDelimitedDecoder, VarintLengthDelimitedDecoderConfig,
35};
36use smallvec::SmallVec;
37use vector_config::configurable_component;
38use vector_core::{
39 config::{DataType, LogNamespace},
40 event::Event,
41 schema,
42};
43
44use self::format::{AvroDeserializer, AvroDeserializerConfig, AvroDeserializerOptions};
45use crate::decoding::format::{VrlDeserializer, VrlDeserializerConfig};
46
/// An error produced while decoding.
///
/// The variant records which kind of error is being carried: a framing error
/// or a parsing error.
#[derive(Debug)]
pub enum Error {
    /// A framing error, carried as a [`BoxedFramingError`]. A `std::io::Error`
    /// converts into this variant through the `From` impl in this module.
    FramingError(BoxedFramingError),
    /// A parsing error, carried as a `vector_common::Error`.
    ParsingError(vector_common::Error),
}
57
58impl std::fmt::Display for Error {
59 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 match self {
61 Self::FramingError(error) => write!(formatter, "FramingError({error})"),
62 Self::ParsingError(error) => write!(formatter, "ParsingError({error})"),
63 }
64 }
65}
66
// No trait method is overridden here, so the provided defaults apply; the
// wrapped error is rendered through the `Display` impl of `Error`.
impl std::error::Error for Error {}
68
69impl From<std::io::Error> for Error {
70 fn from(error: std::io::Error) -> Self {
71 Self::FramingError(Box::new(error))
72 }
73}
74
75impl StreamDecodingError for Error {
76 fn can_continue(&self) -> bool {
77 match self {
78 Self::FramingError(error) => error.can_continue(),
79 Self::ParsingError(_) => true,
80 }
81 }
82}
83
/// Framing configuration.
///
/// Each variant selects one framing method and carries that method's options,
/// if it has any. The variant is chosen through the `method` tag, using
/// `snake_case` names.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "method", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The framing method."))]
pub enum FramingConfig {
    /// Framing with a [`BytesDecoder`]. This method takes no options.
    Bytes,

    /// Character-delimited framing, configured by a
    /// [`CharacterDelimitedDecoderConfig`].
    CharacterDelimited(CharacterDelimitedDecoderConfig),

    /// Length-delimited framing, configured by a
    /// [`LengthDelimitedDecoderConfig`].
    LengthDelimited(LengthDelimitedDecoderConfig),

    /// Newline-delimited framing, configured by a
    /// [`NewlineDelimitedDecoderConfig`].
    NewlineDelimited(NewlineDelimitedDecoderConfig),

    /// Octet-counting framing, configured by an
    /// [`OctetCountingDecoderConfig`].
    OctetCounting(OctetCountingDecoderConfig),

    /// Chunked GELF framing, configured by a [`ChunkedGelfDecoderConfig`].
    ChunkedGelf(ChunkedGelfDecoderConfig),

    /// Varint-length-delimited framing, configured by a
    /// [`VarintLengthDelimitedDecoderConfig`].
    VarintLengthDelimited(VarintLengthDelimitedDecoderConfig),
}
120
121impl From<BytesDecoderConfig> for FramingConfig {
122 fn from(_: BytesDecoderConfig) -> Self {
123 Self::Bytes
124 }
125}
126
127impl From<CharacterDelimitedDecoderConfig> for FramingConfig {
128 fn from(config: CharacterDelimitedDecoderConfig) -> Self {
129 Self::CharacterDelimited(config)
130 }
131}
132
133impl From<LengthDelimitedDecoderConfig> for FramingConfig {
134 fn from(config: LengthDelimitedDecoderConfig) -> Self {
135 Self::LengthDelimited(config)
136 }
137}
138
139impl From<NewlineDelimitedDecoderConfig> for FramingConfig {
140 fn from(config: NewlineDelimitedDecoderConfig) -> Self {
141 Self::NewlineDelimited(config)
142 }
143}
144
145impl From<OctetCountingDecoderConfig> for FramingConfig {
146 fn from(config: OctetCountingDecoderConfig) -> Self {
147 Self::OctetCounting(config)
148 }
149}
150
151impl From<ChunkedGelfDecoderConfig> for FramingConfig {
152 fn from(config: ChunkedGelfDecoderConfig) -> Self {
153 Self::ChunkedGelf(config)
154 }
155}
156
157impl From<VarintLengthDelimitedDecoderConfig> for FramingConfig {
158 fn from(config: VarintLengthDelimitedDecoderConfig) -> Self {
159 Self::VarintLengthDelimited(config)
160 }
161}
162
163impl FramingConfig {
164 pub fn build(&self) -> Framer {
166 match self {
167 FramingConfig::Bytes => Framer::Bytes(BytesDecoderConfig.build()),
168 FramingConfig::CharacterDelimited(config) => Framer::CharacterDelimited(config.build()),
169 FramingConfig::LengthDelimited(config) => Framer::LengthDelimited(config.build()),
170 FramingConfig::NewlineDelimited(config) => Framer::NewlineDelimited(config.build()),
171 FramingConfig::OctetCounting(config) => Framer::OctetCounting(config.build()),
172 FramingConfig::ChunkedGelf(config) => Framer::ChunkedGelf(config.build()),
173 FramingConfig::VarintLengthDelimited(config) => {
174 Framer::VarintLengthDelimited(config.build())
175 }
176 }
177 }
178}
179
/// Produces byte frames from a byte buffer.
///
/// Each variant wraps one framer; the `tokio_util::codec::Decoder` impl for
/// this type forwards to whichever framer is wrapped.
#[derive(Debug, Clone)]
pub enum Framer {
    /// Uses a [`BytesDecoder`] for framing.
    Bytes(BytesDecoder),
    /// Uses a [`CharacterDelimitedDecoder`] for framing.
    CharacterDelimited(CharacterDelimitedDecoder),
    /// Uses a [`LengthDelimitedDecoder`] for framing.
    LengthDelimited(LengthDelimitedDecoder),
    /// Uses a [`NewlineDelimitedDecoder`] for framing.
    NewlineDelimited(NewlineDelimitedDecoder),
    /// Uses an [`OctetCountingDecoder`] for framing.
    OctetCounting(OctetCountingDecoder),
    /// Uses an opaque, boxed framer ([`BoxedFramer`]).
    Boxed(BoxedFramer),
    /// Uses a [`ChunkedGelfDecoder`] for framing.
    ChunkedGelf(ChunkedGelfDecoder),
    /// Uses a [`VarintLengthDelimitedDecoder`] for framing.
    VarintLengthDelimited(VarintLengthDelimitedDecoder),
}
200
201impl tokio_util::codec::Decoder for Framer {
202 type Item = Bytes;
203 type Error = BoxedFramingError;
204
205 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
206 match self {
207 Framer::Bytes(framer) => framer.decode(src),
208 Framer::CharacterDelimited(framer) => framer.decode(src),
209 Framer::LengthDelimited(framer) => framer.decode(src),
210 Framer::NewlineDelimited(framer) => framer.decode(src),
211 Framer::OctetCounting(framer) => framer.decode(src),
212 Framer::Boxed(framer) => framer.decode(src),
213 Framer::ChunkedGelf(framer) => framer.decode(src),
214 Framer::VarintLengthDelimited(framer) => framer.decode(src),
215 }
216 }
217
218 fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
219 match self {
220 Framer::Bytes(framer) => framer.decode_eof(src),
221 Framer::CharacterDelimited(framer) => framer.decode_eof(src),
222 Framer::LengthDelimited(framer) => framer.decode_eof(src),
223 Framer::NewlineDelimited(framer) => framer.decode_eof(src),
224 Framer::OctetCounting(framer) => framer.decode_eof(src),
225 Framer::Boxed(framer) => framer.decode_eof(src),
226 Framer::ChunkedGelf(framer) => framer.decode_eof(src),
227 Framer::VarintLengthDelimited(framer) => framer.decode_eof(src),
228 }
229 }
230}
231
/// Configures how events are decoded from raw bytes.
///
/// The variant is chosen through the `codec` tag, using `snake_case` names.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The codec to use for decoding events."))]
pub enum DeserializerConfig {
    /// Decodes with a [`BytesDeserializer`]. This codec takes no options.
    Bytes,

    /// Decodes with a [`JsonDeserializer`], configured by a
    /// [`JsonDeserializerConfig`].
    Json(JsonDeserializerConfig),

    /// Decodes with a [`ProtobufDeserializer`], configured by a
    /// [`ProtobufDeserializerConfig`].
    Protobuf(ProtobufDeserializerConfig),

    /// Decodes with an [`OtlpDeserializer`], configured by an
    /// [`OtlpDeserializerConfig`].
    #[cfg(feature = "opentelemetry")]
    Otlp(OtlpDeserializerConfig),

    /// Decodes with a [`SyslogDeserializer`], configured by a
    /// [`SyslogDeserializerConfig`].
    #[cfg(feature = "syslog")]
    Syslog(SyslogDeserializerConfig),

    /// Decodes with a [`NativeDeserializer`]. This codec takes no options.
    Native,

    /// Decodes with a [`NativeJsonDeserializer`], configured by a
    /// [`NativeJsonDeserializerConfig`].
    NativeJson(NativeJsonDeserializerConfig),

    /// Decodes with a [`GelfDeserializer`], configured by a
    /// [`GelfDeserializerConfig`].
    Gelf(GelfDeserializerConfig),

    /// Decodes with an [`InfluxdbDeserializer`], configured by an
    /// [`InfluxdbDeserializerConfig`].
    Influxdb(InfluxdbDeserializerConfig),

    /// Decodes with an Avro deserializer.
    Avro {
        /// Options for the Avro deserializer.
        avro: AvroDeserializerOptions,
    },

    /// Decodes with a VRL deserializer, configured by a
    /// `VrlDeserializerConfig`.
    Vrl(VrlDeserializerConfig),
}
327
328impl From<BytesDeserializerConfig> for DeserializerConfig {
329 fn from(_: BytesDeserializerConfig) -> Self {
330 Self::Bytes
331 }
332}
333
334impl From<JsonDeserializerConfig> for DeserializerConfig {
335 fn from(config: JsonDeserializerConfig) -> Self {
336 Self::Json(config)
337 }
338}
339
/// Wraps the configuration in [`DeserializerConfig::Syslog`].
#[cfg(feature = "syslog")]
impl From<SyslogDeserializerConfig> for DeserializerConfig {
    fn from(syslog: SyslogDeserializerConfig) -> Self {
        DeserializerConfig::Syslog(syslog)
    }
}
346
347impl From<GelfDeserializerConfig> for DeserializerConfig {
348 fn from(config: GelfDeserializerConfig) -> Self {
349 Self::Gelf(config)
350 }
351}
352
353impl From<NativeDeserializerConfig> for DeserializerConfig {
354 fn from(_: NativeDeserializerConfig) -> Self {
355 Self::Native
356 }
357}
358
359impl From<NativeJsonDeserializerConfig> for DeserializerConfig {
360 fn from(config: NativeJsonDeserializerConfig) -> Self {
361 Self::NativeJson(config)
362 }
363}
364
365impl From<InfluxdbDeserializerConfig> for DeserializerConfig {
366 fn from(config: InfluxdbDeserializerConfig) -> Self {
367 Self::Influxdb(config)
368 }
369}
370
371impl DeserializerConfig {
372 pub fn build(&self) -> vector_common::Result<Deserializer> {
374 match self {
375 DeserializerConfig::Avro { avro } => Ok(Deserializer::Avro(
376 AvroDeserializerConfig {
377 avro_options: avro.clone(),
378 }
379 .build()?,
380 )),
381 DeserializerConfig::Bytes => Ok(Deserializer::Bytes(BytesDeserializerConfig.build())),
382 DeserializerConfig::Json(config) => Ok(Deserializer::Json(config.build())),
383 DeserializerConfig::Protobuf(config) => Ok(Deserializer::Protobuf(config.build()?)),
384 #[cfg(feature = "opentelemetry")]
385 DeserializerConfig::Otlp(config) => Ok(Deserializer::Otlp(config.build())),
386 #[cfg(feature = "syslog")]
387 DeserializerConfig::Syslog(config) => Ok(Deserializer::Syslog(config.build())),
388 DeserializerConfig::Native => {
389 Ok(Deserializer::Native(NativeDeserializerConfig.build()))
390 }
391 DeserializerConfig::NativeJson(config) => Ok(Deserializer::NativeJson(config.build())),
392 DeserializerConfig::Gelf(config) => Ok(Deserializer::Gelf(config.build())),
393 DeserializerConfig::Influxdb(config) => Ok(Deserializer::Influxdb(config.build())),
394 DeserializerConfig::Vrl(config) => Ok(Deserializer::Vrl(config.build()?)),
395 }
396 }
397
398 pub fn default_stream_framing(&self) -> FramingConfig {
400 match self {
401 DeserializerConfig::Avro { .. } => FramingConfig::Bytes,
402 DeserializerConfig::Native => FramingConfig::LengthDelimited(Default::default()),
403 DeserializerConfig::Bytes
404 | DeserializerConfig::Json(_)
405 | DeserializerConfig::Influxdb(_)
406 | DeserializerConfig::NativeJson(_) => {
407 FramingConfig::NewlineDelimited(Default::default())
408 }
409 DeserializerConfig::Protobuf(_) => FramingConfig::Bytes,
410 #[cfg(feature = "opentelemetry")]
411 DeserializerConfig::Otlp(_) => FramingConfig::Bytes,
412 #[cfg(feature = "syslog")]
413 DeserializerConfig::Syslog(_) => FramingConfig::NewlineDelimited(Default::default()),
414 DeserializerConfig::Vrl(_) => FramingConfig::Bytes,
415 DeserializerConfig::Gelf(_) => {
416 FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig::new(0))
417 }
418 }
419 }
420
421 pub fn default_message_based_framing(&self) -> FramingConfig {
423 match self {
424 DeserializerConfig::Gelf(_) => FramingConfig::ChunkedGelf(Default::default()),
425 _ => FramingConfig::Bytes,
426 }
427 }
428
429 pub fn output_type(&self) -> DataType {
431 match self {
432 DeserializerConfig::Avro { avro } => AvroDeserializerConfig {
433 avro_options: avro.clone(),
434 }
435 .output_type(),
436 DeserializerConfig::Bytes => BytesDeserializerConfig.output_type(),
437 DeserializerConfig::Json(config) => config.output_type(),
438 DeserializerConfig::Protobuf(config) => config.output_type(),
439 #[cfg(feature = "opentelemetry")]
440 DeserializerConfig::Otlp(config) => config.output_type(),
441 #[cfg(feature = "syslog")]
442 DeserializerConfig::Syslog(config) => config.output_type(),
443 DeserializerConfig::Native => NativeDeserializerConfig.output_type(),
444 DeserializerConfig::NativeJson(config) => config.output_type(),
445 DeserializerConfig::Gelf(config) => config.output_type(),
446 DeserializerConfig::Vrl(config) => config.output_type(),
447 DeserializerConfig::Influxdb(config) => config.output_type(),
448 }
449 }
450
451 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
453 match self {
454 DeserializerConfig::Avro { avro } => AvroDeserializerConfig {
455 avro_options: avro.clone(),
456 }
457 .schema_definition(log_namespace),
458 DeserializerConfig::Bytes => BytesDeserializerConfig.schema_definition(log_namespace),
459 DeserializerConfig::Json(config) => config.schema_definition(log_namespace),
460 DeserializerConfig::Protobuf(config) => config.schema_definition(log_namespace),
461 #[cfg(feature = "opentelemetry")]
462 DeserializerConfig::Otlp(config) => config.schema_definition(log_namespace),
463 #[cfg(feature = "syslog")]
464 DeserializerConfig::Syslog(config) => config.schema_definition(log_namespace),
465 DeserializerConfig::Native => NativeDeserializerConfig.schema_definition(log_namespace),
466 DeserializerConfig::NativeJson(config) => config.schema_definition(log_namespace),
467 DeserializerConfig::Gelf(config) => config.schema_definition(log_namespace),
468 DeserializerConfig::Influxdb(config) => config.schema_definition(log_namespace),
469 DeserializerConfig::Vrl(config) => config.schema_definition(log_namespace),
470 }
471 }
472
473 pub const fn content_type(&self, framer: &FramingConfig) -> &'static str {
475 match (&self, framer) {
476 (
477 DeserializerConfig::Json(_) | DeserializerConfig::NativeJson(_),
478 FramingConfig::NewlineDelimited(_),
479 ) => "application/x-ndjson",
480 (
481 DeserializerConfig::Gelf(_)
482 | DeserializerConfig::Json(_)
483 | DeserializerConfig::NativeJson(_),
484 FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
485 character_delimited:
486 CharacterDelimitedDecoderOptions {
487 delimiter: b',',
488 max_length: Some(usize::MAX),
489 },
490 }),
491 ) => "application/json",
492 (DeserializerConfig::Native, _) | (DeserializerConfig::Avro { .. }, _) => {
493 "application/octet-stream"
494 }
495 (DeserializerConfig::Protobuf(_), _) => "application/octet-stream",
496 #[cfg(feature = "opentelemetry")]
497 (DeserializerConfig::Otlp(_), _) => "application/x-protobuf",
498 (
499 DeserializerConfig::Json(_)
500 | DeserializerConfig::NativeJson(_)
501 | DeserializerConfig::Bytes
502 | DeserializerConfig::Gelf(_)
503 | DeserializerConfig::Influxdb(_)
504 | DeserializerConfig::Vrl(_),
505 _,
506 ) => "text/plain",
507 #[cfg(feature = "syslog")]
508 (DeserializerConfig::Syslog(_), _) => "text/plain",
509 }
510 }
511}
512
/// Parses events from bytes.
///
/// Each variant wraps one deserializer; the `format::Deserializer` impl for
/// this type forwards `parse` to whichever deserializer is wrapped.
#[allow(clippy::large_enum_variant)]
#[derive(Clone)]
pub enum Deserializer {
    /// Uses an `AvroDeserializer` for deserialization.
    Avro(AvroDeserializer),
    /// Uses a [`BytesDeserializer`] for deserialization.
    Bytes(BytesDeserializer),
    /// Uses a [`JsonDeserializer`] for deserialization.
    Json(JsonDeserializer),
    /// Uses a [`ProtobufDeserializer`] for deserialization.
    Protobuf(ProtobufDeserializer),
    /// Uses an [`OtlpDeserializer`] for deserialization.
    #[cfg(feature = "opentelemetry")]
    Otlp(OtlpDeserializer),
    /// Uses a [`SyslogDeserializer`] for deserialization.
    #[cfg(feature = "syslog")]
    Syslog(SyslogDeserializer),
    /// Uses a [`NativeDeserializer`] for deserialization.
    Native(NativeDeserializer),
    /// Uses a [`NativeJsonDeserializer`] for deserialization.
    NativeJson(NativeJsonDeserializer),
    /// Uses an opaque, boxed deserializer ([`BoxedDeserializer`]).
    Boxed(BoxedDeserializer),
    /// Uses a [`GelfDeserializer`] for deserialization.
    Gelf(GelfDeserializer),
    /// Uses an [`InfluxdbDeserializer`] for deserialization.
    Influxdb(InfluxdbDeserializer),
    /// Uses a `VrlDeserializer` for deserialization.
    Vrl(VrlDeserializer),
}
544
545impl format::Deserializer for Deserializer {
546 fn parse(
547 &self,
548 bytes: Bytes,
549 log_namespace: LogNamespace,
550 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
551 match self {
552 Deserializer::Avro(deserializer) => deserializer.parse(bytes, log_namespace),
553 Deserializer::Bytes(deserializer) => deserializer.parse(bytes, log_namespace),
554 Deserializer::Json(deserializer) => deserializer.parse(bytes, log_namespace),
555 Deserializer::Protobuf(deserializer) => deserializer.parse(bytes, log_namespace),
556 #[cfg(feature = "opentelemetry")]
557 Deserializer::Otlp(deserializer) => deserializer.parse(bytes, log_namespace),
558 #[cfg(feature = "syslog")]
559 Deserializer::Syslog(deserializer) => deserializer.parse(bytes, log_namespace),
560 Deserializer::Native(deserializer) => deserializer.parse(bytes, log_namespace),
561 Deserializer::NativeJson(deserializer) => deserializer.parse(bytes, log_namespace),
562 Deserializer::Boxed(deserializer) => deserializer.parse(bytes, log_namespace),
563 Deserializer::Gelf(deserializer) => deserializer.parse(bytes, log_namespace),
564 Deserializer::Influxdb(deserializer) => deserializer.parse(bytes, log_namespace),
565 Deserializer::Vrl(deserializer) => deserializer.parse(bytes, log_namespace),
566 }
567 }
568}
569
#[cfg(test)]
mod tests {
    use super::*;

    /// The default stream framing for the GELF codec must be
    /// character-delimited on byte value 0 with no maximum length.
    #[test]
    fn gelf_stream_default_framing_is_null_delimited() {
        let framing =
            DeserializerConfig::from(GelfDeserializerConfig::default()).default_stream_framing();

        match framing {
            FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
                character_delimited:
                    CharacterDelimitedDecoderOptions {
                        delimiter,
                        max_length,
                    },
            }) => {
                assert_eq!(delimiter, 0);
                assert!(max_length.is_none());
            }
            other => panic!("unexpected default GELF stream framing: {other:?}"),
        }
    }
}