enrichment/
find_enrichment_table_records.rs

1use std::{collections::BTreeMap, sync::LazyLock};
2
3use vector_vrl_category::Category;
4use vrl::prelude::*;
5
6use crate::{
7    Case, Condition, IndexHandle, TableRegistry, TableSearch,
8    vrl_util::{self, DEFAULT_CASE_SENSITIVE, add_index, evaluate_condition, is_case_sensitive},
9};
10
11static PARAMETERS: LazyLock<Vec<Parameter>> = LazyLock::new(|| {
12    vec![
13        Parameter::required(
14            "table",
15            kind::BYTES,
16            "The [enrichment table](/docs/reference/glossary/#enrichment-tables) to search.",
17        ),
18        Parameter::required(
19            "condition",
20            kind::OBJECT,
21            "The condition to search on. Since the condition is used at boot time to create indices into the data, these conditions must be statically defined.",
22        ),
23        Parameter::optional(
24            "select",
25            kind::ARRAY,
26            "A subset of fields from the enrichment table to return. If not specified, all fields are returned.",
27        ),
28        Parameter::optional(
29            "case_sensitive",
30            kind::BOOLEAN,
31            "Whether text fields need to match cases exactly.",
32        )
33        .default(&DEFAULT_CASE_SENSITIVE),
34        Parameter::optional(
35            "wildcard",
36            kind::BYTES,
37            "Value to use for wildcard matching in the search.",
38        ),
39    ]
40});
41
42fn find_enrichment_table_records(
43    select: Option<Value>,
44    enrichment_tables: &TableSearch,
45    table: &str,
46    case_sensitive: Case,
47    wildcard: Option<Value>,
48    condition: &[Condition],
49    index: Option<IndexHandle>,
50) -> Resolved {
51    let select = select
52        .map(|select| match select {
53            Value::Array(arr) => arr
54                .iter()
55                .map(|value| Ok(value.try_bytes_utf8_lossy()?.to_string()))
56                .collect::<std::result::Result<Vec<_>, _>>(),
57            value => Err(ValueError::Expected {
58                got: value.kind(),
59                expected: Kind::array(Collection::any()),
60            }),
61        })
62        .transpose()?;
63
64    let data = enrichment_tables
65        .find_table_rows(
66            table,
67            case_sensitive,
68            condition,
69            select.as_ref().map(|select| select.as_ref()),
70            wildcard.as_ref(),
71            index,
72        )?
73        .into_iter()
74        .map(Value::Object)
75        .collect();
76    Ok(Value::Array(data))
77}
78
79#[derive(Clone, Copy, Debug)]
80pub struct FindEnrichmentTableRecords;
81impl Function for FindEnrichmentTableRecords {
82    fn identifier(&self) -> &'static str {
83        "find_enrichment_table_records"
84    }
85
86    fn usage(&self) -> &'static str {
87        const_str::concat!(
88            "Searches an [enrichment table](/docs/reference/glossary/#enrichment-tables) for rows that match the provided condition.\n\n",
89            super::ENRICHMENT_TABLE_EXPLAINER
90        )
91    }
92
93    fn category(&self) -> &'static str {
94        Category::Enrichment.as_ref()
95    }
96
97    fn return_kind(&self) -> u16 {
98        kind::ARRAY
99    }
100
101    fn parameters(&self) -> &'static [Parameter] {
102        &PARAMETERS
103    }
104
105    fn examples(&self) -> &'static [Example] {
106        const RESULT: Result<&'static str, &'static str> = Ok(indoc! {r#"
107            [{"id": 1, "firstname": "Bob", "surname": "Smith"},
108             {"id": 2, "firstname": "Fred", "surname": "Smith"}]
109        "#});
110
111        const EXAMPLES: &[Example] = &[
112            example! {
113                title: "Exact match",
114                source: indoc! {r#"
115                    find_enrichment_table_records!(
116                        "test",
117                        {"surname": "Smith"}
118                    )
119                "#},
120                result: RESULT,
121            },
122            example! {
123                title: "Case insensitive match",
124                source: indoc! {r#"
125                    find_enrichment_table_records!(
126                        "test",
127                        {"surname": "smith"},
128                        case_sensitive: false
129                    )
130                "#},
131                result: RESULT,
132            },
133            example! {
134                title: "Wildcard match",
135                source: indoc! {r#"
136                    find_enrichment_table_records!(
137                        "test",
138                        {"firstname": "Bob"},
139                        wildcard: "fred",
140                        case_sensitive: false
141                    )
142                "#},
143                result: RESULT,
144            },
145            example! {
146                title: "Date range search",
147                source: indoc! {r#"
148                    find_enrichment_table_records!(
149                        "test",
150                        {
151                            "surname": "Smith",
152                            "date_of_birth": {
153                                "from": t'1985-01-01T00:00:00Z',
154                                "to": t'1985-12-31T00:00:00Z'
155                            }
156                        }
157                    )
158                "#},
159                result: RESULT,
160            },
161        ];
162
163        EXAMPLES
164    }
165
166    fn compile(
167        &self,
168        state: &TypeState,
169        ctx: &mut FunctionCompileContext,
170        arguments: ArgumentList,
171    ) -> Compiled {
172        let registry = ctx
173            .get_external_context_mut::<TableRegistry>()
174            .ok_or(Box::new(vrl_util::Error::TablesNotLoaded) as Box<dyn DiagnosticMessage>)?;
175
176        let tables = registry
177            .table_ids()
178            .into_iter()
179            .map(Value::from)
180            .collect::<Vec<_>>();
181
182        let table = arguments
183            .required_enum("table", &tables, state)?
184            .try_bytes_utf8_lossy()
185            .expect("table is not valid utf8")
186            .into_owned();
187        let condition = arguments.required_object("condition")?;
188
189        let select = arguments.optional("select");
190
191        let case_sensitive = is_case_sensitive(&arguments, state)?;
192        let wildcard = arguments.optional("wildcard");
193        let index = Some(
194            add_index(registry, &table, case_sensitive, &condition)
195                .map_err(|err| Box::new(err) as Box<_>)?,
196        );
197
198        Ok(FindEnrichmentTableRecordsFn {
199            table,
200            condition,
201            index,
202            select,
203            case_sensitive,
204            wildcard,
205            enrichment_tables: registry.as_readonly(),
206        }
207        .as_expr())
208    }
209}
210
211#[derive(Debug, Clone)]
212pub struct FindEnrichmentTableRecordsFn {
213    table: String,
214    condition: BTreeMap<KeyString, expression::Expr>,
215    index: Option<IndexHandle>,
216    select: Option<Box<dyn Expression>>,
217    case_sensitive: Case,
218    wildcard: Option<Box<dyn Expression>>,
219    enrichment_tables: TableSearch,
220}
221
222impl FunctionExpression for FindEnrichmentTableRecordsFn {
223    fn resolve(&self, ctx: &mut Context) -> Resolved {
224        let condition = self
225            .condition
226            .iter()
227            .map(|(key, value)| {
228                let value = value.resolve(ctx)?;
229                evaluate_condition(key, value)
230            })
231            .collect::<ExpressionResult<Vec<Condition>>>()?;
232
233        let select = self
234            .select
235            .as_ref()
236            .map(|array| array.resolve(ctx))
237            .transpose()?;
238
239        let table = &self.table;
240        let case_sensitive = self.case_sensitive;
241        let wildcard = self
242            .wildcard
243            .as_ref()
244            .map(|array| array.resolve(ctx))
245            .transpose()?;
246        let index = self.index;
247        let enrichment_tables = &self.enrichment_tables;
248
249        find_enrichment_table_records(
250            select,
251            enrichment_tables,
252            table,
253            case_sensitive,
254            wildcard,
255            &condition,
256            index,
257        )
258    }
259
260    fn type_def(&self, _: &TypeState) -> TypeDef {
261        TypeDef::array(Collection::from_unknown(Kind::object(Collection::any()))).fallible()
262    }
263}
264
#[cfg(test)]
mod tests {
    use vrl::{
        compiler::{TargetValue, TimeZone, state::RuntimeState},
        value,
        value::Secrets,
    };

    use super::*;
    use crate::test_util::get_table_registry;

    /// Resolving a search on table `dummy1` with condition `field == "value"`
    /// against the test registry yields the single row `{ "field": "result" }`.
    #[test]
    fn find_table_row() {
        let registry = get_table_registry();

        let condition: BTreeMap<KeyString, expression::Expr> = BTreeMap::from([(
            "field".into(),
            expression::Literal::from("value").into(),
        )]);

        // The read-only search handle is taken before the registry load is finished.
        let search = FindEnrichmentTableRecordsFn {
            table: "dummy1".to_string(),
            condition,
            index: Some(IndexHandle(999)),
            select: None,
            case_sensitive: Case::Sensitive,
            wildcard: None,
            enrichment_tables: registry.as_readonly(),
        };

        // Minimal runtime context: empty event, empty metadata, no secrets.
        let timezone = TimeZone::default();
        let mut target = TargetValue {
            value: Value::from(ObjectMap::new()),
            metadata: value!({}),
            secrets: Secrets::new(),
        };
        let mut state = RuntimeState::default();
        let mut ctx = Context::new(&mut target, &mut state, &timezone);

        registry.finish_load();

        let got = search.resolve(&mut ctx);

        assert_eq!(Ok(value![vec![value!({ "field": "result" })]]), got);
    }
}