use bytes::Bytes;
use prost::Message;
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use vector_core::config::LogNamespace;
use vector_core::{
config::DataType,
event::{proto, Event, EventArray, EventContainer},
schema,
};
use vrl::value::Kind;
use super::Deserializer;
/// Configuration used to build a `NativeDeserializer`.
///
/// The type carries no fields; it exists so the deserializer can be
/// constructed and described through the same config-style API
/// (`build`, `output_type`, `schema_definition`).
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct NativeDeserializerConfig;
impl NativeDeserializerConfig {
    /// Builds the `NativeDeserializer` described by this configuration.
    pub fn build(&self) -> NativeDeserializer {
        NativeDeserializer
    }

    /// Returns the event data type(s) this deserializer may emit.
    ///
    /// Reports `DataType::all_bits()`, i.e. presumably no restriction on the
    /// kind of event produced (confirm against `DataType`).
    pub fn output_type(&self) -> DataType {
        DataType::all_bits()
    }

    /// Returns the schema definition for events produced under `log_namespace`.
    ///
    /// * `LogNamespace::Legacy` yields the empty legacy-namespace definition.
    /// * `LogNamespace::Vector` yields a definition of kind `Kind::any()` with
    ///   default metadata, registered for the `Vector` namespace only.
    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
        match log_namespace {
            LogNamespace::Vector => schema::Definition::new_with_default_metadata(
                Kind::any(),
                [LogNamespace::Vector],
            ),
            LogNamespace::Legacy => schema::Definition::empty_legacy_namespace(),
        }
    }
}
/// Deserializer that decodes bytes as a protobuf-encoded `proto::EventArray`
/// and yields the contained events.
///
/// The type is stateless, so cloning or default-constructing it is trivial.
#[derive(Clone, Debug, Default)]
pub struct NativeDeserializer;
impl Deserializer for NativeDeserializer {
    /// Decodes `bytes` as a protobuf `proto::EventArray` and returns its events.
    ///
    /// An empty input short-circuits to an empty result without invoking the
    /// protobuf decoder. The log namespace argument is not consulted.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `proto::EventArray::decode` when the
    /// payload cannot be decoded.
    fn parse(
        &self,
        bytes: Bytes,
        _log_namespace: LogNamespace,
    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
        // Guard clause: nothing to decode, so report zero events.
        if bytes.is_empty() {
            return Ok(smallvec![]);
        }

        let decoded = proto::EventArray::decode(bytes)?;
        let events = EventArray::from(decoded).into_events().collect();
        Ok(events)
    }
}