codecs/encoding/format/
native_json.rs1use bytes::{BufMut, BytesMut};
2use serde::{Deserialize, Serialize};
3use tokio_util::codec::Encoder;
4use vector_core::{config::DataType, event::Event, schema};
5
6#[derive(Debug, Clone, Default, Deserialize, Serialize)]
8pub struct NativeJsonSerializerConfig;
9
10impl NativeJsonSerializerConfig {
11 pub const fn build(&self) -> NativeJsonSerializer {
13 NativeJsonSerializer
14 }
15
16 pub fn input_type(&self) -> DataType {
18 DataType::all_bits()
19 }
20
21 pub fn schema_requirement(&self) -> schema::Requirement {
23 schema::Requirement::empty()
24 }
25}
26
/// Serializer that turns an `Event` into its JSON representation via the
/// event's own `serde::Serialize` implementation.
///
/// The type is stateless; cloning it is free.
#[derive(Clone, Debug)]
pub struct NativeJsonSerializer;
30
31impl NativeJsonSerializer {
32 pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
34 serde_json::to_value(&event).map_err(|e| e.to_string().into())
35 }
36}
37
38impl Encoder<Event> for NativeJsonSerializer {
39 type Error = vector_common::Error;
40
41 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
42 let writer = buffer.writer();
43 serde_json::to_writer(writer, &event).map_err(Into::into)
44 }
45}
46
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use vector_core::{
        buckets,
        event::{LogEvent, Metric, MetricKind, MetricValue, Value},
    };
    use vrl::btreemap;

    use super::*;

    /// Builds the single-field log event shared by the log tests.
    fn foo_bar_log() -> Event {
        let fields = btreemap! {
            "foo" => Value::from("bar")
        };
        Event::Log(LogEvent::from(fields))
    }

    /// Runs `event` through `serializer` into a fresh buffer, panicking on
    /// any encoding error.
    fn encode_with(serializer: &mut NativeJsonSerializer, event: Event) -> BytesMut {
        let mut encoded = BytesMut::new();
        serializer.encode(event, &mut encoded).unwrap();
        encoded
    }

    /// A log event is wrapped in a top-level `"log"` key.
    #[test]
    fn serialize_json() {
        let mut serializer = NativeJsonSerializer;

        let encoded = encode_with(&mut serializer, foo_bar_log());

        assert_eq!(encoded.freeze(), r#"{"log":{"foo":"bar"}}"#);
    }

    /// The encoded bytes match the stringified output of `to_json_value`.
    #[test]
    fn serialize_equals_to_json_value() {
        let event = foo_bar_log();
        let mut serializer = NativeJsonSerializer;

        let encoded = encode_with(&mut serializer, event.clone());
        let as_value = serializer.to_json_value(event).unwrap();

        assert_eq!(encoded.freeze(), serde_json::to_string(&as_value).unwrap());
    }

    /// A histogram with infinite bucket bounds encodes to the same text as
    /// serializing the event directly with `serde_json`.
    #[test]
    fn serialize_aggregated_histogram() {
        let value = MetricValue::AggregatedHistogram {
            count: 1,
            sum: 1.0,
            buckets: buckets!(f64::NEG_INFINITY => 0, 2.0 => 1, f64::INFINITY => 0),
        };
        let histogram_event = Event::from(Metric::new("histogram", MetricKind::Absolute, value));
        let mut serializer = NativeJsonSerializer;

        let encoded = encode_with(&mut serializer, histogram_event.clone());

        assert_eq!(
            encoded.freeze(),
            serde_json::to_string(&histogram_event).unwrap()
        );
    }
}