// vector/transforms/lua/v1/mod.rs
use std::{future::ready, pin::Pin};
use futures::{stream, Stream, StreamExt};
use mlua::ExternalError;
use mlua::FromLua;
use ordered_float::NotNan;
use snafu::{ResultExt, Snafu};
use vector_lib::configurable::configurable_component;
use vrl::path::parse_target_path;
use crate::config::OutputId;
use crate::schema::Definition;
use crate::{
config::{DataType, Input, TransformOutput},
event::{Event, Value},
internal_events::{LuaGcTriggered, LuaScriptError},
schema,
transforms::{TaskTransform, Transform},
};
/// Errors that can be produced while constructing the Lua transform.
#[derive(Debug, Snafu)]
enum BuildError {
/// The Lua runtime returned an error; the underlying `mlua::Error` is kept as `source`.
#[snafu(display("Lua error: {}", source))]
InvalidLua { source: mlua::Error },
}
/// Configuration for version 1 of the `lua` transform.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct LuaConfig {
/// The Lua program that is run for each event.
source: String,
/// Directories added to Lua's `package.path` (as `<dir>/?.lua`) so that `require` can find
/// modules stored in them. Defaults to an empty list.
#[serde(default)]
search_dirs: Vec<String>,
}
impl LuaConfig {
    /// Builds the version 1 Lua transform from this configuration.
    ///
    /// A deprecation warning is logged on every call, before the script is compiled.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`Lua::new`].
    pub fn build(&self) -> crate::Result<Transform> {
        warn!(
            "DEPRECATED The `lua` transform API version 1 is deprecated. Please convert your script to version 2."
        );
        let source = self.source.clone();
        let search_dirs = self.search_dirs.clone();
        let lua = Lua::new(source, search_dirs)?;
        Ok(Transform::event_task(lua))
    }

    /// This transform consumes log events.
    pub fn input(&self) -> Input {
        Input::log()
    }

    /// Describes the single log output of the transform.
    ///
    /// Every upstream output is mapped to the default schema definition for the union of the
    /// log namespaces found across all `input_definitions`.
    pub fn outputs(
        &self,
        input_definitions: &[(OutputId, schema::Definition)],
    ) -> Vec<TransformOutput> {
        // Union of the log namespaces used by any of the inputs.
        let namespaces = input_definitions
            .iter()
            .flat_map(|(_, upstream)| upstream.log_namespaces().iter().cloned())
            .collect();

        // The same default definition applies to every upstream output.
        let default_definition = Definition::default_for_namespace(&namespaces);
        let definition = input_definitions
            .iter()
            .map(|(output, _)| (output.clone(), default_definition.clone()))
            .collect();

        vec![TransformOutput::new(DataType::Log, definition)]
    }
}
// Number of successful script invocations between explicit Lua garbage collections.
const GC_INTERVAL: usize = 16;
/// State of a running version 1 `lua` transform.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Lua {
/// Script source; kept so that `Clone` can build a fresh transform from it.
#[derivative(Debug = "ignore")]
source: String,
/// Module search directories; kept for the same reason as `source`.
#[derivative(Debug = "ignore")]
search_dirs: Vec<String>,
/// The Lua state the script runs in.
#[derivative(Debug = "ignore")]
lua: mlua::Lua,
/// Registry key of the compiled script function.
vector_func: mlua::RegistryKey,
/// Successful invocations since the last explicit garbage collection.
invocations_after_gc: usize,
}
impl Clone for Lua {
fn clone(&self) -> Self {
Lua::new(self.source.clone(), self.search_dirs.clone())
.expect("Tried to clone existing valid lua transform. This is an invariant.")
}
}
/// Wrapper that exposes an `Event` to Lua scripts as userdata (see its `mlua::UserData` impl).
#[derive(Clone, FromLua)]
struct LuaEvent {
/// The wrapped event; the metamethods read and modify it as a log event.
inner: Event,
}
impl Lua {
    /// Creates a transform that runs `source` in a fresh Lua state.
    ///
    /// Every entry of `search_dirs` is placed in front of the existing `package.path` as
    /// `<dir>/?.lua`. The script is compiled once and stored in the Lua registry.
    ///
    /// # Errors
    ///
    /// Returns `BuildError::InvalidLua` when the `package` table cannot be read or updated,
    /// when `source` fails to compile, or when the compiled function cannot be stored.
    pub fn new(source: String, search_dirs: Vec<String>) -> crate::Result<Self> {
        // SAFETY: only the `ALL_SAFE` subset of the standard library is loaded. Per the mlua
        // documentation a state created through the unsafe constructor may load C modules.
        // NOTE(review): presumably that is why the unsafe constructor is used here - confirm.
        let lua = unsafe {
            mlua::Lua::unsafe_new_with(mlua::StdLib::ALL_SAFE, mlua::LuaOptions::default())
        };

        let extra_paths = search_dirs
            .iter()
            .map(|dir| format!("{dir}/?.lua"))
            .collect::<Vec<_>>()
            .join(";");

        if !extra_paths.is_empty() {
            let package = lua
                .globals()
                .get::<mlua::Table>("package")
                .context(InvalidLuaSnafu)?;
            // Fall back to ";" when `package.path` is missing or not a string.
            let existing_paths = package
                .get::<String>("path")
                .unwrap_or_else(|_| ";".to_string());
            package
                .set("path", format!("{extra_paths};{existing_paths}"))
                .context(InvalidLuaSnafu)?;
        }

        let script = lua.load(&source).into_function().context(InvalidLuaSnafu)?;
        let vector_func = lua.create_registry_value(script).context(InvalidLuaSnafu)?;

        Ok(Self {
            source,
            search_dirs,
            lua,
            vector_func,
            invocations_after_gc: 0,
        })
    }

    /// Runs the script once with `event` bound to the Lua global `event`.
    ///
    /// Returns `Ok(Some(_))` with whatever event the global holds afterwards (the original
    /// source id, if any, is re-applied), or `Ok(None)` when the script set the global to
    /// `nil`. After every `GC_INTERVAL` successful invocations a full garbage collection is
    /// triggered.
    ///
    /// # Errors
    ///
    /// Returns the `mlua::Error` raised while binding the event, running the script, reading
    /// the global back, or collecting garbage.
    fn process(&mut self, event: Event) -> Result<Option<Event>, mlua::Error> {
        let source_id = event.source_id().cloned();

        let globals = self.lua.globals();
        globals.raw_set("event", LuaEvent { inner: event })?;

        let script = self
            .lua
            .registry_value::<mlua::Function>(&self.vector_func)?;
        script.call::<()>(())?;

        // Read the outcome now, but only hand it back after the GC bookkeeping below.
        let result = match globals.raw_get::<Option<LuaEvent>>("event") {
            Ok(Some(LuaEvent { inner: mut event })) => {
                if let Some(source_id) = source_id {
                    event.set_source_id(source_id);
                }
                Ok(Some(event))
            }
            Ok(None) => Ok(None),
            Err(error) => Err(error),
        };

        self.invocations_after_gc += 1;
        if self.invocations_after_gc % GC_INTERVAL == 0 {
            emit!(LuaGcTriggered {
                used_memory: self.lua.used_memory()
            });
            self.lua.gc_collect()?;
            self.invocations_after_gc = 0;
        }

        result
    }

    /// Runs the script on `event`; a failing script is reported through `LuaScriptError`
    /// and the event is dropped.
    pub fn transform_one(&mut self, event: Event) -> Option<Event> {
        self.process(event).unwrap_or_else(|error| {
            emit!(LuaScriptError { error });
            None
        })
    }
}
impl TaskTransform<Event> for Lua {
    /// Applies the Lua script to every event of `task`.
    ///
    /// Each input event yields at most one output event. Events for which the script fails
    /// are dropped after a `LuaScriptError` has been emitted, and events the script set to
    /// `nil` are dropped silently.
    fn transform(
        self: Box<Self>,
        task: Pin<Box<dyn Stream<Item = Event> + Send>>,
    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
    where
        Self: 'static,
    {
        let mut inner = self;
        Box::pin(
            task.filter_map(move |event| {
                ready(match inner.process(event) {
                    // `Option<Event>` already iterates over zero or one item, so it can be
                    // streamed directly without allocating a one-element `Vec` per event.
                    Ok(event) => Some(stream::iter(event)),
                    Err(error) => {
                        emit!(LuaScriptError { error });
                        None
                    }
                })
            })
            .flatten(),
        )
    }
}
impl mlua::UserData for LuaEvent {
    /// Registers the metamethods that let a script use `event` like a table:
    /// `event[key] = value` (`__newindex`), `event[key]` (`__index`) and `pairs(event)`
    /// (`__pairs`).
    fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) {
        // `event[key] = value`
        //
        // Strings, integers, non-NaN numbers and booleans are stored in the log event;
        // `nil` removes the field; any other Lua value is logged and the field is removed.
        // A key that is not a valid target path raises a Lua error.
        methods.add_meta_method_mut(
            mlua::MetaMethod::NewIndex,
            |_lua, this, (key, value): (String, Option<mlua::Value>)| {
                let key_path = parse_target_path(key.as_str()).map_err(|e| e.into_lua_err())?;
                let log = this.inner.as_mut_log();
                match value {
                    Some(mlua::Value::String(string)) => {
                        // Lua strings are arbitrary byte sequences. Copy the bytes as they
                        // are instead of panicking on input that is not valid UTF-8; for
                        // valid UTF-8 the stored value is the same as before.
                        log.insert(&key_path, Value::Bytes(string.as_bytes().to_vec().into()));
                    }
                    Some(mlua::Value::Integer(integer)) => {
                        log.insert(&key_path, Value::Integer(integer));
                    }
                    Some(mlua::Value::Number(number)) if !number.is_nan() => {
                        let float =
                            NotNan::new(number).expect("NaN is excluded by the match guard");
                        log.insert(&key_path, Value::Float(float));
                    }
                    Some(mlua::Value::Boolean(boolean)) => {
                        log.insert(&key_path, Value::Boolean(boolean));
                    }
                    Some(mlua::Value::Nil) | None => {
                        log.remove(&key_path);
                    }
                    // Tables, functions, NaN numbers and every other Lua value.
                    _ => {
                        info!(
                            message =
                                "Could not set field to Lua value of invalid type, dropping field.",
                            field = key.as_str(),
                            internal_log_rate_limit = true
                        );
                        log.remove(&key_path);
                    }
                }
                Ok(())
            },
        );

        // `event[key]`
        //
        // Returns the field coerced to bytes as a Lua string, or `nil` when the path cannot
        // be parsed or the field does not exist.
        methods.add_meta_method(mlua::MetaMethod::Index, |lua, this, key: String| {
            this.inner
                .as_log()
                .parse_path_and_get_value(key.as_str())
                .ok()
                .flatten()
                .map(|value| lua.create_string(value.coerce_to_bytes()))
                .transpose()
        });

        // `pairs(event)`
        //
        // Returns an iterator function plus a state table holding the event it was handed
        // and a table whose keys are the event's field paths.
        methods.add_meta_function(mlua::MetaMethod::Pairs, |lua, event: LuaEvent| {
            let state = lua.create_table()?;

            // Always store a key table, even when the event exposes no keys: the iterator
            // below reads `keys` as a table and would otherwise fail on `nil`.
            let keys = match event.inner.as_log().keys() {
                Some(keys) => lua.create_table_from(keys.map(|k| (k, true)))?,
                None => lua.create_table()?,
            };
            state.raw_set("keys", keys)?;
            state.raw_set("event", event)?;

            let function =
                lua.create_function(|lua, (state, prev): (mlua::Table, Option<String>)| {
                    let event: LuaEvent = state.raw_get("event")?;
                    let keys: mlua::Table = state.raw_get("keys")?;

                    // Let Lua's own `next` walk the key table.
                    let next: mlua::Function = lua.globals().raw_get("next")?;
                    let key: Option<String> = next.call((keys, prev))?;
                    let Some(key) = key else {
                        return Ok((None, None));
                    };

                    let value = event
                        .inner
                        .as_log()
                        .parse_path_and_get_value(key.as_str())
                        .ok()
                        .flatten();
                    match value {
                        Some(value) => {
                            let value = lua.create_string(value.coerce_to_bytes())?;
                            Ok((Some(key), Some(value)))
                        }
                        // A key without a readable value ends the iteration.
                        None => Ok((None, None)),
                    }
                })?;

            Ok((function, state))
        });
    }
}
pub fn format_error(error: &mlua::Error) -> String {
match error {
mlua::Error::CallbackError { traceback, cause } => format_error(cause) + "\n" + traceback,
err => err.to_string(),
}
}
// Unit tests for the version 1 Lua transform. Most tests run a small script through the
// `transform_one` helper at the bottom of the module and inspect the resulting event.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::event::{Event, LogEvent, Value};
use crate::{config::ComponentKey, test_util};
// Assigning a string to a new key adds that field to the log event.
#[test]
fn lua_add_field() {
let event = transform_one(
r#"
event["hello"] = "goodbye"
"#,
LogEvent::from("program me"),
)
.unwrap();
assert_eq!(event.as_log()["hello"], "goodbye".into());
}
// The script can read the `message` field and store a value derived from it.
#[test]
fn lua_read_field() {
let event = transform_one(
r#"
_, _, name = string.find(event["message"], "Hello, my name is (%a+).")
event["name"] = name
"#,
LogEvent::from("Hello, my name is Bob."),
)
.unwrap();
assert_eq!(event.as_log()["name"], "Bob".into());
}
// Assigning `nil` to an existing field removes it.
#[test]
fn lua_remove_field() {
let mut log = LogEvent::default();
log.insert("name", "Bob");
let event = transform_one(
r#"
event["name"] = nil
"#,
log,
)
.unwrap();
assert!(event.as_log().get("name").is_none());
}
// Setting the global `event` itself to `nil` drops the whole event.
#[test]
fn lua_drop_event() {
let mut log = LogEvent::default();
log.insert("name", "Bob");
let event = transform_one(
r"
event = nil
",
log,
);
assert!(event.is_none());
}
// Reading a field that does not exist yields `nil` in Lua.
#[test]
fn lua_read_empty_field() {
let event = transform_one(
r#"
if event["non-existant"] == nil then
event["result"] = "empty"
else
event["result"] = "found"
end
"#,
LogEvent::default(),
)
.unwrap();
assert_eq!(event.as_log()["result"], "empty".into());
}
// A Lua integer is stored as `Value::Integer`.
#[test]
fn lua_integer_value() {
let event = transform_one(
r#"
event["number"] = 3
"#,
LogEvent::default(),
)
.unwrap();
assert_eq!(event.as_log()["number"], Value::Integer(3));
}
// A Lua floating point number is stored as a float value.
#[test]
fn lua_numeric_value() {
let event = transform_one(
r#"
event["number"] = 3.14159
"#,
LogEvent::default(),
)
.unwrap();
assert_eq!(event.as_log()["number"], Value::from(3.14159));
}
// A Lua boolean is stored as `Value::Boolean`.
#[test]
fn lua_boolean_value() {
let event = transform_one(
r#"
event["bool"] = true
"#,
LogEvent::default(),
)
.unwrap();
assert_eq!(event.as_log()["bool"], Value::Boolean(true));
}
// Assigning a value of an unsupported type (here a table) leaves the field unset.
#[test]
fn lua_non_coercible_value() {
let event = transform_one(
r#"
event["junk"] = {"asdf"}
"#,
LogEvent::default(),
)
.unwrap();
assert_eq!(event.as_log().get("junk"), None);
}
// Writing through a non-string key fails with a conversion error.
#[test]
fn lua_non_string_key_write() {
crate::test_util::trace_init();
let mut transform = Lua::new(
r#"
event[false] = "hello"
"#
.to_string(),
vec![],
)
.unwrap();
let err = transform.process(LogEvent::default().into()).unwrap_err();
let err = format_error(&err);
assert!(
err.contains("error converting Lua boolean to String"),
"{}",
err
);
}
// Reading through a non-string key fails with the same conversion error.
#[test]
fn lua_non_string_key_read() {
crate::test_util::trace_init();
let mut transform = Lua::new(
r"
print(event[false])
"
.to_string(),
vec![],
)
.unwrap();
let err = transform.process(LogEvent::default().into()).unwrap_err();
let err = format_error(&err);
assert!(
err.contains("error converting Lua boolean to String"),
"{}",
err
);
}
// An error raised by the script is returned by `process` and keeps its message.
#[test]
fn lua_script_error() {
crate::test_util::trace_init();
let mut transform = Lua::new(
r#"
error("this is an error")
"#
.to_string(),
vec![],
)
.unwrap();
let err = transform.process(LogEvent::default().into()).unwrap_err();
let err = format_error(&err);
assert!(err.contains("this is an error"), "{}", err);
}
// A script that does not parse makes `Lua::new` fail with a syntax error.
#[test]
fn lua_syntax_error() {
crate::test_util::trace_init();
let err = Lua::new(
r"
1234 = sadf <>&*!#@
"
.to_string(),
vec![],
)
.map(|_| ())
.unwrap_err()
.to_string();
assert!(err.contains("syntax error:"), "{}", err);
}
// A module placed in one of the search directories can be loaded with `require`.
#[test]
fn lua_load_file() {
use std::{fs::File, io::Write};
crate::test_util::trace_init();
let dir = tempfile::tempdir().unwrap();
let mut file = File::create(dir.path().join("script2.lua")).unwrap();
// `{{}}` is the `write!` escape for a literal `{}`, i.e. an empty Lua table.
write!(
&mut file,
r#"
local M = {{}}
local function modify(event2)
event2["\"new field\""] = "new value"
end
M.modify = modify
return M
"#
)
.unwrap();
let source = r#"
local script2 = require("script2")
script2.modify(event)
"#
.to_string();
let mut transform =
Lua::new(source, vec![dir.path().to_string_lossy().into_owned()]).unwrap();
let event = transform.transform_one(LogEvent::default().into()).unwrap();
assert_eq!(event.as_log()["\"new field\""], "new value".into());
}
// `pairs(event)` visits every field and yields its key together with its value.
#[test]
fn lua_pairs() {
let mut event = LogEvent::default();
event.insert("name", "Bob");
event.insert("friend", "Alice");
let event = transform_one(
r"
for k,v in pairs(event) do
event[k] = k .. v
end
",
event,
)
.unwrap();
assert_eq!(event.as_log()["name"], "nameBob".into());
assert_eq!(event.as_log()["friend"], "friendAlice".into());
}
// Test helper: builds a transform from the script `transform`, tags `event` with a random
// source id, runs the script once and checks that a surviving event kept that source id.
fn transform_one(transform: &str, event: impl Into<Event>) -> Option<Event> {
crate::test_util::trace_init();
let source = source_id();
let mut event = event.into();
event.set_source_id(Arc::clone(&source));
let mut transform = Lua::new(transform.to_string(), vec![]).unwrap();
let event = transform.transform_one(event);
if let Some(event) = &event {
assert_eq!(event.source_id(), Some(&source));
}
event
}
// Test helper: creates a component key from a random 16-character string.
fn source_id() -> Arc<ComponentKey> {
Arc::new(ComponentKey::from(test_util::random_string(16)))
}
}