use crate::gelf::GELF_TARGET_PATHS;
use crate::{gelf_fields::*, VALID_FIELD_REGEX};
use bytes::{BufMut, BytesMut};
use lookup::event_path;
use ordered_float::NotNan;
use serde::{Deserialize, Serialize};
use snafu::Snafu;
use tokio_util::codec::Encoder;
use vector_core::{
config::{log_schema, DataType},
event::{Event, KeyString, LogEvent, Value},
schema,
};
/// Errors raised while validating a `LogEvent` for GELF serialization.
///
/// Within this file the errors are converted to their display string before
/// being returned as `vector_common::Error`.
#[derive(Debug, Snafu)]
pub enum GelfSerializerError {
/// Reported for a field that the serializer expects on the event.
#[snafu(display(r#"LogEvent does not contain required field: "{}""#, field))]
MissingField { field: KeyString },
/// Reported when a field holds a value of a type that is not accepted for that field.
#[snafu(display(
r#"LogEvent contains a value with an invalid type. field = "{}" type = "{}" expected type = "{}""#,
field,
actual_type,
expected_type
))]
InvalidValueType {
/// Name of the offending field.
field: String,
/// Kind of the value that was actually found.
actual_type: String,
/// Human-readable description of the accepted kind(s).
expected_type: String,
},
}
/// Configuration used to build a `GelfSerializer`.
///
/// The type carries no options; it is a unit struct.
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct GelfSerializerConfig;
impl GelfSerializerConfig {
pub const fn new() -> Self {
Self
}
pub fn build(&self) -> GelfSerializer {
GelfSerializer::new()
}
pub fn input_type() -> DataType {
DataType::Log
}
pub fn schema_requirement() -> schema::Requirement {
schema::Requirement::empty()
}
}
/// Serializer that validates a log event against the GELF field rules in this
/// file and emits it as JSON.
#[derive(Debug, Clone)]
pub struct GelfSerializer;
impl GelfSerializer {
pub fn new() -> Self {
GelfSerializer
}
pub fn to_json_value(&self, event: Event) -> Result<serde_json::Value, vector_common::Error> {
let log = to_gelf_event(event.into_log())?;
serde_json::to_value(&log).map_err(|e| e.to_string().into())
}
}
impl Default for GelfSerializer {
fn default() -> Self {
Self::new()
}
}
impl Encoder<Event> for GelfSerializer {
    type Error = vector_common::Error;

    /// Validates `event` as a GELF log and appends its JSON encoding to `buffer`.
    ///
    /// # Errors
    ///
    /// Fails when GELF validation rejects the event or when JSON
    /// serialization into the buffer fails.
    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
        let gelf_log = to_gelf_event(event.into_log())?;
        // Serialize straight into the caller's buffer through its `Write` adapter.
        serde_json::to_writer(buffer.writer(), &gelf_log).map_err(Into::into)
    }
}
fn err_invalid_type(
field: &str,
expected_type: &str,
actual_type: &str,
) -> vector_common::Result<()> {
InvalidValueTypeSnafu {
field,
actual_type,
expected_type,
}
.fail()
.map_err(|e| e.to_string().into())
}
/// Ensures the fields this serializer requires are present on `log`.
///
/// - The version field is inserted with `GELF_VERSION` when absent.
/// - The host field must already be present.
/// - The short message field must be present, or it is obtained by renaming
///   the field at the log schema's message key.
///
/// # Errors
///
/// Returns a "missing field" error when the host is absent, or when the short
/// message is absent and cannot be taken from the schema's message key —
/// either because the event has no such field or because no message key is
/// configured at all.
fn coerce_required_fields(mut log: LogEvent) -> vector_common::Result<LogEvent> {
    /// Returns the `MissingField` error for `field`, rendered to its message.
    fn err_missing_field(field: &str) -> vector_common::Result<()> {
        MissingFieldSnafu { field }
            .fail()
            .map_err(|e| e.to_string().into())
    }

    // Add the version if it is not already set.
    if !log.contains(&GELF_TARGET_PATHS.version) {
        log.insert(&GELF_TARGET_PATHS.version, GELF_VERSION);
    }

    if !log.contains(&GELF_TARGET_PATHS.host) {
        err_missing_field(HOST)?;
    }

    if !log.contains(&GELF_TARGET_PATHS.short_message) {
        match log_schema().message_key_target_path() {
            // Fall back to the schema's message field when the event has one.
            Some(message_key) if log.contains(message_key) => {
                log.rename_key(message_key, &GELF_TARGET_PATHS.short_message);
            }
            // No fallback is available: either no message key is configured or
            // the event lacks that field. Both leave the short message missing,
            // so both must be rejected rather than silently passed through.
            _ => err_missing_field(SHORT_MESSAGE)?,
        }
    }

    Ok(log)
}
/// Validates the name and value kind of every top-level field of `log`, and
/// rewrites timestamp values into numeric form.
///
/// Rules, keyed by the field-name constants matched below:
/// - `VERSION`, `HOST`, `SHORT_MESSAGE`, `FULL_MESSAGE`, `FACILITY`, `FILE`:
///   the value must be bytes.
/// - `TIMESTAMP`: the value must be a timestamp or an integer. A timestamp is
///   replaced by integer seconds when its millisecond value is a whole number
///   of seconds, otherwise by float seconds derived from the millisecond value.
/// - `LEVEL`: the value must be an integer.
/// - `LINE`: the value must be an integer or a float.
/// - Any other field: the name must match `VALID_FIELD_REGEX` and the value
///   must be an integer, float or bytes.
///
/// Returns the event together with the names of the "other" fields that are
/// non-empty and do not start with `_`; the renaming itself is left to the caller.
///
/// # Errors
///
/// Returns an error naming the field when a value has an unaccepted kind or
/// when an additional field's name does not match `VALID_FIELD_REGEX`.
fn coerce_field_names_and_values(
    mut log: LogEvent,
) -> vector_common::Result<(LogEvent, Vec<String>)> {
    let mut missing_prefix = vec![];
    if let Some(event_data) = log.as_map_mut() {
        for (field, value) in event_data.iter_mut() {
            match field.as_str() {
                VERSION | HOST | SHORT_MESSAGE | FULL_MESSAGE | FACILITY | FILE => {
                    if !value.is_bytes() {
                        err_invalid_type(field, "UTF-8 string", value.kind_str())?;
                    }
                }
                TIMESTAMP => {
                    if !(value.is_timestamp() || value.is_integer()) {
                        err_invalid_type(field, "timestamp or integer", value.kind_str())?;
                    }
                    // Turn a timestamp value into a number of seconds; integers pass through.
                    if let Value::Timestamp(ts) = value {
                        let ts_millis = ts.timestamp_millis();
                        if ts_millis % 1000 != 0 {
                            // Sub-second part present: carry it as the fraction of a float.
                            *value = Value::Float(
                                NotNan::new(ts_millis as f64 / 1000.0)
                                    .expect("dividing an i64-derived f64 by 1000.0 is never NaN"),
                            );
                        } else {
                            // Whole seconds: keep the exact integer value.
                            *value = Value::Integer(ts.timestamp())
                        }
                    }
                }
                LEVEL => {
                    if !value.is_integer() {
                        err_invalid_type(field, "integer", value.kind_str())?;
                    }
                }
                LINE => {
                    if !(value.is_float() || value.is_integer()) {
                        err_invalid_type(field, "number", value.kind_str())?;
                    }
                }
                _ => {
                    // The field exists but its name is not acceptable, so a
                    // "missing field" error would be misleading. Built as a plain
                    // message because `GelfSerializerError` has no variant for
                    // invalid names.
                    if !VALID_FIELD_REGEX.is_match(field) {
                        return Err(format!(
                            r#"LogEvent contains a field with an invalid name. field = "{}""#,
                            field.as_str()
                        )
                        .into());
                    }
                    if !(value.is_integer() || value.is_float() || value.is_bytes()) {
                        err_invalid_type(field, "string or number", value.kind_str())?;
                    }
                    // Remember additional fields lacking the leading underscore; the
                    // map is mutably borrowed here, so the rename happens later.
                    if !field.is_empty() && !field.starts_with('_') {
                        missing_prefix.push(field.to_string());
                    }
                }
            }
        }
    }
    Ok((log, missing_prefix))
}
/// Runs the full GELF validation over `log` and returns the adjusted event.
///
/// Required fields are checked first, then field names and value kinds.
/// Finally every additional field reported as lacking the `_` prefix is
/// renamed to its underscore-prefixed form.
///
/// # Errors
///
/// Propagates the first error returned by either validation step.
fn to_gelf_event(log: LogEvent) -> vector_common::Result<LogEvent> {
    let with_required = coerce_required_fields(log)?;
    let (mut gelf_log, unprefixed) = coerce_field_names_and_values(with_required)?;

    for name in unprefixed {
        let prefixed = format!("_{}", &name);
        gelf_log.rename_key(
            event_path!(name.as_str()),
            event_path!(prefixed.as_str()),
        );
    }

    Ok(gelf_log)
}
#[cfg(test)]
mod tests {
// Unit tests for the GELF serializer: required fields, field coercion,
// timestamp conversion and rejection of invalid events.
use crate::encoding::SerializerConfig;
use super::*;
use chrono::NaiveDateTime;
use vector_core::event::{Event, EventMetadata};
use vrl::btreemap;
use vrl::value::{ObjectMap, Value};
/// Encodes a log event built from `event_fields` with a freshly built serializer.
///
/// When `expect_success` is true, asserts that encoding succeeds and that the
/// output parses as JSON, returning the parsed value. Otherwise asserts that
/// encoding fails and returns `None`.
fn do_serialize(expect_success: bool, event_fields: ObjectMap) -> Option<serde_json::Value> {
let config = GelfSerializerConfig::new();
let mut serializer = config.build();
let event: Event = LogEvent::from_map(event_fields, EventMetadata::default()).into();
let mut buffer = BytesMut::new();
if expect_success {
assert!(serializer.encode(event, &mut buffer).is_ok());
let buffer_str = std::str::from_utf8(&buffer).unwrap();
let result = serde_json::from_str(buffer_str);
assert!(result.is_ok());
Some(result.unwrap())
} else {
assert!(serializer.encode(event, &mut buffer).is_err());
None
}
}
/// A valid event converts to a JSON value through the generic serializer.
#[test]
fn gelf_serde_json_to_value_supported_success() {
let serializer = SerializerConfig::Gelf.build().unwrap();
let event_fields = btreemap! {
VERSION => "1.1",
HOST => "example.org",
SHORT_MESSAGE => "Some message",
};
let log_event: Event = LogEvent::from_map(event_fields, EventMetadata::default()).into();
assert!(serializer.supports_json());
assert!(serializer.to_json_value(log_event).is_ok());
}
/// An empty event is rejected even though JSON output is supported.
#[test]
fn gelf_serde_json_to_value_supported_failure_to_encode() {
let serializer = SerializerConfig::Gelf.build().unwrap();
let event_fields = btreemap! {};
let log_event: Event = LogEvent::from_map(event_fields, EventMetadata::default()).into();
assert!(serializer.supports_json());
assert!(serializer.to_json_value(log_event).is_err());
}
/// An event with well-typed values for the known fields serializes successfully.
#[test]
fn gelf_serializing_valid() {
let event_fields = btreemap! {
VERSION => "1.1",
HOST => "example.org",
SHORT_MESSAGE => "Some message",
FULL_MESSAGE => "Even more message",
FACILITY => "",
FILE => "/tmp/foobar",
LINE => Value::Float(ordered_float::NotNan::new(1.5).unwrap()),
LEVEL => 5,
};
let jsn = do_serialize(true, event_fields).unwrap();
assert_eq!(jsn.get(VERSION).unwrap(), "1.1");
assert_eq!(jsn.get(HOST).unwrap(), "example.org");
assert_eq!(jsn.get(SHORT_MESSAGE).unwrap(), "Some message");
}
/// Fields that can be fixed up are coerced rather than rejected.
#[test]
fn gelf_serializing_coerced() {
// An additional field without a leading underscore gets one prepended.
{
let event_fields = btreemap! {
VERSION => "1.1",
HOST => "example.org",
SHORT_MESSAGE => "Some message",
"noUnderScore" => 0,
};
let jsn = do_serialize(true, event_fields).unwrap();
assert_eq!(jsn.get("_noUnderScore").unwrap(), 0);
}
// The log schema's message field is used as the short message when the
// short message itself is absent.
{
let event_fields = btreemap! {
VERSION => "1.1",
HOST => "example.org",
log_schema().message_key().unwrap().to_string() => "Some message",
};
let jsn = do_serialize(true, event_fields).unwrap();
assert_eq!(jsn.get(SHORT_MESSAGE).unwrap(), "Some message");
}
}
/// Timestamp values are emitted as numbers of seconds.
#[test]
fn gelf_serializing_timestamp() {
// A timestamp with a sub-second part (100 ms) becomes a float.
{
let naive_dt =
NaiveDateTime::parse_from_str("1970-01-01 00:00:00.1", "%Y-%m-%d %H:%M:%S%.f");
let dt = naive_dt.unwrap().and_utc();
let event_fields = btreemap! {
VERSION => "1.1",
SHORT_MESSAGE => "Some message",
HOST => "example.org",
TIMESTAMP => dt,
};
let jsn = do_serialize(true, event_fields).unwrap();
assert!(jsn.get(TIMESTAMP).unwrap().is_f64());
assert_eq!(jsn.get(TIMESTAMP).unwrap().as_f64().unwrap(), 0.1,);
}
// A timestamp on a whole second becomes an integer.
{
let naive_dt =
NaiveDateTime::parse_from_str("1970-01-01 00:00:00.0", "%Y-%m-%d %H:%M:%S%.f");
let dt = naive_dt.unwrap().and_utc();
let event_fields = btreemap! {
VERSION => "1.1",
SHORT_MESSAGE => "Some message",
HOST => "example.org",
TIMESTAMP => dt,
};
let jsn = do_serialize(true, event_fields).unwrap();
assert!(jsn.get(TIMESTAMP).unwrap().is_i64());
assert_eq!(jsn.get(TIMESTAMP).unwrap().as_i64().unwrap(), 0);
}
}
/// Each block below builds an event that encoding must reject.
#[test]
fn gelf_serializing_invalid_error() {
// Missing host.
{
let event_fields = btreemap! {
VERSION => "1.1",
SHORT_MESSAGE => "Some message",
};
do_serialize(false, event_fields);
}
// Missing short message.
{
let event_fields = btreemap! {
HOST => "example.org",
VERSION => "1.1",
};
do_serialize(false, event_fields);
}
// Short message is an integer instead of a string.
{
let event_fields = btreemap! {
HOST => "example.org",
VERSION => "1.1",
SHORT_MESSAGE => 0,
};
do_serialize(false, event_fields);
}
// Level is a string instead of an integer.
{
let event_fields = btreemap! {
HOST => "example.org",
VERSION => "1.1",
SHORT_MESSAGE => "Some message",
LEVEL => "1",
};
do_serialize(false, event_fields);
}
// Line is a string instead of a number.
{
let event_fields = btreemap! {
HOST => "example.org",
VERSION => "1.1",
SHORT_MESSAGE => "Some message",
LINE => "1.2",
};
do_serialize(false, event_fields);
}
// Additional field whose name contains a `%` character.
{
let event_fields = btreemap! {
HOST => "example.org",
VERSION => "1.1",
SHORT_MESSAGE => "Some message",
"invalid%field" => "foo",
};
do_serialize(false, event_fields);
}
// Additional field holding a boolean value.
{
let event_fields = btreemap! {
HOST => "example.org",
VERSION => "1.1",
SHORT_MESSAGE => "Some message",
"_foobar" => false,
};
do_serialize(false, event_fields);
}
// Additional field holding a null value.
{
let event_fields = btreemap! {
HOST => "example.org",
VERSION => "1.1",
SHORT_MESSAGE => "Some message",
"_foobar" => serde_json::Value::Null,
};
do_serialize(false, event_fields);
}
}
}