1use bytes::Bytes;
2use smallvec::{SmallVec, smallvec};
3use vector_config_macros::configurable_component;
4use vector_core::{
5 compile_vrl,
6 config::{DataType, LogNamespace},
7 event::{Event, TargetEvents, VrlTarget},
8 schema,
9};
10use vrl::{
11 compiler::{CompileConfig, Program, TimeZone, TypeState, runtime::Runtime, state::ExternalEnv},
12 diagnostic::Formatter,
13 value::Kind,
14};
15
16use crate::{BytesDeserializerConfig, decoding::format::Deserializer};
17
/// Config used to build a `VrlDeserializer`.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct VrlDeserializerConfig {
    /// VRL-specific decoding options.
    pub vrl: VrlDeserializerOptions,
}
25
/// VRL-specific decoding options.
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct VrlDeserializerOptions {
    /// The VRL program source.
    ///
    /// It is compiled when the deserializer is built and then run against every decoded input.
    pub source: String,

    /// The timezone handed to the VRL runtime when the program is resolved.
    ///
    /// When unset, the deserializer is built with `TimeZone::Local`.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub timezone: Option<TimeZone>,
}
49
50impl VrlDeserializerConfig {
51 pub fn build(&self) -> vector_common::Result<VrlDeserializer> {
53 let state = TypeState {
54 local: Default::default(),
55 external: ExternalEnv::default(),
56 };
57
58 match compile_vrl(
59 &self.vrl.source,
60 &vector_vrl_functions::all(),
61 &state,
62 CompileConfig::default(),
63 ) {
64 Ok(result) => Ok(VrlDeserializer {
65 program: result.program,
66 timezone: self.vrl.timezone.unwrap_or(TimeZone::Local),
67 }),
68 Err(diagnostics) => Err(Formatter::new(&self.vrl.source, diagnostics)
69 .to_string()
70 .into()),
71 }
72 }
73
74 pub fn output_type(&self) -> DataType {
76 DataType::Log
77 }
78
79 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
81 match log_namespace {
82 LogNamespace::Legacy => {
83 schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any())
84 }
85 LogNamespace::Vector => {
86 schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
87 }
88 }
89 }
90}
91
/// Deserializer that builds events by running a compiled VRL program over the raw input bytes.
#[derive(Debug, Clone)]
pub struct VrlDeserializer {
    // The compiled VRL program, executed once per `parse` call.
    program: Program,
    // The timezone passed to the VRL runtime on every execution.
    timezone: TimeZone,
}
98
99fn parse_bytes(bytes: Bytes, log_namespace: LogNamespace) -> Event {
100 let bytes_deserializer = BytesDeserializerConfig::new().build();
101 let log_event = bytes_deserializer.parse_single(bytes, log_namespace);
102 Event::from(log_event)
103}
104
105impl Deserializer for VrlDeserializer {
106 fn parse(
107 &self,
108 bytes: Bytes,
109 log_namespace: LogNamespace,
110 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
111 let event = parse_bytes(bytes, log_namespace);
112 match self.run_vrl(event, log_namespace) {
113 Ok(events) => Ok(events),
114 Err(e) => Err(e),
115 }
116 }
117}
118
119impl VrlDeserializer {
120 fn run_vrl(
121 &self,
122 event: Event,
123 log_namespace: LogNamespace,
124 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
125 let mut runtime = Runtime::default();
126 let mut target = VrlTarget::new(event, self.program.info(), true);
127 match runtime.resolve(&mut target, &self.program, &self.timezone) {
128 Ok(_) => match target.into_events(log_namespace) {
129 TargetEvents::One(event) => Ok(smallvec![event]),
130 TargetEvents::Logs(events_iter) => Ok(SmallVec::from_iter(events_iter)),
131 TargetEvents::Traces(_) => Err("trace targets are not supported".into()),
132 },
133 Err(e) => Err(e.to_string().into()),
134 }
135 }
136}
137
#[cfg(test)]
mod tests {
    use chrono::{DateTime, Utc};
    use indoc::indoc;
    use vrl::{btreemap, path::OwnedTargetPath, value::Value};

    use super::*;

    /// Compiles `source` into a decoder with no explicit timezone, panicking on compile errors.
    fn make_decoder(source: &str) -> VrlDeserializer {
        let config = VrlDeserializerConfig {
            vrl: VrlDeserializerOptions {
                source: source.to_string(),
                timezone: None,
            },
        };
        config.build().expect("Failed to build VrlDeserializer")
    }

    /// Decodes `input` under the `Vector` log namespace, panicking if decoding fails.
    #[track_caller]
    fn decode(decoder: &VrlDeserializer, input: &'static str) -> SmallVec<[Event; 1]> {
        decoder
            .parse(Bytes::from(input), LogNamespace::Vector)
            .unwrap()
    }

    /// Asserts that the value stored at the event root of `event` equals `expected`.
    #[track_caller]
    fn assert_root(event: &Event, expected: Value) {
        assert_eq!(
            *event.as_log().get(&OwnedTargetPath::event_root()).unwrap(),
            expected
        );
    }

    // A JSON payload is parsed into the event root while metadata set by the program is kept.
    #[test]
    fn test_json_message() {
        let decoder = make_decoder(indoc!(
            r#"
            %m1 = "metadata"
            . = string!(.)
            . = parse_json!(.)
            "#
        ));

        let events = decode(&decoder, r#"{ "message": "Hello VRL" }"#);
        assert_eq!(events.len(), 1);

        let event = events.first().unwrap();
        assert_root(event, btreemap! { "message" => "Hello VRL" }.into());
        assert_eq!(
            *event
                .as_log()
                .get(&OwnedTargetPath::metadata_root())
                .unwrap(),
            btreemap! { "m1" => "metadata" }.into()
        );
    }

    // Only the value assigned to the root matters; the program's final expression is ignored.
    #[test]
    fn test_ignored_returned_expression() {
        let decoder = make_decoder(indoc!(
            r#"
            . = { "a" : 1 }
            { "b" : 9 }
            "#
        ));

        let events = decode(&decoder, "some bytes");
        assert_eq!(events.len(), 1);
        assert_root(events.first().unwrap(), btreemap! { "a" => 1 }.into());
    }

    // Assigning an array to the root yields one event per element, in order.
    #[test]
    fn test_multiple_events() {
        let decoder = make_decoder(indoc!(". = [0,1,2]"));

        let events = decode(&decoder, "some bytes");
        assert_eq!(events.len(), 3);
        for (index, event) in events.iter().enumerate() {
            assert_root(event, index.into());
        }
    }

    // One program handles syslog, falls back to CEF, and finally to null for anything else.
    #[test]
    fn test_syslog_and_cef_input() {
        let decoder = make_decoder(indoc!(
            r#"
            if exists(.message) {
                . = string!(.message)
            }
            . = parse_syslog(.) ?? parse_cef(.) ?? null
            "#
        ));

        let syslog_events = decode(
            &decoder,
            "<34>1 2024-02-06T15:04:05.000Z mymachine.example.com su - ID47 - 'su root' failed for user on /dev/pts/8",
        );
        assert_eq!(syslog_events.len(), 1);
        assert_root(
            syslog_events.first().unwrap(),
            btreemap! {
                "appname" => "su",
                "facility" => "auth",
                "hostname" => "mymachine.example.com",
                "message" => "'su root' failed for user on /dev/pts/8",
                "msgid" => "ID47",
                "severity" => "crit",
                "timestamp" => "2024-02-06T15:04:05Z".parse::<DateTime<Utc>>().unwrap(),
                "version" => 1
            }
            .into(),
        );

        let cef_events = decode(
            &decoder,
            "CEF:0|Security|Threat Manager|1.0|100|worm successfully stopped|10|src=10.0.0.1 dst=2.1.2.2 spt=1232",
        );
        assert_eq!(cef_events.len(), 1);
        assert_root(
            cef_events.first().unwrap(),
            btreemap! {
                "cefVersion" =>"0",
                "deviceEventClassId" =>"100",
                "deviceProduct" =>"Threat Manager",
                "deviceVendor" =>"Security",
                "deviceVersion" =>"1.0",
                "dst" =>"2.1.2.2",
                "name" =>"worm successfully stopped",
                "severity" =>"10",
                "spt" =>"1232",
                "src" =>"10.0.0.1"
            }
            .into(),
        );

        let fallback_events = decode(&decoder, "a|- -| x");
        let fallback_event = fallback_events.first().unwrap();
        assert_eq!(fallback_events.len(), 1);
        assert_root(fallback_event, Value::Null);
    }

    // A program that does not compile surfaces the formatted compiler diagnostics.
    #[test]
    fn test_invalid_source() {
        let config = VrlDeserializerConfig {
            vrl: VrlDeserializerOptions {
                source: ". ?".to_string(),
                timezone: None,
            },
        };

        let message = config.build().unwrap_err().to_string();
        assert!(message.contains("error[E203]: syntax error"));
    }

    // An aborted program is reported as a decoding error.
    #[test]
    fn test_abort() {
        let decoder = make_decoder("abort");
        let input = Bytes::from(r#"{ "message": "Hello VRL" }"#);

        let message = decoder
            .parse(input, LogNamespace::Vector)
            .unwrap_err()
            .to_string();
        assert!(message.contains("aborted"));
    }
}