codecs/decoding/format/
vrl.rs

1use crate::decoding::format::Deserializer;
2use crate::BytesDeserializerConfig;
3use bytes::Bytes;
4use derivative::Derivative;
5use smallvec::{smallvec, SmallVec};
6use vector_config_macros::configurable_component;
7use vector_core::config::{DataType, LogNamespace};
8use vector_core::event::{Event, TargetEvents, VrlTarget};
9use vector_core::{compile_vrl, schema};
10use vrl::compiler::state::ExternalEnv;
11use vrl::compiler::{runtime::Runtime, CompileConfig, Program, TimeZone, TypeState};
12use vrl::diagnostic::Formatter;
13use vrl::value::Kind;
14
/// Config used to build a `VrlDeserializer`.
// All user-facing settings live in the nested `vrl` field (`VrlDeserializerOptions`);
// `build()` compiles the program found there.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct VrlDeserializerConfig {
    /// VRL-specific decoding options.
    pub vrl: VrlDeserializerOptions,
}
22
23/// VRL-specific decoding options.
24#[configurable_component]
25#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
26#[derivative(Default)]
27pub struct VrlDeserializerOptions {
28    /// The [Vector Remap Language][vrl] (VRL) program to execute for each event.
29    /// Note that the final contents of the `.` target will be used as the decoding result.
30    /// Compilation error or use of 'abort' in a program will result in a decoding error.
31    ///
32    ///
33    /// [vrl]: https://vector.dev/docs/reference/vrl
34    pub source: String,
35
36    /// The name of the timezone to apply to timestamp conversions that do not contain an explicit
37    /// time zone. The time zone name may be any name in the [TZ database][tz_database], or `local`
38    /// to indicate system local time.
39    ///
40    /// If not set, `local` is used.
41    ///
42    /// [tz_database]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
43    #[serde(default)]
44    #[configurable(metadata(docs::advanced))]
45    pub timezone: Option<TimeZone>,
46}
47
48impl VrlDeserializerConfig {
49    /// Build the `VrlDeserializer` from this configuration.
50    pub fn build(&self) -> vector_common::Result<VrlDeserializer> {
51        let state = TypeState {
52            local: Default::default(),
53            external: ExternalEnv::default(),
54        };
55
56        match compile_vrl(
57            &self.vrl.source,
58            &vrl::stdlib::all(),
59            &state,
60            CompileConfig::default(),
61        ) {
62            Ok(result) => Ok(VrlDeserializer {
63                program: result.program,
64                timezone: self.vrl.timezone.unwrap_or(TimeZone::Local),
65            }),
66            Err(diagnostics) => Err(Formatter::new(&self.vrl.source, diagnostics)
67                .to_string()
68                .into()),
69        }
70    }
71
72    /// Return the type of event build by this deserializer.
73    pub fn output_type(&self) -> DataType {
74        DataType::Log
75    }
76
77    /// The schema produced by the deserializer.
78    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
79        match log_namespace {
80            LogNamespace::Legacy => {
81                schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any())
82            }
83            LogNamespace::Vector => {
84                schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
85            }
86        }
87    }
88}
89
/// Deserializer that builds `Event`s by running a compiled VRL program over each byte frame.
///
/// Each frame is first wrapped in a log event by a default-configured bytes deserializer. The
/// program is then executed against that event, and the final `.` target becomes the decoded
/// event(s).
#[derive(Debug, Clone)]
pub struct VrlDeserializer {
    /// The compiled VRL program executed for every frame.
    program: Program,
    /// Time zone passed to the VRL runtime for timestamp conversions lacking an explicit zone.
    timezone: TimeZone,
}
96
97fn parse_bytes(bytes: Bytes, log_namespace: LogNamespace) -> Event {
98    let bytes_deserializer = BytesDeserializerConfig::new().build();
99    let log_event = bytes_deserializer.parse_single(bytes, log_namespace);
100    Event::from(log_event)
101}
102
103impl Deserializer for VrlDeserializer {
104    fn parse(
105        &self,
106        bytes: Bytes,
107        log_namespace: LogNamespace,
108    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
109        let event = parse_bytes(bytes, log_namespace);
110        match self.run_vrl(event, log_namespace) {
111            Ok(events) => Ok(events),
112            Err(e) => Err(e),
113        }
114    }
115}
116
117impl VrlDeserializer {
118    fn run_vrl(
119        &self,
120        event: Event,
121        log_namespace: LogNamespace,
122    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
123        let mut runtime = Runtime::default();
124        let mut target = VrlTarget::new(event, self.program.info(), true);
125        match runtime.resolve(&mut target, &self.program, &self.timezone) {
126            Ok(_) => match target.into_events(log_namespace) {
127                TargetEvents::One(event) => Ok(smallvec![event]),
128                TargetEvents::Logs(events_iter) => Ok(SmallVec::from_iter(events_iter)),
129                TargetEvents::Traces(_) => Err("trace targets are not supported".into()),
130            },
131            Err(e) => Err(e.to_string().into()),
132        }
133    }
134}
135
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{DateTime, Utc};
    use indoc::indoc;
    use vrl::btreemap;
    use vrl::path::OwnedTargetPath;
    use vrl::value::Value;

    /// Build a decoder for `source` with the default timezone, panicking if it fails to compile.
    fn make_decoder(source: &str) -> VrlDeserializer {
        VrlDeserializerConfig {
            vrl: VrlDeserializerOptions {
                source: source.to_string(),
                timezone: None,
            },
        }
        .build()
        .expect("Failed to build VrlDeserializer")
    }

    #[test]
    fn test_json_message() {
        let source = indoc!(
            r#"
            %m1 = "metadata"
            . = string!(.)
            . = parse_json!(.)
            "#
        );

        let decoder = make_decoder(source);

        let log_bytes = Bytes::from(r#"{ "message": "Hello VRL" }"#);
        let result = decoder.parse(log_bytes, LogNamespace::Vector).unwrap();
        assert_eq!(result.len(), 1);
        let event = result.first().unwrap();
        assert_eq!(
            *event.as_log().get(&OwnedTargetPath::event_root()).unwrap(),
            btreemap! { "message" => "Hello VRL" }.into()
        );
        assert_eq!(
            *event
                .as_log()
                .get(&OwnedTargetPath::metadata_root())
                .unwrap(),
            btreemap! { "m1" => "metadata" }.into()
        );
    }

    #[test]
    fn test_legacy_namespace() {
        // In the legacy namespace the raw frame is placed under `.message`.
        let decoder = make_decoder(". = parse_json!(string!(.message))");

        let log_bytes = Bytes::from(r#"{ "message": "Hello VRL" }"#);
        let result = decoder.parse(log_bytes, LogNamespace::Legacy).unwrap();
        assert_eq!(result.len(), 1);
        let event = result.first().unwrap();
        assert_eq!(
            *event.as_log().get(&OwnedTargetPath::event_root()).unwrap(),
            btreemap! { "message" => "Hello VRL" }.into()
        );
    }

    #[test]
    fn test_ignored_returned_expression() {
        let source = indoc!(
            r#"
            . = { "a" : 1 }
            { "b" : 9 }
            "#
        );

        let decoder = make_decoder(source);

        let log_bytes = Bytes::from("some bytes");
        let result = decoder.parse(log_bytes, LogNamespace::Vector).unwrap();
        assert_eq!(result.len(), 1);
        let event = result.first().unwrap();
        assert_eq!(
            *event.as_log().get(&OwnedTargetPath::event_root()).unwrap(),
            btreemap! { "a" => 1 }.into()
        );
    }

    #[test]
    fn test_multiple_events() {
        let decoder = make_decoder(". = [0,1,2]");
        let log_bytes = Bytes::from("some bytes");
        let result = decoder.parse(log_bytes, LogNamespace::Vector).unwrap();
        assert_eq!(result.len(), 3);
        for (i, event) in result.iter().enumerate() {
            assert_eq!(
                *event.as_log().get(&OwnedTargetPath::event_root()).unwrap(),
                i.into()
            );
        }
    }

    #[test]
    fn test_syslog_and_cef_input() {
        let source = indoc!(
            r#"
            if exists(.message) {
                . = string!(.message)
            }
            . = parse_syslog(.) ?? parse_cef(.) ?? null
            "#
        );

        let decoder = make_decoder(source);

        // Syslog input
        let syslog_bytes = Bytes::from(
            "<34>1 2024-02-06T15:04:05.000Z mymachine.example.com su - ID47 - 'su root' failed for user on /dev/pts/8",
        );
        let result = decoder.parse(syslog_bytes, LogNamespace::Vector).unwrap();
        assert_eq!(result.len(), 1);
        let syslog_event = result.first().unwrap();
        assert_eq!(
            *syslog_event
                .as_log()
                .get(&OwnedTargetPath::event_root())
                .unwrap(),
            btreemap! {
                "appname" => "su",
                "facility" => "auth",
                "hostname" => "mymachine.example.com",
                "message" => "'su root' failed for user on /dev/pts/8",
                "msgid" => "ID47",
                "severity" => "crit",
                "timestamp" => "2024-02-06T15:04:05Z".parse::<DateTime<Utc>>().unwrap(),
                "version" => 1
            }
            .into()
        );

        // CEF input
        let cef_bytes = Bytes::from("CEF:0|Security|Threat Manager|1.0|100|worm successfully stopped|10|src=10.0.0.1 dst=2.1.2.2 spt=1232");
        let result = decoder.parse(cef_bytes, LogNamespace::Vector).unwrap();
        assert_eq!(result.len(), 1);
        let cef_event = result.first().unwrap();
        assert_eq!(
            *cef_event
                .as_log()
                .get(&OwnedTargetPath::event_root())
                .unwrap(),
            btreemap! {
                "cefVersion" =>"0",
                "deviceEventClassId" =>"100",
                "deviceProduct" =>"Threat Manager",
                "deviceVendor" =>"Security",
                "deviceVersion" =>"1.0",
                "dst" =>"2.1.2.2",
                "name" =>"worm successfully stopped",
                "severity" =>"10",
                "spt" =>"1232",
                "src" =>"10.0.0.1"
            }
            .into()
        );

        // Input matching neither format falls through to `null`.
        let random_bytes = Bytes::from("a|- -| x");
        let result = decoder.parse(random_bytes, LogNamespace::Vector).unwrap();
        // Check the length first so an empty result fails with a clear assertion message.
        assert_eq!(result.len(), 1);
        let random_event = result.first().unwrap();
        assert_eq!(
            *random_event
                .as_log()
                .get(&OwnedTargetPath::event_root())
                .unwrap(),
            Value::Null
        );
    }

    #[test]
    fn test_invalid_source() {
        let error = VrlDeserializerConfig {
            vrl: VrlDeserializerOptions {
                source: ". ?".to_string(),
                timezone: None,
            },
        }
        .build()
        .unwrap_err()
        .to_string();
        assert!(error.contains("error[E203]: syntax error"));
    }

    #[test]
    fn test_abort() {
        let decoder = make_decoder("abort");
        let log_bytes = Bytes::from(r#"{ "message": "Hello VRL" }"#);
        let error = decoder
            .parse(log_bytes, LogNamespace::Vector)
            .unwrap_err()
            .to_string();
        assert!(error.contains("aborted"));
    }
}