vector/config/loading/loader.rs
1use std::path::{Path, PathBuf};
2
3use serde_toml_merge::merge_into_table;
4use toml::value::{Table, Value};
5
6use super::{Format, component_name, open_file, read_dir};
7use crate::config::format;
8
9/// Provides a hint to the loading system of the type of components that should be found
10/// when traversing an explicitly named directory.
11#[derive(Debug, Copy, Clone)]
12pub enum ComponentHint {
13 Source,
14 Transform,
15 Sink,
16 Test,
17 EnrichmentTable,
18}
19
20impl ComponentHint {
21 /// Returns the component string field that should host a component -- e.g. sources,
22 /// transforms, etc.
23 const fn as_component_field(&self) -> &str {
24 match self {
25 ComponentHint::Source => "sources",
26 ComponentHint::Transform => "transforms",
27 ComponentHint::Sink => "sinks",
28 ComponentHint::Test => "tests",
29 ComponentHint::EnrichmentTable => "enrichment_tables",
30 }
31 }
32
33 /// Joins a component sub-folder to a provided path, for traversal. Since `Self` is a
34 /// `Copy`, this is more efficient to pass by value than ref.
35 pub fn join_path(self, path: &Path) -> PathBuf {
36 path.join(self.as_component_field())
37 }
38}
39
40// The loader traits are split into two parts -- an internal `process` mod, that contains
41// functionality for processing files/folders, and a `Loader<T>` trait, that provides a public
42// interface getting a `T` from a file/folder. The private mod is available to implementors
43// within the loading mod, but does not form part of the public interface. This is useful
44// because there are numerous internal functions for dealing with (non)recursive loading that
45// rely on `&self` but don't need overriding and would be confusingly named in a public API.
46pub(super) mod process {
47 use std::io::Read;
48
49 use super::*;
50
51 /// This trait contains methods that deserialize files/folders. There are a few methods
52 /// in here with subtly different names that can be hidden from public view, hence why
53 /// this is nested in a private mod.
54 pub trait Process {
55 /// Prepares input for serialization. This can be a useful step to interpolate
56 /// environment variables or perform some other pre-processing on the input.
57 fn prepare<R: Read>(&mut self, input: R) -> Result<String, Vec<String>>;
58
59 /// Calls into the `prepare` method, and deserializes a `Read` to a `T`.
60 fn load<R: std::io::Read, T>(&mut self, input: R, format: Format) -> Result<T, Vec<String>>
61 where
62 T: serde::de::DeserializeOwned,
63 {
64 let value = self.prepare(input)?;
65
66 format::deserialize(&value, format)
67 }
68
69 /// Helper method used by other methods to recursively handle file/dir loading, merging
70 /// values against a provided TOML `Table`.
71 fn load_dir_into(
72 &mut self,
73 path: &Path,
74 result: &mut Table,
75 recurse: bool,
76 ) -> Result<(), Vec<String>> {
77 let mut errors = Vec::new();
78 let readdir = read_dir(path)?;
79
80 let mut files = Vec::new();
81 let mut folders = Vec::new();
82
83 for entry in readdir {
84 match entry {
85 Ok(item) => {
86 let entry = item.path();
87 if entry.is_file() {
88 files.push(entry);
89 } else if entry.is_dir() {
90 // do not load directories when the directory starts with a '.'
91 if !entry
92 .file_name()
93 .and_then(|name| name.to_str())
94 .map(|name| name.starts_with('.'))
95 .unwrap_or(false)
96 {
97 folders.push(entry);
98 }
99 }
100 }
101 Err(err) => {
102 errors.push(format!(
103 "Could not read entry in config dir: {path:?}, {err}."
104 ));
105 }
106 };
107 }
108
109 for entry in files {
110 // If the file doesn't contain a known extension, skip it.
111 let format = match Format::from_path(&entry) {
112 Ok(format) => format,
113 _ => continue,
114 };
115
116 let loaded = if recurse {
117 self.load_file_recursive(&entry, format)
118 } else {
119 self.load_file(&entry, format)
120 };
121
122 match loaded {
123 Ok(Some((name, inner))) => {
124 if let Err(errs) = merge_with_value(result, name, Value::Table(inner)) {
125 errors.extend(errs);
126 }
127 }
128 Ok(None) => {}
129 Err(errs) => {
130 errors.extend(errs);
131 }
132 }
133 }
134
135 // Only descend into folders if `recurse: true`.
136 if recurse {
137 for entry in folders {
138 if let Ok(name) = component_name(&entry)
139 && !result.contains_key(&name)
140 {
141 match self.load_dir(&entry, true) {
142 Ok(table) => {
143 result.insert(name, Value::Table(table));
144 }
145 Err(errs) => {
146 errors.extend(errs);
147 }
148 }
149 }
150 }
151 }
152
153 if errors.is_empty() {
154 Ok(())
155 } else {
156 Err(errors)
157 }
158 }
159
160 /// Loads and deserializes a file into a TOML `Table`.
161 fn load_file(
162 &mut self,
163 path: &Path,
164 format: Format,
165 ) -> Result<Option<(String, Table)>, Vec<String>> {
166 match (component_name(path), open_file(path)) {
167 (Ok(name), Some(file)) => self.load(file, format).map(|value| Some((name, value))),
168 _ => Ok(None),
169 }
170 }
171
172 /// Loads a file, and if the path provided contains a sub-folder by the same name as the
173 /// component, descend into it recursively, returning a TOML `Table`.
174 fn load_file_recursive(
175 &mut self,
176 path: &Path,
177 format: Format,
178 ) -> Result<Option<(String, Table)>, Vec<String>> {
179 if let Some((name, mut table)) = self.load_file(path, format)? {
180 if let Some(subdir) = path.parent().map(|p| p.join(&name))
181 && subdir.is_dir()
182 && subdir.exists()
183 {
184 self.load_dir_into(&subdir, &mut table, true)?;
185 }
186 Ok(Some((name, table)))
187 } else {
188 Ok(None)
189 }
190 }
191
192 /// Loads a directory (optionally, recursively), returning a TOML `Table`. This will
193 /// create an initial `Table` and pass it into `load_dir_into` for recursion handling.
194 fn load_dir(&mut self, path: &Path, recurse: bool) -> Result<Table, Vec<String>> {
195 let mut result = Table::new();
196 self.load_dir_into(path, &mut result, recurse)?;
197 Ok(result)
198 }
199
200 /// Merge a provided TOML `Table` in an implementation-specific way. Contains an
201 /// optional component hint, which may affect how components are merged. Takes a `&mut self`
202 /// with the intention of merging an inner value that can be `take`n by a `Loader`.
203 fn merge(&mut self, table: Table, hint: Option<ComponentHint>) -> Result<(), Vec<String>>;
204 }
205}
206
207/// `Loader` represents the public part of the loading interface. Includes methods for loading
208/// from a file or folder, and accessing the final deserialized `T` value via the `take` method.
209pub trait Loader<T>: process::Process
210where
211 T: serde::de::DeserializeOwned,
212{
213 /// Consumes Self, and returns the final, deserialized `T`.
214 fn take(self) -> T;
215
216 fn load_from_str<R: std::io::Read>(
217 &mut self,
218 input: R,
219 format: Format,
220 ) -> Result<(), Vec<String>> {
221 if let Some(table) = self.load(input, format)? {
222 self.merge(table, None)?;
223 }
224 Ok(())
225 }
226
227 /// Deserializes a file with the provided format, and makes the result available via `take`.
228 /// Returns a vector of non-fatal warnings on success, or a vector of error strings on failure.
229 fn load_from_file(&mut self, path: &Path, format: Format) -> Result<(), Vec<String>> {
230 if let Some((_, table)) = self.load_file(path, format)? {
231 self.merge(table, None)?;
232 Ok(())
233 } else {
234 Ok(())
235 }
236 }
237
238 /// Deserializes a dir with the provided format, and makes the result available via `take`.
239 /// Returns a vector of non-fatal warnings on success, or a vector of error strings on failure.
240 fn load_from_dir(&mut self, path: &Path) -> Result<(), Vec<String>> {
241 // Iterator containing component-specific sub-folders to attempt traversing into.
242 let hints = [
243 ComponentHint::Source,
244 ComponentHint::Transform,
245 ComponentHint::Sink,
246 ComponentHint::Test,
247 ComponentHint::EnrichmentTable,
248 ];
249 let paths = hints
250 .iter()
251 .copied()
252 .map(|hint| (hint.join_path(path), hint));
253
254 // Get files from the root of the folder. These represent top-level config settings,
255 // and need to merged down first to represent a more 'complete' config.
256 let mut root = Table::new();
257 let table = self.load_dir(path, false)?;
258
259 // Discard the named part of the path, since these don't form any component names.
260 for (_, value) in table {
261 // All files should contain key/value pairs.
262 if let Value::Table(table) = value {
263 merge_into_table(&mut root, table).map_err(|e| vec![e.to_string()])?;
264 }
265 }
266
267 // Merge the 'root' config value first.
268 self.merge(root, None)?;
269
270 // Loop over each component path. If it exists, load files and merge.
271 for (path, hint) in paths {
272 // Sanity check for paths, to ensure we're dealing with a folder. This is necessary
273 // because a sub-folder won't generally exist unless the config is namespaced.
274 if path.exists() && path.is_dir() {
275 // Transforms are treated differently from other component types; they can be
276 // arbitrarily nested.
277 let table = self.load_dir(&path, matches!(hint, ComponentHint::Transform))?;
278
279 self.merge(table, Some(hint))?;
280 }
281 }
282
283 Ok(())
284 }
285}
286
287/// Merge two TOML `Value`s, returning a new `Value`.
288fn merge_values(value: toml::Value, other: toml::Value) -> Result<toml::Value, Vec<String>> {
289 serde_toml_merge::merge(value, other).map_err(|e| vec![e.to_string()])
290}
291
292/// Updates a TOML `Table` with the merged values of a named key. Inserts if it doesn't exist.
293fn merge_with_value(res: &mut Table, name: String, value: toml::Value) -> Result<(), Vec<String>> {
294 if let Some(existing) = res.remove(&name) {
295 res.insert(name, merge_values(existing, value)?);
296 } else {
297 res.insert(name, value);
298 }
299 Ok(())
300}
301
302/// Deserialize a TOML `Table` into a `T`.
303pub(super) fn deserialize_table<T: serde::de::DeserializeOwned>(
304 table: Table,
305) -> Result<T, Vec<String>> {
306 Value::Table(table)
307 .try_into()
308 .map_err(|e| vec![e.to_string()])
309}