1use std::collections::BTreeMap;
2
3use vrl::prelude::*;
4
5use crate::{
6 Case, Condition, IndexHandle, TableRegistry, TableSearch,
7 vrl_util::{self, add_index, evaluate_condition, is_case_sensitive},
8};
9
10fn find_enrichment_table_records(
11 select: Option<Value>,
12 enrichment_tables: &TableSearch,
13 table: &str,
14 case_sensitive: Case,
15 wildcard: Option<Value>,
16 condition: &[Condition],
17 index: Option<IndexHandle>,
18) -> Resolved {
19 let select = select
20 .map(|select| match select {
21 Value::Array(arr) => arr
22 .iter()
23 .map(|value| Ok(value.try_bytes_utf8_lossy()?.to_string()))
24 .collect::<std::result::Result<Vec<_>, _>>(),
25 value => Err(ValueError::Expected {
26 got: value.kind(),
27 expected: Kind::array(Collection::any()),
28 }),
29 })
30 .transpose()?;
31
32 let data = enrichment_tables
33 .find_table_rows(
34 table,
35 case_sensitive,
36 condition,
37 select.as_ref().map(|select| select.as_ref()),
38 wildcard.as_ref(),
39 index,
40 )?
41 .into_iter()
42 .map(Value::Object)
43 .collect();
44 Ok(Value::Array(data))
45}
46
47#[derive(Clone, Copy, Debug)]
48pub struct FindEnrichmentTableRecords;
49impl Function for FindEnrichmentTableRecords {
50 fn identifier(&self) -> &'static str {
51 "find_enrichment_table_records"
52 }
53
54 fn usage(&self) -> &'static str {
55 const_str::concat!(
56 "Searches an [enrichment table](/docs/reference/glossary/#enrichment-tables) for rows that match the provided condition.\n\n",
57 super::ENRICHMENT_TABLE_EXPLAINER
58 )
59 }
60
61 fn parameters(&self) -> &'static [Parameter] {
62 &[
63 Parameter {
64 keyword: "table",
65 kind: kind::BYTES,
66 required: true,
67 },
68 Parameter {
69 keyword: "condition",
70 kind: kind::OBJECT,
71 required: true,
72 },
73 Parameter {
74 keyword: "select",
75 kind: kind::ARRAY,
76 required: false,
77 },
78 Parameter {
79 keyword: "case_sensitive",
80 kind: kind::BOOLEAN,
81 required: false,
82 },
83 Parameter {
84 keyword: "wildcard",
85 kind: kind::BYTES,
86 required: false,
87 },
88 ]
89 }
90
91 fn examples(&self) -> &'static [Example] {
92 &[example!(
93 title: "find records",
94 source: r#"find_enrichment_table_records!("test", {"surname": "Smith"})"#,
95 result: Ok(
96 indoc! { r#"[{"id": 1, "firstname": "Bob", "surname": "Smith"},
97 {"id": 2, "firstname": "Fred", "surname": "Smith"}]"#,
98 },
99 ),
100 )]
101 }
102
103 fn compile(
104 &self,
105 state: &TypeState,
106 ctx: &mut FunctionCompileContext,
107 arguments: ArgumentList,
108 ) -> Compiled {
109 let registry = ctx
110 .get_external_context_mut::<TableRegistry>()
111 .ok_or(Box::new(vrl_util::Error::TablesNotLoaded) as Box<dyn DiagnosticMessage>)?;
112
113 let tables = registry
114 .table_ids()
115 .into_iter()
116 .map(Value::from)
117 .collect::<Vec<_>>();
118
119 let table = arguments
120 .required_enum("table", &tables, state)?
121 .try_bytes_utf8_lossy()
122 .expect("table is not valid utf8")
123 .into_owned();
124 let condition = arguments.required_object("condition")?;
125
126 let select = arguments.optional("select");
127
128 let case_sensitive = is_case_sensitive(&arguments, state)?;
129 let wildcard = arguments.optional("wildcard");
130 let index = Some(
131 add_index(registry, &table, case_sensitive, &condition)
132 .map_err(|err| Box::new(err) as Box<_>)?,
133 );
134
135 Ok(FindEnrichmentTableRecordsFn {
136 table,
137 condition,
138 index,
139 select,
140 case_sensitive,
141 wildcard,
142 enrichment_tables: registry.as_readonly(),
143 }
144 .as_expr())
145 }
146}
147
148#[derive(Debug, Clone)]
149pub struct FindEnrichmentTableRecordsFn {
150 table: String,
151 condition: BTreeMap<KeyString, expression::Expr>,
152 index: Option<IndexHandle>,
153 select: Option<Box<dyn Expression>>,
154 case_sensitive: Case,
155 wildcard: Option<Box<dyn Expression>>,
156 enrichment_tables: TableSearch,
157}
158
159impl FunctionExpression for FindEnrichmentTableRecordsFn {
160 fn resolve(&self, ctx: &mut Context) -> Resolved {
161 let condition = self
162 .condition
163 .iter()
164 .map(|(key, value)| {
165 let value = value.resolve(ctx)?;
166 evaluate_condition(key, value)
167 })
168 .collect::<ExpressionResult<Vec<Condition>>>()?;
169
170 let select = self
171 .select
172 .as_ref()
173 .map(|array| array.resolve(ctx))
174 .transpose()?;
175
176 let table = &self.table;
177 let case_sensitive = self.case_sensitive;
178 let wildcard = self
179 .wildcard
180 .as_ref()
181 .map(|array| array.resolve(ctx))
182 .transpose()?;
183 let index = self.index;
184 let enrichment_tables = &self.enrichment_tables;
185
186 find_enrichment_table_records(
187 select,
188 enrichment_tables,
189 table,
190 case_sensitive,
191 wildcard,
192 &condition,
193 index,
194 )
195 }
196
197 fn type_def(&self, _: &TypeState) -> TypeDef {
198 TypeDef::array(Collection::from_unknown(Kind::object(Collection::any()))).fallible()
199 }
200}
201
202#[cfg(test)]
203mod tests {
204 use vrl::{
205 compiler::{TargetValue, TimeZone, state::RuntimeState},
206 value,
207 value::Secrets,
208 };
209
210 use super::*;
211 use crate::test_util::get_table_registry;
212
213 #[test]
214 fn find_table_row() {
215 let registry = get_table_registry();
216 let func = FindEnrichmentTableRecordsFn {
217 table: "dummy1".to_string(),
218 condition: BTreeMap::from([(
219 "field".into(),
220 expression::Literal::from("value").into(),
221 )]),
222 index: Some(IndexHandle(999)),
223 select: None,
224 case_sensitive: Case::Sensitive,
225 wildcard: None,
226 enrichment_tables: registry.as_readonly(),
227 };
228
229 let tz = TimeZone::default();
230 let object: Value = ObjectMap::new().into();
231 let mut target = TargetValue {
232 value: object,
233 metadata: value!({}),
234 secrets: Secrets::new(),
235 };
236 let mut runtime_state = RuntimeState::default();
237 let mut ctx = Context::new(&mut target, &mut runtime_state, &tz);
238
239 registry.finish_load();
240
241 let got = func.resolve(&mut ctx);
242
243 assert_eq!(Ok(value![vec![value!({ "field": "result" })]]), got);
244 }
245}