codecs/encoding/format/
native_json.rs

1use bytes::{BufMut, BytesMut};
2use serde::{Deserialize, Serialize};
3use tokio_util::codec::Encoder;
4use vector_core::{config::DataType, event::Event, schema};
5
6/// Config used to build a `NativeJsonSerializer`.
7#[derive(Debug, Clone, Default, Deserialize, Serialize)]
8pub struct NativeJsonSerializerConfig;
9
10impl NativeJsonSerializerConfig {
11    /// Build the `NativeJsonSerializer` from this configuration.
12    pub const fn build(&self) -> NativeJsonSerializer {
13        NativeJsonSerializer
14    }
15
16    /// The data type of events that are accepted by `NativeJsonSerializer`.
17    pub fn input_type(&self) -> DataType {
18        DataType::all_bits()
19    }
20
21    /// The schema required by the serializer.
22    pub fn schema_requirement(&self) -> schema::Requirement {
23        schema::Requirement::empty()
24    }
25}
26
/// Serializer that renders an `Event` as JSON bytes.
///
/// This is a unit struct and holds no state.
#[derive(Clone, Debug)]
pub struct NativeJsonSerializer;
30
31impl NativeJsonSerializer {
32    /// Encode event and represent it as native JSON value.
33    pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
34        serde_json::to_value(&event).map_err(|e| e.to_string().into())
35    }
36}
37
38impl Encoder<Event> for NativeJsonSerializer {
39    type Error = vector_common::Error;
40
41    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
42        let writer = buffer.writer();
43        serde_json::to_writer(writer, &event).map_err(Into::into)
44    }
45}
46
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use vector_core::buckets;
    use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, Value};
    use vrl::btreemap;

    use super::*;

    /// Build the single-field log event (`foo` => `bar`) shared by the log-based tests.
    fn foo_bar_log() -> Event {
        Event::Log(LogEvent::from(btreemap! {
            "foo" => Value::from("bar")
        }))
    }

    /// Run `event` through the `Encoder` implementation and hand back the filled buffer.
    fn encode_to_buffer(event: Event) -> BytesMut {
        let mut output = BytesMut::new();
        let mut encoder = NativeJsonSerializer;
        encoder.encode(event, &mut output).unwrap();
        output
    }

    /// A log event is emitted under the `log` key.
    #[test]
    fn serialize_json() {
        let encoded = encode_to_buffer(foo_bar_log());

        assert_eq!(encoded.freeze(), r#"{"log":{"foo":"bar"}}"#);
    }

    /// `encode` and `to_json_value` must agree on the JSON they produce.
    #[test]
    fn serialize_equals_to_json_value() {
        let event = foo_bar_log();

        let encoded = encode_to_buffer(event.clone());
        let json = NativeJsonSerializer.to_json_value(event).unwrap();

        assert_eq!(encoded.freeze(), serde_json::to_string(&json).unwrap());
    }

    /// A histogram with infinite bucket bounds encodes the same way as `serde_json::to_string`.
    #[test]
    fn serialize_aggregated_histogram() {
        let value = MetricValue::AggregatedHistogram {
            count: 1,
            sum: 1.0,
            buckets: buckets!(f64::NEG_INFINITY => 0, 2.0 => 1, f64::INFINITY => 0),
        };
        let histogram_event = Event::from(Metric::new("histogram", MetricKind::Absolute, value));

        let encoded = encode_to_buffer(histogram_event.clone());

        assert_eq!(
            encoded.freeze(),
            serde_json::to_string(&histogram_event).unwrap()
        );
    }
}