1mod error;
5pub mod format;
6pub mod framing;
7
8use std::fmt::Debug;
9
10use bytes::{Bytes, BytesMut};
11pub use error::StreamDecodingError;
12pub use format::{
13 BoxedDeserializer, BytesDeserializer, BytesDeserializerConfig, GelfDeserializer,
14 GelfDeserializerConfig, GelfDeserializerOptions, InfluxdbDeserializer,
15 InfluxdbDeserializerConfig, JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions,
16 NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer,
17 NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, ProtobufDeserializer,
18 ProtobufDeserializerConfig, ProtobufDeserializerOptions,
19};
20#[cfg(feature = "opentelemetry")]
21pub use format::{OtlpDeserializer, OtlpDeserializerConfig, OtlpSignalType};
22#[cfg(feature = "syslog")]
23pub use format::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions};
24pub use framing::{
25 BoxedFramer, BoxedFramingError, BytesDecoder, BytesDecoderConfig, CharacterDelimitedDecoder,
26 CharacterDelimitedDecoderConfig, CharacterDelimitedDecoderOptions, ChunkedGelfDecoder,
27 ChunkedGelfDecoderConfig, ChunkedGelfDecoderOptions, FramingError, LengthDelimitedDecoder,
28 LengthDelimitedDecoderConfig, NewlineDelimitedDecoder, NewlineDelimitedDecoderConfig,
29 NewlineDelimitedDecoderOptions, OctetCountingDecoder, OctetCountingDecoderConfig,
30 OctetCountingDecoderOptions, VarintLengthDelimitedDecoder, VarintLengthDelimitedDecoderConfig,
31};
32use smallvec::SmallVec;
33use vector_config::configurable_component;
34use vector_core::{
35 config::{DataType, LogNamespace},
36 event::Event,
37 schema,
38};
39
40use self::format::{AvroDeserializer, AvroDeserializerConfig, AvroDeserializerOptions};
41use crate::decoding::format::{VrlDeserializer, VrlDeserializerConfig};
42
/// An error raised while decoding structured events out of raw bytes.
///
/// Decoding happens in two stages, and each variant identifies the stage that
/// failed: splitting the input into byte frames, or parsing a frame into events.
#[derive(Debug)]
pub enum Error {
    /// The error occurred while producing byte frames; wraps the boxed error
    /// type that framers return (`BoxedFramingError`).
    FramingError(BoxedFramingError),
    /// The error occurred while parsing structured events from a byte frame;
    /// wraps the generic `vector_common::Error` returned by deserializers.
    ParsingError(vector_common::Error),
}
53
54impl std::fmt::Display for Error {
55 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 match self {
57 Self::FramingError(error) => write!(formatter, "FramingError({error})"),
58 Self::ParsingError(error) => write!(formatter, "ParsingError({error})"),
59 }
60 }
61}
62
// Marker implementation: no trait methods are overridden, so the standard
// library defaults apply (in particular, `source()` is not forwarded to the
// wrapped error; the wrapped error is only surfaced through `Display`).
impl std::error::Error for Error {}
64
65impl From<std::io::Error> for Error {
66 fn from(error: std::io::Error) -> Self {
67 Self::FramingError(Box::new(error))
68 }
69}
70
71impl StreamDecodingError for Error {
72 fn can_continue(&self) -> bool {
73 match self {
74 Self::FramingError(error) => error.can_continue(),
75 Self::ParsingError(_) => true,
76 }
77 }
78}
79
/// Framing configuration.
///
/// Selects how byte frames are produced from the incoming bytes. The variant
/// is chosen through the `method` tag (snake-cased variant name), and
/// `FramingConfig::build` turns the selection into the matching `Framer`.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "method", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The framing method."))]
pub enum FramingConfig {
    /// Frames are produced by the bytes decoder; this method takes no options.
    Bytes,

    /// Byte frames are delimited by a configurable character.
    CharacterDelimited(CharacterDelimitedDecoderConfig),

    /// Byte frames are produced by the length-delimited decoder.
    LengthDelimited(LengthDelimitedDecoderConfig),

    /// Byte frames are produced by the newline-delimited decoder.
    NewlineDelimited(NewlineDelimitedDecoderConfig),

    /// Byte frames are produced by the octet-counting decoder.
    OctetCounting(OctetCountingDecoderConfig),

    /// Byte frames are produced by the chunked GELF decoder.
    ///
    /// This is the default message-based framing for the GELF codec.
    ChunkedGelf(ChunkedGelfDecoderConfig),

    /// Byte frames are produced by the varint length-delimited decoder.
    VarintLengthDelimited(VarintLengthDelimitedDecoderConfig),
}
116
117impl From<BytesDecoderConfig> for FramingConfig {
118 fn from(_: BytesDecoderConfig) -> Self {
119 Self::Bytes
120 }
121}
122
123impl From<CharacterDelimitedDecoderConfig> for FramingConfig {
124 fn from(config: CharacterDelimitedDecoderConfig) -> Self {
125 Self::CharacterDelimited(config)
126 }
127}
128
129impl From<LengthDelimitedDecoderConfig> for FramingConfig {
130 fn from(config: LengthDelimitedDecoderConfig) -> Self {
131 Self::LengthDelimited(config)
132 }
133}
134
135impl From<NewlineDelimitedDecoderConfig> for FramingConfig {
136 fn from(config: NewlineDelimitedDecoderConfig) -> Self {
137 Self::NewlineDelimited(config)
138 }
139}
140
141impl From<OctetCountingDecoderConfig> for FramingConfig {
142 fn from(config: OctetCountingDecoderConfig) -> Self {
143 Self::OctetCounting(config)
144 }
145}
146
147impl From<ChunkedGelfDecoderConfig> for FramingConfig {
148 fn from(config: ChunkedGelfDecoderConfig) -> Self {
149 Self::ChunkedGelf(config)
150 }
151}
152
153impl From<VarintLengthDelimitedDecoderConfig> for FramingConfig {
154 fn from(config: VarintLengthDelimitedDecoderConfig) -> Self {
155 Self::VarintLengthDelimited(config)
156 }
157}
158
159impl FramingConfig {
160 pub fn build(&self) -> Framer {
162 match self {
163 FramingConfig::Bytes => Framer::Bytes(BytesDecoderConfig.build()),
164 FramingConfig::CharacterDelimited(config) => Framer::CharacterDelimited(config.build()),
165 FramingConfig::LengthDelimited(config) => Framer::LengthDelimited(config.build()),
166 FramingConfig::NewlineDelimited(config) => Framer::NewlineDelimited(config.build()),
167 FramingConfig::OctetCounting(config) => Framer::OctetCounting(config.build()),
168 FramingConfig::ChunkedGelf(config) => Framer::ChunkedGelf(config.build()),
169 FramingConfig::VarintLengthDelimited(config) => {
170 Framer::VarintLengthDelimited(config.build())
171 }
172 }
173 }
174}
175
/// Produces byte frames from raw input bytes.
///
/// Each variant wraps one concrete decoder; the `tokio_util::codec::Decoder`
/// implementation for this enum forwards to whichever decoder is held.
#[derive(Debug, Clone)]
pub enum Framer {
    /// Uses a `BytesDecoder` for framing.
    Bytes(BytesDecoder),
    /// Uses a `CharacterDelimitedDecoder` for framing.
    CharacterDelimited(CharacterDelimitedDecoder),
    /// Uses a `LengthDelimitedDecoder` for framing.
    LengthDelimited(LengthDelimitedDecoder),
    /// Uses a `NewlineDelimitedDecoder` for framing.
    NewlineDelimited(NewlineDelimitedDecoder),
    /// Uses an `OctetCountingDecoder` for framing.
    OctetCounting(OctetCountingDecoder),
    /// Uses an opaque, boxed framer (`BoxedFramer`) for framing.
    ///
    /// No `FramingConfig` variant builds this one.
    Boxed(BoxedFramer),
    /// Uses a `ChunkedGelfDecoder` for framing.
    ChunkedGelf(ChunkedGelfDecoder),
    /// Uses a `VarintLengthDelimitedDecoder` for framing.
    VarintLengthDelimited(VarintLengthDelimitedDecoder),
}
196
197impl tokio_util::codec::Decoder for Framer {
198 type Item = Bytes;
199 type Error = BoxedFramingError;
200
201 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
202 match self {
203 Framer::Bytes(framer) => framer.decode(src),
204 Framer::CharacterDelimited(framer) => framer.decode(src),
205 Framer::LengthDelimited(framer) => framer.decode(src),
206 Framer::NewlineDelimited(framer) => framer.decode(src),
207 Framer::OctetCounting(framer) => framer.decode(src),
208 Framer::Boxed(framer) => framer.decode(src),
209 Framer::ChunkedGelf(framer) => framer.decode(src),
210 Framer::VarintLengthDelimited(framer) => framer.decode(src),
211 }
212 }
213
214 fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
215 match self {
216 Framer::Bytes(framer) => framer.decode_eof(src),
217 Framer::CharacterDelimited(framer) => framer.decode_eof(src),
218 Framer::LengthDelimited(framer) => framer.decode_eof(src),
219 Framer::NewlineDelimited(framer) => framer.decode_eof(src),
220 Framer::OctetCounting(framer) => framer.decode_eof(src),
221 Framer::Boxed(framer) => framer.decode_eof(src),
222 Framer::ChunkedGelf(framer) => framer.decode_eof(src),
223 Framer::VarintLengthDelimited(framer) => framer.decode_eof(src),
224 }
225 }
226}
227
/// Deserializer configuration.
///
/// Selects which codec parses byte frames into structured events. The variant
/// is chosen through the `codec` tag (snake-cased variant name), and
/// `DeserializerConfig::build` turns the selection into the matching
/// `Deserializer`.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The codec to use for decoding events."))]
pub enum DeserializerConfig {
    /// Decodes frames with the bytes deserializer; this codec takes no options.
    Bytes,

    /// Decodes frames with the JSON deserializer.
    Json(JsonDeserializerConfig),

    /// Decodes frames with the Protobuf deserializer.
    ///
    /// Building this deserializer can fail.
    Protobuf(ProtobufDeserializerConfig),

    /// Decodes frames with the OTLP deserializer.
    ///
    /// Only available when the `opentelemetry` feature is enabled.
    #[cfg(feature = "opentelemetry")]
    Otlp(OtlpDeserializerConfig),

    /// Decodes frames with the Syslog deserializer.
    ///
    /// Only available when the `syslog` feature is enabled.
    #[cfg(feature = "syslog")]
    Syslog(SyslogDeserializerConfig),

    /// Decodes frames with the native deserializer; this codec takes no
    /// options.
    Native,

    /// Decodes frames with the native JSON deserializer.
    NativeJson(NativeJsonDeserializerConfig),

    /// Decodes frames with the GELF deserializer.
    Gelf(GelfDeserializerConfig),

    /// Decodes frames with the InfluxDB deserializer.
    Influxdb(InfluxdbDeserializerConfig),

    /// Decodes frames with the Avro deserializer.
    Avro {
        /// Options for the Avro deserializer.
        avro: AvroDeserializerOptions,
    },

    /// Decodes frames with the VRL deserializer.
    ///
    /// Building this deserializer can fail.
    Vrl(VrlDeserializerConfig),
}
323
324impl From<BytesDeserializerConfig> for DeserializerConfig {
325 fn from(_: BytesDeserializerConfig) -> Self {
326 Self::Bytes
327 }
328}
329
330impl From<JsonDeserializerConfig> for DeserializerConfig {
331 fn from(config: JsonDeserializerConfig) -> Self {
332 Self::Json(config)
333 }
334}
335
#[cfg(feature = "syslog")]
impl From<SyslogDeserializerConfig> for DeserializerConfig {
    /// Wraps the codec configuration in the `Syslog` variant.
    fn from(codec_config: SyslogDeserializerConfig) -> Self {
        DeserializerConfig::Syslog(codec_config)
    }
}
342
343impl From<GelfDeserializerConfig> for DeserializerConfig {
344 fn from(config: GelfDeserializerConfig) -> Self {
345 Self::Gelf(config)
346 }
347}
348
349impl From<NativeDeserializerConfig> for DeserializerConfig {
350 fn from(_: NativeDeserializerConfig) -> Self {
351 Self::Native
352 }
353}
354
355impl From<NativeJsonDeserializerConfig> for DeserializerConfig {
356 fn from(config: NativeJsonDeserializerConfig) -> Self {
357 Self::NativeJson(config)
358 }
359}
360
361impl From<InfluxdbDeserializerConfig> for DeserializerConfig {
362 fn from(config: InfluxdbDeserializerConfig) -> Self {
363 Self::Influxdb(config)
364 }
365}
366
367impl DeserializerConfig {
368 pub fn build(&self) -> vector_common::Result<Deserializer> {
370 match self {
371 DeserializerConfig::Avro { avro } => Ok(Deserializer::Avro(
372 AvroDeserializerConfig {
373 avro_options: avro.clone(),
374 }
375 .build(),
376 )),
377 DeserializerConfig::Bytes => Ok(Deserializer::Bytes(BytesDeserializerConfig.build())),
378 DeserializerConfig::Json(config) => Ok(Deserializer::Json(config.build())),
379 DeserializerConfig::Protobuf(config) => Ok(Deserializer::Protobuf(config.build()?)),
380 #[cfg(feature = "opentelemetry")]
381 DeserializerConfig::Otlp(config) => Ok(Deserializer::Otlp(config.build())),
382 #[cfg(feature = "syslog")]
383 DeserializerConfig::Syslog(config) => Ok(Deserializer::Syslog(config.build())),
384 DeserializerConfig::Native => {
385 Ok(Deserializer::Native(NativeDeserializerConfig.build()))
386 }
387 DeserializerConfig::NativeJson(config) => Ok(Deserializer::NativeJson(config.build())),
388 DeserializerConfig::Gelf(config) => Ok(Deserializer::Gelf(config.build())),
389 DeserializerConfig::Influxdb(config) => Ok(Deserializer::Influxdb(config.build())),
390 DeserializerConfig::Vrl(config) => Ok(Deserializer::Vrl(config.build()?)),
391 }
392 }
393
394 pub fn default_stream_framing(&self) -> FramingConfig {
396 match self {
397 DeserializerConfig::Avro { .. } => FramingConfig::Bytes,
398 DeserializerConfig::Native => FramingConfig::LengthDelimited(Default::default()),
399 DeserializerConfig::Bytes
400 | DeserializerConfig::Json(_)
401 | DeserializerConfig::Influxdb(_)
402 | DeserializerConfig::NativeJson(_) => {
403 FramingConfig::NewlineDelimited(Default::default())
404 }
405 DeserializerConfig::Protobuf(_) => FramingConfig::Bytes,
406 #[cfg(feature = "opentelemetry")]
407 DeserializerConfig::Otlp(_) => FramingConfig::Bytes,
408 #[cfg(feature = "syslog")]
409 DeserializerConfig::Syslog(_) => FramingConfig::NewlineDelimited(Default::default()),
410 DeserializerConfig::Vrl(_) => FramingConfig::Bytes,
411 DeserializerConfig::Gelf(_) => {
412 FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig::new(0))
413 }
414 }
415 }
416
417 pub fn default_message_based_framing(&self) -> FramingConfig {
419 match self {
420 DeserializerConfig::Gelf(_) => FramingConfig::ChunkedGelf(Default::default()),
421 _ => FramingConfig::Bytes,
422 }
423 }
424
425 pub fn output_type(&self) -> DataType {
427 match self {
428 DeserializerConfig::Avro { avro } => AvroDeserializerConfig {
429 avro_options: avro.clone(),
430 }
431 .output_type(),
432 DeserializerConfig::Bytes => BytesDeserializerConfig.output_type(),
433 DeserializerConfig::Json(config) => config.output_type(),
434 DeserializerConfig::Protobuf(config) => config.output_type(),
435 #[cfg(feature = "opentelemetry")]
436 DeserializerConfig::Otlp(config) => config.output_type(),
437 #[cfg(feature = "syslog")]
438 DeserializerConfig::Syslog(config) => config.output_type(),
439 DeserializerConfig::Native => NativeDeserializerConfig.output_type(),
440 DeserializerConfig::NativeJson(config) => config.output_type(),
441 DeserializerConfig::Gelf(config) => config.output_type(),
442 DeserializerConfig::Vrl(config) => config.output_type(),
443 DeserializerConfig::Influxdb(config) => config.output_type(),
444 }
445 }
446
447 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
449 match self {
450 DeserializerConfig::Avro { avro } => AvroDeserializerConfig {
451 avro_options: avro.clone(),
452 }
453 .schema_definition(log_namespace),
454 DeserializerConfig::Bytes => BytesDeserializerConfig.schema_definition(log_namespace),
455 DeserializerConfig::Json(config) => config.schema_definition(log_namespace),
456 DeserializerConfig::Protobuf(config) => config.schema_definition(log_namespace),
457 #[cfg(feature = "opentelemetry")]
458 DeserializerConfig::Otlp(config) => config.schema_definition(log_namespace),
459 #[cfg(feature = "syslog")]
460 DeserializerConfig::Syslog(config) => config.schema_definition(log_namespace),
461 DeserializerConfig::Native => NativeDeserializerConfig.schema_definition(log_namespace),
462 DeserializerConfig::NativeJson(config) => config.schema_definition(log_namespace),
463 DeserializerConfig::Gelf(config) => config.schema_definition(log_namespace),
464 DeserializerConfig::Influxdb(config) => config.schema_definition(log_namespace),
465 DeserializerConfig::Vrl(config) => config.schema_definition(log_namespace),
466 }
467 }
468
469 pub const fn content_type(&self, framer: &FramingConfig) -> &'static str {
471 match (&self, framer) {
472 (
473 DeserializerConfig::Json(_) | DeserializerConfig::NativeJson(_),
474 FramingConfig::NewlineDelimited(_),
475 ) => "application/x-ndjson",
476 (
477 DeserializerConfig::Gelf(_)
478 | DeserializerConfig::Json(_)
479 | DeserializerConfig::NativeJson(_),
480 FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
481 character_delimited:
482 CharacterDelimitedDecoderOptions {
483 delimiter: b',',
484 max_length: Some(usize::MAX),
485 },
486 }),
487 ) => "application/json",
488 (DeserializerConfig::Native, _) | (DeserializerConfig::Avro { .. }, _) => {
489 "application/octet-stream"
490 }
491 (DeserializerConfig::Protobuf(_), _) => "application/octet-stream",
492 #[cfg(feature = "opentelemetry")]
493 (DeserializerConfig::Otlp(_), _) => "application/x-protobuf",
494 (
495 DeserializerConfig::Json(_)
496 | DeserializerConfig::NativeJson(_)
497 | DeserializerConfig::Bytes
498 | DeserializerConfig::Gelf(_)
499 | DeserializerConfig::Influxdb(_)
500 | DeserializerConfig::Vrl(_),
501 _,
502 ) => "text/plain",
503 #[cfg(feature = "syslog")]
504 (DeserializerConfig::Syslog(_), _) => "text/plain",
505 }
506 }
507}
508
/// Parses structured events from byte frames.
///
/// Each variant wraps one concrete deserializer; the `format::Deserializer`
/// implementation for this enum forwards `parse` to whichever one is held.
// Variants differ in size; the clippy lint about that is silenced here rather
// than boxing the larger variants.
#[allow(clippy::large_enum_variant)]
#[derive(Clone)]
pub enum Deserializer {
    /// Uses an `AvroDeserializer` for deserialization.
    Avro(AvroDeserializer),
    /// Uses a `BytesDeserializer` for deserialization.
    Bytes(BytesDeserializer),
    /// Uses a `JsonDeserializer` for deserialization.
    Json(JsonDeserializer),
    /// Uses a `ProtobufDeserializer` for deserialization.
    Protobuf(ProtobufDeserializer),
    /// Uses an `OtlpDeserializer` for deserialization (requires the
    /// `opentelemetry` feature).
    #[cfg(feature = "opentelemetry")]
    Otlp(OtlpDeserializer),
    /// Uses a `SyslogDeserializer` for deserialization (requires the `syslog`
    /// feature).
    #[cfg(feature = "syslog")]
    Syslog(SyslogDeserializer),
    /// Uses a `NativeDeserializer` for deserialization.
    Native(NativeDeserializer),
    /// Uses a `NativeJsonDeserializer` for deserialization.
    NativeJson(NativeJsonDeserializer),
    /// Uses an opaque, boxed deserializer (`BoxedDeserializer`).
    ///
    /// No `DeserializerConfig` variant builds this one.
    Boxed(BoxedDeserializer),
    /// Uses a `GelfDeserializer` for deserialization.
    Gelf(GelfDeserializer),
    /// Uses an `InfluxdbDeserializer` for deserialization.
    Influxdb(InfluxdbDeserializer),
    /// Uses a `VrlDeserializer` for deserialization.
    Vrl(VrlDeserializer),
}
540
541impl format::Deserializer for Deserializer {
542 fn parse(
543 &self,
544 bytes: Bytes,
545 log_namespace: LogNamespace,
546 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
547 match self {
548 Deserializer::Avro(deserializer) => deserializer.parse(bytes, log_namespace),
549 Deserializer::Bytes(deserializer) => deserializer.parse(bytes, log_namespace),
550 Deserializer::Json(deserializer) => deserializer.parse(bytes, log_namespace),
551 Deserializer::Protobuf(deserializer) => deserializer.parse(bytes, log_namespace),
552 #[cfg(feature = "opentelemetry")]
553 Deserializer::Otlp(deserializer) => deserializer.parse(bytes, log_namespace),
554 #[cfg(feature = "syslog")]
555 Deserializer::Syslog(deserializer) => deserializer.parse(bytes, log_namespace),
556 Deserializer::Native(deserializer) => deserializer.parse(bytes, log_namespace),
557 Deserializer::NativeJson(deserializer) => deserializer.parse(bytes, log_namespace),
558 Deserializer::Boxed(deserializer) => deserializer.parse(bytes, log_namespace),
559 Deserializer::Gelf(deserializer) => deserializer.parse(bytes, log_namespace),
560 Deserializer::Influxdb(deserializer) => deserializer.parse(bytes, log_namespace),
561 Deserializer::Vrl(deserializer) => deserializer.parse(bytes, log_namespace),
562 }
563 }
564}
565
#[cfg(test)]
mod tests {
    use super::*;

    /// The default stream framing for the GELF codec must split on the byte
    /// value 0 and must not impose a maximum frame length.
    #[test]
    fn gelf_stream_default_framing_is_null_delimited() {
        let gelf_codec: DeserializerConfig = GelfDeserializerConfig::default().into();
        let is_null_delimited = matches!(
            gelf_codec.default_stream_framing(),
            FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
                character_delimited: CharacterDelimitedDecoderOptions {
                    delimiter: 0,
                    max_length: None,
                }
            })
        );
        assert!(is_null_delimited);
    }
}