use bytes::{BufMut, BytesMut};
use serde::{Deserialize, Serialize};
use tokio_util::codec::Encoder;
use vector_core::{config::DataType, event::Event, schema};
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct NativeJsonSerializerConfig;
impl NativeJsonSerializerConfig {
pub const fn build(&self) -> NativeJsonSerializer {
NativeJsonSerializer
}
pub fn input_type(&self) -> DataType {
DataType::all_bits()
}
pub fn schema_requirement(&self) -> schema::Requirement {
schema::Requirement::empty()
}
}
#[derive(Debug, Clone)]
pub struct NativeJsonSerializer;
impl NativeJsonSerializer {
pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
serde_json::to_value(&event).map_err(|e| e.to_string().into())
}
}
impl Encoder<Event> for NativeJsonSerializer {
type Error = vector_common::Error;
fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
let writer = buffer.writer();
serde_json::to_writer(writer, &event).map_err(Into::into)
}
}
#[cfg(test)]
mod tests {
use bytes::BytesMut;
use vector_core::buckets;
use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, Value};
use vrl::btreemap;
use super::*;
#[test]
fn serialize_json() {
let event = Event::Log(LogEvent::from(btreemap! {
"foo" => Value::from("bar")
}));
let mut serializer = NativeJsonSerializer;
let mut bytes = BytesMut::new();
serializer.encode(event, &mut bytes).unwrap();
assert_eq!(bytes.freeze(), r#"{"log":{"foo":"bar"}}"#);
}
#[test]
fn serialize_equals_to_json_value() {
let event = Event::Log(LogEvent::from(btreemap! {
"foo" => Value::from("bar")
}));
let mut serializer = NativeJsonSerializer;
let mut bytes = BytesMut::new();
serializer.encode(event.clone(), &mut bytes).unwrap();
let json = serializer.to_json_value(event).unwrap();
assert_eq!(bytes.freeze(), serde_json::to_string(&json).unwrap());
}
#[test]
fn serialize_aggregated_histogram() {
let histogram_event = Event::from(Metric::new(
"histogram",
MetricKind::Absolute,
MetricValue::AggregatedHistogram {
count: 1,
sum: 1.0,
buckets: buckets!(f64::NEG_INFINITY => 0 ,2.0 => 1, f64::INFINITY => 0),
},
));
let mut serializer = NativeJsonSerializer;
let mut bytes = BytesMut::new();
serializer
.encode(histogram_event.clone(), &mut bytes)
.unwrap();
assert_eq!(
bytes.freeze(),
serde_json::to_string(&histogram_event).unwrap()
);
}
}