mod error;
pub mod format;
pub mod framing;
use crate::decoding::format::{VrlDeserializer, VrlDeserializerConfig};
use bytes::{Bytes, BytesMut};
pub use error::StreamDecodingError;
pub use format::{
BoxedDeserializer, BytesDeserializer, BytesDeserializerConfig, GelfDeserializer,
GelfDeserializerConfig, GelfDeserializerOptions, InfluxdbDeserializer,
InfluxdbDeserializerConfig, JsonDeserializer, JsonDeserializerConfig, JsonDeserializerOptions,
NativeDeserializer, NativeDeserializerConfig, NativeJsonDeserializer,
NativeJsonDeserializerConfig, NativeJsonDeserializerOptions, ProtobufDeserializer,
ProtobufDeserializerConfig, ProtobufDeserializerOptions,
};
#[cfg(feature = "syslog")]
pub use format::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions};
pub use framing::{
BoxedFramer, BoxedFramingError, BytesDecoder, BytesDecoderConfig, CharacterDelimitedDecoder,
CharacterDelimitedDecoderConfig, CharacterDelimitedDecoderOptions, ChunkedGelfDecoder,
ChunkedGelfDecoderConfig, ChunkedGelfDecoderOptions, FramingError, LengthDelimitedDecoder,
LengthDelimitedDecoderConfig, NewlineDelimitedDecoder, NewlineDelimitedDecoderConfig,
NewlineDelimitedDecoderOptions, OctetCountingDecoder, OctetCountingDecoderConfig,
OctetCountingDecoderOptions,
};
use smallvec::SmallVec;
use std::fmt::Debug;
use vector_config::configurable_component;
use vector_core::{
config::{DataType, LogNamespace},
event::Event,
schema,
};
use self::format::{AvroDeserializer, AvroDeserializerConfig, AvroDeserializerOptions};
/// An error produced while decoding, split by the stage that reported it.
#[derive(Debug)]
pub enum Error {
    /// A framing failure, carrying the boxed error type that framers report
    /// (`BoxedFramingError`, the `Decoder::Error` type of `Framer`).
    FramingError(BoxedFramingError),
    /// A parsing failure, carrying a `vector_common::Error` (the error type
    /// returned by `format::Deserializer::parse`).
    ParsingError(vector_common::Error),
}
/// Renders the error as `VariantName(inner error)`.
impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The wrapped error is printed through its own `Display` implementation.
        match self {
            Self::FramingError(cause) => write!(f, "FramingError({cause})"),
            Self::ParsingError(cause) => write!(f, "ParsingError({cause})"),
        }
    }
}
// Marker impl: the body is empty, so only the trait's default methods apply
// (in particular, no custom `source` is provided).
impl std::error::Error for Error {}
impl From<std::io::Error> for Error {
fn from(error: std::io::Error) -> Self {
Self::FramingError(Box::new(error))
}
}
impl StreamDecodingError for Error {
    /// Reports whether decoding may carry on after this error.
    ///
    /// Parsing errors always answer `true`; framing errors defer to the
    /// wrapped error's own `can_continue`.
    fn can_continue(&self) -> bool {
        match self {
            Self::ParsingError(_) => true,
            Self::FramingError(framing_error) => framing_error.can_continue(),
        }
    }
}
/// Framing configuration.
///
/// The `method` tag selects the variant, and `FramingConfig::build` turns the
/// selected variant into the matching `Framer`.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "method", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The framing method."))]
pub enum FramingConfig {
    /// Frames are produced by a `BytesDecoder`, which takes no options.
    Bytes,
    /// Frames are produced by a `CharacterDelimitedDecoder` built from these options.
    CharacterDelimited(CharacterDelimitedDecoderConfig),
    /// Frames are produced by a `LengthDelimitedDecoder` built from these options.
    LengthDelimited(LengthDelimitedDecoderConfig),
    /// Frames are produced by a `NewlineDelimitedDecoder` built from these options.
    NewlineDelimited(NewlineDelimitedDecoderConfig),
    /// Frames are produced by an `OctetCountingDecoder` built from these options.
    OctetCounting(OctetCountingDecoderConfig),
    /// Frames are produced by a `ChunkedGelfDecoder` built from these options.
    ChunkedGelf(ChunkedGelfDecoderConfig),
}
impl From<BytesDecoderConfig> for FramingConfig {
fn from(_: BytesDecoderConfig) -> Self {
Self::Bytes
}
}
impl From<CharacterDelimitedDecoderConfig> for FramingConfig {
fn from(config: CharacterDelimitedDecoderConfig) -> Self {
Self::CharacterDelimited(config)
}
}
impl From<LengthDelimitedDecoderConfig> for FramingConfig {
fn from(config: LengthDelimitedDecoderConfig) -> Self {
Self::LengthDelimited(config)
}
}
impl From<NewlineDelimitedDecoderConfig> for FramingConfig {
fn from(config: NewlineDelimitedDecoderConfig) -> Self {
Self::NewlineDelimited(config)
}
}
impl From<OctetCountingDecoderConfig> for FramingConfig {
fn from(config: OctetCountingDecoderConfig) -> Self {
Self::OctetCounting(config)
}
}
impl From<ChunkedGelfDecoderConfig> for FramingConfig {
fn from(config: ChunkedGelfDecoderConfig) -> Self {
Self::ChunkedGelf(config)
}
}
impl FramingConfig {
    /// Builds the `Framer` variant that corresponds to this configuration.
    ///
    /// Each variant delegates to the `build` method of the decoder config it
    /// carries; `Bytes` carries none and uses the unit `BytesDecoderConfig`.
    pub fn build(&self) -> Framer {
        match self {
            Self::Bytes => Framer::Bytes(BytesDecoderConfig.build()),
            Self::CharacterDelimited(options) => Framer::CharacterDelimited(options.build()),
            Self::LengthDelimited(options) => Framer::LengthDelimited(options.build()),
            Self::NewlineDelimited(options) => Framer::NewlineDelimited(options.build()),
            Self::OctetCounting(options) => Framer::OctetCounting(options.build()),
            Self::ChunkedGelf(options) => Framer::ChunkedGelf(options.build()),
        }
    }
}
/// A framer that yields `Bytes` frames by delegating to the decoder it wraps.
#[derive(Debug, Clone)]
pub enum Framer {
    /// Frames with a `BytesDecoder`.
    Bytes(BytesDecoder),
    /// Frames with a `CharacterDelimitedDecoder`.
    CharacterDelimited(CharacterDelimitedDecoder),
    /// Frames with a `LengthDelimitedDecoder`.
    LengthDelimited(LengthDelimitedDecoder),
    /// Frames with a `NewlineDelimitedDecoder`.
    NewlineDelimited(NewlineDelimitedDecoder),
    /// Frames with an `OctetCountingDecoder`.
    OctetCounting(OctetCountingDecoder),
    /// Frames with an opaque `BoxedFramer`. `FramingConfig::build` never
    /// produces this variant.
    Boxed(BoxedFramer),
    /// Frames with a `ChunkedGelfDecoder`.
    ChunkedGelf(ChunkedGelfDecoder),
}
/// Every call is forwarded unchanged to the wrapped decoder.
impl tokio_util::codec::Decoder for Framer {
    type Item = Bytes;
    type Error = BoxedFramingError;

    /// Forwards to the wrapped decoder's `decode`.
    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        match self {
            Self::Bytes(inner) => inner.decode(src),
            Self::CharacterDelimited(inner) => inner.decode(src),
            Self::LengthDelimited(inner) => inner.decode(src),
            Self::NewlineDelimited(inner) => inner.decode(src),
            Self::OctetCounting(inner) => inner.decode(src),
            Self::Boxed(inner) => inner.decode(src),
            Self::ChunkedGelf(inner) => inner.decode(src),
        }
    }

    /// Forwards to the wrapped decoder's `decode_eof`.
    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        match self {
            Self::Bytes(inner) => inner.decode_eof(src),
            Self::CharacterDelimited(inner) => inner.decode_eof(src),
            Self::LengthDelimited(inner) => inner.decode_eof(src),
            Self::NewlineDelimited(inner) => inner.decode_eof(src),
            Self::OctetCounting(inner) => inner.decode_eof(src),
            Self::Boxed(inner) => inner.decode_eof(src),
            Self::ChunkedGelf(inner) => inner.decode_eof(src),
        }
    }
}
/// Deserializer configuration.
///
/// The `codec` tag selects the variant, and `DeserializerConfig::build` turns
/// the selected variant into the matching `Deserializer`.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(tag = "codec", rename_all = "snake_case")]
#[configurable(description = "Configures how events are decoded from raw bytes.")]
#[configurable(metadata(docs::enum_tag_description = "The codec to use for decoding events."))]
pub enum DeserializerConfig {
    /// Events are decoded with a `BytesDeserializer`, which takes no options.
    Bytes,
    /// Events are decoded with a `JsonDeserializer` built from these options.
    Json(JsonDeserializerConfig),
    /// Events are decoded with a `ProtobufDeserializer` built from these options.
    Protobuf(ProtobufDeserializerConfig),
    /// Events are decoded with a `SyslogDeserializer` built from these options.
    #[cfg(feature = "syslog")]
    Syslog(SyslogDeserializerConfig),
    /// Events are decoded with a `NativeDeserializer`, which takes no options.
    Native,
    /// Events are decoded with a `NativeJsonDeserializer` built from these options.
    NativeJson(NativeJsonDeserializerConfig),
    /// Events are decoded with a `GelfDeserializer` built from these options.
    Gelf(GelfDeserializerConfig),
    /// Events are decoded with an `InfluxdbDeserializer` built from these options.
    Influxdb(InfluxdbDeserializerConfig),
    /// Events are decoded with an `AvroDeserializer`.
    Avro {
        /// Options handed to `AvroDeserializerConfig` as its `avro_options`.
        avro: AvroDeserializerOptions,
    },
    /// Events are decoded with a `VrlDeserializer` built from these options.
    Vrl(VrlDeserializerConfig),
}
impl From<BytesDeserializerConfig> for DeserializerConfig {
fn from(_: BytesDeserializerConfig) -> Self {
Self::Bytes
}
}
impl From<JsonDeserializerConfig> for DeserializerConfig {
fn from(config: JsonDeserializerConfig) -> Self {
Self::Json(config)
}
}
/// Wraps the syslog options in the corresponding codec variant (only with the `syslog` feature).
#[cfg(feature = "syslog")]
impl From<SyslogDeserializerConfig> for DeserializerConfig {
    fn from(syslog: SyslogDeserializerConfig) -> Self {
        DeserializerConfig::Syslog(syslog)
    }
}
impl From<GelfDeserializerConfig> for DeserializerConfig {
fn from(config: GelfDeserializerConfig) -> Self {
Self::Gelf(config)
}
}
impl From<NativeDeserializerConfig> for DeserializerConfig {
fn from(_: NativeDeserializerConfig) -> Self {
Self::Native
}
}
impl From<NativeJsonDeserializerConfig> for DeserializerConfig {
fn from(config: NativeJsonDeserializerConfig) -> Self {
Self::NativeJson(config)
}
}
impl From<InfluxdbDeserializerConfig> for DeserializerConfig {
fn from(config: InfluxdbDeserializerConfig) -> Self {
Self::Influxdb(config)
}
}
impl DeserializerConfig {
    /// Builds the `Deserializer` variant that corresponds to this configuration.
    ///
    /// # Errors
    ///
    /// Only the Protobuf and VRL configs have a fallible `build` here; their
    /// errors are propagated. Every other codec is constructed infallibly.
    pub fn build(&self) -> vector_common::Result<Deserializer> {
        let deserializer = match self {
            Self::Avro { avro } => {
                // The variant stores bare options; wrap them in the config type to build.
                let avro_config = AvroDeserializerConfig {
                    avro_options: avro.clone(),
                };
                Deserializer::Avro(avro_config.build())
            }
            Self::Bytes => Deserializer::Bytes(BytesDeserializerConfig.build()),
            Self::Json(json) => Deserializer::Json(json.build()),
            Self::Protobuf(protobuf) => Deserializer::Protobuf(protobuf.build()?),
            #[cfg(feature = "syslog")]
            Self::Syslog(syslog) => Deserializer::Syslog(syslog.build()),
            Self::Native => Deserializer::Native(NativeDeserializerConfig.build()),
            Self::NativeJson(native_json) => Deserializer::NativeJson(native_json.build()),
            Self::Gelf(gelf) => Deserializer::Gelf(gelf.build()),
            Self::Influxdb(influxdb) => Deserializer::Influxdb(influxdb.build()),
            Self::Vrl(vrl) => Deserializer::Vrl(vrl.build()?),
        };
        Ok(deserializer)
    }

    /// Returns the framing configuration used by default for stream-based sources.
    ///
    /// - Avro, Protobuf and VRL: `Bytes`.
    /// - Native: length-delimited with default options.
    /// - Bytes, JSON, InfluxDB, native JSON (and syslog when enabled):
    ///   newline-delimited with default options.
    /// - GELF: character-delimited on byte `0`.
    pub fn default_stream_framing(&self) -> FramingConfig {
        match self {
            Self::Avro { .. } | Self::Protobuf(_) | Self::Vrl(_) => FramingConfig::Bytes,
            Self::Native => FramingConfig::LengthDelimited(Default::default()),
            Self::Bytes | Self::Json(_) | Self::Influxdb(_) | Self::NativeJson(_) => {
                FramingConfig::NewlineDelimited(Default::default())
            }
            #[cfg(feature = "syslog")]
            Self::Syslog(_) => FramingConfig::NewlineDelimited(Default::default()),
            Self::Gelf(_) => {
                let null_delimited = CharacterDelimitedDecoderConfig::new(0);
                FramingConfig::CharacterDelimited(null_delimited)
            }
        }
    }

    /// Returns the framing configuration used by default for message-based sources.
    ///
    /// GELF gets chunked GELF framing with default options; every other codec gets `Bytes`.
    pub fn default_message_based_framing(&self) -> FramingConfig {
        if matches!(self, Self::Gelf(_)) {
            FramingConfig::ChunkedGelf(Default::default())
        } else {
            FramingConfig::Bytes
        }
    }

    /// Returns the `DataType` reported by the selected codec's config.
    pub fn output_type(&self) -> DataType {
        match self {
            Self::Avro { avro } => {
                let avro_config = AvroDeserializerConfig {
                    avro_options: avro.clone(),
                };
                avro_config.output_type()
            }
            Self::Bytes => BytesDeserializerConfig.output_type(),
            Self::Json(json) => json.output_type(),
            Self::Protobuf(protobuf) => protobuf.output_type(),
            #[cfg(feature = "syslog")]
            Self::Syslog(syslog) => syslog.output_type(),
            Self::Native => NativeDeserializerConfig.output_type(),
            Self::NativeJson(native_json) => native_json.output_type(),
            Self::Gelf(gelf) => gelf.output_type(),
            Self::Vrl(vrl) => vrl.output_type(),
            Self::Influxdb(influxdb) => influxdb.output_type(),
        }
    }

    /// Returns the schema definition reported by the selected codec's config
    /// for the given log namespace.
    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
        match self {
            Self::Avro { avro } => {
                let avro_config = AvroDeserializerConfig {
                    avro_options: avro.clone(),
                };
                avro_config.schema_definition(log_namespace)
            }
            Self::Bytes => BytesDeserializerConfig.schema_definition(log_namespace),
            Self::Json(json) => json.schema_definition(log_namespace),
            Self::Protobuf(protobuf) => protobuf.schema_definition(log_namespace),
            #[cfg(feature = "syslog")]
            Self::Syslog(syslog) => syslog.schema_definition(log_namespace),
            Self::Native => NativeDeserializerConfig.schema_definition(log_namespace),
            Self::NativeJson(native_json) => native_json.schema_definition(log_namespace),
            Self::Gelf(gelf) => gelf.schema_definition(log_namespace),
            Self::Influxdb(influxdb) => influxdb.schema_definition(log_namespace),
            Self::Vrl(vrl) => vrl.schema_definition(log_namespace),
        }
    }

    /// Returns the content type string for this codec combined with `framer`.
    ///
    /// The first two arms are framing-specific and must stay ahead of the
    /// catch-all arms below them, since their codecs also appear there.
    pub const fn content_type(&self, framer: &FramingConfig) -> &'static str {
        match (self, framer) {
            (Self::Json(_) | Self::NativeJson(_), FramingConfig::NewlineDelimited(_)) => {
                "application/x-ndjson"
            }
            // NOTE(review): only an explicit `Some(usize::MAX)` matches here; a
            // `max_length` of `None` falls through to "text/plain". Confirm
            // that this is intended.
            (
                Self::Gelf(_) | Self::Json(_) | Self::NativeJson(_),
                FramingConfig::CharacterDelimited(CharacterDelimitedDecoderConfig {
                    character_delimited:
                        CharacterDelimitedDecoderOptions {
                            delimiter: b',',
                            max_length: Some(usize::MAX),
                        },
                }),
            ) => "application/json",
            (Self::Native | Self::Avro { .. } | Self::Protobuf(_), _) => {
                "application/octet-stream"
            }
            (
                Self::Json(_)
                | Self::NativeJson(_)
                | Self::Bytes
                | Self::Gelf(_)
                | Self::Influxdb(_)
                | Self::Vrl(_),
                _,
            ) => "text/plain",
            #[cfg(feature = "syslog")]
            (Self::Syslog(_), _) => "text/plain",
        }
    }
}
/// A deserializer that parses events by delegating to the deserializer it wraps.
#[derive(Clone)]
pub enum Deserializer {
    /// Parses with an `AvroDeserializer`.
    Avro(AvroDeserializer),
    /// Parses with a `BytesDeserializer`.
    Bytes(BytesDeserializer),
    /// Parses with a `JsonDeserializer`.
    Json(JsonDeserializer),
    /// Parses with a `ProtobufDeserializer`.
    Protobuf(ProtobufDeserializer),
    /// Parses with a `SyslogDeserializer` (only with the `syslog` feature).
    #[cfg(feature = "syslog")]
    Syslog(SyslogDeserializer),
    /// Parses with a `NativeDeserializer`.
    Native(NativeDeserializer),
    /// Parses with a `NativeJsonDeserializer`.
    NativeJson(NativeJsonDeserializer),
    /// Parses with an opaque `BoxedDeserializer`. `DeserializerConfig::build`
    /// never produces this variant.
    Boxed(BoxedDeserializer),
    /// Parses with a `GelfDeserializer`.
    Gelf(GelfDeserializer),
    /// Parses with an `InfluxdbDeserializer`.
    Influxdb(InfluxdbDeserializer),
    /// Parses with a `VrlDeserializer`.
    Vrl(VrlDeserializer),
}
impl format::Deserializer for Deserializer {
    /// Forwards `bytes` and `log_namespace` unchanged to the wrapped
    /// deserializer's `parse` and returns its result.
    fn parse(
        &self,
        bytes: Bytes,
        log_namespace: LogNamespace,
    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
        match self {
            Self::Avro(inner) => inner.parse(bytes, log_namespace),
            Self::Bytes(inner) => inner.parse(bytes, log_namespace),
            Self::Json(inner) => inner.parse(bytes, log_namespace),
            Self::Protobuf(inner) => inner.parse(bytes, log_namespace),
            #[cfg(feature = "syslog")]
            Self::Syslog(inner) => inner.parse(bytes, log_namespace),
            Self::Native(inner) => inner.parse(bytes, log_namespace),
            Self::NativeJson(inner) => inner.parse(bytes, log_namespace),
            Self::Boxed(inner) => inner.parse(bytes, log_namespace),
            Self::Gelf(inner) => inner.parse(bytes, log_namespace),
            Self::Influxdb(inner) => inner.parse(bytes, log_namespace),
            Self::Vrl(inner) => inner.parse(bytes, log_namespace),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The default stream framing for GELF must be character-delimited on
    /// byte `0` with no maximum length.
    #[test]
    fn gelf_stream_default_framing_is_null_delimited() {
        let gelf_config: DeserializerConfig = GelfDeserializerConfig::default().into();

        let is_null_delimited = match gelf_config.default_stream_framing() {
            FramingConfig::CharacterDelimited(framing) => {
                let options = &framing.character_delimited;
                options.delimiter == 0 && options.max_length.is_none()
            }
            _ => false,
        };

        assert!(is_null_delimited);
    }
}