vector/transforms/lua/v1/
mod.rs

1use std::{future::ready, pin::Pin};
2
3use futures::{Stream, StreamExt, stream};
4use mlua::{ExternalError, FromLua};
5use ordered_float::NotNan;
6use snafu::{ResultExt, Snafu};
7use vector_lib::configurable::configurable_component;
8use vrl::path::parse_target_path;
9
10use crate::{
11    config::{DataType, Input, OutputId, TransformOutput},
12    event::{Event, Value},
13    internal_events::{LuaGcTriggered, LuaScriptError},
14    schema,
15    schema::Definition,
16    transforms::{TaskTransform, Transform},
17};
18
/// Errors that can occur while constructing the version 1 `lua` transform.
#[derive(Debug, Snafu)]
enum BuildError {
    /// The Lua runtime rejected a setup step; `Lua::new` attaches this context to every
    /// fallible `mlua` call it makes (reading/writing `package.path`, compiling the script,
    /// storing the compiled function in the registry).
    #[snafu(display("Lua error: {}", source))]
    InvalidLua { source: mlua::Error },
}
24
// NOTE(review): the `///` comments on this type are presumably consumed by
// `configurable_component` as user-facing descriptions — confirm before rewording them.
/// Configuration for version one of the `lua` transform.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct LuaConfig {
    /// The Lua program to execute for each event.
    source: String,

    /// A list of directories to search when loading a Lua file via the `require` function.
    ///
    /// If not specified, the modules are looked up in the configuration directories.
    // Each entry becomes a `<dir>/?.lua` pattern that `Lua::new` prepends to `package.path`.
    // NOTE(review): `build` passes this list through unchanged and `Lua::new` leaves
    // `package.path` untouched when the list is empty; the configuration-directory fallback
    // described above is not visible in this file — confirm where (or whether) it happens.
    #[serde(default)]
    search_dirs: Vec<String>,
}
39
40impl LuaConfig {
41    pub fn build(&self) -> crate::Result<Transform> {
42        warn!(
43            "DEPRECATED The `lua` transform API version 1 is deprecated. Please convert your script to version 2."
44        );
45        Lua::new(self.source.clone(), self.search_dirs.clone()).map(Transform::event_task)
46    }
47
48    pub fn input(&self) -> Input {
49        Input::log()
50    }
51
52    pub fn outputs(
53        &self,
54        input_definitions: &[(OutputId, schema::Definition)],
55    ) -> Vec<TransformOutput> {
56        // Lua causes the type definition to be reset
57        let namespaces = input_definitions
58            .iter()
59            .flat_map(|(_output, definition)| definition.log_namespaces().clone())
60            .collect();
61
62        let definition = input_definitions
63            .iter()
64            .map(|(output, _definition)| {
65                (
66                    output.clone(),
67                    Definition::default_for_namespace(&namespaces),
68                )
69            })
70            .collect();
71
72        vec![TransformOutput::new(DataType::Log, definition)]
73    }
74}
75
// Lua's garbage collector does not always seem to run automatically at high event rates, which
// leads to a leak-like RAM consumption pattern. This constant sets the number of invocations of
// the Lua transform after which a GC run is triggered explicitly, so that RAM usage does not
// grow too high.
//
// The value is larger than 1 because running the GC is an expensive operation, so doing it
// after every invocation would have a significant impact on performance.
const GC_INTERVAL: usize = 16;
83
/// Runtime state of the version 1 `lua` transform: a Lua interpreter plus the compiled user
/// script.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Lua {
    /// Script text as given to `Lua::new`; kept so that `Clone` can rebuild the transform.
    #[derivative(Debug = "ignore")]
    source: String,
    /// Module search directories as given to `Lua::new`; kept for the same reason as `source`.
    #[derivative(Debug = "ignore")]
    search_dirs: Vec<String>,
    /// The interpreter the script runs in.
    #[derivative(Debug = "ignore")]
    lua: mlua::Lua,
    /// Registry key of the function compiled from `source`.
    vector_func: mlua::RegistryKey,
    /// Number of `process` invocations counted since the last explicit GC run
    /// (see `GC_INTERVAL`).
    invocations_after_gc: usize,
}
96
97impl Clone for Lua {
98    fn clone(&self) -> Self {
99        Lua::new(self.source.clone(), self.search_dirs.clone())
100            .expect("Tried to clone existing valid lua transform. This is an invariant.")
101    }
102}
103
// This wrapper exists so that version 1 and version 2 of the transform can each have their own
// independent implementation of the `mlua::UserData` trait for events.
//
// `process` stores a `LuaEvent` in the `event` global and reads it back with
// `raw_get::<Option<LuaEvent>>`, which relies on the `FromLua` derive below.
#[derive(Clone, FromLua)]
struct LuaEvent {
    // The wrapped Vector event that the script reads and mutates.
    inner: Event,
}
110
111impl Lua {
112    pub fn new(source: String, search_dirs: Vec<String>) -> crate::Result<Self> {
113        // In order to support loading C modules in Lua, we need to create unsafe instance
114        // without debug library.
115        let lua = unsafe {
116            mlua::Lua::unsafe_new_with(mlua::StdLib::ALL_SAFE, mlua::LuaOptions::default())
117        };
118
119        let additional_paths = search_dirs
120            .iter()
121            .map(|d| format!("{d}/?.lua"))
122            .collect::<Vec<_>>()
123            .join(";");
124
125        if !additional_paths.is_empty() {
126            let package = lua
127                .globals()
128                .get::<mlua::Table>("package")
129                .context(InvalidLuaSnafu)?;
130            let current_paths = package
131                .get::<String>("path")
132                .unwrap_or_else(|_| ";".to_string());
133            let paths = format!("{additional_paths};{current_paths}");
134            package.set("path", paths).context(InvalidLuaSnafu)?;
135        }
136
137        let func = lua.load(&source).into_function().context(InvalidLuaSnafu)?;
138        let vector_func = lua.create_registry_value(func).context(InvalidLuaSnafu)?;
139
140        Ok(Self {
141            source,
142            search_dirs,
143            lua,
144            vector_func,
145            invocations_after_gc: 0,
146        })
147    }
148
149    fn process(&mut self, event: Event) -> Result<Option<Event>, mlua::Error> {
150        let source_id = event.source_id().cloned();
151        let lua = &self.lua;
152        let globals = lua.globals();
153
154        globals.raw_set("event", LuaEvent { inner: event })?;
155
156        let func = lua.registry_value::<mlua::Function>(&self.vector_func)?;
157        func.call::<()>(())?;
158
159        let result = globals.raw_get::<Option<LuaEvent>>("event").map(|option| {
160            option.map(|lua_event| {
161                let mut event = lua_event.inner;
162                if let Some(source_id) = source_id {
163                    event.set_source_id(source_id);
164                }
165                event
166            })
167        });
168
169        self.invocations_after_gc += 1;
170        if self.invocations_after_gc % GC_INTERVAL == 0 {
171            emit!(LuaGcTriggered {
172                used_memory: self.lua.used_memory()
173            });
174            self.lua.gc_collect()?;
175            self.invocations_after_gc = 0;
176        }
177
178        result
179    }
180
181    pub fn transform_one(&mut self, event: Event) -> Option<Event> {
182        match self.process(event) {
183            Ok(event) => event,
184            Err(error) => {
185                emit!(LuaScriptError { error });
186                None
187            }
188        }
189    }
190}
191
192impl TaskTransform<Event> for Lua {
193    fn transform(
194        self: Box<Self>,
195        task: Pin<Box<dyn Stream<Item = Event> + Send>>,
196    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
197    where
198        Self: 'static,
199    {
200        let mut inner = self;
201        Box::pin(
202            task.filter_map(move |event| {
203                let mut output = Vec::with_capacity(1);
204                ready(match inner.process(event) {
205                    Ok(event) => {
206                        output.extend(event);
207                        Some(stream::iter(output))
208                    }
209                    Err(error) => {
210                        emit!(LuaScriptError { error });
211                        None
212                    }
213                })
214            })
215            .flatten(),
216        )
217    }
218}
219
220impl mlua::UserData for LuaEvent {
221    fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) {
222        methods.add_meta_method_mut(
223            mlua::MetaMethod::NewIndex,
224            |_lua, this, (key, value): (String, Option<mlua::Value>)| {
225                let key_path = parse_target_path(key.as_str()).map_err(|e| e.into_lua_err())?;
226                match value {
227                    Some(mlua::Value::String(string)) => {
228                        this.inner.as_mut_log().insert(
229                            &key_path,
230                            Value::from(string.to_str().expect("Expected UTF-8.").to_owned()),
231                        );
232                    }
233                    Some(mlua::Value::Integer(integer)) => {
234                        this.inner
235                            .as_mut_log()
236                            .insert(&key_path, Value::Integer(integer));
237                    }
238                    Some(mlua::Value::Number(number)) if !number.is_nan() => {
239                        this.inner
240                            .as_mut_log()
241                            .insert(&key_path, Value::Float(NotNan::new(number).unwrap()));
242                    }
243                    Some(mlua::Value::Boolean(boolean)) => {
244                        this.inner
245                            .as_mut_log()
246                            .insert(&key_path, Value::Boolean(boolean));
247                    }
248                    Some(mlua::Value::Nil) | None => {
249                        this.inner.as_mut_log().remove(&key_path);
250                    }
251                    _ => {
252                        info!(
253                            message =
254                                "Could not set field to Lua value of invalid type, dropping field.",
255                            field = key.as_str(),
256                            internal_log_rate_limit = true
257                        );
258                        this.inner.as_mut_log().remove(&key_path);
259                    }
260                }
261
262                Ok(())
263            },
264        );
265
266        methods.add_meta_method(mlua::MetaMethod::Index, |lua, this, key: String| {
267            if let Some(value) = this
268                .inner
269                .as_log()
270                .parse_path_and_get_value(key.as_str())
271                .ok()
272                .flatten()
273            {
274                let string = lua.create_string(value.coerce_to_bytes())?;
275                Ok(Some(string))
276            } else {
277                Ok(None)
278            }
279        });
280
281        methods.add_meta_function(mlua::MetaMethod::Pairs, |lua, event: LuaEvent| {
282            let state = lua.create_table()?;
283            {
284                if let Some(keys) = event.inner.as_log().keys() {
285                    let keys = lua.create_table_from(keys.map(|k| (k, true)))?;
286                    state.raw_set("keys", keys)?;
287                }
288                state.raw_set("event", event)?;
289            }
290            let function =
291                lua.create_function(|lua, (state, prev): (mlua::Table, Option<String>)| {
292                    let event: LuaEvent = state.raw_get("event")?;
293                    let keys: mlua::Table = state.raw_get("keys")?;
294                    let next: mlua::Function = lua.globals().raw_get("next")?;
295                    let key: Option<String> = next.call((keys, prev))?;
296                    let value = key.clone().and_then(|k| {
297                        event
298                            .inner
299                            .as_log()
300                            .parse_path_and_get_value(k.as_str())
301                            .ok()
302                            .flatten()
303                    });
304                    match value {
305                        Some(value) => Ok((key, Some(lua.create_string(value.coerce_to_bytes())?))),
306                        None => Ok((None, None)),
307                    }
308                })?;
309            Ok((function, state))
310        });
311    }
312}
313
314pub fn format_error(error: &mlua::Error) -> String {
315    match error {
316        mlua::Error::CallbackError { traceback, cause } => format_error(cause) + "\n" + traceback,
317        err => err.to_string(),
318    }
319}
320
321#[cfg(test)]
322mod tests {
323    use std::sync::Arc;
324
325    use super::*;
326    use crate::{
327        config::ComponentKey,
328        event::{Event, LogEvent, Value},
329        test_util,
330    };
331
332    #[test]
333    fn lua_add_field() {
334        let event = transform_one(
335            r#"
336              event["hello"] = "goodbye"
337            "#,
338            LogEvent::from("program me"),
339        )
340        .unwrap();
341
342        assert_eq!(event.as_log()["hello"], "goodbye".into());
343    }
344
345    #[test]
346    fn lua_read_field() {
347        let event = transform_one(
348            r#"
349              _, _, name = string.find(event["message"], "Hello, my name is (%a+).")
350              event["name"] = name
351            "#,
352            LogEvent::from("Hello, my name is Bob."),
353        )
354        .unwrap();
355
356        assert_eq!(event.as_log()["name"], "Bob".into());
357    }
358
359    #[test]
360    fn lua_remove_field() {
361        let mut log = LogEvent::default();
362        log.insert("name", "Bob");
363        let event = transform_one(
364            r#"
365              event["name"] = nil
366            "#,
367            log,
368        )
369        .unwrap();
370
371        assert!(event.as_log().get("name").is_none());
372    }
373
374    #[test]
375    fn lua_drop_event() {
376        let mut log = LogEvent::default();
377        log.insert("name", "Bob");
378        let event = transform_one(
379            r"
380              event = nil
381            ",
382            log,
383        );
384
385        assert!(event.is_none());
386    }
387
388    #[test]
389    fn lua_read_empty_field() {
390        let event = transform_one(
391            r#"
392              if event["non-existant"] == nil then
393                event["result"] = "empty"
394              else
395                event["result"] = "found"
396              end
397            "#,
398            LogEvent::default(),
399        )
400        .unwrap();
401
402        assert_eq!(event.as_log()["result"], "empty".into());
403    }
404
405    #[test]
406    fn lua_integer_value() {
407        let event = transform_one(
408            r#"
409              event["number"] = 3
410            "#,
411            LogEvent::default(),
412        )
413        .unwrap();
414        assert_eq!(event.as_log()["number"], Value::Integer(3));
415    }
416
417    #[test]
418    fn lua_numeric_value() {
419        let event = transform_one(
420            r#"
421              event["number"] = 3.14159
422            "#,
423            LogEvent::default(),
424        )
425        .unwrap();
426        assert_eq!(event.as_log()["number"], Value::from(3.14159));
427    }
428
429    #[test]
430    fn lua_boolean_value() {
431        let event = transform_one(
432            r#"
433              event["bool"] = true
434            "#,
435            LogEvent::default(),
436        )
437        .unwrap();
438        assert_eq!(event.as_log()["bool"], Value::Boolean(true));
439    }
440
441    #[test]
442    fn lua_non_coercible_value() {
443        let event = transform_one(
444            r#"
445              event["junk"] = {"asdf"}
446            "#,
447            LogEvent::default(),
448        )
449        .unwrap();
450        assert_eq!(event.as_log().get("junk"), None);
451    }
452
453    #[test]
454    fn lua_non_string_key_write() {
455        crate::test_util::trace_init();
456        let mut transform = Lua::new(
457            r#"
458              event[false] = "hello"
459            "#
460            .to_string(),
461            vec![],
462        )
463        .unwrap();
464
465        let err = transform.process(LogEvent::default().into()).unwrap_err();
466        let err = format_error(&err);
467        assert!(
468            err.contains("error converting Lua boolean to String"),
469            "{}",
470            err
471        );
472    }
473
474    #[test]
475    fn lua_non_string_key_read() {
476        crate::test_util::trace_init();
477        let mut transform = Lua::new(
478            r"
479              print(event[false])
480            "
481            .to_string(),
482            vec![],
483        )
484        .unwrap();
485
486        let err = transform.process(LogEvent::default().into()).unwrap_err();
487        let err = format_error(&err);
488        assert!(
489            err.contains("error converting Lua boolean to String"),
490            "{}",
491            err
492        );
493    }
494
495    #[test]
496    fn lua_script_error() {
497        crate::test_util::trace_init();
498        let mut transform = Lua::new(
499            r#"
500              error("this is an error")
501            "#
502            .to_string(),
503            vec![],
504        )
505        .unwrap();
506
507        let err = transform.process(LogEvent::default().into()).unwrap_err();
508        let err = format_error(&err);
509        assert!(err.contains("this is an error"), "{}", err);
510    }
511
512    #[test]
513    fn lua_syntax_error() {
514        crate::test_util::trace_init();
515        let err = Lua::new(
516            r"
517              1234 = sadf <>&*!#@
518            "
519            .to_string(),
520            vec![],
521        )
522        .map(|_| ())
523        .unwrap_err()
524        .to_string();
525
526        assert!(err.contains("syntax error:"), "{}", err);
527    }
528
529    #[test]
530    fn lua_load_file() {
531        use std::{fs::File, io::Write};
532        crate::test_util::trace_init();
533
534        let dir = tempfile::tempdir().unwrap();
535
536        let mut file = File::create(dir.path().join("script2.lua")).unwrap();
537        write!(
538            &mut file,
539            r#"
540              local M = {{}}
541
542              local function modify(event2)
543                event2["\"new field\""] = "new value"
544              end
545              M.modify = modify
546
547              return M
548            "#
549        )
550        .unwrap();
551
552        let source = r#"
553          local script2 = require("script2")
554          script2.modify(event)
555        "#
556        .to_string();
557
558        let mut transform =
559            Lua::new(source, vec![dir.path().to_string_lossy().into_owned()]).unwrap();
560        let event = transform.transform_one(LogEvent::default().into()).unwrap();
561        assert_eq!(event.as_log()["\"new field\""], "new value".into());
562    }
563
564    #[test]
565    fn lua_pairs() {
566        let mut event = LogEvent::default();
567        event.insert("name", "Bob");
568        event.insert("friend", "Alice");
569
570        let event = transform_one(
571            r"
572              for k,v in pairs(event) do
573                event[k] = k .. v
574              end
575            ",
576            event,
577        )
578        .unwrap();
579
580        assert_eq!(event.as_log()["name"], "nameBob".into());
581        assert_eq!(event.as_log()["friend"], "friendAlice".into());
582    }
583
584    fn transform_one(transform: &str, event: impl Into<Event>) -> Option<Event> {
585        crate::test_util::trace_init();
586
587        let source = source_id();
588        let mut event = event.into();
589        event.set_source_id(Arc::clone(&source));
590
591        let mut transform = Lua::new(transform.to_string(), vec![]).unwrap();
592        let event = transform.transform_one(event);
593
594        if let Some(event) = &event {
595            assert_eq!(event.source_id(), Some(&source));
596        }
597
598        event
599    }
600
601    fn source_id() -> Arc<ComponentKey> {
602        Arc::new(ComponentKey::from(test_util::random_string(16)))
603    }
604}