// codecs/decoding/format/native_json.rs
use bytes::Bytes;
use derivative::Derivative;
use smallvec::{smallvec, SmallVec};
use vector_config::configurable_component;
use vector_core::{config::DataType, event::Event, schema};
use vrl::value::kind::Collection;
use vrl::value::Kind;
use super::{default_lossy, Deserializer};
use vector_core::config::LogNamespace;
/// Config used to build a `NativeJsonDeserializer`.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct NativeJsonDeserializerConfig {
/// Options specific to native JSON decoding.
#[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
pub native_json: NativeJsonDeserializerOptions,
}
impl NativeJsonDeserializerConfig {
    /// Creates a new `NativeJsonDeserializerConfig` wrapping the given options.
    pub fn new(options: NativeJsonDeserializerOptions) -> Self {
        let native_json = options;
        Self { native_json }
    }

    /// Builds the `NativeJsonDeserializer` described by this configuration.
    ///
    /// The deserializer receives a copy of the configured `lossy` flag.
    pub fn build(&self) -> NativeJsonDeserializer {
        let lossy = self.native_json.lossy;
        NativeJsonDeserializer { lossy }
    }

    /// Returns the event data types this deserializer may produce
    /// (`DataType::all_bits()`).
    pub fn output_type(&self) -> DataType {
        DataType::all_bits()
    }

    /// Returns the schema definition of the events produced under `log_namespace`.
    ///
    /// Only the value kind depends on the namespace: any JSON value for
    /// `LogNamespace::Vector`, an object with JSON members for
    /// `LogNamespace::Legacy`.
    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
        let kind = match log_namespace {
            LogNamespace::Vector => Kind::json(),
            LogNamespace::Legacy => Kind::object(Collection::json()),
        };

        schema::Definition::new_with_default_metadata(kind, [log_namespace])
    }
}
/// Vector's native JSON-specific decoding options.
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct NativeJsonDeserializerOptions {
/// Determines whether invalid UTF-8 sequences are replaced instead of being passed to the
/// JSON parser as-is.
///
/// When `true`, the input is converted with a lossy UTF-8 conversion before parsing, so
/// invalid sequences are replaced with the `U+FFFD REPLACEMENT CHARACTER`. When `false`,
/// the raw bytes are handed to the JSON parser unchanged.
///
/// Both the serde default and the `Default` implementation take their value from
/// `default_lossy()`.
#[serde(
default = "default_lossy",
skip_serializing_if = "vector_core::serde::is_default"
)]
#[derivative(Default(value = "default_lossy()"))]
pub lossy: bool,
}
/// Deserializer that decodes a JSON byte frame into `Event`s via their serde representation.
///
/// A top-level JSON array yields one event per element; any other JSON value yields a
/// single event.
#[derive(Debug, Clone, Derivative)]
#[derivative(Default)]
pub struct NativeJsonDeserializer {
/// Whether the input is converted with a lossy UTF-8 conversion before JSON parsing.
/// Defaults to the value returned by `default_lossy()`.
#[derivative(Default(value = "default_lossy()"))]
lossy: bool,
}
impl Deserializer for NativeJsonDeserializer {
fn parse(
&self,
bytes: Bytes,
_log_namespace: LogNamespace,
) -> vector_common::Result<SmallVec<[Event; 1]>> {
if bytes.is_empty() {
return Ok(smallvec![]);
}
let json: serde_json::Value = match self.lossy {
true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
false => serde_json::from_slice(&bytes),
}
.map_err(|error| format!("Error parsing JSON: {:?}", error))?;
let events = match json {
serde_json::Value::Array(values) => values
.into_iter()
.map(serde_json::from_value)
.collect::<Result<SmallVec<[Event; 1]>, _>>()?,
_ => smallvec![serde_json::from_value(json)?],
};
Ok(events)
}
}
#[cfg(test)]
mod test {
    use serde_json::json;

    use super::*;

    /// A top-level JSON array must decode into one event per array element,
    /// in order.
    #[test]
    fn parses_top_level_arrays() {
        let deserializer = NativeJsonDeserializerConfig::default().build();

        let first_log = json!({"a": "b", "c": "d"});
        let second_log = json!({"foo": "bar", "baz": "quux"});
        let payload = json!([{ "log": first_log }, { "log": second_log }]);
        let input = Bytes::from(serde_json::to_vec(&payload).unwrap());

        let events = deserializer.parse(input, LogNamespace::Legacy).unwrap();

        // Build the expected events straight from the inner log objects.
        let to_event = |log| Event::from_json_value(log, LogNamespace::Legacy).unwrap();
        let expected: SmallVec<[Event; 1]> = smallvec![to_event(first_log), to_event(second_log)];

        assert_eq!(events, expected);
    }
}