enrichment/
get_enrichment_table_record.rs

1use std::collections::BTreeMap;
2
3use vrl::prelude::*;
4
5use crate::{
6    Case, Condition, IndexHandle, TableRegistry, TableSearch,
7    vrl_util::{self, add_index, evaluate_condition, is_case_sensitive},
8};
9
10fn get_enrichment_table_record(
11    select: Option<Value>,
12    enrichment_tables: &TableSearch,
13    table: &str,
14    case_sensitive: Case,
15    wildcard: Option<Value>,
16    condition: &[Condition],
17    index: Option<IndexHandle>,
18) -> Resolved {
19    let select = select
20        .map(|array| match array {
21            Value::Array(arr) => arr
22                .iter()
23                .map(|value| Ok(value.try_bytes_utf8_lossy()?.to_string()))
24                .collect::<std::result::Result<Vec<_>, _>>(),
25            value => Err(ValueError::Expected {
26                got: value.kind(),
27                expected: Kind::array(Collection::any()),
28            }),
29        })
30        .transpose()?;
31
32    let data = enrichment_tables.find_table_row(
33        table,
34        case_sensitive,
35        condition,
36        select.as_ref().map(|select| select.as_ref()),
37        wildcard.as_ref(),
38        index,
39    )?;
40
41    Ok(Value::Object(data))
42}
43
44#[derive(Clone, Copy, Debug)]
45pub struct GetEnrichmentTableRecord;
46impl Function for GetEnrichmentTableRecord {
47    fn identifier(&self) -> &'static str {
48        "get_enrichment_table_record"
49    }
50
51    fn usage(&self) -> &'static str {
52        const USAGE: &str = const_str::concat!(
53            "Searches an [enrichment table](/docs/reference/glossary/#enrichment-tables) for a row that matches the provided condition. A single row must be matched. If no rows are found or more than one row is found, an error is returned.\n\n",
54            super::ENRICHMENT_TABLE_EXPLAINER
55        );
56        USAGE
57    }
58
59    fn parameters(&self) -> &'static [Parameter] {
60        &[
61            Parameter {
62                keyword: "table",
63                kind: kind::BYTES,
64                required: true,
65            },
66            Parameter {
67                keyword: "condition",
68                kind: kind::OBJECT,
69                required: true,
70            },
71            Parameter {
72                keyword: "select",
73                kind: kind::ARRAY,
74                required: false,
75            },
76            Parameter {
77                keyword: "case_sensitive",
78                kind: kind::BOOLEAN,
79                required: false,
80            },
81            Parameter {
82                keyword: "wildcard",
83                kind: kind::BYTES,
84                required: false,
85            },
86        ]
87    }
88
89    fn examples(&self) -> &'static [Example] {
90        &[example!(
91            title: "find records",
92            source: r#"get_enrichment_table_record!("test", {"id": 1})"#,
93            result: Ok(r#"{"id": 1, "firstname": "Bob", "surname": "Smith"}"#),
94        )]
95    }
96
97    fn compile(
98        &self,
99        state: &TypeState,
100        ctx: &mut FunctionCompileContext,
101        arguments: ArgumentList,
102    ) -> Compiled {
103        let registry = ctx
104            .get_external_context_mut::<TableRegistry>()
105            .ok_or(Box::new(vrl_util::Error::TablesNotLoaded) as Box<dyn DiagnosticMessage>)?;
106
107        let tables = registry
108            .table_ids()
109            .into_iter()
110            .map(Value::from)
111            .collect::<Vec<_>>();
112
113        let table = arguments
114            .required_enum("table", &tables, state)?
115            .try_bytes_utf8_lossy()
116            .expect("table is not valid utf8")
117            .into_owned();
118        let condition = arguments.required_object("condition")?;
119
120        let select = arguments.optional("select");
121
122        let case_sensitive = is_case_sensitive(&arguments, state)?;
123        let wildcard = arguments.optional("wildcard");
124        let index = Some(
125            add_index(registry, &table, case_sensitive, &condition)
126                .map_err(|err| Box::new(err) as Box<_>)?,
127        );
128
129        Ok(GetEnrichmentTableRecordFn {
130            table,
131            condition,
132            index,
133            select,
134            case_sensitive,
135            wildcard,
136            enrichment_tables: registry.as_readonly(),
137        }
138        .as_expr())
139    }
140}
141
142#[derive(Debug, Clone)]
143pub struct GetEnrichmentTableRecordFn {
144    table: String,
145    condition: BTreeMap<KeyString, expression::Expr>,
146    index: Option<IndexHandle>,
147    select: Option<Box<dyn Expression>>,
148    wildcard: Option<Box<dyn Expression>>,
149    case_sensitive: Case,
150    enrichment_tables: TableSearch,
151}
152
153impl FunctionExpression for GetEnrichmentTableRecordFn {
154    fn resolve(&self, ctx: &mut Context) -> Resolved {
155        let condition = self
156            .condition
157            .iter()
158            .map(|(key, value)| {
159                let value = value.resolve(ctx)?;
160                evaluate_condition(key, value)
161            })
162            .collect::<ExpressionResult<Vec<Condition>>>()?;
163
164        let select = self
165            .select
166            .as_ref()
167            .map(|array| array.resolve(ctx))
168            .transpose()?;
169
170        let table = &self.table;
171        let case_sensitive = self.case_sensitive;
172        let wildcard = self
173            .wildcard
174            .as_ref()
175            .map(|array| array.resolve(ctx))
176            .transpose()?;
177        let index = self.index;
178        let enrichment_tables = &self.enrichment_tables;
179
180        get_enrichment_table_record(
181            select,
182            enrichment_tables,
183            table,
184            case_sensitive,
185            wildcard,
186            &condition,
187            index,
188        )
189    }
190
191    fn type_def(&self, _: &TypeState) -> TypeDef {
192        TypeDef::object(Collection::any()).fallible()
193    }
194}
195
196#[cfg(test)]
197mod tests {
198    use vrl::{
199        compiler::{TargetValue, prelude::TimeZone, state::RuntimeState},
200        value,
201        value::Secrets,
202    };
203
204    use super::*;
205    use crate::test_util::get_table_registry;
206
207    #[test]
208    fn find_table_row() {
209        let registry = get_table_registry();
210        let func = GetEnrichmentTableRecordFn {
211            table: "dummy1".to_string(),
212            condition: BTreeMap::from([(
213                "field".into(),
214                expression::Literal::from("value").into(),
215            )]),
216            index: Some(IndexHandle(999)),
217            select: None,
218            case_sensitive: Case::Sensitive,
219            wildcard: None,
220            enrichment_tables: registry.as_readonly(),
221        };
222
223        let tz = TimeZone::default();
224        let object: Value = BTreeMap::new().into();
225        let mut target = TargetValue {
226            value: object,
227            metadata: value!({}),
228            secrets: Secrets::new(),
229        };
230        let mut runtime_state = RuntimeState::default();
231        let mut ctx = Context::new(&mut target, &mut runtime_state, &tz);
232
233        registry.finish_load();
234
235        let got = func.resolve(&mut ctx);
236
237        assert_eq!(Ok(value! ({ "field": "result" })), got);
238    }
239}