vector/transforms/lua/v1/
mod.rs

1use std::{future::ready, pin::Pin};
2
3use futures::{Stream, StreamExt, stream};
4use mlua::{ExternalError, FromLua};
5use ordered_float::NotNan;
6use snafu::{ResultExt, Snafu};
7use vector_lib::configurable::configurable_component;
8use vrl::path::parse_target_path;
9
10use crate::{
11    config::{DataType, Input, OutputId, TransformOutput},
12    event::{Event, Value},
13    internal_events::{LuaGcTriggered, LuaScriptError},
14    schema,
15    schema::Definition,
16    transforms::{TaskTransform, Transform},
17};
18
/// Errors that can occur while constructing the version 1 `lua` transform.
#[derive(Debug, Snafu)]
enum BuildError {
    /// An `mlua` call failed during setup, for example while updating `package.path`,
    /// loading the user-provided script, or storing the resulting function in the registry.
    #[snafu(display("Lua error: {}", source))]
    InvalidLua { source: mlua::Error },
}
24
/// Configuration for version one of the `lua` transform.
// NOTE(review): the `///` comments on this struct and its fields are presumably consumed by
// `configurable_component` for generated documentation, so reviewer notes here use `//`.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct LuaConfig {
    /// The Lua program to execute for each event.
    source: String,

    /// A list of directories to search when loading a Lua file via the `require` function.
    ///
    /// If not specified, the modules are looked up in the configuration directories.
    // When the transform is built, each directory `d` listed here is prepended to Lua's
    // `package.path` as `d/?.lua`; an empty list leaves `package.path` untouched.
    #[serde(default)]
    search_dirs: Vec<String>,
}
39
40impl LuaConfig {
41    pub fn build(&self) -> crate::Result<Transform> {
42        warn!(
43            "DEPRECATED The `lua` transform API version 1 is deprecated. Please convert your script to version 2."
44        );
45        Lua::new(self.source.clone(), self.search_dirs.clone()).map(Transform::event_task)
46    }
47
48    pub fn input(&self) -> Input {
49        Input::log()
50    }
51
52    pub fn outputs(
53        &self,
54        input_definitions: &[(OutputId, schema::Definition)],
55    ) -> Vec<TransformOutput> {
56        // Lua causes the type definition to be reset
57        let namespaces = input_definitions
58            .iter()
59            .flat_map(|(_output, definition)| definition.log_namespaces().clone())
60            .collect();
61
62        let definition = input_definitions
63            .iter()
64            .map(|(output, _definition)| {
65                (
66                    output.clone(),
67                    Definition::default_for_namespace(&namespaces),
68                )
69            })
70            .collect();
71
72        vec![TransformOutput::new(DataType::Log, definition)]
73    }
74}
75
// Lua's garbage collector does not always seem to run automatically at high event rates,
// which leads to a leak-like RAM consumption pattern. This constant sets the number of
// invocations of the Lua transform after which GC is triggered explicitly, so that RAM usage
// does not grow too high.
//
// This constant is larger than 1 because running GC is an expensive operation, so doing it
// after every invocation would have a significant impact on performance.
const GC_INTERVAL: usize = 16;
83
/// Runtime state of the version 1 `lua` transform: an embedded Lua interpreter together with
/// the function created from the user script.
///
/// The `Debug` output omits the script text, the search directories and the interpreter.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Lua {
    /// The script text; retained so that `Clone` can build a fresh instance from it.
    #[derivative(Debug = "ignore")]
    source: String,
    /// Directories added to Lua's `package.path`; retained for `Clone` as well.
    #[derivative(Debug = "ignore")]
    search_dirs: Vec<String>,
    /// The interpreter that runs the script.
    #[derivative(Debug = "ignore")]
    lua: mlua::Lua,
    /// Registry handle to the Lua function loaded from `source`.
    vector_func: mlua::RegistryKey,
    /// Number of script invocations counted since the last explicit garbage collection.
    invocations_after_gc: usize,
}
96
97impl Clone for Lua {
98    fn clone(&self) -> Self {
99        Lua::new(self.source.clone(), self.search_dirs.clone())
100            .expect("Tried to clone existing valid lua transform. This is an invariant.")
101    }
102}
103
// This wrapper exists so that version 1 and version 2 of the transform can each provide their
// own, independent implementation of the `mlua::UserData` trait for events.
#[derive(Clone, FromLua)]
struct LuaEvent {
    // The event exposed to the script through the `event` global.
    inner: Event,
}
110
111impl Lua {
112    pub fn new(source: String, search_dirs: Vec<String>) -> crate::Result<Self> {
113        // In order to support loading C modules in Lua, we need to create unsafe instance
114        // without debug library.
115        let lua = unsafe {
116            mlua::Lua::unsafe_new_with(mlua::StdLib::ALL_SAFE, mlua::LuaOptions::default())
117        };
118
119        let additional_paths = search_dirs
120            .iter()
121            .map(|d| format!("{d}/?.lua"))
122            .collect::<Vec<_>>()
123            .join(";");
124
125        if !additional_paths.is_empty() {
126            let package = lua
127                .globals()
128                .get::<mlua::Table>("package")
129                .context(InvalidLuaSnafu)?;
130            let current_paths = package
131                .get::<String>("path")
132                .unwrap_or_else(|_| ";".to_string());
133            let paths = format!("{additional_paths};{current_paths}");
134            package.set("path", paths).context(InvalidLuaSnafu)?;
135        }
136
137        let func = lua.load(&source).into_function().context(InvalidLuaSnafu)?;
138        let vector_func = lua.create_registry_value(func).context(InvalidLuaSnafu)?;
139
140        Ok(Self {
141            source,
142            search_dirs,
143            lua,
144            vector_func,
145            invocations_after_gc: 0,
146        })
147    }
148
149    fn process(&mut self, event: Event) -> Result<Option<Event>, mlua::Error> {
150        let source_id = event.source_id().cloned();
151        let lua = &self.lua;
152        let globals = lua.globals();
153
154        globals.raw_set("event", LuaEvent { inner: event })?;
155
156        let func = lua.registry_value::<mlua::Function>(&self.vector_func)?;
157        func.call::<()>(())?;
158
159        let result = globals.raw_get::<Option<LuaEvent>>("event").map(|option| {
160            option.map(|lua_event| {
161                let mut event = lua_event.inner;
162                if let Some(source_id) = source_id {
163                    event.set_source_id(source_id);
164                }
165                event
166            })
167        });
168
169        self.invocations_after_gc += 1;
170        if self.invocations_after_gc.is_multiple_of(GC_INTERVAL) {
171            emit!(LuaGcTriggered {
172                used_memory: self.lua.used_memory()
173            });
174            self.lua.gc_collect()?;
175            self.invocations_after_gc = 0;
176        }
177
178        result
179    }
180
181    pub fn transform_one(&mut self, event: Event) -> Option<Event> {
182        match self.process(event) {
183            Ok(event) => event,
184            Err(error) => {
185                emit!(LuaScriptError { error });
186                None
187            }
188        }
189    }
190}
191
192impl TaskTransform<Event> for Lua {
193    fn transform(
194        self: Box<Self>,
195        task: Pin<Box<dyn Stream<Item = Event> + Send>>,
196    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
197    where
198        Self: 'static,
199    {
200        let mut inner = self;
201        Box::pin(
202            task.filter_map(move |event| {
203                let mut output = Vec::with_capacity(1);
204                ready(match inner.process(event) {
205                    Ok(event) => {
206                        output.extend(event);
207                        Some(stream::iter(output))
208                    }
209                    Err(error) => {
210                        emit!(LuaScriptError { error });
211                        None
212                    }
213                })
214            })
215            .flatten(),
216        )
217    }
218}
219
220impl mlua::UserData for LuaEvent {
221    fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) {
222        methods.add_meta_method_mut(
223            mlua::MetaMethod::NewIndex,
224            |_lua, this, (key, value): (String, Option<mlua::Value>)| {
225                let key_path = parse_target_path(key.as_str()).map_err(|e| e.into_lua_err())?;
226                match value {
227                    Some(mlua::Value::String(string)) => {
228                        this.inner.as_mut_log().insert(
229                            &key_path,
230                            Value::from(string.to_str().expect("Expected UTF-8.").to_owned()),
231                        );
232                    }
233                    Some(mlua::Value::Integer(integer)) => {
234                        this.inner
235                            .as_mut_log()
236                            .insert(&key_path, Value::Integer(integer));
237                    }
238                    Some(mlua::Value::Number(number)) if !number.is_nan() => {
239                        this.inner
240                            .as_mut_log()
241                            .insert(&key_path, Value::Float(NotNan::new(number).unwrap()));
242                    }
243                    Some(mlua::Value::Boolean(boolean)) => {
244                        this.inner
245                            .as_mut_log()
246                            .insert(&key_path, Value::Boolean(boolean));
247                    }
248                    Some(mlua::Value::Nil) | None => {
249                        this.inner.as_mut_log().remove(&key_path);
250                    }
251                    _ => {
252                        info!(
253                            message =
254                                "Could not set field to Lua value of invalid type, dropping field.",
255                            field = key.as_str()
256                        );
257                        this.inner.as_mut_log().remove(&key_path);
258                    }
259                }
260
261                Ok(())
262            },
263        );
264
265        methods.add_meta_method(mlua::MetaMethod::Index, |lua, this, key: String| {
266            if let Some(value) = this
267                .inner
268                .as_log()
269                .parse_path_and_get_value(key.as_str())
270                .ok()
271                .flatten()
272            {
273                let string = lua.create_string(value.coerce_to_bytes())?;
274                Ok(Some(string))
275            } else {
276                Ok(None)
277            }
278        });
279
280        methods.add_meta_function(mlua::MetaMethod::Pairs, |lua, event: LuaEvent| {
281            let state = lua.create_table()?;
282            {
283                if let Some(keys) = event.inner.as_log().keys() {
284                    let keys = lua.create_table_from(keys.map(|k| (k, true)))?;
285                    state.raw_set("keys", keys)?;
286                }
287                state.raw_set("event", event)?;
288            }
289            let function =
290                lua.create_function(|lua, (state, prev): (mlua::Table, Option<String>)| {
291                    let event: LuaEvent = state.raw_get("event")?;
292                    let keys: mlua::Table = state.raw_get("keys")?;
293                    let next: mlua::Function = lua.globals().raw_get("next")?;
294                    let key: Option<String> = next.call((keys, prev))?;
295                    let value = key.clone().and_then(|k| {
296                        event
297                            .inner
298                            .as_log()
299                            .parse_path_and_get_value(k.as_str())
300                            .ok()
301                            .flatten()
302                    });
303                    match value {
304                        Some(value) => Ok((key, Some(lua.create_string(value.coerce_to_bytes())?))),
305                        None => Ok((None, None)),
306                    }
307                })?;
308            Ok((function, state))
309        });
310    }
311}
312
313pub fn format_error(error: &mlua::Error) -> String {
314    match error {
315        mlua::Error::CallbackError { traceback, cause } => format_error(cause) + "\n" + traceback,
316        err => err.to_string(),
317    }
318}
319
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::{
        config::ComponentKey,
        event::{Event, LogEvent, Value},
        test_util,
    };

    // Assigning a string to a new key adds that field to the event.
    #[test]
    fn lua_add_field() {
        let event = transform_one(
            r#"
              event["hello"] = "goodbye"
            "#,
            LogEvent::from("program me"),
        )
        .unwrap();

        assert_eq!(event.as_log()["hello"], "goodbye".into());
    }

    // An existing field can be read by the script and used to derive a new field.
    #[test]
    fn lua_read_field() {
        let event = transform_one(
            r#"
              _, _, name = string.find(event["message"], "Hello, my name is (%a+).")
              event["name"] = name
            "#,
            LogEvent::from("Hello, my name is Bob."),
        )
        .unwrap();

        assert_eq!(event.as_log()["name"], "Bob".into());
    }

    // Assigning `nil` to a key removes the field.
    #[test]
    fn lua_remove_field() {
        let mut log = LogEvent::default();
        log.insert("name", "Bob");
        let event = transform_one(
            r#"
              event["name"] = nil
            "#,
            log,
        )
        .unwrap();

        assert!(event.as_log().get("name").is_none());
    }

    // Setting the `event` global itself to `nil` drops the whole event.
    #[test]
    fn lua_drop_event() {
        let mut log = LogEvent::default();
        log.insert("name", "Bob");
        let event = transform_one(
            r"
              event = nil
            ",
            log,
        );

        assert!(event.is_none());
    }

    // Reading a field that does not exist yields `nil`.
    #[test]
    fn lua_read_empty_field() {
        let event = transform_one(
            r#"
              if event["non-existant"] == nil then
                event["result"] = "empty"
              else
                event["result"] = "found"
              end
            "#,
            LogEvent::default(),
        )
        .unwrap();

        assert_eq!(event.as_log()["result"], "empty".into());
    }

    // A Lua integer is stored as `Value::Integer`.
    #[test]
    fn lua_integer_value() {
        let event = transform_one(
            r#"
              event["number"] = 3
            "#,
            LogEvent::default(),
        )
        .unwrap();
        assert_eq!(event.as_log()["number"], Value::Integer(3));
    }

    // A Lua floating-point number is stored as a float value.
    #[test]
    fn lua_numeric_value() {
        let event = transform_one(
            r#"
              event["number"] = 3.14159
            "#,
            LogEvent::default(),
        )
        .unwrap();
        assert_eq!(event.as_log()["number"], Value::from(3.14159));
    }

    // A Lua boolean is stored as `Value::Boolean`.
    #[test]
    fn lua_boolean_value() {
        let event = transform_one(
            r#"
              event["bool"] = true
            "#,
            LogEvent::default(),
        )
        .unwrap();
        assert_eq!(event.as_log()["bool"], Value::Boolean(true));
    }

    // Assigning a value of an unsupported type (here a table) leaves the field unset.
    #[test]
    fn lua_non_coercible_value() {
        let event = transform_one(
            r#"
              event["junk"] = {"asdf"}
            "#,
            LogEvent::default(),
        )
        .unwrap();
        assert_eq!(event.as_log().get("junk"), None);
    }

    // Writing through a non-string key is reported as a conversion error.
    #[test]
    fn lua_non_string_key_write() {
        crate::test_util::trace_init();
        let mut transform = Lua::new(
            r#"
              event[false] = "hello"
            "#
            .to_string(),
            vec![],
        )
        .unwrap();

        let err = transform.process(LogEvent::default().into()).unwrap_err();
        let err = format_error(&err);
        assert!(
            err.contains("error converting Lua boolean to String"),
            "{}",
            err
        );
    }

    // Reading through a non-string key is reported as a conversion error.
    #[test]
    fn lua_non_string_key_read() {
        crate::test_util::trace_init();
        let mut transform = Lua::new(
            r"
              print(event[false])
            "
            .to_string(),
            vec![],
        )
        .unwrap();

        let err = transform.process(LogEvent::default().into()).unwrap_err();
        let err = format_error(&err);
        assert!(
            err.contains("error converting Lua boolean to String"),
            "{}",
            err
        );
    }

    // An error raised by the script surfaces from `process` with its message intact.
    #[test]
    fn lua_script_error() {
        crate::test_util::trace_init();
        let mut transform = Lua::new(
            r#"
              error("this is an error")
            "#
            .to_string(),
            vec![],
        )
        .unwrap();

        let err = transform.process(LogEvent::default().into()).unwrap_err();
        let err = format_error(&err);
        assert!(err.contains("this is an error"), "{}", err);
    }

    // A script that is not valid Lua makes `Lua::new` fail with a syntax error.
    #[test]
    fn lua_syntax_error() {
        crate::test_util::trace_init();
        let err = Lua::new(
            r"
              1234 = sadf <>&*!#@
            "
            .to_string(),
            vec![],
        )
        .map(|_| ())
        .unwrap_err()
        .to_string();

        assert!(err.contains("syntax error:"), "{}", err);
    }

    // A module placed in one of the search directories can be loaded with `require`.
    #[test]
    fn lua_load_file() {
        use std::{fs::File, io::Write};
        crate::test_util::trace_init();

        let dir = tempfile::tempdir().unwrap();

        let mut file = File::create(dir.path().join("script2.lua")).unwrap();
        // `{{}}` is an escaped `{}` because this text goes through `write!` formatting.
        write!(
            &mut file,
            r#"
              local M = {{}}

              local function modify(event2)
                event2["\"new field\""] = "new value"
              end
              M.modify = modify

              return M
            "#
        )
        .unwrap();

        let source = r#"
          local script2 = require("script2")
          script2.modify(event)
        "#
        .to_string();

        let mut transform =
            Lua::new(source, vec![dir.path().to_string_lossy().into_owned()]).unwrap();
        let event = transform.transform_one(LogEvent::default().into()).unwrap();
        assert_eq!(event.as_log()["\"new field\""], "new value".into());
    }

    // `pairs(event)` visits every field and the event can be modified while iterating.
    #[test]
    fn lua_pairs() {
        let mut event = LogEvent::default();
        event.insert("name", "Bob");
        event.insert("friend", "Alice");

        let event = transform_one(
            r"
              for k,v in pairs(event) do
                event[k] = k .. v
              end
            ",
            event,
        )
        .unwrap();

        assert_eq!(event.as_log()["name"], "nameBob".into());
        assert_eq!(event.as_log()["friend"], "friendAlice".into());
    }

    // Helper: runs `transform` (Lua source) over `event` after tagging it with a random source
    // id, and checks that any returned event still carries that source id.
    fn transform_one(transform: &str, event: impl Into<Event>) -> Option<Event> {
        crate::test_util::trace_init();

        let source = source_id();
        let mut event = event.into();
        event.set_source_id(Arc::clone(&source));

        let mut transform = Lua::new(transform.to_string(), vec![]).unwrap();
        let event = transform.transform_one(event);

        if let Some(event) = &event {
            assert_eq!(event.source_id(), Some(&source));
        }

        event
    }

    // Helper: builds a component key from a random 16-character string.
    fn source_id() -> Arc<ComponentKey> {
        Arc::new(ComponentKey::from(test_util::random_string(16)))
    }
}