1use vector_lib::{TimeZone, compile_vrl, configurable::configurable_component, emit};
2use vector_vrl_metrics::MetricsStorage;
3use vrl::{
4 compiler::{
5 CompilationResult, CompileConfig, Program, TypeState, VrlRuntime,
6 runtime::{Runtime, RuntimeResult, Terminate},
7 },
8 value::Value,
9};
10
11use crate::{
12 conditions::{Condition, Conditional, ConditionalConfig},
13 config::LogNamespace,
14 event::{Event, TargetEvents, VrlTarget},
15 format_vrl_diagnostics,
16 internal_events::VrlConditionExecutionError,
17};
18
/// Configuration for a condition expressed as a Vector Remap Language (VRL) program.
///
/// The program in `source` is compiled when the condition is built and must evaluate to a
/// boolean.
#[configurable_component]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct VrlConfig {
    /// The VRL boolean expression.
    pub(crate) source: String,

    // Selects the VRL runtime used to evaluate the condition. The field is hidden from the
    // generated documentation and is omitted from serialized output when it holds the default.
    #[configurable(derived, metadata(docs::hidden))]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub(crate) runtime: VrlRuntime,
}
30
// NOTE(review): judging by the macro's name, this derives the config-generation support for
// `VrlConfig` from its `Default` value; confirm against the macro definition.
impl_generate_config_from_default!(VrlConfig);
32
33impl ConditionalConfig for VrlConfig {
34 fn build(
35 &self,
36 enrichment_tables: &vector_lib::enrichment::TableRegistry,
37 metrics_storage: &MetricsStorage,
38 ) -> crate::Result<Condition> {
39 let functions = vector_vrl_functions::all();
50
51 let state = TypeState::default();
52
53 let mut config = CompileConfig::default();
54 config.set_custom(enrichment_tables.clone());
55 config.set_custom(metrics_storage.clone());
56 config.set_read_only();
57
58 let CompilationResult {
59 program,
60 warnings,
61 config: _,
62 } = compile_vrl(&self.source, &functions, &state, config)
63 .map_err(|diagnostics| format_vrl_diagnostics(&self.source, diagnostics))?;
64
65 if !program.final_type_info().result.is_boolean() {
66 return Err("VRL conditions must return a boolean.".into());
67 }
68
69 if !warnings.is_empty() {
70 let warnings = format_vrl_diagnostics(&self.source, warnings);
71 warn!(message = "VRL compilation warning.", %warnings);
72 }
73
74 match self.runtime {
75 VrlRuntime::Ast => Ok(Condition::Vrl(Vrl {
76 program,
77 source: self.source.clone(),
78 })),
79 }
80 }
81}
82
/// A compiled VRL condition that can be evaluated against events.
#[derive(Debug, Clone)]
pub struct Vrl {
    /// The compiled VRL program resolved against each event.
    pub(super) program: Program,
    /// The VRL source text, used when formatting runtime diagnostics.
    pub(super) source: String,
}
88
89impl Vrl {
90 fn run(&self, event: Event) -> (Event, RuntimeResult) {
91 let log_namespace = event
92 .maybe_as_log()
93 .map(|log| log.namespace())
94 .unwrap_or(LogNamespace::Legacy);
95 let mut target = VrlTarget::new(event, self.program.info(), false);
96 let timezone = TimeZone::default();
98
99 let result = Runtime::default().resolve(&mut target, &self.program, &timezone);
100 let original_event = match target.into_events(log_namespace) {
101 TargetEvents::One(event) => event,
102 _ => panic!("Event was modified in a condition. This is an internal compiler error."),
103 };
104 (original_event, result)
105 }
106}
107
108impl Conditional for Vrl {
109 fn check(&self, event: Event) -> (bool, Event) {
110 let (event, result) = self.run(event);
111
112 let result = result
113 .map(|value| match value {
114 Value::Boolean(boolean) => boolean,
115 _ => panic!("VRL condition did not return a boolean type"),
116 })
117 .unwrap_or_else(|err| {
118 emit!(VrlConditionExecutionError {
119 error: err.to_string().as_ref()
120 });
121 false
122 });
123 (result, event)
124 }
125
126 fn check_with_context(&self, event: Event) -> (Result<(), String>, Event) {
127 let (event, result) = self.run(event);
128
129 let value_result = result.map_err(|err| match err {
130 Terminate::Abort(err) => {
131 let err = format_vrl_diagnostics(
132 &self.source,
133 vrl::diagnostic::Diagnostic::from(
134 Box::new(err) as Box<dyn vrl::diagnostic::DiagnosticMessage>
135 ),
136 );
137 format!("source execution aborted: {err}")
138 }
139 Terminate::Error(err) => {
140 let err = format_vrl_diagnostics(
141 &self.source,
142 vrl::diagnostic::Diagnostic::from(
143 Box::new(err) as Box<dyn vrl::diagnostic::DiagnosticMessage>
144 ),
145 );
146 format!("source execution failed: {err}")
147 }
148 });
149
150 let value = match value_result {
151 Ok(value) => value,
152 Err(err) => {
153 return (Err(err), event);
154 }
155 };
156
157 let result = match value {
158 Value::Boolean(v) if v => Ok(()),
159 Value::Boolean(v) if !v => Err("source execution resolved to false".into()),
160 _ => Err("source execution resolved to a non-boolean value".into()),
161 };
162 (result, event)
163 }
164}
165
#[cfg(test)]
mod test {
    use vector_lib::metric_tags;

    use super::*;
    use crate::{
        event::{Metric, MetricKind, MetricValue},
        log_event,
    };

    /// Runs the shared config-generation test helper for `VrlConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<VrlConfig>();
    }

    /// Builds each VRL source and, when the build succeeds, checks it against its event.
    #[test]
    fn check_vrl() {
        // Each case is: (event, VRL source, expected build outcome, expected check outcome).
        let cases = vec![
            // A trivially true expression builds and passes.
            (log_event![], "true == true", Ok(()), Ok(())),
            // Event fields can be read by the condition.
            (
                log_event!["foo" => true, "bar" => false],
                "to_bool(.bar || .foo) ?? false",
                Ok(()),
                Ok(()),
            ),
            // A false result builds but is reported as a failed check.
            (
                log_event![],
                "true == false",
                Ok(()),
                Err("source execution resolved to false"),
            ),
            // Metric events are supported as well.
            (
                Event::Metric(
                    Metric::new(
                        "zork",
                        MetricKind::Incremental,
                        MetricValue::Counter { value: 1.0 },
                    )
                    .with_namespace(Some("zerk"))
                    .with_tags(Some(metric_tags!("host" => "zoobub"))),
                ),
                r#".name == "zork" && .tags.host == "zoobub" && .kind == "incremental""#,
                Ok(()),
                Ok(()),
            ),
            // A non-boolean program is rejected at build time, so its check expectation is
            // never evaluated.
            (
                log_event![],
                r#""i_return_a_string""#,
                Err("VRL conditions must return a boolean.".into()),
                Ok(()),
            ),
        ];

        for (event, source, expected_build, expected_check) in cases {
            let config = VrlConfig {
                source: source.to_owned(),
                runtime: Default::default(),
            };

            let build_outcome = config
                .build(&Default::default(), &Default::default())
                .map(|_| ())
                .map_err(|e| e.to_string());
            assert_eq!(build_outcome, expected_build);

            if let Ok(cond) = config.build(&Default::default(), &Default::default()) {
                let (check_outcome, _) = cond.check_with_context(event.clone());
                assert_eq!(check_outcome, expected_check.map_err(|e| e.to_string()));
            }
        }
    }
}