1use std::collections::BTreeMap;
2
3use vrl::prelude::*;
4
5use crate::{
6 Case, Condition, IndexHandle, TableRegistry, TableSearch,
7 vrl_util::{self, add_index, evaluate_condition, is_case_sensitive},
8};
9
10fn get_enrichment_table_record(
11 select: Option<Value>,
12 enrichment_tables: &TableSearch,
13 table: &str,
14 case_sensitive: Case,
15 wildcard: Option<Value>,
16 condition: &[Condition],
17 index: Option<IndexHandle>,
18) -> Resolved {
19 let select = select
20 .map(|array| match array {
21 Value::Array(arr) => arr
22 .iter()
23 .map(|value| Ok(value.try_bytes_utf8_lossy()?.to_string()))
24 .collect::<std::result::Result<Vec<_>, _>>(),
25 value => Err(ValueError::Expected {
26 got: value.kind(),
27 expected: Kind::array(Collection::any()),
28 }),
29 })
30 .transpose()?;
31
32 let data = enrichment_tables.find_table_row(
33 table,
34 case_sensitive,
35 condition,
36 select.as_ref().map(|select| select.as_ref()),
37 wildcard.as_ref(),
38 index,
39 )?;
40
41 Ok(Value::Object(data))
42}
43
44#[derive(Clone, Copy, Debug)]
45pub struct GetEnrichmentTableRecord;
46impl Function for GetEnrichmentTableRecord {
47 fn identifier(&self) -> &'static str {
48 "get_enrichment_table_record"
49 }
50
51 fn usage(&self) -> &'static str {
52 const USAGE: &str = const_str::concat!(
53 "Searches an [enrichment table](/docs/reference/glossary/#enrichment-tables) for a row that matches the provided condition. A single row must be matched. If no rows are found or more than one row is found, an error is returned.\n\n",
54 super::ENRICHMENT_TABLE_EXPLAINER
55 );
56 USAGE
57 }
58
59 fn parameters(&self) -> &'static [Parameter] {
60 &[
61 Parameter {
62 keyword: "table",
63 kind: kind::BYTES,
64 required: true,
65 },
66 Parameter {
67 keyword: "condition",
68 kind: kind::OBJECT,
69 required: true,
70 },
71 Parameter {
72 keyword: "select",
73 kind: kind::ARRAY,
74 required: false,
75 },
76 Parameter {
77 keyword: "case_sensitive",
78 kind: kind::BOOLEAN,
79 required: false,
80 },
81 Parameter {
82 keyword: "wildcard",
83 kind: kind::BYTES,
84 required: false,
85 },
86 ]
87 }
88
89 fn examples(&self) -> &'static [Example] {
90 &[example!(
91 title: "find records",
92 source: r#"get_enrichment_table_record!("test", {"id": 1})"#,
93 result: Ok(r#"{"id": 1, "firstname": "Bob", "surname": "Smith"}"#),
94 )]
95 }
96
97 fn compile(
98 &self,
99 state: &TypeState,
100 ctx: &mut FunctionCompileContext,
101 arguments: ArgumentList,
102 ) -> Compiled {
103 let registry = ctx
104 .get_external_context_mut::<TableRegistry>()
105 .ok_or(Box::new(vrl_util::Error::TablesNotLoaded) as Box<dyn DiagnosticMessage>)?;
106
107 let tables = registry
108 .table_ids()
109 .into_iter()
110 .map(Value::from)
111 .collect::<Vec<_>>();
112
113 let table = arguments
114 .required_enum("table", &tables, state)?
115 .try_bytes_utf8_lossy()
116 .expect("table is not valid utf8")
117 .into_owned();
118 let condition = arguments.required_object("condition")?;
119
120 let select = arguments.optional("select");
121
122 let case_sensitive = is_case_sensitive(&arguments, state)?;
123 let wildcard = arguments.optional("wildcard");
124 let index = Some(
125 add_index(registry, &table, case_sensitive, &condition)
126 .map_err(|err| Box::new(err) as Box<_>)?,
127 );
128
129 Ok(GetEnrichmentTableRecordFn {
130 table,
131 condition,
132 index,
133 select,
134 case_sensitive,
135 wildcard,
136 enrichment_tables: registry.as_readonly(),
137 }
138 .as_expr())
139 }
140}
141
142#[derive(Debug, Clone)]
143pub struct GetEnrichmentTableRecordFn {
144 table: String,
145 condition: BTreeMap<KeyString, expression::Expr>,
146 index: Option<IndexHandle>,
147 select: Option<Box<dyn Expression>>,
148 wildcard: Option<Box<dyn Expression>>,
149 case_sensitive: Case,
150 enrichment_tables: TableSearch,
151}
152
153impl FunctionExpression for GetEnrichmentTableRecordFn {
154 fn resolve(&self, ctx: &mut Context) -> Resolved {
155 let condition = self
156 .condition
157 .iter()
158 .map(|(key, value)| {
159 let value = value.resolve(ctx)?;
160 evaluate_condition(key, value)
161 })
162 .collect::<ExpressionResult<Vec<Condition>>>()?;
163
164 let select = self
165 .select
166 .as_ref()
167 .map(|array| array.resolve(ctx))
168 .transpose()?;
169
170 let table = &self.table;
171 let case_sensitive = self.case_sensitive;
172 let wildcard = self
173 .wildcard
174 .as_ref()
175 .map(|array| array.resolve(ctx))
176 .transpose()?;
177 let index = self.index;
178 let enrichment_tables = &self.enrichment_tables;
179
180 get_enrichment_table_record(
181 select,
182 enrichment_tables,
183 table,
184 case_sensitive,
185 wildcard,
186 &condition,
187 index,
188 )
189 }
190
191 fn type_def(&self, _: &TypeState) -> TypeDef {
192 TypeDef::object(Collection::any()).fallible()
193 }
194}
195
#[cfg(test)]
mod tests {
    use vrl::{
        compiler::{TargetValue, prelude::TimeZone, state::RuntimeState},
        value,
        value::Secrets,
    };

    use super::*;
    use crate::test_util::get_table_registry;

    /// Resolving the function against the test registry yields the row
    /// `{ "field": "result" }`.
    #[test]
    fn find_table_row() {
        let registry = get_table_registry();

        let condition: BTreeMap<KeyString, expression::Expr> = BTreeMap::from([(
            "field".into(),
            expression::Literal::from("value").into(),
        )]);

        // The read-only handle is taken before `finish_load` is called below.
        let func = GetEnrichmentTableRecordFn {
            table: "dummy1".to_string(),
            condition,
            index: Some(IndexHandle(999)),
            select: None,
            case_sensitive: Case::Sensitive,
            wildcard: None,
            enrichment_tables: registry.as_readonly(),
        };

        let timezone = TimeZone::default();
        let event: Value = BTreeMap::new().into();
        let mut target = TargetValue {
            value: event,
            metadata: value!({}),
            secrets: Secrets::new(),
        };
        let mut runtime_state = RuntimeState::default();
        let mut ctx = Context::new(&mut target, &mut runtime_state, &timezone);

        registry.finish_load();

        let got = func.resolve(&mut ctx);

        assert_eq!(Ok(value! ({ "field": "result" })), got);
    }
}