vector/config/loading/
loader.rs

1use std::path::{Path, PathBuf};
2
3use serde_toml_merge::merge_into_table;
4use toml::value::{Table, Value};
5
6use super::{Format, component_name, open_file, read_dir};
7use crate::config::format;
8
/// Hint given to the loading system describing which kind of component is expected to be
/// found when traversing an explicitly named directory.
#[derive(Clone, Copy, Debug)]
pub enum ComponentHint {
    /// Source components.
    Source,
    /// Transform components.
    Transform,
    /// Sink components.
    Sink,
    /// Test definitions.
    Test,
    /// Enrichment table components.
    EnrichmentTable,
}
19
20impl ComponentHint {
21    /// Returns the component string field that should host a component -- e.g. sources,
22    /// transforms, etc.
23    const fn as_component_field(&self) -> &str {
24        match self {
25            ComponentHint::Source => "sources",
26            ComponentHint::Transform => "transforms",
27            ComponentHint::Sink => "sinks",
28            ComponentHint::Test => "tests",
29            ComponentHint::EnrichmentTable => "enrichment_tables",
30        }
31    }
32
33    /// Joins a component sub-folder to a provided path, for traversal. Since `Self` is a
34    /// `Copy`, this is more efficient to pass by value than ref.
35    pub fn join_path(self, path: &Path) -> PathBuf {
36        path.join(self.as_component_field())
37    }
38}
39
40// The loader traits are split into two parts -- an internal `process` mod, that contains
41// functionality for processing files/folders, and a `Loader<T>` trait, that provides a public
42// interface getting a `T` from a file/folder. The private mod is available to implementors
43// within the loading mod, but does not form part of the public interface. This is useful
44// because there are numerous internal functions for dealing with (non)recursive loading that
45// rely on `&self` but don't need overriding and would be confusingly named in a public API.
46pub(super) mod process {
47    use std::io::Read;
48
49    use super::*;
50
51    /// This trait contains methods that deserialize files/folders. There are a few methods
52    /// in here with subtly different names that can be hidden from public view, hence why
53    /// this is nested in a private mod.
54    pub trait Process {
55        /// Prepares input for serialization. This can be a useful step to interpolate
56        /// environment variables or perform some other pre-processing on the input.
57        fn prepare<R: Read>(&mut self, input: R) -> Result<String, Vec<String>>;
58
59        /// Calls into the `prepare` method, and deserializes a `Read` to a `T`.
60        fn load<R: std::io::Read, T>(&mut self, input: R, format: Format) -> Result<T, Vec<String>>
61        where
62            T: serde::de::DeserializeOwned,
63        {
64            let value = self.prepare(input)?;
65
66            format::deserialize(&value, format)
67        }
68
69        /// Helper method used by other methods to recursively handle file/dir loading, merging
70        /// values against a provided TOML `Table`.
71        fn load_dir_into(
72            &mut self,
73            path: &Path,
74            result: &mut Table,
75            recurse: bool,
76        ) -> Result<(), Vec<String>> {
77            let mut errors = Vec::new();
78            let readdir = read_dir(path)?;
79
80            let mut files = Vec::new();
81            let mut folders = Vec::new();
82
83            for entry in readdir {
84                match entry {
85                    Ok(item) => {
86                        let entry = item.path();
87                        if entry.is_file() {
88                            files.push(entry);
89                        } else if entry.is_dir() {
90                            // do not load directories when the directory starts with a '.'
91                            if !entry
92                                .file_name()
93                                .and_then(|name| name.to_str())
94                                .map(|name| name.starts_with('.'))
95                                .unwrap_or(false)
96                            {
97                                folders.push(entry);
98                            }
99                        }
100                    }
101                    Err(err) => {
102                        errors.push(format!(
103                            "Could not read entry in config dir: {path:?}, {err}."
104                        ));
105                    }
106                };
107            }
108
109            for entry in files {
110                // If the file doesn't contain a known extension, skip it.
111                let format = match Format::from_path(&entry) {
112                    Ok(format) => format,
113                    _ => continue,
114                };
115
116                let loaded = if recurse {
117                    self.load_file_recursive(&entry, format)
118                } else {
119                    self.load_file(&entry, format)
120                };
121
122                match loaded {
123                    Ok(Some((name, inner))) => {
124                        if let Err(errs) = merge_with_value(result, name, Value::Table(inner)) {
125                            errors.extend(errs);
126                        }
127                    }
128                    Ok(None) => {}
129                    Err(errs) => {
130                        errors.extend(errs);
131                    }
132                }
133            }
134
135            // Only descend into folders if `recurse: true`.
136            if recurse {
137                for entry in folders {
138                    if let Ok(name) = component_name(&entry)
139                        && !result.contains_key(&name)
140                    {
141                        match self.load_dir(&entry, true) {
142                            Ok(table) => {
143                                result.insert(name, Value::Table(table));
144                            }
145                            Err(errs) => {
146                                errors.extend(errs);
147                            }
148                        }
149                    }
150                }
151            }
152
153            if errors.is_empty() {
154                Ok(())
155            } else {
156                Err(errors)
157            }
158        }
159
160        /// Loads and deserializes a file into a TOML `Table`.
161        fn load_file(
162            &mut self,
163            path: &Path,
164            format: Format,
165        ) -> Result<Option<(String, Table)>, Vec<String>> {
166            match (component_name(path), open_file(path)) {
167                (Ok(name), Some(file)) => self.load(file, format).map(|value| Some((name, value))),
168                _ => Ok(None),
169            }
170        }
171
172        /// Loads a file, and if the path provided contains a sub-folder by the same name as the
173        /// component, descend into it recursively, returning a TOML `Table`.
174        fn load_file_recursive(
175            &mut self,
176            path: &Path,
177            format: Format,
178        ) -> Result<Option<(String, Table)>, Vec<String>> {
179            if let Some((name, mut table)) = self.load_file(path, format)? {
180                if let Some(subdir) = path.parent().map(|p| p.join(&name))
181                    && subdir.is_dir()
182                    && subdir.exists()
183                {
184                    self.load_dir_into(&subdir, &mut table, true)?;
185                }
186                Ok(Some((name, table)))
187            } else {
188                Ok(None)
189            }
190        }
191
192        /// Loads a directory (optionally, recursively), returning a TOML `Table`. This will
193        /// create an initial `Table` and pass it into `load_dir_into` for recursion handling.
194        fn load_dir(&mut self, path: &Path, recurse: bool) -> Result<Table, Vec<String>> {
195            let mut result = Table::new();
196            self.load_dir_into(path, &mut result, recurse)?;
197            Ok(result)
198        }
199
200        /// Merge a provided TOML `Table` in an implementation-specific way. Contains an
201        /// optional component hint, which may affect how components are merged. Takes a `&mut self`
202        /// with the intention of merging an inner value that can be `take`n by a `Loader`.
203        fn merge(&mut self, table: Table, hint: Option<ComponentHint>) -> Result<(), Vec<String>>;
204    }
205}
206
207/// `Loader` represents the public part of the loading interface. Includes methods for loading
208/// from a file or folder, and accessing the final deserialized `T` value via the `take` method.
209pub trait Loader<T>: process::Process
210where
211    T: serde::de::DeserializeOwned,
212{
213    /// Consumes Self, and returns the final, deserialized `T`.
214    fn take(self) -> T;
215
216    fn load_from_str<R: std::io::Read>(
217        &mut self,
218        input: R,
219        format: Format,
220    ) -> Result<(), Vec<String>> {
221        if let Some(table) = self.load(input, format)? {
222            self.merge(table, None)?;
223        }
224        Ok(())
225    }
226
227    /// Deserializes a file with the provided format, and makes the result available via `take`.
228    /// Returns a vector of non-fatal warnings on success, or a vector of error strings on failure.
229    fn load_from_file(&mut self, path: &Path, format: Format) -> Result<(), Vec<String>> {
230        if let Some((_, table)) = self.load_file(path, format)? {
231            self.merge(table, None)?;
232            Ok(())
233        } else {
234            Ok(())
235        }
236    }
237
238    /// Deserializes a dir with the provided format, and makes the result available via `take`.
239    /// Returns a vector of non-fatal warnings on success, or a vector of error strings on failure.
240    fn load_from_dir(&mut self, path: &Path) -> Result<(), Vec<String>> {
241        // Iterator containing component-specific sub-folders to attempt traversing into.
242        let hints = [
243            ComponentHint::Source,
244            ComponentHint::Transform,
245            ComponentHint::Sink,
246            ComponentHint::Test,
247            ComponentHint::EnrichmentTable,
248        ];
249        let paths = hints
250            .iter()
251            .copied()
252            .map(|hint| (hint.join_path(path), hint));
253
254        // Get files from the root of the folder. These represent top-level config settings,
255        // and need to merged down first to represent a more 'complete' config.
256        let mut root = Table::new();
257        let table = self.load_dir(path, false)?;
258
259        // Discard the named part of the path, since these don't form any component names.
260        for (_, value) in table {
261            // All files should contain key/value pairs.
262            if let Value::Table(table) = value {
263                merge_into_table(&mut root, table).map_err(|e| vec![e.to_string()])?;
264            }
265        }
266
267        // Merge the 'root' config value first.
268        self.merge(root, None)?;
269
270        // Loop over each component path. If it exists, load files and merge.
271        for (path, hint) in paths {
272            // Sanity check for paths, to ensure we're dealing with a folder. This is necessary
273            // because a sub-folder won't generally exist unless the config is namespaced.
274            if path.exists() && path.is_dir() {
275                // Transforms are treated differently from other component types; they can be
276                // arbitrarily nested.
277                let table = self.load_dir(&path, matches!(hint, ComponentHint::Transform))?;
278
279                self.merge(table, Some(hint))?;
280            }
281        }
282
283        Ok(())
284    }
285}
286
287/// Merge two TOML `Value`s, returning a new `Value`.
288fn merge_values(value: toml::Value, other: toml::Value) -> Result<toml::Value, Vec<String>> {
289    serde_toml_merge::merge(value, other).map_err(|e| vec![e.to_string()])
290}
291
292/// Updates a TOML `Table` with the merged values of a named key. Inserts if it doesn't exist.
293fn merge_with_value(res: &mut Table, name: String, value: toml::Value) -> Result<(), Vec<String>> {
294    if let Some(existing) = res.remove(&name) {
295        res.insert(name, merge_values(existing, value)?);
296    } else {
297        res.insert(name, value);
298    }
299    Ok(())
300}
301
302/// Deserialize a TOML `Table` into a `T`.
303pub(super) fn deserialize_table<T: serde::de::DeserializeOwned>(
304    table: Table,
305) -> Result<T, Vec<String>> {
306    Value::Table(table)
307        .try_into()
308        .map_err(|e| vec![e.to_string()])
309}