1use std::{
33 collections::HashMap,
34 sync::{Arc, Mutex},
35};
36
37use arc_swap::ArcSwap;
38use vrl::value::{ObjectMap, Value};
39
40use super::{Condition, Error, IndexHandle, InternalError, Table};
41use crate::Case;
42
43type TableMap = HashMap<String, Box<dyn Table + Send + Sync>>;
45
46#[derive(Clone, Default)]
47pub struct TableRegistry {
48 loading: Arc<Mutex<Option<TableMap>>>,
49 tables: Arc<ArcSwap<Option<TableMap>>>,
50}
51
52impl PartialEq for TableRegistry {
54 fn eq(&self, other: &Self) -> bool {
55 Arc::ptr_eq(&self.tables, &other.tables) && Arc::ptr_eq(&self.loading, &other.loading)
56 || self.tables.load().is_none()
57 && other.tables.load().is_none()
58 && self.loading.lock().expect("lock poison").is_none()
59 && other.loading.lock().expect("lock poison").is_none()
60 }
61}
62impl Eq for TableRegistry {}
63
64impl TableRegistry {
65 pub fn load(&self, mut tables: TableMap) {
94 let mut loading = self.loading.lock().unwrap();
95 let existing = self.tables.load();
96 if let Some(existing) = &**existing {
97 let extend = existing
99 .iter()
100 .filter(|(key, _)| !tables.contains_key(*key))
101 .map(|(key, value)| (key.clone(), value.clone()))
102 .collect::<HashMap<_, _>>();
103
104 tables.extend(extend);
105 }
106 match *loading {
107 None => *loading = Some(tables),
108 Some(ref mut loading) => loading.extend(tables),
109 }
110 }
111
112 pub fn finish_load(&self) {
121 let mut tables_lock = self.loading.lock().unwrap();
122 let tables = tables_lock.take();
123 self.tables.swap(Arc::new(tables));
124 }
125
126 pub fn table_ids(&self) -> Vec<String> {
135 let locked = self.loading.lock().unwrap();
136 match *locked {
137 Some(ref tables) => tables.keys().cloned().collect(),
138 None => Vec::new(),
139 }
140 }
141
142 pub fn add_index(
150 &mut self,
151 table: &str,
152 case: Case,
153 fields: &[&str],
154 ) -> Result<IndexHandle, Error> {
155 let mut locked = self.loading.lock().unwrap();
156
157 match *locked {
158 None => Err(Error::Internal {
159 source: InternalError::FinishLoadCalled,
160 }),
161 Some(ref mut tables) => match tables.get_mut(table) {
162 None => Err(Error::TableNotLoaded {
163 table: table.to_string(),
164 }),
165 Some(table) => table.add_index(case, fields),
166 },
167 }
168 }
169
170 pub fn as_readonly(&self) -> TableSearch {
173 TableSearch(self.tables.clone())
174 }
175
176 pub fn index_fields(&self, table: &str) -> Vec<(Case, Vec<String>)> {
179 match &**self.tables.load() {
180 Some(tables) => tables
181 .get(table)
182 .map(|table| table.index_fields())
183 .unwrap_or_default(),
184 None => Vec::new(),
185 }
186 }
187
188 pub fn needs_reload(&self, table: &str) -> bool {
191 match &**self.tables.load() {
192 Some(tables) => tables
193 .get(table)
194 .map(|table| table.needs_reload())
195 .unwrap_or(true),
196 None => true,
197 }
198 }
199}
200
201impl std::fmt::Debug for TableRegistry {
202 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203 fmt_enrichment_table(f, "TableRegistry", &self.tables)
204 }
205}
206
207#[derive(Clone, Default)]
211pub struct TableSearch(Arc<ArcSwap<Option<TableMap>>>);
212
213impl TableSearch {
214 pub fn find_table_row<'a>(
218 &self,
219 table: &str,
220 case: Case,
221 condition: &'a [Condition<'a>],
222 select: Option<&[String]>,
223 wildcard: Option<&Value>,
224 index: Option<IndexHandle>,
225 ) -> Result<ObjectMap, Error> {
226 let tables = self.0.load();
227 if let Some(ref tables) = **tables {
228 match tables.get(table) {
229 None => Err(Error::TableNotLoaded {
230 table: table.to_string(),
231 }),
232 Some(table) => table.find_table_row(case, condition, select, wildcard, index),
233 }
234 } else {
235 Err(Error::Internal {
236 source: InternalError::FinishLoadNotCalled,
237 })
238 }
239 }
240
241 pub fn find_table_rows<'a>(
245 &self,
246 table: &str,
247 case: Case,
248 condition: &'a [Condition<'a>],
249 select: Option<&[String]>,
250 wildcard: Option<&Value>,
251 index: Option<IndexHandle>,
252 ) -> Result<Vec<ObjectMap>, Error> {
253 let tables = self.0.load();
254 if let Some(ref tables) = **tables {
255 match tables.get(table) {
256 None => Err(Error::TableNotLoaded {
257 table: table.to_string(),
258 }),
259 Some(table) => table.find_table_rows(case, condition, select, wildcard, index),
260 }
261 } else {
262 Err(Error::Internal {
263 source: InternalError::FinishLoadNotCalled,
264 })
265 }
266 }
267}
268
269impl std::fmt::Debug for TableSearch {
270 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271 fmt_enrichment_table(f, "EnrichmentTableSearch", &self.0)
272 }
273}
274
275fn fmt_enrichment_table(
277 f: &mut std::fmt::Formatter<'_>,
278 name: &'static str,
279 tables: &Arc<ArcSwap<Option<TableMap>>>,
280) -> std::fmt::Result {
281 let tables = tables.load();
282 match **tables {
283 Some(ref tables) => {
284 let mut tables = tables.iter().fold(String::from("("), |mut s, (key, _)| {
285 s.push_str(key);
286 s.push_str(", ");
287 s
288 });
289
290 tables.truncate(std::cmp::max(tables.len(), 0));
291 tables.push(')');
292
293 write!(f, "{name} {tables}")
294 }
295 None => write!(f, "{name} loading"),
296 }
297}
298
299#[cfg(test)]
300mod tests {
301 use vrl::value::Value;
302
303 use super::*;
304 use crate::test_util::DummyEnrichmentTable;
305
306 #[test]
307 fn tables_loaded() {
308 let mut tables: TableMap = HashMap::new();
309 tables.insert("dummy1".to_string(), Box::new(DummyEnrichmentTable::new()));
310 tables.insert("dummy2".to_string(), Box::new(DummyEnrichmentTable::new()));
311
312 let registry = super::TableRegistry::default();
313 registry.load(tables);
314 let mut result = registry.table_ids();
315 result.sort();
316 assert_eq!(vec!["dummy1", "dummy2"], result);
317 }
318
319 #[test]
320 fn can_add_indexes() {
321 let mut tables: TableMap = HashMap::new();
322 let indexes = Arc::new(Mutex::new(Vec::new()));
323 let dummy = DummyEnrichmentTable::new_with_index(indexes.clone());
324 tables.insert("dummy1".to_string(), Box::new(dummy));
325 let mut registry = super::TableRegistry::default();
326 registry.load(tables);
327 assert_eq!(
328 Ok(IndexHandle(0)),
329 registry.add_index("dummy1", Case::Sensitive, &["erk"])
330 );
331
332 let indexes = indexes.lock().unwrap();
333 assert_eq!(vec!["erk".to_string()], *indexes[0]);
334 }
335
336 #[test]
337 fn can_not_find_table_row_before_finish() {
338 let mut tables: TableMap = HashMap::new();
339 let dummy = DummyEnrichmentTable::new();
340 tables.insert("dummy1".to_string(), Box::new(dummy));
341 let registry = super::TableRegistry::default();
342 registry.load(tables);
343 let tables = registry.as_readonly();
344
345 assert_eq!(
346 Err(Error::Internal {
347 source: InternalError::FinishLoadNotCalled,
348 }),
349 tables.find_table_row(
350 "dummy1",
351 Case::Sensitive,
352 &[Condition::Equals {
353 field: "thing",
354 value: Value::from("thang"),
355 }],
356 None,
357 None,
358 None
359 )
360 );
361 }
362
363 #[test]
364 fn can_not_add_indexes_after_finish() {
365 let mut tables: TableMap = HashMap::new();
366 let dummy = DummyEnrichmentTable::new();
367 tables.insert("dummy1".to_string(), Box::new(dummy));
368 let mut registry = super::TableRegistry::default();
369 registry.load(tables);
370 registry.finish_load();
371 assert_eq!(
372 Err(Error::Internal {
373 source: InternalError::FinishLoadCalled,
374 }),
375 registry.add_index("dummy1", Case::Sensitive, &["erk"])
376 );
377 }
378
379 #[test]
380 fn can_find_table_row_after_finish() {
381 let mut tables: TableMap = HashMap::new();
382 let dummy = DummyEnrichmentTable::new();
383 tables.insert("dummy1".to_string(), Box::new(dummy));
384
385 let registry = super::TableRegistry::default();
386 registry.load(tables);
387 let tables_search = registry.as_readonly();
388
389 registry.finish_load();
390
391 assert_eq!(
392 Ok(ObjectMap::from([("field".into(), Value::from("result"))])),
393 tables_search.find_table_row(
394 "dummy1",
395 Case::Sensitive,
396 &[Condition::Equals {
397 field: "thing",
398 value: Value::from("thang"),
399 }],
400 None,
401 None,
402 None
403 )
404 );
405 }
406
407 #[test]
408 fn can_reload() {
409 let mut tables: TableMap = HashMap::new();
410 tables.insert("dummy1".to_string(), Box::new(DummyEnrichmentTable::new()));
411
412 let registry = super::TableRegistry::default();
413 registry.load(tables);
414
415 assert_eq!(vec!["dummy1".to_string()], registry.table_ids());
416
417 registry.finish_load();
418
419 assert!(registry.table_ids().is_empty());
421
422 let mut tables: TableMap = HashMap::new();
423 tables.insert("dummy2".to_string(), Box::new(DummyEnrichmentTable::new()));
424
425 registry.load(tables);
427 let mut table_ids = registry.table_ids();
428 table_ids.sort();
429
430 assert_eq!(vec!["dummy1".to_string(), "dummy2".to_string()], table_ids,);
431 }
432
433 #[test]
434 fn reloads_existing_tables() {
435 let mut tables: TableMap = HashMap::new();
436 tables.insert("dummy1".to_string(), Box::new(DummyEnrichmentTable::new()));
437 tables.insert("dummy2".to_string(), Box::new(DummyEnrichmentTable::new()));
438
439 let registry = super::TableRegistry::default();
440 registry.load(tables);
441 registry.finish_load();
442
443 assert!(registry.table_ids().is_empty());
445
446 let mut new_data = ObjectMap::new();
447 new_data.insert("thing".into(), Value::Null);
448
449 let mut tables: TableMap = HashMap::new();
450 tables.insert(
451 "dummy2".to_string(),
452 Box::new(DummyEnrichmentTable::new_with_data(new_data)),
453 );
454
455 registry.load(tables);
457 let tables = registry.loading.lock().unwrap();
458 let tables = tables.clone().unwrap();
459
460 assert_eq!(
462 Value::from("result"),
463 tables
464 .get("dummy1")
465 .unwrap()
466 .find_table_row(Case::Sensitive, &Vec::new(), None, None, None)
467 .unwrap()
468 .get("field")
469 .cloned()
470 .unwrap()
471 );
472
473 assert_eq!(
475 Value::Null,
476 tables
477 .get("dummy2")
478 .unwrap()
479 .find_table_row(Case::Sensitive, &Vec::new(), None, None, None)
480 .unwrap()
481 .get("thing")
482 .cloned()
483 .unwrap()
484 );
485 }
486}