enrichment/
tables.rs

1//! The Enrichment `TableRegistry` manages the collection of `Table`s loaded
2//! into Vector. Enrichment Tables go through two stages.
3//!
4//! ## 1. Writing
5//!
//! The tables are loaded. There are two elements that need loading. The first
//! is the actual data. This is loaded at config load time; the actual loading
//! is performed by the implementation of the `EnrichmentTable` trait. Next, the
//! tables are passed through Vector's `Transform` components, particularly the
//! `Remap` transform. These transforms are able to determine which fields we
//! will want to look up whilst Vector is running. They can notify the tables of
//! these fields so that the data can be indexed.
13//!
//! During this phase, the data is loaded within a single thread, so it can be
//! loaded directly into a `HashMap`.
16//!
17//! ## 2. Reading
18//!
19//! Once all the data has been loaded we can move to the next stage. This is
20//! signified by calling the `finish_load` method. At this point all the data is
21//! swapped into the `ArcSwap` of the `tables` field. `ArcSwap` provides
22//! lock-free read-only access to the data. From this point on we have fast,
23//! efficient read-only access and can no longer add indexes or otherwise mutate
24//! the data.
25//!
//! The data within the `ArcSwap` is accessed through the `TableSearch`
//! struct. Any transform that needs access to it can call
//! `TableRegistry::as_readonly`. This returns a cheaply cloneable struct that
//! implements `vrl::EnrichmentTableSearch`, through which the enrichment
//! tables can be searched.
31
32use std::{
33    collections::HashMap,
34    sync::{Arc, Mutex},
35};
36
37use arc_swap::ArcSwap;
38use vrl::value::{ObjectMap, Value};
39
40use super::{Condition, Error, IndexHandle, InternalError, Table};
41use crate::Case;
42
43/// A hashmap of name => implementation of an enrichment table.
44type TableMap = HashMap<String, Box<dyn Table + Send + Sync>>;
45
46#[derive(Clone, Default)]
47pub struct TableRegistry {
48    loading: Arc<Mutex<Option<TableMap>>>,
49    tables: Arc<ArcSwap<Option<TableMap>>>,
50}
51
52/// Pessimistic Eq implementation for caching purposes
53impl PartialEq for TableRegistry {
54    fn eq(&self, other: &Self) -> bool {
55        Arc::ptr_eq(&self.tables, &other.tables) && Arc::ptr_eq(&self.loading, &other.loading)
56            || self.tables.load().is_none()
57                && other.tables.load().is_none()
58                && self.loading.lock().expect("lock poison").is_none()
59                && other.loading.lock().expect("lock poison").is_none()
60    }
61}
62impl Eq for TableRegistry {}
63
64impl TableRegistry {
65    /// Load the given Enrichment Tables into the registry. This can be new tables
66    /// loaded from the config, or tables that need to be reloaded because the
67    /// underlying data has changed.
68    ///
69    /// If there are no tables currently loaded into the registry, this is a
70    /// simple operation, we simply load the tables into the `loading` field.
71    ///
72    /// If there are tables that have already been loaded things get a bit more
73    /// complicated. This can occur when the config is reloaded. Vector will be
74    /// currently running and transforming events, thus the tables loaded into
75    /// the `tables` field could be in active use. Since there is no lock
76    /// against these tables, we cannot mutate this list. We do need to have a
77    /// full list of tables in the `loading` field since there may be some
78    /// transforms that will need to add indexes to these tables during the
79    /// reload.
80    ///
81    /// Our only option is to clone the data that is in `tables` and move it
82    /// into the `loading` field so it can be mutated. This could be a
83    /// potentially expensive operation. For the period whilst the config is
84    /// reloading we could potentially have double the enrichment data loaded
85    /// into memory.
86    ///
87    /// Once loading is complete, the data is swapped out of `loading` and we
88    /// return to a single copy of the tables.
89    ///
90    /// # Panics
91    ///
92    /// Panics if the Mutex is poisoned.
93    pub fn load(&self, mut tables: TableMap) {
94        let mut loading = self.loading.lock().unwrap();
95        let existing = self.tables.load();
96        if let Some(existing) = &**existing {
97            // We already have some tables
98            let extend = existing
99                .iter()
100                .filter(|(key, _)| !tables.contains_key(*key))
101                .map(|(key, value)| (key.clone(), value.clone()))
102                .collect::<HashMap<_, _>>();
103
104            tables.extend(extend);
105        }
106        match *loading {
107            None => *loading = Some(tables),
108            Some(ref mut loading) => loading.extend(tables),
109        }
110    }
111
112    /// Swap the data out of the `HashTable` into the `ArcSwap`.
113    ///
114    /// From this point we can no longer add indexes to the tables, but are now
115    /// allowed to read the data.
116    ///
117    /// # Panics
118    ///
119    /// Panics if the Mutex is poisoned.
120    pub fn finish_load(&self) {
121        let mut tables_lock = self.loading.lock().unwrap();
122        let tables = tables_lock.take();
123        self.tables.swap(Arc::new(tables));
124    }
125
126    /// Return a list of the available tables that we can write to.
127    ///
128    /// This only works in the writing stage and will acquire a lock to retrieve
129    /// the tables.
130    ///
131    /// # Panics
132    ///
133    /// Panics if the Mutex is poisoned.
134    pub fn table_ids(&self) -> Vec<String> {
135        let locked = self.loading.lock().unwrap();
136        match *locked {
137            Some(ref tables) => tables.keys().cloned().collect(),
138            None => Vec::new(),
139        }
140    }
141
142    /// Adds an index to the given Enrichment Table.
143    ///
144    /// If we are in the reading stage, this function will error.
145    ///
146    /// # Panics
147    ///
148    /// Panics if the Mutex is poisoned.
149    pub fn add_index(
150        &mut self,
151        table: &str,
152        case: Case,
153        fields: &[&str],
154    ) -> Result<IndexHandle, Error> {
155        let mut locked = self.loading.lock().unwrap();
156
157        match *locked {
158            None => Err(Error::Internal {
159                source: InternalError::FinishLoadCalled,
160            }),
161            Some(ref mut tables) => match tables.get_mut(table) {
162                None => Err(Error::TableNotLoaded {
163                    table: table.to_string(),
164                }),
165                Some(table) => table.add_index(case, fields),
166            },
167        }
168    }
169
170    /// Returns a cheaply clonable struct through that provides lock free read
171    /// access to the enrichment tables.
172    pub fn as_readonly(&self) -> TableSearch {
173        TableSearch(self.tables.clone())
174    }
175
176    /// Returns the indexes that have been applied to the given table.
177    /// If the table is reloaded we need these to reapply them to the new reloaded tables.
178    pub fn index_fields(&self, table: &str) -> Vec<(Case, Vec<String>)> {
179        match &**self.tables.load() {
180            Some(tables) => tables
181                .get(table)
182                .map(|table| table.index_fields())
183                .unwrap_or_default(),
184            None => Vec::new(),
185        }
186    }
187
188    /// Checks if the table needs reloading.
189    /// If in doubt (the table isn't in our list) we return true.
190    pub fn needs_reload(&self, table: &str) -> bool {
191        match &**self.tables.load() {
192            Some(tables) => tables
193                .get(table)
194                .map(|table| table.needs_reload())
195                .unwrap_or(true),
196            None => true,
197        }
198    }
199}
200
201impl std::fmt::Debug for TableRegistry {
202    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203        fmt_enrichment_table(f, "TableRegistry", &self.tables)
204    }
205}
206
207/// Provides read only access to the enrichment tables via the
208/// `vrl::EnrichmentTableSearch` trait. Cloning this object is designed to be
209/// cheap. The underlying data will be shared by all clones.
210#[derive(Clone, Default)]
211pub struct TableSearch(Arc<ArcSwap<Option<TableMap>>>);
212
213impl TableSearch {
214    /// Search the given table to find the data.
215    ///
216    /// If we are in the writing stage, this function will return an error.
217    pub fn find_table_row<'a>(
218        &self,
219        table: &str,
220        case: Case,
221        condition: &'a [Condition<'a>],
222        select: Option<&[String]>,
223        wildcard: Option<&Value>,
224        index: Option<IndexHandle>,
225    ) -> Result<ObjectMap, Error> {
226        let tables = self.0.load();
227        if let Some(ref tables) = **tables {
228            match tables.get(table) {
229                None => Err(Error::TableNotLoaded {
230                    table: table.to_string(),
231                }),
232                Some(table) => table.find_table_row(case, condition, select, wildcard, index),
233            }
234        } else {
235            Err(Error::Internal {
236                source: InternalError::FinishLoadNotCalled,
237            })
238        }
239    }
240
241    /// Search the enrichment table data with the given condition.
242    /// All conditions must match (AND).
243    /// Can return multiple matched records
244    pub fn find_table_rows<'a>(
245        &self,
246        table: &str,
247        case: Case,
248        condition: &'a [Condition<'a>],
249        select: Option<&[String]>,
250        wildcard: Option<&Value>,
251        index: Option<IndexHandle>,
252    ) -> Result<Vec<ObjectMap>, Error> {
253        let tables = self.0.load();
254        if let Some(ref tables) = **tables {
255            match tables.get(table) {
256                None => Err(Error::TableNotLoaded {
257                    table: table.to_string(),
258                }),
259                Some(table) => table.find_table_rows(case, condition, select, wildcard, index),
260            }
261        } else {
262            Err(Error::Internal {
263                source: InternalError::FinishLoadNotCalled,
264            })
265        }
266    }
267}
268
269impl std::fmt::Debug for TableSearch {
270    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271        fmt_enrichment_table(f, "EnrichmentTableSearch", &self.0)
272    }
273}
274
275/// Provide some fairly rudimentary debug output for enrichment tables.
276fn fmt_enrichment_table(
277    f: &mut std::fmt::Formatter<'_>,
278    name: &'static str,
279    tables: &Arc<ArcSwap<Option<TableMap>>>,
280) -> std::fmt::Result {
281    let tables = tables.load();
282    match **tables {
283        Some(ref tables) => {
284            let mut tables = tables.iter().fold(String::from("("), |mut s, (key, _)| {
285                s.push_str(key);
286                s.push_str(", ");
287                s
288            });
289
290            tables.truncate(std::cmp::max(tables.len(), 0));
291            tables.push(')');
292
293            write!(f, "{name} {tables}")
294        }
295        None => write!(f, "{name} loading"),
296    }
297}
298
#[cfg(test)]
mod tests {
    use vrl::value::Value;

    use super::*;
    use crate::test_util::DummyEnrichmentTable;

    /// Builds a `TableMap` from `(name, table)` pairs.
    fn table_map(entries: Vec<(&str, DummyEnrichmentTable)>) -> TableMap {
        entries
            .into_iter()
            .map(|(name, table)| {
                let boxed: Box<dyn Table + Send + Sync> = Box::new(table);
                (name.to_string(), boxed)
            })
            .collect()
    }

    /// The `thing == "thang"` condition used by the lookup tests.
    fn thing_equals_thang<'a>() -> Vec<Condition<'a>> {
        vec![Condition::Equals {
            field: "thing",
            value: Value::from("thang"),
        }]
    }

    #[test]
    fn tables_loaded() {
        let registry = TableRegistry::default();
        registry.load(table_map(vec![
            ("dummy1", DummyEnrichmentTable::new()),
            ("dummy2", DummyEnrichmentTable::new()),
        ]));

        let mut ids = registry.table_ids();
        ids.sort_unstable();
        assert_eq!(ids, ["dummy1", "dummy2"]);
    }

    #[test]
    fn can_add_indexes() {
        let indexes = Arc::new(Mutex::new(Vec::new()));
        let mut registry = TableRegistry::default();
        registry.load(table_map(vec![(
            "dummy1",
            DummyEnrichmentTable::new_with_index(Arc::clone(&indexes)),
        )]));

        let handle = registry.add_index("dummy1", Case::Sensitive, &["erk"]);
        assert_eq!(handle, Ok(IndexHandle(0)));

        let recorded = indexes.lock().unwrap();
        assert_eq!(*recorded[0], vec!["erk".to_string()]);
    }

    #[test]
    fn can_not_find_table_row_before_finish() {
        let registry = TableRegistry::default();
        registry.load(table_map(vec![("dummy1", DummyEnrichmentTable::new())]));
        let search = registry.as_readonly();

        let outcome = search.find_table_row(
            "dummy1",
            Case::Sensitive,
            &thing_equals_thang(),
            None,
            None,
            None,
        );
        assert_eq!(
            outcome,
            Err(Error::Internal {
                source: InternalError::FinishLoadNotCalled,
            })
        );
    }

    #[test]
    fn can_not_add_indexes_after_finish() {
        let mut registry = TableRegistry::default();
        registry.load(table_map(vec![("dummy1", DummyEnrichmentTable::new())]));
        registry.finish_load();

        let outcome = registry.add_index("dummy1", Case::Sensitive, &["erk"]);
        assert_eq!(
            outcome,
            Err(Error::Internal {
                source: InternalError::FinishLoadCalled,
            })
        );
    }

    #[test]
    fn can_find_table_row_after_finish() {
        let registry = TableRegistry::default();
        registry.load(table_map(vec![("dummy1", DummyEnrichmentTable::new())]));

        // The read handle is taken before `finish_load`, so this also checks
        // that readers observe tables published after they were created.
        let search = registry.as_readonly();
        registry.finish_load();

        let outcome = search.find_table_row(
            "dummy1",
            Case::Sensitive,
            &thing_equals_thang(),
            None,
            None,
            None,
        );
        assert_eq!(
            outcome,
            Ok(ObjectMap::from([("field".into(), Value::from("result"))]))
        );
    }

    #[test]
    fn can_reload() {
        let registry = TableRegistry::default();
        registry.load(table_map(vec![("dummy1", DummyEnrichmentTable::new())]));
        assert_eq!(registry.table_ids(), ["dummy1".to_string()]);

        registry.finish_load();
        // After we finish load there are no tables in the list.
        assert!(registry.table_ids().is_empty());

        // A load should put both tables back into the list.
        registry.load(table_map(vec![("dummy2", DummyEnrichmentTable::new())]));
        let mut ids = registry.table_ids();
        ids.sort_unstable();
        assert_eq!(ids, ["dummy1".to_string(), "dummy2".to_string()]);
    }

    #[test]
    fn reloads_existing_tables() {
        let registry = TableRegistry::default();
        registry.load(table_map(vec![
            ("dummy1", DummyEnrichmentTable::new()),
            ("dummy2", DummyEnrichmentTable::new()),
        ]));
        registry.finish_load();

        // After we finish load there are no tables in the list.
        assert!(registry.table_ids().is_empty());

        let mut new_data = ObjectMap::new();
        new_data.insert("thing".into(), Value::Null);

        // Only dummy2 is reloaded; dummy1 must be carried over from the
        // published set.
        registry.load(table_map(vec![(
            "dummy2",
            DummyEnrichmentTable::new_with_data(new_data),
        )]));
        let staged = registry.loading.lock().unwrap().clone().unwrap();

        let lookup = |name: &str, field: &str| {
            staged
                .get(name)
                .unwrap()
                .find_table_row(Case::Sensitive, &Vec::new(), None, None, None)
                .unwrap()
                .get(field)
                .cloned()
                .unwrap()
        };

        // dummy1 should still have old data.
        assert_eq!(lookup("dummy1", "field"), Value::from("result"));

        // dummy2 should have new data.
        assert_eq!(lookup("dummy2", "thing"), Value::Null);
    }
}