enrichment/
get_enrichment_table_record.rs

1use std::collections::BTreeMap;
2use vrl::prelude::*;
3
4use crate::vrl_util::is_case_sensitive;
5use crate::{
6    vrl_util::{self, add_index, evaluate_condition},
7    Case, Condition, IndexHandle, TableRegistry, TableSearch,
8};
9
10fn get_enrichment_table_record(
11    select: Option<Value>,
12    enrichment_tables: &TableSearch,
13    table: &str,
14    case_sensitive: Case,
15    wildcard: Option<Value>,
16    condition: &[Condition],
17    index: Option<IndexHandle>,
18) -> Resolved {
19    let select = select
20        .map(|array| match array {
21            Value::Array(arr) => arr
22                .iter()
23                .map(|value| Ok(value.try_bytes_utf8_lossy()?.to_string()))
24                .collect::<std::result::Result<Vec<_>, _>>(),
25            value => Err(ValueError::Expected {
26                got: value.kind(),
27                expected: Kind::array(Collection::any()),
28            }),
29        })
30        .transpose()?;
31
32    let data = enrichment_tables.find_table_row(
33        table,
34        case_sensitive,
35        condition,
36        select.as_ref().map(|select| select.as_ref()),
37        wildcard.as_ref(),
38        index,
39    )?;
40
41    Ok(Value::Object(data))
42}
43
44#[derive(Clone, Copy, Debug)]
45pub struct GetEnrichmentTableRecord;
46impl Function for GetEnrichmentTableRecord {
47    fn identifier(&self) -> &'static str {
48        "get_enrichment_table_record"
49    }
50
51    fn parameters(&self) -> &'static [Parameter] {
52        &[
53            Parameter {
54                keyword: "table",
55                kind: kind::BYTES,
56                required: true,
57            },
58            Parameter {
59                keyword: "condition",
60                kind: kind::OBJECT,
61                required: true,
62            },
63            Parameter {
64                keyword: "select",
65                kind: kind::ARRAY,
66                required: false,
67            },
68            Parameter {
69                keyword: "case_sensitive",
70                kind: kind::BOOLEAN,
71                required: false,
72            },
73            Parameter {
74                keyword: "wildcard",
75                kind: kind::BYTES,
76                required: false,
77            },
78        ]
79    }
80
81    fn examples(&self) -> &'static [Example] {
82        &[Example {
83            title: "find records",
84            source: r#"get_enrichment_table_record!("test", {"id": 1})"#,
85            result: Ok(r#"{"id": 1, "firstname": "Bob", "surname": "Smith"}"#),
86        }]
87    }
88
89    fn compile(
90        &self,
91        state: &TypeState,
92        ctx: &mut FunctionCompileContext,
93        arguments: ArgumentList,
94    ) -> Compiled {
95        let registry = ctx
96            .get_external_context_mut::<TableRegistry>()
97            .ok_or(Box::new(vrl_util::Error::TablesNotLoaded) as Box<dyn DiagnosticMessage>)?;
98
99        let tables = registry
100            .table_ids()
101            .into_iter()
102            .map(Value::from)
103            .collect::<Vec<_>>();
104
105        let table = arguments
106            .required_enum("table", &tables, state)?
107            .try_bytes_utf8_lossy()
108            .expect("table is not valid utf8")
109            .into_owned();
110        let condition = arguments.required_object("condition")?;
111
112        let select = arguments.optional("select");
113
114        let case_sensitive = is_case_sensitive(&arguments, state)?;
115        let wildcard = arguments.optional("wildcard");
116        let index = Some(
117            add_index(registry, &table, case_sensitive, &condition)
118                .map_err(|err| Box::new(err) as Box<_>)?,
119        );
120
121        Ok(GetEnrichmentTableRecordFn {
122            table,
123            condition,
124            index,
125            select,
126            case_sensitive,
127            wildcard,
128            enrichment_tables: registry.as_readonly(),
129        }
130        .as_expr())
131    }
132}
133
134#[derive(Debug, Clone)]
135pub struct GetEnrichmentTableRecordFn {
136    table: String,
137    condition: BTreeMap<KeyString, expression::Expr>,
138    index: Option<IndexHandle>,
139    select: Option<Box<dyn Expression>>,
140    wildcard: Option<Box<dyn Expression>>,
141    case_sensitive: Case,
142    enrichment_tables: TableSearch,
143}
144
145impl FunctionExpression for GetEnrichmentTableRecordFn {
146    fn resolve(&self, ctx: &mut Context) -> Resolved {
147        let condition = self
148            .condition
149            .iter()
150            .map(|(key, value)| {
151                let value = value.resolve(ctx)?;
152                evaluate_condition(key, value)
153            })
154            .collect::<ExpressionResult<Vec<Condition>>>()?;
155
156        let select = self
157            .select
158            .as_ref()
159            .map(|array| array.resolve(ctx))
160            .transpose()?;
161
162        let table = &self.table;
163        let case_sensitive = self.case_sensitive;
164        let wildcard = self
165            .wildcard
166            .as_ref()
167            .map(|array| array.resolve(ctx))
168            .transpose()?;
169        let index = self.index;
170        let enrichment_tables = &self.enrichment_tables;
171
172        get_enrichment_table_record(
173            select,
174            enrichment_tables,
175            table,
176            case_sensitive,
177            wildcard,
178            &condition,
179            index,
180        )
181    }
182
183    fn type_def(&self, _: &TypeState) -> TypeDef {
184        TypeDef::object(Collection::any()).fallible()
185    }
186}
187
188#[cfg(test)]
189mod tests {
190    use vrl::compiler::prelude::TimeZone;
191    use vrl::compiler::state::RuntimeState;
192    use vrl::compiler::TargetValue;
193    use vrl::value;
194    use vrl::value::Secrets;
195
196    use super::*;
197    use crate::test_util::get_table_registry;
198
199    #[test]
200    fn find_table_row() {
201        let registry = get_table_registry();
202        let func = GetEnrichmentTableRecordFn {
203            table: "dummy1".to_string(),
204            condition: BTreeMap::from([(
205                "field".into(),
206                expression::Literal::from("value").into(),
207            )]),
208            index: Some(IndexHandle(999)),
209            select: None,
210            case_sensitive: Case::Sensitive,
211            wildcard: None,
212            enrichment_tables: registry.as_readonly(),
213        };
214
215        let tz = TimeZone::default();
216        let object: Value = BTreeMap::new().into();
217        let mut target = TargetValue {
218            value: object,
219            metadata: value!({}),
220            secrets: Secrets::new(),
221        };
222        let mut runtime_state = RuntimeState::default();
223        let mut ctx = Context::new(&mut target, &mut runtime_state, &tz);
224
225        registry.finish_load();
226
227        let got = func.resolve(&mut ctx);
228
229        assert_eq!(Ok(value! ({ "field": "result" })), got);
230    }
231}