use bytes::Bytes;
use chrono::Utc;
use derivative::Derivative;
use smallvec::{smallvec, SmallVec};
use vector_config::configurable_component;
use vector_core::{
config::{log_schema, DataType, LogNamespace},
event::Event,
schema,
};
use vrl::value::Kind;
use super::{default_lossy, Deserializer};
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct JsonDeserializerConfig {
#[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
pub json: JsonDeserializerOptions,
}
impl JsonDeserializerConfig {
pub fn new(options: JsonDeserializerOptions) -> Self {
Self { json: options }
}
pub fn build(&self) -> JsonDeserializer {
Into::<JsonDeserializer>::into(self)
}
pub fn output_type(&self) -> DataType {
DataType::Log
}
pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
match log_namespace {
LogNamespace::Legacy => {
let mut definition =
schema::Definition::empty_legacy_namespace().unknown_fields(Kind::json());
if let Some(timestamp_key) = log_schema().timestamp_key() {
definition = definition.try_with_field(
timestamp_key,
Kind::json().or_timestamp(),
Some("timestamp"),
);
}
definition
}
LogNamespace::Vector => {
schema::Definition::new_with_default_metadata(Kind::json(), [log_namespace])
}
}
}
}
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct JsonDeserializerOptions {
#[serde(
default = "default_lossy",
skip_serializing_if = "vector_core::serde::is_default"
)]
#[derivative(Default(value = "default_lossy()"))]
pub lossy: bool,
}
#[derive(Debug, Clone, Derivative)]
#[derivative(Default)]
pub struct JsonDeserializer {
#[derivative(Default(value = "default_lossy()"))]
lossy: bool,
}
impl JsonDeserializer {
pub fn new(lossy: bool) -> Self {
Self { lossy }
}
}
impl Deserializer for JsonDeserializer {
fn parse(
&self,
bytes: Bytes,
log_namespace: LogNamespace,
) -> vector_common::Result<SmallVec<[Event; 1]>> {
if bytes.is_empty() {
return Ok(smallvec![]);
}
let json: serde_json::Value = match self.lossy {
true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
false => serde_json::from_slice(&bytes),
}
.map_err(|error| format!("Error parsing JSON: {:?}", error))?;
let mut events = match json {
serde_json::Value::Array(values) => values
.into_iter()
.map(|json| Event::from_json_value(json, log_namespace))
.collect::<Result<SmallVec<[Event; 1]>, _>>()?,
_ => smallvec![Event::from_json_value(json, log_namespace)?],
};
let events = match log_namespace {
LogNamespace::Vector => events,
LogNamespace::Legacy => {
let timestamp = Utc::now();
if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
for event in &mut events {
let log = event.as_mut_log();
if !log.contains(timestamp_key) {
log.insert(timestamp_key, timestamp);
}
}
}
events
}
};
Ok(events)
}
}
impl From<&JsonDeserializerConfig> for JsonDeserializer {
fn from(config: &JsonDeserializerConfig) -> Self {
Self {
lossy: config.json.lossy,
}
}
}
#[cfg(test)]
mod tests {
use vector_core::config::log_schema;
use vrl::core::Value;
use super::*;
#[test]
fn deserialize_json() {
let input = Bytes::from(r#"{ "foo": 123 }"#);
let deserializer = JsonDeserializer::default();
for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
let events = deserializer.parse(input.clone(), namespace).unwrap();
let mut events = events.into_iter();
{
let event = events.next().unwrap();
let log = event.as_log();
assert_eq!(log["foo"], 123.into());
assert_eq!(
log.get((
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap()
))
.is_some(),
namespace == LogNamespace::Legacy
);
}
assert_eq!(events.next(), None);
}
}
#[test]
fn deserialize_non_object_vector_namespace() {
let input = Bytes::from(r#"null"#);
let deserializer = JsonDeserializer::default();
let namespace = LogNamespace::Vector;
let events = deserializer.parse(input.clone(), namespace).unwrap();
let mut events = events.into_iter();
let event = events.next().unwrap();
let log = event.as_log();
assert_eq!(log["."], Value::Null);
assert_eq!(events.next(), None);
}
#[test]
fn deserialize_json_array() {
let input = Bytes::from(r#"[{ "foo": 123 }, { "bar": 456 }]"#);
let deserializer = JsonDeserializer::default();
for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
let events = deserializer.parse(input.clone(), namespace).unwrap();
let mut events = events.into_iter();
{
let event = events.next().unwrap();
let log = event.as_log();
assert_eq!(log["foo"], 123.into());
assert_eq!(
log.get((
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap()
))
.is_some(),
namespace == LogNamespace::Legacy
);
}
{
let event = events.next().unwrap();
let log = event.as_log();
assert_eq!(log["bar"], 456.into());
assert_eq!(
log.get(log_schema().timestamp_key_target_path().unwrap())
.is_some(),
namespace == LogNamespace::Legacy
);
}
assert_eq!(events.next(), None);
}
}
#[test]
fn deserialize_skip_empty() {
let input = Bytes::from("");
let deserializer = JsonDeserializer::default();
for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
let events = deserializer.parse(input.clone(), namespace).unwrap();
assert!(events.is_empty());
}
}
#[test]
fn deserialize_error_invalid_json() {
let input = Bytes::from("{ foo");
let deserializer = JsonDeserializer::default();
for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
assert!(deserializer.parse(input.clone(), namespace).is_err());
}
}
#[test]
fn deserialize_lossy_replace_invalid_utf8() {
let input = Bytes::from(b"{ \"foo\": \"Hello \xF0\x90\x80World\" }".as_slice());
let deserializer = JsonDeserializer::new(true);
for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
let events = deserializer.parse(input.clone(), namespace).unwrap();
let mut events = events.into_iter();
{
let event = events.next().unwrap();
let log = event.as_log();
assert_eq!(log["foo"], b"Hello \xEF\xBF\xBDWorld".into());
assert_eq!(
log.get((
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap()
))
.is_some(),
namespace == LogNamespace::Legacy
);
}
assert_eq!(events.next(), None);
}
}
#[test]
fn deserialize_non_lossy_error_invalid_utf8() {
let input = Bytes::from(b"{ \"foo\": \"Hello \xF0\x90\x80World\" }".as_slice());
let deserializer = JsonDeserializer::new(false);
for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
assert!(deserializer.parse(input.clone(), namespace).is_err());
}
}
}