1mod error;
5pub mod format;
6pub mod framing;
7
8use crate::decoding::format::{VrlDeserializer, VrlDeserializerConfig};
9use bytes::{Bytes, BytesMut};
10pub use error::StreamDecodingError;
11pub use format::{
12 BoxedDeserializer, BytesDeserializer, BytesDeserializerConfig, GelfDeserializer,
13 GelfDeserializerConfig, GelfDeserializerOptions, InfluxdbDeserializer,
14 InfluxdbDeserializerConfig, JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions,
15 NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer,
16 NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, ProtobufDeserializer,
17 ProtobufDeserializerConfig, ProtobufDeserializerOptions,
18};
19#[cfg(feature = "syslog")]
20pub use format::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions};
21pub use framing::{
22 BoxedFramer, BoxedFramingError, BytesDecoder, BytesDecoderConfig, CharacterDelimitedDecoder,
23 CharacterDelimitedDecoderConfig, CharacterDelimitedDecoderOptions, ChunkedGelfDecoder,
24 ChunkedGelfDecoderConfig, ChunkedGelfDecoderOptions, FramingError, LengthDelimitedDecoder,
25 LengthDelimitedDecoderConfig, NewlineDelimitedDecoder, NewlineDelimitedDecoderConfig,
26 NewlineDelimitedDecoderOptions, OctetCountingDecoder, OctetCountingDecoderConfig,
27 OctetCountingDecoderOptions,
28};
29use smallvec::SmallVec;
30use std::fmt::Debug;
31use vector_config::configurable_component;
32use vector_core::{
33 config::{DataType, LogNamespace},
34 event::Event,
35 schema,
36};
37
38use self::format::{AvroDeserializer, AvroDeserializerConfig, AvroDeserializerOptions};
39
40#[derive(Debug)]
43pub enum Error {
44 FramingError(BoxedFramingError),
47 ParsingError(vector_common::Error),
49}
50
51impl std::fmt::Display for Error {
52 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 match self {
54 Self::FramingError(error) => write!(formatter, "FramingError({error})"),
55 Self::ParsingError(error) => write!(formatter, "ParsingError({error})"),
56 }
57 }
58}
59
60impl std::error::Error for Error {}
61
62impl From<std::io::Error> for Error {
63 fn from(error: std::io::Error) -> Self {
64 Self::FramingError(Box::new(error))
65 }
66}
67
68impl StreamDecodingError for Error {
69 fn can_continue(&self) -> bool {
70 match self {
71 Self::FramingError(error) => error.can_continue(),
72 Self::ParsingError(_) => true,
73 }
74 }
75}
76
/// Framing configuration.
///
/// Selects the method used to split a byte stream into frames; `FramingConfig::build`
/// turns it into the matching `Framer`.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "method", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The framing method."))]
pub enum FramingConfig {
    /// Byte framing, handled by `BytesDecoder`.
    Bytes,

    /// Byte frames which are delimited by a chosen character.
    CharacterDelimited(CharacterDelimitedDecoderConfig),

    /// Length-delimited byte frames, handled by `LengthDelimitedDecoder`.
    LengthDelimited(LengthDelimitedDecoderConfig),

    /// Byte frames which are delimited by a newline character.
    NewlineDelimited(NewlineDelimitedDecoderConfig),

    /// Byte frames in the octet-counting format, handled by `OctetCountingDecoder`.
    OctetCounting(OctetCountingDecoderConfig),

    /// Byte frames which are chunked GELF messages, handled by `ChunkedGelfDecoder`.
    ChunkedGelf(ChunkedGelfDecoderConfig),
}
109
110impl From<BytesDecoderConfig> for FramingConfig {
111 fn from(_: BytesDecoderConfig) -> Self {
112 Self::Bytes
113 }
114}
115
116impl From<CharacterDelimitedDecoderConfig> for FramingConfig {
117 fn from(config: CharacterDelimitedDecoderConfig) -> Self {
118 Self::CharacterDelimited(config)
119 }
120}
121
122impl From<LengthDelimitedDecoderConfig> for FramingConfig {
123 fn from(config: LengthDelimitedDecoderConfig) -> Self {
124 Self::LengthDelimited(config)
125 }
126}
127
128impl From<NewlineDelimitedDecoderConfig> for FramingConfig {
129 fn from(config: NewlineDelimitedDecoderConfig) -> Self {
130 Self::NewlineDelimited(config)
131 }
132}
133
134impl From<OctetCountingDecoderConfig> for FramingConfig {
135 fn from(config: OctetCountingDecoderConfig) -> Self {
136 Self::OctetCounting(config)
137 }
138}
139
140impl From<ChunkedGelfDecoderConfig> for FramingConfig {
141 fn from(config: ChunkedGelfDecoderConfig) -> Self {
142 Self::ChunkedGelf(config)
143 }
144}
145
146impl FramingConfig {
147 pub fn build(&self) -> Framer {
149 match self {
150 FramingConfig::Bytes => Framer::Bytes(BytesDecoderConfig.build()),
151 FramingConfig::CharacterDelimited(config) => Framer::CharacterDelimited(config.build()),
152 FramingConfig::LengthDelimited(config) => Framer::LengthDelimited(config.build()),
153 FramingConfig::NewlineDelimited(config) => Framer::NewlineDelimited(config.build()),
154 FramingConfig::OctetCounting(config) => Framer::OctetCounting(config.build()),
155 FramingConfig::ChunkedGelf(config) => Framer::ChunkedGelf(config.build()),
156 }
157 }
158}
159
/// A framer that splits a byte stream into `Bytes` frames.
///
/// Each variant wraps one concrete framing decoder. `FramingConfig::build` produces every
/// variant except `Boxed`.
#[derive(Debug, Clone)]
pub enum Framer {
    /// Uses a `BytesDecoder` for framing.
    Bytes(BytesDecoder),
    /// Uses a `CharacterDelimitedDecoder` for framing.
    CharacterDelimited(CharacterDelimitedDecoder),
    /// Uses a `LengthDelimitedDecoder` for framing.
    LengthDelimited(LengthDelimitedDecoder),
    /// Uses a `NewlineDelimitedDecoder` for framing.
    NewlineDelimited(NewlineDelimitedDecoder),
    /// Uses an `OctetCountingDecoder` for framing.
    OctetCounting(OctetCountingDecoder),
    /// Uses an opaque, boxed framer.
    // NOTE(review): never produced by `FramingConfig::build`; presumably used for framers
    // constructed outside this module — confirm with callers.
    Boxed(BoxedFramer),
    /// Uses a `ChunkedGelfDecoder` for framing.
    ChunkedGelf(ChunkedGelfDecoder),
}
178
179impl tokio_util::codec::Decoder for Framer {
180 type Item = Bytes;
181 type Error = BoxedFramingError;
182
183 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
184 match self {
185 Framer::Bytes(framer) => framer.decode(src),
186 Framer::CharacterDelimited(framer) => framer.decode(src),
187 Framer::LengthDelimited(framer) => framer.decode(src),
188 Framer::NewlineDelimited(framer) => framer.decode(src),
189 Framer::OctetCounting(framer) => framer.decode(src),
190 Framer::Boxed(framer) => framer.decode(src),
191 Framer::ChunkedGelf(framer) => framer.decode(src),
192 }
193 }
194
195 fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
196 match self {
197 Framer::Bytes(framer) => framer.decode_eof(src),
198 Framer::CharacterDelimited(framer) => framer.decode_eof(src),
199 Framer::LengthDelimited(framer) => framer.decode_eof(src),
200 Framer::NewlineDelimited(framer) => framer.decode_eof(src),
201 Framer::OctetCounting(framer) => framer.decode_eof(src),
202 Framer::Boxed(framer) => framer.decode_eof(src),
203 Framer::ChunkedGelf(framer) => framer.decode_eof(src),
204 }
205 }
206}
207
/// Configures how events are decoded from raw bytes.
///
/// Selects the codec used to parse each frame into events; `DeserializerConfig::build`
/// turns it into the matching `Deserializer`.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The codec to use for decoding events."))]
pub enum DeserializerConfig {
    /// Decodes each frame with `BytesDeserializer`.
    Bytes,

    /// Decodes each frame as JSON.
    Json(JsonDeserializerConfig),

    /// Decodes each frame as Protobuf.
    Protobuf(ProtobufDeserializerConfig),

    /// Decodes each frame as Syslog.
    #[cfg(feature = "syslog")]
    Syslog(SyslogDeserializerConfig),

    /// Decodes each frame with the native deserializer (`NativeDeserializer`).
    Native,

    /// Decodes each frame with the native JSON deserializer (`NativeJsonDeserializer`).
    NativeJson(NativeJsonDeserializerConfig),

    /// Decodes each frame as GELF.
    Gelf(GelfDeserializerConfig),

    /// Decodes each frame as InfluxDB data.
    // NOTE(review): presumably InfluxDB line protocol — confirm against `InfluxdbDeserializer`.
    Influxdb(InfluxdbDeserializerConfig),

    /// Decodes each frame as Avro.
    Avro {
        /// Avro-specific decoder options.
        avro: AvroDeserializerOptions,
    },

    /// Decodes each frame with `VrlDeserializer`.
    Vrl(VrlDeserializerConfig),
}
294
295impl From<BytesDeserializerConfig> for DeserializerConfig {
296 fn from(_: BytesDeserializerConfig) -> Self {
297 Self::Bytes
298 }
299}
300
301impl From<JsonDeserializerConfig> for DeserializerConfig {
302 fn from(config: JsonDeserializerConfig) -> Self {
303 Self::Json(config)
304 }
305}
306
307#[cfg(feature = "syslog")]
308impl From<SyslogDeserializerConfig> for DeserializerConfig {
309 fn from(config: SyslogDeserializerConfig) -> Self {
310 Self::Syslog(config)
311 }
312}
313
314impl From<GelfDeserializerConfig> for DeserializerConfig {
315 fn from(config: GelfDeserializerConfig) -> Self {
316 Self::Gelf(config)
317 }
318}
319
320impl From<NativeDeserializerConfig> for DeserializerConfig {
321 fn from(_: NativeDeserializerConfig) -> Self {
322 Self::Native
323 }
324}
325
326impl From<NativeJsonDeserializerConfig> for DeserializerConfig {
327 fn from(config: NativeJsonDeserializerConfig) -> Self {
328 Self::NativeJson(config)
329 }
330}
331
332impl From<InfluxdbDeserializerConfig> for DeserializerConfig {
333 fn from(config: InfluxdbDeserializerConfig) -> Self {
334 Self::Influxdb(config)
335 }
336}
337
338impl DeserializerConfig {
339 pub fn build(&self) -> vector_common::Result<Deserializer> {
341 match self {
342 DeserializerConfig::Avro { avro } => Ok(Deserializer::Avro(
343 AvroDeserializerConfig {
344 avro_options: avro.clone(),
345 }
346 .build(),
347 )),
348 DeserializerConfig::Bytes => Ok(Deserializer::Bytes(BytesDeserializerConfig.build())),
349 DeserializerConfig::Json(config) => Ok(Deserializer::Json(config.build())),
350 DeserializerConfig::Protobuf(config) => Ok(Deserializer::Protobuf(config.build()?)),
351 #[cfg(feature = "syslog")]
352 DeserializerConfig::Syslog(config) => Ok(Deserializer::Syslog(config.build())),
353 DeserializerConfig::Native => {
354 Ok(Deserializer::Native(NativeDeserializerConfig.build()))
355 }
356 DeserializerConfig::NativeJson(config) => Ok(Deserializer::NativeJson(config.build())),
357 DeserializerConfig::Gelf(config) => Ok(Deserializer::Gelf(config.build())),
358 DeserializerConfig::Influxdb(config) => Ok(Deserializer::Influxdb(config.build())),
359 DeserializerConfig::Vrl(config) => Ok(Deserializer::Vrl(config.build()?)),
360 }
361 }
362
363 pub fn default_stream_framing(&self) -> FramingConfig {
365 match self {
366 DeserializerConfig::Avro { .. } => FramingConfig::Bytes,
367 DeserializerConfig::Native => FramingConfig::LengthDelimited(Default::default()),
368 DeserializerConfig::Bytes
369 | DeserializerConfig::Json(_)
370 | DeserializerConfig::Influxdb(_)
371 | DeserializerConfig::NativeJson(_) => {
372 FramingConfig::NewlineDelimited(Default::default())
373 }
374 DeserializerConfig::Protobuf(_) => FramingConfig::Bytes,
375 #[cfg(feature = "syslog")]
376 DeserializerConfig::Syslog(_) => FramingConfig::NewlineDelimited(Default::default()),
377 DeserializerConfig::Vrl(_) => FramingConfig::Bytes,
378 DeserializerConfig::Gelf(_) => {
379 FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig::new(0))
380 }
381 }
382 }
383
384 pub fn default_message_based_framing(&self) -> FramingConfig {
386 match self {
387 DeserializerConfig::Gelf(_) => FramingConfig::ChunkedGelf(Default::default()),
388 _ => FramingConfig::Bytes,
389 }
390 }
391
392 pub fn output_type(&self) -> DataType {
394 match self {
395 DeserializerConfig::Avro { avro } => AvroDeserializerConfig {
396 avro_options: avro.clone(),
397 }
398 .output_type(),
399 DeserializerConfig::Bytes => BytesDeserializerConfig.output_type(),
400 DeserializerConfig::Json(config) => config.output_type(),
401 DeserializerConfig::Protobuf(config) => config.output_type(),
402 #[cfg(feature = "syslog")]
403 DeserializerConfig::Syslog(config) => config.output_type(),
404 DeserializerConfig::Native => NativeDeserializerConfig.output_type(),
405 DeserializerConfig::NativeJson(config) => config.output_type(),
406 DeserializerConfig::Gelf(config) => config.output_type(),
407 DeserializerConfig::Vrl(config) => config.output_type(),
408 DeserializerConfig::Influxdb(config) => config.output_type(),
409 }
410 }
411
412 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
414 match self {
415 DeserializerConfig::Avro { avro } => AvroDeserializerConfig {
416 avro_options: avro.clone(),
417 }
418 .schema_definition(log_namespace),
419 DeserializerConfig::Bytes => BytesDeserializerConfig.schema_definition(log_namespace),
420 DeserializerConfig::Json(config) => config.schema_definition(log_namespace),
421 DeserializerConfig::Protobuf(config) => config.schema_definition(log_namespace),
422 #[cfg(feature = "syslog")]
423 DeserializerConfig::Syslog(config) => config.schema_definition(log_namespace),
424 DeserializerConfig::Native => NativeDeserializerConfig.schema_definition(log_namespace),
425 DeserializerConfig::NativeJson(config) => config.schema_definition(log_namespace),
426 DeserializerConfig::Gelf(config) => config.schema_definition(log_namespace),
427 DeserializerConfig::Influxdb(config) => config.schema_definition(log_namespace),
428 DeserializerConfig::Vrl(config) => config.schema_definition(log_namespace),
429 }
430 }
431
432 pub const fn content_type(&self, framer: &FramingConfig) -> &'static str {
434 match (&self, framer) {
435 (
436 DeserializerConfig::Json(_) | DeserializerConfig::NativeJson(_),
437 FramingConfig::NewlineDelimited(_),
438 ) => "application/x-ndjson",
439 (
440 DeserializerConfig::Gelf(_)
441 | DeserializerConfig::Json(_)
442 | DeserializerConfig::NativeJson(_),
443 FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
444 character_delimited:
445 CharacterDelimitedDecoderOptions {
446 delimiter: b',',
447 max_length: Some(usize::MAX),
448 },
449 }),
450 ) => "application/json",
451 (DeserializerConfig::Native, _) | (DeserializerConfig::Avro { .. }, _) => {
452 "application/octet-stream"
453 }
454 (DeserializerConfig::Protobuf(_), _) => "application/octet-stream",
455 (
456 DeserializerConfig::Json(_)
457 | DeserializerConfig::NativeJson(_)
458 | DeserializerConfig::Bytes
459 | DeserializerConfig::Gelf(_)
460 | DeserializerConfig::Influxdb(_)
461 | DeserializerConfig::Vrl(_),
462 _,
463 ) => "text/plain",
464 #[cfg(feature = "syslog")]
465 (DeserializerConfig::Syslog(_), _) => "text/plain",
466 }
467 }
468}
469
/// A deserializer that parses a frame's bytes into events.
///
/// Each variant wraps one concrete deserializer. `DeserializerConfig::build` produces every
/// variant except `Boxed`.
// NOTE(review): `large_enum_variant` is silenced without a recorded reason. Presumably one
// wrapped deserializer is much larger than the others and boxing it was judged not worth
// the extra allocation — confirm before relying on this.
#[allow(clippy::large_enum_variant)]
#[derive(Clone)]
pub enum Deserializer {
    /// Uses an `AvroDeserializer` for deserialization.
    Avro(AvroDeserializer),
    /// Uses a `BytesDeserializer` for deserialization.
    Bytes(BytesDeserializer),
    /// Uses a `JsonDeserializer` for deserialization.
    Json(JsonDeserializer),
    /// Uses a `ProtobufDeserializer` for deserialization.
    Protobuf(ProtobufDeserializer),
    /// Uses a `SyslogDeserializer` for deserialization (only with the `syslog` feature).
    #[cfg(feature = "syslog")]
    Syslog(SyslogDeserializer),
    /// Uses a `NativeDeserializer` for deserialization.
    Native(NativeDeserializer),
    /// Uses a `NativeJsonDeserializer` for deserialization.
    NativeJson(NativeJsonDeserializer),
    /// Uses an opaque, boxed deserializer.
    // NOTE(review): never produced by `DeserializerConfig::build`; presumably used for
    // deserializers constructed outside this module — confirm with callers.
    Boxed(BoxedDeserializer),
    /// Uses a `GelfDeserializer` for deserialization.
    Gelf(GelfDeserializer),
    /// Uses an `InfluxdbDeserializer` for deserialization.
    Influxdb(InfluxdbDeserializer),
    /// Uses a `VrlDeserializer` for deserialization.
    Vrl(VrlDeserializer),
}
498
499impl format::Deserializer for Deserializer {
500 fn parse(
501 &self,
502 bytes: Bytes,
503 log_namespace: LogNamespace,
504 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
505 match self {
506 Deserializer::Avro(deserializer) => deserializer.parse(bytes, log_namespace),
507 Deserializer::Bytes(deserializer) => deserializer.parse(bytes, log_namespace),
508 Deserializer::Json(deserializer) => deserializer.parse(bytes, log_namespace),
509 Deserializer::Protobuf(deserializer) => deserializer.parse(bytes, log_namespace),
510 #[cfg(feature = "syslog")]
511 Deserializer::Syslog(deserializer) => deserializer.parse(bytes, log_namespace),
512 Deserializer::Native(deserializer) => deserializer.parse(bytes, log_namespace),
513 Deserializer::NativeJson(deserializer) => deserializer.parse(bytes, log_namespace),
514 Deserializer::Boxed(deserializer) => deserializer.parse(bytes, log_namespace),
515 Deserializer::Gelf(deserializer) => deserializer.parse(bytes, log_namespace),
516 Deserializer::Influxdb(deserializer) => deserializer.parse(bytes, log_namespace),
517 Deserializer::Vrl(deserializer) => deserializer.parse(bytes, log_namespace),
518 }
519 }
520}
521
#[cfg(test)]
mod tests {
    use super::*;

    /// GELF over a stream defaults to framing on the `0` byte with no length limit.
    #[test]
    fn gelf_stream_default_framing_is_null_delimited() {
        let deserializer_config = DeserializerConfig::from(GelfDeserializerConfig::default());
        let framing_config = deserializer_config.default_stream_framing();
        assert!(matches!(
            framing_config,
            FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
                character_delimited: CharacterDelimitedDecoderOptions {
                    delimiter: 0,
                    max_length: None,
                }
            })
        ));
    }

    /// GELF over message-based input defaults to chunked GELF framing.
    #[test]
    fn gelf_message_based_default_framing_is_chunked_gelf() {
        let deserializer_config = DeserializerConfig::from(GelfDeserializerConfig::default());
        assert!(matches!(
            deserializer_config.default_message_based_framing(),
            FramingConfig::ChunkedGelf(_)
        ));
    }

    /// Codecs other than GELF default to `Bytes` framing for message-based input.
    #[test]
    fn non_gelf_message_based_default_framing_is_bytes() {
        assert!(matches!(
            DeserializerConfig::Bytes.default_message_based_framing(),
            FramingConfig::Bytes
        ));
        assert!(matches!(
            DeserializerConfig::Native.default_message_based_framing(),
            FramingConfig::Bytes
        ));
    }

    /// The native codec defaults to length-delimited framing over a stream.
    #[test]
    fn native_stream_default_framing_is_length_delimited() {
        assert!(matches!(
            DeserializerConfig::Native.default_stream_framing(),
            FramingConfig::LengthDelimited(_)
        ));
    }

    /// The content type depends on both the codec and, for JSON-like codecs, the framing.
    #[test]
    fn content_type_reflects_codec_and_framing() {
        let gelf = DeserializerConfig::from(GelfDeserializerConfig::default());
        let comma_delimited = FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
            character_delimited: CharacterDelimitedDecoderOptions {
                delimiter: b',',
                max_length: Some(usize::MAX),
            },
        });
        let newline_delimited = FramingConfig::NewlineDelimited(Default::default());

        assert_eq!(gelf.content_type(&comma_delimited), "application/json");
        assert_eq!(gelf.content_type(&newline_delimited), "text/plain");
        assert_eq!(
            DeserializerConfig::Native.content_type(&FramingConfig::Bytes),
            "application/octet-stream"
        );
        assert_eq!(
            DeserializerConfig::Bytes.content_type(&newline_delimited),
            "text/plain"
        );
    }
}