1use std::{collections::BTreeMap, sync::LazyLock};
2
3use vector_vrl_category::Category;
4use vrl::prelude::*;
5
6use crate::{
7 Case, Condition, IndexHandle, TableRegistry, TableSearch,
8 vrl_util::{self, DEFAULT_CASE_SENSITIVE, add_index, evaluate_condition, is_case_sensitive},
9};
10
11static PARAMETERS: LazyLock<Vec<Parameter>> = LazyLock::new(|| {
12 vec to search.",
17 ),
18 Parameter::required(
19 "condition",
20 kind::OBJECT,
21 "The condition to search on. Since the condition is used at boot time to create indices into the data, these conditions must be statically defined.",
22 ),
23 Parameter::optional(
24 "select",
25 kind::ARRAY,
26 "A subset of fields from the enrichment table to return. If not specified, all fields are returned.",
27 ),
28 Parameter::optional(
29 "case_sensitive",
30 kind::BOOLEAN,
31 "Whether text fields need to match cases exactly.",
32 )
33 .default(&DEFAULT_CASE_SENSITIVE),
34 Parameter::optional(
35 "wildcard",
36 kind::BYTES,
37 "Value to use for wildcard matching in the search.",
38 ),
39 ]
40});
41
42fn find_enrichment_table_records(
43 select: Option<Value>,
44 enrichment_tables: &TableSearch,
45 table: &str,
46 case_sensitive: Case,
47 wildcard: Option<Value>,
48 condition: &[Condition],
49 index: Option<IndexHandle>,
50) -> Resolved {
51 let select = select
52 .map(|select| match select {
53 Value::Array(arr) => arr
54 .iter()
55 .map(|value| Ok(value.try_bytes_utf8_lossy()?.to_string()))
56 .collect::<std::result::Result<Vec<_>, _>>(),
57 value => Err(ValueError::Expected {
58 got: value.kind(),
59 expected: Kind::array(Collection::any()),
60 }),
61 })
62 .transpose()?;
63
64 let data = enrichment_tables
65 .find_table_rows(
66 table,
67 case_sensitive,
68 condition,
69 select.as_ref().map(|select| select.as_ref()),
70 wildcard.as_ref(),
71 index,
72 )?
73 .into_iter()
74 .map(Value::Object)
75 .collect();
76 Ok(Value::Array(data))
77}
78
79#[derive(Clone, Copy, Debug)]
80pub struct FindEnrichmentTableRecords;
81impl Function for FindEnrichmentTableRecords {
82 fn identifier(&self) -> &'static str {
83 "find_enrichment_table_records"
84 }
85
86 fn usage(&self) -> &'static str {
87 const_str::concat!(
88 "Searches an [enrichment table](/docs/reference/glossary/#enrichment-tables) for rows that match the provided condition.\n\n",
89 super::ENRICHMENT_TABLE_EXPLAINER
90 )
91 }
92
93 fn category(&self) -> &'static str {
94 Category::Enrichment.as_ref()
95 }
96
97 fn return_kind(&self) -> u16 {
98 kind::ARRAY
99 }
100
101 fn parameters(&self) -> &'static [Parameter] {
102 &PARAMETERS
103 }
104
105 fn examples(&self) -> &'static [Example] {
106 const RESULT: Result<&'static str, &'static str> = Ok(indoc! {r#"
107 [{"id": 1, "firstname": "Bob", "surname": "Smith"},
108 {"id": 2, "firstname": "Fred", "surname": "Smith"}]
109 "#});
110
111 const EXAMPLES: &[Example] = &[
112 example! {
113 title: "Exact match",
114 source: indoc! {r#"
115 find_enrichment_table_records!(
116 "test",
117 {"surname": "Smith"}
118 )
119 "#},
120 result: RESULT,
121 },
122 example! {
123 title: "Case insensitive match",
124 source: indoc! {r#"
125 find_enrichment_table_records!(
126 "test",
127 {"surname": "smith"},
128 case_sensitive: false
129 )
130 "#},
131 result: RESULT,
132 },
133 example! {
134 title: "Wildcard match",
135 source: indoc! {r#"
136 find_enrichment_table_records!(
137 "test",
138 {"firstname": "Bob"},
139 wildcard: "fred",
140 case_sensitive: false
141 )
142 "#},
143 result: RESULT,
144 },
145 example! {
146 title: "Date range search",
147 source: indoc! {r#"
148 find_enrichment_table_records!(
149 "test",
150 {
151 "surname": "Smith",
152 "date_of_birth": {
153 "from": t'1985-01-01T00:00:00Z',
154 "to": t'1985-12-31T00:00:00Z'
155 }
156 }
157 )
158 "#},
159 result: RESULT,
160 },
161 ];
162
163 EXAMPLES
164 }
165
166 fn compile(
167 &self,
168 state: &TypeState,
169 ctx: &mut FunctionCompileContext,
170 arguments: ArgumentList,
171 ) -> Compiled {
172 let registry = ctx
173 .get_external_context_mut::<TableRegistry>()
174 .ok_or(Box::new(vrl_util::Error::TablesNotLoaded) as Box<dyn DiagnosticMessage>)?;
175
176 let tables = registry
177 .table_ids()
178 .into_iter()
179 .map(Value::from)
180 .collect::<Vec<_>>();
181
182 let table = arguments
183 .required_enum("table", &tables, state)?
184 .try_bytes_utf8_lossy()
185 .expect("table is not valid utf8")
186 .into_owned();
187 let condition = arguments.required_object("condition")?;
188
189 let select = arguments.optional("select");
190
191 let case_sensitive = is_case_sensitive(&arguments, state)?;
192 let wildcard = arguments.optional("wildcard");
193 let index = Some(
194 add_index(registry, &table, case_sensitive, &condition)
195 .map_err(|err| Box::new(err) as Box<_>)?,
196 );
197
198 Ok(FindEnrichmentTableRecordsFn {
199 table,
200 condition,
201 index,
202 select,
203 case_sensitive,
204 wildcard,
205 enrichment_tables: registry.as_readonly(),
206 }
207 .as_expr())
208 }
209}
210
211#[derive(Debug, Clone)]
212pub struct FindEnrichmentTableRecordsFn {
213 table: String,
214 condition: BTreeMap<KeyString, expression::Expr>,
215 index: Option<IndexHandle>,
216 select: Option<Box<dyn Expression>>,
217 case_sensitive: Case,
218 wildcard: Option<Box<dyn Expression>>,
219 enrichment_tables: TableSearch,
220}
221
222impl FunctionExpression for FindEnrichmentTableRecordsFn {
223 fn resolve(&self, ctx: &mut Context) -> Resolved {
224 let condition = self
225 .condition
226 .iter()
227 .map(|(key, value)| {
228 let value = value.resolve(ctx)?;
229 evaluate_condition(key, value)
230 })
231 .collect::<ExpressionResult<Vec<Condition>>>()?;
232
233 let select = self
234 .select
235 .as_ref()
236 .map(|array| array.resolve(ctx))
237 .transpose()?;
238
239 let table = &self.table;
240 let case_sensitive = self.case_sensitive;
241 let wildcard = self
242 .wildcard
243 .as_ref()
244 .map(|array| array.resolve(ctx))
245 .transpose()?;
246 let index = self.index;
247 let enrichment_tables = &self.enrichment_tables;
248
249 find_enrichment_table_records(
250 select,
251 enrichment_tables,
252 table,
253 case_sensitive,
254 wildcard,
255 &condition,
256 index,
257 )
258 }
259
260 fn type_def(&self, _: &TypeState) -> TypeDef {
261 TypeDef::array(Collection::from_unknown(Kind::object(Collection::any()))).fallible()
262 }
263}
264
265#[cfg(test)]
266mod tests {
267 use vrl::{
268 compiler::{TargetValue, TimeZone, state::RuntimeState},
269 value,
270 value::Secrets,
271 };
272
273 use super::*;
274 use crate::test_util::get_table_registry;
275
276 #[test]
277 fn find_table_row() {
278 let registry = get_table_registry();
279 let func = FindEnrichmentTableRecordsFn {
280 table: "dummy1".to_string(),
281 condition: BTreeMap::from([(
282 "field".into(),
283 expression::Literal::from("value").into(),
284 )]),
285 index: Some(IndexHandle(999)),
286 select: None,
287 case_sensitive: Case::Sensitive,
288 wildcard: None,
289 enrichment_tables: registry.as_readonly(),
290 };
291
292 let tz = TimeZone::default();
293 let object: Value = ObjectMap::new().into();
294 let mut target = TargetValue {
295 value: object,
296 metadata: value!({}),
297 secrets: Secrets::new(),
298 };
299 let mut runtime_state = RuntimeState::default();
300 let mut ctx = Context::new(&mut target, &mut runtime_state, &tz);
301
302 registry.finish_load();
303
304 let got = func.resolve(&mut ctx);
305
306 assert_eq!(Ok(value![vec![value!({ "field": "result" })]]), got);
307 }
308}