codecs/encoding/format/
json.rs

1use bytes::{BufMut, BytesMut};
2use tokio_util::codec::Encoder;
3use vector_config_macros::configurable_component;
4use vector_core::{config::DataType, event::Event, schema};
5
6use crate::MetricTagValues;
7
8/// Config used to build a `JsonSerializer`.
9#[configurable_component]
10#[derive(Debug, Clone, Default)]
11pub struct JsonSerializerConfig {
12    /// Controls how metric tag values are encoded.
13    ///
14    /// When set to `single`, only the last non-bare value of tags are displayed with the
15    /// metric.  When set to `full`, all metric tags are exposed as separate assignments.
16    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
17    pub metric_tag_values: MetricTagValues,
18
19    /// Options for the JsonSerializer.
20    #[serde(default, rename = "json")]
21    pub options: JsonSerializerOptions,
22}
23
24/// Options for the JsonSerializer.
25#[configurable_component]
26#[derive(Debug, Clone, Default)]
27pub struct JsonSerializerOptions {
28    /// Whether to use pretty JSON formatting.
29    #[serde(default)]
30    pub pretty: bool,
31}
32
33impl JsonSerializerConfig {
34    /// Creates a new `JsonSerializerConfig`.
35    pub const fn new(metric_tag_values: MetricTagValues, options: JsonSerializerOptions) -> Self {
36        Self {
37            metric_tag_values,
38            options,
39        }
40    }
41
42    /// Build the `JsonSerializer` from this configuration.
43    pub fn build(&self) -> JsonSerializer {
44        JsonSerializer::new(self.metric_tag_values, self.options.clone())
45    }
46
47    /// The data type of events that are accepted by `JsonSerializer`.
48    pub fn input_type(&self) -> DataType {
49        DataType::all_bits()
50    }
51
52    /// The schema required by the serializer.
53    pub fn schema_requirement(&self) -> schema::Requirement {
54        // While technically we support `Value` variants that can't be losslessly serialized to
55        // JSON, we don't want to enforce that limitation to users yet.
56        schema::Requirement::empty()
57    }
58}
59
60/// Serializer that converts an `Event` to bytes using the JSON format.
61#[derive(Debug, Clone)]
62pub struct JsonSerializer {
63    metric_tag_values: MetricTagValues,
64    options: JsonSerializerOptions,
65}
66
67impl JsonSerializer {
68    /// Creates a new `JsonSerializer`.
69    pub const fn new(metric_tag_values: MetricTagValues, options: JsonSerializerOptions) -> Self {
70        Self {
71            metric_tag_values,
72            options,
73        }
74    }
75
76    /// Encode event and represent it as JSON value.
77    pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
78        match event {
79            Event::Log(log) => serde_json::to_value(&log),
80            Event::Metric(metric) => serde_json::to_value(&metric),
81            Event::Trace(trace) => serde_json::to_value(&trace),
82        }
83        .map_err(|e| e.to_string().into())
84    }
85}
86
87impl Encoder<Event> for JsonSerializer {
88    type Error = vector_common::Error;
89
90    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
91        let writer = buffer.writer();
92        if self.options.pretty {
93            match event {
94                Event::Log(log) => serde_json::to_writer_pretty(writer, &log),
95                Event::Metric(mut metric) => {
96                    if self.metric_tag_values == MetricTagValues::Single {
97                        metric.reduce_tags_to_single();
98                    }
99                    serde_json::to_writer_pretty(writer, &metric)
100                }
101                Event::Trace(trace) => serde_json::to_writer_pretty(writer, &trace),
102            }
103        } else {
104            match event {
105                Event::Log(log) => serde_json::to_writer(writer, &log),
106                Event::Metric(mut metric) => {
107                    if self.metric_tag_values == MetricTagValues::Single {
108                        metric.reduce_tags_to_single();
109                    }
110                    serde_json::to_writer(writer, &metric)
111                }
112                Event::Trace(trace) => serde_json::to_writer(writer, &trace),
113            }
114        }
115        .map_err(Into::into)
116    }
117}
118
119#[cfg(test)]
120mod tests {
121    use bytes::{Bytes, BytesMut};
122    use chrono::{TimeZone, Timelike, Utc};
123    use vector_core::{
124        event::{LogEvent, Metric, MetricKind, MetricValue, StatisticKind, Value},
125        metric_tags,
126    };
127    use vrl::btreemap;
128
129    use super::*;
130
131    #[test]
132    fn serialize_json_log() {
133        let event = Event::Log(LogEvent::from(btreemap! {
134            "x" => Value::from("23"),
135            "z" => Value::from(25),
136            "a" => Value::from("0"),
137        }));
138        let bytes = serialize(JsonSerializerConfig::default(), event);
139
140        assert_eq!(bytes, r#"{"a":"0","x":"23","z":25}"#);
141    }
142
143    #[test]
144    fn serialize_json_metric_counter() {
145        let event = Event::Metric(
146            Metric::new(
147                "foos",
148                MetricKind::Incremental,
149                MetricValue::Counter { value: 100.0 },
150            )
151            .with_namespace(Some("vector"))
152            .with_tags(Some(metric_tags!(
153                "key2" => "value2",
154                "key1" => "value1",
155                "Key3" => "Value3",
156            )))
157            .with_timestamp(Some(
158                Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
159                    .single()
160                    .and_then(|t| t.with_nanosecond(11))
161                    .expect("invalid timestamp"),
162            )),
163        );
164
165        let bytes = serialize(JsonSerializerConfig::default(), event);
166
167        assert_eq!(
168            bytes,
169            r#"{"name":"foos","namespace":"vector","tags":{"Key3":"Value3","key1":"value1","key2":"value2"},"timestamp":"2018-11-14T08:09:10.000000011Z","kind":"incremental","counter":{"value":100.0}}"#
170        );
171    }
172
173    #[test]
174    fn serialize_json_metric_set() {
175        let event = Event::Metric(Metric::new(
176            "users",
177            MetricKind::Incremental,
178            MetricValue::Set {
179                values: vec!["bob".into()].into_iter().collect(),
180            },
181        ));
182
183        let bytes = serialize(JsonSerializerConfig::default(), event);
184
185        assert_eq!(
186            bytes,
187            r#"{"name":"users","kind":"incremental","set":{"values":["bob"]}}"#
188        );
189    }
190
191    #[test]
192    fn serialize_json_metric_histogram_without_timestamp() {
193        let event = Event::Metric(Metric::new(
194            "glork",
195            MetricKind::Incremental,
196            MetricValue::Distribution {
197                samples: vector_core::samples![10.0 => 1],
198                statistic: StatisticKind::Histogram,
199            },
200        ));
201
202        let bytes = serialize(JsonSerializerConfig::default(), event);
203
204        assert_eq!(
205            bytes,
206            r#"{"name":"glork","kind":"incremental","distribution":{"samples":[{"value":10.0,"rate":1}],"statistic":"histogram"}}"#
207        );
208    }
209
210    #[test]
211    fn serialize_equals_to_json_value() {
212        let event = Event::Log(LogEvent::from(btreemap! {
213            "foo" => Value::from("bar")
214        }));
215        let mut serializer = JsonSerializerConfig::default().build();
216        let mut bytes = BytesMut::new();
217
218        serializer.encode(event.clone(), &mut bytes).unwrap();
219
220        let json = serializer.to_json_value(event).unwrap();
221
222        assert_eq!(bytes.freeze(), serde_json::to_string(&json).unwrap());
223    }
224
225    #[test]
226    fn serialize_metric_tags_full() {
227        let bytes = serialize(
228            JsonSerializerConfig {
229                metric_tag_values: MetricTagValues::Full,
230                options: JsonSerializerOptions::default(),
231            },
232            metric2(),
233        );
234
235        assert_eq!(
236            bytes,
237            r#"{"name":"counter","tags":{"a":["first",null,"second"]},"kind":"incremental","counter":{"value":1.0}}"#
238        );
239    }
240
241    #[test]
242    fn serialize_metric_tags_single() {
243        let bytes = serialize(
244            JsonSerializerConfig {
245                metric_tag_values: MetricTagValues::Single,
246                options: JsonSerializerOptions::default(),
247            },
248            metric2(),
249        );
250
251        assert_eq!(
252            bytes,
253            r#"{"name":"counter","tags":{"a":"second"},"kind":"incremental","counter":{"value":1.0}}"#
254        );
255    }
256
257    fn metric2() -> Event {
258        Event::Metric(
259            Metric::new(
260                "counter",
261                MetricKind::Incremental,
262                MetricValue::Counter { value: 1.0 },
263            )
264            .with_tags(Some(metric_tags! (
265                "a" => "first",
266                "a" => None,
267                "a" => "second",
268            ))),
269        )
270    }
271
272    fn serialize(config: JsonSerializerConfig, input: Event) -> Bytes {
273        let mut buffer = BytesMut::new();
274        config.build().encode(input, &mut buffer).unwrap();
275        buffer.freeze()
276    }
277
278    mod pretty_json {
279        use bytes::{Bytes, BytesMut};
280        use chrono::{TimeZone, Timelike, Utc};
281        use vector_core::{
282            event::{LogEvent, Metric, MetricKind, MetricValue, StatisticKind, Value},
283            metric_tags,
284        };
285        use vrl::btreemap;
286
287        use super::*;
288
289        fn get_pretty_json_config() -> JsonSerializerConfig {
290            JsonSerializerConfig {
291                options: JsonSerializerOptions { pretty: true },
292                ..Default::default()
293            }
294        }
295
296        #[test]
297        fn serialize_json_log() {
298            let event = Event::Log(LogEvent::from(
299                btreemap! {"x" => Value::from("23"),"z" => Value::from(25),"a" => Value::from("0"),},
300            ));
301            let bytes = serialize(get_pretty_json_config(), event);
302            assert_eq!(
303                bytes,
304                r#"{
305  "a": "0",
306  "x": "23",
307  "z": 25
308}"#
309            );
310        }
311        #[test]
312        fn serialize_json_metric_counter() {
313            let event = Event::Metric(
314                Metric::new(
315                    "foos",
316                    MetricKind::Incremental,
317                    MetricValue::Counter { value: 100.0 },
318                )
319                .with_namespace(Some("vector"))
320                .with_tags(Some(
321                    metric_tags!("key2" => "value2","key1" => "value1","Key3" => "Value3",),
322                ))
323                .with_timestamp(Some(
324                    Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
325                        .single()
326                        .and_then(|t| t.with_nanosecond(11))
327                        .expect("invalid timestamp"),
328                )),
329            );
330            let bytes = serialize(get_pretty_json_config(), event);
331            assert_eq!(
332                bytes,
333                r#"{
334  "name": "foos",
335  "namespace": "vector",
336  "tags": {
337    "Key3": "Value3",
338    "key1": "value1",
339    "key2": "value2"
340  },
341  "timestamp": "2018-11-14T08:09:10.000000011Z",
342  "kind": "incremental",
343  "counter": {
344    "value": 100.0
345  }
346}"#
347            );
348        }
349        #[test]
350        fn serialize_json_metric_set() {
351            let event = Event::Metric(Metric::new(
352                "users",
353                MetricKind::Incremental,
354                MetricValue::Set {
355                    values: vec!["bob".into()].into_iter().collect(),
356                },
357            ));
358            let bytes = serialize(get_pretty_json_config(), event);
359            assert_eq!(
360                bytes,
361                r#"{
362  "name": "users",
363  "kind": "incremental",
364  "set": {
365    "values": [
366      "bob"
367    ]
368  }
369}"#
370            );
371        }
372        #[test]
373        fn serialize_json_metric_histogram_without_timestamp() {
374            let event = Event::Metric(Metric::new(
375                "glork",
376                MetricKind::Incremental,
377                MetricValue::Distribution {
378                    samples: vector_core::samples![10.0 => 1],
379                    statistic: StatisticKind::Histogram,
380                },
381            ));
382            let bytes = serialize(get_pretty_json_config(), event);
383            assert_eq!(
384                bytes,
385                r#"{
386  "name": "glork",
387  "kind": "incremental",
388  "distribution": {
389    "samples": [
390      {
391        "value": 10.0,
392        "rate": 1
393      }
394    ],
395    "statistic": "histogram"
396  }
397}"#
398            );
399        }
400        #[test]
401        fn serialize_equals_to_json_value() {
402            let event = Event::Log(LogEvent::from(btreemap! {"foo" => Value::from("bar")}));
403            let mut serializer = get_pretty_json_config().build();
404            let mut bytes = BytesMut::new();
405            serializer.encode(event.clone(), &mut bytes).unwrap();
406            let json = serializer.to_json_value(event).unwrap();
407            assert_eq!(bytes.freeze(), serde_json::to_string_pretty(&json).unwrap());
408        }
409        #[test]
410        fn serialize_metric_tags_full() {
411            let bytes = serialize(
412                JsonSerializerConfig {
413                    metric_tag_values: MetricTagValues::Full,
414                    options: JsonSerializerOptions { pretty: true },
415                },
416                metric2(),
417            );
418            assert_eq!(
419                bytes,
420                r#"{
421  "name": "counter",
422  "tags": {
423    "a": [
424      "first",
425      null,
426      "second"
427    ]
428  },
429  "kind": "incremental",
430  "counter": {
431    "value": 1.0
432  }
433}"#
434            );
435        }
436        #[test]
437        fn serialize_metric_tags_single() {
438            let bytes = serialize(
439                JsonSerializerConfig {
440                    metric_tag_values: MetricTagValues::Single,
441                    options: JsonSerializerOptions { pretty: true },
442                },
443                metric2(),
444            );
445            assert_eq!(
446                bytes,
447                r#"{
448  "name": "counter",
449  "tags": {
450    "a": "second"
451  },
452  "kind": "incremental",
453  "counter": {
454    "value": 1.0
455  }
456}"#
457            );
458        }
459        fn metric2() -> Event {
460            Event::Metric(
461                Metric::new(
462                    "counter",
463                    MetricKind::Incremental,
464                    MetricValue::Counter { value: 1.0 },
465                )
466                .with_tags(Some(
467                    metric_tags! ("a" => "first","a" => None,"a" => "second",),
468                )),
469            )
470        }
471        fn serialize(config: JsonSerializerConfig, input: Event) -> Bytes {
472            let mut buffer = BytesMut::new();
473            config.build().encode(input, &mut buffer).unwrap();
474            buffer.freeze()
475        }
476    }
477}