use bytes::{BufMut, BytesMut};
use tokio_util::codec::Encoder;
use vector_config_macros::configurable_component;
use vector_core::{config::DataType, event::Event, schema};
use crate::MetricTagValues;
/// Configuration from which a [`JsonSerializer`] is built (see `build`).
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct JsonSerializerConfig {
/// Controls how metric tag values are encoded.
///
/// When this is `MetricTagValues::Single`, the encoder collapses the tags of a
/// metric to a single value per tag before writing it out; otherwise the
/// metric is serialized with its tags untouched.
///
/// The field is left out when the configuration is serialized if
/// `vector_core::serde::is_default` reports `true` for it.
#[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
pub metric_tag_values: MetricTagValues,
/// Options specific to the JSON serializer.
///
/// Appears under the key `json` in the serialized configuration.
#[serde(default, rename = "json")]
pub options: JsonSerializerOptions,
}
/// Options that tune the output of the [`JsonSerializer`].
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct JsonSerializerOptions {
/// Whether to emit pretty-printed JSON.
///
/// When `true` the encoder writes with `serde_json::to_writer_pretty`;
/// when `false` (the default) it writes compact JSON with
/// `serde_json::to_writer`.
#[serde(default)]
pub pretty: bool,
}
impl JsonSerializerConfig {
    /// Creates a new `JsonSerializerConfig` from a tag-value mode and the
    /// JSON-specific options.
    pub const fn new(metric_tag_values: MetricTagValues, options: JsonSerializerOptions) -> Self {
        Self { metric_tag_values, options }
    }

    /// Builds the `JsonSerializer` described by this configuration.
    ///
    /// The configuration itself is left untouched; the options are cloned
    /// into the new serializer.
    pub fn build(&self) -> JsonSerializer {
        let Self {
            metric_tag_values,
            options,
        } = self;
        JsonSerializer::new(*metric_tag_values, options.clone())
    }

    /// The data type of events that are accepted by `JsonSerializer`.
    ///
    /// Returns `DataType::all_bits()`; the encoder has an arm for log,
    /// metric and trace events alike.
    pub fn input_type(&self) -> DataType {
        DataType::all_bits()
    }

    /// The schema required by the serializer.
    ///
    /// The serializer places no requirement on the schema, so this is always
    /// the empty requirement.
    pub fn schema_requirement(&self) -> schema::Requirement {
        schema::Requirement::empty()
    }
}
/// Serializer that converts an `Event` to bytes using the JSON format.
#[derive(Debug, Clone)]
pub struct JsonSerializer {
// Tag-value mode; the encoder collapses metric tags when this is `Single`.
metric_tag_values: MetricTagValues,
// JSON-specific options (currently only the `pretty` switch).
options: JsonSerializerOptions,
}
impl JsonSerializer {
pub const fn new(metric_tag_values: MetricTagValues, options: JsonSerializerOptions) -> Self {
Self {
metric_tag_values,
options,
}
}
pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
match event {
Event::Log(log) => serde_json::to_value(&log),
Event::Metric(metric) => serde_json::to_value(&metric),
Event::Trace(trace) => serde_json::to_value(&trace),
}
.map_err(|e| e.to_string().into())
}
}
impl Encoder<Event> for JsonSerializer {
    type Error = vector_common::Error;

    /// Serializes `event` as JSON directly into `buffer`.
    ///
    /// The output is pretty-printed when the `pretty` option is set and
    /// compact otherwise. Metric events have their tags collapsed to a single
    /// value per tag first when the serializer is configured with
    /// `MetricTagValues::Single`.
    ///
    /// # Errors
    ///
    /// Any `serde_json` error raised while writing is converted into
    /// `Self::Error`.
    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
        let pretty = self.options.pretty;
        let sink = buffer.writer();

        // Exactly one arm runs, so `sink` is moved into exactly one writer call.
        let outcome = match event {
            Event::Log(log) => {
                if pretty {
                    serde_json::to_writer_pretty(sink, &log)
                } else {
                    serde_json::to_writer(sink, &log)
                }
            }
            Event::Metric(mut metric) => {
                if self.metric_tag_values == MetricTagValues::Single {
                    metric.reduce_tags_to_single();
                }
                if pretty {
                    serde_json::to_writer_pretty(sink, &metric)
                } else {
                    serde_json::to_writer(sink, &metric)
                }
            }
            Event::Trace(trace) => {
                if pretty {
                    serde_json::to_writer_pretty(sink, &trace)
                } else {
                    serde_json::to_writer(sink, &trace)
                }
            }
        };

        outcome.map_err(Into::into)
    }
}
#[cfg(test)]
mod tests {
use bytes::{Bytes, BytesMut};
use chrono::{TimeZone, Timelike, Utc};
use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, StatisticKind, Value};
use vector_core::metric_tags;
use vrl::btreemap;
use super::*;
/// A log event is written as one compact JSON object with its keys sorted.
#[test]
fn serialize_json_log() {
    let log_event = Event::Log(LogEvent::from(btreemap! {
        "x" => Value::from("23"),
        "z" => Value::from(25),
        "a" => Value::from("0"),
    }));

    let encoded = serialize(JsonSerializerConfig::default(), log_event);

    let expected = r#"{"a":"0","x":"23","z":25}"#;
    assert_eq!(encoded, expected);
}
/// A counter with namespace, tags and timestamp is written with every one of
/// those fields present in the compact output.
#[test]
fn serialize_json_metric_counter() {
    let timestamp = Utc
        .with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
        .single()
        .and_then(|t| t.with_nanosecond(11))
        .expect("invalid timestamp");

    let counter = Metric::new(
        "foos",
        MetricKind::Incremental,
        MetricValue::Counter { value: 100.0 },
    )
    .with_namespace(Some("vector"))
    .with_tags(Some(metric_tags!(
        "key2" => "value2",
        "key1" => "value1",
        "Key3" => "Value3",
    )))
    .with_timestamp(Some(timestamp));

    let encoded = serialize(JsonSerializerConfig::default(), Event::Metric(counter));

    assert_eq!(
        encoded,
        r#"{"name":"foos","namespace":"vector","tags":{"Key3":"Value3","key1":"value1","key2":"value2"},"timestamp":"2018-11-14T08:09:10.000000011Z","kind":"incremental","counter":{"value":100.0}}"#
    );
}
/// A set metric without tags or timestamp is written with only its name,
/// kind and values.
#[test]
fn serialize_json_metric_set() {
    let users = Metric::new(
        "users",
        MetricKind::Incremental,
        MetricValue::Set {
            values: vec!["bob".into()].into_iter().collect(),
        },
    );

    let encoded = serialize(JsonSerializerConfig::default(), Event::Metric(users));

    assert_eq!(
        encoded,
        r#"{"name":"users","kind":"incremental","set":{"values":["bob"]}}"#
    );
}
/// A histogram distribution with no timestamp is written without a
/// `timestamp` key.
#[test]
fn serialize_json_metric_histogram_without_timestamp() {
    let histogram = Metric::new(
        "glork",
        MetricKind::Incremental,
        MetricValue::Distribution {
            samples: vector_core::samples![10.0 => 1],
            statistic: StatisticKind::Histogram,
        },
    );

    let encoded = serialize(JsonSerializerConfig::default(), Event::Metric(histogram));

    assert_eq!(
        encoded,
        r#"{"name":"glork","kind":"incremental","distribution":{"samples":[{"value":10.0,"rate":1}],"statistic":"histogram"}}"#
    );
}
/// The bytes written by `encode` match the compact rendering of the value
/// returned by `to_json_value` for the same log event.
#[test]
fn serialize_equals_to_json_value() {
    let log_event = Event::Log(LogEvent::from(btreemap! {
        "foo" => Value::from("bar")
    }));
    let mut serializer = JsonSerializerConfig::default().build();

    let mut encoded = BytesMut::new();
    serializer.encode(log_event.clone(), &mut encoded).unwrap();

    let value = serializer.to_json_value(log_event).unwrap();
    let rendered = serde_json::to_string(&value).unwrap();

    assert_eq!(encoded.freeze(), rendered);
}
/// With `MetricTagValues::Full` every value of a multi-valued tag is written,
/// including the bare (`null`) one.
#[test]
fn serialize_metric_tags_full() {
    let config =
        JsonSerializerConfig::new(MetricTagValues::Full, JsonSerializerOptions::default());

    let encoded = serialize(config, metric2());

    assert_eq!(
        encoded,
        r#"{"name":"counter","tags":{"a":["first",null,"second"]},"kind":"incremental","counter":{"value":1.0}}"#
    );
}
/// With `MetricTagValues::Single` a multi-valued tag is collapsed to one
/// value before being written.
#[test]
fn serialize_metric_tags_single() {
    let config =
        JsonSerializerConfig::new(MetricTagValues::Single, JsonSerializerOptions::default());

    let encoded = serialize(config, metric2());

    assert_eq!(
        encoded,
        r#"{"name":"counter","tags":{"a":"second"},"kind":"incremental","counter":{"value":1.0}}"#
    );
}
/// Builds a counter metric whose tag `a` carries three values: `"first"`,
/// a bare value, and `"second"`.
fn metric2() -> Event {
    let counter = Metric::new(
        "counter",
        MetricKind::Incremental,
        MetricValue::Counter { value: 1.0 },
    );

    Event::Metric(counter.with_tags(Some(metric_tags! (
        "a" => "first",
        "a" => None,
        "a" => "second",
    ))))
}
/// Builds a serializer from `config`, encodes `input` with it and returns the
/// bytes that were written. Panics if encoding fails.
fn serialize(config: JsonSerializerConfig, input: Event) -> Bytes {
    let mut serializer = config.build();
    let mut out = BytesMut::new();
    serializer.encode(input, &mut out).unwrap();
    out.freeze()
}
/// Same scenarios as the compact tests in the parent module, run with the
/// `pretty` option enabled.
///
/// The expected strings follow the layout produced by
/// `serde_json::to_writer_pretty`: one entry per line, two spaces of
/// indentation per nesting level, and `": "` between key and value. The
/// whitespace inside the raw strings is significant.
mod pretty_json {
    // `serialize` and `metric2` come from the parent module through this glob.
    use super::*;
    use bytes::BytesMut;
    use chrono::{TimeZone, Timelike, Utc};
    use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, StatisticKind, Value};
    use vector_core::metric_tags;
    use vrl::btreemap;

    /// Default configuration with pretty-printing switched on.
    fn get_pretty_json_config() -> JsonSerializerConfig {
        JsonSerializerConfig {
            options: JsonSerializerOptions { pretty: true },
            ..Default::default()
        }
    }

    /// A log event is pretty-printed with sorted keys, one per line.
    #[test]
    fn serialize_json_log() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "x" => Value::from("23"),
            "z" => Value::from(25),
            "a" => Value::from("0"),
        }));
        let bytes = serialize(get_pretty_json_config(), event);
        assert_eq!(
            bytes,
            r#"{
  "a": "0",
  "x": "23",
  "z": 25
}"#
        );
    }

    /// A counter with namespace, tags and timestamp is pretty-printed with
    /// the nested `tags` and `counter` objects indented one level deeper.
    #[test]
    fn serialize_json_metric_counter() {
        let event = Event::Metric(
            Metric::new(
                "foos",
                MetricKind::Incremental,
                MetricValue::Counter { value: 100.0 },
            )
            .with_namespace(Some("vector"))
            .with_tags(Some(metric_tags!(
                "key2" => "value2",
                "key1" => "value1",
                "Key3" => "Value3",
            )))
            .with_timestamp(Some(
                Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
                    .single()
                    .and_then(|t| t.with_nanosecond(11))
                    .expect("invalid timestamp"),
            )),
        );
        let bytes = serialize(get_pretty_json_config(), event);
        assert_eq!(
            bytes,
            r#"{
  "name": "foos",
  "namespace": "vector",
  "tags": {
    "Key3": "Value3",
    "key1": "value1",
    "key2": "value2"
  },
  "timestamp": "2018-11-14T08:09:10.000000011Z",
  "kind": "incremental",
  "counter": {
    "value": 100.0
  }
}"#
        );
    }

    /// A set metric is pretty-printed with its values array expanded.
    #[test]
    fn serialize_json_metric_set() {
        let event = Event::Metric(Metric::new(
            "users",
            MetricKind::Incremental,
            MetricValue::Set {
                values: vec!["bob".into()].into_iter().collect(),
            },
        ));
        let bytes = serialize(get_pretty_json_config(), event);
        assert_eq!(
            bytes,
            r#"{
  "name": "users",
  "kind": "incremental",
  "set": {
    "values": [
      "bob"
    ]
  }
}"#
        );
    }

    /// A histogram distribution without a timestamp is pretty-printed
    /// without a `timestamp` key.
    #[test]
    fn serialize_json_metric_histogram_without_timestamp() {
        let event = Event::Metric(Metric::new(
            "glork",
            MetricKind::Incremental,
            MetricValue::Distribution {
                samples: vector_core::samples![10.0 => 1],
                statistic: StatisticKind::Histogram,
            },
        ));
        let bytes = serialize(get_pretty_json_config(), event);
        assert_eq!(
            bytes,
            r#"{
  "name": "glork",
  "kind": "incremental",
  "distribution": {
    "samples": [
      {
        "value": 10.0,
        "rate": 1
      }
    ],
    "statistic": "histogram"
  }
}"#
        );
    }

    /// The bytes written by `encode` match the pretty rendering of the value
    /// returned by `to_json_value` for the same log event.
    #[test]
    fn serialize_equals_to_json_value() {
        let event = Event::Log(LogEvent::from(btreemap! {"foo" => Value::from("bar")}));
        let mut serializer = get_pretty_json_config().build();
        let mut bytes = BytesMut::new();
        serializer.encode(event.clone(), &mut bytes).unwrap();
        let json = serializer.to_json_value(event).unwrap();
        assert_eq!(bytes.freeze(), serde_json::to_string_pretty(&json).unwrap());
    }

    /// With `MetricTagValues::Full` every value of a multi-valued tag is
    /// written, one array element per line.
    #[test]
    fn serialize_metric_tags_full() {
        let bytes = serialize(
            JsonSerializerConfig {
                metric_tag_values: MetricTagValues::Full,
                ..get_pretty_json_config()
            },
            metric2(),
        );
        assert_eq!(
            bytes,
            r#"{
  "name": "counter",
  "tags": {
    "a": [
      "first",
      null,
      "second"
    ]
  },
  "kind": "incremental",
  "counter": {
    "value": 1.0
  }
}"#
        );
    }

    /// With `MetricTagValues::Single` a multi-valued tag is collapsed to one
    /// value before being written.
    #[test]
    fn serialize_metric_tags_single() {
        let bytes = serialize(
            JsonSerializerConfig {
                metric_tag_values: MetricTagValues::Single,
                ..get_pretty_json_config()
            },
            metric2(),
        );
        assert_eq!(
            bytes,
            r#"{
  "name": "counter",
  "tags": {
    "a": "second"
  },
  "kind": "incremental",
  "counter": {
    "value": 1.0
  }
}"#
        );
    }
}
}