enrichment/
get_enrichment_table_record.rs

1use std::{collections::BTreeMap, sync::LazyLock};
2
3use vector_vrl_category::Category;
4use vrl::prelude::*;
5
6use crate::{
7    Case, Condition, IndexHandle, TableRegistry, TableSearch,
8    vrl_util::{self, DEFAULT_CASE_SENSITIVE, add_index, evaluate_condition, is_case_sensitive},
9};
10
11static PARAMETERS: LazyLock<Vec<Parameter>> = LazyLock::new(|| {
12    vec![
13        Parameter::required(
14            "table",
15            kind::BYTES,
16            "The [enrichment table](/docs/reference/glossary/#enrichment-tables) to search.",
17        ),
18        Parameter::required(
19            "condition",
20            kind::OBJECT,
21            "The condition to search on. Since the condition is used at boot time to create indices into the data, these conditions must be statically defined.",
22        ),
23        Parameter::optional(
24            "select",
25            kind::ARRAY,
26            "A subset of fields from the enrichment table to return. If not specified, all fields are returned.",
27        ),
28        Parameter::optional(
29            "case_sensitive",
30            kind::BOOLEAN,
31            "Whether the text fields match the case exactly.",
32        )
33        .default(&DEFAULT_CASE_SENSITIVE),
34        Parameter::optional(
35            "wildcard",
36            kind::BYTES,
37            "Value to use for wildcard matching in the search.",
38        ),
39    ]
40});
41
42fn get_enrichment_table_record(
43    select: Option<Value>,
44    enrichment_tables: &TableSearch,
45    table: &str,
46    case_sensitive: Case,
47    wildcard: Option<Value>,
48    condition: &[Condition],
49    index: Option<IndexHandle>,
50) -> Resolved {
51    let select = select
52        .map(|array| match array {
53            Value::Array(arr) => arr
54                .iter()
55                .map(|value| Ok(value.try_bytes_utf8_lossy()?.to_string()))
56                .collect::<std::result::Result<Vec<_>, _>>(),
57            value => Err(ValueError::Expected {
58                got: value.kind(),
59                expected: Kind::array(Collection::any()),
60            }),
61        })
62        .transpose()?;
63
64    let data = enrichment_tables.find_table_row(
65        table,
66        case_sensitive,
67        condition,
68        select.as_ref().map(|select| select.as_ref()),
69        wildcard.as_ref(),
70        index,
71    )?;
72
73    Ok(Value::Object(data))
74}
75
76#[derive(Clone, Copy, Debug)]
77pub struct GetEnrichmentTableRecord;
78impl Function for GetEnrichmentTableRecord {
79    fn identifier(&self) -> &'static str {
80        "get_enrichment_table_record"
81    }
82
83    fn usage(&self) -> &'static str {
84        const USAGE: &str = const_str::concat!(
85            "Searches an [enrichment table](/docs/reference/glossary/#enrichment-tables) for a row that matches the provided condition. A single row must be matched. If no rows are found or more than one row is found, an error is returned.\n\n",
86            super::ENRICHMENT_TABLE_EXPLAINER
87        );
88        USAGE
89    }
90
91    fn internal_failure_reasons(&self) -> &'static [&'static str] {
92        &[
93            "The row is not found.",
94            "Multiple rows are found that match the condition.",
95        ]
96    }
97
98    fn category(&self) -> &'static str {
99        Category::Enrichment.as_ref()
100    }
101
102    fn return_kind(&self) -> u16 {
103        kind::OBJECT
104    }
105
106    fn parameters(&self) -> &'static [Parameter] {
107        &PARAMETERS
108    }
109
110    fn examples(&self) -> &'static [Example] {
111        &[
112            example! {
113                title: "Exact match",
114                source: r#"get_enrichment_table_record!("test", {"id": 1})"#,
115                result: Ok(r#"{"id": 1, "firstname": "Bob", "surname": "Smith"}"#),
116            },
117            example! {
118                title: "Case insensitive match",
119                source: indoc !{r#"
120                    get_enrichment_table_record!(
121                        "test",
122                        {"surname": "bob", "firstname": "John"},
123                        case_sensitive: false
124                    )
125                "#},
126                result: Ok(r#"{"id": 1, "firstname": "Bob", "surname": "Smith"}"#),
127            },
128            example! {
129                title: "Date range search",
130                source: indoc! {r#"
131                    get_enrichment_table_record!(
132                        "test",
133                        {
134                            "surname": "Smith",
135                            "date_of_birth": {
136                                "from": t'1985-01-01T00:00:00Z',
137                                "to": t'1985-12-31T00:00:00Z'
138                            }
139                        }
140                    )
141                "#},
142                result: Ok(r#"{"id": 1, "firstname": "Bob", "surname": "Smith"}"#),
143            },
144        ]
145    }
146
147    fn compile(
148        &self,
149        state: &TypeState,
150        ctx: &mut FunctionCompileContext,
151        arguments: ArgumentList,
152    ) -> Compiled {
153        let registry = ctx
154            .get_external_context_mut::<TableRegistry>()
155            .ok_or(Box::new(vrl_util::Error::TablesNotLoaded) as Box<dyn DiagnosticMessage>)?;
156
157        let tables = registry
158            .table_ids()
159            .into_iter()
160            .map(Value::from)
161            .collect::<Vec<_>>();
162
163        let table = arguments
164            .required_enum("table", &tables, state)?
165            .try_bytes_utf8_lossy()
166            .expect("table is not valid utf8")
167            .into_owned();
168        let condition = arguments.required_object("condition")?;
169
170        let select = arguments.optional("select");
171
172        let case_sensitive = is_case_sensitive(&arguments, state)?;
173        let wildcard = arguments.optional("wildcard");
174        let index = Some(
175            add_index(registry, &table, case_sensitive, &condition)
176                .map_err(|err| Box::new(err) as Box<_>)?,
177        );
178
179        Ok(GetEnrichmentTableRecordFn {
180            table,
181            condition,
182            index,
183            select,
184            case_sensitive,
185            wildcard,
186            enrichment_tables: registry.as_readonly(),
187        }
188        .as_expr())
189    }
190}
191
192#[derive(Debug, Clone)]
193pub struct GetEnrichmentTableRecordFn {
194    table: String,
195    condition: BTreeMap<KeyString, expression::Expr>,
196    index: Option<IndexHandle>,
197    select: Option<Box<dyn Expression>>,
198    wildcard: Option<Box<dyn Expression>>,
199    case_sensitive: Case,
200    enrichment_tables: TableSearch,
201}
202
203impl FunctionExpression for GetEnrichmentTableRecordFn {
204    fn resolve(&self, ctx: &mut Context) -> Resolved {
205        let condition = self
206            .condition
207            .iter()
208            .map(|(key, value)| {
209                let value = value.resolve(ctx)?;
210                evaluate_condition(key, value)
211            })
212            .collect::<ExpressionResult<Vec<Condition>>>()?;
213
214        let select = self
215            .select
216            .as_ref()
217            .map(|array| array.resolve(ctx))
218            .transpose()?;
219
220        let table = &self.table;
221        let case_sensitive = self.case_sensitive;
222        let wildcard = self
223            .wildcard
224            .as_ref()
225            .map(|array| array.resolve(ctx))
226            .transpose()?;
227        let index = self.index;
228        let enrichment_tables = &self.enrichment_tables;
229
230        get_enrichment_table_record(
231            select,
232            enrichment_tables,
233            table,
234            case_sensitive,
235            wildcard,
236            &condition,
237            index,
238        )
239    }
240
241    fn type_def(&self, _: &TypeState) -> TypeDef {
242        TypeDef::object(Collection::any()).fallible()
243    }
244}
245
#[cfg(test)]
mod tests {
    use vrl::{
        compiler::{TargetValue, prelude::TimeZone, state::RuntimeState},
        value,
        value::Secrets,
    };

    use super::*;
    use crate::test_util::get_table_registry;

    /// Resolving the compiled expression against the test registry yields the row
    /// provided by the `dummy1` fixture table.
    #[test]
    fn find_table_row() {
        let registry = get_table_registry();

        let condition = BTreeMap::from([(
            "field".into(),
            expression::Literal::from("value").into(),
        )]);
        let func = GetEnrichmentTableRecordFn {
            table: String::from("dummy1"),
            condition,
            index: Some(IndexHandle(999)),
            select: None,
            case_sensitive: Case::Sensitive,
            wildcard: None,
            enrichment_tables: registry.as_readonly(),
        };

        // Minimal runtime context: empty event, empty metadata, no secrets.
        let timezone = TimeZone::default();
        let mut target = TargetValue {
            value: Value::from(BTreeMap::new()),
            metadata: value!({}),
            secrets: Secrets::new(),
        };
        let mut state = RuntimeState::default();
        let mut ctx = Context::new(&mut target, &mut state, &timezone);

        // Finish loading only after the read-only handle was taken, as the lookup
        // is performed through that handle.
        registry.finish_load();

        let got = func.resolve(&mut ctx);
        let expected = Ok(value!({ "field": "result" }));

        assert_eq!(expected, got);
    }
}