codecs/decoding/format/
json.rs

1use bytes::Bytes;
2use chrono::Utc;
3use derivative::Derivative;
4use smallvec::{smallvec, SmallVec};
5use vector_config::configurable_component;
6use vector_core::{
7    config::{log_schema, DataType, LogNamespace},
8    event::Event,
9    schema,
10};
11use vrl::value::Kind;
12
13use super::{default_lossy, Deserializer};
14
15/// Config used to build a `JsonDeserializer`.
16#[configurable_component]
17#[derive(Debug, Clone, Default)]
18pub struct JsonDeserializerConfig {
19    /// JSON-specific decoding options.
20    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
21    pub json: JsonDeserializerOptions,
22}
23
24impl JsonDeserializerConfig {
25    /// Creates a new `JsonDeserializerConfig`.
26    pub fn new(options: JsonDeserializerOptions) -> Self {
27        Self { json: options }
28    }
29
30    /// Build the `JsonDeserializer` from this configuration.
31    pub fn build(&self) -> JsonDeserializer {
32        Into::<JsonDeserializer>::into(self)
33    }
34
35    /// Return the type of event build by this deserializer.
36    pub fn output_type(&self) -> DataType {
37        DataType::Log
38    }
39
40    /// The schema produced by the deserializer.
41    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
42        match log_namespace {
43            LogNamespace::Legacy => {
44                let mut definition =
45                    schema::Definition::empty_legacy_namespace().unknown_fields(Kind::json());
46
47                if let Some(timestamp_key) = log_schema().timestamp_key() {
48                    definition = definition.try_with_field(
49                        timestamp_key,
50                        // The JSON decoder will try to insert a new `timestamp`-type value into the
51                        // "timestamp_key" field, but only if that field doesn't already exist.
52                        Kind::json().or_timestamp(),
53                        Some("timestamp"),
54                    );
55                }
56                definition
57            }
58            LogNamespace::Vector => {
59                schema::Definition::new_with_default_metadata(Kind::json(), [log_namespace])
60            }
61        }
62    }
63}
64
65/// JSON-specific decoding options.
66#[configurable_component]
67#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
68#[derivative(Default)]
69pub struct JsonDeserializerOptions {
70    /// Determines whether to replace invalid UTF-8 sequences instead of failing.
71    ///
72    /// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
73    ///
74    /// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
75    #[serde(
76        default = "default_lossy",
77        skip_serializing_if = "vector_core::serde::is_default"
78    )]
79    #[derivative(Default(value = "default_lossy()"))]
80    pub lossy: bool,
81}
82
83/// Deserializer that builds `Event`s from a byte frame containing JSON.
84#[derive(Debug, Clone, Derivative)]
85#[derivative(Default)]
86pub struct JsonDeserializer {
87    #[derivative(Default(value = "default_lossy()"))]
88    lossy: bool,
89}
90
91impl JsonDeserializer {
92    /// Creates a new `JsonDeserializer`.
93    pub fn new(lossy: bool) -> Self {
94        Self { lossy }
95    }
96}
97
98impl Deserializer for JsonDeserializer {
99    fn parse(
100        &self,
101        bytes: Bytes,
102        log_namespace: LogNamespace,
103    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
104        // It's common to receive empty frames when parsing NDJSON, since it
105        // allows multiple empty newlines. We proceed without a warning here.
106        if bytes.is_empty() {
107            return Ok(smallvec![]);
108        }
109
110        let json: serde_json::Value = match self.lossy {
111            true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
112            false => serde_json::from_slice(&bytes),
113        }
114        .map_err(|error| format!("Error parsing JSON: {error:?}"))?;
115
116        // If the root is an Array, split it into multiple events
117        let mut events = match json {
118            serde_json::Value::Array(values) => values
119                .into_iter()
120                .map(|json| Event::from_json_value(json, log_namespace))
121                .collect::<Result<SmallVec<[Event; 1]>, _>>()?,
122            _ => smallvec![Event::from_json_value(json, log_namespace)?],
123        };
124
125        let events = match log_namespace {
126            LogNamespace::Vector => events,
127            LogNamespace::Legacy => {
128                let timestamp = Utc::now();
129
130                if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
131                    for event in &mut events {
132                        let log = event.as_mut_log();
133                        if !log.contains(timestamp_key) {
134                            log.insert(timestamp_key, timestamp);
135                        }
136                    }
137                }
138
139                events
140            }
141        };
142
143        Ok(events)
144    }
145}
146
147impl From<&JsonDeserializerConfig> for JsonDeserializer {
148    fn from(config: &JsonDeserializerConfig) -> Self {
149        Self {
150            lossy: config.json.lossy,
151        }
152    }
153}
154
#[cfg(test)]
mod tests {
    use vector_core::config::log_schema;
    use vrl::core::Value;

    use super::*;

    /// Namespaces exercised by the namespace-agnostic tests below.
    const NAMESPACES: [LogNamespace; 2] = [LogNamespace::Legacy, LogNamespace::Vector];

    /// Asserts that the global timestamp field is present exactly when `namespace` is Legacy.
    ///
    /// Every test looks the field up through `timestamp_key_target_path()`, so they all agree on
    /// how the timestamp is located.
    fn assert_timestamp_presence(event: &Event, namespace: LogNamespace) {
        let timestamp_path = log_schema().timestamp_key_target_path().unwrap();
        assert_eq!(
            event.as_log().get(timestamp_path).is_some(),
            namespace == LogNamespace::Legacy
        );
    }

    #[test]
    fn deserialize_json() {
        let input = Bytes::from(r#"{ "foo": 123 }"#);
        let deserializer = JsonDeserializer::default();

        for namespace in NAMESPACES {
            let mut events = deserializer
                .parse(input.clone(), namespace)
                .unwrap()
                .into_iter();

            let event = events.next().unwrap();
            assert_eq!(event.as_log()["foo"], 123.into());
            assert_timestamp_presence(&event, namespace);

            assert_eq!(events.next(), None);
        }
    }

    #[test]
    fn deserialize_non_object_vector_namespace() {
        let input = Bytes::from(r#"null"#);
        let deserializer = JsonDeserializer::default();

        let mut events = deserializer
            .parse(input, LogNamespace::Vector)
            .unwrap()
            .into_iter();

        let event = events.next().unwrap();
        assert_eq!(event.as_log()["."], Value::Null);

        assert_eq!(events.next(), None);
    }

    #[test]
    fn deserialize_json_array() {
        let input = Bytes::from(r#"[{ "foo": 123 }, { "bar": 456 }]"#);
        let deserializer = JsonDeserializer::default();

        for namespace in NAMESPACES {
            let mut events = deserializer
                .parse(input.clone(), namespace)
                .unwrap()
                .into_iter();

            let first = events.next().unwrap();
            assert_eq!(first.as_log()["foo"], 123.into());
            assert_timestamp_presence(&first, namespace);

            let second = events.next().unwrap();
            assert_eq!(second.as_log()["bar"], 456.into());
            assert_timestamp_presence(&second, namespace);

            assert_eq!(events.next(), None);
        }
    }

    #[test]
    fn deserialize_keeps_existing_timestamp() {
        // The decoder only inserts a timestamp when the field is missing, so a value supplied by
        // the input must survive in both namespaces.
        let key = log_schema().timestamp_key().unwrap().to_string();
        let input = Bytes::from(format!(r#"{{ "{key}": "keep-me" }}"#));
        let deserializer = JsonDeserializer::default();
        let timestamp_path = log_schema().timestamp_key_target_path().unwrap();

        for namespace in NAMESPACES {
            let events = deserializer.parse(input.clone(), namespace).unwrap();
            assert_eq!(events.len(), 1);
            assert_eq!(
                events[0].as_log().get(timestamp_path),
                Some(&Value::from("keep-me"))
            );
        }
    }

    #[test]
    fn deserialize_skip_empty() {
        let input = Bytes::from("");
        let deserializer = JsonDeserializer::default();

        for namespace in NAMESPACES {
            let events = deserializer.parse(input.clone(), namespace).unwrap();
            assert!(events.is_empty());
        }
    }

    #[test]
    fn deserialize_error_invalid_json() {
        let input = Bytes::from("{ foo");
        let deserializer = JsonDeserializer::default();

        for namespace in NAMESPACES {
            assert!(deserializer.parse(input.clone(), namespace).is_err());
        }
    }

    #[test]
    fn deserialize_lossy_replace_invalid_utf8() {
        let input = Bytes::from(b"{ \"foo\": \"Hello \xF0\x90\x80World\" }".as_slice());
        let deserializer = JsonDeserializer::new(true);

        for namespace in NAMESPACES {
            let mut events = deserializer
                .parse(input.clone(), namespace)
                .unwrap()
                .into_iter();

            let event = events.next().unwrap();
            assert_eq!(event.as_log()["foo"], b"Hello \xEF\xBF\xBDWorld".into());
            assert_timestamp_presence(&event, namespace);

            assert_eq!(events.next(), None);
        }
    }

    #[test]
    fn deserialize_non_lossy_error_invalid_utf8() {
        let input = Bytes::from(b"{ \"foo\": \"Hello \xF0\x90\x80World\" }".as_slice());
        let deserializer = JsonDeserializer::new(false);

        for namespace in NAMESPACES {
            assert!(deserializer.parse(input.clone(), namespace).is_err());
        }
    }
}