codecs/decoding/format/
native_json.rs1use bytes::Bytes;
2use derivative::Derivative;
3use smallvec::{smallvec, SmallVec};
4use vector_config::configurable_component;
5use vector_core::{config::DataType, event::Event, schema};
6use vrl::value::kind::Collection;
7use vrl::value::Kind;
8
9use super::{default_lossy, Deserializer};
10use vector_core::config::LogNamespace;
11
12#[configurable_component]
14#[derive(Debug, Clone, Default)]
15pub struct NativeJsonDeserializerConfig {
16 #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
18 pub native_json: NativeJsonDeserializerOptions,
19}
20
21impl NativeJsonDeserializerConfig {
22 pub fn new(options: NativeJsonDeserializerOptions) -> Self {
24 Self {
25 native_json: options,
26 }
27 }
28
29 pub fn build(&self) -> NativeJsonDeserializer {
31 NativeJsonDeserializer {
32 lossy: self.native_json.lossy,
33 }
34 }
35
36 pub fn output_type(&self) -> DataType {
38 DataType::all_bits()
39 }
40
41 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
43 match log_namespace {
44 LogNamespace::Vector => {
45 schema::Definition::new_with_default_metadata(Kind::json(), [log_namespace])
46 }
47 LogNamespace::Legacy => schema::Definition::new_with_default_metadata(
48 Kind::object(Collection::json()),
49 [log_namespace],
50 ),
51 }
52 }
53}
54
55#[configurable_component]
57#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
58#[derivative(Default)]
59pub struct NativeJsonDeserializerOptions {
60 #[serde(
66 default = "default_lossy",
67 skip_serializing_if = "vector_core::serde::is_default"
68 )]
69 #[derivative(Default(value = "default_lossy()"))]
70 pub lossy: bool,
71}
72
73#[derive(Debug, Clone, Derivative)]
76#[derivative(Default)]
77pub struct NativeJsonDeserializer {
78 #[derivative(Default(value = "default_lossy()"))]
79 lossy: bool,
80}
81
82impl Deserializer for NativeJsonDeserializer {
83 fn parse(
84 &self,
85 bytes: Bytes,
86 _log_namespace: LogNamespace,
89 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
90 if bytes.is_empty() {
93 return Ok(smallvec![]);
94 }
95
96 let json: serde_json::Value = match self.lossy {
97 true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
98 false => serde_json::from_slice(&bytes),
99 }
100 .map_err(|error| format!("Error parsing JSON: {error:?}"))?;
101
102 let events = match json {
103 serde_json::Value::Array(values) => values
104 .into_iter()
105 .map(serde_json::from_value)
106 .collect::<Result<SmallVec<[Event; 1]>, _>>()?,
107 _ => smallvec![serde_json::from_value(json)?],
108 };
109
110 Ok(events)
111 }
112}
113
#[cfg(test)]
mod test {
    use serde_json::json;

    use super::*;

    /// Builds a deserializer from the default config.
    fn default_deserializer() -> NativeJsonDeserializer {
        NativeJsonDeserializerConfig::default().build()
    }

    /// Builds a deserializer with an explicit `lossy` setting.
    fn deserializer_with_lossy(lossy: bool) -> NativeJsonDeserializer {
        NativeJsonDeserializerConfig::new(NativeJsonDeserializerOptions { lossy }).build()
    }

    #[test]
    fn parses_top_level_arrays() {
        let config = NativeJsonDeserializerConfig::default();
        let deserializer = config.build();

        let json1 = json!({"a": "b", "c": "d"});
        let json2 = json!({"foo": "bar", "baz": "quux"});
        let json_array = json!([{ "log": json1 }, { "log": json2 }]);
        let input = Bytes::from(serde_json::to_vec(&json_array).unwrap());

        let events = deserializer.parse(input, LogNamespace::Legacy).unwrap();

        let event1 = Event::from_json_value(json1, LogNamespace::Legacy).unwrap();
        let event2 = Event::from_json_value(json2, LogNamespace::Legacy).unwrap();
        let expected: SmallVec<[Event; 1]> = smallvec![event1, event2];
        assert_eq!(events, expected);
    }

    #[test]
    fn parses_single_top_level_object() {
        let json = json!({"a": "b", "c": "d"});
        let input = Bytes::from(serde_json::to_vec(&json!({ "log": json.clone() })).unwrap());

        let events = default_deserializer()
            .parse(input, LogNamespace::Legacy)
            .unwrap();

        let expected: SmallVec<[Event; 1]> =
            smallvec![Event::from_json_value(json, LogNamespace::Legacy).unwrap()];
        assert_eq!(events, expected);
    }

    #[test]
    fn empty_input_yields_no_events() {
        let events = default_deserializer()
            .parse(Bytes::new(), LogNamespace::Legacy)
            .unwrap();
        assert!(events.is_empty());
    }

    #[test]
    fn empty_top_level_array_yields_no_events() {
        let events = default_deserializer()
            .parse(Bytes::from_static(b"[]"), LogNamespace::Legacy)
            .unwrap();
        assert!(events.is_empty());
    }

    #[test]
    fn rejects_malformed_json() {
        let result =
            default_deserializer().parse(Bytes::from_static(b"{not json"), LogNamespace::Legacy);
        assert!(result.is_err());
    }

    #[test]
    fn lossy_mode_controls_invalid_utf8_handling() {
        // 0xFF is never valid in UTF-8.
        let input = Bytes::from_static(b"{\"log\":{\"a\":\"\xff\"}}");

        let events = deserializer_with_lossy(true)
            .parse(input.clone(), LogNamespace::Legacy)
            .unwrap();
        let expected: SmallVec<[Event; 1]> = smallvec![Event::from_json_value(
            json!({"a": "\u{FFFD}"}),
            LogNamespace::Legacy
        )
        .unwrap()];
        assert_eq!(events, expected);

        assert!(deserializer_with_lossy(false)
            .parse(input, LogNamespace::Legacy)
            .is_err());
    }
}