1use bytes::{BufMut, BytesMut};
2use tokio_util::codec::Encoder;
3use vector_config_macros::configurable_component;
4use vector_core::{config::DataType, event::Event, schema};
5
6use crate::MetricTagValues;
7
/// Config used to build a `JsonSerializer`.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct JsonSerializerConfig {
    /// Controls how metric tag values are encoded.
    ///
    /// When this is `MetricTagValues::Single`, the serializer calls
    /// `reduce_tags_to_single()` on a metric before writing it; for any other value the tags
    /// are serialized as they are.
    // NOTE(review): `is_default` presumably omits this field from serialized output when it
    // holds the default value -- confirm against `vector_core::serde::is_default`.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub metric_tag_values: MetricTagValues,

    /// Options for the JSON serializer, (de)serialized under the `json` key.
    #[serde(default, rename = "json")]
    pub options: JsonSerializerOptions,
}
23
/// Options for the JSON serializer.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct JsonSerializerOptions {
    /// Whether to use pretty JSON formatting.
    ///
    /// When `true`, events are written with `serde_json::to_writer_pretty`; otherwise with
    /// `serde_json::to_writer`. Defaults to `false`.
    #[serde(default)]
    pub pretty: bool,
}
32
33impl JsonSerializerConfig {
34 pub const fn new(metric_tag_values: MetricTagValues, options: JsonSerializerOptions) -> Self {
36 Self {
37 metric_tag_values,
38 options,
39 }
40 }
41
42 pub fn build(&self) -> JsonSerializer {
44 JsonSerializer::new(self.metric_tag_values, self.options.clone())
45 }
46
47 pub fn input_type(&self) -> DataType {
49 DataType::all_bits()
50 }
51
52 pub fn schema_requirement(&self) -> schema::Requirement {
54 schema::Requirement::empty()
57 }
58}
59
/// Serializer that converts an `Event` to bytes using the JSON format.
#[derive(Debug, Clone)]
pub struct JsonSerializer {
    // Decides whether metric tags are collapsed with `reduce_tags_to_single()` before a
    // metric is serialized.
    metric_tag_values: MetricTagValues,
    // Formatting options; `pretty` selects pretty-printed instead of compact output.
    options: JsonSerializerOptions,
}
66
67impl JsonSerializer {
68 pub const fn new(metric_tag_values: MetricTagValues, options: JsonSerializerOptions) -> Self {
70 Self {
71 metric_tag_values,
72 options,
73 }
74 }
75
76 pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
78 match event {
79 Event::Log(log) => serde_json::to_value(&log),
80 Event::Metric(metric) => serde_json::to_value(&metric),
81 Event::Trace(trace) => serde_json::to_value(&trace),
82 }
83 .map_err(|e| e.to_string().into())
84 }
85}
86
87impl Encoder<Event> for JsonSerializer {
88 type Error = vector_common::Error;
89
90 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
91 let writer = buffer.writer();
92 if self.options.pretty {
93 match event {
94 Event::Log(log) => serde_json::to_writer_pretty(writer, &log),
95 Event::Metric(mut metric) => {
96 if self.metric_tag_values == MetricTagValues::Single {
97 metric.reduce_tags_to_single();
98 }
99 serde_json::to_writer_pretty(writer, &metric)
100 }
101 Event::Trace(trace) => serde_json::to_writer_pretty(writer, &trace),
102 }
103 } else {
104 match event {
105 Event::Log(log) => serde_json::to_writer(writer, &log),
106 Event::Metric(mut metric) => {
107 if self.metric_tag_values == MetricTagValues::Single {
108 metric.reduce_tags_to_single();
109 }
110 serde_json::to_writer(writer, &metric)
111 }
112 Event::Trace(trace) => serde_json::to_writer(writer, &trace),
113 }
114 }
115 .map_err(Into::into)
116 }
117}
118
#[cfg(test)]
mod tests {
    use bytes::{Bytes, BytesMut};
    use chrono::{TimeZone, Timelike, Utc};
    use vector_core::{
        event::{LogEvent, Metric, MetricKind, MetricValue, StatisticKind, Value},
        metric_tags,
    };
    use vrl::btreemap;

    use super::*;

    /// Encodes `input` with a serializer built from `config` and returns the written bytes.
    fn serialize(config: JsonSerializerConfig, input: Event) -> Bytes {
        let mut buffer = BytesMut::new();
        let mut serializer = config.build();
        serializer.encode(input, &mut buffer).unwrap();
        buffer.freeze()
    }

    /// Log event whose keys are inserted out of order (`x`, `z`, `a`).
    fn sample_log() -> Event {
        Event::Log(LogEvent::from(btreemap! {
            "x" => Value::from("23"),
            "z" => Value::from(25),
            "a" => Value::from("0"),
        }))
    }

    /// Log event with the single field `foo = "bar"`.
    fn foo_bar_log() -> Event {
        Event::Log(LogEvent::from(btreemap! {
            "foo" => Value::from("bar")
        }))
    }

    /// Incremental counter with a namespace, three tags and a nanosecond-precision timestamp.
    fn sample_counter() -> Event {
        let timestamp = Utc
            .with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
            .single()
            .and_then(|t| t.with_nanosecond(11))
            .expect("invalid timestamp");
        let tags = metric_tags!(
            "key2" => "value2",
            "key1" => "value1",
            "Key3" => "Value3",
        );

        Event::Metric(
            Metric::new(
                "foos",
                MetricKind::Incremental,
                MetricValue::Counter { value: 100.0 },
            )
            .with_namespace(Some("vector"))
            .with_tags(Some(tags))
            .with_timestamp(Some(timestamp)),
        )
    }

    /// Incremental set metric containing the single value `bob`.
    fn sample_set() -> Event {
        Event::Metric(Metric::new(
            "users",
            MetricKind::Incremental,
            MetricValue::Set {
                values: vec!["bob".into()].into_iter().collect(),
            },
        ))
    }

    /// Incremental histogram distribution with one sample and no timestamp.
    fn sample_histogram() -> Event {
        Event::Metric(Metric::new(
            "glork",
            MetricKind::Incremental,
            MetricValue::Distribution {
                samples: vector_core::samples![10.0 => 1],
                statistic: StatisticKind::Histogram,
            },
        ))
    }

    /// Counter whose tag `a` is assigned three times: a value, a bare tag, and another value.
    fn metric2() -> Event {
        let tags = metric_tags! (
            "a" => "first",
            "a" => None,
            "a" => "second",
        );

        Event::Metric(
            Metric::new(
                "counter",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )
            .with_tags(Some(tags)),
        )
    }

    /// Compact-output configuration with the given tag-value mode.
    fn compact_config(metric_tag_values: MetricTagValues) -> JsonSerializerConfig {
        JsonSerializerConfig {
            metric_tag_values,
            options: JsonSerializerOptions::default(),
        }
    }

    #[test]
    fn serialize_json_log() {
        let encoded = serialize(JsonSerializerConfig::default(), sample_log());

        assert_eq!(encoded, r#"{"a":"0","x":"23","z":25}"#);
    }

    #[test]
    fn serialize_json_metric_counter() {
        let encoded = serialize(JsonSerializerConfig::default(), sample_counter());

        assert_eq!(
            encoded,
            r#"{"name":"foos","namespace":"vector","tags":{"Key3":"Value3","key1":"value1","key2":"value2"},"timestamp":"2018-11-14T08:09:10.000000011Z","kind":"incremental","counter":{"value":100.0}}"#
        );
    }

    #[test]
    fn serialize_json_metric_set() {
        let encoded = serialize(JsonSerializerConfig::default(), sample_set());

        assert_eq!(
            encoded,
            r#"{"name":"users","kind":"incremental","set":{"values":["bob"]}}"#
        );
    }

    #[test]
    fn serialize_json_metric_histogram_without_timestamp() {
        let encoded = serialize(JsonSerializerConfig::default(), sample_histogram());

        assert_eq!(
            encoded,
            r#"{"name":"glork","kind":"incremental","distribution":{"samples":[{"value":10.0,"rate":1}],"statistic":"histogram"}}"#
        );
    }

    #[test]
    fn serialize_equals_to_json_value() {
        let event = foo_bar_log();
        let mut serializer = JsonSerializerConfig::default().build();

        let mut encoded = BytesMut::new();
        serializer.encode(event.clone(), &mut encoded).unwrap();

        let value = serializer.to_json_value(event).unwrap();

        assert_eq!(encoded.freeze(), serde_json::to_string(&value).unwrap());
    }

    #[test]
    fn serialize_metric_tags_full() {
        let encoded = serialize(compact_config(MetricTagValues::Full), metric2());

        assert_eq!(
            encoded,
            r#"{"name":"counter","tags":{"a":["first",null,"second"]},"kind":"incremental","counter":{"value":1.0}}"#
        );
    }

    #[test]
    fn serialize_metric_tags_single() {
        let encoded = serialize(compact_config(MetricTagValues::Single), metric2());

        assert_eq!(
            encoded,
            r#"{"name":"counter","tags":{"a":"second"},"kind":"incremental","counter":{"value":1.0}}"#
        );
    }

    /// The same scenarios as above, run with `pretty: true`.
    ///
    /// Fixtures, the `serialize` helper and all imports come from the parent module through
    /// `use super::*`.
    mod pretty_json {
        use super::*;

        /// Pretty-output configuration with the given tag-value mode.
        fn pretty_config_with(metric_tag_values: MetricTagValues) -> JsonSerializerConfig {
            JsonSerializerConfig {
                metric_tag_values,
                options: JsonSerializerOptions { pretty: true },
            }
        }

        /// Pretty-output configuration with every other setting left at its default.
        fn get_pretty_json_config() -> JsonSerializerConfig {
            JsonSerializerConfig {
                options: JsonSerializerOptions { pretty: true },
                ..Default::default()
            }
        }

        #[test]
        fn serialize_json_log() {
            let encoded = serialize(get_pretty_json_config(), sample_log());

            assert_eq!(
                encoded,
                r#"{
  "a": "0",
  "x": "23",
  "z": 25
}"#
            );
        }

        #[test]
        fn serialize_json_metric_counter() {
            let encoded = serialize(get_pretty_json_config(), sample_counter());

            assert_eq!(
                encoded,
                r#"{
  "name": "foos",
  "namespace": "vector",
  "tags": {
    "Key3": "Value3",
    "key1": "value1",
    "key2": "value2"
  },
  "timestamp": "2018-11-14T08:09:10.000000011Z",
  "kind": "incremental",
  "counter": {
    "value": 100.0
  }
}"#
            );
        }

        #[test]
        fn serialize_json_metric_set() {
            let encoded = serialize(get_pretty_json_config(), sample_set());

            assert_eq!(
                encoded,
                r#"{
  "name": "users",
  "kind": "incremental",
  "set": {
    "values": [
      "bob"
    ]
  }
}"#
            );
        }

        #[test]
        fn serialize_json_metric_histogram_without_timestamp() {
            let encoded = serialize(get_pretty_json_config(), sample_histogram());

            assert_eq!(
                encoded,
                r#"{
  "name": "glork",
  "kind": "incremental",
  "distribution": {
    "samples": [
      {
        "value": 10.0,
        "rate": 1
      }
    ],
    "statistic": "histogram"
  }
}"#
            );
        }

        #[test]
        fn serialize_equals_to_json_value() {
            let event = foo_bar_log();
            let mut serializer = get_pretty_json_config().build();

            let mut encoded = BytesMut::new();
            serializer.encode(event.clone(), &mut encoded).unwrap();

            let value = serializer.to_json_value(event).unwrap();

            assert_eq!(
                encoded.freeze(),
                serde_json::to_string_pretty(&value).unwrap()
            );
        }

        #[test]
        fn serialize_metric_tags_full() {
            let encoded = serialize(pretty_config_with(MetricTagValues::Full), metric2());

            assert_eq!(
                encoded,
                r#"{
  "name": "counter",
  "tags": {
    "a": [
      "first",
      null,
      "second"
    ]
  },
  "kind": "incremental",
  "counter": {
    "value": 1.0
  }
}"#
            );
        }

        #[test]
        fn serialize_metric_tags_single() {
            let encoded = serialize(pretty_config_with(MetricTagValues::Single), metric2());

            assert_eq!(
                encoded,
                r#"{
  "name": "counter",
  "tags": {
    "a": "second"
  },
  "kind": "incremental",
  "counter": {
    "value": 1.0
  }
}"#
            );
        }
    }
}