vector/enrichment_tables/
file.rs

1//! Handles enrichment tables for `type = file`.
2use std::{collections::HashMap, fs, hash::Hasher, path::PathBuf, time::SystemTime};
3
4use bytes::Bytes;
5use tracing::trace;
6use vector_lib::configurable::configurable_component;
7use vector_lib::enrichment::{Case, Condition, IndexHandle, Table};
8use vector_lib::{conversion::Conversion, TimeZone};
9use vrl::value::{ObjectMap, Value};
10
11use crate::config::EnrichmentTableConfig;
12
/// File encoding configuration.
// Selected in config through the serde `type` tag (`type = "csv"` given `rename_all = "snake_case"`).
// NOTE(review): the `///` text here presumably feeds the generated config reference via
// `configurable_component`, so it is written for end users rather than maintainers.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "File encoding type."))]
pub enum Encoding {
    /// Decodes the file as a [CSV][csv] (comma-separated values) file.
    ///
    /// [csv]: https://wikipedia.org/wiki/Comma-separated_values
    Csv {
        /// Whether or not the file contains column headers.
        ///
        /// When set to `true`, the first row of the CSV file will be read as the header row, and
        /// the values will be used for the names of each column. This is the default behavior.
        ///
        /// When set to `false`, columns are referred to by their numerical index.
        #[serde(default = "crate::serde::default_true")]
        include_headers: bool,

        /// The delimiter used to separate fields in each row of the CSV file.
        // The CSV reader in `FileConfig::load_file` splits on a single byte, so only ASCII
        // characters are meaningful here.
        #[serde(default = "default_delimiter")]
        delimiter: char,
    },
}
37
38impl Default for Encoding {
39    fn default() -> Self {
40        Self::Csv {
41            include_headers: true,
42            delimiter: default_delimiter(),
43        }
44    }
45}
46
/// File-specific settings.
// The derived `Default` gives an empty `path` and `Encoding::default()` (CSV with headers, `,`).
#[configurable_component]
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct FileSettings {
    /// The path of the enrichment table file.
    ///
    /// Currently, only [CSV][csv] files are supported.
    ///
    /// [csv]: https://en.wikipedia.org/wiki/Comma-separated_values
    // Opened by `FileConfig::load_file`; `File::needs_reload` polls its modification time.
    pub path: PathBuf,

    /// File encoding configuration.
    #[configurable(derived)]
    pub encoding: Encoding,
}
62
/// Configuration for the `file` enrichment table.
#[configurable_component(enrichment_table("file"))]
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct FileConfig {
    /// File-specific settings.
    #[configurable(derived)]
    pub file: FileSettings,

    /// Key/value pairs representing mapped log field names and types.
    ///
    /// This is used to coerce log fields from strings into their proper types. The available types are listed in the `Types` list below.
    ///
    /// Timestamp coercions need to be prefaced with `timestamp|`, for example `"timestamp|%F"`. Timestamp specifiers can use either of the following:
    ///
    /// 1. One of the built-in-formats listed in the `Timestamp Formats` table below.
    /// 2. The [time format specifiers][chrono_fmt] from Rust’s `chrono` library.
    ///
    /// Types
    ///
    /// - **`bool`**
    /// - **`string`**
    /// - **`float`**
    /// - **`integer`**
    /// - **`date`**
    /// - **`timestamp`** (see the table below for formats)
    ///
    /// Timestamp Formats
    ///
    /// | Format               | Description                                                                      | Example                          |
    /// |----------------------|----------------------------------------------------------------------------------|----------------------------------|
    /// | `%F %T`              | `YYYY-MM-DD HH:MM:SS`                                                            | `2020-12-01 02:37:54`            |
    /// | `%v %T`              | `DD-Mmm-YYYY HH:MM:SS`                                                           | `01-Dec-2020 02:37:54`           |
    /// | `%FT%T`              | [ISO 8601][iso8601]/[RFC 3339][rfc3339], without time zone                       | `2020-12-01T02:37:54`            |
    /// | `%FT%TZ`             | [ISO 8601][iso8601]/[RFC 3339][rfc3339], UTC                                     | `2020-12-01T09:37:54Z`           |
    /// | `%+`                 | [ISO 8601][iso8601]/[RFC 3339][rfc3339], UTC, with time zone                     | `2020-12-01T02:37:54-07:00`      |
    /// | `%a, %d %b %Y %T`    | [RFC 822][rfc822]/[RFC 2822][rfc2822], without time zone                         | `Tue, 01 Dec 2020 02:37:54`      |
    /// | `%a %b %e %T %Y`     | [ctime][ctime] format                                                            | `Tue Dec 1 02:37:54 2020`        |
    /// | `%s`                 | [UNIX timestamp][unix_ts]                                                        | `1606790274`                     |
    /// | `%a %d %b %T %Y`     | [date][date] command, without time zone                                          | `Tue 01 Dec 02:37:54 2020`       |
    /// | `%a %d %b %T %Z %Y`  | [date][date] command, with time zone                                             | `Tue 01 Dec 02:37:54 PST 2020`   |
    /// | `%a %d %b %T %z %Y`  | [date][date] command, with numeric time zone                                     | `Tue 01 Dec 02:37:54 -0700 2020` |
    /// | `%a %d %b %T %#z %Y` | [date][date] command, with numeric time zone (minutes can be missing or present) | `Tue 01 Dec 02:37:54 -07 2020`   |
    ///
    /// [date]: https://man7.org/linux/man-pages/man1/date.1.html
    /// [ctime]: https://www.cplusplus.com/reference/ctime
    /// [unix_ts]: https://en.wikipedia.org/wiki/Unix_time
    /// [rfc822]: https://tools.ietf.org/html/rfc822#section-5
    /// [rfc2822]: https://tools.ietf.org/html/rfc2822#section-3.3
    /// [iso8601]: https://en.wikipedia.org/wiki/ISO_8601
    /// [rfc3339]: https://tools.ietf.org/html/rfc3339
    /// [chrono_fmt]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers
    // Maintainer notes (not part of the user-facing docs):
    // - Columns with no entry here are loaded as plain strings.
    // - `date` also accepts an optional chrono format after a `|` (e.g. `date|%m/%d/%Y`);
    //   without one, `%Y-%m-%d` is used. Dates become timestamps at midnight UTC.
    // - NOTE(review): the `%+` row says "UTC, with time zone" while its example carries a
    //   `-07:00` offset; the description may deserve rewording.
    // - Optional: `#[serde(default)]` makes a missing `schema` an empty map.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "Represents mapped log field names and types."
    ))]
    pub schema: HashMap<String, String>,
}
120
/// Field separator assumed when the CSV encoding does not specify one.
const DEFAULT_DELIMITER: char = ',';

/// Serde default for the CSV `delimiter` setting: a comma.
const fn default_delimiter() -> char {
    DEFAULT_DELIMITER
}
124
125impl FileConfig {
126    fn parse_column(
127        &self,
128        timezone: TimeZone,
129        column: &str,
130        row: usize,
131        value: &str,
132    ) -> Result<Value, String> {
133        use chrono::TimeZone;
134
135        Ok(match self.schema.get(column) {
136            Some(format) => {
137                let mut split = format.splitn(2, '|').map(|segment| segment.trim());
138
139                match (split.next(), split.next()) {
140                    (Some("date"), None) => Value::Timestamp(
141                        chrono::FixedOffset::east_opt(0)
142                            .expect("invalid timestamp")
143                            .from_utc_datetime(
144                                &chrono::NaiveDate::parse_from_str(value, "%Y-%m-%d")
145                                    .map_err(|_| {
146                                        format!("unable to parse date {value} found in row {row}")
147                                    })?
148                                    .and_hms_opt(0, 0, 0)
149                                    .expect("invalid timestamp"),
150                            )
151                            .into(),
152                    ),
153                    (Some("date"), Some(format)) => Value::Timestamp(
154                        chrono::FixedOffset::east_opt(0)
155                            .expect("invalid timestamp")
156                            .from_utc_datetime(
157                                &chrono::NaiveDate::parse_from_str(value, format)
158                                    .map_err(|_| {
159                                        format!("unable to parse date {value} found in row {row}")
160                                    })?
161                                    .and_hms_opt(0, 0, 0)
162                                    .expect("invalid timestamp"),
163                            )
164                            .into(),
165                    ),
166                    _ => {
167                        let conversion =
168                            Conversion::parse(format, timezone).map_err(|err| err.to_string())?;
169                        conversion
170                            .convert(Bytes::copy_from_slice(value.as_bytes()))
171                            .map_err(|_| format!("unable to parse {value} found in row {row}"))?
172                    }
173                }
174            }
175            None => value.into(),
176        })
177    }
178
179    /// Load the configured file into memory. Required to create a new file enrichment table.
180    pub fn load_file(&self, timezone: TimeZone) -> crate::Result<FileData> {
181        let Encoding::Csv {
182            include_headers,
183            delimiter,
184        } = self.file.encoding;
185
186        let mut reader = csv::ReaderBuilder::new()
187            .has_headers(include_headers)
188            .delimiter(delimiter as u8)
189            .from_path(&self.file.path)?;
190
191        let first_row = reader.records().next();
192        let headers = if include_headers {
193            reader
194                .headers()?
195                .iter()
196                .map(|col| col.to_string())
197                .collect::<Vec<_>>()
198        } else {
199            // If there are no headers in the datafile we make headers as the numerical index of
200            // the column.
201            match first_row {
202                Some(Ok(ref row)) => (0..row.len()).map(|idx| idx.to_string()).collect(),
203                _ => Vec::new(),
204            }
205        };
206
207        let data = first_row
208            .into_iter()
209            .chain(reader.records())
210            .map(|row| {
211                Ok(row?
212                    .iter()
213                    .enumerate()
214                    .map(|(idx, col)| self.parse_column(timezone, &headers[idx], idx, col))
215                    .collect::<Result<Vec<_>, String>>()?)
216            })
217            .collect::<crate::Result<Vec<_>>>()?;
218
219        trace!(
220            "Loaded enrichment file {} with headers {:?}.",
221            self.file.path.to_str().unwrap_or("path with invalid utf"),
222            headers
223        );
224
225        let file = reader.into_inner();
226
227        Ok(FileData {
228            headers,
229            data,
230            modified: file.metadata()?.modified()?,
231        })
232    }
233}
234
235impl EnrichmentTableConfig for FileConfig {
236    async fn build(
237        &self,
238        globals: &crate::config::GlobalOptions,
239    ) -> crate::Result<Box<dyn Table + Send + Sync>> {
240        Ok(Box::new(File::new(
241            self.clone(),
242            self.load_file(globals.timezone())?,
243        )))
244    }
245}
246
// Presumably derives the generated example configuration for `FileConfig` from its `Default`
// value — confirm against the macro definition.
impl_generate_config_from_default!(FileConfig);
248
/// The data resulting from loading a configured file.
///
/// Produced by [`FileConfig::load_file`] and consumed by [`File::new`].
pub struct FileData {
    /// The ordered set of headers of the data columns.
    ///
    /// When the file has no header row, these are the column positions as strings (`"0"`, `"1"`, ...).
    pub headers: Vec<String>,
    /// The data contained in the file.
    ///
    /// One entry per row; each row holds one value per column, in the same order as `headers`.
    pub data: Vec<Vec<Value>>,
    /// The last modified time of the file.
    ///
    /// [`File`] compares this with the file on disk to decide whether a reload is needed.
    pub modified: SystemTime,
}
258
/// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a CSV file.
#[derive(Clone)]
pub struct File {
    // Configuration the table was built from; `needs_reload` polls `config.file.path`.
    config: FileConfig,
    // Modification time recorded when the data was loaded.
    last_modified: SystemTime,
    // Parsed rows; each inner vector is indexed by column position, parallel to `headers`.
    data: Vec<Vec<Value>>,
    // Column names, in file order.
    headers: Vec<String>,
    // Indexes created by `add_index`; an `IndexHandle` is a position in this list. Each entry
    // holds the case sensitivity, the indexed column positions (in header order), and a map from
    // the seahash of those columns' values to the positions in `data` of rows with that hash.
    indexes: Vec<(
        Case,
        Vec<usize>,
        HashMap<u64, Vec<usize>, hash_hasher::HashBuildHasher>,
    )>,
}
272
273impl File {
274    /// Creates a new [File] based on the provided config.
275    pub fn new(config: FileConfig, data: FileData) -> Self {
276        Self {
277            config,
278            last_modified: data.modified,
279            data: data.data,
280            headers: data.headers,
281            indexes: Vec::new(),
282        }
283    }
284
285    fn column_index(&self, col: &str) -> Option<usize> {
286        self.headers.iter().position(|header| header == col)
287    }
288
289    /// Does the given row match all the conditions specified?
290    fn row_equals(
291        &self,
292        case: Case,
293        condition: &[Condition],
294        row: &[Value],
295        wildcard: Option<&Value>,
296    ) -> bool {
297        condition.iter().all(|condition| match condition {
298            Condition::Equals { field, value } => match self.column_index(field) {
299                None => false,
300                Some(idx) => {
301                    let current_row_value = &row[idx];
302
303                    // Helper closure for comparing current_row_value with another value,
304                    // respecting the specified case for Value::Bytes.
305                    let compare_values = |val_to_compare: &Value| -> bool {
306                        match (case, current_row_value, val_to_compare) {
307                            (
308                                Case::Insensitive,
309                                Value::Bytes(bytes_row),
310                                Value::Bytes(bytes_cmp),
311                            ) => {
312                                // Perform case-insensitive comparison for byte strings.
313                                // If both are valid UTF-8, compare their lowercase versions.
314                                // If both are non-UTF-8 bytes, compare them directly.
315                                // If one is UTF-8 and the other is not, they are considered not equal.
316                                match (
317                                    std::str::from_utf8(bytes_row),
318                                    std::str::from_utf8(bytes_cmp),
319                                ) {
320                                    (Ok(s_row), Ok(s_cmp)) => {
321                                        s_row.to_lowercase() == s_cmp.to_lowercase()
322                                    }
323                                    (Err(_), Err(_)) => bytes_row == bytes_cmp,
324                                    _ => false,
325                                }
326                            }
327                            // For Case::Sensitive, or for Case::Insensitive with non-Bytes types,
328                            // perform a direct equality check.
329                            _ => current_row_value == val_to_compare,
330                        }
331                    };
332
333                    // First, check if the row value matches the condition's value.
334                    if compare_values(value) {
335                        true
336                    } else if let Some(wc_val) = wildcard {
337                        // If not, and a wildcard is provided, check if the row value matches the wildcard.
338                        compare_values(wc_val)
339                    } else {
340                        // Otherwise, no match.
341                        false
342                    }
343                }
344            },
345            Condition::BetweenDates { field, from, to } => match self.column_index(field) {
346                None => false,
347                Some(idx) => match row[idx] {
348                    Value::Timestamp(date) => from <= &date && &date <= to,
349                    _ => false,
350                },
351            },
352            Condition::FromDate { field, from } => match self.column_index(field) {
353                None => false,
354                Some(idx) => match row[idx] {
355                    Value::Timestamp(date) => from <= &date,
356                    _ => false,
357                },
358            },
359            Condition::ToDate { field, to } => match self.column_index(field) {
360                None => false,
361                Some(idx) => match row[idx] {
362                    Value::Timestamp(date) => &date <= to,
363                    _ => false,
364                },
365            },
366        })
367    }
368
369    fn add_columns(&self, select: Option<&[String]>, row: &[Value]) -> ObjectMap {
370        self.headers
371            .iter()
372            .zip(row)
373            .filter(|(header, _)| {
374                select
375                    .map(|select| select.contains(header))
376                    // If no select is passed, we assume all columns are included
377                    .unwrap_or(true)
378            })
379            .map(|(header, col)| (header.as_str().into(), col.clone()))
380            .collect()
381    }
382
383    /// Order the fields in the index according to the position they are found in the header.
384    fn normalize_index_fields(&self, index: &[&str]) -> Result<Vec<usize>, String> {
385        // Get the positions of the fields we are indexing
386        let normalized = self
387            .headers
388            .iter()
389            .enumerate()
390            .filter_map(|(idx, col)| {
391                if index.contains(&col.as_ref()) {
392                    Some(idx)
393                } else {
394                    None
395                }
396            })
397            .collect::<Vec<_>>();
398
399        if normalized.len() != index.len() {
400            let missing = index
401                .iter()
402                .filter_map(|col| {
403                    if self.headers.iter().any(|header| header == *col) {
404                        None
405                    } else {
406                        Some(col.to_string())
407                    }
408                })
409                .collect::<Vec<_>>()
410                .join(", ");
411            Err(format!("field(s) '{missing}' missing from dataset"))
412        } else {
413            Ok(normalized)
414        }
415    }
416
417    /// Creates an index with the given fields.
418    /// Uses seahash to create a hash of the data that is used as the key in a hashmap lookup to
419    /// the index of the row in the data.
420    ///
421    /// Ensure fields that are searched via a comparison are not included in the index!
422    fn index_data(
423        &self,
424        fieldidx: &[usize],
425        case: Case,
426    ) -> Result<HashMap<u64, Vec<usize>, hash_hasher::HashBuildHasher>, String> {
427        let mut index = HashMap::with_capacity_and_hasher(
428            self.data.len(),
429            hash_hasher::HashBuildHasher::default(),
430        );
431
432        for (idx, row) in self.data.iter().enumerate() {
433            let mut hash = seahash::SeaHasher::default();
434
435            for idx in fieldidx {
436                hash_value(&mut hash, case, &row[*idx])?;
437            }
438
439            let key = hash.finish();
440
441            let entry = index.entry(key).or_insert_with(Vec::new);
442            entry.push(idx);
443        }
444
445        index.shrink_to_fit();
446
447        Ok(index)
448    }
449
450    /// Sequentially searches through the iterator for the given condition.
451    fn sequential<'a, I>(
452        &'a self,
453        data: I,
454        case: Case,
455        condition: &'a [Condition<'a>],
456        select: Option<&'a [String]>,
457        wildcard: Option<&'a Value>,
458    ) -> impl Iterator<Item = ObjectMap> + 'a
459    where
460        I: Iterator<Item = &'a Vec<Value>> + 'a,
461    {
462        data.filter_map(move |row| {
463            if self.row_equals(case, condition, row, wildcard) {
464                Some(self.add_columns(select, row))
465            } else {
466                None
467            }
468        })
469    }
470
471    fn indexed<'a>(
472        &'a self,
473        case: Case,
474        condition: &'a [Condition<'a>],
475        handle: IndexHandle,
476    ) -> Result<Option<&'a Vec<usize>>, String> {
477        // The index to use has been passed, we can use this to search the data.
478        // We are assuming that the caller has passed an index that represents the fields
479        // being passed in the condition.
480        let mut hash = seahash::SeaHasher::default();
481
482        for header in self.headers.iter() {
483            if let Some(Condition::Equals { value, .. }) = condition.iter().find(
484                |condition| matches!(condition, Condition::Equals { field, .. } if field == header),
485            ) {
486                hash_value(&mut hash, case, value)?;
487            }
488        }
489
490        let key = hash.finish();
491
492        let IndexHandle(handle) = handle;
493        Ok(self.indexes[handle].2.get(&key))
494    }
495
496    fn indexed_with_wildcard<'a>(
497        &'a self,
498        case: Case,
499        wildcard: &'a Value,
500        condition: &'a [Condition<'a>],
501        handle: IndexHandle,
502    ) -> Result<Option<&'a Vec<usize>>, String> {
503        if let Some(result) = self.indexed(case, condition, handle)? {
504            return Ok(Some(result));
505        }
506
507        // If lookup fails and a wildcard is provided, compute hash for the wildcard
508        let mut wildcard_hash = seahash::SeaHasher::default();
509        for header in self.headers.iter() {
510            if condition.iter().any(
511                |condition| matches!(condition, Condition::Equals { field, .. } if field == header),
512            ) {
513                hash_value(&mut wildcard_hash, case, wildcard)?;
514            }
515        }
516
517        let wildcard_key = wildcard_hash.finish();
518        let IndexHandle(handle) = handle;
519        Ok(self.indexes[handle].2.get(&wildcard_key))
520    }
521}
522
523/// Adds the bytes from the given value to the hash.
524/// Each field is terminated by a `0` value to separate the fields
525fn hash_value(hasher: &mut seahash::SeaHasher, case: Case, value: &Value) -> Result<(), String> {
526    match value {
527        Value::Bytes(bytes) => match case {
528            Case::Sensitive => hasher.write(bytes),
529            Case::Insensitive => hasher.write(
530                std::str::from_utf8(bytes)
531                    .map_err(|_| "column contains invalid utf".to_string())?
532                    .to_lowercase()
533                    .as_bytes(),
534            ),
535        },
536        value => {
537            let bytes: bytes::Bytes = value.encode_as_bytes()?;
538            hasher.write(&bytes);
539        }
540    }
541
542    hasher.write_u8(0);
543
544    Ok(())
545}
546
/// Returns the iterator's only item.
///
/// Exactly two items are pulled from the iterator. A second item means several rows matched
/// (`"more than one row found"`). No first item means nothing matched (`"no rows found"`).
fn single_or_err<I, T>(mut iter: T) -> Result<I, String>
where
    T: Iterator<Item = I>,
{
    match (iter.next(), iter.next()) {
        (_, Some(_)) => Err("more than one row found".to_string()),
        (Some(only), None) => Ok(only),
        (None, None) => Err("no rows found".to_string()),
    }
}
561
562impl Table for File {
563    fn find_table_row<'a>(
564        &self,
565        case: Case,
566        condition: &'a [Condition<'a>],
567        select: Option<&'a [String]>,
568        wildcard: Option<&Value>,
569        index: Option<IndexHandle>,
570    ) -> Result<ObjectMap, String> {
571        match index {
572            None => {
573                // No index has been passed so we need to do a Sequential Scan.
574                single_or_err(self.sequential(self.data.iter(), case, condition, select, wildcard))
575            }
576            Some(handle) => {
577                let result = if let Some(wildcard) = wildcard {
578                    self.indexed_with_wildcard(case, wildcard, condition, handle)?
579                } else {
580                    self.indexed(case, condition, handle)?
581                }
582                .ok_or_else(|| "no rows found in index".to_string())?
583                .iter()
584                .map(|idx| &self.data[*idx]);
585
586                // Perform a sequential scan over the indexed result.
587                single_or_err(self.sequential(result, case, condition, select, wildcard))
588            }
589        }
590    }
591
592    fn find_table_rows<'a>(
593        &self,
594        case: Case,
595        condition: &'a [Condition<'a>],
596        select: Option<&'a [String]>,
597        wildcard: Option<&Value>,
598        index: Option<IndexHandle>,
599    ) -> Result<Vec<ObjectMap>, String> {
600        match index {
601            None => {
602                // No index has been passed so we need to do a Sequential Scan.
603                Ok(self
604                    .sequential(self.data.iter(), case, condition, select, wildcard)
605                    .collect())
606            }
607            Some(handle) => {
608                // Perform a sequential scan over the indexed result.
609                let indexed_result = if let Some(wildcard) = wildcard {
610                    self.indexed_with_wildcard(case, wildcard, condition, handle)?
611                } else {
612                    self.indexed(case, condition, handle)?
613                };
614
615                Ok(self
616                    .sequential(
617                        indexed_result
618                            .iter()
619                            .flat_map(|results| results.iter().map(|idx| &self.data[*idx])),
620                        case,
621                        condition,
622                        select,
623                        wildcard,
624                    )
625                    .collect())
626            }
627        }
628    }
629
630    fn add_index(&mut self, case: Case, fields: &[&str]) -> Result<IndexHandle, String> {
631        let normalized = self.normalize_index_fields(fields)?;
632        match self
633            .indexes
634            .iter()
635            .position(|index| index.0 == case && index.1 == normalized)
636        {
637            Some(pos) => {
638                // This index already exists
639                Ok(IndexHandle(pos))
640            }
641            None => {
642                let index = self.index_data(&normalized, case)?;
643                self.indexes.push((case, normalized, index));
644                // The returned index handle is the position of the index in our list of indexes.
645                Ok(IndexHandle(self.indexes.len() - 1))
646            }
647        }
648    }
649
650    /// Returns a list of the field names that are in each index
651    fn index_fields(&self) -> Vec<(Case, Vec<String>)> {
652        self.indexes
653            .iter()
654            .map(|index| {
655                let (case, fields, _) = index;
656                (
657                    *case,
658                    fields
659                        .iter()
660                        .map(|idx| self.headers[*idx].clone())
661                        .collect::<Vec<_>>(),
662                )
663            })
664            .collect::<Vec<_>>()
665    }
666
667    /// Checks the modified timestamp of the data file to see if data has changed.
668    fn needs_reload(&self) -> bool {
669        matches!(fs::metadata(&self.config.file.path)
670            .and_then(|metadata| metadata.modified()),
671            Ok(modified) if modified > self.last_modified)
672    }
673}
674
675impl std::fmt::Debug for File {
676    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
677        write!(
678            f,
679            "File {} row(s) {} index(es)",
680            self.data.len(),
681            self.indexes.len()
682        )
683    }
684}
685
686#[cfg(test)]
687mod tests {
688    use chrono::{TimeZone, Timelike};
689
690    use super::*;
691
692    #[test]
693    fn parse_file_with_headers() {
694        let dir = tempfile::tempdir().expect("Unable to create tempdir for enrichment table");
695        let path = dir.path().join("table.csv");
696        fs::write(path.clone(), "foo,bar\na,1\nb,2").expect("Failed to write enrichment table");
697
698        let config = FileConfig {
699            file: FileSettings {
700                path,
701                encoding: Encoding::Csv {
702                    include_headers: true,
703                    delimiter: default_delimiter(),
704                },
705            },
706            schema: HashMap::new(),
707        };
708        let data = config
709            .load_file(Default::default())
710            .expect("Failed to parse csv");
711        assert_eq!(vec!["foo".to_string(), "bar".to_string()], data.headers);
712        assert_eq!(
713            vec![
714                vec![Value::from("a"), Value::from("1")],
715                vec![Value::from("b"), Value::from("2")],
716            ],
717            data.data
718        );
719    }
720
721    #[test]
722    fn parse_file_no_headers() {
723        let dir = tempfile::tempdir().expect("Unable to create tempdir for enrichment table");
724        let path = dir.path().join("table.csv");
725        fs::write(path.clone(), "a,1\nb,2").expect("Failed to write enrichment table");
726
727        let config = FileConfig {
728            file: FileSettings {
729                path,
730                encoding: Encoding::Csv {
731                    include_headers: false,
732                    delimiter: default_delimiter(),
733                },
734            },
735            schema: HashMap::new(),
736        };
737        let data = config
738            .load_file(Default::default())
739            .expect("Failed to parse csv");
740        assert_eq!(vec!["0".to_string(), "1".to_string()], data.headers);
741        assert_eq!(
742            vec![
743                vec![Value::from("a"), Value::from("1")],
744                vec![Value::from("b"), Value::from("2")],
745            ],
746            data.data
747        );
748    }
749
750    #[test]
751    fn parse_column() {
752        let mut schema = HashMap::new();
753        schema.insert("col1".to_string(), " string ".to_string());
754        schema.insert("col2".to_string(), " date ".to_string());
755        schema.insert("col3".to_string(), "date|%m/%d/%Y".to_string());
756        schema.insert("col3-spaces".to_string(), "date | %m %d %Y".to_string());
757        schema.insert("col4".to_string(), "timestamp|%+".to_string());
758        schema.insert("col4-spaces".to_string(), "timestamp | %+".to_string());
759        schema.insert("col5".to_string(), "int".to_string());
760        let config = FileConfig {
761            file: Default::default(),
762            schema,
763        };
764
765        assert_eq!(
766            Ok(Value::from("zork")),
767            config.parse_column(Default::default(), "col1", 1, "zork")
768        );
769
770        assert_eq!(
771            Ok(Value::from(
772                chrono::Utc
773                    .with_ymd_and_hms(2020, 3, 5, 0, 0, 0)
774                    .single()
775                    .expect("invalid timestamp")
776            )),
777            config.parse_column(Default::default(), "col2", 1, "2020-03-05")
778        );
779
780        assert_eq!(
781            Ok(Value::from(
782                chrono::Utc
783                    .with_ymd_and_hms(2020, 3, 5, 0, 0, 0)
784                    .single()
785                    .expect("invalid timestamp")
786            )),
787            config.parse_column(Default::default(), "col3", 1, "03/05/2020")
788        );
789
790        assert_eq!(
791            Ok(Value::from(
792                chrono::Utc
793                    .with_ymd_and_hms(2020, 3, 5, 0, 0, 0)
794                    .single()
795                    .expect("invalid timestamp")
796            )),
797            config.parse_column(Default::default(), "col3-spaces", 1, "03 05 2020")
798        );
799
800        assert_eq!(
801            Ok(Value::from(
802                chrono::Utc
803                    .with_ymd_and_hms(2001, 7, 7, 15, 4, 0)
804                    .single()
805                    .and_then(|t| t.with_nanosecond(26490 * 1_000))
806                    .expect("invalid timestamp")
807            )),
808            config.parse_column(
809                Default::default(),
810                "col4",
811                1,
812                "2001-07-08T00:34:00.026490+09:30"
813            )
814        );
815
816        assert_eq!(
817            Ok(Value::from(
818                chrono::Utc
819                    .with_ymd_and_hms(2001, 7, 7, 15, 4, 0)
820                    .single()
821                    .and_then(|t| t.with_nanosecond(26490 * 1_000))
822                    .expect("invalid timestamp")
823            )),
824            config.parse_column(
825                Default::default(),
826                "col4-spaces",
827                1,
828                "2001-07-08T00:34:00.026490+09:30"
829            )
830        );
831
832        assert_eq!(
833            Ok(Value::from(42)),
834            config.parse_column(Default::default(), "col5", 1, "42")
835        );
836    }
837
838    #[test]
839    fn seahash() {
840        // Ensure we can separate fields to create a distinct hash.
841        let mut one = seahash::SeaHasher::default();
842        one.write(b"norknoog");
843        one.write_u8(0);
844        one.write(b"donk");
845
846        let mut two = seahash::SeaHasher::default();
847        two.write(b"nork");
848        one.write_u8(0);
849        two.write(b"noogdonk");
850
851        assert_ne!(one.finish(), two.finish());
852    }
853
854    #[test]
855    fn finds_row() {
856        let file = File::new(
857            Default::default(),
858            FileData {
859                modified: SystemTime::now(),
860                data: vec![
861                    vec!["zip".into(), "zup".into()],
862                    vec!["zirp".into(), "zurp".into()],
863                ],
864                headers: vec!["field1".to_string(), "field2".to_string()],
865            },
866        );
867
868        let condition = Condition::Equals {
869            field: "field1",
870            value: Value::from("zirp"),
871        };
872
873        assert_eq!(
874            Ok(ObjectMap::from([
875                ("field1".into(), Value::from("zirp")),
876                ("field2".into(), Value::from("zurp")),
877            ])),
878            file.find_table_row(Case::Sensitive, &[condition], None, None, None)
879        );
880    }
881
882    #[test]
883    fn finds_row_with_wildcard() {
884        let file = File::new(
885            Default::default(),
886            FileData {
887                modified: SystemTime::now(),
888                data: vec![
889                    vec!["zip".into(), "zup".into()],
890                    vec!["zirp".into(), "zurp".into()],
891                ],
892                headers: vec!["field1".to_string(), "field2".to_string()],
893            },
894        );
895
896        let wildcard = Value::from("zirp");
897
898        let condition = Condition::Equals {
899            field: "field1",
900            value: Value::from("nonexistent"),
901        };
902
903        assert_eq!(
904            Ok(ObjectMap::from([
905                ("field1".into(), Value::from("zirp")),
906                ("field2".into(), Value::from("zurp")),
907            ])),
908            file.find_table_row(Case::Sensitive, &[condition], None, Some(&wildcard), None)
909        );
910    }
911
912    #[test]
913    fn duplicate_indexes() {
914        let mut file = File::new(
915            Default::default(),
916            FileData {
917                modified: SystemTime::now(),
918                data: Vec::new(),
919                headers: vec![
920                    "field1".to_string(),
921                    "field2".to_string(),
922                    "field3".to_string(),
923                ],
924            },
925        );
926
927        let handle1 = file.add_index(Case::Sensitive, &["field2", "field3"]);
928        let handle2 = file.add_index(Case::Sensitive, &["field3", "field2"]);
929
930        assert_eq!(handle1, handle2);
931        assert_eq!(1, file.indexes.len());
932    }
933
934    #[test]
935    fn errors_on_missing_columns() {
936        let mut file = File::new(
937            Default::default(),
938            FileData {
939                modified: SystemTime::now(),
940                data: Vec::new(),
941                headers: vec![
942                    "field1".to_string(),
943                    "field2".to_string(),
944                    "field3".to_string(),
945                ],
946            },
947        );
948
949        let error = file.add_index(Case::Sensitive, &["apples", "field2", "bananas"]);
950        assert_eq!(
951            Err("field(s) 'apples, bananas' missing from dataset".to_string()),
952            error
953        )
954    }
955
956    #[test]
957    fn finds_row_with_index() {
958        let mut file = File::new(
959            Default::default(),
960            FileData {
961                modified: SystemTime::now(),
962                data: vec![
963                    vec!["zip".into(), "zup".into()],
964                    vec!["zirp".into(), "zurp".into()],
965                ],
966                headers: vec!["field1".to_string(), "field2".to_string()],
967            },
968        );
969
970        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
971
972        let condition = Condition::Equals {
973            field: "field1",
974            value: Value::from("zirp"),
975        };
976
977        assert_eq!(
978            Ok(ObjectMap::from([
979                ("field1".into(), Value::from("zirp")),
980                ("field2".into(), Value::from("zurp")),
981            ])),
982            file.find_table_row(Case::Sensitive, &[condition], None, None, Some(handle))
983        );
984    }
985
986    #[test]
987    fn finds_row_with_index_case_sensitive_and_wildcard() {
988        let mut file = File::new(
989            Default::default(),
990            FileData {
991                modified: SystemTime::now(),
992                data: vec![
993                    vec!["zip".into(), "zup".into()],
994                    vec!["zirp".into(), "zurp".into()],
995                ],
996                headers: vec!["field1".to_string(), "field2".to_string()],
997            },
998        );
999
1000        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1001        let wildcard = Value::from("zirp");
1002
1003        let condition = Condition::Equals {
1004            field: "field1",
1005            value: Value::from("nonexistent"),
1006        };
1007
1008        assert_eq!(
1009            Ok(ObjectMap::from([
1010                ("field1".into(), Value::from("zirp")),
1011                ("field2".into(), Value::from("zurp")),
1012            ])),
1013            file.find_table_row(
1014                Case::Sensitive,
1015                &[condition],
1016                None,
1017                Some(&wildcard),
1018                Some(handle)
1019            )
1020        );
1021    }
1022
1023    #[test]
1024    fn finds_rows_with_index_case_sensitive() {
1025        let mut file = File::new(
1026            Default::default(),
1027            FileData {
1028                modified: SystemTime::now(),
1029                data: vec![
1030                    vec!["zip".into(), "zup".into()],
1031                    vec!["zirp".into(), "zurp".into()],
1032                    vec!["zip".into(), "zoop".into()],
1033                ],
1034                headers: vec!["field1".to_string(), "field2".to_string()],
1035            },
1036        );
1037
1038        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1039
1040        assert_eq!(
1041            Ok(vec![
1042                ObjectMap::from([
1043                    ("field1".into(), Value::from("zip")),
1044                    ("field2".into(), Value::from("zup")),
1045                ]),
1046                ObjectMap::from([
1047                    ("field1".into(), Value::from("zip")),
1048                    ("field2".into(), Value::from("zoop")),
1049                ]),
1050            ]),
1051            file.find_table_rows(
1052                Case::Sensitive,
1053                &[Condition::Equals {
1054                    field: "field1",
1055                    value: Value::from("zip"),
1056                }],
1057                None,
1058                None,
1059                Some(handle)
1060            )
1061        );
1062
1063        assert_eq!(
1064            Ok(vec![]),
1065            file.find_table_rows(
1066                Case::Sensitive,
1067                &[Condition::Equals {
1068                    field: "field1",
1069                    value: Value::from("ZiP"),
1070                }],
1071                None,
1072                None,
1073                Some(handle)
1074            )
1075        );
1076    }
1077
1078    #[test]
1079    fn selects_columns() {
1080        let mut file = File::new(
1081            Default::default(),
1082            FileData {
1083                modified: SystemTime::now(),
1084                data: vec![
1085                    vec!["zip".into(), "zup".into(), "zoop".into()],
1086                    vec!["zirp".into(), "zurp".into(), "zork".into()],
1087                    vec!["zip".into(), "zoop".into(), "zibble".into()],
1088                ],
1089                headers: vec![
1090                    "field1".to_string(),
1091                    "field2".to_string(),
1092                    "field3".to_string(),
1093                ],
1094            },
1095        );
1096
1097        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1098
1099        let condition = Condition::Equals {
1100            field: "field1",
1101            value: Value::from("zip"),
1102        };
1103
1104        assert_eq!(
1105            Ok(vec![
1106                ObjectMap::from([
1107                    ("field1".into(), Value::from("zip")),
1108                    ("field3".into(), Value::from("zoop")),
1109                ]),
1110                ObjectMap::from([
1111                    ("field1".into(), Value::from("zip")),
1112                    ("field3".into(), Value::from("zibble")),
1113                ]),
1114            ]),
1115            file.find_table_rows(
1116                Case::Sensitive,
1117                &[condition],
1118                Some(&["field1".to_string(), "field3".to_string()]),
1119                None,
1120                Some(handle)
1121            )
1122        );
1123    }
1124
1125    #[test]
1126    fn finds_rows_with_index_case_insensitive() {
1127        let mut file = File::new(
1128            Default::default(),
1129            FileData {
1130                modified: SystemTime::now(),
1131                data: vec![
1132                    vec!["zip".into(), "zup".into()],
1133                    vec!["zirp".into(), "zurp".into()],
1134                    vec!["zip".into(), "zoop".into()],
1135                ],
1136                headers: vec!["field1".to_string(), "field2".to_string()],
1137            },
1138        );
1139
1140        let handle = file.add_index(Case::Insensitive, &["field1"]).unwrap();
1141
1142        assert_eq!(
1143            Ok(vec![
1144                ObjectMap::from([
1145                    ("field1".into(), Value::from("zip")),
1146                    ("field2".into(), Value::from("zup")),
1147                ]),
1148                ObjectMap::from([
1149                    ("field1".into(), Value::from("zip")),
1150                    ("field2".into(), Value::from("zoop")),
1151                ]),
1152            ]),
1153            file.find_table_rows(
1154                Case::Insensitive,
1155                &[Condition::Equals {
1156                    field: "field1",
1157                    value: Value::from("zip"),
1158                }],
1159                None,
1160                None,
1161                Some(handle)
1162            )
1163        );
1164
1165        assert_eq!(
1166            Ok(vec![
1167                ObjectMap::from([
1168                    ("field1".into(), Value::from("zip")),
1169                    ("field2".into(), Value::from("zup")),
1170                ]),
1171                ObjectMap::from([
1172                    ("field1".into(), Value::from("zip")),
1173                    ("field2".into(), Value::from("zoop")),
1174                ]),
1175            ]),
1176            file.find_table_rows(
1177                Case::Insensitive,
1178                &[Condition::Equals {
1179                    field: "field1",
1180                    value: Value::from("ZiP"),
1181                }],
1182                None,
1183                None,
1184                Some(handle)
1185            )
1186        );
1187    }
1188
1189    #[test]
1190    fn finds_rows_with_index_case_insensitive_and_wildcard() {
1191        let mut file = File::new(
1192            Default::default(),
1193            FileData {
1194                modified: SystemTime::now(),
1195                data: vec![
1196                    vec!["zip".into(), "zup".into()],
1197                    vec!["zirp".into(), "zurp".into()],
1198                    vec!["zip".into(), "zoop".into()],
1199                ],
1200                headers: vec!["field1".to_string(), "field2".to_string()],
1201            },
1202        );
1203
1204        let handle = file.add_index(Case::Insensitive, &["field1"]).unwrap();
1205
1206        assert_eq!(
1207            Ok(vec![
1208                ObjectMap::from([
1209                    ("field1".into(), Value::from("zip")),
1210                    ("field2".into(), Value::from("zup")),
1211                ]),
1212                ObjectMap::from([
1213                    ("field1".into(), Value::from("zip")),
1214                    ("field2".into(), Value::from("zoop")),
1215                ]),
1216            ]),
1217            file.find_table_rows(
1218                Case::Insensitive,
1219                &[Condition::Equals {
1220                    field: "field1",
1221                    value: Value::from("nonexistent"),
1222                }],
1223                None,
1224                Some(&Value::from("zip")),
1225                Some(handle)
1226            )
1227        );
1228
1229        assert_eq!(
1230            Ok(vec![
1231                ObjectMap::from([
1232                    ("field1".into(), Value::from("zip")),
1233                    ("field2".into(), Value::from("zup")),
1234                ]),
1235                ObjectMap::from([
1236                    ("field1".into(), Value::from("zip")),
1237                    ("field2".into(), Value::from("zoop")),
1238                ]),
1239            ]),
1240            file.find_table_rows(
1241                Case::Insensitive,
1242                &[Condition::Equals {
1243                    field: "field1",
1244                    value: Value::from("ZiP"),
1245                }],
1246                None,
1247                Some(&Value::from("ZiP")),
1248                Some(handle)
1249            )
1250        );
1251    }
1252
1253    #[test]
1254    fn finds_row_between_dates() {
1255        let mut file = File::new(
1256            Default::default(),
1257            FileData {
1258                modified: SystemTime::now(),
1259                data: vec![
1260                    vec![
1261                        "zip".into(),
1262                        Value::Timestamp(
1263                            chrono::Utc
1264                                .with_ymd_and_hms(2015, 12, 7, 0, 0, 0)
1265                                .single()
1266                                .expect("invalid timestamp"),
1267                        ),
1268                    ],
1269                    vec![
1270                        "zip".into(),
1271                        Value::Timestamp(
1272                            chrono::Utc
1273                                .with_ymd_and_hms(2016, 12, 7, 0, 0, 0)
1274                                .single()
1275                                .expect("invalid timestamp"),
1276                        ),
1277                    ],
1278                ],
1279                headers: vec!["field1".to_string(), "field2".to_string()],
1280            },
1281        );
1282
1283        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1284
1285        let conditions = [
1286            Condition::Equals {
1287                field: "field1",
1288                value: "zip".into(),
1289            },
1290            Condition::BetweenDates {
1291                field: "field2",
1292                from: chrono::Utc
1293                    .with_ymd_and_hms(2016, 1, 1, 0, 0, 0)
1294                    .single()
1295                    .expect("invalid timestamp"),
1296                to: chrono::Utc
1297                    .with_ymd_and_hms(2017, 1, 1, 0, 0, 0)
1298                    .single()
1299                    .expect("invalid timestamp"),
1300            },
1301        ];
1302
1303        assert_eq!(
1304            Ok(ObjectMap::from([
1305                ("field1".into(), Value::from("zip")),
1306                (
1307                    "field2".into(),
1308                    Value::Timestamp(
1309                        chrono::Utc
1310                            .with_ymd_and_hms(2016, 12, 7, 0, 0, 0)
1311                            .single()
1312                            .expect("invalid timestamp")
1313                    )
1314                )
1315            ])),
1316            file.find_table_row(Case::Sensitive, &conditions, None, None, Some(handle))
1317        );
1318    }
1319
1320    #[test]
1321    fn finds_row_from_date() {
1322        let mut file = File::new(
1323            Default::default(),
1324            FileData {
1325                modified: SystemTime::now(),
1326                data: vec![
1327                    vec![
1328                        "zip".into(),
1329                        Value::Timestamp(
1330                            chrono::Utc
1331                                .with_ymd_and_hms(2015, 12, 7, 0, 0, 0)
1332                                .single()
1333                                .expect("invalid timestamp"),
1334                        ),
1335                    ],
1336                    vec![
1337                        "zip".into(),
1338                        Value::Timestamp(
1339                            chrono::Utc
1340                                .with_ymd_and_hms(2016, 12, 7, 0, 0, 0)
1341                                .single()
1342                                .expect("invalid timestamp"),
1343                        ),
1344                    ],
1345                ],
1346                headers: vec!["field1".to_string(), "field2".to_string()],
1347            },
1348        );
1349
1350        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1351
1352        let conditions = [
1353            Condition::Equals {
1354                field: "field1",
1355                value: "zip".into(),
1356            },
1357            Condition::FromDate {
1358                field: "field2",
1359                from: chrono::Utc
1360                    .with_ymd_and_hms(2016, 1, 1, 0, 0, 0)
1361                    .single()
1362                    .expect("invalid timestamp"),
1363            },
1364        ];
1365
1366        assert_eq!(
1367            Ok(ObjectMap::from([
1368                ("field1".into(), Value::from("zip")),
1369                (
1370                    "field2".into(),
1371                    Value::Timestamp(
1372                        chrono::Utc
1373                            .with_ymd_and_hms(2016, 12, 7, 0, 0, 0)
1374                            .single()
1375                            .expect("invalid timestamp")
1376                    )
1377                )
1378            ])),
1379            file.find_table_row(Case::Sensitive, &conditions, None, None, Some(handle))
1380        );
1381    }
1382
1383    #[test]
1384    fn finds_row_to_date() {
1385        let mut file = File::new(
1386            Default::default(),
1387            FileData {
1388                modified: SystemTime::now(),
1389                data: vec![
1390                    vec![
1391                        "zip".into(),
1392                        Value::Timestamp(
1393                            chrono::Utc
1394                                .with_ymd_and_hms(2015, 12, 7, 0, 0, 0)
1395                                .single()
1396                                .expect("invalid timestamp"),
1397                        ),
1398                    ],
1399                    vec![
1400                        "zip".into(),
1401                        Value::Timestamp(
1402                            chrono::Utc
1403                                .with_ymd_and_hms(2016, 12, 7, 0, 0, 0)
1404                                .single()
1405                                .expect("invalid timestamp"),
1406                        ),
1407                    ],
1408                ],
1409                headers: vec!["field1".to_string(), "field2".to_string()],
1410            },
1411        );
1412
1413        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1414
1415        let conditions = [
1416            Condition::Equals {
1417                field: "field1",
1418                value: "zip".into(),
1419            },
1420            Condition::ToDate {
1421                field: "field2",
1422                to: chrono::Utc
1423                    .with_ymd_and_hms(2016, 1, 1, 0, 0, 0)
1424                    .single()
1425                    .expect("invalid timestamp"),
1426            },
1427        ];
1428
1429        assert_eq!(
1430            Ok(ObjectMap::from([
1431                ("field1".into(), Value::from("zip")),
1432                (
1433                    "field2".into(),
1434                    Value::Timestamp(
1435                        chrono::Utc
1436                            .with_ymd_and_hms(2015, 12, 7, 0, 0, 0)
1437                            .single()
1438                            .expect("invalid timestamp")
1439                    )
1440                )
1441            ])),
1442            file.find_table_row(Case::Sensitive, &conditions, None, None, Some(handle))
1443        );
1444    }
1445
1446    #[test]
1447    fn doesnt_find_row() {
1448        let file = File::new(
1449            Default::default(),
1450            FileData {
1451                modified: SystemTime::now(),
1452                data: vec![
1453                    vec!["zip".into(), "zup".into()],
1454                    vec!["zirp".into(), "zurp".into()],
1455                ],
1456                headers: vec!["field1".to_string(), "field2".to_string()],
1457            },
1458        );
1459
1460        let condition = Condition::Equals {
1461            field: "field1",
1462            value: Value::from("zorp"),
1463        };
1464
1465        assert_eq!(
1466            Err("no rows found".to_string()),
1467            file.find_table_row(Case::Sensitive, &[condition], None, None, None)
1468        );
1469    }
1470
1471    #[test]
1472    fn doesnt_find_row_with_index() {
1473        let mut file = File::new(
1474            Default::default(),
1475            FileData {
1476                modified: SystemTime::now(),
1477                data: vec![
1478                    vec!["zip".into(), "zup".into()],
1479                    vec!["zirp".into(), "zurp".into()],
1480                ],
1481                headers: vec!["field1".to_string(), "field2".to_string()],
1482            },
1483        );
1484
1485        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1486
1487        let condition = Condition::Equals {
1488            field: "field1",
1489            value: Value::from("zorp"),
1490        };
1491
1492        assert_eq!(
1493            Err("no rows found in index".to_string()),
1494            file.find_table_row(Case::Sensitive, &[condition], None, None, Some(handle))
1495        );
1496    }
1497
1498    #[test]
1499    fn doesnt_find_row_with_index_and_wildcard() {
1500        let mut file = File::new(
1501            Default::default(),
1502            FileData {
1503                modified: SystemTime::now(),
1504                data: vec![
1505                    vec!["zip".into(), "zup".into()],
1506                    vec!["zirp".into(), "zurp".into()],
1507                ],
1508                headers: vec!["field1".to_string(), "field2".to_string()],
1509            },
1510        );
1511
1512        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1513        let wildcard = Value::from("nonexistent");
1514
1515        let condition = Condition::Equals {
1516            field: "field1",
1517            value: Value::from("zorp"),
1518        };
1519
1520        assert_eq!(
1521            Err("no rows found in index".to_string()),
1522            file.find_table_row(
1523                Case::Sensitive,
1524                &[condition],
1525                None,
1526                Some(&wildcard),
1527                Some(handle)
1528            )
1529        );
1530    }
1531}