codecs/encoding/format/
native_json.rs1use bytes::{BufMut, BytesMut};
2use serde::{Deserialize, Serialize};
3use tokio_util::codec::Encoder;
4use vector_core::{config::DataType, event::Event, schema};
5
6#[derive(Debug, Clone, Default, Deserialize, Serialize)]
8pub struct NativeJsonSerializerConfig;
9
10impl NativeJsonSerializerConfig {
11 pub const fn build(&self) -> NativeJsonSerializer {
13 NativeJsonSerializer
14 }
15
16 pub fn input_type(&self) -> DataType {
18 DataType::all_bits()
19 }
20
21 pub fn schema_requirement(&self) -> schema::Requirement {
23 schema::Requirement::empty()
24 }
25}
26
/// Stateless serializer that turns an `Event` into JSON through `serde_json`.
#[derive(Clone, Debug)]
pub struct NativeJsonSerializer;
30
31impl NativeJsonSerializer {
32 pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
34 serde_json::to_value(&event).map_err(|e| e.to_string().into())
35 }
36}
37
38impl Encoder<Event> for NativeJsonSerializer {
39 type Error = vector_common::Error;
40
41 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
42 let writer = buffer.writer();
43 serde_json::to_writer(writer, &event).map_err(Into::into)
44 }
45}
46
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use vector_core::{
        buckets,
        event::{LogEvent, Metric, MetricKind, MetricValue, Value},
    };
    use vrl::btreemap;

    use super::*;

    /// Builds the single-field log event (`foo` = `"bar"`) used by the log tests.
    fn sample_log_event() -> Event {
        Event::Log(LogEvent::from(btreemap! {
            "foo" => Value::from("bar")
        }))
    }

    /// Encodes `event` with a fresh serializer into an empty buffer and returns
    /// that buffer, panicking if encoding fails.
    fn encode_event(event: Event) -> BytesMut {
        let mut encoder = NativeJsonSerializer;
        let mut output = BytesMut::new();
        encoder.encode(event, &mut output).unwrap();
        output
    }

    /// A log event is written as an object wrapped under the `log` key.
    #[test]
    fn serialize_json() {
        let encoded = encode_event(sample_log_event());

        assert_eq!(encoded.freeze(), r#"{"log":{"foo":"bar"}}"#);
    }

    /// `encode` and `to_json_value` must agree on the JSON they produce.
    #[test]
    fn serialize_equals_to_json_value() {
        let log_event = sample_log_event();

        let encoded = encode_event(log_event.clone());
        let value = NativeJsonSerializer.to_json_value(log_event).unwrap();

        assert_eq!(encoded.freeze(), serde_json::to_string(&value).unwrap());
    }

    /// An aggregated histogram with infinite bucket bounds encodes to the same
    /// text as serializing the event directly with `serde_json`.
    #[test]
    fn serialize_aggregated_histogram() {
        let histogram_event = Event::from(Metric::new(
            "histogram",
            MetricKind::Absolute,
            MetricValue::AggregatedHistogram {
                count: 1,
                sum: 1.0,
                buckets: buckets!(f64::NEG_INFINITY => 0 ,2.0 => 1, f64::INFINITY => 0),
            },
        ));

        let encoded = encode_event(histogram_event.clone());

        assert_eq!(
            encoded.freeze(),
            serde_json::to_string(&histogram_event).unwrap()
        );
    }
}