vector/enrichment_tables/
file.rs

1//! Handles enrichment tables for `type = file`.
2use std::{collections::HashMap, fs, hash::Hasher, path::PathBuf, time::SystemTime};
3
4use bytes::Bytes;
5use tracing::trace;
6use vector_lib::{
7    TimeZone,
8    configurable::configurable_component,
9    conversion::Conversion,
10    enrichment::{Case, Condition, IndexHandle, Table},
11};
12use vrl::value::{ObjectMap, Value};
13
14use crate::config::EnrichmentTableConfig;
15
/// File encoding configuration.
// NOTE: `Csv` is the only variant, which is why `load_file` can destructure it with an
// irrefutable `let Encoding::Csv { .. } = ...` binding.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "File encoding type."))]
pub enum Encoding {
    /// Decodes the file as a [CSV][csv] (comma-separated values) file.
    ///
    /// [csv]: https://wikipedia.org/wiki/Comma-separated_values
    Csv {
        /// Whether or not the file contains column headers.
        ///
        /// When set to `true`, the first row of the CSV file will be read as the header row, and
        /// the values will be used for the names of each column. This is the default behavior.
        ///
        /// When set to `false`, columns are referred to by their numerical index.
        #[serde(default = "crate::serde::default_true")]
        include_headers: bool,

        /// The delimiter used to separate fields in each row of the CSV file.
        // NOTE: although typed as `char`, `load_file` hands this to the CSV reader as a single
        // byte.
        #[serde(default = "default_delimiter")]
        delimiter: char,
    },
}
40
41impl Default for Encoding {
42    fn default() -> Self {
43        Self::Csv {
44            include_headers: true,
45            delimiter: default_delimiter(),
46        }
47    }
48}
49
/// File-specific settings.
#[configurable_component]
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct FileSettings {
    /// The path of the enrichment table file.
    ///
    /// Currently, only [CSV][csv] files are supported.
    ///
    /// [csv]: https://en.wikipedia.org/wiki/Comma-separated_values
    // NOTE: besides being read by `load_file`, this path is polled by `File::needs_reload`,
    // which compares the file's modification time against the one captured at load time.
    pub path: PathBuf,

    /// File encoding configuration.
    #[configurable(derived)]
    pub encoding: Encoding,
}
65
/// Configuration for the `file` enrichment table.
#[configurable_component(enrichment_table("file"))]
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct FileConfig {
    /// File-specific settings.
    #[configurable(derived)]
    pub file: FileSettings,

    /// Key/value pairs representing mapped log field names and types.
    ///
    /// This is used to coerce log fields from strings into their proper types. The available types are listed in the `Types` list below.
    ///
    /// Keys are matched against the column names of the file (or against the numerical column
    /// indexes when headers are disabled). Columns that are not listed are kept as strings.
    ///
    /// Timestamp coercions need to be prefaced with `timestamp|`, for example `"timestamp|%F"`. Timestamp specifiers can use either of the following:
    ///
    /// 1. One of the built-in formats listed in the `Timestamp Formats` table below.
    /// 2. The [time format specifiers][chrono_fmt] from Rust’s `chrono` library.
    ///
    /// Types
    ///
    /// - **`bool`**
    /// - **`string`**
    /// - **`float`**
    /// - **`integer`**
    /// - **`date`**
    /// - **`timestamp`** (see the table below for formats)
    ///
    /// Timestamp Formats
    ///
    /// | Format               | Description                                                                      | Example                          |
    /// |----------------------|----------------------------------------------------------------------------------|----------------------------------|
    /// | `%F %T`              | `YYYY-MM-DD HH:MM:SS`                                                            | `2020-12-01 02:37:54`            |
    /// | `%v %T`              | `DD-Mmm-YYYY HH:MM:SS`                                                           | `01-Dec-2020 02:37:54`           |
    /// | `%FT%T`              | [ISO 8601][iso8601]/[RFC 3339][rfc3339], without time zone                       | `2020-12-01T02:37:54`            |
    /// | `%FT%TZ`             | [ISO 8601][iso8601]/[RFC 3339][rfc3339], UTC                                     | `2020-12-01T09:37:54Z`           |
    /// | `%+`                 | [ISO 8601][iso8601]/[RFC 3339][rfc3339], UTC, with time zone                     | `2020-12-01T02:37:54-07:00`      |
    /// | `%a, %d %b %Y %T`    | [RFC 822][rfc822]/[RFC 2822][rfc2822], without time zone                         | `Tue, 01 Dec 2020 02:37:54`      |
    /// | `%a %b %e %T %Y`     | [ctime][ctime] format                                                            | `Tue Dec 1 02:37:54 2020`        |
    /// | `%s`                 | [UNIX timestamp][unix_ts]                                                        | `1606790274`                     |
    /// | `%a %d %b %T %Y`     | [date][date] command, without time zone                                          | `Tue 01 Dec 02:37:54 2020`       |
    /// | `%a %d %b %T %Z %Y`  | [date][date] command, with time zone                                             | `Tue 01 Dec 02:37:54 PST 2020`   |
    /// | `%a %d %b %T %z %Y`  | [date][date] command, with numeric time zone                                     | `Tue 01 Dec 02:37:54 -0700 2020` |
    /// | `%a %d %b %T %#z %Y` | [date][date] command, with numeric time zone (minutes can be missing or present) | `Tue 01 Dec 02:37:54 -07 2020`   |
    ///
    /// [date]: https://man7.org/linux/man-pages/man1/date.1.html
    /// [ctime]: https://www.cplusplus.com/reference/ctime
    /// [unix_ts]: https://en.wikipedia.org/wiki/Unix_time
    /// [rfc822]: https://tools.ietf.org/html/rfc822#section-5
    /// [rfc2822]: https://tools.ietf.org/html/rfc2822#section-3.3
    /// [iso8601]: https://en.wikipedia.org/wiki/ISO_8601
    /// [rfc3339]: https://tools.ietf.org/html/rfc3339
    /// [chrono_fmt]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "Represents mapped log field names and types."
    ))]
    pub schema: HashMap<String, String>,
}
123
/// Returns the field delimiter used when the configuration does not specify one: a comma.
const fn default_delimiter() -> char {
    const COMMA: char = ',';
    COMMA
}
127
128impl FileConfig {
129    fn parse_column(
130        &self,
131        timezone: TimeZone,
132        column: &str,
133        row: usize,
134        value: &str,
135    ) -> Result<Value, String> {
136        use chrono::TimeZone;
137
138        Ok(match self.schema.get(column) {
139            Some(format) => {
140                let mut split = format.splitn(2, '|').map(|segment| segment.trim());
141
142                match (split.next(), split.next()) {
143                    (Some("date"), None) => Value::Timestamp(
144                        chrono::FixedOffset::east_opt(0)
145                            .expect("invalid timestamp")
146                            .from_utc_datetime(
147                                &chrono::NaiveDate::parse_from_str(value, "%Y-%m-%d")
148                                    .map_err(|_| {
149                                        format!("unable to parse date {value} found in row {row}")
150                                    })?
151                                    .and_hms_opt(0, 0, 0)
152                                    .expect("invalid timestamp"),
153                            )
154                            .into(),
155                    ),
156                    (Some("date"), Some(format)) => Value::Timestamp(
157                        chrono::FixedOffset::east_opt(0)
158                            .expect("invalid timestamp")
159                            .from_utc_datetime(
160                                &chrono::NaiveDate::parse_from_str(value, format)
161                                    .map_err(|_| {
162                                        format!("unable to parse date {value} found in row {row}")
163                                    })?
164                                    .and_hms_opt(0, 0, 0)
165                                    .expect("invalid timestamp"),
166                            )
167                            .into(),
168                    ),
169                    _ => {
170                        let conversion =
171                            Conversion::parse(format, timezone).map_err(|err| err.to_string())?;
172                        conversion
173                            .convert(Bytes::copy_from_slice(value.as_bytes()))
174                            .map_err(|_| format!("unable to parse {value} found in row {row}"))?
175                    }
176                }
177            }
178            None => value.into(),
179        })
180    }
181
182    /// Load the configured file into memory. Required to create a new file enrichment table.
183    pub fn load_file(&self, timezone: TimeZone) -> crate::Result<FileData> {
184        let Encoding::Csv {
185            include_headers,
186            delimiter,
187        } = self.file.encoding;
188
189        let mut reader = csv::ReaderBuilder::new()
190            .has_headers(include_headers)
191            .delimiter(delimiter as u8)
192            .from_path(&self.file.path)?;
193
194        let first_row = reader.records().next();
195        let headers = if include_headers {
196            reader
197                .headers()?
198                .iter()
199                .map(|col| col.to_string())
200                .collect::<Vec<_>>()
201        } else {
202            // If there are no headers in the datafile we make headers as the numerical index of
203            // the column.
204            match first_row {
205                Some(Ok(ref row)) => (0..row.len()).map(|idx| idx.to_string()).collect(),
206                _ => Vec::new(),
207            }
208        };
209
210        let data = first_row
211            .into_iter()
212            .chain(reader.records())
213            .map(|row| {
214                Ok(row?
215                    .iter()
216                    .enumerate()
217                    .map(|(idx, col)| self.parse_column(timezone, &headers[idx], idx, col))
218                    .collect::<Result<Vec<_>, String>>()?)
219            })
220            .collect::<crate::Result<Vec<_>>>()?;
221
222        trace!(
223            "Loaded enrichment file {} with headers {:?}.",
224            self.file.path.to_str().unwrap_or("path with invalid utf"),
225            headers
226        );
227
228        let file = reader.into_inner();
229
230        Ok(FileData {
231            headers,
232            data,
233            modified: file.metadata()?.modified()?,
234        })
235    }
236}
237
238impl EnrichmentTableConfig for FileConfig {
239    async fn build(
240        &self,
241        globals: &crate::config::GlobalOptions,
242    ) -> crate::Result<Box<dyn Table + Send + Sync>> {
243        Ok(Box::new(File::new(
244            self.clone(),
245            self.load_file(globals.timezone())?,
246        )))
247    }
248}
249
// NOTE(review): macro defined elsewhere in the crate; judging by its name it presumably derives
// the example configuration for `FileConfig` from its `Default` implementation — confirm.
impl_generate_config_from_default!(FileConfig);
251
/// The data resulting from loading a configured file.
///
/// Produced by `FileConfig::load_file` and consumed by `File::new`.
pub struct FileData {
    /// The ordered set of headers of the data columns.
    ///
    /// Either the names found in the header row or, when the file has no header row, the
    /// numerical column indexes rendered as strings.
    pub headers: Vec<String>,
    /// The data contained in the file.
    ///
    /// One inner vector per record, holding the parsed value of each column.
    pub data: Vec<Vec<Value>>,
    /// The last modified time of the file.
    pub modified: SystemTime,
}
261
/// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a CSV file.
#[derive(Clone)]
pub struct File {
    /// The configuration the table was created with; `needs_reload` reads the file path from
    /// it.
    config: FileConfig,
    /// The modification time of the file recorded when its data was loaded.
    last_modified: SystemTime,
    /// The loaded rows, one inner vector per record.
    data: Vec<Vec<Value>>,
    /// The column names; rows are looked up positionally against this list.
    headers: Vec<String>,
    /// The indexes registered through `add_index`.
    ///
    /// Each entry stores the case sensitivity the index was built with, the positions (within
    /// `headers`) of the indexed columns, and a map from the hash of those columns' values to
    /// the positions (within `data`) of the rows producing that hash.
    indexes: Vec<(
        Case,
        Vec<usize>,
        HashMap<u64, Vec<usize>, hash_hasher::HashBuildHasher>,
    )>,
}
275
276impl File {
277    /// Creates a new [File] based on the provided config.
278    pub fn new(config: FileConfig, data: FileData) -> Self {
279        Self {
280            config,
281            last_modified: data.modified,
282            data: data.data,
283            headers: data.headers,
284            indexes: Vec::new(),
285        }
286    }
287
288    fn column_index(&self, col: &str) -> Option<usize> {
289        self.headers.iter().position(|header| header == col)
290    }
291
292    /// Does the given row match all the conditions specified?
293    fn row_equals(
294        &self,
295        case: Case,
296        condition: &[Condition],
297        row: &[Value],
298        wildcard: Option<&Value>,
299    ) -> bool {
300        condition.iter().all(|condition| match condition {
301            Condition::Equals { field, value } => match self.column_index(field) {
302                None => false,
303                Some(idx) => {
304                    let current_row_value = &row[idx];
305
306                    // Helper closure for comparing current_row_value with another value,
307                    // respecting the specified case for Value::Bytes.
308                    let compare_values = |val_to_compare: &Value| -> bool {
309                        match (case, current_row_value, val_to_compare) {
310                            (
311                                Case::Insensitive,
312                                Value::Bytes(bytes_row),
313                                Value::Bytes(bytes_cmp),
314                            ) => {
315                                // Perform case-insensitive comparison for byte strings.
316                                // If both are valid UTF-8, compare their lowercase versions.
317                                // If both are non-UTF-8 bytes, compare them directly.
318                                // If one is UTF-8 and the other is not, they are considered not equal.
319                                match (
320                                    std::str::from_utf8(bytes_row),
321                                    std::str::from_utf8(bytes_cmp),
322                                ) {
323                                    (Ok(s_row), Ok(s_cmp)) => {
324                                        s_row.to_lowercase() == s_cmp.to_lowercase()
325                                    }
326                                    (Err(_), Err(_)) => bytes_row == bytes_cmp,
327                                    _ => false,
328                                }
329                            }
330                            // For Case::Sensitive, or for Case::Insensitive with non-Bytes types,
331                            // perform a direct equality check.
332                            _ => current_row_value == val_to_compare,
333                        }
334                    };
335
336                    // First, check if the row value matches the condition's value.
337                    if compare_values(value) {
338                        true
339                    } else if let Some(wc_val) = wildcard {
340                        // If not, and a wildcard is provided, check if the row value matches the wildcard.
341                        compare_values(wc_val)
342                    } else {
343                        // Otherwise, no match.
344                        false
345                    }
346                }
347            },
348            Condition::BetweenDates { field, from, to } => match self.column_index(field) {
349                None => false,
350                Some(idx) => match row[idx] {
351                    Value::Timestamp(date) => from <= &date && &date <= to,
352                    _ => false,
353                },
354            },
355            Condition::FromDate { field, from } => match self.column_index(field) {
356                None => false,
357                Some(idx) => match row[idx] {
358                    Value::Timestamp(date) => from <= &date,
359                    _ => false,
360                },
361            },
362            Condition::ToDate { field, to } => match self.column_index(field) {
363                None => false,
364                Some(idx) => match row[idx] {
365                    Value::Timestamp(date) => &date <= to,
366                    _ => false,
367                },
368            },
369        })
370    }
371
372    fn add_columns(&self, select: Option<&[String]>, row: &[Value]) -> ObjectMap {
373        self.headers
374            .iter()
375            .zip(row)
376            .filter(|(header, _)| {
377                select
378                    .map(|select| select.contains(header))
379                    // If no select is passed, we assume all columns are included
380                    .unwrap_or(true)
381            })
382            .map(|(header, col)| (header.as_str().into(), col.clone()))
383            .collect()
384    }
385
386    /// Order the fields in the index according to the position they are found in the header.
387    fn normalize_index_fields(&self, index: &[&str]) -> Result<Vec<usize>, String> {
388        // Get the positions of the fields we are indexing
389        let normalized = self
390            .headers
391            .iter()
392            .enumerate()
393            .filter_map(|(idx, col)| {
394                if index.contains(&col.as_ref()) {
395                    Some(idx)
396                } else {
397                    None
398                }
399            })
400            .collect::<Vec<_>>();
401
402        if normalized.len() != index.len() {
403            let missing = index
404                .iter()
405                .filter_map(|col| {
406                    if self.headers.iter().any(|header| header == *col) {
407                        None
408                    } else {
409                        Some(col.to_string())
410                    }
411                })
412                .collect::<Vec<_>>()
413                .join(", ");
414            Err(format!("field(s) '{missing}' missing from dataset"))
415        } else {
416            Ok(normalized)
417        }
418    }
419
420    /// Creates an index with the given fields.
421    /// Uses seahash to create a hash of the data that is used as the key in a hashmap lookup to
422    /// the index of the row in the data.
423    ///
424    /// Ensure fields that are searched via a comparison are not included in the index!
425    fn index_data(
426        &self,
427        fieldidx: &[usize],
428        case: Case,
429    ) -> Result<HashMap<u64, Vec<usize>, hash_hasher::HashBuildHasher>, String> {
430        let mut index = HashMap::with_capacity_and_hasher(
431            self.data.len(),
432            hash_hasher::HashBuildHasher::default(),
433        );
434
435        for (idx, row) in self.data.iter().enumerate() {
436            let mut hash = seahash::SeaHasher::default();
437
438            for idx in fieldidx {
439                hash_value(&mut hash, case, &row[*idx])?;
440            }
441
442            let key = hash.finish();
443
444            let entry = index.entry(key).or_insert_with(Vec::new);
445            entry.push(idx);
446        }
447
448        index.shrink_to_fit();
449
450        Ok(index)
451    }
452
453    /// Sequentially searches through the iterator for the given condition.
454    fn sequential<'a, I>(
455        &'a self,
456        data: I,
457        case: Case,
458        condition: &'a [Condition<'a>],
459        select: Option<&'a [String]>,
460        wildcard: Option<&'a Value>,
461    ) -> impl Iterator<Item = ObjectMap> + 'a
462    where
463        I: Iterator<Item = &'a Vec<Value>> + 'a,
464    {
465        data.filter_map(move |row| {
466            if self.row_equals(case, condition, row, wildcard) {
467                Some(self.add_columns(select, row))
468            } else {
469                None
470            }
471        })
472    }
473
474    fn indexed<'a>(
475        &'a self,
476        case: Case,
477        condition: &'a [Condition<'a>],
478        handle: IndexHandle,
479    ) -> Result<Option<&'a Vec<usize>>, String> {
480        // The index to use has been passed, we can use this to search the data.
481        // We are assuming that the caller has passed an index that represents the fields
482        // being passed in the condition.
483        let mut hash = seahash::SeaHasher::default();
484
485        for header in self.headers.iter() {
486            if let Some(Condition::Equals { value, .. }) = condition.iter().find(
487                |condition| matches!(condition, Condition::Equals { field, .. } if field == header),
488            ) {
489                hash_value(&mut hash, case, value)?;
490            }
491        }
492
493        let key = hash.finish();
494
495        let IndexHandle(handle) = handle;
496        Ok(self.indexes[handle].2.get(&key))
497    }
498
499    fn indexed_with_wildcard<'a>(
500        &'a self,
501        case: Case,
502        wildcard: &'a Value,
503        condition: &'a [Condition<'a>],
504        handle: IndexHandle,
505    ) -> Result<Option<&'a Vec<usize>>, String> {
506        if let Some(result) = self.indexed(case, condition, handle)? {
507            return Ok(Some(result));
508        }
509
510        // If lookup fails and a wildcard is provided, compute hash for the wildcard
511        let mut wildcard_hash = seahash::SeaHasher::default();
512        for header in self.headers.iter() {
513            if condition.iter().any(
514                |condition| matches!(condition, Condition::Equals { field, .. } if field == header),
515            ) {
516                hash_value(&mut wildcard_hash, case, wildcard)?;
517            }
518        }
519
520        let wildcard_key = wildcard_hash.finish();
521        let IndexHandle(handle) = handle;
522        Ok(self.indexes[handle].2.get(&wildcard_key))
523    }
524}
525
526/// Adds the bytes from the given value to the hash.
527/// Each field is terminated by a `0` value to separate the fields
528fn hash_value(hasher: &mut seahash::SeaHasher, case: Case, value: &Value) -> Result<(), String> {
529    match value {
530        Value::Bytes(bytes) => match case {
531            Case::Sensitive => hasher.write(bytes),
532            Case::Insensitive => hasher.write(
533                std::str::from_utf8(bytes)
534                    .map_err(|_| "column contains invalid utf".to_string())?
535                    .to_lowercase()
536                    .as_bytes(),
537            ),
538        },
539        value => {
540            let bytes: bytes::Bytes = value.encode_as_bytes()?;
541            hasher.write(&bytes);
542        }
543    }
544
545    hasher.write_u8(0);
546
547    Ok(())
548}
549
/// Extracts the only item of `iter`.
///
/// The iterator is always advanced twice. If the second call yields an item the result is
/// `Err("more than one row found")`; otherwise the first item is returned, or
/// `Err("no rows found")` when there was none.
fn single_or_err<I, T>(mut iter: T) -> Result<I, String>
where
    T: Iterator<Item = I>,
{
    match (iter.next(), iter.next()) {
        // More than one row has been found.
        (_, Some(_)) => Err("more than one row found".to_string()),
        (first, None) => first.ok_or_else(|| "no rows found".to_string()),
    }
}
564
565impl Table for File {
566    fn find_table_row<'a>(
567        &self,
568        case: Case,
569        condition: &'a [Condition<'a>],
570        select: Option<&'a [String]>,
571        wildcard: Option<&Value>,
572        index: Option<IndexHandle>,
573    ) -> Result<ObjectMap, String> {
574        match index {
575            None => {
576                // No index has been passed so we need to do a Sequential Scan.
577                single_or_err(self.sequential(self.data.iter(), case, condition, select, wildcard))
578            }
579            Some(handle) => {
580                let result = if let Some(wildcard) = wildcard {
581                    self.indexed_with_wildcard(case, wildcard, condition, handle)?
582                } else {
583                    self.indexed(case, condition, handle)?
584                }
585                .ok_or_else(|| "no rows found in index".to_string())?
586                .iter()
587                .map(|idx| &self.data[*idx]);
588
589                // Perform a sequential scan over the indexed result.
590                single_or_err(self.sequential(result, case, condition, select, wildcard))
591            }
592        }
593    }
594
595    fn find_table_rows<'a>(
596        &self,
597        case: Case,
598        condition: &'a [Condition<'a>],
599        select: Option<&'a [String]>,
600        wildcard: Option<&Value>,
601        index: Option<IndexHandle>,
602    ) -> Result<Vec<ObjectMap>, String> {
603        match index {
604            None => {
605                // No index has been passed so we need to do a Sequential Scan.
606                Ok(self
607                    .sequential(self.data.iter(), case, condition, select, wildcard)
608                    .collect())
609            }
610            Some(handle) => {
611                // Perform a sequential scan over the indexed result.
612                let indexed_result = if let Some(wildcard) = wildcard {
613                    self.indexed_with_wildcard(case, wildcard, condition, handle)?
614                } else {
615                    self.indexed(case, condition, handle)?
616                };
617
618                Ok(self
619                    .sequential(
620                        indexed_result
621                            .iter()
622                            .flat_map(|results| results.iter().map(|idx| &self.data[*idx])),
623                        case,
624                        condition,
625                        select,
626                        wildcard,
627                    )
628                    .collect())
629            }
630        }
631    }
632
633    fn add_index(&mut self, case: Case, fields: &[&str]) -> Result<IndexHandle, String> {
634        let normalized = self.normalize_index_fields(fields)?;
635        match self
636            .indexes
637            .iter()
638            .position(|index| index.0 == case && index.1 == normalized)
639        {
640            Some(pos) => {
641                // This index already exists
642                Ok(IndexHandle(pos))
643            }
644            None => {
645                let index = self.index_data(&normalized, case)?;
646                self.indexes.push((case, normalized, index));
647                // The returned index handle is the position of the index in our list of indexes.
648                Ok(IndexHandle(self.indexes.len() - 1))
649            }
650        }
651    }
652
653    /// Returns a list of the field names that are in each index
654    fn index_fields(&self) -> Vec<(Case, Vec<String>)> {
655        self.indexes
656            .iter()
657            .map(|index| {
658                let (case, fields, _) = index;
659                (
660                    *case,
661                    fields
662                        .iter()
663                        .map(|idx| self.headers[*idx].clone())
664                        .collect::<Vec<_>>(),
665                )
666            })
667            .collect::<Vec<_>>()
668    }
669
670    /// Checks the modified timestamp of the data file to see if data has changed.
671    fn needs_reload(&self) -> bool {
672        matches!(fs::metadata(&self.config.file.path)
673            .and_then(|metadata| metadata.modified()),
674            Ok(modified) if modified > self.last_modified)
675    }
676}
677
678impl std::fmt::Debug for File {
679    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
680        write!(
681            f,
682            "File {} row(s) {} index(es)",
683            self.data.len(),
684            self.indexes.len()
685        )
686    }
687}
688
689#[cfg(test)]
690mod tests {
691    use chrono::{TimeZone, Timelike};
692
693    use super::*;
694
695    #[test]
696    fn parse_file_with_headers() {
697        let dir = tempfile::tempdir().expect("Unable to create tempdir for enrichment table");
698        let path = dir.path().join("table.csv");
699        fs::write(path.clone(), "foo,bar\na,1\nb,2").expect("Failed to write enrichment table");
700
701        let config = FileConfig {
702            file: FileSettings {
703                path,
704                encoding: Encoding::Csv {
705                    include_headers: true,
706                    delimiter: default_delimiter(),
707                },
708            },
709            schema: HashMap::new(),
710        };
711        let data = config
712            .load_file(Default::default())
713            .expect("Failed to parse csv");
714        assert_eq!(vec!["foo".to_string(), "bar".to_string()], data.headers);
715        assert_eq!(
716            vec![
717                vec![Value::from("a"), Value::from("1")],
718                vec![Value::from("b"), Value::from("2")],
719            ],
720            data.data
721        );
722    }
723
724    #[test]
725    fn parse_file_no_headers() {
726        let dir = tempfile::tempdir().expect("Unable to create tempdir for enrichment table");
727        let path = dir.path().join("table.csv");
728        fs::write(path.clone(), "a,1\nb,2").expect("Failed to write enrichment table");
729
730        let config = FileConfig {
731            file: FileSettings {
732                path,
733                encoding: Encoding::Csv {
734                    include_headers: false,
735                    delimiter: default_delimiter(),
736                },
737            },
738            schema: HashMap::new(),
739        };
740        let data = config
741            .load_file(Default::default())
742            .expect("Failed to parse csv");
743        assert_eq!(vec!["0".to_string(), "1".to_string()], data.headers);
744        assert_eq!(
745            vec![
746                vec![Value::from("a"), Value::from("1")],
747                vec![Value::from("b"), Value::from("2")],
748            ],
749            data.data
750        );
751    }
752
753    #[test]
754    fn parse_column() {
755        let mut schema = HashMap::new();
756        schema.insert("col1".to_string(), " string ".to_string());
757        schema.insert("col2".to_string(), " date ".to_string());
758        schema.insert("col3".to_string(), "date|%m/%d/%Y".to_string());
759        schema.insert("col3-spaces".to_string(), "date | %m %d %Y".to_string());
760        schema.insert("col4".to_string(), "timestamp|%+".to_string());
761        schema.insert("col4-spaces".to_string(), "timestamp | %+".to_string());
762        schema.insert("col5".to_string(), "int".to_string());
763        let config = FileConfig {
764            file: Default::default(),
765            schema,
766        };
767
768        assert_eq!(
769            Ok(Value::from("zork")),
770            config.parse_column(Default::default(), "col1", 1, "zork")
771        );
772
773        assert_eq!(
774            Ok(Value::from(
775                chrono::Utc
776                    .with_ymd_and_hms(2020, 3, 5, 0, 0, 0)
777                    .single()
778                    .expect("invalid timestamp")
779            )),
780            config.parse_column(Default::default(), "col2", 1, "2020-03-05")
781        );
782
783        assert_eq!(
784            Ok(Value::from(
785                chrono::Utc
786                    .with_ymd_and_hms(2020, 3, 5, 0, 0, 0)
787                    .single()
788                    .expect("invalid timestamp")
789            )),
790            config.parse_column(Default::default(), "col3", 1, "03/05/2020")
791        );
792
793        assert_eq!(
794            Ok(Value::from(
795                chrono::Utc
796                    .with_ymd_and_hms(2020, 3, 5, 0, 0, 0)
797                    .single()
798                    .expect("invalid timestamp")
799            )),
800            config.parse_column(Default::default(), "col3-spaces", 1, "03 05 2020")
801        );
802
803        assert_eq!(
804            Ok(Value::from(
805                chrono::Utc
806                    .with_ymd_and_hms(2001, 7, 7, 15, 4, 0)
807                    .single()
808                    .and_then(|t| t.with_nanosecond(26490 * 1_000))
809                    .expect("invalid timestamp")
810            )),
811            config.parse_column(
812                Default::default(),
813                "col4",
814                1,
815                "2001-07-08T00:34:00.026490+09:30"
816            )
817        );
818
819        assert_eq!(
820            Ok(Value::from(
821                chrono::Utc
822                    .with_ymd_and_hms(2001, 7, 7, 15, 4, 0)
823                    .single()
824                    .and_then(|t| t.with_nanosecond(26490 * 1_000))
825                    .expect("invalid timestamp")
826            )),
827            config.parse_column(
828                Default::default(),
829                "col4-spaces",
830                1,
831                "2001-07-08T00:34:00.026490+09:30"
832            )
833        );
834
835        assert_eq!(
836            Ok(Value::from(42)),
837            config.parse_column(Default::default(), "col5", 1, "42")
838        );
839    }
840
841    #[test]
842    fn seahash() {
843        // Ensure we can separate fields to create a distinct hash.
844        let mut one = seahash::SeaHasher::default();
845        one.write(b"norknoog");
846        one.write_u8(0);
847        one.write(b"donk");
848
849        let mut two = seahash::SeaHasher::default();
850        two.write(b"nork");
851        one.write_u8(0);
852        two.write(b"noogdonk");
853
854        assert_ne!(one.finish(), two.finish());
855    }
856
857    #[test]
858    fn finds_row() {
859        let file = File::new(
860            Default::default(),
861            FileData {
862                modified: SystemTime::now(),
863                data: vec![
864                    vec!["zip".into(), "zup".into()],
865                    vec!["zirp".into(), "zurp".into()],
866                ],
867                headers: vec!["field1".to_string(), "field2".to_string()],
868            },
869        );
870
871        let condition = Condition::Equals {
872            field: "field1",
873            value: Value::from("zirp"),
874        };
875
876        assert_eq!(
877            Ok(ObjectMap::from([
878                ("field1".into(), Value::from("zirp")),
879                ("field2".into(), Value::from("zurp")),
880            ])),
881            file.find_table_row(Case::Sensitive, &[condition], None, None, None)
882        );
883    }
884
885    #[test]
886    fn finds_row_with_wildcard() {
887        let file = File::new(
888            Default::default(),
889            FileData {
890                modified: SystemTime::now(),
891                data: vec![
892                    vec!["zip".into(), "zup".into()],
893                    vec!["zirp".into(), "zurp".into()],
894                ],
895                headers: vec!["field1".to_string(), "field2".to_string()],
896            },
897        );
898
899        let wildcard = Value::from("zirp");
900
901        let condition = Condition::Equals {
902            field: "field1",
903            value: Value::from("nonexistent"),
904        };
905
906        assert_eq!(
907            Ok(ObjectMap::from([
908                ("field1".into(), Value::from("zirp")),
909                ("field2".into(), Value::from("zurp")),
910            ])),
911            file.find_table_row(Case::Sensitive, &[condition], None, Some(&wildcard), None)
912        );
913    }
914
915    #[test]
916    fn duplicate_indexes() {
917        let mut file = File::new(
918            Default::default(),
919            FileData {
920                modified: SystemTime::now(),
921                data: Vec::new(),
922                headers: vec![
923                    "field1".to_string(),
924                    "field2".to_string(),
925                    "field3".to_string(),
926                ],
927            },
928        );
929
930        let handle1 = file.add_index(Case::Sensitive, &["field2", "field3"]);
931        let handle2 = file.add_index(Case::Sensitive, &["field3", "field2"]);
932
933        assert_eq!(handle1, handle2);
934        assert_eq!(1, file.indexes.len());
935    }
936
937    #[test]
938    fn errors_on_missing_columns() {
939        let mut file = File::new(
940            Default::default(),
941            FileData {
942                modified: SystemTime::now(),
943                data: Vec::new(),
944                headers: vec![
945                    "field1".to_string(),
946                    "field2".to_string(),
947                    "field3".to_string(),
948                ],
949            },
950        );
951
952        let error = file.add_index(Case::Sensitive, &["apples", "field2", "bananas"]);
953        assert_eq!(
954            Err("field(s) 'apples, bananas' missing from dataset".to_string()),
955            error
956        )
957    }
958
959    #[test]
960    fn finds_row_with_index() {
961        let mut file = File::new(
962            Default::default(),
963            FileData {
964                modified: SystemTime::now(),
965                data: vec![
966                    vec!["zip".into(), "zup".into()],
967                    vec!["zirp".into(), "zurp".into()],
968                ],
969                headers: vec!["field1".to_string(), "field2".to_string()],
970            },
971        );
972
973        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
974
975        let condition = Condition::Equals {
976            field: "field1",
977            value: Value::from("zirp"),
978        };
979
980        assert_eq!(
981            Ok(ObjectMap::from([
982                ("field1".into(), Value::from("zirp")),
983                ("field2".into(), Value::from("zurp")),
984            ])),
985            file.find_table_row(Case::Sensitive, &[condition], None, None, Some(handle))
986        );
987    }
988
989    #[test]
990    fn finds_row_with_index_case_sensitive_and_wildcard() {
991        let mut file = File::new(
992            Default::default(),
993            FileData {
994                modified: SystemTime::now(),
995                data: vec![
996                    vec!["zip".into(), "zup".into()],
997                    vec!["zirp".into(), "zurp".into()],
998                ],
999                headers: vec!["field1".to_string(), "field2".to_string()],
1000            },
1001        );
1002
1003        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1004        let wildcard = Value::from("zirp");
1005
1006        let condition = Condition::Equals {
1007            field: "field1",
1008            value: Value::from("nonexistent"),
1009        };
1010
1011        assert_eq!(
1012            Ok(ObjectMap::from([
1013                ("field1".into(), Value::from("zirp")),
1014                ("field2".into(), Value::from("zurp")),
1015            ])),
1016            file.find_table_row(
1017                Case::Sensitive,
1018                &[condition],
1019                None,
1020                Some(&wildcard),
1021                Some(handle)
1022            )
1023        );
1024    }
1025
1026    #[test]
1027    fn finds_rows_with_index_case_sensitive() {
1028        let mut file = File::new(
1029            Default::default(),
1030            FileData {
1031                modified: SystemTime::now(),
1032                data: vec![
1033                    vec!["zip".into(), "zup".into()],
1034                    vec!["zirp".into(), "zurp".into()],
1035                    vec!["zip".into(), "zoop".into()],
1036                ],
1037                headers: vec!["field1".to_string(), "field2".to_string()],
1038            },
1039        );
1040
1041        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1042
1043        assert_eq!(
1044            Ok(vec![
1045                ObjectMap::from([
1046                    ("field1".into(), Value::from("zip")),
1047                    ("field2".into(), Value::from("zup")),
1048                ]),
1049                ObjectMap::from([
1050                    ("field1".into(), Value::from("zip")),
1051                    ("field2".into(), Value::from("zoop")),
1052                ]),
1053            ]),
1054            file.find_table_rows(
1055                Case::Sensitive,
1056                &[Condition::Equals {
1057                    field: "field1",
1058                    value: Value::from("zip"),
1059                }],
1060                None,
1061                None,
1062                Some(handle)
1063            )
1064        );
1065
1066        assert_eq!(
1067            Ok(vec![]),
1068            file.find_table_rows(
1069                Case::Sensitive,
1070                &[Condition::Equals {
1071                    field: "field1",
1072                    value: Value::from("ZiP"),
1073                }],
1074                None,
1075                None,
1076                Some(handle)
1077            )
1078        );
1079    }
1080
1081    #[test]
1082    fn selects_columns() {
1083        let mut file = File::new(
1084            Default::default(),
1085            FileData {
1086                modified: SystemTime::now(),
1087                data: vec![
1088                    vec!["zip".into(), "zup".into(), "zoop".into()],
1089                    vec!["zirp".into(), "zurp".into(), "zork".into()],
1090                    vec!["zip".into(), "zoop".into(), "zibble".into()],
1091                ],
1092                headers: vec![
1093                    "field1".to_string(),
1094                    "field2".to_string(),
1095                    "field3".to_string(),
1096                ],
1097            },
1098        );
1099
1100        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1101
1102        let condition = Condition::Equals {
1103            field: "field1",
1104            value: Value::from("zip"),
1105        };
1106
1107        assert_eq!(
1108            Ok(vec![
1109                ObjectMap::from([
1110                    ("field1".into(), Value::from("zip")),
1111                    ("field3".into(), Value::from("zoop")),
1112                ]),
1113                ObjectMap::from([
1114                    ("field1".into(), Value::from("zip")),
1115                    ("field3".into(), Value::from("zibble")),
1116                ]),
1117            ]),
1118            file.find_table_rows(
1119                Case::Sensitive,
1120                &[condition],
1121                Some(&["field1".to_string(), "field3".to_string()]),
1122                None,
1123                Some(handle)
1124            )
1125        );
1126    }
1127
1128    #[test]
1129    fn finds_rows_with_index_case_insensitive() {
1130        let mut file = File::new(
1131            Default::default(),
1132            FileData {
1133                modified: SystemTime::now(),
1134                data: vec![
1135                    vec!["zip".into(), "zup".into()],
1136                    vec!["zirp".into(), "zurp".into()],
1137                    vec!["zip".into(), "zoop".into()],
1138                ],
1139                headers: vec!["field1".to_string(), "field2".to_string()],
1140            },
1141        );
1142
1143        let handle = file.add_index(Case::Insensitive, &["field1"]).unwrap();
1144
1145        assert_eq!(
1146            Ok(vec![
1147                ObjectMap::from([
1148                    ("field1".into(), Value::from("zip")),
1149                    ("field2".into(), Value::from("zup")),
1150                ]),
1151                ObjectMap::from([
1152                    ("field1".into(), Value::from("zip")),
1153                    ("field2".into(), Value::from("zoop")),
1154                ]),
1155            ]),
1156            file.find_table_rows(
1157                Case::Insensitive,
1158                &[Condition::Equals {
1159                    field: "field1",
1160                    value: Value::from("zip"),
1161                }],
1162                None,
1163                None,
1164                Some(handle)
1165            )
1166        );
1167
1168        assert_eq!(
1169            Ok(vec![
1170                ObjectMap::from([
1171                    ("field1".into(), Value::from("zip")),
1172                    ("field2".into(), Value::from("zup")),
1173                ]),
1174                ObjectMap::from([
1175                    ("field1".into(), Value::from("zip")),
1176                    ("field2".into(), Value::from("zoop")),
1177                ]),
1178            ]),
1179            file.find_table_rows(
1180                Case::Insensitive,
1181                &[Condition::Equals {
1182                    field: "field1",
1183                    value: Value::from("ZiP"),
1184                }],
1185                None,
1186                None,
1187                Some(handle)
1188            )
1189        );
1190    }
1191
1192    #[test]
1193    fn finds_rows_with_index_case_insensitive_and_wildcard() {
1194        let mut file = File::new(
1195            Default::default(),
1196            FileData {
1197                modified: SystemTime::now(),
1198                data: vec![
1199                    vec!["zip".into(), "zup".into()],
1200                    vec!["zirp".into(), "zurp".into()],
1201                    vec!["zip".into(), "zoop".into()],
1202                ],
1203                headers: vec!["field1".to_string(), "field2".to_string()],
1204            },
1205        );
1206
1207        let handle = file.add_index(Case::Insensitive, &["field1"]).unwrap();
1208
1209        assert_eq!(
1210            Ok(vec![
1211                ObjectMap::from([
1212                    ("field1".into(), Value::from("zip")),
1213                    ("field2".into(), Value::from("zup")),
1214                ]),
1215                ObjectMap::from([
1216                    ("field1".into(), Value::from("zip")),
1217                    ("field2".into(), Value::from("zoop")),
1218                ]),
1219            ]),
1220            file.find_table_rows(
1221                Case::Insensitive,
1222                &[Condition::Equals {
1223                    field: "field1",
1224                    value: Value::from("nonexistent"),
1225                }],
1226                None,
1227                Some(&Value::from("zip")),
1228                Some(handle)
1229            )
1230        );
1231
1232        assert_eq!(
1233            Ok(vec![
1234                ObjectMap::from([
1235                    ("field1".into(), Value::from("zip")),
1236                    ("field2".into(), Value::from("zup")),
1237                ]),
1238                ObjectMap::from([
1239                    ("field1".into(), Value::from("zip")),
1240                    ("field2".into(), Value::from("zoop")),
1241                ]),
1242            ]),
1243            file.find_table_rows(
1244                Case::Insensitive,
1245                &[Condition::Equals {
1246                    field: "field1",
1247                    value: Value::from("ZiP"),
1248                }],
1249                None,
1250                Some(&Value::from("ZiP")),
1251                Some(handle)
1252            )
1253        );
1254    }
1255
1256    #[test]
1257    fn finds_row_between_dates() {
1258        let mut file = File::new(
1259            Default::default(),
1260            FileData {
1261                modified: SystemTime::now(),
1262                data: vec![
1263                    vec![
1264                        "zip".into(),
1265                        Value::Timestamp(
1266                            chrono::Utc
1267                                .with_ymd_and_hms(2015, 12, 7, 0, 0, 0)
1268                                .single()
1269                                .expect("invalid timestamp"),
1270                        ),
1271                    ],
1272                    vec![
1273                        "zip".into(),
1274                        Value::Timestamp(
1275                            chrono::Utc
1276                                .with_ymd_and_hms(2016, 12, 7, 0, 0, 0)
1277                                .single()
1278                                .expect("invalid timestamp"),
1279                        ),
1280                    ],
1281                ],
1282                headers: vec!["field1".to_string(), "field2".to_string()],
1283            },
1284        );
1285
1286        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1287
1288        let conditions = [
1289            Condition::Equals {
1290                field: "field1",
1291                value: "zip".into(),
1292            },
1293            Condition::BetweenDates {
1294                field: "field2",
1295                from: chrono::Utc
1296                    .with_ymd_and_hms(2016, 1, 1, 0, 0, 0)
1297                    .single()
1298                    .expect("invalid timestamp"),
1299                to: chrono::Utc
1300                    .with_ymd_and_hms(2017, 1, 1, 0, 0, 0)
1301                    .single()
1302                    .expect("invalid timestamp"),
1303            },
1304        ];
1305
1306        assert_eq!(
1307            Ok(ObjectMap::from([
1308                ("field1".into(), Value::from("zip")),
1309                (
1310                    "field2".into(),
1311                    Value::Timestamp(
1312                        chrono::Utc
1313                            .with_ymd_and_hms(2016, 12, 7, 0, 0, 0)
1314                            .single()
1315                            .expect("invalid timestamp")
1316                    )
1317                )
1318            ])),
1319            file.find_table_row(Case::Sensitive, &conditions, None, None, Some(handle))
1320        );
1321    }
1322
1323    #[test]
1324    fn finds_row_from_date() {
1325        let mut file = File::new(
1326            Default::default(),
1327            FileData {
1328                modified: SystemTime::now(),
1329                data: vec![
1330                    vec![
1331                        "zip".into(),
1332                        Value::Timestamp(
1333                            chrono::Utc
1334                                .with_ymd_and_hms(2015, 12, 7, 0, 0, 0)
1335                                .single()
1336                                .expect("invalid timestamp"),
1337                        ),
1338                    ],
1339                    vec![
1340                        "zip".into(),
1341                        Value::Timestamp(
1342                            chrono::Utc
1343                                .with_ymd_and_hms(2016, 12, 7, 0, 0, 0)
1344                                .single()
1345                                .expect("invalid timestamp"),
1346                        ),
1347                    ],
1348                ],
1349                headers: vec!["field1".to_string(), "field2".to_string()],
1350            },
1351        );
1352
1353        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1354
1355        let conditions = [
1356            Condition::Equals {
1357                field: "field1",
1358                value: "zip".into(),
1359            },
1360            Condition::FromDate {
1361                field: "field2",
1362                from: chrono::Utc
1363                    .with_ymd_and_hms(2016, 1, 1, 0, 0, 0)
1364                    .single()
1365                    .expect("invalid timestamp"),
1366            },
1367        ];
1368
1369        assert_eq!(
1370            Ok(ObjectMap::from([
1371                ("field1".into(), Value::from("zip")),
1372                (
1373                    "field2".into(),
1374                    Value::Timestamp(
1375                        chrono::Utc
1376                            .with_ymd_and_hms(2016, 12, 7, 0, 0, 0)
1377                            .single()
1378                            .expect("invalid timestamp")
1379                    )
1380                )
1381            ])),
1382            file.find_table_row(Case::Sensitive, &conditions, None, None, Some(handle))
1383        );
1384    }
1385
1386    #[test]
1387    fn finds_row_to_date() {
1388        let mut file = File::new(
1389            Default::default(),
1390            FileData {
1391                modified: SystemTime::now(),
1392                data: vec![
1393                    vec![
1394                        "zip".into(),
1395                        Value::Timestamp(
1396                            chrono::Utc
1397                                .with_ymd_and_hms(2015, 12, 7, 0, 0, 0)
1398                                .single()
1399                                .expect("invalid timestamp"),
1400                        ),
1401                    ],
1402                    vec![
1403                        "zip".into(),
1404                        Value::Timestamp(
1405                            chrono::Utc
1406                                .with_ymd_and_hms(2016, 12, 7, 0, 0, 0)
1407                                .single()
1408                                .expect("invalid timestamp"),
1409                        ),
1410                    ],
1411                ],
1412                headers: vec!["field1".to_string(), "field2".to_string()],
1413            },
1414        );
1415
1416        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1417
1418        let conditions = [
1419            Condition::Equals {
1420                field: "field1",
1421                value: "zip".into(),
1422            },
1423            Condition::ToDate {
1424                field: "field2",
1425                to: chrono::Utc
1426                    .with_ymd_and_hms(2016, 1, 1, 0, 0, 0)
1427                    .single()
1428                    .expect("invalid timestamp"),
1429            },
1430        ];
1431
1432        assert_eq!(
1433            Ok(ObjectMap::from([
1434                ("field1".into(), Value::from("zip")),
1435                (
1436                    "field2".into(),
1437                    Value::Timestamp(
1438                        chrono::Utc
1439                            .with_ymd_and_hms(2015, 12, 7, 0, 0, 0)
1440                            .single()
1441                            .expect("invalid timestamp")
1442                    )
1443                )
1444            ])),
1445            file.find_table_row(Case::Sensitive, &conditions, None, None, Some(handle))
1446        );
1447    }
1448
1449    #[test]
1450    fn doesnt_find_row() {
1451        let file = File::new(
1452            Default::default(),
1453            FileData {
1454                modified: SystemTime::now(),
1455                data: vec![
1456                    vec!["zip".into(), "zup".into()],
1457                    vec!["zirp".into(), "zurp".into()],
1458                ],
1459                headers: vec!["field1".to_string(), "field2".to_string()],
1460            },
1461        );
1462
1463        let condition = Condition::Equals {
1464            field: "field1",
1465            value: Value::from("zorp"),
1466        };
1467
1468        assert_eq!(
1469            Err("no rows found".to_string()),
1470            file.find_table_row(Case::Sensitive, &[condition], None, None, None)
1471        );
1472    }
1473
1474    #[test]
1475    fn doesnt_find_row_with_index() {
1476        let mut file = File::new(
1477            Default::default(),
1478            FileData {
1479                modified: SystemTime::now(),
1480                data: vec![
1481                    vec!["zip".into(), "zup".into()],
1482                    vec!["zirp".into(), "zurp".into()],
1483                ],
1484                headers: vec!["field1".to_string(), "field2".to_string()],
1485            },
1486        );
1487
1488        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1489
1490        let condition = Condition::Equals {
1491            field: "field1",
1492            value: Value::from("zorp"),
1493        };
1494
1495        assert_eq!(
1496            Err("no rows found in index".to_string()),
1497            file.find_table_row(Case::Sensitive, &[condition], None, None, Some(handle))
1498        );
1499    }
1500
1501    #[test]
1502    fn doesnt_find_row_with_index_and_wildcard() {
1503        let mut file = File::new(
1504            Default::default(),
1505            FileData {
1506                modified: SystemTime::now(),
1507                data: vec![
1508                    vec!["zip".into(), "zup".into()],
1509                    vec!["zirp".into(), "zurp".into()],
1510                ],
1511                headers: vec!["field1".to_string(), "field2".to_string()],
1512            },
1513        );
1514
1515        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1516        let wildcard = Value::from("nonexistent");
1517
1518        let condition = Condition::Equals {
1519            field: "field1",
1520            value: Value::from("zorp"),
1521        };
1522
1523        assert_eq!(
1524            Err("no rows found in index".to_string()),
1525            file.find_table_row(
1526                Case::Sensitive,
1527                &[condition],
1528                None,
1529                Some(&wildcard),
1530                Some(handle)
1531            )
1532        );
1533    }
1534}