1use std::{
33 collections::HashMap,
34 sync::{Arc, Mutex},
35};
36
37use arc_swap::ArcSwap;
38use vrl::value::{ObjectMap, Value};
39
40use super::{Condition, IndexHandle, Table};
41use crate::Case;
42
43type TableMap = HashMap<String, Box<dyn Table + Send + Sync>>;
45
46#[derive(Clone, Default)]
47pub struct TableRegistry {
48 loading: Arc<Mutex<Option<TableMap>>>,
49 tables: Arc<ArcSwap<Option<TableMap>>>,
50}
51
52impl PartialEq for TableRegistry {
54 fn eq(&self, other: &Self) -> bool {
55 Arc::ptr_eq(&self.tables, &other.tables) && Arc::ptr_eq(&self.loading, &other.loading)
56 || self.tables.load().is_none()
57 && other.tables.load().is_none()
58 && self.loading.lock().expect("lock poison").is_none()
59 && other.loading.lock().expect("lock poison").is_none()
60 }
61}
62impl Eq for TableRegistry {}
63
64impl TableRegistry {
65 pub fn load(&self, mut tables: TableMap) {
94 let mut loading = self.loading.lock().unwrap();
95 let existing = self.tables.load();
96 if let Some(existing) = &**existing {
97 let extend = existing
99 .iter()
100 .filter(|(key, _)| !tables.contains_key(*key))
101 .map(|(key, value)| (key.clone(), value.clone()))
102 .collect::<HashMap<_, _>>();
103
104 tables.extend(extend);
105 }
106 match *loading {
107 None => *loading = Some(tables),
108 Some(ref mut loading) => loading.extend(tables),
109 }
110 }
111
112 pub fn finish_load(&self) {
121 let mut tables_lock = self.loading.lock().unwrap();
122 let tables = tables_lock.take();
123 self.tables.swap(Arc::new(tables));
124 }
125
126 pub fn table_ids(&self) -> Vec<String> {
135 let locked = self.loading.lock().unwrap();
136 match *locked {
137 Some(ref tables) => tables.keys().cloned().collect(),
138 None => Vec::new(),
139 }
140 }
141
142 pub fn add_index(
150 &mut self,
151 table: &str,
152 case: Case,
153 fields: &[&str],
154 ) -> Result<IndexHandle, String> {
155 let mut locked = self.loading.lock().unwrap();
156
157 match *locked {
158 None => Err("finish_load has been called".to_string()),
159 Some(ref mut tables) => match tables.get_mut(table) {
160 None => Err(format!("table '{table}' not loaded")),
161 Some(table) => table.add_index(case, fields),
162 },
163 }
164 }
165
166 pub fn as_readonly(&self) -> TableSearch {
169 TableSearch(self.tables.clone())
170 }
171
172 pub fn index_fields(&self, table: &str) -> Vec<(Case, Vec<String>)> {
175 match &**self.tables.load() {
176 Some(tables) => tables
177 .get(table)
178 .map(|table| table.index_fields())
179 .unwrap_or_default(),
180 None => Vec::new(),
181 }
182 }
183
184 pub fn needs_reload(&self, table: &str) -> bool {
187 match &**self.tables.load() {
188 Some(tables) => tables
189 .get(table)
190 .map(|table| table.needs_reload())
191 .unwrap_or(true),
192 None => true,
193 }
194 }
195}
196
197impl std::fmt::Debug for TableRegistry {
198 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199 fmt_enrichment_table(f, "TableRegistry", &self.tables)
200 }
201}
202
203#[derive(Clone, Default)]
207pub struct TableSearch(Arc<ArcSwap<Option<TableMap>>>);
208
209impl TableSearch {
210 pub fn find_table_row<'a>(
214 &self,
215 table: &str,
216 case: Case,
217 condition: &'a [Condition<'a>],
218 select: Option<&[String]>,
219 wildcard: Option<&Value>,
220 index: Option<IndexHandle>,
221 ) -> Result<ObjectMap, String> {
222 let tables = self.0.load();
223 if let Some(ref tables) = **tables {
224 match tables.get(table) {
225 None => Err(format!("table {table} not loaded")),
226 Some(table) => table.find_table_row(case, condition, select, wildcard, index),
227 }
228 } else {
229 Err("finish_load not called".to_string())
230 }
231 }
232
233 pub fn find_table_rows<'a>(
237 &self,
238 table: &str,
239 case: Case,
240 condition: &'a [Condition<'a>],
241 select: Option<&[String]>,
242 wildcard: Option<&Value>,
243 index: Option<IndexHandle>,
244 ) -> Result<Vec<ObjectMap>, String> {
245 let tables = self.0.load();
246 if let Some(ref tables) = **tables {
247 match tables.get(table) {
248 None => Err(format!("table {table} not loaded")),
249 Some(table) => table.find_table_rows(case, condition, select, wildcard, index),
250 }
251 } else {
252 Err("finish_load not called".to_string())
253 }
254 }
255}
256
257impl std::fmt::Debug for TableSearch {
258 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259 fmt_enrichment_table(f, "EnrichmentTableSearch", &self.0)
260 }
261}
262
263fn fmt_enrichment_table(
265 f: &mut std::fmt::Formatter<'_>,
266 name: &'static str,
267 tables: &Arc<ArcSwap<Option<TableMap>>>,
268) -> std::fmt::Result {
269 let tables = tables.load();
270 match **tables {
271 Some(ref tables) => {
272 let mut tables = tables.iter().fold(String::from("("), |mut s, (key, _)| {
273 s.push_str(key);
274 s.push_str(", ");
275 s
276 });
277
278 tables.truncate(std::cmp::max(tables.len(), 0));
279 tables.push(')');
280
281 write!(f, "{name} {tables}")
282 }
283 None => write!(f, "{name} loading"),
284 }
285}
286
287#[cfg(test)]
288mod tests {
289 use vrl::value::Value;
290
291 use super::*;
292 use crate::test_util::DummyEnrichmentTable;
293
294 #[test]
295 fn tables_loaded() {
296 let mut tables: TableMap = HashMap::new();
297 tables.insert("dummy1".to_string(), Box::new(DummyEnrichmentTable::new()));
298 tables.insert("dummy2".to_string(), Box::new(DummyEnrichmentTable::new()));
299
300 let registry = super::TableRegistry::default();
301 registry.load(tables);
302 let mut result = registry.table_ids();
303 result.sort();
304 assert_eq!(vec!["dummy1", "dummy2"], result);
305 }
306
307 #[test]
308 fn can_add_indexes() {
309 let mut tables: TableMap = HashMap::new();
310 let indexes = Arc::new(Mutex::new(Vec::new()));
311 let dummy = DummyEnrichmentTable::new_with_index(indexes.clone());
312 tables.insert("dummy1".to_string(), Box::new(dummy));
313 let mut registry = super::TableRegistry::default();
314 registry.load(tables);
315 assert_eq!(
316 Ok(IndexHandle(0)),
317 registry.add_index("dummy1", Case::Sensitive, &["erk"])
318 );
319
320 let indexes = indexes.lock().unwrap();
321 assert_eq!(vec!["erk".to_string()], *indexes[0]);
322 }
323
324 #[test]
325 fn can_not_find_table_row_before_finish() {
326 let mut tables: TableMap = HashMap::new();
327 let dummy = DummyEnrichmentTable::new();
328 tables.insert("dummy1".to_string(), Box::new(dummy));
329 let registry = super::TableRegistry::default();
330 registry.load(tables);
331 let tables = registry.as_readonly();
332
333 assert_eq!(
334 Err("finish_load not called".to_string()),
335 tables.find_table_row(
336 "dummy1",
337 Case::Sensitive,
338 &[Condition::Equals {
339 field: "thing",
340 value: Value::from("thang"),
341 }],
342 None,
343 None,
344 None
345 )
346 );
347 }
348
349 #[test]
350 fn can_not_add_indexes_after_finish() {
351 let mut tables: TableMap = HashMap::new();
352 let dummy = DummyEnrichmentTable::new();
353 tables.insert("dummy1".to_string(), Box::new(dummy));
354 let mut registry = super::TableRegistry::default();
355 registry.load(tables);
356 registry.finish_load();
357 assert_eq!(
358 Err("finish_load has been called".to_string()),
359 registry.add_index("dummy1", Case::Sensitive, &["erk"])
360 );
361 }
362
363 #[test]
364 fn can_find_table_row_after_finish() {
365 let mut tables: TableMap = HashMap::new();
366 let dummy = DummyEnrichmentTable::new();
367 tables.insert("dummy1".to_string(), Box::new(dummy));
368
369 let registry = super::TableRegistry::default();
370 registry.load(tables);
371 let tables_search = registry.as_readonly();
372
373 registry.finish_load();
374
375 assert_eq!(
376 Ok(ObjectMap::from([("field".into(), Value::from("result"))])),
377 tables_search.find_table_row(
378 "dummy1",
379 Case::Sensitive,
380 &[Condition::Equals {
381 field: "thing",
382 value: Value::from("thang"),
383 }],
384 None,
385 None,
386 None
387 )
388 );
389 }
390
391 #[test]
392 fn can_reload() {
393 let mut tables: TableMap = HashMap::new();
394 tables.insert("dummy1".to_string(), Box::new(DummyEnrichmentTable::new()));
395
396 let registry = super::TableRegistry::default();
397 registry.load(tables);
398
399 assert_eq!(vec!["dummy1".to_string()], registry.table_ids());
400
401 registry.finish_load();
402
403 assert!(registry.table_ids().is_empty());
405
406 let mut tables: TableMap = HashMap::new();
407 tables.insert("dummy2".to_string(), Box::new(DummyEnrichmentTable::new()));
408
409 registry.load(tables);
411 let mut table_ids = registry.table_ids();
412 table_ids.sort();
413
414 assert_eq!(vec!["dummy1".to_string(), "dummy2".to_string()], table_ids,);
415 }
416
417 #[test]
418 fn reloads_existing_tables() {
419 let mut tables: TableMap = HashMap::new();
420 tables.insert("dummy1".to_string(), Box::new(DummyEnrichmentTable::new()));
421 tables.insert("dummy2".to_string(), Box::new(DummyEnrichmentTable::new()));
422
423 let registry = super::TableRegistry::default();
424 registry.load(tables);
425 registry.finish_load();
426
427 assert!(registry.table_ids().is_empty());
429
430 let mut new_data = ObjectMap::new();
431 new_data.insert("thing".into(), Value::Null);
432
433 let mut tables: TableMap = HashMap::new();
434 tables.insert(
435 "dummy2".to_string(),
436 Box::new(DummyEnrichmentTable::new_with_data(new_data)),
437 );
438
439 registry.load(tables);
441 let tables = registry.loading.lock().unwrap();
442 let tables = tables.clone().unwrap();
443
444 assert_eq!(
446 Value::from("result"),
447 tables
448 .get("dummy1")
449 .unwrap()
450 .find_table_row(Case::Sensitive, &Vec::new(), None, None, None)
451 .unwrap()
452 .get("field")
453 .cloned()
454 .unwrap()
455 );
456
457 assert_eq!(
459 Value::Null,
460 tables
461 .get("dummy2")
462 .unwrap()
463 .find_table_row(Case::Sensitive, &Vec::new(), None, None, None)
464 .unwrap()
465 .get("thing")
466 .cloned()
467 .unwrap()
468 );
469 }
470}