1use std::collections::BTreeMap;
2use vrl::prelude::*;
3
4use crate::vrl_util::is_case_sensitive;
5use crate::{
6 vrl_util::{self, add_index, evaluate_condition},
7 Case, Condition, IndexHandle, TableRegistry, TableSearch,
8};
9
10fn get_enrichment_table_record(
11 select: Option<Value>,
12 enrichment_tables: &TableSearch,
13 table: &str,
14 case_sensitive: Case,
15 wildcard: Option<Value>,
16 condition: &[Condition],
17 index: Option<IndexHandle>,
18) -> Resolved {
19 let select = select
20 .map(|array| match array {
21 Value::Array(arr) => arr
22 .iter()
23 .map(|value| Ok(value.try_bytes_utf8_lossy()?.to_string()))
24 .collect::<std::result::Result<Vec<_>, _>>(),
25 value => Err(ValueError::Expected {
26 got: value.kind(),
27 expected: Kind::array(Collection::any()),
28 }),
29 })
30 .transpose()?;
31
32 let data = enrichment_tables.find_table_row(
33 table,
34 case_sensitive,
35 condition,
36 select.as_ref().map(|select| select.as_ref()),
37 wildcard.as_ref(),
38 index,
39 )?;
40
41 Ok(Value::Object(data))
42}
43
44#[derive(Clone, Copy, Debug)]
45pub struct GetEnrichmentTableRecord;
46impl Function for GetEnrichmentTableRecord {
47 fn identifier(&self) -> &'static str {
48 "get_enrichment_table_record"
49 }
50
51 fn parameters(&self) -> &'static [Parameter] {
52 &[
53 Parameter {
54 keyword: "table",
55 kind: kind::BYTES,
56 required: true,
57 },
58 Parameter {
59 keyword: "condition",
60 kind: kind::OBJECT,
61 required: true,
62 },
63 Parameter {
64 keyword: "select",
65 kind: kind::ARRAY,
66 required: false,
67 },
68 Parameter {
69 keyword: "case_sensitive",
70 kind: kind::BOOLEAN,
71 required: false,
72 },
73 Parameter {
74 keyword: "wildcard",
75 kind: kind::BYTES,
76 required: false,
77 },
78 ]
79 }
80
81 fn examples(&self) -> &'static [Example] {
82 &[Example {
83 title: "find records",
84 source: r#"get_enrichment_table_record!("test", {"id": 1})"#,
85 result: Ok(r#"{"id": 1, "firstname": "Bob", "surname": "Smith"}"#),
86 }]
87 }
88
89 fn compile(
90 &self,
91 state: &TypeState,
92 ctx: &mut FunctionCompileContext,
93 arguments: ArgumentList,
94 ) -> Compiled {
95 let registry = ctx
96 .get_external_context_mut::<TableRegistry>()
97 .ok_or(Box::new(vrl_util::Error::TablesNotLoaded) as Box<dyn DiagnosticMessage>)?;
98
99 let tables = registry
100 .table_ids()
101 .into_iter()
102 .map(Value::from)
103 .collect::<Vec<_>>();
104
105 let table = arguments
106 .required_enum("table", &tables, state)?
107 .try_bytes_utf8_lossy()
108 .expect("table is not valid utf8")
109 .into_owned();
110 let condition = arguments.required_object("condition")?;
111
112 let select = arguments.optional("select");
113
114 let case_sensitive = is_case_sensitive(&arguments, state)?;
115 let wildcard = arguments.optional("wildcard");
116 let index = Some(
117 add_index(registry, &table, case_sensitive, &condition)
118 .map_err(|err| Box::new(err) as Box<_>)?,
119 );
120
121 Ok(GetEnrichmentTableRecordFn {
122 table,
123 condition,
124 index,
125 select,
126 case_sensitive,
127 wildcard,
128 enrichment_tables: registry.as_readonly(),
129 }
130 .as_expr())
131 }
132}
133
134#[derive(Debug, Clone)]
135pub struct GetEnrichmentTableRecordFn {
136 table: String,
137 condition: BTreeMap<KeyString, expression::Expr>,
138 index: Option<IndexHandle>,
139 select: Option<Box<dyn Expression>>,
140 wildcard: Option<Box<dyn Expression>>,
141 case_sensitive: Case,
142 enrichment_tables: TableSearch,
143}
144
145impl FunctionExpression for GetEnrichmentTableRecordFn {
146 fn resolve(&self, ctx: &mut Context) -> Resolved {
147 let condition = self
148 .condition
149 .iter()
150 .map(|(key, value)| {
151 let value = value.resolve(ctx)?;
152 evaluate_condition(key, value)
153 })
154 .collect::<ExpressionResult<Vec<Condition>>>()?;
155
156 let select = self
157 .select
158 .as_ref()
159 .map(|array| array.resolve(ctx))
160 .transpose()?;
161
162 let table = &self.table;
163 let case_sensitive = self.case_sensitive;
164 let wildcard = self
165 .wildcard
166 .as_ref()
167 .map(|array| array.resolve(ctx))
168 .transpose()?;
169 let index = self.index;
170 let enrichment_tables = &self.enrichment_tables;
171
172 get_enrichment_table_record(
173 select,
174 enrichment_tables,
175 table,
176 case_sensitive,
177 wildcard,
178 &condition,
179 index,
180 )
181 }
182
183 fn type_def(&self, _: &TypeState) -> TypeDef {
184 TypeDef::object(Collection::any()).fallible()
185 }
186}
187
188#[cfg(test)]
189mod tests {
190 use vrl::compiler::prelude::TimeZone;
191 use vrl::compiler::state::RuntimeState;
192 use vrl::compiler::TargetValue;
193 use vrl::value;
194 use vrl::value::Secrets;
195
196 use super::*;
197 use crate::test_util::get_table_registry;
198
199 #[test]
200 fn find_table_row() {
201 let registry = get_table_registry();
202 let func = GetEnrichmentTableRecordFn {
203 table: "dummy1".to_string(),
204 condition: BTreeMap::from([(
205 "field".into(),
206 expression::Literal::from("value").into(),
207 )]),
208 index: Some(IndexHandle(999)),
209 select: None,
210 case_sensitive: Case::Sensitive,
211 wildcard: None,
212 enrichment_tables: registry.as_readonly(),
213 };
214
215 let tz = TimeZone::default();
216 let object: Value = BTreeMap::new().into();
217 let mut target = TargetValue {
218 value: object,
219 metadata: value!({}),
220 secrets: Secrets::new(),
221 };
222 let mut runtime_state = RuntimeState::default();
223 let mut ctx = Context::new(&mut target, &mut runtime_state, &tz);
224
225 registry.finish_load();
226
227 let got = func.resolve(&mut ctx);
228
229 assert_eq!(Ok(value! ({ "field": "result" })), got);
230 }
231}