enrichment/
find_enrichment_table_records.rs

1use std::collections::BTreeMap;
2use vrl::prelude::*;
3
4use crate::vrl_util::is_case_sensitive;
5use crate::{
6    vrl_util::{self, add_index, evaluate_condition},
7    Case, Condition, IndexHandle, TableRegistry, TableSearch,
8};
9
10fn find_enrichment_table_records(
11    select: Option<Value>,
12    enrichment_tables: &TableSearch,
13    table: &str,
14    case_sensitive: Case,
15    wildcard: Option<Value>,
16    condition: &[Condition],
17    index: Option<IndexHandle>,
18) -> Resolved {
19    let select = select
20        .map(|select| match select {
21            Value::Array(arr) => arr
22                .iter()
23                .map(|value| Ok(value.try_bytes_utf8_lossy()?.to_string()))
24                .collect::<std::result::Result<Vec<_>, _>>(),
25            value => Err(ValueError::Expected {
26                got: value.kind(),
27                expected: Kind::array(Collection::any()),
28            }),
29        })
30        .transpose()?;
31
32    let data = enrichment_tables
33        .find_table_rows(
34            table,
35            case_sensitive,
36            condition,
37            select.as_ref().map(|select| select.as_ref()),
38            wildcard.as_ref(),
39            index,
40        )?
41        .into_iter()
42        .map(Value::Object)
43        .collect();
44    Ok(Value::Array(data))
45}
46
47#[derive(Clone, Copy, Debug)]
48pub struct FindEnrichmentTableRecords;
49impl Function for FindEnrichmentTableRecords {
50    fn identifier(&self) -> &'static str {
51        "find_enrichment_table_records"
52    }
53
54    fn parameters(&self) -> &'static [Parameter] {
55        &[
56            Parameter {
57                keyword: "table",
58                kind: kind::BYTES,
59                required: true,
60            },
61            Parameter {
62                keyword: "condition",
63                kind: kind::OBJECT,
64                required: true,
65            },
66            Parameter {
67                keyword: "select",
68                kind: kind::ARRAY,
69                required: false,
70            },
71            Parameter {
72                keyword: "case_sensitive",
73                kind: kind::BOOLEAN,
74                required: false,
75            },
76            Parameter {
77                keyword: "wildcard",
78                kind: kind::BYTES,
79                required: false,
80            },
81        ]
82    }
83
84    fn examples(&self) -> &'static [Example] {
85        &[Example {
86            title: "find records",
87            source: r#"find_enrichment_table_records!("test", {"surname": "Smith"})"#,
88            result: Ok(
89                indoc! { r#"[{"id": 1, "firstname": "Bob", "surname": "Smith"},
90                             {"id": 2, "firstname": "Fred", "surname": "Smith"}]"#,
91                },
92            ),
93        }]
94    }
95
96    fn compile(
97        &self,
98        state: &TypeState,
99        ctx: &mut FunctionCompileContext,
100        arguments: ArgumentList,
101    ) -> Compiled {
102        let registry = ctx
103            .get_external_context_mut::<TableRegistry>()
104            .ok_or(Box::new(vrl_util::Error::TablesNotLoaded) as Box<dyn DiagnosticMessage>)?;
105
106        let tables = registry
107            .table_ids()
108            .into_iter()
109            .map(Value::from)
110            .collect::<Vec<_>>();
111
112        let table = arguments
113            .required_enum("table", &tables, state)?
114            .try_bytes_utf8_lossy()
115            .expect("table is not valid utf8")
116            .into_owned();
117        let condition = arguments.required_object("condition")?;
118
119        let select = arguments.optional("select");
120
121        let case_sensitive = is_case_sensitive(&arguments, state)?;
122        let wildcard = arguments.optional("wildcard");
123        let index = Some(
124            add_index(registry, &table, case_sensitive, &condition)
125                .map_err(|err| Box::new(err) as Box<_>)?,
126        );
127
128        Ok(FindEnrichmentTableRecordsFn {
129            table,
130            condition,
131            index,
132            select,
133            case_sensitive,
134            wildcard,
135            enrichment_tables: registry.as_readonly(),
136        }
137        .as_expr())
138    }
139}
140
141#[derive(Debug, Clone)]
142pub struct FindEnrichmentTableRecordsFn {
143    table: String,
144    condition: BTreeMap<KeyString, expression::Expr>,
145    index: Option<IndexHandle>,
146    select: Option<Box<dyn Expression>>,
147    case_sensitive: Case,
148    wildcard: Option<Box<dyn Expression>>,
149    enrichment_tables: TableSearch,
150}
151
152impl FunctionExpression for FindEnrichmentTableRecordsFn {
153    fn resolve(&self, ctx: &mut Context) -> Resolved {
154        let condition = self
155            .condition
156            .iter()
157            .map(|(key, value)| {
158                let value = value.resolve(ctx)?;
159                evaluate_condition(key, value)
160            })
161            .collect::<ExpressionResult<Vec<Condition>>>()?;
162
163        let select = self
164            .select
165            .as_ref()
166            .map(|array| array.resolve(ctx))
167            .transpose()?;
168
169        let table = &self.table;
170        let case_sensitive = self.case_sensitive;
171        let wildcard = self
172            .wildcard
173            .as_ref()
174            .map(|array| array.resolve(ctx))
175            .transpose()?;
176        let index = self.index;
177        let enrichment_tables = &self.enrichment_tables;
178
179        find_enrichment_table_records(
180            select,
181            enrichment_tables,
182            table,
183            case_sensitive,
184            wildcard,
185            &condition,
186            index,
187        )
188    }
189
190    fn type_def(&self, _: &TypeState) -> TypeDef {
191        TypeDef::array(Collection::from_unknown(Kind::object(Collection::any()))).fallible()
192    }
193}
194
#[cfg(test)]
mod tests {
    use vrl::compiler::state::RuntimeState;
    use vrl::compiler::TargetValue;
    use vrl::compiler::TimeZone;
    use vrl::value;
    use vrl::value::Secrets;

    use super::*;
    use crate::test_util::get_table_registry;

    /// Resolving the compiled function against the test registry's `dummy1`
    /// table yields a one-element array containing the expected record.
    #[test]
    fn find_table_row() {
        let registry = get_table_registry();
        let lookup = FindEnrichmentTableRecordsFn {
            table: "dummy1".to_string(),
            condition: BTreeMap::from([(
                "field".into(),
                expression::Literal::from("value").into(),
            )]),
            index: Some(IndexHandle(999)),
            select: None,
            case_sensitive: Case::Sensitive,
            wildcard: None,
            enrichment_tables: registry.as_readonly(),
        };

        // Minimal runtime context: an empty object as the event, empty
        // metadata and no secrets.
        let timezone = TimeZone::default();
        let mut target = TargetValue {
            value: Value::from(ObjectMap::new()),
            metadata: value!({}),
            secrets: Secrets::new(),
        };
        let mut state = RuntimeState::default();
        let mut ctx = Context::new(&mut target, &mut state, &timezone);

        // Loading is finished only after the read-only handle was created,
        // and before the expression is resolved.
        registry.finish_load();

        let resolved = lookup.resolve(&mut ctx);
        let expected: Resolved = Ok(value!(vec![value!({ "field": "result" })]));

        assert_eq!(expected, resolved);
    }
}