vector/enrichment_tables/
file.rs

1//! Handles enrichment tables for `type = file`.
2use std::{collections::HashMap, fs, hash::Hasher, path::PathBuf, time::SystemTime};
3
4use bytes::Bytes;
5use tracing::trace;
6use vector_lib::{
7    TimeZone,
8    configurable::configurable_component,
9    conversion::Conversion,
10    enrichment::{Case, Condition, Error, IndexHandle, Table},
11};
12use vrl::value::{ObjectMap, Value};
13
14use crate::config::EnrichmentTableConfig;
15
// NOTE: the `///` comments on this type are left untouched because the
// `configurable_component` attribute is applied to it. Reviewer notes use `//`.
//
// serde picks the variant through an internal `type` tag whose values are the
// snake_case variant names (so `Csv` is spelled `csv`). `Csv` is the only variant.
/// File encoding configuration.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "File encoding type."))]
pub enum Encoding {
    /// Decodes the file as a [CSV][csv] (comma-separated values) file.
    ///
    /// [csv]: https://wikipedia.org/wiki/Comma-separated_values
    Csv {
        // Defaults to `true` both through serde and through `Encoding::default()`.
        /// Whether or not the file contains column headers.
        ///
        /// When set to `true`, the first row of the CSV file will be read as the header row, and
        /// the values will be used for the names of each column. This is the default behavior.
        ///
        /// When set to `false`, columns are referred to by their numerical index.
        #[serde(default = "crate::serde::default_true")]
        include_headers: bool,

        // The CSV reader is configured with this delimiter as a single byte, so it should be
        // a character that fits in one byte. Defaults to `default_delimiter()`.
        /// The delimiter used to separate fields in each row of the CSV file.
        #[serde(default = "default_delimiter")]
        delimiter: char,
    },
}
40
41impl Default for Encoding {
42    fn default() -> Self {
43        Self::Csv {
44            include_headers: true,
45            delimiter: default_delimiter(),
46        }
47    }
48}
49
// NOTE: the `///` comments on this type are left untouched because the
// `configurable_component` attribute is applied to it. Reviewer notes use `//`.
//
// The derived `Default` gives an empty path together with `Encoding::default()`.
/// File-specific settings.
#[configurable_component]
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct FileSettings {
    // This path is opened by `FileConfig::load_file` and is stat-ed again by
    // `File::needs_reload` to detect modifications.
    /// The path of the enrichment table file.
    ///
    /// Currently, only [CSV][csv] files are supported.
    ///
    /// [csv]: https://en.wikipedia.org/wiki/Comma-separated_values
    pub path: PathBuf,

    /// File encoding configuration.
    #[configurable(derived)]
    pub encoding: Encoding,
}
65
// NOTE: the `///` comments on this type are left untouched because the
// `configurable_component` attribute is applied to it. Reviewer notes use `//`.
/// Configuration for the `file` enrichment table.
#[configurable_component(enrichment_table("file"))]
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct FileConfig {
    /// File-specific settings.
    #[configurable(derived)]
    pub file: FileSettings,

    // Reviewer notes on how `parse_column` interprets the entries of this map:
    // - The key is the column name (or the stringified column index when the file has no
    //   header row); a column with no entry keeps its raw string value.
    // - A bare `date` parses the field as `%Y-%m-%d`; `date|<format>` parses it with the given
    //   chrono format. Both produce a timestamp at midnight UTC.
    // - Every other entry is passed unchanged to `Conversion::parse`.
    /// Key/value pairs representing mapped log field names and types.
    ///
    /// This is used to coerce log fields from strings into their proper types. The available types are listed in the `Types` list below.
    ///
    /// Timestamp coercions need to be prefaced with `timestamp|`, for example `"timestamp|%F"`. Timestamp specifiers can use either of the following:
    ///
    /// 1. One of the built-in-formats listed in the `Timestamp Formats` table below.
    /// 2. The [time format specifiers][chrono_fmt] from Rust’s `chrono` library.
    ///
    /// Types
    ///
    /// - **`bool`**
    /// - **`string`**
    /// - **`float`**
    /// - **`integer`**
    /// - **`date`**
    /// - **`timestamp`** (see the table below for formats)
    ///
    /// Timestamp Formats
    ///
    /// | Format               | Description                                                                      | Example                          |
    /// |----------------------|----------------------------------------------------------------------------------|----------------------------------|
    /// | `%F %T`              | `YYYY-MM-DD HH:MM:SS`                                                            | `2020-12-01 02:37:54`            |
    /// | `%v %T`              | `DD-Mmm-YYYY HH:MM:SS`                                                           | `01-Dec-2020 02:37:54`           |
    /// | `%FT%T`              | [ISO 8601][iso8601]/[RFC 3339][rfc3339], without time zone                       | `2020-12-01T02:37:54`            |
    /// | `%FT%TZ`             | [ISO 8601][iso8601]/[RFC 3339][rfc3339], UTC                                     | `2020-12-01T09:37:54Z`           |
    /// | `%+`                 | [ISO 8601][iso8601]/[RFC 3339][rfc3339], UTC, with time zone                     | `2020-12-01T02:37:54-07:00`      |
    /// | `%a, %d %b %Y %T`    | [RFC 822][rfc822]/[RFC 2822][rfc2822], without time zone                         | `Tue, 01 Dec 2020 02:37:54`      |
    /// | `%a %b %e %T %Y`     | [ctime][ctime] format                                                            | `Tue Dec 1 02:37:54 2020`        |
    /// | `%s`                 | [UNIX timestamp][unix_ts]                                                        | `1606790274`                     |
    /// | `%a %d %b %T %Y`     | [date][date] command, without time zone                                          | `Tue 01 Dec 02:37:54 2020`       |
    /// | `%a %d %b %T %Z %Y`  | [date][date] command, with time zone                                             | `Tue 01 Dec 02:37:54 PST 2020`   |
    /// | `%a %d %b %T %z %Y`  | [date][date] command, with numeric time zone                                     | `Tue 01 Dec 02:37:54 -0700 2020` |
    /// | `%a %d %b %T %#z %Y` | [date][date] command, with numeric time zone (minutes can be missing or present) | `Tue 01 Dec 02:37:54 -07 2020`   |
    ///
    /// [date]: https://man7.org/linux/man-pages/man1/date.1.html
    /// [ctime]: https://www.cplusplus.com/reference/ctime
    /// [unix_ts]: https://en.wikipedia.org/wiki/Unix_time
    /// [rfc822]: https://tools.ietf.org/html/rfc822#section-5
    /// [rfc2822]: https://tools.ietf.org/html/rfc2822#section-3.3
    /// [iso8601]: https://en.wikipedia.org/wiki/ISO_8601
    /// [rfc3339]: https://tools.ietf.org/html/rfc3339
    /// [chrono_fmt]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "Represents mapped log field names and types."
    ))]
    pub schema: HashMap<String, String>,
}
123
/// Returns the field delimiter used when the configuration does not set one: a comma.
const fn default_delimiter() -> char {
    const COMMA: char = ',';
    COMMA
}
127
128impl FileConfig {
129    fn parse_column(
130        &self,
131        timezone: TimeZone,
132        column: &str,
133        row: usize,
134        value: &str,
135    ) -> Result<Value, String> {
136        use chrono::TimeZone;
137
138        Ok(match self.schema.get(column) {
139            Some(format) => {
140                let mut split = format.splitn(2, '|').map(|segment| segment.trim());
141
142                match (split.next(), split.next()) {
143                    (Some("date"), None) => Value::Timestamp(
144                        chrono::FixedOffset::east_opt(0)
145                            .expect("invalid timestamp")
146                            .from_utc_datetime(
147                                &chrono::NaiveDate::parse_from_str(value, "%Y-%m-%d")
148                                    .map_err(|_| {
149                                        format!("unable to parse date {value} found in row {row}")
150                                    })?
151                                    .and_hms_opt(0, 0, 0)
152                                    .expect("invalid timestamp"),
153                            )
154                            .into(),
155                    ),
156                    (Some("date"), Some(format)) => Value::Timestamp(
157                        chrono::FixedOffset::east_opt(0)
158                            .expect("invalid timestamp")
159                            .from_utc_datetime(
160                                &chrono::NaiveDate::parse_from_str(value, format)
161                                    .map_err(|_| {
162                                        format!("unable to parse date {value} found in row {row}")
163                                    })?
164                                    .and_hms_opt(0, 0, 0)
165                                    .expect("invalid timestamp"),
166                            )
167                            .into(),
168                    ),
169                    _ => {
170                        let conversion =
171                            Conversion::parse(format, timezone).map_err(|err| err.to_string())?;
172                        conversion
173                            .convert(Bytes::copy_from_slice(value.as_bytes()))
174                            .map_err(|_| format!("unable to parse {value} found in row {row}"))?
175                    }
176                }
177            }
178            None => value.into(),
179        })
180    }
181
182    /// Load the configured file into memory. Required to create a new file enrichment table.
183    pub fn load_file(&self, timezone: TimeZone) -> crate::Result<FileData> {
184        let Encoding::Csv {
185            include_headers,
186            delimiter,
187        } = self.file.encoding;
188
189        let mut reader = csv::ReaderBuilder::new()
190            .has_headers(include_headers)
191            .delimiter(delimiter as u8)
192            .from_path(&self.file.path)?;
193
194        let first_row = reader.records().next();
195        let headers = if include_headers {
196            reader
197                .headers()?
198                .iter()
199                .map(|col| col.to_string())
200                .collect::<Vec<_>>()
201        } else {
202            // If there are no headers in the datafile we make headers as the numerical index of
203            // the column.
204            match first_row {
205                Some(Ok(ref row)) => (0..row.len()).map(|idx| idx.to_string()).collect(),
206                _ => Vec::new(),
207            }
208        };
209
210        let data = first_row
211            .into_iter()
212            .chain(reader.records())
213            .map(|row| {
214                Ok(row?
215                    .iter()
216                    .enumerate()
217                    .map(|(idx, col)| self.parse_column(timezone, &headers[idx], idx, col))
218                    .collect::<Result<Vec<_>, String>>()?)
219            })
220            .collect::<crate::Result<Vec<_>>>()?;
221
222        trace!(
223            "Loaded enrichment file {} with headers {:?}.",
224            self.file.path.to_str().unwrap_or("path with invalid utf"),
225            headers
226        );
227
228        let file = reader.into_inner();
229
230        Ok(FileData {
231            headers,
232            data,
233            modified: file.metadata()?.modified()?,
234        })
235    }
236}
237
238impl EnrichmentTableConfig for FileConfig {
239    async fn build(
240        &self,
241        globals: &crate::config::GlobalOptions,
242    ) -> crate::Result<Box<dyn Table + Send + Sync>> {
243        Ok(Box::new(File::new(
244            self.clone(),
245            self.load_file(globals.timezone())?,
246        )))
247    }
248}
249
// Invokes a crate-level macro for `FileConfig`. Judging by its name it derives the
// config-generation impl from `FileConfig::default()`; the macro is defined outside this
// file — TODO confirm.
impl_generate_config_from_default!(FileConfig);
251
/// The data resulting from loading a configured file.
///
/// Produced by `FileConfig::load_file` and consumed by `File::new`.
pub struct FileData {
    /// The ordered set of headers of the data columns.
    ///
    /// These are the names from the header row, or the stringified column indexes
    /// (`"0"`, `"1"`, ...) when the file is read without a header row.
    pub headers: Vec<String>,
    /// The data contained in the file: one inner vector per record, with the values in the
    /// same order as `headers`.
    pub data: Vec<Vec<Value>>,
    /// The last modified time of the file, read from its metadata when it was loaded.
    pub modified: SystemTime,
}
261
/// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a CSV file.
///
/// All rows are held in memory. Lookups either scan every row or, when an index handle is
/// supplied, first narrow the candidates through one of the hash indexes in `indexes`.
#[derive(Clone)]
pub struct File {
    /// The configuration the table was built from; its path is used to detect file changes.
    config: FileConfig,
    /// Modification time of the file when it was loaded; compared against the current
    /// modification time in `needs_reload`.
    last_modified: SystemTime,
    /// The loaded rows, each holding one value per header.
    data: Vec<Vec<Value>>,
    /// Column names, in file order.
    headers: Vec<String>,
    /// The indexes created so far. An `IndexHandle` is a position in this vector.
    ///
    /// Each entry holds the case sensitivity the index was built with, the positions (in
    /// `headers`) of the indexed columns, and a map from the hash of those columns' values
    /// to the positions (in `data`) of the rows producing that hash.
    indexes: Vec<(
        Case,
        Vec<usize>,
        HashMap<u64, Vec<usize>, hash_hasher::HashBuildHasher>,
    )>,
}
275
276impl File {
277    /// Creates a new [File] based on the provided config.
278    pub fn new(config: FileConfig, data: FileData) -> Self {
279        Self {
280            config,
281            last_modified: data.modified,
282            data: data.data,
283            headers: data.headers,
284            indexes: Vec::new(),
285        }
286    }
287
288    fn column_index(&self, col: &str) -> Option<usize> {
289        self.headers.iter().position(|header| header == col)
290    }
291
292    /// Does the given row match all the conditions specified?
293    fn row_equals(
294        &self,
295        case: Case,
296        condition: &[Condition],
297        row: &[Value],
298        wildcard: Option<&Value>,
299    ) -> bool {
300        condition.iter().all(|condition| match condition {
301            Condition::Equals { field, value } => match self.column_index(field) {
302                None => false,
303                Some(idx) => {
304                    let current_row_value = &row[idx];
305
306                    // Helper closure for comparing current_row_value with another value,
307                    // respecting the specified case for Value::Bytes.
308                    let compare_values = |val_to_compare: &Value| -> bool {
309                        match (case, current_row_value, val_to_compare) {
310                            (
311                                Case::Insensitive,
312                                Value::Bytes(bytes_row),
313                                Value::Bytes(bytes_cmp),
314                            ) => {
315                                // Perform case-insensitive comparison for byte strings.
316                                // If both are valid UTF-8, compare their lowercase versions.
317                                // If both are non-UTF-8 bytes, compare them directly.
318                                // If one is UTF-8 and the other is not, they are considered not equal.
319                                match (
320                                    std::str::from_utf8(bytes_row),
321                                    std::str::from_utf8(bytes_cmp),
322                                ) {
323                                    (Ok(s_row), Ok(s_cmp)) => {
324                                        s_row.to_lowercase() == s_cmp.to_lowercase()
325                                    }
326                                    (Err(_), Err(_)) => bytes_row == bytes_cmp,
327                                    _ => false,
328                                }
329                            }
330                            // For Case::Sensitive, or for Case::Insensitive with non-Bytes types,
331                            // perform a direct equality check.
332                            _ => current_row_value == val_to_compare,
333                        }
334                    };
335
336                    // First, check if the row value matches the condition's value.
337                    if compare_values(value) {
338                        true
339                    } else if let Some(wc_val) = wildcard {
340                        // If not, and a wildcard is provided, check if the row value matches the wildcard.
341                        compare_values(wc_val)
342                    } else {
343                        // Otherwise, no match.
344                        false
345                    }
346                }
347            },
348            Condition::BetweenDates { field, from, to } => match self.column_index(field) {
349                None => false,
350                Some(idx) => match row[idx] {
351                    Value::Timestamp(date) => from <= &date && &date <= to,
352                    _ => false,
353                },
354            },
355            Condition::FromDate { field, from } => match self.column_index(field) {
356                None => false,
357                Some(idx) => match row[idx] {
358                    Value::Timestamp(date) => from <= &date,
359                    _ => false,
360                },
361            },
362            Condition::ToDate { field, to } => match self.column_index(field) {
363                None => false,
364                Some(idx) => match row[idx] {
365                    Value::Timestamp(date) => &date <= to,
366                    _ => false,
367                },
368            },
369        })
370    }
371
372    fn add_columns(&self, select: Option<&[String]>, row: &[Value]) -> ObjectMap {
373        self.headers
374            .iter()
375            .zip(row)
376            .filter(|(header, _)| {
377                select
378                    .map(|select| select.contains(header))
379                    // If no select is passed, we assume all columns are included
380                    .unwrap_or(true)
381            })
382            .map(|(header, col)| (header.as_str().into(), col.clone()))
383            .collect()
384    }
385
386    /// Order the fields in the index according to the position they are found in the header.
387    fn normalize_index_fields(&self, index: &[&str]) -> Result<Vec<usize>, Error> {
388        // Get the positions of the fields we are indexing
389        let normalized = self
390            .headers
391            .iter()
392            .enumerate()
393            .filter_map(|(idx, col)| {
394                if index.contains(&col.as_ref()) {
395                    Some(idx)
396                } else {
397                    None
398                }
399            })
400            .collect::<Vec<_>>();
401
402        if normalized.len() != index.len() {
403            let fields = index
404                .iter()
405                .filter_map(|col| {
406                    if self.headers.iter().any(|header| header == *col) {
407                        None
408                    } else {
409                        Some(col.to_string())
410                    }
411                })
412                .collect();
413            Err(Error::MissingDatasetFields { fields })
414        } else {
415            Ok(normalized)
416        }
417    }
418
419    /// Creates an index with the given fields.
420    /// Uses seahash to create a hash of the data that is used as the key in a hashmap lookup to
421    /// the index of the row in the data.
422    ///
423    /// Ensure fields that are searched via a comparison are not included in the index!
424    fn index_data(
425        &self,
426        fieldidx: &[usize],
427        case: Case,
428    ) -> Result<HashMap<u64, Vec<usize>, hash_hasher::HashBuildHasher>, Error> {
429        let mut index = HashMap::with_capacity_and_hasher(
430            self.data.len(),
431            hash_hasher::HashBuildHasher::default(),
432        );
433
434        for (idx, row) in self.data.iter().enumerate() {
435            let mut hash = seahash::SeaHasher::default();
436
437            for idx in fieldidx {
438                hash_value(&mut hash, case, &row[*idx])?;
439            }
440
441            let key = hash.finish();
442
443            let entry = index.entry(key).or_insert_with(Vec::new);
444            entry.push(idx);
445        }
446
447        index.shrink_to_fit();
448
449        Ok(index)
450    }
451
452    /// Sequentially searches through the iterator for the given condition.
453    fn sequential<'a, I>(
454        &'a self,
455        data: I,
456        case: Case,
457        condition: &'a [Condition<'a>],
458        select: Option<&'a [String]>,
459        wildcard: Option<&'a Value>,
460    ) -> impl Iterator<Item = ObjectMap> + 'a
461    where
462        I: Iterator<Item = &'a Vec<Value>> + 'a,
463    {
464        data.filter_map(move |row| {
465            if self.row_equals(case, condition, row, wildcard) {
466                Some(self.add_columns(select, row))
467            } else {
468                None
469            }
470        })
471    }
472
473    fn indexed<'a>(
474        &'a self,
475        case: Case,
476        condition: &'a [Condition<'a>],
477        handle: IndexHandle,
478    ) -> Result<Option<&'a Vec<usize>>, Error> {
479        // The index to use has been passed, we can use this to search the data.
480        // We are assuming that the caller has passed an index that represents the fields
481        // being passed in the condition.
482        let mut hash = seahash::SeaHasher::default();
483
484        for header in self.headers.iter() {
485            if let Some(Condition::Equals { value, .. }) = condition.iter().find(
486                |condition| matches!(condition, Condition::Equals { field, .. } if field == header),
487            ) {
488                hash_value(&mut hash, case, value)?;
489            }
490        }
491
492        let key = hash.finish();
493
494        let IndexHandle(handle) = handle;
495        Ok(self.indexes[handle].2.get(&key))
496    }
497
498    fn indexed_with_wildcard<'a>(
499        &'a self,
500        case: Case,
501        wildcard: &'a Value,
502        condition: &'a [Condition<'a>],
503        handle: IndexHandle,
504    ) -> Result<Option<&'a Vec<usize>>, Error> {
505        if let Some(result) = self.indexed(case, condition, handle)? {
506            return Ok(Some(result));
507        }
508
509        // If lookup fails and a wildcard is provided, compute hash for the wildcard
510        let mut wildcard_hash = seahash::SeaHasher::default();
511        for header in self.headers.iter() {
512            if condition.iter().any(
513                |condition| matches!(condition, Condition::Equals { field, .. } if field == header),
514            ) {
515                hash_value(&mut wildcard_hash, case, wildcard)?;
516            }
517        }
518
519        let wildcard_key = wildcard_hash.finish();
520        let IndexHandle(handle) = handle;
521        Ok(self.indexes[handle].2.get(&wildcard_key))
522    }
523}
524
525/// Adds the bytes from the given value to the hash.
526/// Each field is terminated by a `0` value to separate the fields
527fn hash_value(hasher: &mut seahash::SeaHasher, case: Case, value: &Value) -> Result<(), Error> {
528    match value {
529        Value::Bytes(bytes) => match case {
530            Case::Sensitive => hasher.write(bytes),
531            Case::Insensitive => hasher.write(
532                std::str::from_utf8(bytes)
533                    .map_err(|source| Error::InvalidUtfInColumn { source })?
534                    .to_lowercase()
535                    .as_bytes(),
536            ),
537        },
538        value => {
539            let bytes: bytes::Bytes = value
540                .encode_as_bytes()
541                .map_err(|details| Error::FailedToEncodeValue { details })?;
542            hasher.write(&bytes);
543        }
544    }
545
546    hasher.write_u8(0);
547
548    Ok(())
549}
550
551/// Returns an error if the iterator doesn't yield exactly one result.
552fn single_or_err<I, T>(mut iter: T) -> Result<I, Error>
553where
554    T: Iterator<Item = I>,
555{
556    let result = iter.next();
557
558    if iter.next().is_some() {
559        // More than one row has been found.
560        Err(Error::MoreThanOneRowFound)
561    } else {
562        result.ok_or(Error::NoRowsFound)
563    }
564}
565
566impl Table for File {
567    fn find_table_row<'a>(
568        &self,
569        case: Case,
570        condition: &'a [Condition<'a>],
571        select: Option<&'a [String]>,
572        wildcard: Option<&Value>,
573        index: Option<IndexHandle>,
574    ) -> Result<ObjectMap, Error> {
575        match index {
576            None => {
577                // No index has been passed so we need to do a Sequential Scan.
578                single_or_err(self.sequential(self.data.iter(), case, condition, select, wildcard))
579            }
580            Some(handle) => {
581                let result = if let Some(wildcard) = wildcard {
582                    self.indexed_with_wildcard(case, wildcard, condition, handle)?
583                } else {
584                    self.indexed(case, condition, handle)?
585                }
586                .ok_or(Error::NoRowsFound)?
587                .iter()
588                .map(|idx| &self.data[*idx]);
589
590                // Perform a sequential scan over the indexed result.
591                single_or_err(self.sequential(result, case, condition, select, wildcard))
592            }
593        }
594    }
595
596    fn find_table_rows<'a>(
597        &self,
598        case: Case,
599        condition: &'a [Condition<'a>],
600        select: Option<&'a [String]>,
601        wildcard: Option<&Value>,
602        index: Option<IndexHandle>,
603    ) -> Result<Vec<ObjectMap>, Error> {
604        match index {
605            None => {
606                // No index has been passed so we need to do a Sequential Scan.
607                Ok(self
608                    .sequential(self.data.iter(), case, condition, select, wildcard)
609                    .collect())
610            }
611            Some(handle) => {
612                // Perform a sequential scan over the indexed result.
613                let indexed_result = if let Some(wildcard) = wildcard {
614                    self.indexed_with_wildcard(case, wildcard, condition, handle)?
615                } else {
616                    self.indexed(case, condition, handle)?
617                };
618
619                Ok(self
620                    .sequential(
621                        indexed_result
622                            .iter()
623                            .flat_map(|results| results.iter().map(|idx| &self.data[*idx])),
624                        case,
625                        condition,
626                        select,
627                        wildcard,
628                    )
629                    .collect())
630            }
631        }
632    }
633
634    fn add_index(&mut self, case: Case, fields: &[&str]) -> Result<IndexHandle, Error> {
635        let normalized = self.normalize_index_fields(fields)?;
636        match self
637            .indexes
638            .iter()
639            .position(|index| index.0 == case && index.1 == normalized)
640        {
641            Some(pos) => {
642                // This index already exists
643                Ok(IndexHandle(pos))
644            }
645            None => {
646                let index = self.index_data(&normalized, case)?;
647                self.indexes.push((case, normalized, index));
648                // The returned index handle is the position of the index in our list of indexes.
649                Ok(IndexHandle(self.indexes.len() - 1))
650            }
651        }
652    }
653
654    /// Returns a list of the field names that are in each index
655    fn index_fields(&self) -> Vec<(Case, Vec<String>)> {
656        self.indexes
657            .iter()
658            .map(|index| {
659                let (case, fields, _) = index;
660                (
661                    *case,
662                    fields
663                        .iter()
664                        .map(|idx| self.headers[*idx].clone())
665                        .collect::<Vec<_>>(),
666                )
667            })
668            .collect::<Vec<_>>()
669    }
670
671    /// Checks the modified timestamp of the data file to see if data has changed.
672    fn needs_reload(&self) -> bool {
673        matches!(fs::metadata(&self.config.file.path)
674            .and_then(|metadata| metadata.modified()),
675            Ok(modified) if modified > self.last_modified)
676    }
677}
678
679impl std::fmt::Debug for File {
680    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
681        write!(
682            f,
683            "File {} row(s) {} index(es)",
684            self.data.len(),
685            self.indexes.len()
686        )
687    }
688}
689
690#[cfg(test)]
691mod tests {
692    use chrono::{TimeZone, Timelike};
693
694    use super::*;
695
696    #[test]
697    fn parse_file_with_headers() {
698        let dir = tempfile::tempdir().expect("Unable to create tempdir for enrichment table");
699        let path = dir.path().join("table.csv");
700        fs::write(path.clone(), "foo,bar\na,1\nb,2").expect("Failed to write enrichment table");
701
702        let config = FileConfig {
703            file: FileSettings {
704                path,
705                encoding: Encoding::Csv {
706                    include_headers: true,
707                    delimiter: default_delimiter(),
708                },
709            },
710            schema: HashMap::new(),
711        };
712        let data = config
713            .load_file(Default::default())
714            .expect("Failed to parse csv");
715        assert_eq!(vec!["foo".to_string(), "bar".to_string()], data.headers);
716        assert_eq!(
717            vec![
718                vec![Value::from("a"), Value::from("1")],
719                vec![Value::from("b"), Value::from("2")],
720            ],
721            data.data
722        );
723    }
724
725    #[test]
726    fn parse_file_no_headers() {
727        let dir = tempfile::tempdir().expect("Unable to create tempdir for enrichment table");
728        let path = dir.path().join("table.csv");
729        fs::write(path.clone(), "a,1\nb,2").expect("Failed to write enrichment table");
730
731        let config = FileConfig {
732            file: FileSettings {
733                path,
734                encoding: Encoding::Csv {
735                    include_headers: false,
736                    delimiter: default_delimiter(),
737                },
738            },
739            schema: HashMap::new(),
740        };
741        let data = config
742            .load_file(Default::default())
743            .expect("Failed to parse csv");
744        assert_eq!(vec!["0".to_string(), "1".to_string()], data.headers);
745        assert_eq!(
746            vec![
747                vec![Value::from("a"), Value::from("1")],
748                vec![Value::from("b"), Value::from("2")],
749            ],
750            data.data
751        );
752    }
753
754    #[test]
755    fn parse_column() {
756        let mut schema = HashMap::new();
757        schema.insert("col1".to_string(), " string ".to_string());
758        schema.insert("col2".to_string(), " date ".to_string());
759        schema.insert("col3".to_string(), "date|%m/%d/%Y".to_string());
760        schema.insert("col3-spaces".to_string(), "date | %m %d %Y".to_string());
761        schema.insert("col4".to_string(), "timestamp|%+".to_string());
762        schema.insert("col4-spaces".to_string(), "timestamp | %+".to_string());
763        schema.insert("col5".to_string(), "int".to_string());
764        let config = FileConfig {
765            file: Default::default(),
766            schema,
767        };
768
769        assert_eq!(
770            Ok(Value::from("zork")),
771            config.parse_column(Default::default(), "col1", 1, "zork")
772        );
773
774        assert_eq!(
775            Ok(Value::from(
776                chrono::Utc
777                    .with_ymd_and_hms(2020, 3, 5, 0, 0, 0)
778                    .single()
779                    .expect("invalid timestamp")
780            )),
781            config.parse_column(Default::default(), "col2", 1, "2020-03-05")
782        );
783
784        assert_eq!(
785            Ok(Value::from(
786                chrono::Utc
787                    .with_ymd_and_hms(2020, 3, 5, 0, 0, 0)
788                    .single()
789                    .expect("invalid timestamp")
790            )),
791            config.parse_column(Default::default(), "col3", 1, "03/05/2020")
792        );
793
794        assert_eq!(
795            Ok(Value::from(
796                chrono::Utc
797                    .with_ymd_and_hms(2020, 3, 5, 0, 0, 0)
798                    .single()
799                    .expect("invalid timestamp")
800            )),
801            config.parse_column(Default::default(), "col3-spaces", 1, "03 05 2020")
802        );
803
804        assert_eq!(
805            Ok(Value::from(
806                chrono::Utc
807                    .with_ymd_and_hms(2001, 7, 7, 15, 4, 0)
808                    .single()
809                    .and_then(|t| t.with_nanosecond(26490 * 1_000))
810                    .expect("invalid timestamp")
811            )),
812            config.parse_column(
813                Default::default(),
814                "col4",
815                1,
816                "2001-07-08T00:34:00.026490+09:30"
817            )
818        );
819
820        assert_eq!(
821            Ok(Value::from(
822                chrono::Utc
823                    .with_ymd_and_hms(2001, 7, 7, 15, 4, 0)
824                    .single()
825                    .and_then(|t| t.with_nanosecond(26490 * 1_000))
826                    .expect("invalid timestamp")
827            )),
828            config.parse_column(
829                Default::default(),
830                "col4-spaces",
831                1,
832                "2001-07-08T00:34:00.026490+09:30"
833            )
834        );
835
836        assert_eq!(
837            Ok(Value::from(42)),
838            config.parse_column(Default::default(), "col5", 1, "42")
839        );
840    }
841
842    #[test]
843    fn seahash() {
844        // Ensure we can separate fields to create a distinct hash.
845        let mut one = seahash::SeaHasher::default();
846        one.write(b"norknoog");
847        one.write_u8(0);
848        one.write(b"donk");
849
850        let mut two = seahash::SeaHasher::default();
851        two.write(b"nork");
852        one.write_u8(0);
853        two.write(b"noogdonk");
854
855        assert_ne!(one.finish(), two.finish());
856    }
857
858    #[test]
859    fn finds_row() {
860        let file = File::new(
861            Default::default(),
862            FileData {
863                modified: SystemTime::now(),
864                data: vec![
865                    vec!["zip".into(), "zup".into()],
866                    vec!["zirp".into(), "zurp".into()],
867                ],
868                headers: vec!["field1".to_string(), "field2".to_string()],
869            },
870        );
871
872        let condition = Condition::Equals {
873            field: "field1",
874            value: Value::from("zirp"),
875        };
876
877        assert_eq!(
878            Ok(ObjectMap::from([
879                ("field1".into(), Value::from("zirp")),
880                ("field2".into(), Value::from("zurp")),
881            ])),
882            file.find_table_row(Case::Sensitive, &[condition], None, None, None)
883        );
884    }
885
886    #[test]
887    fn finds_row_with_wildcard() {
888        let file = File::new(
889            Default::default(),
890            FileData {
891                modified: SystemTime::now(),
892                data: vec![
893                    vec!["zip".into(), "zup".into()],
894                    vec!["zirp".into(), "zurp".into()],
895                ],
896                headers: vec!["field1".to_string(), "field2".to_string()],
897            },
898        );
899
900        let wildcard = Value::from("zirp");
901
902        let condition = Condition::Equals {
903            field: "field1",
904            value: Value::from("nonexistent"),
905        };
906
907        assert_eq!(
908            Ok(ObjectMap::from([
909                ("field1".into(), Value::from("zirp")),
910                ("field2".into(), Value::from("zurp")),
911            ])),
912            file.find_table_row(Case::Sensitive, &[condition], None, Some(&wildcard), None)
913        );
914    }
915
916    #[test]
917    fn duplicate_indexes() {
918        let mut file = File::new(
919            Default::default(),
920            FileData {
921                modified: SystemTime::now(),
922                data: Vec::new(),
923                headers: vec![
924                    "field1".to_string(),
925                    "field2".to_string(),
926                    "field3".to_string(),
927                ],
928            },
929        );
930
931        let handle1 = file.add_index(Case::Sensitive, &["field2", "field3"]);
932        let handle2 = file.add_index(Case::Sensitive, &["field3", "field2"]);
933
934        assert_eq!(handle1, handle2);
935        assert_eq!(1, file.indexes.len());
936    }
937
938    #[test]
939    fn errors_on_missing_columns() {
940        let mut file = File::new(
941            Default::default(),
942            FileData {
943                modified: SystemTime::now(),
944                data: Vec::new(),
945                headers: vec![
946                    "field1".to_string(),
947                    "field2".to_string(),
948                    "field3".to_string(),
949                ],
950            },
951        );
952
953        let error = file.add_index(Case::Sensitive, &["apples", "field2", "bananas"]);
954        assert_eq!(
955            Err(Error::MissingDatasetFields {
956                fields: vec!["apples".into(), "bananas".into()],
957            }),
958            error
959        )
960    }
961
962    #[test]
963    fn finds_row_with_index() {
964        let mut file = File::new(
965            Default::default(),
966            FileData {
967                modified: SystemTime::now(),
968                data: vec![
969                    vec!["zip".into(), "zup".into()],
970                    vec!["zirp".into(), "zurp".into()],
971                ],
972                headers: vec!["field1".to_string(), "field2".to_string()],
973            },
974        );
975
976        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
977
978        let condition = Condition::Equals {
979            field: "field1",
980            value: Value::from("zirp"),
981        };
982
983        assert_eq!(
984            Ok(ObjectMap::from([
985                ("field1".into(), Value::from("zirp")),
986                ("field2".into(), Value::from("zurp")),
987            ])),
988            file.find_table_row(Case::Sensitive, &[condition], None, None, Some(handle))
989        );
990    }
991
992    #[test]
993    fn finds_row_with_index_case_sensitive_and_wildcard() {
994        let mut file = File::new(
995            Default::default(),
996            FileData {
997                modified: SystemTime::now(),
998                data: vec![
999                    vec!["zip".into(), "zup".into()],
1000                    vec!["zirp".into(), "zurp".into()],
1001                ],
1002                headers: vec!["field1".to_string(), "field2".to_string()],
1003            },
1004        );
1005
1006        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1007        let wildcard = Value::from("zirp");
1008
1009        let condition = Condition::Equals {
1010            field: "field1",
1011            value: Value::from("nonexistent"),
1012        };
1013
1014        assert_eq!(
1015            Ok(ObjectMap::from([
1016                ("field1".into(), Value::from("zirp")),
1017                ("field2".into(), Value::from("zurp")),
1018            ])),
1019            file.find_table_row(
1020                Case::Sensitive,
1021                &[condition],
1022                None,
1023                Some(&wildcard),
1024                Some(handle)
1025            )
1026        );
1027    }
1028
1029    #[test]
1030    fn finds_rows_with_index_case_sensitive() {
1031        let mut file = File::new(
1032            Default::default(),
1033            FileData {
1034                modified: SystemTime::now(),
1035                data: vec![
1036                    vec!["zip".into(), "zup".into()],
1037                    vec!["zirp".into(), "zurp".into()],
1038                    vec!["zip".into(), "zoop".into()],
1039                ],
1040                headers: vec!["field1".to_string(), "field2".to_string()],
1041            },
1042        );
1043
1044        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1045
1046        assert_eq!(
1047            Ok(vec![
1048                ObjectMap::from([
1049                    ("field1".into(), Value::from("zip")),
1050                    ("field2".into(), Value::from("zup")),
1051                ]),
1052                ObjectMap::from([
1053                    ("field1".into(), Value::from("zip")),
1054                    ("field2".into(), Value::from("zoop")),
1055                ]),
1056            ]),
1057            file.find_table_rows(
1058                Case::Sensitive,
1059                &[Condition::Equals {
1060                    field: "field1",
1061                    value: Value::from("zip"),
1062                }],
1063                None,
1064                None,
1065                Some(handle)
1066            )
1067        );
1068
1069        assert_eq!(
1070            Ok(vec![]),
1071            file.find_table_rows(
1072                Case::Sensitive,
1073                &[Condition::Equals {
1074                    field: "field1",
1075                    value: Value::from("ZiP"),
1076                }],
1077                None,
1078                None,
1079                Some(handle)
1080            )
1081        );
1082    }
1083
1084    #[test]
1085    fn selects_columns() {
1086        let mut file = File::new(
1087            Default::default(),
1088            FileData {
1089                modified: SystemTime::now(),
1090                data: vec![
1091                    vec!["zip".into(), "zup".into(), "zoop".into()],
1092                    vec!["zirp".into(), "zurp".into(), "zork".into()],
1093                    vec!["zip".into(), "zoop".into(), "zibble".into()],
1094                ],
1095                headers: vec![
1096                    "field1".to_string(),
1097                    "field2".to_string(),
1098                    "field3".to_string(),
1099                ],
1100            },
1101        );
1102
1103        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1104
1105        let condition = Condition::Equals {
1106            field: "field1",
1107            value: Value::from("zip"),
1108        };
1109
1110        assert_eq!(
1111            Ok(vec![
1112                ObjectMap::from([
1113                    ("field1".into(), Value::from("zip")),
1114                    ("field3".into(), Value::from("zoop")),
1115                ]),
1116                ObjectMap::from([
1117                    ("field1".into(), Value::from("zip")),
1118                    ("field3".into(), Value::from("zibble")),
1119                ]),
1120            ]),
1121            file.find_table_rows(
1122                Case::Sensitive,
1123                &[condition],
1124                Some(&["field1".to_string(), "field3".to_string()]),
1125                None,
1126                Some(handle)
1127            )
1128        );
1129    }
1130
1131    #[test]
1132    fn finds_rows_with_index_case_insensitive() {
1133        let mut file = File::new(
1134            Default::default(),
1135            FileData {
1136                modified: SystemTime::now(),
1137                data: vec![
1138                    vec!["zip".into(), "zup".into()],
1139                    vec!["zirp".into(), "zurp".into()],
1140                    vec!["zip".into(), "zoop".into()],
1141                ],
1142                headers: vec!["field1".to_string(), "field2".to_string()],
1143            },
1144        );
1145
1146        let handle = file.add_index(Case::Insensitive, &["field1"]).unwrap();
1147
1148        assert_eq!(
1149            Ok(vec![
1150                ObjectMap::from([
1151                    ("field1".into(), Value::from("zip")),
1152                    ("field2".into(), Value::from("zup")),
1153                ]),
1154                ObjectMap::from([
1155                    ("field1".into(), Value::from("zip")),
1156                    ("field2".into(), Value::from("zoop")),
1157                ]),
1158            ]),
1159            file.find_table_rows(
1160                Case::Insensitive,
1161                &[Condition::Equals {
1162                    field: "field1",
1163                    value: Value::from("zip"),
1164                }],
1165                None,
1166                None,
1167                Some(handle)
1168            )
1169        );
1170
1171        assert_eq!(
1172            Ok(vec![
1173                ObjectMap::from([
1174                    ("field1".into(), Value::from("zip")),
1175                    ("field2".into(), Value::from("zup")),
1176                ]),
1177                ObjectMap::from([
1178                    ("field1".into(), Value::from("zip")),
1179                    ("field2".into(), Value::from("zoop")),
1180                ]),
1181            ]),
1182            file.find_table_rows(
1183                Case::Insensitive,
1184                &[Condition::Equals {
1185                    field: "field1",
1186                    value: Value::from("ZiP"),
1187                }],
1188                None,
1189                None,
1190                Some(handle)
1191            )
1192        );
1193    }
1194
1195    #[test]
1196    fn finds_rows_with_index_case_insensitive_and_wildcard() {
1197        let mut file = File::new(
1198            Default::default(),
1199            FileData {
1200                modified: SystemTime::now(),
1201                data: vec![
1202                    vec!["zip".into(), "zup".into()],
1203                    vec!["zirp".into(), "zurp".into()],
1204                    vec!["zip".into(), "zoop".into()],
1205                ],
1206                headers: vec!["field1".to_string(), "field2".to_string()],
1207            },
1208        );
1209
1210        let handle = file.add_index(Case::Insensitive, &["field1"]).unwrap();
1211
1212        assert_eq!(
1213            Ok(vec![
1214                ObjectMap::from([
1215                    ("field1".into(), Value::from("zip")),
1216                    ("field2".into(), Value::from("zup")),
1217                ]),
1218                ObjectMap::from([
1219                    ("field1".into(), Value::from("zip")),
1220                    ("field2".into(), Value::from("zoop")),
1221                ]),
1222            ]),
1223            file.find_table_rows(
1224                Case::Insensitive,
1225                &[Condition::Equals {
1226                    field: "field1",
1227                    value: Value::from("nonexistent"),
1228                }],
1229                None,
1230                Some(&Value::from("zip")),
1231                Some(handle)
1232            )
1233        );
1234
1235        assert_eq!(
1236            Ok(vec![
1237                ObjectMap::from([
1238                    ("field1".into(), Value::from("zip")),
1239                    ("field2".into(), Value::from("zup")),
1240                ]),
1241                ObjectMap::from([
1242                    ("field1".into(), Value::from("zip")),
1243                    ("field2".into(), Value::from("zoop")),
1244                ]),
1245            ]),
1246            file.find_table_rows(
1247                Case::Insensitive,
1248                &[Condition::Equals {
1249                    field: "field1",
1250                    value: Value::from("ZiP"),
1251                }],
1252                None,
1253                Some(&Value::from("ZiP")),
1254                Some(handle)
1255            )
1256        );
1257    }
1258
1259    #[test]
1260    fn finds_row_between_dates() {
1261        let mut file = File::new(
1262            Default::default(),
1263            FileData {
1264                modified: SystemTime::now(),
1265                data: vec![
1266                    vec![
1267                        "zip".into(),
1268                        Value::Timestamp(
1269                            chrono::Utc
1270                                .with_ymd_and_hms(2015, 12, 7, 0, 0, 0)
1271                                .single()
1272                                .expect("invalid timestamp"),
1273                        ),
1274                    ],
1275                    vec![
1276                        "zip".into(),
1277                        Value::Timestamp(
1278                            chrono::Utc
1279                                .with_ymd_and_hms(2016, 12, 7, 0, 0, 0)
1280                                .single()
1281                                .expect("invalid timestamp"),
1282                        ),
1283                    ],
1284                ],
1285                headers: vec!["field1".to_string(), "field2".to_string()],
1286            },
1287        );
1288
1289        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1290
1291        let conditions = [
1292            Condition::Equals {
1293                field: "field1",
1294                value: "zip".into(),
1295            },
1296            Condition::BetweenDates {
1297                field: "field2",
1298                from: chrono::Utc
1299                    .with_ymd_and_hms(2016, 1, 1, 0, 0, 0)
1300                    .single()
1301                    .expect("invalid timestamp"),
1302                to: chrono::Utc
1303                    .with_ymd_and_hms(2017, 1, 1, 0, 0, 0)
1304                    .single()
1305                    .expect("invalid timestamp"),
1306            },
1307        ];
1308
1309        assert_eq!(
1310            Ok(ObjectMap::from([
1311                ("field1".into(), Value::from("zip")),
1312                (
1313                    "field2".into(),
1314                    Value::Timestamp(
1315                        chrono::Utc
1316                            .with_ymd_and_hms(2016, 12, 7, 0, 0, 0)
1317                            .single()
1318                            .expect("invalid timestamp")
1319                    )
1320                )
1321            ])),
1322            file.find_table_row(Case::Sensitive, &conditions, None, None, Some(handle))
1323        );
1324    }
1325
1326    #[test]
1327    fn finds_row_from_date() {
1328        let mut file = File::new(
1329            Default::default(),
1330            FileData {
1331                modified: SystemTime::now(),
1332                data: vec![
1333                    vec![
1334                        "zip".into(),
1335                        Value::Timestamp(
1336                            chrono::Utc
1337                                .with_ymd_and_hms(2015, 12, 7, 0, 0, 0)
1338                                .single()
1339                                .expect("invalid timestamp"),
1340                        ),
1341                    ],
1342                    vec![
1343                        "zip".into(),
1344                        Value::Timestamp(
1345                            chrono::Utc
1346                                .with_ymd_and_hms(2016, 12, 7, 0, 0, 0)
1347                                .single()
1348                                .expect("invalid timestamp"),
1349                        ),
1350                    ],
1351                ],
1352                headers: vec!["field1".to_string(), "field2".to_string()],
1353            },
1354        );
1355
1356        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1357
1358        let conditions = [
1359            Condition::Equals {
1360                field: "field1",
1361                value: "zip".into(),
1362            },
1363            Condition::FromDate {
1364                field: "field2",
1365                from: chrono::Utc
1366                    .with_ymd_and_hms(2016, 1, 1, 0, 0, 0)
1367                    .single()
1368                    .expect("invalid timestamp"),
1369            },
1370        ];
1371
1372        assert_eq!(
1373            Ok(ObjectMap::from([
1374                ("field1".into(), Value::from("zip")),
1375                (
1376                    "field2".into(),
1377                    Value::Timestamp(
1378                        chrono::Utc
1379                            .with_ymd_and_hms(2016, 12, 7, 0, 0, 0)
1380                            .single()
1381                            .expect("invalid timestamp")
1382                    )
1383                )
1384            ])),
1385            file.find_table_row(Case::Sensitive, &conditions, None, None, Some(handle))
1386        );
1387    }
1388
1389    #[test]
1390    fn finds_row_to_date() {
1391        let mut file = File::new(
1392            Default::default(),
1393            FileData {
1394                modified: SystemTime::now(),
1395                data: vec![
1396                    vec![
1397                        "zip".into(),
1398                        Value::Timestamp(
1399                            chrono::Utc
1400                                .with_ymd_and_hms(2015, 12, 7, 0, 0, 0)
1401                                .single()
1402                                .expect("invalid timestamp"),
1403                        ),
1404                    ],
1405                    vec![
1406                        "zip".into(),
1407                        Value::Timestamp(
1408                            chrono::Utc
1409                                .with_ymd_and_hms(2016, 12, 7, 0, 0, 0)
1410                                .single()
1411                                .expect("invalid timestamp"),
1412                        ),
1413                    ],
1414                ],
1415                headers: vec!["field1".to_string(), "field2".to_string()],
1416            },
1417        );
1418
1419        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1420
1421        let conditions = [
1422            Condition::Equals {
1423                field: "field1",
1424                value: "zip".into(),
1425            },
1426            Condition::ToDate {
1427                field: "field2",
1428                to: chrono::Utc
1429                    .with_ymd_and_hms(2016, 1, 1, 0, 0, 0)
1430                    .single()
1431                    .expect("invalid timestamp"),
1432            },
1433        ];
1434
1435        assert_eq!(
1436            Ok(ObjectMap::from([
1437                ("field1".into(), Value::from("zip")),
1438                (
1439                    "field2".into(),
1440                    Value::Timestamp(
1441                        chrono::Utc
1442                            .with_ymd_and_hms(2015, 12, 7, 0, 0, 0)
1443                            .single()
1444                            .expect("invalid timestamp")
1445                    )
1446                )
1447            ])),
1448            file.find_table_row(Case::Sensitive, &conditions, None, None, Some(handle))
1449        );
1450    }
1451
1452    #[test]
1453    fn doesnt_find_row() {
1454        let file = File::new(
1455            Default::default(),
1456            FileData {
1457                modified: SystemTime::now(),
1458                data: vec![
1459                    vec!["zip".into(), "zup".into()],
1460                    vec!["zirp".into(), "zurp".into()],
1461                ],
1462                headers: vec!["field1".to_string(), "field2".to_string()],
1463            },
1464        );
1465
1466        let condition = Condition::Equals {
1467            field: "field1",
1468            value: Value::from("zorp"),
1469        };
1470
1471        assert_eq!(
1472            Err(Error::NoRowsFound),
1473            file.find_table_row(Case::Sensitive, &[condition], None, None, None)
1474        );
1475    }
1476
1477    #[test]
1478    fn doesnt_find_row_with_index() {
1479        let mut file = File::new(
1480            Default::default(),
1481            FileData {
1482                modified: SystemTime::now(),
1483                data: vec![
1484                    vec!["zip".into(), "zup".into()],
1485                    vec!["zirp".into(), "zurp".into()],
1486                ],
1487                headers: vec!["field1".to_string(), "field2".to_string()],
1488            },
1489        );
1490
1491        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1492
1493        let condition = Condition::Equals {
1494            field: "field1",
1495            value: Value::from("zorp"),
1496        };
1497
1498        assert_eq!(
1499            Err(Error::NoRowsFound),
1500            file.find_table_row(Case::Sensitive, &[condition], None, None, Some(handle))
1501        );
1502    }
1503
1504    #[test]
1505    fn doesnt_find_row_with_index_and_wildcard() {
1506        let mut file = File::new(
1507            Default::default(),
1508            FileData {
1509                modified: SystemTime::now(),
1510                data: vec![
1511                    vec!["zip".into(), "zup".into()],
1512                    vec!["zirp".into(), "zurp".into()],
1513                ],
1514                headers: vec!["field1".to_string(), "field2".to_string()],
1515            },
1516        );
1517
1518        let handle = file.add_index(Case::Sensitive, &["field1"]).unwrap();
1519        let wildcard = Value::from("nonexistent");
1520
1521        let condition = Condition::Equals {
1522            field: "field1",
1523            value: Value::from("zorp"),
1524        };
1525
1526        assert_eq!(
1527            Err(Error::NoRowsFound),
1528            file.find_table_row(
1529                Case::Sensitive,
1530                &[condition],
1531                None,
1532                Some(&wildcard),
1533                Some(handle)
1534            )
1535        );
1536    }
1537}