vector/config/loading/
mod.rs

1mod config_builder;
2mod loader;
3mod secret;
4mod source;
5
6use std::{
7    collections::HashMap,
8    fmt::Debug,
9    fs::{File, ReadDir},
10    path::{Path, PathBuf},
11    sync::Mutex,
12};
13
14use config_builder::ConfigBuilderLoader;
15use glob::glob;
16use loader::process::Process;
17pub use loader::*;
18pub use secret::*;
19pub use source::*;
20use vector_lib::configurable::NamedComponent;
21
22use super::{
23    Config, ConfigPath, Format, FormatHint, builder::ConfigBuilder, format, validation, vars,
24};
25use crate::{config::ProviderConfig, signal};
26
27pub static CONFIG_PATHS: Mutex<Vec<ConfigPath>> = Mutex::new(Vec::new());
28
29pub(super) fn read_dir<P: AsRef<Path> + Debug>(path: P) -> Result<ReadDir, Vec<String>> {
30    path.as_ref()
31        .read_dir()
32        .map_err(|err| vec![format!("Could not read config dir: {:?}, {}.", path, err)])
33}
34
35pub(super) fn component_name<P: AsRef<Path> + Debug>(path: P) -> Result<String, Vec<String>> {
36    path.as_ref()
37        .file_stem()
38        .and_then(|name| name.to_str())
39        .map(|name| name.to_string())
40        .ok_or_else(|| vec![format!("Couldn't get component name for file: {:?}", path)])
41}
42
43pub(super) fn open_file<P: AsRef<Path> + Debug>(path: P) -> Option<File> {
44    match File::open(&path) {
45        Ok(f) => Some(f),
46        Err(error) => {
47            if let std::io::ErrorKind::NotFound = error.kind() {
48                error!(
49                    message = "Config file not found in path.",
50                    ?path,
51                    internal_log_rate_limit = false
52                );
53                None
54            } else {
55                error!(message = "Error opening config file.", %error, ?path, internal_log_rate_limit = false);
56                None
57            }
58        }
59    }
60}
61
62/// Merge the paths coming from different cli flags with different formats into
63/// a unified list of paths with formats.
64pub fn merge_path_lists(
65    path_lists: Vec<(&[PathBuf], FormatHint)>,
66) -> impl Iterator<Item = (PathBuf, FormatHint)> + '_ {
67    path_lists
68        .into_iter()
69        .flat_map(|(paths, format)| paths.iter().cloned().map(move |path| (path, format)))
70}
71
72/// Expand a list of paths (potentially containing glob patterns) into real
73/// config paths, replacing it with the default paths when empty.
74pub fn process_paths(config_paths: &[ConfigPath]) -> Option<Vec<ConfigPath>> {
75    let starting_paths = if !config_paths.is_empty() {
76        config_paths.to_owned()
77    } else {
78        default_config_paths()
79    };
80
81    let mut paths = Vec::new();
82
83    for config_path in &starting_paths {
84        let config_pattern: &PathBuf = config_path.into();
85
86        let matches: Vec<PathBuf> = match glob(config_pattern.to_str().expect("No ability to glob"))
87        {
88            Ok(glob_paths) => glob_paths.filter_map(Result::ok).collect(),
89            Err(err) => {
90                error!(message = "Failed to read glob pattern.", path = ?config_pattern, error = ?err);
91                return None;
92            }
93        };
94
95        if matches.is_empty() {
96            error!(message = "Config file not found in path.", path = ?config_pattern, internal_log_rate_limit = false);
97            std::process::exit(exitcode::CONFIG);
98        }
99
100        match config_path {
101            ConfigPath::File(_, format) => {
102                for path in matches {
103                    paths.push(ConfigPath::File(path, *format));
104                }
105            }
106            ConfigPath::Dir(_) => {
107                for path in matches {
108                    paths.push(ConfigPath::Dir(path))
109                }
110            }
111        }
112    }
113
114    paths.sort();
115    paths.dedup();
116    // Ignore poison error and let the current main thread continue running to do the cleanup.
117    drop(
118        CONFIG_PATHS
119            .lock()
120            .map(|mut guard| guard.clone_from(&paths)),
121    );
122
123    Some(paths)
124}
125
126pub fn load_from_paths(config_paths: &[ConfigPath]) -> Result<Config, Vec<String>> {
127    let builder = load_builder_from_paths(config_paths)?;
128    let (config, build_warnings) = builder.build_with_warnings()?;
129
130    for warning in build_warnings {
131        warn!("{}", warning);
132    }
133
134    Ok(config)
135}
136
137/// Loads a configuration from paths. Handle secret replacement and if a provider is present
138/// in the builder, the config is used as bootstrapping for a remote source. Otherwise,
139/// provider instantiation is skipped.
140pub async fn load_from_paths_with_provider_and_secrets(
141    config_paths: &[ConfigPath],
142    signal_handler: &mut signal::SignalHandler,
143    allow_empty: bool,
144) -> Result<Config, Vec<String>> {
145    // Load secret backends first
146    let mut secrets_backends_loader = load_secret_backends_from_paths(config_paths)?;
147    // And then, if needed, retrieve secrets from configured backends
148    let mut builder = if secrets_backends_loader.has_secrets_to_retrieve() {
149        debug!(message = "Secret placeholders found, retrieving secrets from configured backends.");
150        let resolved_secrets = secrets_backends_loader
151            .retrieve(&mut signal_handler.subscribe())
152            .await
153            .map_err(|e| vec![e])?;
154        load_builder_from_paths_with_secrets(config_paths, resolved_secrets)?
155    } else {
156        debug!(message = "No secret placeholder found, skipping secret resolution.");
157        load_builder_from_paths(config_paths)?
158    };
159
160    builder.allow_empty = allow_empty;
161
162    validation::check_provider(&builder)?;
163    signal_handler.clear();
164
165    // If there's a provider, overwrite the existing config builder with the remote variant.
166    if let Some(mut provider) = builder.provider {
167        builder = provider.build(signal_handler).await?;
168        debug!(message = "Provider configured.", provider = ?provider.get_component_name());
169    }
170
171    let (new_config, build_warnings) = builder.build_with_warnings()?;
172
173    validation::check_buffer_preconditions(&new_config).await?;
174
175    for warning in build_warnings {
176        warn!("{}", warning);
177    }
178
179    Ok(new_config)
180}
181
182pub async fn load_from_str_with_secrets(
183    input: &str,
184    format: Format,
185    signal_handler: &mut signal::SignalHandler,
186    allow_empty: bool,
187) -> Result<Config, Vec<String>> {
188    // Load secret backends first
189    let mut secrets_backends_loader = load_secret_backends_from_input(input.as_bytes(), format)?;
190    // And then, if needed, retrieve secrets from configured backends
191    let mut builder = if secrets_backends_loader.has_secrets_to_retrieve() {
192        debug!(message = "Secret placeholders found, retrieving secrets from configured backends.");
193        let resolved_secrets = secrets_backends_loader
194            .retrieve(&mut signal_handler.subscribe())
195            .await
196            .map_err(|e| vec![e])?;
197        load_builder_from_input_with_secrets(input.as_bytes(), format, resolved_secrets)?
198    } else {
199        debug!(message = "No secret placeholder found, skipping secret resolution.");
200        load_builder_from_input(input.as_bytes(), format)?
201    };
202
203    builder.allow_empty = allow_empty;
204    signal_handler.clear();
205    let (new_config, build_warnings) = builder.build_with_warnings()?;
206
207    validation::check_buffer_preconditions(&new_config).await?;
208
209    for warning in build_warnings {
210        warn!("{}", warning);
211    }
212
213    Ok(new_config)
214}
215
216fn loader_from_input<T, L, R>(mut loader: L, input: R, format: Format) -> Result<T, Vec<String>>
217where
218    T: serde::de::DeserializeOwned,
219    L: Loader<T> + Process,
220    R: std::io::Read,
221{
222    loader.load_from_str(input, format).map(|_| loader.take())
223}
224
225/// Iterators over `ConfigPaths`, and processes a file/dir according to a provided `Loader`.
226fn loader_from_paths<T, L>(mut loader: L, config_paths: &[ConfigPath]) -> Result<T, Vec<String>>
227where
228    T: serde::de::DeserializeOwned,
229    L: Loader<T> + Process,
230{
231    let mut errors = Vec::new();
232
233    for config_path in config_paths {
234        match config_path {
235            ConfigPath::File(path, format_hint) => {
236                match loader.load_from_file(
237                    path,
238                    format_hint
239                        .or_else(move || Format::from_path(&path).ok())
240                        .unwrap_or_default(),
241                ) {
242                    Ok(()) => {}
243                    Err(errs) => errors.extend(errs),
244                };
245            }
246            ConfigPath::Dir(path) => {
247                match loader.load_from_dir(path) {
248                    Ok(()) => {}
249                    Err(errs) => errors.extend(errs),
250                };
251            }
252        }
253    }
254
255    if errors.is_empty() {
256        Ok(loader.take())
257    } else {
258        Err(errors)
259    }
260}
261
262/// Uses `ConfigBuilderLoader` to process `ConfigPaths`, deserializing to a `ConfigBuilder`.
263pub fn load_builder_from_paths(config_paths: &[ConfigPath]) -> Result<ConfigBuilder, Vec<String>> {
264    loader_from_paths(ConfigBuilderLoader::new(), config_paths)
265}
266
267fn load_builder_from_input<R: std::io::Read>(
268    input: R,
269    format: Format,
270) -> Result<ConfigBuilder, Vec<String>> {
271    loader_from_input(ConfigBuilderLoader::new(), input, format)
272}
273
274/// Uses `ConfigBuilderLoader` to process `ConfigPaths`, performing secret replacement and deserializing to a `ConfigBuilder`
275pub fn load_builder_from_paths_with_secrets(
276    config_paths: &[ConfigPath],
277    secrets: HashMap<String, String>,
278) -> Result<ConfigBuilder, Vec<String>> {
279    loader_from_paths(ConfigBuilderLoader::with_secrets(secrets), config_paths)
280}
281
282fn load_builder_from_input_with_secrets<R: std::io::Read>(
283    input: R,
284    format: Format,
285    secrets: HashMap<String, String>,
286) -> Result<ConfigBuilder, Vec<String>> {
287    loader_from_input(ConfigBuilderLoader::with_secrets(secrets), input, format)
288}
289
290/// Uses `SourceLoader` to process `ConfigPaths`, deserializing to a toml `SourceMap`.
291pub fn load_source_from_paths(
292    config_paths: &[ConfigPath],
293) -> Result<toml::value::Table, Vec<String>> {
294    loader_from_paths(SourceLoader::new(), config_paths)
295}
296
297/// Uses `SecretBackendLoader` to process `ConfigPaths`, deserializing to a `SecretBackends`.
298pub fn load_secret_backends_from_paths(
299    config_paths: &[ConfigPath],
300) -> Result<SecretBackendLoader, Vec<String>> {
301    loader_from_paths(SecretBackendLoader::new(), config_paths)
302}
303
304fn load_secret_backends_from_input<R: std::io::Read>(
305    input: R,
306    format: Format,
307) -> Result<SecretBackendLoader, Vec<String>> {
308    loader_from_input(SecretBackendLoader::new(), input, format)
309}
310
311pub fn load_from_str(input: &str, format: Format) -> Result<Config, Vec<String>> {
312    let builder = load_from_inputs(std::iter::once((input.as_bytes(), format)))?;
313    let (config, build_warnings) = builder.build_with_warnings()?;
314
315    for warning in build_warnings {
316        warn!("{}", warning);
317    }
318
319    Ok(config)
320}
321
322fn load_from_inputs(
323    inputs: impl IntoIterator<Item = (impl std::io::Read, Format)>,
324) -> Result<ConfigBuilder, Vec<String>> {
325    let mut config = Config::builder();
326    let mut errors = Vec::new();
327
328    for (input, format) in inputs {
329        if let Err(errs) = load(input, format).and_then(|n| config.append(n)) {
330            // TODO: add back paths
331            errors.extend(errs.iter().map(|e| e.to_string()));
332        }
333    }
334
335    if errors.is_empty() {
336        Ok(config)
337    } else {
338        Err(errors)
339    }
340}
341
342pub fn prepare_input<R: std::io::Read>(mut input: R) -> Result<String, Vec<String>> {
343    let mut source_string = String::new();
344    input
345        .read_to_string(&mut source_string)
346        .map_err(|e| vec![e.to_string()])?;
347
348    let mut vars: HashMap<String, String> = std::env::vars_os()
349        .filter_map(|(k, v)| match (k.into_string(), v.into_string()) {
350            (Ok(k), Ok(v)) => Some((k, v)),
351            _ => None,
352        })
353        .collect();
354
355    if !vars.contains_key("HOSTNAME")
356        && let Ok(hostname) = crate::get_hostname()
357    {
358        vars.insert("HOSTNAME".into(), hostname);
359    }
360    vars::interpolate(&source_string, &vars)
361}
362
363pub fn load<R: std::io::Read, T>(input: R, format: Format) -> Result<T, Vec<String>>
364where
365    T: serde::de::DeserializeOwned,
366{
367    let with_vars = prepare_input(input)?;
368
369    format::deserialize(&with_vars, format)
370}
371
372#[cfg(not(windows))]
373fn default_path() -> PathBuf {
374    "/etc/vector/vector.yaml".into()
375}
376
377#[cfg(windows)]
378fn default_path() -> PathBuf {
379    let program_files =
380        std::env::var("ProgramFiles").expect("%ProgramFiles% environment variable must be defined");
381    format!("{}\\Vector\\config\\vector.yaml", program_files).into()
382}
383
384fn default_config_paths() -> Vec<ConfigPath> {
385    #[cfg(not(windows))]
386    let default_path = default_path();
387    #[cfg(windows)]
388    let default_path = default_path();
389
390    vec![ConfigPath::File(default_path, Some(Format::Yaml))]
391}
392
393#[cfg(all(
394    test,
395    feature = "sinks-elasticsearch",
396    feature = "transforms-sample",
397    feature = "sources-demo_logs",
398    feature = "sinks-console"
399))]
400mod tests {
401    use std::path::PathBuf;
402
403    use super::load_builder_from_paths;
404    use crate::config::{ComponentKey, ConfigPath};
405
406    #[test]
407    fn load_namespacing_folder() {
408        let path = PathBuf::from(".")
409            .join("tests")
410            .join("namespacing")
411            .join("success");
412        let configs = vec![ConfigPath::Dir(path)];
413        let builder = load_builder_from_paths(&configs).unwrap();
414        assert!(
415            builder
416                .transforms
417                .contains_key(&ComponentKey::from("apache_parser"))
418        );
419        assert!(
420            builder
421                .sources
422                .contains_key(&ComponentKey::from("apache_logs"))
423        );
424        assert!(
425            builder
426                .sinks
427                .contains_key(&ComponentKey::from("es_cluster"))
428        );
429        assert_eq!(builder.tests.len(), 2);
430    }
431
432    #[test]
433    fn load_namespacing_ignore_invalid() {
434        let path = PathBuf::from(".")
435            .join("tests")
436            .join("namespacing")
437            .join("ignore-invalid");
438        let configs = vec![ConfigPath::Dir(path)];
439        load_builder_from_paths(&configs).unwrap();
440    }
441
442    #[test]
443    fn load_directory_ignores_unknown_file_formats() {
444        let path = PathBuf::from(".")
445            .join("tests")
446            .join("config-dir")
447            .join("ignore-unknown");
448        let configs = vec![ConfigPath::Dir(path)];
449        load_builder_from_paths(&configs).unwrap();
450    }
451
452    #[test]
453    fn load_directory_globals() {
454        let path = PathBuf::from(".")
455            .join("tests")
456            .join("config-dir")
457            .join("globals");
458        let configs = vec![ConfigPath::Dir(path)];
459        load_builder_from_paths(&configs).unwrap();
460    }
461
462    #[test]
463    fn load_directory_globals_duplicates() {
464        let path = PathBuf::from(".")
465            .join("tests")
466            .join("config-dir")
467            .join("globals-duplicate");
468        let configs = vec![ConfigPath::Dir(path)];
469        load_builder_from_paths(&configs).unwrap();
470    }
471}