1use std::{collections::BTreeMap, sync::LazyLock};
2
3use vector_vrl_category::Category;
4use vrl::prelude::*;
5
6use crate::{
7 Case, Condition, IndexHandle, TableRegistry, TableSearch,
8 vrl_util::{self, DEFAULT_CASE_SENSITIVE, add_index, evaluate_condition, is_case_sensitive},
9};
10
11static PARAMETERS: LazyLock<Vec<Parameter>> = LazyLock::new(|| {
12 vec to search.",
17 ),
18 Parameter::required(
19 "condition",
20 kind::OBJECT,
21 "The condition to search on. Since the condition is used at boot time to create indices into the data, these conditions must be statically defined.",
22 ),
23 Parameter::optional(
24 "select",
25 kind::ARRAY,
26 "A subset of fields from the enrichment table to return. If not specified, all fields are returned.",
27 ),
28 Parameter::optional(
29 "case_sensitive",
30 kind::BOOLEAN,
31 "Whether the text fields match the case exactly.",
32 )
33 .default(&DEFAULT_CASE_SENSITIVE),
34 Parameter::optional(
35 "wildcard",
36 kind::BYTES,
37 "Value to use for wildcard matching in the search.",
38 ),
39 ]
40});
41
42fn get_enrichment_table_record(
43 select: Option<Value>,
44 enrichment_tables: &TableSearch,
45 table: &str,
46 case_sensitive: Case,
47 wildcard: Option<Value>,
48 condition: &[Condition],
49 index: Option<IndexHandle>,
50) -> Resolved {
51 let select = select
52 .map(|array| match array {
53 Value::Array(arr) => arr
54 .iter()
55 .map(|value| Ok(value.try_bytes_utf8_lossy()?.to_string()))
56 .collect::<std::result::Result<Vec<_>, _>>(),
57 value => Err(ValueError::Expected {
58 got: value.kind(),
59 expected: Kind::array(Collection::any()),
60 }),
61 })
62 .transpose()?;
63
64 let data = enrichment_tables.find_table_row(
65 table,
66 case_sensitive,
67 condition,
68 select.as_ref().map(|select| select.as_ref()),
69 wildcard.as_ref(),
70 index,
71 )?;
72
73 Ok(Value::Object(data))
74}
75
76#[derive(Clone, Copy, Debug)]
77pub struct GetEnrichmentTableRecord;
78impl Function for GetEnrichmentTableRecord {
79 fn identifier(&self) -> &'static str {
80 "get_enrichment_table_record"
81 }
82
83 fn usage(&self) -> &'static str {
84 const USAGE: &str = const_str::concat!(
85 "Searches an [enrichment table](/docs/reference/glossary/#enrichment-tables) for a row that matches the provided condition. A single row must be matched. If no rows are found or more than one row is found, an error is returned.\n\n",
86 super::ENRICHMENT_TABLE_EXPLAINER
87 );
88 USAGE
89 }
90
91 fn internal_failure_reasons(&self) -> &'static [&'static str] {
92 &[
93 "The row is not found.",
94 "Multiple rows are found that match the condition.",
95 ]
96 }
97
98 fn category(&self) -> &'static str {
99 Category::Enrichment.as_ref()
100 }
101
102 fn return_kind(&self) -> u16 {
103 kind::OBJECT
104 }
105
106 fn parameters(&self) -> &'static [Parameter] {
107 &PARAMETERS
108 }
109
110 fn examples(&self) -> &'static [Example] {
111 &[
112 example! {
113 title: "Exact match",
114 source: r#"get_enrichment_table_record!("test", {"id": 1})"#,
115 result: Ok(r#"{"id": 1, "firstname": "Bob", "surname": "Smith"}"#),
116 },
117 example! {
118 title: "Case insensitive match",
119 source: indoc !{r#"
120 get_enrichment_table_record!(
121 "test",
122 {"surname": "bob", "firstname": "John"},
123 case_sensitive: false
124 )
125 "#},
126 result: Ok(r#"{"id": 1, "firstname": "Bob", "surname": "Smith"}"#),
127 },
128 example! {
129 title: "Date range search",
130 source: indoc! {r#"
131 get_enrichment_table_record!(
132 "test",
133 {
134 "surname": "Smith",
135 "date_of_birth": {
136 "from": t'1985-01-01T00:00:00Z',
137 "to": t'1985-12-31T00:00:00Z'
138 }
139 }
140 )
141 "#},
142 result: Ok(r#"{"id": 1, "firstname": "Bob", "surname": "Smith"}"#),
143 },
144 ]
145 }
146
147 fn compile(
148 &self,
149 state: &TypeState,
150 ctx: &mut FunctionCompileContext,
151 arguments: ArgumentList,
152 ) -> Compiled {
153 let registry = ctx
154 .get_external_context_mut::<TableRegistry>()
155 .ok_or(Box::new(vrl_util::Error::TablesNotLoaded) as Box<dyn DiagnosticMessage>)?;
156
157 let tables = registry
158 .table_ids()
159 .into_iter()
160 .map(Value::from)
161 .collect::<Vec<_>>();
162
163 let table = arguments
164 .required_enum("table", &tables, state)?
165 .try_bytes_utf8_lossy()
166 .expect("table is not valid utf8")
167 .into_owned();
168 let condition = arguments.required_object("condition")?;
169
170 let select = arguments.optional("select");
171
172 let case_sensitive = is_case_sensitive(&arguments, state)?;
173 let wildcard = arguments.optional("wildcard");
174 let index = Some(
175 add_index(registry, &table, case_sensitive, &condition)
176 .map_err(|err| Box::new(err) as Box<_>)?,
177 );
178
179 Ok(GetEnrichmentTableRecordFn {
180 table,
181 condition,
182 index,
183 select,
184 case_sensitive,
185 wildcard,
186 enrichment_tables: registry.as_readonly(),
187 }
188 .as_expr())
189 }
190}
191
192#[derive(Debug, Clone)]
193pub struct GetEnrichmentTableRecordFn {
194 table: String,
195 condition: BTreeMap<KeyString, expression::Expr>,
196 index: Option<IndexHandle>,
197 select: Option<Box<dyn Expression>>,
198 wildcard: Option<Box<dyn Expression>>,
199 case_sensitive: Case,
200 enrichment_tables: TableSearch,
201}
202
203impl FunctionExpression for GetEnrichmentTableRecordFn {
204 fn resolve(&self, ctx: &mut Context) -> Resolved {
205 let condition = self
206 .condition
207 .iter()
208 .map(|(key, value)| {
209 let value = value.resolve(ctx)?;
210 evaluate_condition(key, value)
211 })
212 .collect::<ExpressionResult<Vec<Condition>>>()?;
213
214 let select = self
215 .select
216 .as_ref()
217 .map(|array| array.resolve(ctx))
218 .transpose()?;
219
220 let table = &self.table;
221 let case_sensitive = self.case_sensitive;
222 let wildcard = self
223 .wildcard
224 .as_ref()
225 .map(|array| array.resolve(ctx))
226 .transpose()?;
227 let index = self.index;
228 let enrichment_tables = &self.enrichment_tables;
229
230 get_enrichment_table_record(
231 select,
232 enrichment_tables,
233 table,
234 case_sensitive,
235 wildcard,
236 &condition,
237 index,
238 )
239 }
240
241 fn type_def(&self, _: &TypeState) -> TypeDef {
242 TypeDef::object(Collection::any()).fallible()
243 }
244}
245
246#[cfg(test)]
247mod tests {
248 use vrl::{
249 compiler::{TargetValue, prelude::TimeZone, state::RuntimeState},
250 value,
251 value::Secrets,
252 };
253
254 use super::*;
255 use crate::test_util::get_table_registry;
256
257 #[test]
258 fn find_table_row() {
259 let registry = get_table_registry();
260 let func = GetEnrichmentTableRecordFn {
261 table: "dummy1".to_string(),
262 condition: BTreeMap::from([(
263 "field".into(),
264 expression::Literal::from("value").into(),
265 )]),
266 index: Some(IndexHandle(999)),
267 select: None,
268 case_sensitive: Case::Sensitive,
269 wildcard: None,
270 enrichment_tables: registry.as_readonly(),
271 };
272
273 let tz = TimeZone::default();
274 let object: Value = BTreeMap::new().into();
275 let mut target = TargetValue {
276 value: object,
277 metadata: value!({}),
278 secrets: Secrets::new(),
279 };
280 let mut runtime_state = RuntimeState::default();
281 let mut ctx = Context::new(&mut target, &mut runtime_state, &tz);
282
283 registry.finish_load();
284
285 let got = func.resolve(&mut ctx);
286
287 assert_eq!(Ok(value! ({ "field": "result" })), got);
288 }
289}