enrichment/
find_enrichment_table_records.rs

1use std::collections::BTreeMap;
2
3use vrl::prelude::*;
4
5use crate::{
6    Case, Condition, IndexHandle, TableRegistry, TableSearch,
7    vrl_util::{self, add_index, evaluate_condition, is_case_sensitive},
8};
9
10fn find_enrichment_table_records(
11    select: Option<Value>,
12    enrichment_tables: &TableSearch,
13    table: &str,
14    case_sensitive: Case,
15    wildcard: Option<Value>,
16    condition: &[Condition],
17    index: Option<IndexHandle>,
18) -> Resolved {
19    let select = select
20        .map(|select| match select {
21            Value::Array(arr) => arr
22                .iter()
23                .map(|value| Ok(value.try_bytes_utf8_lossy()?.to_string()))
24                .collect::<std::result::Result<Vec<_>, _>>(),
25            value => Err(ValueError::Expected {
26                got: value.kind(),
27                expected: Kind::array(Collection::any()),
28            }),
29        })
30        .transpose()?;
31
32    let data = enrichment_tables
33        .find_table_rows(
34            table,
35            case_sensitive,
36            condition,
37            select.as_ref().map(|select| select.as_ref()),
38            wildcard.as_ref(),
39            index,
40        )?
41        .into_iter()
42        .map(Value::Object)
43        .collect();
44    Ok(Value::Array(data))
45}
46
47#[derive(Clone, Copy, Debug)]
48pub struct FindEnrichmentTableRecords;
49impl Function for FindEnrichmentTableRecords {
50    fn identifier(&self) -> &'static str {
51        "find_enrichment_table_records"
52    }
53
54    fn usage(&self) -> &'static str {
55        const_str::concat!(
56            "Searches an [enrichment table](/docs/reference/glossary/#enrichment-tables) for rows that match the provided condition.\n\n",
57            super::ENRICHMENT_TABLE_EXPLAINER
58        )
59    }
60
61    fn parameters(&self) -> &'static [Parameter] {
62        &[
63            Parameter {
64                keyword: "table",
65                kind: kind::BYTES,
66                required: true,
67            },
68            Parameter {
69                keyword: "condition",
70                kind: kind::OBJECT,
71                required: true,
72            },
73            Parameter {
74                keyword: "select",
75                kind: kind::ARRAY,
76                required: false,
77            },
78            Parameter {
79                keyword: "case_sensitive",
80                kind: kind::BOOLEAN,
81                required: false,
82            },
83            Parameter {
84                keyword: "wildcard",
85                kind: kind::BYTES,
86                required: false,
87            },
88        ]
89    }
90
91    fn examples(&self) -> &'static [Example] {
92        &[example!(
93            title: "find records",
94            source: r#"find_enrichment_table_records!("test", {"surname": "Smith"})"#,
95            result: Ok(
96                indoc! { r#"[{"id": 1, "firstname": "Bob", "surname": "Smith"},
97                             {"id": 2, "firstname": "Fred", "surname": "Smith"}]"#,
98                },
99            ),
100        )]
101    }
102
103    fn compile(
104        &self,
105        state: &TypeState,
106        ctx: &mut FunctionCompileContext,
107        arguments: ArgumentList,
108    ) -> Compiled {
109        let registry = ctx
110            .get_external_context_mut::<TableRegistry>()
111            .ok_or(Box::new(vrl_util::Error::TablesNotLoaded) as Box<dyn DiagnosticMessage>)?;
112
113        let tables = registry
114            .table_ids()
115            .into_iter()
116            .map(Value::from)
117            .collect::<Vec<_>>();
118
119        let table = arguments
120            .required_enum("table", &tables, state)?
121            .try_bytes_utf8_lossy()
122            .expect("table is not valid utf8")
123            .into_owned();
124        let condition = arguments.required_object("condition")?;
125
126        let select = arguments.optional("select");
127
128        let case_sensitive = is_case_sensitive(&arguments, state)?;
129        let wildcard = arguments.optional("wildcard");
130        let index = Some(
131            add_index(registry, &table, case_sensitive, &condition)
132                .map_err(|err| Box::new(err) as Box<_>)?,
133        );
134
135        Ok(FindEnrichmentTableRecordsFn {
136            table,
137            condition,
138            index,
139            select,
140            case_sensitive,
141            wildcard,
142            enrichment_tables: registry.as_readonly(),
143        }
144        .as_expr())
145    }
146}
147
148#[derive(Debug, Clone)]
149pub struct FindEnrichmentTableRecordsFn {
150    table: String,
151    condition: BTreeMap<KeyString, expression::Expr>,
152    index: Option<IndexHandle>,
153    select: Option<Box<dyn Expression>>,
154    case_sensitive: Case,
155    wildcard: Option<Box<dyn Expression>>,
156    enrichment_tables: TableSearch,
157}
158
159impl FunctionExpression for FindEnrichmentTableRecordsFn {
160    fn resolve(&self, ctx: &mut Context) -> Resolved {
161        let condition = self
162            .condition
163            .iter()
164            .map(|(key, value)| {
165                let value = value.resolve(ctx)?;
166                evaluate_condition(key, value)
167            })
168            .collect::<ExpressionResult<Vec<Condition>>>()?;
169
170        let select = self
171            .select
172            .as_ref()
173            .map(|array| array.resolve(ctx))
174            .transpose()?;
175
176        let table = &self.table;
177        let case_sensitive = self.case_sensitive;
178        let wildcard = self
179            .wildcard
180            .as_ref()
181            .map(|array| array.resolve(ctx))
182            .transpose()?;
183        let index = self.index;
184        let enrichment_tables = &self.enrichment_tables;
185
186        find_enrichment_table_records(
187            select,
188            enrichment_tables,
189            table,
190            case_sensitive,
191            wildcard,
192            &condition,
193            index,
194        )
195    }
196
197    fn type_def(&self, _: &TypeState) -> TypeDef {
198        TypeDef::array(Collection::from_unknown(Kind::object(Collection::any()))).fallible()
199    }
200}
201
202#[cfg(test)]
203mod tests {
204    use vrl::{
205        compiler::{TargetValue, TimeZone, state::RuntimeState},
206        value,
207        value::Secrets,
208    };
209
210    use super::*;
211    use crate::test_util::get_table_registry;
212
213    #[test]
214    fn find_table_row() {
215        let registry = get_table_registry();
216        let func = FindEnrichmentTableRecordsFn {
217            table: "dummy1".to_string(),
218            condition: BTreeMap::from([(
219                "field".into(),
220                expression::Literal::from("value").into(),
221            )]),
222            index: Some(IndexHandle(999)),
223            select: None,
224            case_sensitive: Case::Sensitive,
225            wildcard: None,
226            enrichment_tables: registry.as_readonly(),
227        };
228
229        let tz = TimeZone::default();
230        let object: Value = ObjectMap::new().into();
231        let mut target = TargetValue {
232            value: object,
233            metadata: value!({}),
234            secrets: Secrets::new(),
235        };
236        let mut runtime_state = RuntimeState::default();
237        let mut ctx = Context::new(&mut target, &mut runtime_state, &tz);
238
239        registry.finish_load();
240
241        let got = func.resolve(&mut ctx);
242
243        assert_eq!(Ok(value![vec![value!({ "field": "result" })]]), got);
244    }
245}