// vector/transforms/lua/v1/mod.rs

1use std::{future::ready, pin::Pin};
2
3use futures::{stream, Stream, StreamExt};
4use mlua::ExternalError;
5use mlua::FromLua;
6use ordered_float::NotNan;
7use snafu::{ResultExt, Snafu};
8use vector_lib::configurable::configurable_component;
9use vrl::path::parse_target_path;
10
11use crate::config::OutputId;
12use crate::schema::Definition;
13use crate::{
14    config::{DataType, Input, TransformOutput},
15    event::{Event, Value},
16    internal_events::{LuaGcTriggered, LuaScriptError},
17    schema,
18    transforms::{TaskTransform, Transform},
19};
20
/// Errors that can occur while constructing the version-one Lua transform.
#[derive(Debug, Snafu)]
enum BuildError {
    // Any error reported by the Lua runtime while preparing the transform: reading the
    // `package` table, updating `package.path`, compiling the script, or storing it in
    // the registry.
    #[snafu(display("Lua error: {}", source))]
    InvalidLua { source: mlua::Error },
}
26
/// Configuration for version one of the `lua` transform.
// NOTE(review): the `///` comments on this struct and its fields are presumably consumed by
// `configurable_component` as user-facing configuration docs — edit them deliberately.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct LuaConfig {
    /// The Lua program to execute for each event.
    // Compiled once per `Lua` instance (`Lua::new`) and invoked for every event.
    source: String,

    /// A list of directories to search when loading a Lua file via the `require` function.
    ///
    /// If not specified, the modules are looked up in the configuration directories.
    // Each entry is prepended to Lua's `package.path` as `<dir>/?.lua` by `Lua::new`.
    #[serde(default)]
    search_dirs: Vec<String>,
}
41
42impl LuaConfig {
43    pub fn build(&self) -> crate::Result<Transform> {
44        warn!(
45            "DEPRECATED The `lua` transform API version 1 is deprecated. Please convert your script to version 2."
46        );
47        Lua::new(self.source.clone(), self.search_dirs.clone()).map(Transform::event_task)
48    }
49
50    pub fn input(&self) -> Input {
51        Input::log()
52    }
53
54    pub fn outputs(
55        &self,
56        input_definitions: &[(OutputId, schema::Definition)],
57    ) -> Vec<TransformOutput> {
58        // Lua causes the type definition to be reset
59        let namespaces = input_definitions
60            .iter()
61            .flat_map(|(_output, definition)| definition.log_namespaces().clone())
62            .collect();
63
64        let definition = input_definitions
65            .iter()
66            .map(|(output, _definition)| {
67                (
68                    output.clone(),
69                    Definition::default_for_namespace(&namespaces),
70                )
71            })
72            .collect();
73
74        vec![TransformOutput::new(DataType::Log, definition)]
75    }
76}
77
// Lua's garbage collector sometimes does not seem to run automatically at high event rates,
// which leads to a leak-like RAM consumption pattern. This constant sets the number of
// invocations of the Lua transform after which the GC is invoked explicitly, keeping RAM
// usage in check.
//
// The value is larger than 1 because calling the GC is an expensive operation; doing it after
// every invocation would have a significant performance cost.
const GC_INTERVAL: usize = 16;
85
/// A version-one Lua transform: a dedicated Lua state with the user script compiled into it.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Lua {
    // `source` and `search_dirs` are kept so that `Clone` can rebuild an equivalent,
    // independent Lua state.
    #[derivative(Debug = "ignore")]
    source: String,
    #[derivative(Debug = "ignore")]
    search_dirs: Vec<String>,
    #[derivative(Debug = "ignore")]
    lua: mlua::Lua,
    // Registry handle to the compiled script function; called once per event.
    vector_func: mlua::RegistryKey,
    // Invocation counter since the last explicit garbage collection (see `GC_INTERVAL`).
    invocations_after_gc: usize,
}
98
99impl Clone for Lua {
100    fn clone(&self) -> Self {
101        Lua::new(self.source.clone(), self.search_dirs.clone())
102            .expect("Tried to clone existing valid lua transform. This is an invariant.")
103    }
104}
105
// This wrapper exists so that version 1 and version 2 of the transform can each have their
// own, independent implementation of the `mlua::UserData` trait for events.
// NOTE(review): the derived `FromLua` presumably clones the wrapped event whenever a
// `LuaEvent` is read back out of Lua by value — confirm against the mlua docs.
/// An [`Event`] exposed to version-one scripts as the `event` global.
#[derive(Clone, FromLua)]
struct LuaEvent {
    inner: Event,
}
112
113impl Lua {
114    pub fn new(source: String, search_dirs: Vec<String>) -> crate::Result<Self> {
115        // In order to support loading C modules in Lua, we need to create unsafe instance
116        // without debug library.
117        let lua = unsafe {
118            mlua::Lua::unsafe_new_with(mlua::StdLib::ALL_SAFE, mlua::LuaOptions::default())
119        };
120
121        let additional_paths = search_dirs
122            .iter()
123            .map(|d| format!("{d}/?.lua"))
124            .collect::<Vec<_>>()
125            .join(";");
126
127        if !additional_paths.is_empty() {
128            let package = lua
129                .globals()
130                .get::<mlua::Table>("package")
131                .context(InvalidLuaSnafu)?;
132            let current_paths = package
133                .get::<String>("path")
134                .unwrap_or_else(|_| ";".to_string());
135            let paths = format!("{additional_paths};{current_paths}");
136            package.set("path", paths).context(InvalidLuaSnafu)?;
137        }
138
139        let func = lua.load(&source).into_function().context(InvalidLuaSnafu)?;
140        let vector_func = lua.create_registry_value(func).context(InvalidLuaSnafu)?;
141
142        Ok(Self {
143            source,
144            search_dirs,
145            lua,
146            vector_func,
147            invocations_after_gc: 0,
148        })
149    }
150
151    fn process(&mut self, event: Event) -> Result<Option<Event>, mlua::Error> {
152        let source_id = event.source_id().cloned();
153        let lua = &self.lua;
154        let globals = lua.globals();
155
156        globals.raw_set("event", LuaEvent { inner: event })?;
157
158        let func = lua.registry_value::<mlua::Function>(&self.vector_func)?;
159        func.call::<()>(())?;
160
161        let result = globals.raw_get::<Option<LuaEvent>>("event").map(|option| {
162            option.map(|lua_event| {
163                let mut event = lua_event.inner;
164                if let Some(source_id) = source_id {
165                    event.set_source_id(source_id);
166                }
167                event
168            })
169        });
170
171        self.invocations_after_gc += 1;
172        if self.invocations_after_gc % GC_INTERVAL == 0 {
173            emit!(LuaGcTriggered {
174                used_memory: self.lua.used_memory()
175            });
176            self.lua.gc_collect()?;
177            self.invocations_after_gc = 0;
178        }
179
180        result
181    }
182
183    pub fn transform_one(&mut self, event: Event) -> Option<Event> {
184        match self.process(event) {
185            Ok(event) => event,
186            Err(error) => {
187                emit!(LuaScriptError { error });
188                None
189            }
190        }
191    }
192}
193
194impl TaskTransform<Event> for Lua {
195    fn transform(
196        self: Box<Self>,
197        task: Pin<Box<dyn Stream<Item = Event> + Send>>,
198    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
199    where
200        Self: 'static,
201    {
202        let mut inner = self;
203        Box::pin(
204            task.filter_map(move |event| {
205                let mut output = Vec::with_capacity(1);
206                ready(match inner.process(event) {
207                    Ok(event) => {
208                        output.extend(event);
209                        Some(stream::iter(output))
210                    }
211                    Err(error) => {
212                        emit!(LuaScriptError { error });
213                        None
214                    }
215                })
216            })
217            .flatten(),
218        )
219    }
220}
221
222impl mlua::UserData for LuaEvent {
223    fn add_methods<M: mlua::UserDataMethods<Self>>(methods: &mut M) {
224        methods.add_meta_method_mut(
225            mlua::MetaMethod::NewIndex,
226            |_lua, this, (key, value): (String, Option<mlua::Value>)| {
227                let key_path = parse_target_path(key.as_str()).map_err(|e| e.into_lua_err())?;
228                match value {
229                    Some(mlua::Value::String(string)) => {
230                        this.inner.as_mut_log().insert(
231                            &key_path,
232                            Value::from(string.to_str().expect("Expected UTF-8.").to_owned()),
233                        );
234                    }
235                    Some(mlua::Value::Integer(integer)) => {
236                        this.inner
237                            .as_mut_log()
238                            .insert(&key_path, Value::Integer(integer));
239                    }
240                    Some(mlua::Value::Number(number)) if !number.is_nan() => {
241                        this.inner
242                            .as_mut_log()
243                            .insert(&key_path, Value::Float(NotNan::new(number).unwrap()));
244                    }
245                    Some(mlua::Value::Boolean(boolean)) => {
246                        this.inner
247                            .as_mut_log()
248                            .insert(&key_path, Value::Boolean(boolean));
249                    }
250                    Some(mlua::Value::Nil) | None => {
251                        this.inner.as_mut_log().remove(&key_path);
252                    }
253                    _ => {
254                        info!(
255                            message =
256                                "Could not set field to Lua value of invalid type, dropping field.",
257                            field = key.as_str(),
258                            internal_log_rate_limit = true
259                        );
260                        this.inner.as_mut_log().remove(&key_path);
261                    }
262                }
263
264                Ok(())
265            },
266        );
267
268        methods.add_meta_method(mlua::MetaMethod::Index, |lua, this, key: String| {
269            if let Some(value) = this
270                .inner
271                .as_log()
272                .parse_path_and_get_value(key.as_str())
273                .ok()
274                .flatten()
275            {
276                let string = lua.create_string(value.coerce_to_bytes())?;
277                Ok(Some(string))
278            } else {
279                Ok(None)
280            }
281        });
282
283        methods.add_meta_function(mlua::MetaMethod::Pairs, |lua, event: LuaEvent| {
284            let state = lua.create_table()?;
285            {
286                if let Some(keys) = event.inner.as_log().keys() {
287                    let keys = lua.create_table_from(keys.map(|k| (k, true)))?;
288                    state.raw_set("keys", keys)?;
289                }
290                state.raw_set("event", event)?;
291            }
292            let function =
293                lua.create_function(|lua, (state, prev): (mlua::Table, Option<String>)| {
294                    let event: LuaEvent = state.raw_get("event")?;
295                    let keys: mlua::Table = state.raw_get("keys")?;
296                    let next: mlua::Function = lua.globals().raw_get("next")?;
297                    let key: Option<String> = next.call((keys, prev))?;
298                    let value = key.clone().and_then(|k| {
299                        event
300                            .inner
301                            .as_log()
302                            .parse_path_and_get_value(k.as_str())
303                            .ok()
304                            .flatten()
305                    });
306                    match value {
307                        Some(value) => Ok((key, Some(lua.create_string(value.coerce_to_bytes())?))),
308                        None => Ok((None, None)),
309                    }
310                })?;
311            Ok((function, state))
312        });
313    }
314}
315
316pub fn format_error(error: &mlua::Error) -> String {
317    match error {
318        mlua::Error::CallbackError { traceback, cause } => format_error(cause) + "\n" + traceback,
319        err => err.to_string(),
320    }
321}
322
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::event::{Event, LogEvent, Value};
    use crate::{config::ComponentKey, test_util};

    // Assigning a string to a new key adds the field.
    #[test]
    fn lua_add_field() {
        let event = transform_one(
            r#"
              event["hello"] = "goodbye"
            "#,
            LogEvent::from("program me"),
        )
        .unwrap();

        assert_eq!(event.as_log()["hello"], "goodbye".into());
    }

    // Reading an existing field and writing a value derived from it.
    #[test]
    fn lua_read_field() {
        let event = transform_one(
            r#"
              _, _, name = string.find(event["message"], "Hello, my name is (%a+).")
              event["name"] = name
            "#,
            LogEvent::from("Hello, my name is Bob."),
        )
        .unwrap();

        assert_eq!(event.as_log()["name"], "Bob".into());
    }

    // Assigning `nil` removes the field.
    #[test]
    fn lua_remove_field() {
        let mut log = LogEvent::default();
        log.insert("name", "Bob");
        let event = transform_one(
            r#"
              event["name"] = nil
            "#,
            log,
        )
        .unwrap();

        assert!(event.as_log().get("name").is_none());
    }

    // Setting the `event` global itself to `nil` drops the event.
    #[test]
    fn lua_drop_event() {
        let mut log = LogEvent::default();
        log.insert("name", "Bob");
        let event = transform_one(
            r"
              event = nil
            ",
            log,
        );

        assert!(event.is_none());
    }

    // Reading a missing field yields `nil` in Lua.
    #[test]
    fn lua_read_empty_field() {
        let event = transform_one(
            r#"
              if event["non-existant"] == nil then
                event["result"] = "empty"
              else
                event["result"] = "found"
              end
            "#,
            LogEvent::default(),
        )
        .unwrap();

        assert_eq!(event.as_log()["result"], "empty".into());
    }

    // Lua integers are stored as `Value::Integer`.
    #[test]
    fn lua_integer_value() {
        let event = transform_one(
            r#"
              event["number"] = 3
            "#,
            LogEvent::default(),
        )
        .unwrap();
        assert_eq!(event.as_log()["number"], Value::Integer(3));
    }

    // Lua floating-point numbers are stored as floats.
    #[test]
    fn lua_numeric_value() {
        let event = transform_one(
            r#"
              event["number"] = 3.14159
            "#,
            LogEvent::default(),
        )
        .unwrap();
        assert_eq!(event.as_log()["number"], Value::from(3.14159));
    }

    // Lua booleans are stored as `Value::Boolean`.
    #[test]
    fn lua_boolean_value() {
        let event = transform_one(
            r#"
              event["bool"] = true
            "#,
            LogEvent::default(),
        )
        .unwrap();
        assert_eq!(event.as_log()["bool"], Value::Boolean(true));
    }

    // Values of unsupported types (here a table) are not stored.
    #[test]
    fn lua_non_coercible_value() {
        let event = transform_one(
            r#"
              event["junk"] = {"asdf"}
            "#,
            LogEvent::default(),
        )
        .unwrap();
        assert_eq!(event.as_log().get("junk"), None);
    }

    // A non-string key in an assignment fails the `String` key conversion and surfaces as an
    // error from `process`.
    #[test]
    fn lua_non_string_key_write() {
        crate::test_util::trace_init();
        let mut transform = Lua::new(
            r#"
              event[false] = "hello"
            "#
            .to_string(),
            vec![],
        )
        .unwrap();

        let err = transform.process(LogEvent::default().into()).unwrap_err();
        let err = format_error(&err);
        assert!(
            err.contains("error converting Lua boolean to String"),
            "{}",
            err
        );
    }

    // A non-string key in a read fails the same way.
    #[test]
    fn lua_non_string_key_read() {
        crate::test_util::trace_init();
        let mut transform = Lua::new(
            r"
              print(event[false])
            "
            .to_string(),
            vec![],
        )
        .unwrap();

        let err = transform.process(LogEvent::default().into()).unwrap_err();
        let err = format_error(&err);
        assert!(
            err.contains("error converting Lua boolean to String"),
            "{}",
            err
        );
    }

    // Runtime errors raised by the script propagate out of `process` with their message.
    #[test]
    fn lua_script_error() {
        crate::test_util::trace_init();
        let mut transform = Lua::new(
            r#"
              error("this is an error")
            "#
            .to_string(),
            vec![],
        )
        .unwrap();

        let err = transform.process(LogEvent::default().into()).unwrap_err();
        let err = format_error(&err);
        assert!(err.contains("this is an error"), "{}", err);
    }

    // Invalid Lua syntax is rejected when the transform is built.
    #[test]
    fn lua_syntax_error() {
        crate::test_util::trace_init();
        let err = Lua::new(
            r"
              1234 = sadf <>&*!#@
            "
            .to_string(),
            vec![],
        )
        .map(|_| ())
        .unwrap_err()
        .to_string();

        assert!(err.contains("syntax error:"), "{}", err);
    }

    // Modules placed in a `search_dirs` directory can be loaded with `require`.
    #[test]
    fn lua_load_file() {
        use std::{fs::File, io::Write};
        crate::test_util::trace_init();

        let dir = tempfile::tempdir().unwrap();

        // `{{}}` is `write!` escaping for a literal `{}` (an empty Lua table).
        let mut file = File::create(dir.path().join("script2.lua")).unwrap();
        write!(
            &mut file,
            r#"
              local M = {{}}

              local function modify(event2)
                event2["\"new field\""] = "new value"
              end
              M.modify = modify

              return M
            "#
        )
        .unwrap();

        let source = r#"
          local script2 = require("script2")
          script2.modify(event)
        "#
        .to_string();

        let mut transform =
            Lua::new(source, vec![dir.path().to_string_lossy().into_owned()]).unwrap();
        let event = transform.transform_one(LogEvent::default().into()).unwrap();
        assert_eq!(event.as_log()["\"new field\""], "new value".into());
    }

    // `pairs(event)` visits every field; writing during iteration is allowed.
    #[test]
    fn lua_pairs() {
        let mut event = LogEvent::default();
        event.insert("name", "Bob");
        event.insert("friend", "Alice");

        let event = transform_one(
            r"
              for k,v in pairs(event) do
                event[k] = k .. v
              end
            ",
            event,
        )
        .unwrap();

        assert_eq!(event.as_log()["name"], "nameBob".into());
        assert_eq!(event.as_log()["friend"], "friendAlice".into());
    }

    // Runs `transform` once over `event`, which is first tagged with a random source id.
    // If the event survives the script, the helper also asserts that the source id was
    // preserved.
    fn transform_one(transform: &str, event: impl Into<Event>) -> Option<Event> {
        crate::test_util::trace_init();

        let source = source_id();
        let mut event = event.into();
        event.set_source_id(Arc::clone(&source));

        let mut transform = Lua::new(transform.to_string(), vec![]).unwrap();
        let event = transform.transform_one(event);

        if let Some(event) = &event {
            assert_eq!(event.source_id(), Some(&source));
        }

        event
    }

    // Builds a component key from a random 16-character string to use as a source id.
    fn source_id() -> Arc<ComponentKey> {
        Arc::new(ComponentKey::from(test_util::random_string(16)))
    }
}