vector/transforms/lua/v1/
mod.rs1use std::{future::ready, pin::Pin};
2
3use futures::{stream, Stream, StreamExt};
4use mlua::ExternalError;
5use mlua::FromLua;
6use ordered_float::NotNan;
7use snafu::{ResultExt, Snafu};
8use vector_lib::configurable::configurable_component;
9use vrl::path::parse_target_path;
10
11use crate::config::OutputId;
12use crate::schema::Definition;
13use crate::{
14 config::{DataType, Input, TransformOutput},
15 event::{Event, Value},
16 internal_events::{LuaGcTriggered, LuaScriptError},
17 schema,
18 transforms::{TaskTransform, Transform},
19};
20
21#[derive(Debug, Snafu)]
22enum BuildError {
23 #[snafu(display("Lua error: {}", source))]
24 InvalidLua { source: mlua::Error },
25}
26
27#[configurable_component]
29#[derive(Clone, Debug)]
30#[serde(deny_unknown_fields)]
31pub struct LuaConfig {
32 source: String,
34
35 #[serde(default)]
39 search_dirs: Vec<String>,
40}
41
42impl LuaConfig {
43 pub fn build(&self) -> crate::Result<Transform> {
44 warn!(
45 "DEPRECATED The `lua` transform API version 1 is deprecated. Please convert your script to version 2."
46 );
47 Lua::new(self.source.clone(), self.search_dirs.clone()).map(Transform::event_task)
48 }
49
50 pub fn input(&self) -> Input {
51 Input::log()
52 }
53
54 pub fn outputs(
55 &self,
56 input_definitions: &[(OutputId, schema::Definition)],
57 ) -> Vec<TransformOutput> {
58 let namespaces = input_definitions
60 .iter()
61 .flat_map(|(_output, definition)| definition.log_namespaces().clone())
62 .collect();
63
64 let definition = input_definitions
65 .iter()
66 .map(|(output, _definition)| {
67 (
68 output.clone(),
69 Definition::default_for_namespace(&namespaces),
70 )
71 })
72 .collect();
73
74 vec![TransformOutput::new(DataType::Log, definition)]
75 }
76}
77
/// Number of counted script invocations between explicit full garbage collections of the Lua
/// state (see `Lua::process`).
const GC_INTERVAL: usize = 16;
85
86#[derive(Derivative)]
87#[derivative(Debug)]
88pub struct Lua {
89 #[derivative(Debug = "ignore")]
90 source: String,
91 #[derivative(Debug = "ignore")]
92 search_dirs: Vec<String>,
93 #[derivative(Debug = "ignore")]
94 lua: mlua::Lua,
95 vector_func: mlua::RegistryKey,
96 invocations_after_gc: usize,
97}
98
99impl Clone for Lua {
100 fn clone(&self) -> Self {
101 Lua::new(self.source.clone(), self.search_dirs.clone())
102 .expect("Tried to clone existing valid lua transform. This is an invariant.")
103 }
104}
105
106#[derive(Clone, FromLua)]
109struct LuaEvent {
110 inner: Event,
111}
112
113impl Lua {
114 pub fn new(source: String, search_dirs: Vec<String>) -> crate::Result<Self> {
115 let lua = unsafe {
118 mlua::Lua::unsafe_new_with(mlua::StdLib::ALL_SAFE, mlua::LuaOptions::default())
119 };
120
121 let additional_paths = search_dirs
122 .iter()
123 .map(|d| format!("{d}/?.lua"))
124 .collect::<Vec<_>>()
125 .join(";");
126
127 if !additional_paths.is_empty() {
128 let package = lua
129 .globals()
130 .get::<mlua::Table>("package")
131 .context(InvalidLuaSnafu)?;
132 let current_paths = package
133 .get::<String>("path")
134 .unwrap_or_else(|_| ";".to_string());
135 let paths = format!("{additional_paths};{current_paths}");
136 package.set("path", paths).context(InvalidLuaSnafu)?;
137 }
138
139 let func = lua.load(&source).into_function().context(InvalidLuaSnafu)?;
140 let vector_func = lua.create_registry_value(func).context(InvalidLuaSnafu)?;
141
142 Ok(Self {
143 source,
144 search_dirs,
145 lua,
146 vector_func,
147 invocations_after_gc: 0,
148 })
149 }
150
151 fn process(&mut self, event: Event) -> Result<Option<Event>, mlua::Error> {
152 let source_id = event.source_id().cloned();
153 let lua = &self.lua;
154 let globals = lua.globals();
155
156 globals.raw_set("event", LuaEvent { inner: event })?;
157
158 let func = lua.registry_value::<mlua::Function>(&self.vector_func)?;
159 func.call::<()>(())?;
160
161 let result = globals.raw_get::<Option<LuaEvent>>("event").map(|option| {
162 option.map(|lua_event| {
163 let mut event = lua_event.inner;
164 if let Some(source_id) = source_id {
165 event.set_source_id(source_id);
166 }
167 event
168 })
169 });
170
171 self.invocations_after_gc += 1;
172 if self.invocations_after_gc % GC_INTERVAL == 0 {
173 emit!(LuaGcTriggered {
174 used_memory: self.lua.used_memory()
175 });
176 self.lua.gc_collect()?;
177 self.invocations_after_gc = 0;
178 }
179
180 result
181 }
182
183 pub fn transform_one(&mut self, event: Event) -> Option<Event> {
184 match self.process(event) {
185 Ok(event) => event,
186 Err(error) => {
187 emit!(LuaScriptError { error });
188 None
189 }
190 }
191 }
192}
193
194impl TaskTransform<Event> for Lua {
195 fn transform(
196 self: Box<Self>,
197 task: Pin<Box<dyn Stream<Item = Event> + Send>>,
198 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
199 where
200 Self: 'static,
201 {
202 let mut inner = self;
203 Box::pin(
204 task.filter_map(move |event| {
205 let mut output = Vec::with_capacity(1);
206 ready(match inner.process(event) {
207 Ok(event) => {
208 output.extend(event);
209 Some(stream::iter(output))
210 }
211 Err(error) => {
212 emit!(LuaScriptError { error });
213 None
214 }
215 })
216 })
217 .flatten(),
218 )
219 }
220}
221
222impl mlua::UserData for LuaEvent {
223 fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) {
224 methods.add_meta_method_mut(
225 mlua::MetaMethod::NewIndex,
226 |_lua, this, (key, value): (String, Option<mlua::Value>)| {
227 let key_path = parse_target_path(key.as_str()).map_err(|e| e.into_lua_err())?;
228 match value {
229 Some(mlua::Value::String(string)) => {
230 this.inner.as_mut_log().insert(
231 &key_path,
232 Value::from(string.to_str().expect("Expected UTF-8.").to_owned()),
233 );
234 }
235 Some(mlua::Value::Integer(integer)) => {
236 this.inner
237 .as_mut_log()
238 .insert(&key_path, Value::Integer(integer));
239 }
240 Some(mlua::Value::Number(number)) if !number.is_nan() => {
241 this.inner
242 .as_mut_log()
243 .insert(&key_path, Value::Float(NotNan::new(number).unwrap()));
244 }
245 Some(mlua::Value::Boolean(boolean)) => {
246 this.inner
247 .as_mut_log()
248 .insert(&key_path, Value::Boolean(boolean));
249 }
250 Some(mlua::Value::Nil) | None => {
251 this.inner.as_mut_log().remove(&key_path);
252 }
253 _ => {
254 info!(
255 message =
256 "Could not set field to Lua value of invalid type, dropping field.",
257 field = key.as_str(),
258 internal_log_rate_limit = true
259 );
260 this.inner.as_mut_log().remove(&key_path);
261 }
262 }
263
264 Ok(())
265 },
266 );
267
268 methods.add_meta_method(mlua::MetaMethod::Index, |lua, this, key: String| {
269 if let Some(value) = this
270 .inner
271 .as_log()
272 .parse_path_and_get_value(key.as_str())
273 .ok()
274 .flatten()
275 {
276 let string = lua.create_string(value.coerce_to_bytes())?;
277 Ok(Some(string))
278 } else {
279 Ok(None)
280 }
281 });
282
283 methods.add_meta_function(mlua::MetaMethod::Pairs, |lua, event: LuaEvent| {
284 let state = lua.create_table()?;
285 {
286 if let Some(keys) = event.inner.as_log().keys() {
287 let keys = lua.create_table_from(keys.map(|k| (k, true)))?;
288 state.raw_set("keys", keys)?;
289 }
290 state.raw_set("event", event)?;
291 }
292 let function =
293 lua.create_function(|lua, (state, prev): (mlua::Table, Option<String>)| {
294 let event: LuaEvent = state.raw_get("event")?;
295 let keys: mlua::Table = state.raw_get("keys")?;
296 let next: mlua::Function = lua.globals().raw_get("next")?;
297 let key: Option<String> = next.call((keys, prev))?;
298 let value = key.clone().and_then(|k| {
299 event
300 .inner
301 .as_log()
302 .parse_path_and_get_value(k.as_str())
303 .ok()
304 .flatten()
305 });
306 match value {
307 Some(value) => Ok((key, Some(lua.create_string(value.coerce_to_bytes())?))),
308 None => Ok((None, None)),
309 }
310 })?;
311 Ok((function, state))
312 });
313 }
314}
315
316pub fn format_error(error: &mlua::Error) -> String {
317 match error {
318 mlua::Error::CallbackError { traceback, cause } => format_error(cause) + "\n" + traceback,
319 err => err.to_string(),
320 }
321}
322
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::event::{Event, LogEvent, Value};
    use crate::{config::ComponentKey, test_util};

    /// Setting a new field from the script adds it to the event.
    #[test]
    fn lua_add_field() {
        let output = transform_one(
            r#"
              event["hello"] = "goodbye"
            "#,
            LogEvent::from("program me"),
        )
        .unwrap();

        assert_eq!(output.as_log()["hello"], "goodbye".into());
    }

    /// Fields of the incoming event are readable from the script.
    #[test]
    fn lua_read_field() {
        let output = transform_one(
            r#"
              _, _, name = string.find(event["message"], "Hello, my name is (%a+).")
              event["name"] = name
            "#,
            LogEvent::from("Hello, my name is Bob."),
        )
        .unwrap();

        assert_eq!(output.as_log()["name"], "Bob".into());
    }

    /// Assigning `nil` to a field removes it.
    #[test]
    fn lua_remove_field() {
        let mut input = LogEvent::default();
        input.insert("name", "Bob");

        let output = transform_one(
            r#"
              event["name"] = nil
            "#,
            input,
        )
        .unwrap();

        assert!(output.as_log().get("name").is_none());
    }

    /// Assigning `nil` to the `event` global drops the event.
    #[test]
    fn lua_drop_event() {
        let mut input = LogEvent::default();
        input.insert("name", "Bob");

        let output = transform_one(
            r"
              event = nil
            ",
            input,
        );

        assert!(output.is_none());
    }

    /// Reading a missing field yields `nil`.
    #[test]
    fn lua_read_empty_field() {
        let output = transform_one(
            r#"
              if event["non-existant"] == nil then
                event["result"] = "empty"
              else
                event["result"] = "found"
              end
            "#,
            LogEvent::default(),
        )
        .unwrap();

        assert_eq!(output.as_log()["result"], "empty".into());
    }

    /// Lua integers are stored as integer values.
    #[test]
    fn lua_integer_value() {
        let output = transform_one(
            r#"
              event["number"] = 3
            "#,
            LogEvent::default(),
        )
        .unwrap();

        assert_eq!(output.as_log()["number"], Value::Integer(3));
    }

    /// Lua floating point numbers are stored as float values.
    #[test]
    fn lua_numeric_value() {
        let output = transform_one(
            r#"
              event["number"] = 3.14159
            "#,
            LogEvent::default(),
        )
        .unwrap();

        assert_eq!(output.as_log()["number"], Value::from(3.14159));
    }

    /// Lua booleans are stored as boolean values.
    #[test]
    fn lua_boolean_value() {
        let output = transform_one(
            r#"
              event["bool"] = true
            "#,
            LogEvent::default(),
        )
        .unwrap();

        assert_eq!(output.as_log()["bool"], Value::Boolean(true));
    }

    /// Values of unsupported Lua types (here: a table) are not stored.
    #[test]
    fn lua_non_coercible_value() {
        let output = transform_one(
            r#"
              event["junk"] = {"asdf"}
            "#,
            LogEvent::default(),
        )
        .unwrap();

        assert_eq!(output.as_log().get("junk"), None);
    }

    /// Writing through a non-string key is a script error.
    #[test]
    fn lua_non_string_key_write() {
        let err = script_error(
            r#"
              event[false] = "hello"
            "#,
        );

        assert!(
            err.contains("error converting Lua boolean to String"),
            "{err}"
        );
    }

    /// Reading through a non-string key is a script error.
    #[test]
    fn lua_non_string_key_read() {
        let err = script_error(
            r"
              print(event[false])
            ",
        );

        assert!(
            err.contains("error converting Lua boolean to String"),
            "{err}"
        );
    }

    /// Errors raised by the script are surfaced with their message.
    #[test]
    fn lua_script_error() {
        let err = script_error(
            r#"
              error("this is an error")
            "#,
        );

        assert!(err.contains("this is an error"), "{err}");
    }

    /// A script that does not parse is rejected when the transform is built.
    #[test]
    fn lua_syntax_error() {
        test_util::trace_init();

        let script = r"
              1234 = sadf <>&*!#@
            "
        .to_string();
        let build_result = Lua::new(script, vec![]).map(|_| ());
        let err = build_result.unwrap_err().to_string();

        assert!(err.contains("syntax error:"), "{err}");
    }

    /// Modules placed in a configured search directory can be loaded with `require`.
    #[test]
    fn lua_load_file() {
        use std::{fs::File, io::Write};
        test_util::trace_init();

        let module_dir = tempfile::tempdir().unwrap();

        let mut module_file = File::create(module_dir.path().join("script2.lua")).unwrap();
        write!(
            &mut module_file,
            r#"
              local M = {{}}

              local function modify(event2)
                event2["\"new field\""] = "new value"
              end
              M.modify = modify

              return M
            "#
        )
        .unwrap();

        let script = r#"
              local script2 = require("script2")
              script2.modify(event)
            "#
        .to_string();

        let search_dirs = vec![module_dir.path().to_string_lossy().into_owned()];
        let mut transform = Lua::new(script, search_dirs).unwrap();
        let output = transform.transform_one(LogEvent::default().into()).unwrap();

        assert_eq!(output.as_log()["\"new field\""], "new value".into());
    }

    /// `pairs(event)` visits every field with its value.
    #[test]
    fn lua_pairs() {
        let mut input = LogEvent::default();
        input.insert("name", "Bob");
        input.insert("friend", "Alice");

        let output = transform_one(
            r"
              for k,v in pairs(event) do
                event[k] = k .. v
              end
            ",
            input,
        )
        .unwrap();

        assert_eq!(output.as_log()["name"], "nameBob".into());
        assert_eq!(output.as_log()["friend"], "friendAlice".into());
    }

    /// Runs `script` against an empty log event and returns the formatted error it produces.
    ///
    /// Panics when the script does not build or does not fail.
    fn script_error(script: &str) -> String {
        test_util::trace_init();

        let mut transform = Lua::new(script.to_string(), vec![]).unwrap();
        let error = transform.process(LogEvent::default().into()).unwrap_err();
        format_error(&error)
    }

    /// Runs `transform` (a script) against `event` and returns the outcome.
    ///
    /// The event is tagged with a random source id first; when an event comes back, the helper
    /// checks that the source id survived the transform.
    fn transform_one(transform: &str, event: impl Into<Event>) -> Option<Event> {
        test_util::trace_init();

        let expected_source = source_id();
        let mut input: Event = event.into();
        input.set_source_id(Arc::clone(&expected_source));

        let mut lua = Lua::new(transform.to_string(), vec![]).unwrap();
        let output = lua.transform_one(input);

        if let Some(transformed) = &output {
            assert_eq!(transformed.source_id(), Some(&expected_source));
        }

        output
    }

    /// Creates a random component key to use as an event source id.
    fn source_id() -> Arc<ComponentKey> {
        Arc::new(ComponentKey::from(test_util::random_string(16)))
    }
}