1use crate::decoding::format::Deserializer;
2use crate::BytesDeserializerConfig;
3use bytes::Bytes;
4use derivative::Derivative;
5use smallvec::{smallvec, SmallVec};
6use vector_config_macros::configurable_component;
7use vector_core::config::{DataType, LogNamespace};
8use vector_core::event::{Event, TargetEvents, VrlTarget};
9use vector_core::{compile_vrl, schema};
10use vrl::compiler::state::ExternalEnv;
11use vrl::compiler::{runtime::Runtime, CompileConfig, Program, TimeZone, TypeState};
12use vrl::diagnostic::Formatter;
13use vrl::value::Kind;
14
15#[configurable_component]
17#[derive(Debug, Clone, Default)]
18pub struct VrlDeserializerConfig {
19 pub vrl: VrlDeserializerOptions,
21}
22
23#[configurable_component]
25#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
26#[derivative(Default)]
27pub struct VrlDeserializerOptions {
28 pub source: String,
35
36 #[serde(default)]
44 #[configurable(metadata(docs::advanced))]
45 pub timezone: Option<TimeZone>,
46}
47
48impl VrlDeserializerConfig {
49 pub fn build(&self) -> vector_common::Result<VrlDeserializer> {
51 let state = TypeState {
52 local: Default::default(),
53 external: ExternalEnv::default(),
54 };
55
56 match compile_vrl(
57 &self.vrl.source,
58 &vrl::stdlib::all(),
59 &state,
60 CompileConfig::default(),
61 ) {
62 Ok(result) => Ok(VrlDeserializer {
63 program: result.program,
64 timezone: self.vrl.timezone.unwrap_or(TimeZone::Local),
65 }),
66 Err(diagnostics) => Err(Formatter::new(&self.vrl.source, diagnostics)
67 .to_string()
68 .into()),
69 }
70 }
71
72 pub fn output_type(&self) -> DataType {
74 DataType::Log
75 }
76
77 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
79 match log_namespace {
80 LogNamespace::Legacy => {
81 schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any())
82 }
83 LogNamespace::Vector => {
84 schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
85 }
86 }
87 }
88}
89
90#[derive(Debug, Clone)]
92pub struct VrlDeserializer {
93 program: Program,
94 timezone: TimeZone,
95}
96
97fn parse_bytes(bytes: Bytes, log_namespace: LogNamespace) -> Event {
98 let bytes_deserializer = BytesDeserializerConfig::new().build();
99 let log_event = bytes_deserializer.parse_single(bytes, log_namespace);
100 Event::from(log_event)
101}
102
103impl Deserializer for VrlDeserializer {
104 fn parse(
105 &self,
106 bytes: Bytes,
107 log_namespace: LogNamespace,
108 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
109 let event = parse_bytes(bytes, log_namespace);
110 match self.run_vrl(event, log_namespace) {
111 Ok(events) => Ok(events),
112 Err(e) => Err(e),
113 }
114 }
115}
116
117impl VrlDeserializer {
118 fn run_vrl(
119 &self,
120 event: Event,
121 log_namespace: LogNamespace,
122 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
123 let mut runtime = Runtime::default();
124 let mut target = VrlTarget::new(event, self.program.info(), true);
125 match runtime.resolve(&mut target, &self.program, &self.timezone) {
126 Ok(_) => match target.into_events(log_namespace) {
127 TargetEvents::One(event) => Ok(smallvec![event]),
128 TargetEvents::Logs(events_iter) => Ok(SmallVec::from_iter(events_iter)),
129 TargetEvents::Traces(_) => Err("trace targets are not supported".into()),
130 },
131 Err(e) => Err(e.to_string().into()),
132 }
133 }
134}
135
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{DateTime, Utc};
    use indoc::indoc;
    use vrl::btreemap;
    use vrl::path::OwnedTargetPath;
    use vrl::value::Value;

    /// Builds a decoder from the given VRL source, panicking if it does not compile.
    fn make_decoder(source: &str) -> VrlDeserializer {
        VrlDeserializerConfig {
            vrl: VrlDeserializerOptions {
                source: source.to_string(),
                timezone: None,
            },
        }
        .build()
        .expect("Failed to build VrlDeserializer")
    }

    /// Asserts that the event root of `event` equals `expected`.
    #[track_caller]
    fn assert_event_root(event: &Event, expected: Value) {
        assert_eq!(
            *event.as_log().get(&OwnedTargetPath::event_root()).unwrap(),
            expected
        );
    }

    #[test]
    fn test_json_message() {
        let source = indoc!(
            r#"
            %m1 = "metadata"
            . = string!(.)
            . = parse_json!(.)
            "#
        );

        let decoder = make_decoder(source);

        let log_bytes = Bytes::from(r#"{ "message": "Hello VRL" }"#);
        let result = decoder.parse(log_bytes, LogNamespace::Vector).unwrap();
        assert_eq!(result.len(), 1);
        let event = result.first().unwrap();
        assert_event_root(event, btreemap! { "message" => "Hello VRL" }.into());
        assert_eq!(
            *event
                .as_log()
                .get(&OwnedTargetPath::metadata_root())
                .unwrap(),
            btreemap! { "m1" => "metadata" }.into()
        );
    }

    #[test]
    fn test_ignored_returned_expression() {
        // Only the final contents of `.` matter; the trailing expression is ignored.
        let source = indoc!(
            r#"
            . = { "a" : 1 }
            { "b" : 9 }
            "#
        );

        let decoder = make_decoder(source);

        let log_bytes = Bytes::from("some bytes");
        let result = decoder.parse(log_bytes, LogNamespace::Vector).unwrap();
        assert_eq!(result.len(), 1);
        let event = result.first().unwrap();
        assert_event_root(event, btreemap! { "a" => 1 }.into());
    }

    #[test]
    fn test_multiple_events() {
        let source = indoc!(". = [0,1,2]");
        let decoder = make_decoder(source);
        let log_bytes = Bytes::from("some bytes");
        let result = decoder.parse(log_bytes, LogNamespace::Vector).unwrap();
        assert_eq!(result.len(), 3);
        for (i, event) in result.iter().enumerate() {
            assert_event_root(event, i.into());
        }
    }

    #[test]
    fn test_syslog_and_cef_input() {
        let source = indoc!(
            r#"
            if exists(.message) {
                . = string!(.message)
            }
            . = parse_syslog(.) ?? parse_cef(.) ?? null
            "#
        );

        let decoder = make_decoder(source);

        // Input that the syslog parser accepts.
        let syslog_bytes = Bytes::from(
            "<34>1 2024-02-06T15:04:05.000Z mymachine.example.com su - ID47 - 'su root' failed for user on /dev/pts/8",
        );
        let result = decoder.parse(syslog_bytes, LogNamespace::Vector).unwrap();
        assert_eq!(result.len(), 1);
        let syslog_event = result.first().unwrap();
        assert_event_root(
            syslog_event,
            btreemap! {
                "appname" => "su",
                "facility" => "auth",
                "hostname" => "mymachine.example.com",
                "message" => "'su root' failed for user on /dev/pts/8",
                "msgid" => "ID47",
                "severity" => "crit",
                "timestamp" => "2024-02-06T15:04:05Z".parse::<DateTime<Utc>>().unwrap(),
                "version" => 1
            }
            .into(),
        );

        // Input that falls through to the CEF parser.
        let cef_bytes = Bytes::from("CEF:0|Security|Threat Manager|1.0|100|worm successfully stopped|10|src=10.0.0.1 dst=2.1.2.2 spt=1232");
        let result = decoder.parse(cef_bytes, LogNamespace::Vector).unwrap();
        assert_eq!(result.len(), 1);
        let cef_event = result.first().unwrap();
        assert_event_root(
            cef_event,
            btreemap! {
                "cefVersion" => "0",
                "deviceEventClassId" => "100",
                "deviceProduct" => "Threat Manager",
                "deviceVendor" => "Security",
                "deviceVersion" => "1.0",
                "dst" => "2.1.2.2",
                "name" => "worm successfully stopped",
                "severity" => "10",
                "spt" => "1232",
                "src" => "10.0.0.1"
            }
            .into(),
        );

        // Input that neither parser accepts ends up as `null`.
        let random_bytes = Bytes::from("a|- -| x");
        let result = decoder.parse(random_bytes, LogNamespace::Vector).unwrap();
        // Check the count before taking the first element, matching the sections above.
        assert_eq!(result.len(), 1);
        let random_event = result.first().unwrap();
        assert_event_root(random_event, Value::Null);
    }

    #[test]
    fn test_invalid_source() {
        let error = VrlDeserializerConfig {
            vrl: VrlDeserializerOptions {
                source: ". ?".to_string(),
                timezone: None,
            },
        }
        .build()
        .unwrap_err()
        .to_string();
        assert!(error.contains("error[E203]: syntax error"));
    }

    #[test]
    fn test_abort() {
        let decoder = make_decoder("abort");
        let log_bytes = Bytes::from(r#"{ "message": "Hello VRL" }"#);
        let error = decoder
            .parse(log_bytes, LogNamespace::Vector)
            .unwrap_err()
            .to_string();
        assert!(error.contains("aborted"));
    }
}