codecs/decoding/format/
native_json.rs

1use bytes::Bytes;
2use derivative::Derivative;
3use smallvec::{SmallVec, smallvec};
4use vector_config::configurable_component;
5use vector_core::{
6    config::{DataType, LogNamespace},
7    event::Event,
8    schema,
9};
10use vrl::value::{Kind, kind::Collection};
11
12use super::{Deserializer, default_lossy};
13
14/// Config used to build a `NativeJsonDeserializer`.
15#[configurable_component]
16#[derive(Debug, Clone, Default)]
17pub struct NativeJsonDeserializerConfig {
18    /// Vector's native JSON-specific decoding options.
19    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
20    pub native_json: NativeJsonDeserializerOptions,
21}
22
23impl NativeJsonDeserializerConfig {
24    /// Creates a new `NativeJsonDeserializerConfig`.
25    pub fn new(options: NativeJsonDeserializerOptions) -> Self {
26        Self {
27            native_json: options,
28        }
29    }
30
31    /// Build the `NativeJsonDeserializer` from this configuration.
32    pub fn build(&self) -> NativeJsonDeserializer {
33        NativeJsonDeserializer {
34            lossy: self.native_json.lossy,
35        }
36    }
37
38    /// Return the type of event build by this deserializer.
39    pub fn output_type(&self) -> DataType {
40        DataType::all_bits()
41    }
42
43    /// The schema produced by the deserializer.
44    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
45        match log_namespace {
46            LogNamespace::Vector => {
47                schema::Definition::new_with_default_metadata(Kind::json(), [log_namespace])
48            }
49            LogNamespace::Legacy => schema::Definition::new_with_default_metadata(
50                Kind::object(Collection::json()),
51                [log_namespace],
52            ),
53        }
54    }
55}
56
57/// Vector's native JSON-specific decoding options.
58#[configurable_component]
59#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
60#[derivative(Default)]
61pub struct NativeJsonDeserializerOptions {
62    /// Determines whether to replace invalid UTF-8 sequences instead of failing.
63    ///
64    /// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
65    ///
66    /// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
67    #[serde(
68        default = "default_lossy",
69        skip_serializing_if = "vector_core::serde::is_default"
70    )]
71    #[derivative(Default(value = "default_lossy()"))]
72    pub lossy: bool,
73}
74
75/// Deserializer that builds `Event`s from a byte frame containing Vector's native JSON
76/// representation.
77#[derive(Debug, Clone, Derivative)]
78#[derivative(Default)]
79pub struct NativeJsonDeserializer {
80    #[derivative(Default(value = "default_lossy()"))]
81    lossy: bool,
82}
83
84impl Deserializer for NativeJsonDeserializer {
85    fn parse(
86        &self,
87        bytes: Bytes,
88        // LogNamespace is ignored because Vector owns the data format being consumed and as such there
89        // is no need to change the fields of the event.
90        _log_namespace: LogNamespace,
91    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
92        // It's common to receive empty frames when parsing NDJSON, since it
93        // allows multiple empty newlines. We proceed without a warning here.
94        if bytes.is_empty() {
95            return Ok(smallvec![]);
96        }
97
98        let json: serde_json::Value = match self.lossy {
99            true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
100            false => serde_json::from_slice(&bytes),
101        }
102        .map_err(|error| format!("Error parsing JSON: {error:?}"))?;
103
104        let events = match json {
105            serde_json::Value::Array(values) => values
106                .into_iter()
107                .map(serde_json::from_value)
108                .collect::<Result<SmallVec<[Event; 1]>, _>>()?,
109            _ => smallvec![serde_json::from_value(json)?],
110        };
111
112        Ok(events)
113    }
114}
115
#[cfg(test)]
mod test {
    use serde_json::json;

    use super::*;

    /// A top-level JSON array must decode into one event per element, in order.
    #[test]
    fn parses_top_level_arrays() {
        let deserializer = NativeJsonDeserializerConfig::default().build();

        let first = json!({"a": "b", "c": "d"});
        let second = json!({"foo": "bar", "baz": "quux"});

        // Each payload is wrapped under the "log" key, then the array is serialized
        // into a single frame.
        let wrapped = json!([{ "log": first }, { "log": second }]);
        let frame = Bytes::from(serde_json::to_vec(&wrapped).unwrap());

        let events = deserializer.parse(frame, LogNamespace::Legacy).unwrap();

        let expected: SmallVec<[Event; 1]> = [first, second]
            .into_iter()
            .map(|value| Event::from_json_value(value, LogNamespace::Legacy).unwrap())
            .collect();
        assert_eq!(events, expected);
    }
}