1use std::{
33 collections::HashMap,
34 sync::{Arc, Mutex},
35};
36
37use arc_swap::ArcSwap;
38use vrl::value::{ObjectMap, Value};
39
40use super::{Condition, IndexHandle, Table};
41use crate::Case;
42
/// Maps a table id to its boxed, thread-safe (`Send + Sync`) [`Table`] implementation.
type TableMap = HashMap<String, Box<dyn Table + Send + Sync>>;
45
/// Registry of enrichment tables with a two-step lifecycle.
///
/// Tables are first staged through `load` (where `add_index` may still mutate
/// them) and then published by `finish_load`, after which they are reachable
/// through the read-only [`TableSearch`] handle returned by `as_readonly`.
///
/// Both fields are `Arc`s, so a cloned registry shares its storage with the
/// registry it was cloned from.
#[derive(Clone, Default)]
pub struct TableRegistry {
    /// Tables staged by `load` and not yet published. `None` when nothing is
    /// staged, which is also the state `finish_load` leaves behind.
    loading: Arc<Mutex<Option<TableMap>>>,
    /// Published tables that searches read from. `None` when nothing has been
    /// published.
    tables: Arc<ArcSwap<Option<TableMap>>>,
}
51
52impl PartialEq for TableRegistry {
54 fn eq(&self, other: &Self) -> bool {
55 Arc::ptr_eq(&self.tables, &other.tables) && Arc::ptr_eq(&self.loading, &other.loading)
56 || self.tables.load().is_none()
57 && other.tables.load().is_none()
58 && self.loading.lock().expect("lock poison").is_none()
59 && other.loading.lock().expect("lock poison").is_none()
60 }
61}
/// Marker impl so `TableRegistry` can be used where `Eq` is required; the
/// comparison itself is defined by the `PartialEq` impl.
impl Eq for TableRegistry {}
63
64impl TableRegistry {
65 pub fn load(&self, mut tables: TableMap) {
94 let mut loading = self.loading.lock().unwrap();
95 let existing = self.tables.load();
96 if let Some(existing) = &**existing {
97 let extend = existing
99 .iter()
100 .filter(|(key, _)| !tables.contains_key(*key))
101 .map(|(key, value)| (key.clone(), value.clone()))
102 .collect::<HashMap<_, _>>();
103
104 tables.extend(extend);
105 }
106 match *loading {
107 None => *loading = Some(tables),
108 Some(ref mut loading) => loading.extend(tables),
109 }
110 }
111
112 pub fn finish_load(&self) {
121 let mut tables_lock = self.loading.lock().unwrap();
122 let tables = tables_lock.take();
123 self.tables.swap(Arc::new(tables));
124 }
125
126 pub fn table_ids(&self) -> Vec<String> {
135 let locked = self.loading.lock().unwrap();
136 match *locked {
137 Some(ref tables) => tables.keys().cloned().collect(),
138 None => Vec::new(),
139 }
140 }
141
142 pub fn add_index(
150 &mut self,
151 table: &str,
152 case: Case,
153 fields: &[&str],
154 ) -> Result<IndexHandle, String> {
155 let mut locked = self.loading.lock().unwrap();
156
157 match *locked {
158 None => Err("finish_load has been called".to_string()),
159 Some(ref mut tables) => match tables.get_mut(table) {
160 None => Err(format!("table '{table}' not loaded")),
161 Some(table) => table.add_index(case, fields),
162 },
163 }
164 }
165
166 pub fn as_readonly(&self) -> TableSearch {
169 TableSearch(self.tables.clone())
170 }
171
172 pub fn index_fields(&self, table: &str) -> Vec<(Case, Vec<String>)> {
175 match &**self.tables.load() {
176 Some(tables) => tables
177 .get(table)
178 .map(|table| table.index_fields())
179 .unwrap_or_default(),
180 None => Vec::new(),
181 }
182 }
183
184 pub fn needs_reload(&self, table: &str) -> bool {
187 match &**self.tables.load() {
188 Some(tables) => tables
189 .get(table)
190 .map(|table| table.needs_reload())
191 .unwrap_or(true),
192 None => true,
193 }
194 }
195}
196
197impl std::fmt::Debug for TableRegistry {
198 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199 fmt_enrichment_table(f, "TableRegistry", &self.tables)
200 }
201}
202
/// Read-only handle over a registry's published tables.
///
/// Wraps the same `Arc<ArcSwap<..>>` that `TableRegistry::as_readonly` clones,
/// so tables published later by `finish_load` are visible through an existing
/// handle.
#[derive(Clone, Default)]
pub struct TableSearch(Arc<ArcSwap<Option<TableMap>>>);
208
209impl TableSearch {
210 pub fn find_table_row<'a>(
214 &self,
215 table: &str,
216 case: Case,
217 condition: &'a [Condition<'a>],
218 select: Option<&[String]>,
219 wildcard: Option<&Value>,
220 index: Option<IndexHandle>,
221 ) -> Result<ObjectMap, String> {
222 let tables = self.0.load();
223 if let Some(ref tables) = **tables {
224 match tables.get(table) {
225 None => Err(format!("table {table} not loaded")),
226 Some(table) => table.find_table_row(case, condition, select, wildcard, index),
227 }
228 } else {
229 Err("finish_load not called".to_string())
230 }
231 }
232
233 pub fn find_table_rows<'a>(
237 &self,
238 table: &str,
239 case: Case,
240 condition: &'a [Condition<'a>],
241 select: Option<&[String]>,
242 wildcard: Option<&Value>,
243 index: Option<IndexHandle>,
244 ) -> Result<Vec<ObjectMap>, String> {
245 let tables = self.0.load();
246 if let Some(ref tables) = **tables {
247 match tables.get(table) {
248 None => Err(format!("table {table} not loaded")),
249 Some(table) => table.find_table_rows(case, condition, select, wildcard, index),
250 }
251 } else {
252 Err("finish_load not called".to_string())
253 }
254 }
255}
256
257impl std::fmt::Debug for TableSearch {
258 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259 fmt_enrichment_table(f, "EnrichmentTableSearch", &self.0)
260 }
261}
262
263fn fmt_enrichment_table(
265 f: &mut std::fmt::Formatter<'_>,
266 name: &'static str,
267 tables: &Arc<ArcSwap<Option<TableMap>>>,
268) -> std::fmt::Result {
269 let tables = tables.load();
270 match **tables {
271 Some(ref tables) => {
272 let mut tables = tables.iter().fold(String::from("("), |mut s, (key, _)| {
273 s.push_str(key);
274 s.push_str(", ");
275 s
276 });
277
278 tables.truncate(std::cmp::max(tables.len(), 0));
279 tables.push(')');
280
281 write!(f, "{name} {tables}")
282 }
283 None => write!(f, "{name} loading"),
284 }
285}
286
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_util::DummyEnrichmentTable;
    use vrl::value::Value;

    /// Builds a `TableMap` holding one default `DummyEnrichmentTable` per id.
    fn dummy_tables(ids: &[&str]) -> TableMap {
        let mut map = TableMap::new();
        for &id in ids {
            map.insert(id.to_string(), Box::new(DummyEnrichmentTable::new()));
        }
        map
    }

    /// Staged tables are reported by `table_ids`.
    #[test]
    fn tables_loaded() {
        let registry = TableRegistry::default();
        registry.load(dummy_tables(&["dummy1", "dummy2"]));

        let mut ids = registry.table_ids();
        ids.sort();

        assert_eq!(vec!["dummy1", "dummy2"], ids);
    }

    /// An index added through the registry reaches the staged table.
    #[test]
    fn can_add_indexes() {
        let recorded = Arc::new(Mutex::new(Vec::new()));

        let mut staged = TableMap::new();
        staged.insert(
            "dummy1".to_string(),
            Box::new(DummyEnrichmentTable::new_with_index(Arc::clone(&recorded))),
        );

        let mut registry = TableRegistry::default();
        registry.load(staged);

        let handle = registry.add_index("dummy1", Case::Sensitive, &["erk"]);
        assert_eq!(Ok(IndexHandle(0)), handle);

        let recorded = recorded.lock().unwrap();
        assert_eq!(vec!["erk".to_string()], *recorded[0]);
    }

    /// Searching fails while the tables are only staged.
    #[test]
    fn can_not_find_table_row_before_finish() {
        let registry = TableRegistry::default();
        registry.load(dummy_tables(&["dummy1"]));
        let search = registry.as_readonly();

        let condition = [Condition::Equals {
            field: "thing",
            value: Value::from("thang"),
        }];
        let outcome =
            search.find_table_row("dummy1", Case::Sensitive, &condition, None, None, None);

        assert_eq!(Err("finish_load not called".to_string()), outcome);
    }

    /// Indexes can no longer be added once the staged tables were published.
    #[test]
    fn can_not_add_indexes_after_finish() {
        let mut registry = TableRegistry::default();
        registry.load(dummy_tables(&["dummy1"]));
        registry.finish_load();

        let outcome = registry.add_index("dummy1", Case::Sensitive, &["erk"]);

        assert_eq!(Err("finish_load has been called".to_string()), outcome);
    }

    /// A handle obtained before `finish_load` sees the published tables.
    #[test]
    fn can_find_table_row_after_finish() {
        let registry = TableRegistry::default();
        registry.load(dummy_tables(&["dummy1"]));
        let search = registry.as_readonly();

        registry.finish_load();

        let condition = [Condition::Equals {
            field: "thing",
            value: Value::from("thang"),
        }];
        let outcome =
            search.find_table_row("dummy1", Case::Sensitive, &condition, None, None, None);

        assert_eq!(
            Ok(ObjectMap::from([("field".into(), Value::from("result"))])),
            outcome
        );
    }

    /// Loading after a publish stages the new table next to the published one.
    #[test]
    fn can_reload() {
        let registry = TableRegistry::default();
        registry.load(dummy_tables(&["dummy1"]));

        assert_eq!(vec!["dummy1".to_string()], registry.table_ids());

        registry.finish_load();

        // Publishing empties the staged set.
        assert!(registry.table_ids().is_empty());

        registry.load(dummy_tables(&["dummy2"]));

        let mut ids = registry.table_ids();
        ids.sort();

        assert_eq!(vec!["dummy1".to_string(), "dummy2".to_string()], ids,);
    }

    /// Reloading one table replaces it in the staged set and carries the
    /// other published table over unchanged.
    #[test]
    fn reloads_existing_tables() {
        let registry = TableRegistry::default();
        registry.load(dummy_tables(&["dummy1", "dummy2"]));
        registry.finish_load();

        // Publishing empties the staged set.
        assert!(registry.table_ids().is_empty());

        let replacement = ObjectMap::from([("thing".into(), Value::Null)]);

        let mut reloaded = TableMap::new();
        reloaded.insert(
            "dummy2".to_string(),
            Box::new(DummyEnrichmentTable::new_with_data(replacement)),
        );
        registry.load(reloaded);

        let staged = registry.loading.lock().unwrap();
        let staged = staged.clone().unwrap();

        // Fetches `field` from the row a staged table returns for an empty condition.
        let row_field = |table: &str, field: &str| {
            staged
                .get(table)
                .unwrap()
                .find_table_row(Case::Sensitive, &[], None, None, None)
                .unwrap()
                .get(field)
                .cloned()
                .unwrap()
        };

        // `dummy1` was carried over from the published set.
        assert_eq!(Value::from("result"), row_field("dummy1", "field"));

        // `dummy2` is the replacement that was just loaded.
        assert_eq!(Value::Null, row_field("dummy2", "thing"));
    }
}
469}