codecs/decoding/format/
json.rs1use bytes::Bytes;
2use chrono::Utc;
3use derivative::Derivative;
4use smallvec::{smallvec, SmallVec};
5use vector_config::configurable_component;
6use vector_core::{
7 config::{log_schema, DataType, LogNamespace},
8 event::Event,
9 schema,
10};
11use vrl::value::Kind;
12
13use super::{default_lossy, Deserializer};
14
15#[configurable_component]
17#[derive(Debug, Clone, Default)]
18pub struct JsonDeserializerConfig {
19 #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
21 pub json: JsonDeserializerOptions,
22}
23
24impl JsonDeserializerConfig {
25 pub fn new(options: JsonDeserializerOptions) -> Self {
27 Self { json: options }
28 }
29
30 pub fn build(&self) -> JsonDeserializer {
32 Into::<JsonDeserializer>::into(self)
33 }
34
35 pub fn output_type(&self) -> DataType {
37 DataType::Log
38 }
39
40 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
42 match log_namespace {
43 LogNamespace::Legacy => {
44 let mut definition =
45 schema::Definition::empty_legacy_namespace().unknown_fields(Kind::json());
46
47 if let Some(timestamp_key) = log_schema().timestamp_key() {
48 definition = definition.try_with_field(
49 timestamp_key,
50 Kind::json().or_timestamp(),
53 Some("timestamp"),
54 );
55 }
56 definition
57 }
58 LogNamespace::Vector => {
59 schema::Definition::new_with_default_metadata(Kind::json(), [log_namespace])
60 }
61 }
62 }
63}
64
65#[configurable_component]
67#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
68#[derivative(Default)]
69pub struct JsonDeserializerOptions {
70 #[serde(
76 default = "default_lossy",
77 skip_serializing_if = "vector_core::serde::is_default"
78 )]
79 #[derivative(Default(value = "default_lossy()"))]
80 pub lossy: bool,
81}
82
83#[derive(Debug, Clone, Derivative)]
85#[derivative(Default)]
86pub struct JsonDeserializer {
87 #[derivative(Default(value = "default_lossy()"))]
88 lossy: bool,
89}
90
91impl JsonDeserializer {
92 pub fn new(lossy: bool) -> Self {
94 Self { lossy }
95 }
96}
97
98impl Deserializer for JsonDeserializer {
99 fn parse(
100 &self,
101 bytes: Bytes,
102 log_namespace: LogNamespace,
103 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
104 if bytes.is_empty() {
107 return Ok(smallvec![]);
108 }
109
110 let json: serde_json::Value = match self.lossy {
111 true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
112 false => serde_json::from_slice(&bytes),
113 }
114 .map_err(|error| format!("Error parsing JSON: {error:?}"))?;
115
116 let mut events = match json {
118 serde_json::Value::Array(values) => values
119 .into_iter()
120 .map(|json| Event::from_json_value(json, log_namespace))
121 .collect::<Result<SmallVec<[Event; 1]>, _>>()?,
122 _ => smallvec![Event::from_json_value(json, log_namespace)?],
123 };
124
125 let events = match log_namespace {
126 LogNamespace::Vector => events,
127 LogNamespace::Legacy => {
128 let timestamp = Utc::now();
129
130 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
131 for event in &mut events {
132 let log = event.as_mut_log();
133 if !log.contains(timestamp_key) {
134 log.insert(timestamp_key, timestamp);
135 }
136 }
137 }
138
139 events
140 }
141 };
142
143 Ok(events)
144 }
145}
146
147impl From<&JsonDeserializerConfig> for JsonDeserializer {
148 fn from(config: &JsonDeserializerConfig) -> Self {
149 Self {
150 lossy: config.json.lossy,
151 }
152 }
153}
154
#[cfg(test)]
mod tests {
    use vector_core::config::log_schema;
    use vrl::core::Value;

    use super::*;

    /// The log namespaces that namespace-agnostic tests are run against.
    const NAMESPACES: [LogNamespace; 2] = [LogNamespace::Legacy, LogNamespace::Vector];

    #[test]
    fn deserialize_json() {
        let deserializer = JsonDeserializer::default();
        let input = Bytes::from(r#"{ "foo": 123 }"#);

        for namespace in NAMESPACES {
            let mut events = deserializer
                .parse(input.clone(), namespace)
                .unwrap()
                .into_iter();

            let event = events.next().unwrap();
            let log = event.as_log();
            assert_eq!(log["foo"], 123.into());

            // Only the legacy namespace gets a timestamp filled in.
            let has_timestamp = log
                .get((
                    lookup::PathPrefix::Event,
                    log_schema().timestamp_key().unwrap(),
                ))
                .is_some();
            assert_eq!(has_timestamp, namespace == LogNamespace::Legacy);

            assert_eq!(events.next(), None);
        }
    }

    #[test]
    fn deserialize_non_object_vector_namespace() {
        let deserializer = JsonDeserializer::default();
        let input = Bytes::from(r#"null"#);

        let mut events = deserializer
            .parse(input, LogNamespace::Vector)
            .unwrap()
            .into_iter();

        // A non-object root value becomes the event root itself.
        let event = events.next().unwrap();
        assert_eq!(event.as_log()["."], Value::Null);

        assert_eq!(events.next(), None);
    }

    #[test]
    fn deserialize_json_array() {
        let deserializer = JsonDeserializer::default();
        let input = Bytes::from(r#"[{ "foo": 123 }, { "bar": 456 }]"#);

        for namespace in NAMESPACES {
            let mut events = deserializer
                .parse(input.clone(), namespace)
                .unwrap()
                .into_iter();

            let first = events.next().unwrap();
            let first_log = first.as_log();
            assert_eq!(first_log["foo"], 123.into());
            let first_has_timestamp = first_log
                .get((
                    lookup::PathPrefix::Event,
                    log_schema().timestamp_key().unwrap(),
                ))
                .is_some();
            assert_eq!(first_has_timestamp, namespace == LogNamespace::Legacy);

            let second = events.next().unwrap();
            let second_log = second.as_log();
            assert_eq!(second_log["bar"], 456.into());
            let second_has_timestamp = second_log
                .get(log_schema().timestamp_key_target_path().unwrap())
                .is_some();
            assert_eq!(second_has_timestamp, namespace == LogNamespace::Legacy);

            assert_eq!(events.next(), None);
        }
    }

    #[test]
    fn deserialize_skip_empty() {
        let deserializer = JsonDeserializer::default();
        let input = Bytes::from("");

        for namespace in NAMESPACES {
            let events = deserializer.parse(input.clone(), namespace).unwrap();
            assert!(events.is_empty());
        }
    }

    #[test]
    fn deserialize_error_invalid_json() {
        let deserializer = JsonDeserializer::default();
        let input = Bytes::from("{ foo");

        for namespace in NAMESPACES {
            let outcome = deserializer.parse(input.clone(), namespace);
            assert!(outcome.is_err());
        }
    }

    #[test]
    fn deserialize_lossy_replace_invalid_utf8() {
        let deserializer = JsonDeserializer::new(true);
        let input = Bytes::from(b"{ \"foo\": \"Hello \xF0\x90\x80World\" }".as_slice());

        for namespace in NAMESPACES {
            let mut events = deserializer
                .parse(input.clone(), namespace)
                .unwrap()
                .into_iter();

            let event = events.next().unwrap();
            let log = event.as_log();
            // The truncated sequence is replaced by U+FFFD (EF BF BD in UTF-8).
            assert_eq!(log["foo"], b"Hello \xEF\xBF\xBDWorld".into());

            let has_timestamp = log
                .get((
                    lookup::PathPrefix::Event,
                    log_schema().timestamp_key().unwrap(),
                ))
                .is_some();
            assert_eq!(has_timestamp, namespace == LogNamespace::Legacy);

            assert_eq!(events.next(), None);
        }
    }

    #[test]
    fn deserialize_non_lossy_error_invalid_utf8() {
        let deserializer = JsonDeserializer::new(false);
        let input = Bytes::from(b"{ \"foo\": \"Hello \xF0\x90\x80World\" }".as_slice());

        for namespace in NAMESPACES {
            let outcome = deserializer.parse(input.clone(), namespace);
            assert!(outcome.is_err());
        }
    }
}