vector/transforms/lua/v1/
mod.rs1use std::{future::ready, pin::Pin};
2
3use futures::{Stream, StreamExt, stream};
4use mlua::{ExternalError, FromLua};
5use ordered_float::NotNan;
6use snafu::{ResultExt, Snafu};
7use vector_lib::configurable::configurable_component;
8use vrl::path::parse_target_path;
9
10use crate::{
11 config::{DataType, Input, OutputId, TransformOutput},
12 event::{Event, Value},
13 internal_events::{LuaGcTriggered, LuaScriptError},
14 schema,
15 schema::Definition,
16 transforms::{TaskTransform, Transform},
17};
18
/// Errors raised while constructing the Lua transform.
#[derive(Debug, Snafu)]
enum BuildError {
    /// An `mlua` call failed during construction. Within this file it is used
    /// as the context for reading/updating `package.path`, compiling the
    /// script and storing the compiled function in the Lua registry.
    #[snafu(display("Lua error: {}", source))]
    InvalidLua { source: mlua::Error },
}
24
/// Configuration for version 1 of the `lua` transform.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct LuaConfig {
    /// The Lua program to execute for each event.
    source: String,

    /// Directories to search when a script loads a module.
    ///
    /// Each directory is added to Lua's `package.path` as `<dir>/?.lua`, ahead
    /// of the entries that are already present. Defaults to an empty list.
    #[serde(default)]
    search_dirs: Vec<String>,
}
39
40impl LuaConfig {
41 pub fn build(&self) -> crate::Result<Transform> {
42 warn!(
43 "DEPRECATED The `lua` transform API version 1 is deprecated. Please convert your script to version 2."
44 );
45 Lua::new(self.source.clone(), self.search_dirs.clone()).map(Transform::event_task)
46 }
47
48 pub fn input(&self) -> Input {
49 Input::log()
50 }
51
52 pub fn outputs(
53 &self,
54 input_definitions: &[(OutputId, schema::Definition)],
55 ) -> Vec<TransformOutput> {
56 let namespaces = input_definitions
58 .iter()
59 .flat_map(|(_output, definition)| definition.log_namespaces().clone())
60 .collect();
61
62 let definition = input_definitions
63 .iter()
64 .map(|(output, _definition)| {
65 (
66 output.clone(),
67 Definition::default_for_namespace(&namespaces),
68 )
69 })
70 .collect();
71
72 vec![TransformOutput::new(DataType::Log, definition)]
73 }
74}
75
// Number of `Lua::process` invocations between explicit garbage collections.
// The counter is reset to zero each time a collection is triggered.
// NOTE(review): presumably larger than 1 to amortize the cost of a full
// collection over several events -- confirm the rationale for the value 16.
const GC_INTERVAL: usize = 16;
83
/// Runtime state of the version 1 `lua` transform.
///
/// The `Debug` output omits the fields marked `Debug = "ignore"`.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Lua {
    // Script text; kept so that `Clone` can rebuild the transform.
    #[derivative(Debug = "ignore")]
    source: String,
    // Module search directories; kept for the same reason as `source`.
    #[derivative(Debug = "ignore")]
    search_dirs: Vec<String>,
    // The Lua state that executes the script.
    #[derivative(Debug = "ignore")]
    lua: mlua::Lua,
    // Registry handle of the compiled script function.
    vector_func: mlua::RegistryKey,
    // Invocations processed since the last explicit garbage collection.
    invocations_after_gc: usize,
}
96
97impl Clone for Lua {
98 fn clone(&self) -> Self {
99 Lua::new(self.source.clone(), self.search_dirs.clone())
100 .expect("Tried to clone existing valid lua transform. This is an invariant.")
101 }
102}
103
/// Wrapper that exposes an [`Event`] to Lua scripts as userdata.
///
/// `FromLua` is derived so that the wrapper can be read back out of Lua
/// values (for example the `event` global after the script has run).
#[derive(Clone, FromLua)]
struct LuaEvent {
    // The wrapped event; the metamethods in this file access it as a log event.
    inner: Event,
}
110
111impl Lua {
112 pub fn new(source: String, search_dirs: Vec<String>) -> crate::Result<Self> {
113 let lua = unsafe {
116 mlua::Lua::unsafe_new_with(mlua::StdLib::ALL_SAFE, mlua::LuaOptions::default())
117 };
118
119 let additional_paths = search_dirs
120 .iter()
121 .map(|d| format!("{d}/?.lua"))
122 .collect::<Vec<_>>()
123 .join(";");
124
125 if !additional_paths.is_empty() {
126 let package = lua
127 .globals()
128 .get::<mlua::Table>("package")
129 .context(InvalidLuaSnafu)?;
130 let current_paths = package
131 .get::<String>("path")
132 .unwrap_or_else(|_| ";".to_string());
133 let paths = format!("{additional_paths};{current_paths}");
134 package.set("path", paths).context(InvalidLuaSnafu)?;
135 }
136
137 let func = lua.load(&source).into_function().context(InvalidLuaSnafu)?;
138 let vector_func = lua.create_registry_value(func).context(InvalidLuaSnafu)?;
139
140 Ok(Self {
141 source,
142 search_dirs,
143 lua,
144 vector_func,
145 invocations_after_gc: 0,
146 })
147 }
148
149 fn process(&mut self, event: Event) -> Result<Option<Event>, mlua::Error> {
150 let source_id = event.source_id().cloned();
151 let lua = &self.lua;
152 let globals = lua.globals();
153
154 globals.raw_set("event", LuaEvent { inner: event })?;
155
156 let func = lua.registry_value::<mlua::Function>(&self.vector_func)?;
157 func.call::<()>(())?;
158
159 let result = globals.raw_get::<Option<LuaEvent>>("event").map(|option| {
160 option.map(|lua_event| {
161 let mut event = lua_event.inner;
162 if let Some(source_id) = source_id {
163 event.set_source_id(source_id);
164 }
165 event
166 })
167 });
168
169 self.invocations_after_gc += 1;
170 if self.invocations_after_gc.is_multiple_of(GC_INTERVAL) {
171 emit!(LuaGcTriggered {
172 used_memory: self.lua.used_memory()
173 });
174 self.lua.gc_collect()?;
175 self.invocations_after_gc = 0;
176 }
177
178 result
179 }
180
181 pub fn transform_one(&mut self, event: Event) -> Option<Event> {
182 match self.process(event) {
183 Ok(event) => event,
184 Err(error) => {
185 emit!(LuaScriptError { error });
186 None
187 }
188 }
189 }
190}
191
192impl TaskTransform<Event> for Lua {
193 fn transform(
194 self: Box<Self>,
195 task: Pin<Box<dyn Stream<Item = Event> + Send>>,
196 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
197 where
198 Self: 'static,
199 {
200 let mut inner = self;
201 Box::pin(
202 task.filter_map(move |event| {
203 let mut output = Vec::with_capacity(1);
204 ready(match inner.process(event) {
205 Ok(event) => {
206 output.extend(event);
207 Some(stream::iter(output))
208 }
209 Err(error) => {
210 emit!(LuaScriptError { error });
211 None
212 }
213 })
214 })
215 .flatten(),
216 )
217 }
218}
219
220impl mlua::UserData for LuaEvent {
221 fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) {
222 methods.add_meta_method_mut(
223 mlua::MetaMethod::NewIndex,
224 |_lua, this, (key, value): (String, Option<mlua::Value>)| {
225 let key_path = parse_target_path(key.as_str()).map_err(|e| e.into_lua_err())?;
226 match value {
227 Some(mlua::Value::String(string)) => {
228 this.inner.as_mut_log().insert(
229 &key_path,
230 Value::from(string.to_str().expect("Expected UTF-8.").to_owned()),
231 );
232 }
233 Some(mlua::Value::Integer(integer)) => {
234 this.inner
235 .as_mut_log()
236 .insert(&key_path, Value::Integer(integer));
237 }
238 Some(mlua::Value::Number(number)) if !number.is_nan() => {
239 this.inner
240 .as_mut_log()
241 .insert(&key_path, Value::Float(NotNan::new(number).unwrap()));
242 }
243 Some(mlua::Value::Boolean(boolean)) => {
244 this.inner
245 .as_mut_log()
246 .insert(&key_path, Value::Boolean(boolean));
247 }
248 Some(mlua::Value::Nil) | None => {
249 this.inner.as_mut_log().remove(&key_path);
250 }
251 _ => {
252 info!(
253 message =
254 "Could not set field to Lua value of invalid type, dropping field.",
255 field = key.as_str()
256 );
257 this.inner.as_mut_log().remove(&key_path);
258 }
259 }
260
261 Ok(())
262 },
263 );
264
265 methods.add_meta_method(mlua::MetaMethod::Index, |lua, this, key: String| {
266 if let Some(value) = this
267 .inner
268 .as_log()
269 .parse_path_and_get_value(key.as_str())
270 .ok()
271 .flatten()
272 {
273 let string = lua.create_string(value.coerce_to_bytes())?;
274 Ok(Some(string))
275 } else {
276 Ok(None)
277 }
278 });
279
280 methods.add_meta_function(mlua::MetaMethod::Pairs, |lua, event: LuaEvent| {
281 let state = lua.create_table()?;
282 {
283 if let Some(keys) = event.inner.as_log().keys() {
284 let keys = lua.create_table_from(keys.map(|k| (k, true)))?;
285 state.raw_set("keys", keys)?;
286 }
287 state.raw_set("event", event)?;
288 }
289 let function =
290 lua.create_function(|lua, (state, prev): (mlua::Table, Option<String>)| {
291 let event: LuaEvent = state.raw_get("event")?;
292 let keys: mlua::Table = state.raw_get("keys")?;
293 let next: mlua::Function = lua.globals().raw_get("next")?;
294 let key: Option<String> = next.call((keys, prev))?;
295 let value = key.clone().and_then(|k| {
296 event
297 .inner
298 .as_log()
299 .parse_path_and_get_value(k.as_str())
300 .ok()
301 .flatten()
302 });
303 match value {
304 Some(value) => Ok((key, Some(lua.create_string(value.coerce_to_bytes())?))),
305 None => Ok((None, None)),
306 }
307 })?;
308 Ok((function, state))
309 });
310 }
311}
312
313pub fn format_error(error: &mlua::Error) -> String {
314 match error {
315 mlua::Error::CallbackError { traceback, cause } => format_error(cause) + "\n" + traceback,
316 err => err.to_string(),
317 }
318}
319
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::{
        config::ComponentKey,
        event::{Event, LogEvent, Value},
        test_util,
    };

    /// Creates a random component key to use as an event source id.
    fn source_id() -> Arc<ComponentKey> {
        Arc::new(ComponentKey::from(test_util::random_string(16)))
    }

    /// Runs `transform` once against `event` and checks that the source id
    /// assigned here survives the round trip through Lua.
    fn transform_one(transform: &str, event: impl Into<Event>) -> Option<Event> {
        test_util::trace_init();

        let expected_source = source_id();
        let mut input = event.into();
        input.set_source_id(Arc::clone(&expected_source));

        let mut lua = Lua::new(transform.to_string(), vec![]).unwrap();
        let output = lua.transform_one(input);

        if let Some(event) = &output {
            assert_eq!(event.source_id(), Some(&expected_source));
        }

        output
    }

    /// Runs `script` against an empty log event and returns the formatted
    /// error that processing is expected to produce.
    fn process_error(script: &str) -> String {
        test_util::trace_init();

        let mut lua = Lua::new(script.to_string(), vec![]).unwrap();
        let error = lua.process(LogEvent::default().into()).unwrap_err();
        format_error(&error)
    }

    #[test]
    fn lua_add_field() {
        let script = r#"
            event["hello"] = "goodbye"
        "#;
        let event = transform_one(script, LogEvent::from("program me")).unwrap();

        assert_eq!(event.as_log()["hello"], "goodbye".into());
    }

    #[test]
    fn lua_read_field() {
        let script = r#"
            _, _, name = string.find(event["message"], "Hello, my name is (%a+).")
            event["name"] = name
        "#;
        let event = transform_one(script, LogEvent::from("Hello, my name is Bob.")).unwrap();

        assert_eq!(event.as_log()["name"], "Bob".into());
    }

    #[test]
    fn lua_remove_field() {
        let mut log = LogEvent::default();
        log.insert("name", "Bob");

        let script = r#"
            event["name"] = nil
        "#;
        let event = transform_one(script, log).unwrap();

        assert!(event.as_log().get("name").is_none());
    }

    #[test]
    fn lua_drop_event() {
        let mut log = LogEvent::default();
        log.insert("name", "Bob");

        let script = r"
            event = nil
        ";
        let event = transform_one(script, log);

        assert!(event.is_none());
    }

    #[test]
    fn lua_read_empty_field() {
        let script = r#"
            if event["non-existant"] == nil then
                event["result"] = "empty"
            else
                event["result"] = "found"
            end
        "#;
        let event = transform_one(script, LogEvent::default()).unwrap();

        assert_eq!(event.as_log()["result"], "empty".into());
    }

    #[test]
    fn lua_integer_value() {
        let script = r#"
            event["number"] = 3
        "#;
        let event = transform_one(script, LogEvent::default()).unwrap();

        assert_eq!(event.as_log()["number"], Value::Integer(3));
    }

    #[test]
    fn lua_numeric_value() {
        let script = r#"
            event["number"] = 3.14159
        "#;
        let event = transform_one(script, LogEvent::default()).unwrap();

        assert_eq!(event.as_log()["number"], Value::from(3.14159));
    }

    #[test]
    fn lua_boolean_value() {
        let script = r#"
            event["bool"] = true
        "#;
        let event = transform_one(script, LogEvent::default()).unwrap();

        assert_eq!(event.as_log()["bool"], Value::Boolean(true));
    }

    #[test]
    fn lua_non_coercible_value() {
        let script = r#"
            event["junk"] = {"asdf"}
        "#;
        let event = transform_one(script, LogEvent::default()).unwrap();

        assert_eq!(event.as_log().get("junk"), None);
    }

    #[test]
    fn lua_non_string_key_write() {
        let err = process_error(
            r#"
            event[false] = "hello"
            "#,
        );

        assert!(
            err.contains("error converting Lua boolean to String"),
            "{err}"
        );
    }

    #[test]
    fn lua_non_string_key_read() {
        let err = process_error(
            r"
            print(event[false])
            ",
        );

        assert!(
            err.contains("error converting Lua boolean to String"),
            "{err}"
        );
    }

    #[test]
    fn lua_script_error() {
        let err = process_error(
            r#"
            error("this is an error")
            "#,
        );

        assert!(err.contains("this is an error"), "{err}");
    }

    #[test]
    fn lua_syntax_error() {
        test_util::trace_init();

        let script = r"
            1234 = sadf <>&*!#@
        ";
        let err = Lua::new(script.to_string(), vec![])
            .map(drop)
            .unwrap_err()
            .to_string();

        assert!(err.contains("syntax error:"), "{err}");
    }

    #[test]
    fn lua_load_file() {
        use std::{fs::File, io::Write};
        test_util::trace_init();

        let dir = tempfile::tempdir().unwrap();

        // The module is written through `write!`, hence the doubled braces.
        let mut module = File::create(dir.path().join("script2.lua")).unwrap();
        write!(
            &mut module,
            r#"
            local M = {{}}

            local function modify(event2)
              event2["\"new field\""] = "new value"
            end
            M.modify = modify

            return M
            "#
        )
        .unwrap();

        let source = r#"
            local script2 = require("script2")
            script2.modify(event)
        "#
        .to_string();

        let search_dirs = vec![dir.path().to_string_lossy().into_owned()];
        let mut lua = Lua::new(source, search_dirs).unwrap();
        let event = lua.transform_one(LogEvent::default().into()).unwrap();

        assert_eq!(event.as_log()["\"new field\""], "new value".into());
    }

    #[test]
    fn lua_pairs() {
        let mut log = LogEvent::default();
        log.insert("name", "Bob");
        log.insert("friend", "Alice");

        let script = r"
            for k,v in pairs(event) do
              event[k] = k .. v
            end
        ";
        let event = transform_one(script, log).unwrap();

        assert_eq!(event.as_log()["name"], "nameBob".into());
        assert_eq!(event.as_log()["friend"], "friendAlice".into());
    }
}