1use std::collections::BTreeMap;
2use vrl::prelude::*;
3
4use crate::vrl_util::is_case_sensitive;
5use crate::{
6 vrl_util::{self, add_index, evaluate_condition},
7 Case, Condition, IndexHandle, TableRegistry, TableSearch,
8};
9
10fn find_enrichment_table_records(
11 select: Option<Value>,
12 enrichment_tables: &TableSearch,
13 table: &str,
14 case_sensitive: Case,
15 wildcard: Option<Value>,
16 condition: &[Condition],
17 index: Option<IndexHandle>,
18) -> Resolved {
19 let select = select
20 .map(|select| match select {
21 Value::Array(arr) => arr
22 .iter()
23 .map(|value| Ok(value.try_bytes_utf8_lossy()?.to_string()))
24 .collect::<std::result::Result<Vec<_>, _>>(),
25 value => Err(ValueError::Expected {
26 got: value.kind(),
27 expected: Kind::array(Collection::any()),
28 }),
29 })
30 .transpose()?;
31
32 let data = enrichment_tables
33 .find_table_rows(
34 table,
35 case_sensitive,
36 condition,
37 select.as_ref().map(|select| select.as_ref()),
38 wildcard.as_ref(),
39 index,
40 )?
41 .into_iter()
42 .map(Value::Object)
43 .collect();
44 Ok(Value::Array(data))
45}
46
47#[derive(Clone, Copy, Debug)]
48pub struct FindEnrichmentTableRecords;
49impl Function for FindEnrichmentTableRecords {
50 fn identifier(&self) -> &'static str {
51 "find_enrichment_table_records"
52 }
53
54 fn parameters(&self) -> &'static [Parameter] {
55 &[
56 Parameter {
57 keyword: "table",
58 kind: kind::BYTES,
59 required: true,
60 },
61 Parameter {
62 keyword: "condition",
63 kind: kind::OBJECT,
64 required: true,
65 },
66 Parameter {
67 keyword: "select",
68 kind: kind::ARRAY,
69 required: false,
70 },
71 Parameter {
72 keyword: "case_sensitive",
73 kind: kind::BOOLEAN,
74 required: false,
75 },
76 Parameter {
77 keyword: "wildcard",
78 kind: kind::BYTES,
79 required: false,
80 },
81 ]
82 }
83
84 fn examples(&self) -> &'static [Example] {
85 &[Example {
86 title: "find records",
87 source: r#"find_enrichment_table_records!("test", {"surname": "Smith"})"#,
88 result: Ok(
89 indoc! { r#"[{"id": 1, "firstname": "Bob", "surname": "Smith"},
90 {"id": 2, "firstname": "Fred", "surname": "Smith"}]"#,
91 },
92 ),
93 }]
94 }
95
96 fn compile(
97 &self,
98 state: &TypeState,
99 ctx: &mut FunctionCompileContext,
100 arguments: ArgumentList,
101 ) -> Compiled {
102 let registry = ctx
103 .get_external_context_mut::<TableRegistry>()
104 .ok_or(Box::new(vrl_util::Error::TablesNotLoaded) as Box<dyn DiagnosticMessage>)?;
105
106 let tables = registry
107 .table_ids()
108 .into_iter()
109 .map(Value::from)
110 .collect::<Vec<_>>();
111
112 let table = arguments
113 .required_enum("table", &tables, state)?
114 .try_bytes_utf8_lossy()
115 .expect("table is not valid utf8")
116 .into_owned();
117 let condition = arguments.required_object("condition")?;
118
119 let select = arguments.optional("select");
120
121 let case_sensitive = is_case_sensitive(&arguments, state)?;
122 let wildcard = arguments.optional("wildcard");
123 let index = Some(
124 add_index(registry, &table, case_sensitive, &condition)
125 .map_err(|err| Box::new(err) as Box<_>)?,
126 );
127
128 Ok(FindEnrichmentTableRecordsFn {
129 table,
130 condition,
131 index,
132 select,
133 case_sensitive,
134 wildcard,
135 enrichment_tables: registry.as_readonly(),
136 }
137 .as_expr())
138 }
139}
140
141#[derive(Debug, Clone)]
142pub struct FindEnrichmentTableRecordsFn {
143 table: String,
144 condition: BTreeMap<KeyString, expression::Expr>,
145 index: Option<IndexHandle>,
146 select: Option<Box<dyn Expression>>,
147 case_sensitive: Case,
148 wildcard: Option<Box<dyn Expression>>,
149 enrichment_tables: TableSearch,
150}
151
152impl FunctionExpression for FindEnrichmentTableRecordsFn {
153 fn resolve(&self, ctx: &mut Context) -> Resolved {
154 let condition = self
155 .condition
156 .iter()
157 .map(|(key, value)| {
158 let value = value.resolve(ctx)?;
159 evaluate_condition(key, value)
160 })
161 .collect::<ExpressionResult<Vec<Condition>>>()?;
162
163 let select = self
164 .select
165 .as_ref()
166 .map(|array| array.resolve(ctx))
167 .transpose()?;
168
169 let table = &self.table;
170 let case_sensitive = self.case_sensitive;
171 let wildcard = self
172 .wildcard
173 .as_ref()
174 .map(|array| array.resolve(ctx))
175 .transpose()?;
176 let index = self.index;
177 let enrichment_tables = &self.enrichment_tables;
178
179 find_enrichment_table_records(
180 select,
181 enrichment_tables,
182 table,
183 case_sensitive,
184 wildcard,
185 &condition,
186 index,
187 )
188 }
189
190 fn type_def(&self, _: &TypeState) -> TypeDef {
191 TypeDef::array(Collection::from_unknown(Kind::object(Collection::any()))).fallible()
192 }
193}
194
#[cfg(test)]
mod tests {
    use vrl::compiler::state::RuntimeState;
    use vrl::compiler::TargetValue;
    use vrl::compiler::TimeZone;
    use vrl::value;
    use vrl::value::Secrets;

    use super::*;
    use crate::test_util::get_table_registry;

    /// Resolving the function against the test registry's `dummy1` table should yield a
    /// single `{ "field": "result" }` row.
    #[test]
    fn find_table_row() {
        let registry = get_table_registry();

        let mut condition: BTreeMap<KeyString, expression::Expr> = BTreeMap::new();
        condition.insert("field".into(), expression::Literal::from("value").into());

        let func = FindEnrichmentTableRecordsFn {
            table: "dummy1".to_string(),
            condition,
            index: Some(IndexHandle(999)),
            select: None,
            case_sensitive: Case::Sensitive,
            wildcard: None,
            enrichment_tables: registry.as_readonly(),
        };

        // Minimal execution context: empty event, empty metadata, no secrets.
        let mut target = TargetValue {
            value: Value::from(ObjectMap::new()),
            metadata: value!({}),
            secrets: Secrets::new(),
        };
        let mut runtime_state = RuntimeState::default();
        let tz = TimeZone::default();
        let mut ctx = Context::new(&mut target, &mut runtime_state, &tz);

        // Mark loading as finished before the lookup runs.
        registry.finish_load();

        let expected: Resolved = Ok(value![vec![value!({ "field": "result" })]]);
        assert_eq!(expected, func.resolve(&mut ctx));
    }
}