enrichment/
tables.rs

1//! The Enrichment `TableRegistry` manages the collection of `Table`s loaded
2//! into Vector. Enrichment Tables go through two stages.
3//!
4//! ## 1. Writing
5//!
6//! The tables are loaded. There are two elements that need loading. The first
7//! is the actual data. This is loaded at config load time, the actual loading
8//! is performed by the implementation of the `EnrichmentTable` trait. Next, the
//! tables are passed through Vector's `Transform` components, particularly the
10//! `Remap` transform. These Transforms are able to determine which fields we
11//! will want to lookup whilst Vector is running. They can notify the tables of
12//! these fields so that the data can be indexed.
13//!
14//! During this phase, the data is loaded within a single thread, so can be
15//! loaded directly into a `HashMap`.
16//!
17//! ## 2. Reading
18//!
19//! Once all the data has been loaded we can move to the next stage. This is
20//! signified by calling the `finish_load` method. At this point all the data is
21//! swapped into the `ArcSwap` of the `tables` field. `ArcSwap` provides
22//! lock-free read-only access to the data. From this point on we have fast,
23//! efficient read-only access and can no longer add indexes or otherwise mutate
24//! the data.
25//!
26//! This data within the `ArcSwap` is accessed through the `TableSearch`
27//! struct. Any transform that needs access to this can call
28//! `TableRegistry::as_readonly`. This returns a cheaply clonable struct that
//! implements `vrl::EnrichmentTableSearch` through which the enrichment tables
//! can be searched.
31
32use std::{
33    collections::HashMap,
34    sync::{Arc, Mutex},
35};
36
37use arc_swap::ArcSwap;
38use vrl::value::{ObjectMap, Value};
39
40use super::{Condition, IndexHandle, Table};
41use crate::Case;
42
/// A hashmap of name => implementation of an enrichment table.
///
/// Each table is stored as a boxed trait object that is additionally required
/// to be `Send + Sync`.
type TableMap = HashMap<String, Box<dyn Table + Send + Sync>>;
45
/// Registry holding the enrichment tables for both the writing and the
/// reading stage.
///
/// Both fields are `Arc`s, so cloning the registry yields a handle onto the
/// same underlying state rather than a copy of the tables.
#[derive(Clone, Default)]
pub struct TableRegistry {
    /// Tables that are currently being loaded and indexed (writing stage).
    /// `None` when nothing has been loaded, or after `finish_load` has taken
    /// the tables out.
    loading: Arc<Mutex<Option<TableMap>>>,
    /// Tables published for lock-free reading (reading stage). Holds `None`
    /// until `finish_load` has stored a set of tables.
    tables: Arc<ArcSwap<Option<TableMap>>>,
}
51
52/// Pessimistic Eq implementation for caching purposes
53impl PartialEq for TableRegistry {
54    fn eq(&self, other: &Self) -> bool {
55        Arc::ptr_eq(&self.tables, &other.tables) && Arc::ptr_eq(&self.loading, &other.loading)
56            || self.tables.load().is_none()
57                && other.tables.load().is_none()
58                && self.loading.lock().expect("lock poison").is_none()
59                && other.loading.lock().expect("lock poison").is_none()
60    }
61}
62impl Eq for TableRegistry {}
63
64impl TableRegistry {
65    /// Load the given Enrichment Tables into the registry. This can be new tables
66    /// loaded from the config, or tables that need to be reloaded because the
67    /// underlying data has changed.
68    ///
69    /// If there are no tables currently loaded into the registry, this is a
70    /// simple operation, we simply load the tables into the `loading` field.
71    ///
72    /// If there are tables that have already been loaded things get a bit more
73    /// complicated. This can occur when the config is reloaded. Vector will be
74    /// currently running and transforming events, thus the tables loaded into
75    /// the `tables` field could be in active use. Since there is no lock
76    /// against these tables, we cannot mutate this list. We do need to have a
77    /// full list of tables in the `loading` field since there may be some
78    /// transforms that will need to add indexes to these tables during the
79    /// reload.
80    ///
81    /// Our only option is to clone the data that is in `tables` and move it
82    /// into the `loading` field so it can be mutated. This could be a
83    /// potentially expensive operation. For the period whilst the config is
84    /// reloading we could potentially have double the enrichment data loaded
85    /// into memory.
86    ///
87    /// Once loading is complete, the data is swapped out of `loading` and we
88    /// return to a single copy of the tables.
89    ///
90    /// # Panics
91    ///
92    /// Panics if the Mutex is poisoned.
93    pub fn load(&self, mut tables: TableMap) {
94        let mut loading = self.loading.lock().unwrap();
95        let existing = self.tables.load();
96        if let Some(existing) = &**existing {
97            // We already have some tables
98            let extend = existing
99                .iter()
100                .filter(|(key, _)| !tables.contains_key(*key))
101                .map(|(key, value)| (key.clone(), value.clone()))
102                .collect::<HashMap<_, _>>();
103
104            tables.extend(extend);
105        }
106        match *loading {
107            None => *loading = Some(tables),
108            Some(ref mut loading) => loading.extend(tables),
109        }
110    }
111
112    /// Swap the data out of the `HashTable` into the `ArcSwap`.
113    ///
114    /// From this point we can no longer add indexes to the tables, but are now
115    /// allowed to read the data.
116    ///
117    /// # Panics
118    ///
119    /// Panics if the Mutex is poisoned.
120    pub fn finish_load(&self) {
121        let mut tables_lock = self.loading.lock().unwrap();
122        let tables = tables_lock.take();
123        self.tables.swap(Arc::new(tables));
124    }
125
126    /// Return a list of the available tables that we can write to.
127    ///
128    /// This only works in the writing stage and will acquire a lock to retrieve
129    /// the tables.
130    ///
131    /// # Panics
132    ///
133    /// Panics if the Mutex is poisoned.
134    pub fn table_ids(&self) -> Vec<String> {
135        let locked = self.loading.lock().unwrap();
136        match *locked {
137            Some(ref tables) => tables.keys().cloned().collect(),
138            None => Vec::new(),
139        }
140    }
141
142    /// Adds an index to the given Enrichment Table.
143    ///
144    /// If we are in the reading stage, this function will error.
145    ///
146    /// # Panics
147    ///
148    /// Panics if the Mutex is poisoned.
149    pub fn add_index(
150        &mut self,
151        table: &str,
152        case: Case,
153        fields: &[&str],
154    ) -> Result<IndexHandle, String> {
155        let mut locked = self.loading.lock().unwrap();
156
157        match *locked {
158            None => Err("finish_load has been called".to_string()),
159            Some(ref mut tables) => match tables.get_mut(table) {
160                None => Err(format!("table '{table}' not loaded")),
161                Some(table) => table.add_index(case, fields),
162            },
163        }
164    }
165
166    /// Returns a cheaply clonable struct through that provides lock free read
167    /// access to the enrichment tables.
168    pub fn as_readonly(&self) -> TableSearch {
169        TableSearch(self.tables.clone())
170    }
171
172    /// Returns the indexes that have been applied to the given table.
173    /// If the table is reloaded we need these to reapply them to the new reloaded tables.
174    pub fn index_fields(&self, table: &str) -> Vec<(Case, Vec<String>)> {
175        match &**self.tables.load() {
176            Some(tables) => tables
177                .get(table)
178                .map(|table| table.index_fields())
179                .unwrap_or_default(),
180            None => Vec::new(),
181        }
182    }
183
184    /// Checks if the table needs reloading.
185    /// If in doubt (the table isn't in our list) we return true.
186    pub fn needs_reload(&self, table: &str) -> bool {
187        match &**self.tables.load() {
188            Some(tables) => tables
189                .get(table)
190                .map(|table| table.needs_reload())
191                .unwrap_or(true),
192            None => true,
193        }
194    }
195}
196
197impl std::fmt::Debug for TableRegistry {
198    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199        fmt_enrichment_table(f, "TableRegistry", &self.tables)
200    }
201}
202
/// Provides read only access to the enrichment tables via the
/// `vrl::EnrichmentTableSearch` trait. Cloning this object is designed to be
/// cheap. The underlying data will be shared by all clones.
///
/// The wrapped value is the same `Arc<ArcSwap<..>>` type that `TableRegistry`
/// keeps in its `tables` field; it holds `None` while no tables are published.
#[derive(Clone, Default)]
pub struct TableSearch(Arc<ArcSwap<Option<TableMap>>>);
208
209impl TableSearch {
210    /// Search the given table to find the data.
211    ///
212    /// If we are in the writing stage, this function will return an error.
213    pub fn find_table_row<'a>(
214        &self,
215        table: &str,
216        case: Case,
217        condition: &'a [Condition<'a>],
218        select: Option<&[String]>,
219        wildcard: Option<&Value>,
220        index: Option<IndexHandle>,
221    ) -> Result<ObjectMap, String> {
222        let tables = self.0.load();
223        if let Some(ref tables) = **tables {
224            match tables.get(table) {
225                None => Err(format!("table {table} not loaded")),
226                Some(table) => table.find_table_row(case, condition, select, wildcard, index),
227            }
228        } else {
229            Err("finish_load not called".to_string())
230        }
231    }
232
233    /// Search the enrichment table data with the given condition.
234    /// All conditions must match (AND).
235    /// Can return multiple matched records
236    pub fn find_table_rows<'a>(
237        &self,
238        table: &str,
239        case: Case,
240        condition: &'a [Condition<'a>],
241        select: Option<&[String]>,
242        wildcard: Option<&Value>,
243        index: Option<IndexHandle>,
244    ) -> Result<Vec<ObjectMap>, String> {
245        let tables = self.0.load();
246        if let Some(ref tables) = **tables {
247            match tables.get(table) {
248                None => Err(format!("table {table} not loaded")),
249                Some(table) => table.find_table_rows(case, condition, select, wildcard, index),
250            }
251        } else {
252            Err("finish_load not called".to_string())
253        }
254    }
255}
256
257impl std::fmt::Debug for TableSearch {
258    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259        fmt_enrichment_table(f, "EnrichmentTableSearch", &self.0)
260    }
261}
262
263/// Provide some fairly rudimentary debug output for enrichment tables.
264fn fmt_enrichment_table(
265    f: &mut std::fmt::Formatter<'_>,
266    name: &'static str,
267    tables: &Arc<ArcSwap<Option<TableMap>>>,
268) -> std::fmt::Result {
269    let tables = tables.load();
270    match **tables {
271        Some(ref tables) => {
272            let mut tables = tables.iter().fold(String::from("("), |mut s, (key, _)| {
273                s.push_str(key);
274                s.push_str(", ");
275                s
276            });
277
278            tables.truncate(std::cmp::max(tables.len(), 0));
279            tables.push(')');
280
281            write!(f, "{name} {tables}")
282        }
283        None => write!(f, "{name} loading"),
284    }
285}
286
287#[cfg(test)]
288mod tests {
289    use super::*;
290    use crate::test_util::DummyEnrichmentTable;
291    use vrl::value::Value;
292
293    #[test]
294    fn tables_loaded() {
295        let mut tables: TableMap = HashMap::new();
296        tables.insert("dummy1".to_string(), Box::new(DummyEnrichmentTable::new()));
297        tables.insert("dummy2".to_string(), Box::new(DummyEnrichmentTable::new()));
298
299        let registry = super::TableRegistry::default();
300        registry.load(tables);
301        let mut result = registry.table_ids();
302        result.sort();
303        assert_eq!(vec!["dummy1", "dummy2"], result);
304    }
305
306    #[test]
307    fn can_add_indexes() {
308        let mut tables: TableMap = HashMap::new();
309        let indexes = Arc::new(Mutex::new(Vec::new()));
310        let dummy = DummyEnrichmentTable::new_with_index(indexes.clone());
311        tables.insert("dummy1".to_string(), Box::new(dummy));
312        let mut registry = super::TableRegistry::default();
313        registry.load(tables);
314        assert_eq!(
315            Ok(IndexHandle(0)),
316            registry.add_index("dummy1", Case::Sensitive, &["erk"])
317        );
318
319        let indexes = indexes.lock().unwrap();
320        assert_eq!(vec!["erk".to_string()], *indexes[0]);
321    }
322
323    #[test]
324    fn can_not_find_table_row_before_finish() {
325        let mut tables: TableMap = HashMap::new();
326        let dummy = DummyEnrichmentTable::new();
327        tables.insert("dummy1".to_string(), Box::new(dummy));
328        let registry = super::TableRegistry::default();
329        registry.load(tables);
330        let tables = registry.as_readonly();
331
332        assert_eq!(
333            Err("finish_load not called".to_string()),
334            tables.find_table_row(
335                "dummy1",
336                Case::Sensitive,
337                &[Condition::Equals {
338                    field: "thing",
339                    value: Value::from("thang"),
340                }],
341                None,
342                None,
343                None
344            )
345        );
346    }
347
348    #[test]
349    fn can_not_add_indexes_after_finish() {
350        let mut tables: TableMap = HashMap::new();
351        let dummy = DummyEnrichmentTable::new();
352        tables.insert("dummy1".to_string(), Box::new(dummy));
353        let mut registry = super::TableRegistry::default();
354        registry.load(tables);
355        registry.finish_load();
356        assert_eq!(
357            Err("finish_load has been called".to_string()),
358            registry.add_index("dummy1", Case::Sensitive, &["erk"])
359        );
360    }
361
362    #[test]
363    fn can_find_table_row_after_finish() {
364        let mut tables: TableMap = HashMap::new();
365        let dummy = DummyEnrichmentTable::new();
366        tables.insert("dummy1".to_string(), Box::new(dummy));
367
368        let registry = super::TableRegistry::default();
369        registry.load(tables);
370        let tables_search = registry.as_readonly();
371
372        registry.finish_load();
373
374        assert_eq!(
375            Ok(ObjectMap::from([("field".into(), Value::from("result"))])),
376            tables_search.find_table_row(
377                "dummy1",
378                Case::Sensitive,
379                &[Condition::Equals {
380                    field: "thing",
381                    value: Value::from("thang"),
382                }],
383                None,
384                None,
385                None
386            )
387        );
388    }
389
390    #[test]
391    fn can_reload() {
392        let mut tables: TableMap = HashMap::new();
393        tables.insert("dummy1".to_string(), Box::new(DummyEnrichmentTable::new()));
394
395        let registry = super::TableRegistry::default();
396        registry.load(tables);
397
398        assert_eq!(vec!["dummy1".to_string()], registry.table_ids());
399
400        registry.finish_load();
401
402        // After we finish load there are no tables in the list
403        assert!(registry.table_ids().is_empty());
404
405        let mut tables: TableMap = HashMap::new();
406        tables.insert("dummy2".to_string(), Box::new(DummyEnrichmentTable::new()));
407
408        // A load should put both tables back into the list.
409        registry.load(tables);
410        let mut table_ids = registry.table_ids();
411        table_ids.sort();
412
413        assert_eq!(vec!["dummy1".to_string(), "dummy2".to_string()], table_ids,);
414    }
415
416    #[test]
417    fn reloads_existing_tables() {
418        let mut tables: TableMap = HashMap::new();
419        tables.insert("dummy1".to_string(), Box::new(DummyEnrichmentTable::new()));
420        tables.insert("dummy2".to_string(), Box::new(DummyEnrichmentTable::new()));
421
422        let registry = super::TableRegistry::default();
423        registry.load(tables);
424        registry.finish_load();
425
426        // After we finish load there are no tables in the list
427        assert!(registry.table_ids().is_empty());
428
429        let mut new_data = ObjectMap::new();
430        new_data.insert("thing".into(), Value::Null);
431
432        let mut tables: TableMap = HashMap::new();
433        tables.insert(
434            "dummy2".to_string(),
435            Box::new(DummyEnrichmentTable::new_with_data(new_data)),
436        );
437
438        // A load should put both tables back into the list.
439        registry.load(tables);
440        let tables = registry.loading.lock().unwrap();
441        let tables = tables.clone().unwrap();
442
443        // dummy1 should still have old data.
444        assert_eq!(
445            Value::from("result"),
446            tables
447                .get("dummy1")
448                .unwrap()
449                .find_table_row(Case::Sensitive, &Vec::new(), None, None, None)
450                .unwrap()
451                .get("field")
452                .cloned()
453                .unwrap()
454        );
455
456        // dummy2 should have new data.
457        assert_eq!(
458            Value::Null,
459            tables
460                .get("dummy2")
461                .unwrap()
462                .find_table_row(Case::Sensitive, &Vec::new(), None, None, None)
463                .unwrap()
464                .get("thing")
465                .cloned()
466                .unwrap()
467        );
468    }
469}