use std::{path::PathBuf, sync::Arc, time::Duration};
use serde_with::serde_as;
use snafu::{ResultExt, Snafu};
use vector_lib::codecs::MetricTagValues;
use vector_lib::configurable::configurable_component;
pub use vector_lib::event::lua;
use vector_lib::transform::runtime_transform::{RuntimeTransform, Timer};
use crate::config::{ComponentKey, OutputId};
use crate::event::lua::event::LuaEvent;
use crate::schema::Definition;
use crate::{
config::{self, DataType, Input, TransformOutput, CONFIG_PATHS},
event::Event,
internal_events::{LuaBuildError, LuaGcTriggered},
schema,
transforms::Transform,
};
#[derive(Debug, Snafu)]
pub enum BuildError {
#[snafu(display("Invalid \"search_dirs\": {}", source))]
InvalidSearchDirs { source: mlua::Error },
#[snafu(display("Cannot evaluate Lua code in \"source\": {}", source))]
InvalidSource { source: mlua::Error },
#[snafu(display("Cannot evaluate Lua code defining \"hooks.init\": {}", source))]
InvalidHooksInit { source: mlua::Error },
#[snafu(display("Cannot evaluate Lua code defining \"hooks.process\": {}", source))]
InvalidHooksProcess { source: mlua::Error },
#[snafu(display("Cannot evaluate Lua code defining \"hooks.shutdown\": {}", source))]
InvalidHooksShutdown { source: mlua::Error },
#[snafu(display("Cannot evaluate Lua code defining timer handler: {}", source))]
InvalidTimerHandler { source: mlua::Error },
#[snafu(display("Runtime error in \"hooks.init\" function: {}", source))]
RuntimeErrorHooksInit { source: mlua::Error },
#[snafu(display("Runtime error in \"hooks.process\" function: {}", source))]
RuntimeErrorHooksProcess { source: mlua::Error },
#[snafu(display("Runtime error in \"hooks.shutdown\" function: {}", source))]
RuntimeErrorHooksShutdown { source: mlua::Error },
#[snafu(display("Runtime error in timer handler: {}", source))]
RuntimeErrorTimerHandler { source: mlua::Error },
#[snafu(display("Cannot call GC in Lua runtime: {}", source))]
RuntimeErrorGc { source: mlua::Error },
}
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct LuaConfig {
#[configurable(metadata(
docs::examples = "function init()\n\tcount = 0\nend\n\nfunction process()\n\tcount = count + 1\nend\n\nfunction timer_handler(emit)\n\temit(make_counter(counter))\n\tcounter = 0\nend\n\nfunction shutdown(emit)\n\temit(make_counter(counter))\nend\n\nfunction make_counter(value)\n\treturn metric = {\n\t\tname = \"event_counter\",\n\t\tkind = \"incremental\",\n\t\ttimestamp = os.date(\"!*t\"),\n\t\tcounter = {\n\t\t\tvalue = value\n\t\t}\n \t}\nend",
docs::examples = "-- external file with hooks and timers defined\nrequire('custom_module')",
))]
source: Option<String>,
#[serde(default = "default_config_paths")]
#[configurable(metadata(docs::examples = "/etc/vector/lua"))]
#[configurable(metadata(docs::human_name = "Search Directories"))]
search_dirs: Vec<PathBuf>,
#[configurable(derived)]
hooks: HooksConfig,
#[serde(default)]
timers: Vec<TimerConfig>,
#[serde(default)]
metric_tag_values: MetricTagValues,
}
fn default_config_paths() -> Vec<PathBuf> {
match CONFIG_PATHS.lock().ok() {
Some(config_paths) => config_paths
.clone()
.into_iter()
.map(|config_path| match config_path {
config::ConfigPath::File(mut path, _format) => {
path.pop();
path
}
config::ConfigPath::Dir(path) => path,
})
.collect(),
None => vec![],
}
}
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
struct HooksConfig {
#[configurable(metadata(
docs::examples = "function (emit)\n\t-- Custom Lua code here\nend",
docs::examples = "init",
))]
init: Option<String>,
#[configurable(metadata(
docs::examples = "function (event, emit)\n\tevent.log.field = \"value\" -- set value of a field\n\tevent.log.another_field = nil -- remove field\n\tevent.log.first, event.log.second = nil, event.log.first -- rename field\n\t-- Very important! Emit the processed event.\n\temit(event)\nend",
docs::examples = "process",
))]
process: String,
#[configurable(metadata(
docs::examples = "function (emit)\n\t-- Custom Lua code here\nend",
docs::examples = "shutdown",
))]
shutdown: Option<String>,
}
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug)]
struct TimerConfig {
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[configurable(metadata(docs::human_name = "Interval"))]
interval_seconds: Duration,
#[configurable(metadata(docs::examples = "timer_handler"))]
handler: String,
}
impl LuaConfig {
pub fn build(&self, key: ComponentKey) -> crate::Result<Transform> {
Lua::new(self, key).map(Transform::event_task)
}
pub fn input(&self) -> Input {
Input::new(DataType::Metric | DataType::Log)
}
pub fn outputs(
&self,
input_definitions: &[(OutputId, schema::Definition)],
) -> Vec<TransformOutput> {
let namespaces = input_definitions
.iter()
.flat_map(|(_output, definition)| definition.log_namespaces().clone())
.collect();
let definition = input_definitions
.iter()
.map(|(output, _definition)| {
(
output.clone(),
Definition::default_for_namespace(&namespaces),
)
})
.collect();
vec![TransformOutput::new(
DataType::Metric | DataType::Log,
definition,
)]
}
}
const GC_INTERVAL: usize = 16;
pub struct Lua {
lua: mlua::Lua,
invocations_after_gc: usize,
hook_init: Option<mlua::RegistryKey>,
hook_process: mlua::RegistryKey,
hook_shutdown: Option<mlua::RegistryKey>,
timers: Vec<(Timer, mlua::RegistryKey)>,
multi_value_tags: bool,
source_id: Arc<ComponentKey>,
}
fn make_registry_value(lua: &mlua::Lua, source: &str) -> mlua::Result<mlua::RegistryKey> {
lua.load(source)
.eval::<mlua::Function>()
.and_then(|f| lua.create_registry_value(f))
}
impl Lua {
pub fn new(config: &LuaConfig, key: ComponentKey) -> crate::Result<Self> {
let lua = unsafe {
mlua::Lua::unsafe_new_with(mlua::StdLib::ALL_SAFE, mlua::LuaOptions::default())
};
let additional_paths = config
.search_dirs
.iter()
.map(|d| format!("{}/?.lua", d.to_string_lossy()))
.collect::<Vec<_>>()
.join(";");
let mut timers = Vec::new();
if !additional_paths.is_empty() {
let package = lua.globals().get::<mlua::Table>("package")?;
let current_paths = package
.get::<String>("path")
.unwrap_or_else(|_| ";".to_string());
let paths = format!("{};{}", additional_paths, current_paths);
package.set("path", paths)?;
}
if let Some(source) = &config.source {
lua.load(source).eval::<()>().context(InvalidSourceSnafu)?;
}
let hook_init_code = config.hooks.init.as_ref();
let hook_init = hook_init_code
.map(|code| make_registry_value(&lua, code))
.transpose()
.context(InvalidHooksInitSnafu)?;
let hook_process =
make_registry_value(&lua, &config.hooks.process).context(InvalidHooksProcessSnafu)?;
let hook_shutdown_code = config.hooks.shutdown.as_ref();
let hook_shutdown = hook_shutdown_code
.map(|code| make_registry_value(&lua, code))
.transpose()
.context(InvalidHooksShutdownSnafu)?;
for (id, timer) in config.timers.iter().enumerate() {
let handler_key = lua
.load(&timer.handler)
.eval::<mlua::Function>()
.and_then(|f| lua.create_registry_value(f))
.context(InvalidTimerHandlerSnafu)?;
let timer = Timer {
id: id as u32,
interval: timer.interval_seconds,
};
timers.push((timer, handler_key));
}
let multi_value_tags = config.metric_tag_values == MetricTagValues::Full;
Ok(Self {
lua,
invocations_after_gc: 0,
timers,
hook_init,
hook_process,
hook_shutdown,
multi_value_tags,
source_id: Arc::new(key),
})
}
#[cfg(test)]
fn process(&mut self, event: Event, output: &mut Vec<Event>) -> Result<(), mlua::Error> {
let source_id = event.source_id().cloned();
let lua = &self.lua;
let result = lua.scope(|scope| {
let emit = scope.create_function_mut(|_, mut event: Event| {
if let Some(source_id) = &source_id {
event.set_source_id(Arc::clone(source_id));
}
output.push(event);
Ok(())
})?;
lua.registry_value::<mlua::Function>(&self.hook_process)?
.call((
LuaEvent {
event,
metric_multi_value_tags: self.multi_value_tags,
},
emit,
))
});
self.attempt_gc();
result
}
#[cfg(test)]
fn process_single(&mut self, event: Event) -> Result<Option<Event>, mlua::Error> {
let mut out = Vec::new();
self.process(event, &mut out)?;
assert!(out.len() <= 1);
Ok(out.into_iter().next())
}
fn attempt_gc(&mut self) {
self.invocations_after_gc += 1;
if self.invocations_after_gc % GC_INTERVAL == 0 {
emit!(LuaGcTriggered {
used_memory: self.lua.used_memory()
});
_ = self
.lua
.gc_collect()
.context(RuntimeErrorGcSnafu)
.map_err(|error| error!(%error, rate_limit = 30));
self.invocations_after_gc = 0;
}
}
}
fn wrap_emit_fn<'scope, 'env, F: 'scope + FnMut(Event)>(
scope: &'scope mlua::Scope<'scope, 'env>,
mut emit_fn: F,
source_id: Arc<ComponentKey>,
) -> mlua::Result<mlua::Function> {
scope.create_function_mut(move |_, mut event: Event| -> mlua::Result<()> {
event.set_source_id(Arc::clone(&source_id));
emit_fn(event);
Ok(())
})
}
impl RuntimeTransform for Lua {
fn hook_process<F>(&mut self, event: Event, emit_fn: F)
where
F: FnMut(Event),
{
let lua = &self.lua;
let source_id = Arc::clone(event.source_id().unwrap_or(&self.source_id));
_ = lua
.scope(|scope| -> mlua::Result<()> {
lua.registry_value::<mlua::Function>(&self.hook_process)?
.call((
LuaEvent {
event,
metric_multi_value_tags: self.multi_value_tags,
},
wrap_emit_fn(scope, emit_fn, source_id)?,
))
})
.context(RuntimeErrorHooksProcessSnafu)
.map_err(|e| emit!(LuaBuildError { error: e }));
self.attempt_gc();
}
fn hook_init<F>(&mut self, emit_fn: F)
where
F: FnMut(Event),
{
let lua = &self.lua;
_ = lua
.scope(|scope| -> mlua::Result<()> {
match &self.hook_init {
Some(key) => lua
.registry_value::<mlua::Function>(key)?
.call(wrap_emit_fn(scope, emit_fn, Arc::clone(&self.source_id))?),
None => Ok(()),
}
})
.context(RuntimeErrorHooksInitSnafu)
.map_err(|error| error!(%error, rate_limit = 30));
self.attempt_gc();
}
fn hook_shutdown<F>(&mut self, emit_fn: F)
where
F: FnMut(Event),
{
let lua = &self.lua;
_ = lua
.scope(|scope| -> mlua::Result<()> {
match &self.hook_shutdown {
Some(key) => lua
.registry_value::<mlua::Function>(key)?
.call(wrap_emit_fn(scope, emit_fn, Arc::clone(&self.source_id))?),
None => Ok(()),
}
})
.context(RuntimeErrorHooksShutdownSnafu)
.map_err(|error| error!(%error, rate_limit = 30));
self.attempt_gc();
}
fn timer_handler<F>(&mut self, timer: Timer, emit_fn: F)
where
F: FnMut(Event),
{
let lua = &self.lua;
_ = lua
.scope(|scope| -> mlua::Result<()> {
let handler_key = &self.timers[timer.id as usize].1;
lua.registry_value::<mlua::Function>(handler_key)?
.call(wrap_emit_fn(scope, emit_fn, Arc::clone(&self.source_id))?)
})
.context(RuntimeErrorTimerHandlerSnafu)
.map_err(|error| error!(%error, rate_limit = 30));
self.attempt_gc();
}
fn timers(&self) -> Vec<Timer> {
self.timers.iter().map(|(timer, _)| *timer).collect()
}
}
#[cfg(test)]
mod tests {
use std::{future::Future, sync::Arc};
use similar_asserts::assert_eq;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::Mutex;
use tokio_stream::wrappers::ReceiverStream;
use super::*;
use crate::test_util::{components::assert_transform_compliance, random_string};
use crate::transforms::test::create_topology;
use crate::{
event::{
metric::{Metric, MetricKind, MetricValue},
Event, LogEvent, Value,
},
test_util,
};
fn format_error(error: &mlua::Error) -> String {
match error {
mlua::Error::CallbackError { traceback, cause } => {
format_error(cause) + "\n" + traceback
}
err => err.to_string(),
}
}
fn from_config(config: &str) -> crate::Result<Box<Lua>> {
Lua::new(&toml::from_str(config).unwrap(), "transform".into()).map(Box::new)
}
async fn run_transform<T: Future>(
config: &str,
func: impl FnOnce(Sender<Event>, Arc<Mutex<Receiver<Event>>>) -> T,
) -> T::Output {
test_util::trace_init();
assert_transform_compliance(async move {
let config = super::super::LuaConfig::V2(toml::from_str(config).unwrap());
let (tx, rx) = mpsc::channel(1);
let (topology, out) = create_topology(ReceiverStream::new(rx), config).await;
let out = Arc::new(tokio::sync::Mutex::new(out));
let result = func(tx, Arc::clone(&out)).await;
topology.stop().await;
assert_eq!(out.lock().await.recv().await, None);
result
})
.await
}
async fn next_event(out: &Arc<Mutex<Receiver<Event>>>, source: &str) -> Event {
let event = out
.lock()
.await
.recv()
.await
.expect("Event was not received");
assert_eq!(
event.source_id(),
Some(&Arc::new(ComponentKey::from(source)))
);
event
}
#[tokio::test]
async fn lua_runs_init_hook() {
let line1 = random_string(9);
run_transform(
&format!(
r#"
version = "2"
hooks.init = """function (emit)
event = {{log={{message="{line1}"}}}}
emit(event)
end
"""
hooks.process = """function (event, emit)
emit(event)
end
"""
"#
),
|tx, out| async move {
let line2 = random_string(9);
tx.send(Event::Log(LogEvent::from(line2.as_str())))
.await
.unwrap();
drop(tx);
assert_eq!(
next_event(&out, "transform").await.as_log()["message"],
line1.into()
);
assert_eq!(
next_event(&out, "in").await.as_log()["message"],
line2.into(),
);
},
)
.await;
}
#[tokio::test]
async fn lua_add_field() {
run_transform(
r#"
version = "2"
hooks.process = """function (event, emit)
event["log"]["hello"] = "goodbye"
emit(event)
end
"""
"#,
|tx, out| async move {
let event = Event::Log(LogEvent::from("program me"));
tx.send(event).await.unwrap();
assert_eq!(
next_event(&out, "in").await.as_log()["hello"],
"goodbye".into()
);
},
)
.await;
}
#[tokio::test]
async fn lua_read_field() {
run_transform(
r#"
version = "2"
hooks.process = """function (event, emit)
_, _, name = string.find(event.log.message, "Hello, my name is (%a+).")
event.log.name = name
emit(event)
end
"""
"#,
|tx, out| async move {
let event = Event::Log(LogEvent::from("Hello, my name is Bob."));
tx.send(event).await.unwrap();
assert_eq!(next_event(&out, "in").await.as_log()["name"], "Bob".into());
},
)
.await;
}
#[tokio::test]
async fn lua_remove_field() {
run_transform(
r#"
version = "2"
hooks.process = """function (event, emit)
event.log.name = nil
emit(event)
end
"""
"#,
|tx, out| async move {
let mut event = LogEvent::default();
event.insert("name", "Bob");
tx.send(event.into()).await.unwrap();
assert_eq!(next_event(&out, "in").await.as_log().get("name"), None);
},
)
.await;
}
#[tokio::test]
async fn lua_drop_event() {
run_transform(
r#"
version = "2"
hooks.process = """function (event, emit)
-- emit nothing
end
"""
"#,
|tx, _out| async move {
let event = LogEvent::default().into();
tx.send(event).await.unwrap();
},
)
.await;
}
#[tokio::test]
async fn lua_duplicate_event() {
run_transform(
r#"
version = "2"
hooks.process = """function (event, emit)
emit(event)
emit(event)
end
"""
"#,
|tx, out| async move {
let mut event = LogEvent::default();
event.insert("host", "127.0.0.1");
tx.send(event.into()).await.unwrap();
assert!(out.lock().await.recv().await.is_some());
assert!(out.lock().await.recv().await.is_some());
},
)
.await;
}
#[tokio::test]
async fn lua_read_empty_field() {
run_transform(
r#"
version = "2"
hooks.process = """function (event, emit)
if event["log"]["non-existant"] == nil then
event["log"]["result"] = "empty"
else
event["log"]["result"] = "found"
end
emit(event)
end
"""
"#,
|tx, out| async move {
let event = LogEvent::default();
tx.send(event.into()).await.unwrap();
assert_eq!(
next_event(&out, "in").await.as_log()["result"],
"empty".into()
);
},
)
.await;
}
#[tokio::test]
async fn lua_integer_value() {
run_transform(
r#"
version = "2"
hooks.process = """function (event, emit)
event["log"]["number"] = 3
emit(event)
end
"""
"#,
|tx, out| async move {
let event = LogEvent::default();
tx.send(event.into()).await.unwrap();
assert_eq!(
next_event(&out, "in").await.as_log()["number"],
Value::Integer(3)
);
},
)
.await;
}
#[tokio::test]
async fn lua_numeric_value() {
run_transform(
r#"
version = "2"
hooks.process = """function (event, emit)
event["log"]["number"] = 3.14159
emit(event)
end
"""
"#,
|tx, out| async move {
let event = LogEvent::default();
tx.send(event.into()).await.unwrap();
assert_eq!(
next_event(&out, "in").await.as_log()["number"],
Value::from(3.14159)
);
},
)
.await;
}
#[tokio::test]
async fn lua_boolean_value() {
run_transform(
r#"
version = "2"
hooks.process = """function (event, emit)
event["log"]["bool"] = true
emit(event)
end
"""
"#,
|tx, out| async move {
let event = LogEvent::default();
tx.send(event.into()).await.unwrap();
assert_eq!(
next_event(&out, "in").await.as_log()["bool"],
Value::Boolean(true)
);
},
)
.await;
}
#[tokio::test]
async fn lua_non_coercible_value() {
run_transform(
r#"
version = "2"
hooks.process = """function (event, emit)
event["log"]["junk"] = nil
emit(event)
end
"""
"#,
|tx, out| async move {
let event = LogEvent::default();
tx.send(event.into()).await.unwrap();
assert_eq!(next_event(&out, "in").await.as_log().get("junk"), None);
},
)
.await;
}
#[tokio::test]
async fn lua_non_string_key_write() -> crate::Result<()> {
let mut transform = from_config(
r#"
hooks.process = """function (event, emit)
event["log"][false] = "hello"
emit(event)
end
"""
"#,
)
.unwrap();
let err = transform
.process_single(LogEvent::default().into())
.unwrap_err();
let err = format_error(&err);
assert!(
err.contains("error converting Lua boolean to String"),
"{}",
err
);
Ok(())
}
#[tokio::test]
async fn lua_non_string_key_read() {
run_transform(
r#"
version = "2"
hooks.process = """function (event, emit)
event.log.result = event.log[false]
emit(event)
end
"""
"#,
|tx, out| async move {
let event = LogEvent::default();
tx.send(event.into()).await.unwrap();
assert_eq!(next_event(&out, "in").await.as_log().get("result"), None);
},
)
.await;
}
#[tokio::test]
async fn lua_script_error() -> crate::Result<()> {
let mut transform = from_config(
r#"
hooks.process = """function (event, emit)
error("this is an error")
end
"""
"#,
)
.unwrap();
let err = transform
.process_single(LogEvent::default().into())
.unwrap_err();
let err = format_error(&err);
assert!(err.contains("this is an error"), "{}", err);
Ok(())
}
#[tokio::test]
async fn lua_syntax_error() -> crate::Result<()> {
let err = from_config(
r#"
hooks.process = """function (event, emit)
1234 = sadf <>&*!#@
end
"""
"#,
)
.map(|_| ())
.unwrap_err()
.to_string();
assert!(err.contains("syntax error:"), "{}", err);
Ok(())
}
#[tokio::test]
async fn lua_load_file() {
use std::{fs::File, io::Write};
let dir = tempfile::tempdir().unwrap();
let mut file = File::create(dir.path().join("script2.lua")).unwrap();
write!(
&mut file,
r#"
local M = {{}}
local function modify(event2)
event2["log"]["new field"] = "new value"
end
M.modify = modify
return M
"#
)
.unwrap();
run_transform(
&format!(
r#"
version = "2"
hooks.process = """function (event, emit)
local script2 = require("script2")
script2.modify(event)
emit(event)
end
"""
search_dirs = [{:?}]
"#,
dir.path().as_os_str() ),
|tx, out| async move {
let event = LogEvent::default();
tx.send(event.into()).await.unwrap();
assert_eq!(
next_event(&out, "in").await.as_log()["\"new field\""],
"new value".into()
);
},
)
.await;
}
#[tokio::test]
async fn lua_pairs() {
run_transform(
r#"
version = "2"
hooks.process = """function (event, emit)
for k,v in pairs(event.log) do
event.log[k] = k .. v
end
emit(event)
end
"""
"#,
|tx, out| async move {
let mut event = LogEvent::default();
event.insert("name", "Bob");
event.insert("friend", "Alice");
tx.send(event.into()).await.unwrap();
let output = next_event(&out, "in").await;
assert_eq!(output.as_log()["name"], "nameBob".into());
assert_eq!(output.as_log()["friend"], "friendAlice".into());
},
)
.await;
}
#[tokio::test]
async fn lua_metric() {
run_transform(
r#"
version = "2"
hooks.process = """function (event, emit)
event.metric.counter.value = event.metric.counter.value + 1
emit(event)
end
"""
"#,
|tx, out| async move {
let metric = Metric::new(
"example counter",
MetricKind::Absolute,
MetricValue::Counter { value: 1.0 },
);
let mut expected = metric
.clone()
.with_value(MetricValue::Counter { value: 2.0 });
let metadata = expected.metadata_mut();
metadata.set_upstream_id(Arc::new(OutputId::from("transform")));
metadata.set_source_id(Arc::new(ComponentKey::from("in")));
tx.send(metric.into()).await.unwrap();
assert_eq!(next_event(&out, "in").await.as_metric(), &expected);
},
)
.await;
}
#[tokio::test]
async fn lua_multiple_events() {
run_transform(
r#"
version = "2"
hooks.process = """function (event, emit)
event["log"]["hello"] = "goodbye"
emit(event)
end
"""
"#,
|tx, out| async move {
let n: usize = 10;
let events =
(0..n).map(|i| Event::Log(LogEvent::from(format!("program me {}", i))));
for event in events {
tx.send(event).await.unwrap();
assert!(out.lock().await.recv().await.is_some());
}
},
)
.await;
}
}