// codecs/encoding/format/json.rs

1use bytes::{BufMut, BytesMut};
2use tokio_util::codec::Encoder;
3use vector_config_macros::configurable_component;
4use vector_core::{config::DataType, event::Event, schema};
5
6use crate::MetricTagValues;
7
8/// Config used to build a `JsonSerializer`.
9#[configurable_component]
10#[derive(Debug, Clone, Default)]
11pub struct JsonSerializerConfig {
12    /// Controls how metric tag values are encoded.
13    ///
14    /// When set to `single`, only the last non-bare value of tags are displayed with the
15    /// metric.  When set to `full`, all metric tags are exposed as separate assignments.
16    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
17    pub metric_tag_values: MetricTagValues,
18
19    /// Options for the JsonSerializer.
20    #[serde(default, rename = "json")]
21    pub options: JsonSerializerOptions,
22}
23
24/// Options for the JsonSerializer.
25#[configurable_component]
26#[derive(Debug, Clone, Default)]
27pub struct JsonSerializerOptions {
28    /// Whether to use pretty JSON formatting.
29    #[serde(default)]
30    pub pretty: bool,
31}
32
33impl JsonSerializerConfig {
34    /// Creates a new `JsonSerializerConfig`.
35    pub const fn new(metric_tag_values: MetricTagValues, options: JsonSerializerOptions) -> Self {
36        Self {
37            metric_tag_values,
38            options,
39        }
40    }
41
42    /// Build the `JsonSerializer` from this configuration.
43    pub fn build(&self) -> JsonSerializer {
44        JsonSerializer::new(self.metric_tag_values, self.options.clone())
45    }
46
47    /// The data type of events that are accepted by `JsonSerializer`.
48    pub fn input_type(&self) -> DataType {
49        DataType::all_bits()
50    }
51
52    /// The schema required by the serializer.
53    pub fn schema_requirement(&self) -> schema::Requirement {
54        // While technically we support `Value` variants that can't be losslessly serialized to
55        // JSON, we don't want to enforce that limitation to users yet.
56        schema::Requirement::empty()
57    }
58}
59
60/// Serializer that converts an `Event` to bytes using the JSON format.
61#[derive(Debug, Clone)]
62pub struct JsonSerializer {
63    metric_tag_values: MetricTagValues,
64    options: JsonSerializerOptions,
65}
66
67impl JsonSerializer {
68    /// Creates a new `JsonSerializer`.
69    pub const fn new(metric_tag_values: MetricTagValues, options: JsonSerializerOptions) -> Self {
70        Self {
71            metric_tag_values,
72            options,
73        }
74    }
75
76    /// Encode event and represent it as JSON value.
77    pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
78        match event {
79            Event::Log(log) => serde_json::to_value(&log),
80            Event::Metric(metric) => serde_json::to_value(&metric),
81            Event::Trace(trace) => serde_json::to_value(&trace),
82        }
83        .map_err(|e| e.to_string().into())
84    }
85}
86
87impl Encoder<Event> for JsonSerializer {
88    type Error = vector_common::Error;
89
90    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
91        let writer = buffer.writer();
92        if self.options.pretty {
93            match event {
94                Event::Log(log) => serde_json::to_writer_pretty(writer, &log),
95                Event::Metric(mut metric) => {
96                    if self.metric_tag_values == MetricTagValues::Single {
97                        metric.reduce_tags_to_single();
98                    }
99                    serde_json::to_writer_pretty(writer, &metric)
100                }
101                Event::Trace(trace) => serde_json::to_writer_pretty(writer, &trace),
102            }
103        } else {
104            match event {
105                Event::Log(log) => serde_json::to_writer(writer, &log),
106                Event::Metric(mut metric) => {
107                    if self.metric_tag_values == MetricTagValues::Single {
108                        metric.reduce_tags_to_single();
109                    }
110                    serde_json::to_writer(writer, &metric)
111                }
112                Event::Trace(trace) => serde_json::to_writer(writer, &trace),
113            }
114        }
115        .map_err(Into::into)
116    }
117}
118
#[cfg(test)]
mod tests {
    use bytes::{Bytes, BytesMut};
    use chrono::{TimeZone, Timelike, Utc};
    use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, StatisticKind, Value};
    use vector_core::metric_tags;
    use vrl::btreemap;

    use super::*;

    /// Counter metric whose tag `a` holds three entries (`"first"`, a valueless `None`, and
    /// `"second"`), used to exercise the `single` and `full` tag modes.
    fn metric2() -> Event {
        Event::Metric(
            Metric::new(
                "counter",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )
            .with_tags(Some(metric_tags! (
                "a" => "first",
                "a" => None,
                "a" => "second",
            ))),
        )
    }

    /// Builds a serializer from `config`, encodes `input` and returns the produced bytes.
    fn serialize(config: JsonSerializerConfig, input: Event) -> Bytes {
        let mut buffer = BytesMut::new();
        config.build().encode(input, &mut buffer).unwrap();
        buffer.freeze()
    }

    #[test]
    fn serialize_json_log() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "x" => Value::from("23"),
            "z" => Value::from(25),
            "a" => Value::from("0"),
        }));
        let bytes = serialize(JsonSerializerConfig::default(), event);

        assert_eq!(bytes, r#"{"a":"0","x":"23","z":25}"#);
    }

    #[test]
    fn serialize_json_metric_counter() {
        let event = Event::Metric(
            Metric::new(
                "foos",
                MetricKind::Incremental,
                MetricValue::Counter { value: 100.0 },
            )
            .with_namespace(Some("vector"))
            .with_tags(Some(metric_tags!(
                "key2" => "value2",
                "key1" => "value1",
                "Key3" => "Value3",
            )))
            .with_timestamp(Some(
                Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
                    .single()
                    .and_then(|t| t.with_nanosecond(11))
                    .expect("invalid timestamp"),
            )),
        );

        let bytes = serialize(JsonSerializerConfig::default(), event);

        assert_eq!(
            bytes,
            r#"{"name":"foos","namespace":"vector","tags":{"Key3":"Value3","key1":"value1","key2":"value2"},"timestamp":"2018-11-14T08:09:10.000000011Z","kind":"incremental","counter":{"value":100.0}}"#
        );
    }

    #[test]
    fn serialize_json_metric_set() {
        let event = Event::Metric(Metric::new(
            "users",
            MetricKind::Incremental,
            MetricValue::Set {
                values: vec!["bob".into()].into_iter().collect(),
            },
        ));

        let bytes = serialize(JsonSerializerConfig::default(), event);

        assert_eq!(
            bytes,
            r#"{"name":"users","kind":"incremental","set":{"values":["bob"]}}"#
        );
    }

    #[test]
    fn serialize_json_metric_histogram_without_timestamp() {
        let event = Event::Metric(Metric::new(
            "glork",
            MetricKind::Incremental,
            MetricValue::Distribution {
                samples: vector_core::samples![10.0 => 1],
                statistic: StatisticKind::Histogram,
            },
        ));

        let bytes = serialize(JsonSerializerConfig::default(), event);

        assert_eq!(
            bytes,
            r#"{"name":"glork","kind":"incremental","distribution":{"samples":[{"value":10.0,"rate":1}],"statistic":"histogram"}}"#
        );
    }

    #[test]
    fn serialize_equals_to_json_value() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "foo" => Value::from("bar")
        }));
        let mut serializer = JsonSerializerConfig::default().build();
        let mut bytes = BytesMut::new();

        serializer.encode(event.clone(), &mut bytes).unwrap();

        let json = serializer.to_json_value(event).unwrap();

        assert_eq!(bytes.freeze(), serde_json::to_string(&json).unwrap());
    }

    #[test]
    fn serialize_metric_tags_full() {
        let bytes = serialize(
            JsonSerializerConfig {
                metric_tag_values: MetricTagValues::Full,
                options: JsonSerializerOptions::default(),
            },
            metric2(),
        );

        assert_eq!(
            bytes,
            r#"{"name":"counter","tags":{"a":["first",null,"second"]},"kind":"incremental","counter":{"value":1.0}}"#
        );
    }

    #[test]
    fn serialize_metric_tags_single() {
        let bytes = serialize(
            JsonSerializerConfig {
                metric_tag_values: MetricTagValues::Single,
                options: JsonSerializerOptions::default(),
            },
            metric2(),
        );

        assert_eq!(
            bytes,
            r#"{"name":"counter","tags":{"a":"second"},"kind":"incremental","counter":{"value":1.0}}"#
        );
    }

    /// Same scenarios as the parent module, but with `pretty` enabled.
    mod pretty_json {
        // The glob brings in the parent's imports as well as its `metric2` and `serialize`
        // helpers, so nothing needs to be re-imported or re-declared here.
        use super::*;

        /// Default configuration with pretty-printing switched on.
        fn pretty_config() -> JsonSerializerConfig {
            JsonSerializerConfig {
                options: JsonSerializerOptions { pretty: true },
                ..Default::default()
            }
        }

        #[test]
        fn serialize_json_log() {
            let event = Event::Log(LogEvent::from(btreemap! {
                "x" => Value::from("23"),
                "z" => Value::from(25),
                "a" => Value::from("0"),
            }));
            let bytes = serialize(pretty_config(), event);

            assert_eq!(
                bytes,
                r#"{
  "a": "0",
  "x": "23",
  "z": 25
}"#
            );
        }

        #[test]
        fn serialize_json_metric_counter() {
            let event = Event::Metric(
                Metric::new(
                    "foos",
                    MetricKind::Incremental,
                    MetricValue::Counter { value: 100.0 },
                )
                .with_namespace(Some("vector"))
                .with_tags(Some(metric_tags!(
                    "key2" => "value2",
                    "key1" => "value1",
                    "Key3" => "Value3",
                )))
                .with_timestamp(Some(
                    Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
                        .single()
                        .and_then(|t| t.with_nanosecond(11))
                        .expect("invalid timestamp"),
                )),
            );
            let bytes = serialize(pretty_config(), event);

            assert_eq!(
                bytes,
                r#"{
  "name": "foos",
  "namespace": "vector",
  "tags": {
    "Key3": "Value3",
    "key1": "value1",
    "key2": "value2"
  },
  "timestamp": "2018-11-14T08:09:10.000000011Z",
  "kind": "incremental",
  "counter": {
    "value": 100.0
  }
}"#
            );
        }

        #[test]
        fn serialize_json_metric_set() {
            let event = Event::Metric(Metric::new(
                "users",
                MetricKind::Incremental,
                MetricValue::Set {
                    values: vec!["bob".into()].into_iter().collect(),
                },
            ));
            let bytes = serialize(pretty_config(), event);

            assert_eq!(
                bytes,
                r#"{
  "name": "users",
  "kind": "incremental",
  "set": {
    "values": [
      "bob"
    ]
  }
}"#
            );
        }

        #[test]
        fn serialize_json_metric_histogram_without_timestamp() {
            let event = Event::Metric(Metric::new(
                "glork",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_core::samples![10.0 => 1],
                    statistic: StatisticKind::Histogram,
                },
            ));
            let bytes = serialize(pretty_config(), event);

            assert_eq!(
                bytes,
                r#"{
  "name": "glork",
  "kind": "incremental",
  "distribution": {
    "samples": [
      {
        "value": 10.0,
        "rate": 1
      }
    ],
    "statistic": "histogram"
  }
}"#
            );
        }

        #[test]
        fn serialize_equals_to_json_value() {
            let event = Event::Log(LogEvent::from(btreemap! {
                "foo" => Value::from("bar")
            }));
            let mut serializer = pretty_config().build();
            let mut bytes = BytesMut::new();

            serializer.encode(event.clone(), &mut bytes).unwrap();

            let json = serializer.to_json_value(event).unwrap();

            assert_eq!(bytes.freeze(), serde_json::to_string_pretty(&json).unwrap());
        }

        #[test]
        fn serialize_metric_tags_full() {
            let bytes = serialize(
                JsonSerializerConfig {
                    metric_tag_values: MetricTagValues::Full,
                    options: JsonSerializerOptions { pretty: true },
                },
                metric2(),
            );

            assert_eq!(
                bytes,
                r#"{
  "name": "counter",
  "tags": {
    "a": [
      "first",
      null,
      "second"
    ]
  },
  "kind": "incremental",
  "counter": {
    "value": 1.0
  }
}"#
            );
        }

        #[test]
        fn serialize_metric_tags_single() {
            let bytes = serialize(
                JsonSerializerConfig {
                    metric_tag_values: MetricTagValues::Single,
                    options: JsonSerializerOptions { pretty: true },
                },
                metric2(),
            );

            assert_eq!(
                bytes,
                r#"{
  "name": "counter",
  "tags": {
    "a": "second"
  },
  "kind": "incremental",
  "counter": {
    "value": 1.0
  }
}"#
            );
        }
    }
}