1use bytes::{BufMut, BytesMut};
2use tokio_util::codec::Encoder;
3use vector_config_macros::configurable_component;
4use vector_core::{config::DataType, event::Event, schema};
5
6use crate::MetricTagValues;
7
8#[configurable_component]
10#[derive(Debug, Clone, Default)]
11pub struct JsonSerializerConfig {
12 #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
17 pub metric_tag_values: MetricTagValues,
18
19 #[serde(default, rename = "json")]
21 pub options: JsonSerializerOptions,
22}
23
24#[configurable_component]
26#[derive(Debug, Clone, Default)]
27pub struct JsonSerializerOptions {
28 #[serde(default)]
30 pub pretty: bool,
31}
32
33impl JsonSerializerConfig {
34 pub const fn new(metric_tag_values: MetricTagValues, options: JsonSerializerOptions) -> Self {
36 Self {
37 metric_tag_values,
38 options,
39 }
40 }
41
42 pub fn build(&self) -> JsonSerializer {
44 JsonSerializer::new(self.metric_tag_values, self.options.clone())
45 }
46
47 pub fn input_type(&self) -> DataType {
49 DataType::all_bits()
50 }
51
52 pub fn schema_requirement(&self) -> schema::Requirement {
54 schema::Requirement::empty()
57 }
58}
59
60#[derive(Debug, Clone)]
62pub struct JsonSerializer {
63 metric_tag_values: MetricTagValues,
64 options: JsonSerializerOptions,
65}
66
67impl JsonSerializer {
68 pub const fn new(metric_tag_values: MetricTagValues, options: JsonSerializerOptions) -> Self {
70 Self {
71 metric_tag_values,
72 options,
73 }
74 }
75
76 pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
78 match event {
79 Event::Log(log) => serde_json::to_value(&log),
80 Event::Metric(metric) => serde_json::to_value(&metric),
81 Event::Trace(trace) => serde_json::to_value(&trace),
82 }
83 .map_err(|e| e.to_string().into())
84 }
85}
86
87impl Encoder<Event> for JsonSerializer {
88 type Error = vector_common::Error;
89
90 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
91 let writer = buffer.writer();
92 if self.options.pretty {
93 match event {
94 Event::Log(log) => serde_json::to_writer_pretty(writer, &log),
95 Event::Metric(mut metric) => {
96 if self.metric_tag_values == MetricTagValues::Single {
97 metric.reduce_tags_to_single();
98 }
99 serde_json::to_writer_pretty(writer, &metric)
100 }
101 Event::Trace(trace) => serde_json::to_writer_pretty(writer, &trace),
102 }
103 } else {
104 match event {
105 Event::Log(log) => serde_json::to_writer(writer, &log),
106 Event::Metric(mut metric) => {
107 if self.metric_tag_values == MetricTagValues::Single {
108 metric.reduce_tags_to_single();
109 }
110 serde_json::to_writer(writer, &metric)
111 }
112 Event::Trace(trace) => serde_json::to_writer(writer, &trace),
113 }
114 }
115 .map_err(Into::into)
116 }
117}
118
119#[cfg(test)]
120mod tests {
121 use bytes::{Bytes, BytesMut};
122 use chrono::{TimeZone, Timelike, Utc};
123 use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, StatisticKind, Value};
124 use vector_core::metric_tags;
125 use vrl::btreemap;
126
127 use super::*;
128
129 #[test]
130 fn serialize_json_log() {
131 let event = Event::Log(LogEvent::from(btreemap! {
132 "x" => Value::from("23"),
133 "z" => Value::from(25),
134 "a" => Value::from("0"),
135 }));
136 let bytes = serialize(JsonSerializerConfig::default(), event);
137
138 assert_eq!(bytes, r#"{"a":"0","x":"23","z":25}"#);
139 }
140
141 #[test]
142 fn serialize_json_metric_counter() {
143 let event = Event::Metric(
144 Metric::new(
145 "foos",
146 MetricKind::Incremental,
147 MetricValue::Counter { value: 100.0 },
148 )
149 .with_namespace(Some("vector"))
150 .with_tags(Some(metric_tags!(
151 "key2" => "value2",
152 "key1" => "value1",
153 "Key3" => "Value3",
154 )))
155 .with_timestamp(Some(
156 Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
157 .single()
158 .and_then(|t| t.with_nanosecond(11))
159 .expect("invalid timestamp"),
160 )),
161 );
162
163 let bytes = serialize(JsonSerializerConfig::default(), event);
164
165 assert_eq!(
166 bytes,
167 r#"{"name":"foos","namespace":"vector","tags":{"Key3":"Value3","key1":"value1","key2":"value2"},"timestamp":"2018-11-14T08:09:10.000000011Z","kind":"incremental","counter":{"value":100.0}}"#
168 );
169 }
170
171 #[test]
172 fn serialize_json_metric_set() {
173 let event = Event::Metric(Metric::new(
174 "users",
175 MetricKind::Incremental,
176 MetricValue::Set {
177 values: vec!["bob".into()].into_iter().collect(),
178 },
179 ));
180
181 let bytes = serialize(JsonSerializerConfig::default(), event);
182
183 assert_eq!(
184 bytes,
185 r#"{"name":"users","kind":"incremental","set":{"values":["bob"]}}"#
186 );
187 }
188
189 #[test]
190 fn serialize_json_metric_histogram_without_timestamp() {
191 let event = Event::Metric(Metric::new(
192 "glork",
193 MetricKind::Incremental,
194 MetricValue::Distribution {
195 samples: vector_core::samples![10.0 => 1],
196 statistic: StatisticKind::Histogram,
197 },
198 ));
199
200 let bytes = serialize(JsonSerializerConfig::default(), event);
201
202 assert_eq!(
203 bytes,
204 r#"{"name":"glork","kind":"incremental","distribution":{"samples":[{"value":10.0,"rate":1}],"statistic":"histogram"}}"#
205 );
206 }
207
208 #[test]
209 fn serialize_equals_to_json_value() {
210 let event = Event::Log(LogEvent::from(btreemap! {
211 "foo" => Value::from("bar")
212 }));
213 let mut serializer = JsonSerializerConfig::default().build();
214 let mut bytes = BytesMut::new();
215
216 serializer.encode(event.clone(), &mut bytes).unwrap();
217
218 let json = serializer.to_json_value(event).unwrap();
219
220 assert_eq!(bytes.freeze(), serde_json::to_string(&json).unwrap());
221 }
222
223 #[test]
224 fn serialize_metric_tags_full() {
225 let bytes = serialize(
226 JsonSerializerConfig {
227 metric_tag_values: MetricTagValues::Full,
228 options: JsonSerializerOptions::default(),
229 },
230 metric2(),
231 );
232
233 assert_eq!(
234 bytes,
235 r#"{"name":"counter","tags":{"a":["first",null,"second"]},"kind":"incremental","counter":{"value":1.0}}"#
236 );
237 }
238
239 #[test]
240 fn serialize_metric_tags_single() {
241 let bytes = serialize(
242 JsonSerializerConfig {
243 metric_tag_values: MetricTagValues::Single,
244 options: JsonSerializerOptions::default(),
245 },
246 metric2(),
247 );
248
249 assert_eq!(
250 bytes,
251 r#"{"name":"counter","tags":{"a":"second"},"kind":"incremental","counter":{"value":1.0}}"#
252 );
253 }
254
255 fn metric2() -> Event {
256 Event::Metric(
257 Metric::new(
258 "counter",
259 MetricKind::Incremental,
260 MetricValue::Counter { value: 1.0 },
261 )
262 .with_tags(Some(metric_tags! (
263 "a" => "first",
264 "a" => None,
265 "a" => "second",
266 ))),
267 )
268 }
269
270 fn serialize(config: JsonSerializerConfig, input: Event) -> Bytes {
271 let mut buffer = BytesMut::new();
272 config.build().encode(input, &mut buffer).unwrap();
273 buffer.freeze()
274 }
275
276 mod pretty_json {
277 use super::*;
278 use bytes::{Bytes, BytesMut};
279 use chrono::{TimeZone, Timelike, Utc};
280 use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, StatisticKind, Value};
281 use vector_core::metric_tags;
282 use vrl::btreemap;
283
284 fn get_pretty_json_config() -> JsonSerializerConfig {
285 JsonSerializerConfig {
286 options: JsonSerializerOptions { pretty: true },
287 ..Default::default()
288 }
289 }
290
291 #[test]
292 fn serialize_json_log() {
293 let event = Event::Log(LogEvent::from(
294 btreemap! {"x" => Value::from("23"),"z" => Value::from(25),"a" => Value::from("0"),},
295 ));
296 let bytes = serialize(get_pretty_json_config(), event);
297 assert_eq!(
298 bytes,
299 r#"{
300 "a": "0",
301 "x": "23",
302 "z": 25
303}"#
304 );
305 }
306 #[test]
307 fn serialize_json_metric_counter() {
308 let event = Event::Metric(
309 Metric::new(
310 "foos",
311 MetricKind::Incremental,
312 MetricValue::Counter { value: 100.0 },
313 )
314 .with_namespace(Some("vector"))
315 .with_tags(Some(
316 metric_tags!("key2" => "value2","key1" => "value1","Key3" => "Value3",),
317 ))
318 .with_timestamp(Some(
319 Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
320 .single()
321 .and_then(|t| t.with_nanosecond(11))
322 .expect("invalid timestamp"),
323 )),
324 );
325 let bytes = serialize(get_pretty_json_config(), event);
326 assert_eq!(
327 bytes,
328 r#"{
329 "name": "foos",
330 "namespace": "vector",
331 "tags": {
332 "Key3": "Value3",
333 "key1": "value1",
334 "key2": "value2"
335 },
336 "timestamp": "2018-11-14T08:09:10.000000011Z",
337 "kind": "incremental",
338 "counter": {
339 "value": 100.0
340 }
341}"#
342 );
343 }
344 #[test]
345 fn serialize_json_metric_set() {
346 let event = Event::Metric(Metric::new(
347 "users",
348 MetricKind::Incremental,
349 MetricValue::Set {
350 values: vec!["bob".into()].into_iter().collect(),
351 },
352 ));
353 let bytes = serialize(get_pretty_json_config(), event);
354 assert_eq!(
355 bytes,
356 r#"{
357 "name": "users",
358 "kind": "incremental",
359 "set": {
360 "values": [
361 "bob"
362 ]
363 }
364}"#
365 );
366 }
367 #[test]
368 fn serialize_json_metric_histogram_without_timestamp() {
369 let event = Event::Metric(Metric::new(
370 "glork",
371 MetricKind::Incremental,
372 MetricValue::Distribution {
373 samples: vector_core::samples![10.0 => 1],
374 statistic: StatisticKind::Histogram,
375 },
376 ));
377 let bytes = serialize(get_pretty_json_config(), event);
378 assert_eq!(
379 bytes,
380 r#"{
381 "name": "glork",
382 "kind": "incremental",
383 "distribution": {
384 "samples": [
385 {
386 "value": 10.0,
387 "rate": 1
388 }
389 ],
390 "statistic": "histogram"
391 }
392}"#
393 );
394 }
395 #[test]
396 fn serialize_equals_to_json_value() {
397 let event = Event::Log(LogEvent::from(btreemap! {"foo" => Value::from("bar")}));
398 let mut serializer = get_pretty_json_config().build();
399 let mut bytes = BytesMut::new();
400 serializer.encode(event.clone(), &mut bytes).unwrap();
401 let json = serializer.to_json_value(event).unwrap();
402 assert_eq!(bytes.freeze(), serde_json::to_string_pretty(&json).unwrap());
403 }
404 #[test]
405 fn serialize_metric_tags_full() {
406 let bytes = serialize(
407 JsonSerializerConfig {
408 metric_tag_values: MetricTagValues::Full,
409 options: JsonSerializerOptions { pretty: true },
410 },
411 metric2(),
412 );
413 assert_eq!(
414 bytes,
415 r#"{
416 "name": "counter",
417 "tags": {
418 "a": [
419 "first",
420 null,
421 "second"
422 ]
423 },
424 "kind": "incremental",
425 "counter": {
426 "value": 1.0
427 }
428}"#
429 );
430 }
431 #[test]
432 fn serialize_metric_tags_single() {
433 let bytes = serialize(
434 JsonSerializerConfig {
435 metric_tag_values: MetricTagValues::Single,
436 options: JsonSerializerOptions { pretty: true },
437 },
438 metric2(),
439 );
440 assert_eq!(
441 bytes,
442 r#"{
443 "name": "counter",
444 "tags": {
445 "a": "second"
446 },
447 "kind": "incremental",
448 "counter": {
449 "value": 1.0
450 }
451}"#
452 );
453 }
454 fn metric2() -> Event {
455 Event::Metric(
456 Metric::new(
457 "counter",
458 MetricKind::Incremental,
459 MetricValue::Counter { value: 1.0 },
460 )
461 .with_tags(Some(
462 metric_tags! ("a" => "first","a" => None,"a" => "second",),
463 )),
464 )
465 }
466 fn serialize(config: JsonSerializerConfig, input: Event) -> Bytes {
467 let mut buffer = BytesMut::new();
468 config.build().encode(input, &mut buffer).unwrap();
469 buffer.freeze()
470 }
471 }
472}