codecs/encoding/format/
native_json.rs

1use bytes::{BufMut, BytesMut};
2use serde::{Deserialize, Serialize};
3use tokio_util::codec::Encoder;
4use vector_core::{config::DataType, event::Event, schema};
5
6/// Config used to build a `NativeJsonSerializer`.
7#[derive(Debug, Clone, Default, Deserialize, Serialize)]
8pub struct NativeJsonSerializerConfig;
9
10impl NativeJsonSerializerConfig {
11    /// Build the `NativeJsonSerializer` from this configuration.
12    pub const fn build(&self) -> NativeJsonSerializer {
13        NativeJsonSerializer
14    }
15
16    /// The data type of events that are accepted by `NativeJsonSerializer`.
17    pub fn input_type(&self) -> DataType {
18        DataType::all_bits()
19    }
20
21    /// The schema required by the serializer.
22    pub fn schema_requirement(&self) -> schema::Requirement {
23        schema::Requirement::empty()
24    }
25}
26
/// Serializer that turns an `Event` into JSON bytes.
///
/// The event is serialized with `serde_json` through its `serde`
/// serialization.
#[derive(Clone, Debug)]
pub struct NativeJsonSerializer;
30
31impl NativeJsonSerializer {
32    /// Encode event and represent it as native JSON value.
33    pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
34        serde_json::to_value(&event).map_err(|e| e.to_string().into())
35    }
36}
37
38impl Encoder<Event> for NativeJsonSerializer {
39    type Error = vector_common::Error;
40
41    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
42        let writer = buffer.writer();
43        serde_json::to_writer(writer, &event).map_err(Into::into)
44    }
45}
46
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use vector_core::{
        buckets,
        event::{LogEvent, Metric, MetricKind, MetricValue, Value},
    };
    use vrl::btreemap;

    use super::*;

    /// Builds the single-field log event (`foo = "bar"`) shared by the log tests.
    fn foo_bar_log() -> Event {
        Event::Log(LogEvent::from(btreemap! {
            "foo" => Value::from("bar")
        }))
    }

    /// Runs `event` through the `Encoder` implementation and returns the
    /// buffer it filled.
    fn encode_event(event: Event) -> BytesMut {
        let mut output = BytesMut::new();
        let mut serializer = NativeJsonSerializer;
        serializer.encode(event, &mut output).unwrap();
        output
    }

    #[test]
    fn serialize_json() {
        let encoded = encode_event(foo_bar_log());

        assert_eq!(encoded.freeze(), r#"{"log":{"foo":"bar"}}"#);
    }

    #[test]
    fn serialize_equals_to_json_value() {
        let event = foo_bar_log();

        let encoded = encode_event(event.clone());
        let json = NativeJsonSerializer.to_json_value(event).unwrap();

        // The byte output must match the string form of the JSON value.
        assert_eq!(encoded.freeze(), serde_json::to_string(&json).unwrap());
    }

    #[test]
    fn serialize_aggregated_histogram() {
        let histogram = MetricValue::AggregatedHistogram {
            count: 1,
            sum: 1.0,
            buckets: buckets!(f64::NEG_INFINITY => 0, 2.0 => 1, f64::INFINITY => 0),
        };
        let histogram_event = Event::from(Metric::new(
            "histogram",
            MetricKind::Absolute,
            histogram,
        ));

        // Compute the reference string first so the event can then be moved
        // into the encoder.
        let expected = serde_json::to_string(&histogram_event).unwrap();
        let encoded = encode_event(histogram_event);

        assert_eq!(encoded.freeze(), expected);
    }
}