// codecs/decoding/format/vrl.rs

1use bytes::Bytes;
2use smallvec::{SmallVec, smallvec};
3use vector_config_macros::configurable_component;
4use vector_core::{
5    compile_vrl,
6    config::{DataType, LogNamespace},
7    event::{Event, TargetEvents, VrlTarget},
8    schema,
9};
10use vrl::{
11    compiler::{CompileConfig, Program, TimeZone, TypeState, runtime::Runtime, state::ExternalEnv},
12    diagnostic::Formatter,
13    value::Kind,
14};
15
16use crate::{BytesDeserializerConfig, decoding::format::Deserializer};
17
18/// Config used to build a `VrlDeserializer`.
19#[configurable_component]
20#[derive(Debug, Clone, Default)]
21pub struct VrlDeserializerConfig {
22    /// VRL-specific decoding options.
23    pub vrl: VrlDeserializerOptions,
24}
25
26/// VRL-specific decoding options.
27#[configurable_component]
28#[derive(Debug, Clone, PartialEq, Eq, Default)]
29pub struct VrlDeserializerOptions {
30    /// The [Vector Remap Language][vrl] (VRL) program to execute for each event.
31    /// Note that the final contents of the `.` target will be used as the decoding result.
32    /// Compilation error or use of 'abort' in a program will result in a decoding error.
33    ///
34    ///
35    /// [vrl]: https://vector.dev/docs/reference/vrl
36    pub source: String,
37
38    /// The name of the timezone to apply to timestamp conversions that do not contain an explicit
39    /// time zone. The time zone name may be any name in the [TZ database][tz_database], or `local`
40    /// to indicate system local time.
41    ///
42    /// If not set, `local` is used.
43    ///
44    /// [tz_database]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
45    #[serde(default)]
46    #[configurable(metadata(docs::advanced))]
47    pub timezone: Option<TimeZone>,
48}
49
50impl VrlDeserializerConfig {
51    /// Build the `VrlDeserializer` from this configuration.
52    pub fn build(&self) -> vector_common::Result<VrlDeserializer> {
53        let state = TypeState {
54            local: Default::default(),
55            external: ExternalEnv::default(),
56        };
57
58        match compile_vrl(
59            &self.vrl.source,
60            &vector_vrl_functions::all(),
61            &state,
62            CompileConfig::default(),
63        ) {
64            Ok(result) => Ok(VrlDeserializer {
65                program: result.program,
66                timezone: self.vrl.timezone.unwrap_or(TimeZone::Local),
67            }),
68            Err(diagnostics) => Err(Formatter::new(&self.vrl.source, diagnostics)
69                .to_string()
70                .into()),
71        }
72    }
73
74    /// Return the type of event build by this deserializer.
75    pub fn output_type(&self) -> DataType {
76        DataType::Log
77    }
78
79    /// The schema produced by the deserializer.
80    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
81        match log_namespace {
82            LogNamespace::Legacy => {
83                schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any())
84            }
85            LogNamespace::Vector => {
86                schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
87            }
88        }
89    }
90}
91
92/// Deserializer that builds `Event`s from a byte frame containing logs compatible with VRL.
93#[derive(Debug, Clone)]
94pub struct VrlDeserializer {
95    program: Program,
96    timezone: TimeZone,
97}
98
99fn parse_bytes(bytes: Bytes, log_namespace: LogNamespace) -> Event {
100    let bytes_deserializer = BytesDeserializerConfig::new().build();
101    let log_event = bytes_deserializer.parse_single(bytes, log_namespace);
102    Event::from(log_event)
103}
104
105impl Deserializer for VrlDeserializer {
106    fn parse(
107        &self,
108        bytes: Bytes,
109        log_namespace: LogNamespace,
110    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
111        let event = parse_bytes(bytes, log_namespace);
112        match self.run_vrl(event, log_namespace) {
113            Ok(events) => Ok(events),
114            Err(e) => Err(e),
115        }
116    }
117}
118
119impl VrlDeserializer {
120    fn run_vrl(
121        &self,
122        event: Event,
123        log_namespace: LogNamespace,
124    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
125        let mut runtime = Runtime::default();
126        let mut target = VrlTarget::new(event, self.program.info(), true);
127        match runtime.resolve(&mut target, &self.program, &self.timezone) {
128            Ok(_) => match target.into_events(log_namespace) {
129                TargetEvents::One(event) => Ok(smallvec![event]),
130                TargetEvents::Logs(events_iter) => Ok(SmallVec::from_iter(events_iter)),
131                TargetEvents::Traces(_) => Err("trace targets are not supported".into()),
132            },
133            Err(e) => Err(e.to_string().into()),
134        }
135    }
136}
137
#[cfg(test)]
mod tests {
    use chrono::{DateTime, Utc};
    use indoc::indoc;
    use vrl::{btreemap, path::OwnedTargetPath, value::Value};

    use super::*;

    /// Builds a decoder for `source` with no explicit timezone, panicking if the
    /// program does not compile.
    fn make_decoder(source: &str) -> VrlDeserializer {
        VrlDeserializerConfig {
            vrl: VrlDeserializerOptions {
                source: source.to_string(),
                timezone: None,
            },
        }
        .build()
        .expect("Failed to build VrlDeserializer")
    }

    /// The program can replace `.` with parsed JSON and write metadata through `%`.
    #[test]
    fn test_json_message() {
        let source = indoc!(
            r#"
            %m1 = "metadata"
            . = string!(.)
            . = parse_json!(.)
            "#
        );

        let decoder = make_decoder(source);

        let log_bytes = Bytes::from(r#"{ "message": "Hello VRL" }"#);
        let result = decoder.parse(log_bytes, LogNamespace::Vector).unwrap();
        assert_eq!(result.len(), 1);
        let event = result.first().unwrap();
        assert_eq!(
            *event.as_log().get(&OwnedTargetPath::event_root()).unwrap(),
            btreemap! { "message" => "Hello VRL" }.into()
        );
        assert_eq!(
            *event
                .as_log()
                .get(&OwnedTargetPath::metadata_root())
                .unwrap(),
            btreemap! { "m1" => "metadata" }.into()
        );
    }

    /// Only the final contents of `.` are used; the value of the program's last
    /// expression is ignored.
    #[test]
    fn test_ignored_returned_expression() {
        let source = indoc!(
            r#"
            . = { "a" : 1 }
            { "b" : 9 }
            "#
        );

        let decoder = make_decoder(source);

        let log_bytes = Bytes::from("some bytes");
        let result = decoder.parse(log_bytes, LogNamespace::Vector).unwrap();
        assert_eq!(result.len(), 1);
        let event = result.first().unwrap();
        assert_eq!(
            *event.as_log().get(&OwnedTargetPath::event_root()).unwrap(),
            btreemap! { "a" => 1 }.into()
        );
    }

    /// Assigning an array to `.` produces one event per element, in order.
    #[test]
    fn test_multiple_events() {
        // Single-line program: no indentation to strip, so no `indoc!` needed.
        let decoder = make_decoder(". = [0,1,2]");
        let log_bytes = Bytes::from("some bytes");
        let result = decoder.parse(log_bytes, LogNamespace::Vector).unwrap();
        assert_eq!(result.len(), 3);
        for (i, event) in result.iter().enumerate() {
            assert_eq!(
                *event.as_log().get(&OwnedTargetPath::event_root()).unwrap(),
                i.into()
            );
        }
    }

    /// One program can try several parsers in turn and fall back to `null` when
    /// none of them match.
    #[test]
    fn test_syslog_and_cef_input() {
        let source = indoc!(
            r#"
            if exists(.message) {
                . = string!(.message)
            }
            . = parse_syslog(.) ?? parse_cef(.) ?? null
            "#
        );

        let decoder = make_decoder(source);

        // Syslog input
        let syslog_bytes = Bytes::from(
            "<34>1 2024-02-06T15:04:05.000Z mymachine.example.com su - ID47 - 'su root' failed for user on /dev/pts/8",
        );
        let result = decoder.parse(syslog_bytes, LogNamespace::Vector).unwrap();
        assert_eq!(result.len(), 1);
        let syslog_event = result.first().unwrap();
        assert_eq!(
            *syslog_event
                .as_log()
                .get(&OwnedTargetPath::event_root())
                .unwrap(),
            btreemap! {
                "appname" => "su",
                "facility" => "auth",
                "hostname" => "mymachine.example.com",
                "message" => "'su root' failed for user on /dev/pts/8",
                "msgid" => "ID47",
                "severity" => "crit",
                "timestamp" => "2024-02-06T15:04:05Z".parse::<DateTime<Utc>>().unwrap(),
                "version" => 1
            }
            .into()
        );

        // CEF input
        let cef_bytes = Bytes::from(
            "CEF:0|Security|Threat Manager|1.0|100|worm successfully stopped|10|src=10.0.0.1 dst=2.1.2.2 spt=1232",
        );
        let result = decoder.parse(cef_bytes, LogNamespace::Vector).unwrap();
        assert_eq!(result.len(), 1);
        let cef_event = result.first().unwrap();
        assert_eq!(
            *cef_event
                .as_log()
                .get(&OwnedTargetPath::event_root())
                .unwrap(),
            btreemap! {
                "cefVersion" =>"0",
                "deviceEventClassId" =>"100",
                "deviceProduct" =>"Threat Manager",
                "deviceVendor" =>"Security",
                "deviceVersion" =>"1.0",
                "dst" =>"2.1.2.2",
                "name" =>"worm successfully stopped",
                "severity" =>"10",
                "spt" =>"1232",
                "src" =>"10.0.0.1"
            }
            .into()
        );

        // Input matching neither parser falls through to `null`.
        let random_bytes = Bytes::from("a|- -| x");
        let result = decoder.parse(random_bytes, LogNamespace::Vector).unwrap();
        // Assert the length before indexing so an empty result fails with a clear
        // message instead of an `unwrap` on `None`.
        assert_eq!(result.len(), 1);
        let random_event = result.first().unwrap();
        assert_eq!(
            *random_event
                .as_log()
                .get(&OwnedTargetPath::event_root())
                .unwrap(),
            Value::Null
        );
    }

    /// A program that fails to compile makes `build()` return the formatted diagnostics.
    #[test]
    fn test_invalid_source() {
        let error = VrlDeserializerConfig {
            vrl: VrlDeserializerOptions {
                source: ". ?".to_string(),
                timezone: None,
            },
        }
        .build()
        .unwrap_err()
        .to_string();
        assert!(error.contains("error[E203]: syntax error"));
    }

    /// `abort` in the program turns into a decoding error for the frame.
    #[test]
    fn test_abort() {
        let decoder = make_decoder("abort");
        let log_bytes = Bytes::from(r#"{ "message": "Hello VRL" }"#);
        let error = decoder
            .parse(log_bytes, LogNamespace::Vector)
            .unwrap_err()
            .to_string();
        assert!(error.contains("aborted"));
    }
}