codecs/decoding/format/
native_json.rs1use bytes::Bytes;
2use derivative::Derivative;
3use smallvec::{SmallVec, smallvec};
4use vector_config::configurable_component;
5use vector_core::{
6 config::{DataType, LogNamespace},
7 event::Event,
8 schema,
9};
10use vrl::value::{Kind, kind::Collection};
11
12use super::{Deserializer, default_lossy};
13
14#[configurable_component]
16#[derive(Debug, Clone, Default)]
17pub struct NativeJsonDeserializerConfig {
18 #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
20 pub native_json: NativeJsonDeserializerOptions,
21}
22
23impl NativeJsonDeserializerConfig {
24 pub fn new(options: NativeJsonDeserializerOptions) -> Self {
26 Self {
27 native_json: options,
28 }
29 }
30
31 pub fn build(&self) -> NativeJsonDeserializer {
33 NativeJsonDeserializer {
34 lossy: self.native_json.lossy,
35 }
36 }
37
38 pub fn output_type(&self) -> DataType {
40 DataType::all_bits()
41 }
42
43 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
45 match log_namespace {
46 LogNamespace::Vector => {
47 schema::Definition::new_with_default_metadata(Kind::json(), [log_namespace])
48 }
49 LogNamespace::Legacy => schema::Definition::new_with_default_metadata(
50 Kind::object(Collection::json()),
51 [log_namespace],
52 ),
53 }
54 }
55}
56
57#[configurable_component]
59#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
60#[derivative(Default)]
61pub struct NativeJsonDeserializerOptions {
62 #[serde(
68 default = "default_lossy",
69 skip_serializing_if = "vector_core::serde::is_default"
70 )]
71 #[derivative(Default(value = "default_lossy()"))]
72 pub lossy: bool,
73}
74
75#[derive(Debug, Clone, Derivative)]
78#[derivative(Default)]
79pub struct NativeJsonDeserializer {
80 #[derivative(Default(value = "default_lossy()"))]
81 lossy: bool,
82}
83
84impl Deserializer for NativeJsonDeserializer {
85 fn parse(
86 &self,
87 bytes: Bytes,
88 _log_namespace: LogNamespace,
91 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
92 if bytes.is_empty() {
95 return Ok(smallvec![]);
96 }
97
98 let json: serde_json::Value = match self.lossy {
99 true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
100 false => serde_json::from_slice(&bytes),
101 }
102 .map_err(|error| format!("Error parsing JSON: {error:?}"))?;
103
104 let events = match json {
105 serde_json::Value::Array(values) => values
106 .into_iter()
107 .map(serde_json::from_value)
108 .collect::<Result<SmallVec<[Event; 1]>, _>>()?,
109 _ => smallvec![serde_json::from_value(json)?],
110 };
111
112 Ok(events)
113 }
114}
115
#[cfg(test)]
mod test {
    use serde_json::json;

    use super::*;

    /// Builds a deserializer with an explicit `lossy` setting, independent of `default_lossy()`.
    fn deserializer_with_lossy(lossy: bool) -> NativeJsonDeserializer {
        NativeJsonDeserializerConfig::new(NativeJsonDeserializerOptions { lossy }).build()
    }

    #[test]
    fn parses_top_level_arrays() {
        let config = NativeJsonDeserializerConfig::default();
        let deserializer = config.build();

        let json1 = json!({"a": "b", "c": "d"});
        let json2 = json!({"foo": "bar", "baz": "quux"});
        let json_array = json!([{ "log": json1 }, { "log": json2 }]);
        let input = Bytes::from(serde_json::to_vec(&json_array).unwrap());

        let events = deserializer.parse(input, LogNamespace::Legacy).unwrap();

        let event1 = Event::from_json_value(json1, LogNamespace::Legacy).unwrap();
        let event2 = Event::from_json_value(json2, LogNamespace::Legacy).unwrap();
        let expected: SmallVec<[Event; 1]> = smallvec![event1, event2];
        assert_eq!(events, expected);
    }

    #[test]
    fn parses_top_level_object() {
        let deserializer = NativeJsonDeserializerConfig::default().build();

        let json = json!({"a": "b"});
        let input = Bytes::from(serde_json::to_vec(&json!({ "log": json })).unwrap());

        let events = deserializer.parse(input, LogNamespace::Legacy).unwrap();

        let expected: SmallVec<[Event; 1]> =
            smallvec![Event::from_json_value(json, LogNamespace::Legacy).unwrap()];
        assert_eq!(events, expected);
    }

    #[test]
    fn empty_input_yields_no_events() {
        let deserializer = NativeJsonDeserializerConfig::default().build();

        let events = deserializer
            .parse(Bytes::new(), LogNamespace::Legacy)
            .unwrap();

        assert!(events.is_empty());
    }

    #[test]
    fn rejects_malformed_json() {
        for lossy in [false, true] {
            let result = deserializer_with_lossy(lossy)
                .parse(Bytes::from_static(b"{not json"), LogNamespace::Legacy);
            assert!(result.is_err(), "expected an error with lossy = {lossy}");
        }
    }

    #[test]
    fn lossy_mode_controls_invalid_utf8_handling() {
        // 0xFF is never valid in UTF-8.
        let input = Bytes::from_static(b"{\"log\":{\"message\":\"a\xffb\"}}");

        let strict = deserializer_with_lossy(false).parse(input.clone(), LogNamespace::Legacy);
        assert!(strict.is_err());

        let events = deserializer_with_lossy(true)
            .parse(input, LogNamespace::Legacy)
            .unwrap();
        assert_eq!(events.len(), 1);
    }
}