codecs/decoding/format/
native_json.rs

1use bytes::Bytes;
2use derivative::Derivative;
3use smallvec::{smallvec, SmallVec};
4use vector_config::configurable_component;
5use vector_core::{config::DataType, event::Event, schema};
6use vrl::value::kind::Collection;
7use vrl::value::Kind;
8
9use super::{default_lossy, Deserializer};
10use vector_core::config::LogNamespace;
11
12/// Config used to build a `NativeJsonDeserializer`.
13#[configurable_component]
14#[derive(Debug, Clone, Default)]
15pub struct NativeJsonDeserializerConfig {
16    /// Vector's native JSON-specific decoding options.
17    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
18    pub native_json: NativeJsonDeserializerOptions,
19}
20
21impl NativeJsonDeserializerConfig {
22    /// Creates a new `NativeJsonDeserializerConfig`.
23    pub fn new(options: NativeJsonDeserializerOptions) -> Self {
24        Self {
25            native_json: options,
26        }
27    }
28
29    /// Build the `NativeJsonDeserializer` from this configuration.
30    pub fn build(&self) -> NativeJsonDeserializer {
31        NativeJsonDeserializer {
32            lossy: self.native_json.lossy,
33        }
34    }
35
36    /// Return the type of event build by this deserializer.
37    pub fn output_type(&self) -> DataType {
38        DataType::all_bits()
39    }
40
41    /// The schema produced by the deserializer.
42    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
43        match log_namespace {
44            LogNamespace::Vector => {
45                schema::Definition::new_with_default_metadata(Kind::json(), [log_namespace])
46            }
47            LogNamespace::Legacy => schema::Definition::new_with_default_metadata(
48                Kind::object(Collection::json()),
49                [log_namespace],
50            ),
51        }
52    }
53}
54
55/// Vector's native JSON-specific decoding options.
56#[configurable_component]
57#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
58#[derivative(Default)]
59pub struct NativeJsonDeserializerOptions {
60    /// Determines whether to replace invalid UTF-8 sequences instead of failing.
61    ///
62    /// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
63    ///
64    /// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
65    #[serde(
66        default = "default_lossy",
67        skip_serializing_if = "vector_core::serde::is_default"
68    )]
69    #[derivative(Default(value = "default_lossy()"))]
70    pub lossy: bool,
71}
72
73/// Deserializer that builds `Event`s from a byte frame containing Vector's native JSON
74/// representation.
75#[derive(Debug, Clone, Derivative)]
76#[derivative(Default)]
77pub struct NativeJsonDeserializer {
78    #[derivative(Default(value = "default_lossy()"))]
79    lossy: bool,
80}
81
82impl Deserializer for NativeJsonDeserializer {
83    fn parse(
84        &self,
85        bytes: Bytes,
86        // LogNamespace is ignored because Vector owns the data format being consumed and as such there
87        // is no need to change the fields of the event.
88        _log_namespace: LogNamespace,
89    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
90        // It's common to receive empty frames when parsing NDJSON, since it
91        // allows multiple empty newlines. We proceed without a warning here.
92        if bytes.is_empty() {
93            return Ok(smallvec![]);
94        }
95
96        let json: serde_json::Value = match self.lossy {
97            true => serde_json::from_str(&String::from_utf8_lossy(&bytes)),
98            false => serde_json::from_slice(&bytes),
99        }
100        .map_err(|error| format!("Error parsing JSON: {error:?}"))?;
101
102        let events = match json {
103            serde_json::Value::Array(values) => values
104                .into_iter()
105                .map(serde_json::from_value)
106                .collect::<Result<SmallVec<[Event; 1]>, _>>()?,
107            _ => smallvec![serde_json::from_value(json)?],
108        };
109
110        Ok(events)
111    }
112}
113
#[cfg(test)]
mod test {
    use serde_json::json;

    use super::*;

    /// A top-level JSON array of `{"log": ...}` entries decodes into one event
    /// per entry, in order.
    #[test]
    fn parses_top_level_arrays() {
        let deserializer = NativeJsonDeserializerConfig::default().build();

        let first = json!({"a": "b", "c": "d"});
        let second = json!({"foo": "bar", "baz": "quux"});
        let payload = json!([{ "log": first }, { "log": second }]);
        let frame = Bytes::from(serde_json::to_vec(&payload).unwrap());

        let events = deserializer.parse(frame, LogNamespace::Legacy).unwrap();

        // Build the expected events from the same inner objects that were
        // wrapped under "log" in the payload.
        let expected: SmallVec<[Event; 1]> = [first, second]
            .into_iter()
            .map(|value| Event::from_json_value(value, LogNamespace::Legacy).unwrap())
            .collect();
        assert_eq!(events, expected);
    }
}
137}