use crate::decoding::format::Deserializer;
use crate::BytesDeserializerConfig;
use bytes::Bytes;
use derivative::Derivative;
use smallvec::{smallvec, SmallVec};
use vector_config_macros::configurable_component;
use vector_core::config::{DataType, LogNamespace};
use vector_core::event::{Event, TargetEvents, VrlTarget};
use vector_core::{compile_vrl, schema};
use vrl::compiler::state::ExternalEnv;
use vrl::compiler::{runtime::Runtime, CompileConfig, Program, TimeZone, TypeState};
use vrl::diagnostic::Formatter;
use vrl::value::Kind;
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct VrlDeserializerConfig {
pub vrl: VrlDeserializerOptions,
}
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct VrlDeserializerOptions {
pub source: String,
#[serde(default)]
#[configurable(metadata(docs::advanced))]
pub timezone: Option<TimeZone>,
}
impl VrlDeserializerConfig {
pub fn build(&self) -> vector_common::Result<VrlDeserializer> {
let state = TypeState {
local: Default::default(),
external: ExternalEnv::default(),
};
match compile_vrl(
&self.vrl.source,
&vrl::stdlib::all(),
&state,
CompileConfig::default(),
) {
Ok(result) => Ok(VrlDeserializer {
program: result.program,
timezone: self.vrl.timezone.unwrap_or(TimeZone::Local),
}),
Err(diagnostics) => Err(Formatter::new(&self.vrl.source, diagnostics)
.to_string()
.into()),
}
}
pub fn output_type(&self) -> DataType {
DataType::Log
}
pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
match log_namespace {
LogNamespace::Legacy => {
schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any())
}
LogNamespace::Vector => {
schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
}
}
}
}
#[derive(Debug, Clone)]
pub struct VrlDeserializer {
program: Program,
timezone: TimeZone,
}
fn parse_bytes(bytes: Bytes, log_namespace: LogNamespace) -> Event {
let bytes_deserializer = BytesDeserializerConfig::new().build();
let log_event = bytes_deserializer.parse_single(bytes, log_namespace);
Event::from(log_event)
}
impl Deserializer for VrlDeserializer {
fn parse(
&self,
bytes: Bytes,
log_namespace: LogNamespace,
) -> vector_common::Result<SmallVec<[Event; 1]>> {
let event = parse_bytes(bytes, log_namespace);
match self.run_vrl(event, log_namespace) {
Ok(events) => Ok(events),
Err(e) => Err(e),
}
}
}
impl VrlDeserializer {
fn run_vrl(
&self,
event: Event,
log_namespace: LogNamespace,
) -> vector_common::Result<SmallVec<[Event; 1]>> {
let mut runtime = Runtime::default();
let mut target = VrlTarget::new(event, self.program.info(), true);
match runtime.resolve(&mut target, &self.program, &self.timezone) {
Ok(_) => match target.into_events(log_namespace) {
TargetEvents::One(event) => Ok(smallvec![event]),
TargetEvents::Logs(events_iter) => Ok(SmallVec::from_iter(events_iter)),
TargetEvents::Traces(_) => Err("trace targets are not supported".into()),
},
Err(e) => Err(e.to_string().into()),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::{DateTime, Utc};
use indoc::indoc;
use vrl::btreemap;
use vrl::path::OwnedTargetPath;
use vrl::value::Value;
fn make_decoder(source: &str) -> VrlDeserializer {
VrlDeserializerConfig {
vrl: VrlDeserializerOptions {
source: source.to_string(),
timezone: None,
},
}
.build()
.expect("Failed to build VrlDeserializer")
}
#[test]
fn test_json_message() {
let source = indoc!(
r#"
%m1 = "metadata"
. = string!(.)
. = parse_json!(.)
"#
);
let decoder = make_decoder(source);
let log_bytes = Bytes::from(r#"{ "message": "Hello VRL" }"#);
let result = decoder.parse(log_bytes, LogNamespace::Vector).unwrap();
assert_eq!(result.len(), 1);
let event = result.first().unwrap();
assert_eq!(
*event.as_log().get(&OwnedTargetPath::event_root()).unwrap(),
btreemap! { "message" => "Hello VRL" }.into()
);
assert_eq!(
*event
.as_log()
.get(&OwnedTargetPath::metadata_root())
.unwrap(),
btreemap! { "m1" => "metadata" }.into()
);
}
#[test]
fn test_ignored_returned_expression() {
let source = indoc!(
r#"
. = { "a" : 1 }
{ "b" : 9 }
"#
);
let decoder = make_decoder(source);
let log_bytes = Bytes::from("some bytes");
let result = decoder.parse(log_bytes, LogNamespace::Vector).unwrap();
assert_eq!(result.len(), 1);
let event = result.first().unwrap();
assert_eq!(
*event.as_log().get(&OwnedTargetPath::event_root()).unwrap(),
btreemap! { "a" => 1 }.into()
);
}
#[test]
fn test_multiple_events() {
let source = indoc!(". = [0,1,2]");
let decoder = make_decoder(source);
let log_bytes = Bytes::from("some bytes");
let result = decoder.parse(log_bytes, LogNamespace::Vector).unwrap();
assert_eq!(result.len(), 3);
for (i, event) in result.iter().enumerate() {
assert_eq!(
*event.as_log().get(&OwnedTargetPath::event_root()).unwrap(),
i.into()
);
}
}
#[test]
fn test_syslog_and_cef_input() {
let source = indoc!(
r#"
if exists(.message) {
. = string!(.message)
}
. = parse_syslog(.) ?? parse_cef(.) ?? null
"#
);
let decoder = make_decoder(source);
let syslog_bytes = Bytes::from(
"<34>1 2024-02-06T15:04:05.000Z mymachine.example.com su - ID47 - 'su root' failed for user on /dev/pts/8",
);
let result = decoder.parse(syslog_bytes, LogNamespace::Vector).unwrap();
assert_eq!(result.len(), 1);
let syslog_event = result.first().unwrap();
assert_eq!(
*syslog_event
.as_log()
.get(&OwnedTargetPath::event_root())
.unwrap(),
btreemap! {
"appname" => "su",
"facility" => "auth",
"hostname" => "mymachine.example.com",
"message" => "'su root' failed for user on /dev/pts/8",
"msgid" => "ID47",
"severity" => "crit",
"timestamp" => "2024-02-06T15:04:05Z".parse::<DateTime<Utc>>().unwrap(),
"version" => 1
}
.into()
);
let cef_bytes = Bytes::from("CEF:0|Security|Threat Manager|1.0|100|worm successfully stopped|10|src=10.0.0.1 dst=2.1.2.2 spt=1232");
let result = decoder.parse(cef_bytes, LogNamespace::Vector).unwrap();
assert_eq!(result.len(), 1);
let cef_event = result.first().unwrap();
assert_eq!(
*cef_event
.as_log()
.get(&OwnedTargetPath::event_root())
.unwrap(),
btreemap! {
"cefVersion" =>"0",
"deviceEventClassId" =>"100",
"deviceProduct" =>"Threat Manager",
"deviceVendor" =>"Security",
"deviceVersion" =>"1.0",
"dst" =>"2.1.2.2",
"name" =>"worm successfully stopped",
"severity" =>"10",
"spt" =>"1232",
"src" =>"10.0.0.1"
}
.into()
);
let random_bytes = Bytes::from("a|- -| x");
let result = decoder.parse(random_bytes, LogNamespace::Vector).unwrap();
let random_event = result.first().unwrap();
assert_eq!(result.len(), 1);
assert_eq!(
*random_event
.as_log()
.get(&OwnedTargetPath::event_root())
.unwrap(),
Value::Null
);
}
#[test]
fn test_invalid_source() {
let error = VrlDeserializerConfig {
vrl: VrlDeserializerOptions {
source: ". ?".to_string(),
timezone: None,
},
}
.build()
.unwrap_err()
.to_string();
assert!(error.contains("error[E203]: syntax error"));
}
#[test]
fn test_abort() {
let decoder = make_decoder("abort");
let log_bytes = Bytes::from(r#"{ "message": "Hello VRL" }"#);
let error = decoder
.parse(log_bytes, LogNamespace::Vector)
.unwrap_err()
.to_string();
assert!(error.contains("aborted"));
}
}