1use bytes::Bytes;
2use derivative::Derivative;
3use smallvec::{SmallVec, smallvec};
4use vector_config_macros::configurable_component;
5use vector_core::{
6 compile_vrl,
7 config::{DataType, LogNamespace},
8 event::{Event, TargetEvents, VrlTarget},
9 schema,
10};
11use vrl::{
12 compiler::{CompileConfig, Program, TimeZone, TypeState, runtime::Runtime, state::ExternalEnv},
13 diagnostic::Formatter,
14 value::Kind,
15};
16
17use crate::{BytesDeserializerConfig, decoding::format::Deserializer};
18
/// Configuration from which a `VrlDeserializer` is built.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct VrlDeserializerConfig {
    /// Options specific to VRL-based decoding (program source and time zone).
    pub vrl: VrlDeserializerOptions,
}
26
/// Options that control how the VRL program is compiled and executed.
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct VrlDeserializerOptions {
    /// Source text of the VRL program.
    ///
    /// `VrlDeserializerConfig::build` compiles this text against the full VRL
    /// standard library; a compilation failure is reported as a formatted
    /// diagnostic string.
    pub source: String,

    /// Time zone handed to the VRL runtime when the program is resolved.
    ///
    /// When unset, `VrlDeserializerConfig::build` falls back to `TimeZone::Local`.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub timezone: Option<TimeZone>,
}
51
52impl VrlDeserializerConfig {
53 pub fn build(&self) -> vector_common::Result<VrlDeserializer> {
55 let state = TypeState {
56 local: Default::default(),
57 external: ExternalEnv::default(),
58 };
59
60 match compile_vrl(
61 &self.vrl.source,
62 &vrl::stdlib::all(),
63 &state,
64 CompileConfig::default(),
65 ) {
66 Ok(result) => Ok(VrlDeserializer {
67 program: result.program,
68 timezone: self.vrl.timezone.unwrap_or(TimeZone::Local),
69 }),
70 Err(diagnostics) => Err(Formatter::new(&self.vrl.source, diagnostics)
71 .to_string()
72 .into()),
73 }
74 }
75
76 pub fn output_type(&self) -> DataType {
78 DataType::Log
79 }
80
81 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
83 match log_namespace {
84 LogNamespace::Legacy => {
85 schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any())
86 }
87 LogNamespace::Vector => {
88 schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
89 }
90 }
91 }
92}
93
/// Deserializer that runs a compiled VRL program against each incoming payload.
#[derive(Debug, Clone)]
pub struct VrlDeserializer {
    // Compiled program produced by `VrlDeserializerConfig::build`.
    program: Program,
    // Time zone passed to the VRL runtime on every run.
    timezone: TimeZone,
}
100
101fn parse_bytes(bytes: Bytes, log_namespace: LogNamespace) -> Event {
102 let bytes_deserializer = BytesDeserializerConfig::new().build();
103 let log_event = bytes_deserializer.parse_single(bytes, log_namespace);
104 Event::from(log_event)
105}
106
107impl Deserializer for VrlDeserializer {
108 fn parse(
109 &self,
110 bytes: Bytes,
111 log_namespace: LogNamespace,
112 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
113 let event = parse_bytes(bytes, log_namespace);
114 match self.run_vrl(event, log_namespace) {
115 Ok(events) => Ok(events),
116 Err(e) => Err(e),
117 }
118 }
119}
120
121impl VrlDeserializer {
122 fn run_vrl(
123 &self,
124 event: Event,
125 log_namespace: LogNamespace,
126 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
127 let mut runtime = Runtime::default();
128 let mut target = VrlTarget::new(event, self.program.info(), true);
129 match runtime.resolve(&mut target, &self.program, &self.timezone) {
130 Ok(_) => match target.into_events(log_namespace) {
131 TargetEvents::One(event) => Ok(smallvec![event]),
132 TargetEvents::Logs(events_iter) => Ok(SmallVec::from_iter(events_iter)),
133 TargetEvents::Traces(_) => Err("trace targets are not supported".into()),
134 },
135 Err(e) => Err(e.to_string().into()),
136 }
137 }
138}
139
#[cfg(test)]
mod tests {
    use chrono::{DateTime, Utc};
    use indoc::indoc;
    use vrl::{btreemap, path::OwnedTargetPath, value::Value};

    use super::*;

    /// Compiles `source` into a decoder with no time zone override, panicking
    /// if compilation fails.
    fn make_decoder(source: &str) -> VrlDeserializer {
        let options = VrlDeserializerOptions {
            source: source.to_string(),
            timezone: None,
        };
        VrlDeserializerConfig { vrl: options }
            .build()
            .expect("Failed to build VrlDeserializer")
    }

    // A JSON payload is parsed into the event root while metadata set by the
    // program lands under the metadata root.
    #[test]
    fn test_json_message() {
        let decoder = make_decoder(indoc!(
            r#"
            %m1 = "metadata"
            . = string!(.)
            . = parse_json!(.)
            "#
        ));

        let payload = Bytes::from(r#"{ "message": "Hello VRL" }"#);
        let events = decoder.parse(payload, LogNamespace::Vector).unwrap();
        assert_eq!(events.len(), 1);

        let log = events.first().unwrap().as_log();
        let root_path = OwnedTargetPath::event_root();
        let metadata_path = OwnedTargetPath::metadata_root();
        assert_eq!(
            *log.get(&root_path).unwrap(),
            btreemap! { "message" => "Hello VRL" }.into()
        );
        assert_eq!(
            *log.get(&metadata_path).unwrap(),
            btreemap! { "m1" => "metadata" }.into()
        );
    }

    // The program's trailing expression does not affect the output; only the
    // assignment to the root does.
    #[test]
    fn test_ignored_returned_expression() {
        let decoder = make_decoder(indoc!(
            r#"
            . = { "a" : 1 }
            { "b" : 9 }
            "#
        ));

        let payload = Bytes::from("some bytes");
        let events = decoder.parse(payload, LogNamespace::Vector).unwrap();
        assert_eq!(events.len(), 1);

        let log = events.first().unwrap().as_log();
        let root_path = OwnedTargetPath::event_root();
        assert_eq!(*log.get(&root_path).unwrap(), btreemap! { "a" => 1 }.into());
    }

    // Assigning an array to the root fans out into one event per element.
    #[test]
    fn test_multiple_events() {
        let decoder = make_decoder(indoc!(". = [0,1,2]"));
        let payload = Bytes::from("some bytes");
        let events = decoder.parse(payload, LogNamespace::Vector).unwrap();
        assert_eq!(events.len(), 3);

        let root_path = OwnedTargetPath::event_root();
        for (index, event) in events.iter().enumerate() {
            assert_eq!(*event.as_log().get(&root_path).unwrap(), index.into());
        }
    }

    // One program handles syslog input, CEF input, and input matching neither
    // format (which resolves to null).
    #[test]
    fn test_syslog_and_cef_input() {
        let decoder = make_decoder(indoc!(
            r#"
            if exists(.message) {
                . = string!(.message)
            }
            . = parse_syslog(.) ?? parse_cef(.) ?? null
            "#
        ));
        let root_path = OwnedTargetPath::event_root();

        // Syslog input.
        let syslog_input = Bytes::from(
            "<34>1 2024-02-06T15:04:05.000Z mymachine.example.com su - ID47 - 'su root' failed for user on /dev/pts/8",
        );
        let syslog_events = decoder.parse(syslog_input, LogNamespace::Vector).unwrap();
        assert_eq!(syslog_events.len(), 1);
        let syslog_log = syslog_events.first().unwrap().as_log();
        assert_eq!(
            *syslog_log.get(&root_path).unwrap(),
            btreemap! {
                "appname" => "su",
                "facility" => "auth",
                "hostname" => "mymachine.example.com",
                "message" => "'su root' failed for user on /dev/pts/8",
                "msgid" => "ID47",
                "severity" => "crit",
                "timestamp" => "2024-02-06T15:04:05Z".parse::<DateTime<Utc>>().unwrap(),
                "version" => 1
            }
            .into()
        );

        // CEF input.
        let cef_input = Bytes::from(
            "CEF:0|Security|Threat Manager|1.0|100|worm successfully stopped|10|src=10.0.0.1 dst=2.1.2.2 spt=1232",
        );
        let cef_events = decoder.parse(cef_input, LogNamespace::Vector).unwrap();
        assert_eq!(cef_events.len(), 1);
        let cef_log = cef_events.first().unwrap().as_log();
        assert_eq!(
            *cef_log.get(&root_path).unwrap(),
            btreemap! {
                "cefVersion" => "0",
                "deviceEventClassId" => "100",
                "deviceProduct" => "Threat Manager",
                "deviceVendor" => "Security",
                "deviceVersion" => "1.0",
                "dst" => "2.1.2.2",
                "name" => "worm successfully stopped",
                "severity" => "10",
                "spt" => "1232",
                "src" => "10.0.0.1"
            }
            .into()
        );

        // Input that is neither syslog nor CEF.
        let other_input = Bytes::from("a|- -| x");
        let other_events = decoder.parse(other_input, LogNamespace::Vector).unwrap();
        let other_event = other_events.first().unwrap();
        assert_eq!(other_events.len(), 1);
        assert_eq!(
            *other_event.as_log().get(&root_path).unwrap(),
            Value::Null
        );
    }

    // A program that fails to compile surfaces the formatted diagnostic.
    #[test]
    fn test_invalid_source() {
        let config = VrlDeserializerConfig {
            vrl: VrlDeserializerOptions {
                source: ". ?".to_string(),
                timezone: None,
            },
        };
        let message = config.build().unwrap_err().to_string();
        assert!(message.contains("error[E203]: syntax error"));
    }

    // An aborting program turns into a parse error.
    #[test]
    fn test_abort() {
        let decoder = make_decoder("abort");
        let payload = Bytes::from(r#"{ "message": "Hello VRL" }"#);
        let message = decoder
            .parse(payload, LogNamespace::Vector)
            .unwrap_err()
            .to_string();
        assert!(message.contains("aborted"));
    }
}