codecs/decoding/format/
vrl.rs

1use bytes::Bytes;
2use derivative::Derivative;
3use smallvec::{SmallVec, smallvec};
4use vector_config_macros::configurable_component;
5use vector_core::{
6    compile_vrl,
7    config::{DataType, LogNamespace},
8    event::{Event, TargetEvents, VrlTarget},
9    schema,
10};
11use vrl::{
12    compiler::{CompileConfig, Program, TimeZone, TypeState, runtime::Runtime, state::ExternalEnv},
13    diagnostic::Formatter,
14    value::Kind,
15};
16
17use crate::{BytesDeserializerConfig, decoding::format::Deserializer};
18
/// Config used to build a `VrlDeserializer`.
///
/// This is a thin wrapper whose only field, `vrl`, carries the actual options.
/// `VrlDeserializerConfig::build` compiles `vrl.source` and produces the
/// deserializer.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct VrlDeserializerConfig {
    /// VRL-specific decoding options.
    pub vrl: VrlDeserializerOptions,
}
26
/// VRL-specific decoding options.
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct VrlDeserializerOptions {
    /// The [Vector Remap Language][vrl] (VRL) program to execute for each event.
    /// Note that the final contents of the `.` target will be used as the decoding result.
    /// A compilation error or use of `abort` in a program will result in a decoding error.
    ///
    /// [vrl]: https://vector.dev/docs/reference/vrl
    pub source: String,

    /// The name of the timezone to apply to timestamp conversions that do not contain an explicit
    /// time zone. The time zone name may be any name in the [TZ database][tz_database], or `local`
    /// to indicate system local time.
    ///
    /// If not set, `local` is used.
    ///
    /// [tz_database]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
    // The `local` fallback is applied in `VrlDeserializerConfig::build`, which
    // substitutes `TimeZone::Local` when this field is `None`.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub timezone: Option<TimeZone>,
}
51
52impl VrlDeserializerConfig {
53    /// Build the `VrlDeserializer` from this configuration.
54    pub fn build(&self) -> vector_common::Result<VrlDeserializer> {
55        let state = TypeState {
56            local: Default::default(),
57            external: ExternalEnv::default(),
58        };
59
60        match compile_vrl(
61            &self.vrl.source,
62            &vrl::stdlib::all(),
63            &state,
64            CompileConfig::default(),
65        ) {
66            Ok(result) => Ok(VrlDeserializer {
67                program: result.program,
68                timezone: self.vrl.timezone.unwrap_or(TimeZone::Local),
69            }),
70            Err(diagnostics) => Err(Formatter::new(&self.vrl.source, diagnostics)
71                .to_string()
72                .into()),
73        }
74    }
75
76    /// Return the type of event build by this deserializer.
77    pub fn output_type(&self) -> DataType {
78        DataType::Log
79    }
80
81    /// The schema produced by the deserializer.
82    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
83        match log_namespace {
84            LogNamespace::Legacy => {
85                schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any())
86            }
87            LogNamespace::Vector => {
88                schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
89            }
90        }
91    }
92}
93
/// Deserializer that builds `Event`s from a byte frame containing logs compatible with VRL.
///
/// Instances are produced by `VrlDeserializerConfig::build`.
#[derive(Debug, Clone)]
pub struct VrlDeserializer {
    /// Compiled VRL program that is resolved against every decoded frame.
    program: Program,
    /// Timezone handed to the VRL runtime on each run.
    timezone: TimeZone,
}
100
101fn parse_bytes(bytes: Bytes, log_namespace: LogNamespace) -> Event {
102    let bytes_deserializer = BytesDeserializerConfig::new().build();
103    let log_event = bytes_deserializer.parse_single(bytes, log_namespace);
104    Event::from(log_event)
105}
106
107impl Deserializer for VrlDeserializer {
108    fn parse(
109        &self,
110        bytes: Bytes,
111        log_namespace: LogNamespace,
112    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
113        let event = parse_bytes(bytes, log_namespace);
114        match self.run_vrl(event, log_namespace) {
115            Ok(events) => Ok(events),
116            Err(e) => Err(e),
117        }
118    }
119}
120
121impl VrlDeserializer {
122    fn run_vrl(
123        &self,
124        event: Event,
125        log_namespace: LogNamespace,
126    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
127        let mut runtime = Runtime::default();
128        let mut target = VrlTarget::new(event, self.program.info(), true);
129        match runtime.resolve(&mut target, &self.program, &self.timezone) {
130            Ok(_) => match target.into_events(log_namespace) {
131                TargetEvents::One(event) => Ok(smallvec![event]),
132                TargetEvents::Logs(events_iter) => Ok(SmallVec::from_iter(events_iter)),
133                TargetEvents::Traces(_) => Err("trace targets are not supported".into()),
134            },
135            Err(e) => Err(e.to_string().into()),
136        }
137    }
138}
139
#[cfg(test)]
mod tests {
    use chrono::{DateTime, Utc};
    use indoc::indoc;
    use vrl::{btreemap, path::OwnedTargetPath, value::Value};

    use super::*;

    /// Compiles `source` into a decoder with no explicit timezone, panicking on failure.
    fn make_decoder(source: &str) -> VrlDeserializer {
        let config = VrlDeserializerConfig {
            vrl: VrlDeserializerOptions {
                source: source.to_string(),
                timezone: None,
            },
        };
        config.build().expect("Failed to build VrlDeserializer")
    }

    /// Runs `decoder` over `input` in the Vector namespace and unwraps the decoded events.
    fn decode(decoder: &VrlDeserializer, input: &'static str) -> SmallVec<[Event; 1]> {
        decoder
            .parse(Bytes::from(input), LogNamespace::Vector)
            .unwrap()
    }

    /// Like `decode`, but also asserts that exactly one event was produced.
    fn decode_one(decoder: &VrlDeserializer, input: &'static str) -> Event {
        let mut events = decode(decoder, input);
        assert_eq!(events.len(), 1);
        events.pop().unwrap()
    }

    /// Clones the value stored at `path` in the log `event`, panicking if it is absent.
    fn value_at(event: &Event, path: OwnedTargetPath) -> Value {
        event.as_log().get(&path).unwrap().clone()
    }

    #[test]
    fn test_json_message() {
        let source = indoc!(
            r#"
            %m1 = "metadata"
            . = string!(.)
            . = parse_json!(.)
            "#
        );

        let decoder = make_decoder(source);
        let event = decode_one(&decoder, r#"{ "message": "Hello VRL" }"#);

        assert_eq!(
            value_at(&event, OwnedTargetPath::event_root()),
            btreemap! { "message" => "Hello VRL" }.into()
        );
        assert_eq!(
            value_at(&event, OwnedTargetPath::metadata_root()),
            btreemap! { "m1" => "metadata" }.into()
        );
    }

    #[test]
    fn test_ignored_returned_expression() {
        let source = indoc!(
            r#"
            . = { "a" : 1 }
            { "b" : 9 }
        "#
        );

        let decoder = make_decoder(source);
        let event = decode_one(&decoder, "some bytes");

        // Only the final contents of `.` count; the trailing expression is ignored.
        assert_eq!(
            value_at(&event, OwnedTargetPath::event_root()),
            btreemap! { "a" => 1 }.into()
        );
    }

    #[test]
    fn test_multiple_events() {
        let source = indoc!(". = [0,1,2]");
        let decoder = make_decoder(source);

        let events = decode(&decoder, "some bytes");
        assert_eq!(events.len(), 3);

        // Each array element becomes its own event, in order.
        for (index, event) in events.iter().enumerate() {
            assert_eq!(
                value_at(event, OwnedTargetPath::event_root()),
                index.into()
            );
        }
    }

    #[test]
    fn test_syslog_and_cef_input() {
        let source = indoc!(
            r#"
            if exists(.message) {
                . = string!(.message)
            }
            . = parse_syslog(.) ?? parse_cef(.) ?? null
            "#
        );

        let decoder = make_decoder(source);

        // Syslog input
        let syslog_event = decode_one(
            &decoder,
            "<34>1 2024-02-06T15:04:05.000Z mymachine.example.com su - ID47 - 'su root' failed for user on /dev/pts/8",
        );
        assert_eq!(
            value_at(&syslog_event, OwnedTargetPath::event_root()),
            btreemap! {
                "appname" => "su",
                "facility" => "auth",
                "hostname" => "mymachine.example.com",
                "message" => "'su root' failed for user on /dev/pts/8",
                "msgid" => "ID47",
                "severity" => "crit",
                "timestamp" => "2024-02-06T15:04:05Z".parse::<DateTime<Utc>>().unwrap(),
                "version" => 1
            }
            .into()
        );

        // CEF input
        let cef_event = decode_one(
            &decoder,
            "CEF:0|Security|Threat Manager|1.0|100|worm successfully stopped|10|src=10.0.0.1 dst=2.1.2.2 spt=1232",
        );
        assert_eq!(
            value_at(&cef_event, OwnedTargetPath::event_root()),
            btreemap! {
                "cefVersion" =>"0",
                "deviceEventClassId" =>"100",
                "deviceProduct" =>"Threat Manager",
                "deviceVendor" =>"Security",
                "deviceVersion" =>"1.0",
                "dst" =>"2.1.2.2",
                "name" =>"worm successfully stopped",
                "severity" =>"10",
                "spt" =>"1232",
                "src" =>"10.0.0.1"
            }
            .into()
        );

        // Input that is neither syslog nor CEF falls through to `null`.
        let random_event = decode_one(&decoder, "a|- -| x");
        assert_eq!(
            value_at(&random_event, OwnedTargetPath::event_root()),
            Value::Null
        );
    }

    #[test]
    fn test_invalid_source() {
        let config = VrlDeserializerConfig {
            vrl: VrlDeserializerOptions {
                source: ". ?".to_string(),
                timezone: None,
            },
        };

        let error = config.build().unwrap_err().to_string();
        assert!(error.contains("error[E203]: syntax error"));
    }

    #[test]
    fn test_abort() {
        let decoder = make_decoder("abort");
        let outcome = decoder.parse(
            Bytes::from(r#"{ "message": "Hello VRL" }"#),
            LogNamespace::Vector,
        );

        let error = outcome.unwrap_err().to_string();
        assert!(error.contains("aborted"));
    }
}