vector/config/loading/
mod.rs

1mod config_builder;
2mod loader;
3mod secret;
4mod source;
5
6use std::{
7    collections::HashMap,
8    fmt::Debug,
9    fs::{File, ReadDir},
10    path::{Path, PathBuf},
11    sync::Mutex,
12};
13
14use config_builder::ConfigBuilderLoader;
15use glob::glob;
16use loader::process::Process;
17pub use loader::*;
18pub use secret::*;
19pub use source::*;
20use vector_lib::configurable::NamedComponent;
21
22use super::{
23    builder::ConfigBuilder, format, validation, vars, Config, ConfigPath, Format, FormatHint,
24};
25use crate::{config::ProviderConfig, signal};
26
27pub static CONFIG_PATHS: Mutex<Vec<ConfigPath>> = Mutex::new(Vec::new());
28
29pub(super) fn read_dir<P: AsRef<Path> + Debug>(path: P) -> Result<ReadDir, Vec<String>> {
30    path.as_ref()
31        .read_dir()
32        .map_err(|err| vec![format!("Could not read config dir: {:?}, {}.", path, err)])
33}
34
35pub(super) fn component_name<P: AsRef<Path> + Debug>(path: P) -> Result<String, Vec<String>> {
36    path.as_ref()
37        .file_stem()
38        .and_then(|name| name.to_str())
39        .map(|name| name.to_string())
40        .ok_or_else(|| vec![format!("Couldn't get component name for file: {:?}", path)])
41}
42
43pub(super) fn open_file<P: AsRef<Path> + Debug>(path: P) -> Option<File> {
44    match File::open(&path) {
45        Ok(f) => Some(f),
46        Err(error) => {
47            if let std::io::ErrorKind::NotFound = error.kind() {
48                error!(message = "Config file not found in path.", ?path);
49                None
50            } else {
51                error!(message = "Error opening config file.", %error, ?path);
52                None
53            }
54        }
55    }
56}
57
58/// Merge the paths coming from different cli flags with different formats into
59/// a unified list of paths with formats.
60pub fn merge_path_lists(
61    path_lists: Vec<(&[PathBuf], FormatHint)>,
62) -> impl Iterator<Item = (PathBuf, FormatHint)> + '_ {
63    path_lists
64        .into_iter()
65        .flat_map(|(paths, format)| paths.iter().cloned().map(move |path| (path, format)))
66}
67
68/// Expand a list of paths (potentially containing glob patterns) into real
69/// config paths, replacing it with the default paths when empty.
70pub fn process_paths(config_paths: &[ConfigPath]) -> Option<Vec<ConfigPath>> {
71    let starting_paths = if !config_paths.is_empty() {
72        config_paths.to_owned()
73    } else {
74        default_config_paths()
75    };
76
77    let mut paths = Vec::new();
78
79    for config_path in &starting_paths {
80        let config_pattern: &PathBuf = config_path.into();
81
82        let matches: Vec<PathBuf> = match glob(config_pattern.to_str().expect("No ability to glob"))
83        {
84            Ok(glob_paths) => glob_paths.filter_map(Result::ok).collect(),
85            Err(err) => {
86                error!(message = "Failed to read glob pattern.", path = ?config_pattern, error = ?err);
87                return None;
88            }
89        };
90
91        if matches.is_empty() {
92            error!(message = "Config file not found in path.", path = ?config_pattern);
93            std::process::exit(exitcode::CONFIG);
94        }
95
96        match config_path {
97            ConfigPath::File(_, format) => {
98                for path in matches {
99                    paths.push(ConfigPath::File(path, *format));
100                }
101            }
102            ConfigPath::Dir(_) => {
103                for path in matches {
104                    paths.push(ConfigPath::Dir(path))
105                }
106            }
107        }
108    }
109
110    paths.sort();
111    paths.dedup();
112    // Ignore poison error and let the current main thread continue running to do the cleanup.
113    drop(
114        CONFIG_PATHS
115            .lock()
116            .map(|mut guard| guard.clone_from(&paths)),
117    );
118
119    Some(paths)
120}
121
122pub fn load_from_paths(config_paths: &[ConfigPath]) -> Result<Config, Vec<String>> {
123    let builder = load_builder_from_paths(config_paths)?;
124    let (config, build_warnings) = builder.build_with_warnings()?;
125
126    for warning in build_warnings {
127        warn!("{}", warning);
128    }
129
130    Ok(config)
131}
132
133/// Loads a configuration from paths. Handle secret replacement and if a provider is present
134/// in the builder, the config is used as bootstrapping for a remote source. Otherwise,
135/// provider instantiation is skipped.
136pub async fn load_from_paths_with_provider_and_secrets(
137    config_paths: &[ConfigPath],
138    signal_handler: &mut signal::SignalHandler,
139    allow_empty: bool,
140) -> Result<Config, Vec<String>> {
141    // Load secret backends first
142    let mut secrets_backends_loader = load_secret_backends_from_paths(config_paths)?;
143    // And then, if needed, retrieve secrets from configured backends
144    let mut builder = if secrets_backends_loader.has_secrets_to_retrieve() {
145        debug!(message = "Secret placeholders found, retrieving secrets from configured backends.");
146        let resolved_secrets = secrets_backends_loader
147            .retrieve(&mut signal_handler.subscribe())
148            .await
149            .map_err(|e| vec![e])?;
150        load_builder_from_paths_with_secrets(config_paths, resolved_secrets)?
151    } else {
152        debug!(message = "No secret placeholder found, skipping secret resolution.");
153        load_builder_from_paths(config_paths)?
154    };
155
156    builder.allow_empty = allow_empty;
157
158    validation::check_provider(&builder)?;
159    signal_handler.clear();
160
161    // If there's a provider, overwrite the existing config builder with the remote variant.
162    if let Some(mut provider) = builder.provider {
163        builder = provider.build(signal_handler).await?;
164        debug!(message = "Provider configured.", provider = ?provider.get_component_name());
165    }
166
167    let (new_config, build_warnings) = builder.build_with_warnings()?;
168
169    validation::check_buffer_preconditions(&new_config).await?;
170
171    for warning in build_warnings {
172        warn!("{}", warning);
173    }
174
175    Ok(new_config)
176}
177
178/// Iterators over `ConfigPaths`, and processes a file/dir according to a provided `Loader`.
179fn loader_from_paths<T, L>(mut loader: L, config_paths: &[ConfigPath]) -> Result<T, Vec<String>>
180where
181    T: serde::de::DeserializeOwned,
182    L: Loader<T> + Process,
183{
184    let mut errors = Vec::new();
185
186    for config_path in config_paths {
187        match config_path {
188            ConfigPath::File(path, format_hint) => {
189                match loader.load_from_file(
190                    path,
191                    format_hint
192                        .or_else(move || Format::from_path(&path).ok())
193                        .unwrap_or_default(),
194                ) {
195                    Ok(()) => {}
196                    Err(errs) => errors.extend(errs),
197                };
198            }
199            ConfigPath::Dir(path) => {
200                match loader.load_from_dir(path) {
201                    Ok(()) => {}
202                    Err(errs) => errors.extend(errs),
203                };
204            }
205        }
206    }
207
208    if errors.is_empty() {
209        Ok(loader.take())
210    } else {
211        Err(errors)
212    }
213}
214
215/// Uses `ConfigBuilderLoader` to process `ConfigPaths`, deserializing to a `ConfigBuilder`.
216pub fn load_builder_from_paths(config_paths: &[ConfigPath]) -> Result<ConfigBuilder, Vec<String>> {
217    loader_from_paths(ConfigBuilderLoader::new(), config_paths)
218}
219
220/// Uses `ConfigBuilderLoader` to process `ConfigPaths`, performing secret replacement and deserializing to a `ConfigBuilder`
221pub fn load_builder_from_paths_with_secrets(
222    config_paths: &[ConfigPath],
223    secrets: HashMap<String, String>,
224) -> Result<ConfigBuilder, Vec<String>> {
225    loader_from_paths(ConfigBuilderLoader::with_secrets(secrets), config_paths)
226}
227
228/// Uses `SourceLoader` to process `ConfigPaths`, deserializing to a toml `SourceMap`.
229pub fn load_source_from_paths(
230    config_paths: &[ConfigPath],
231) -> Result<toml::value::Table, Vec<String>> {
232    loader_from_paths(SourceLoader::new(), config_paths)
233}
234
235/// Uses `SecretBackendLoader` to process `ConfigPaths`, deserializing to a `SecretBackends`.
236pub fn load_secret_backends_from_paths(
237    config_paths: &[ConfigPath],
238) -> Result<SecretBackendLoader, Vec<String>> {
239    loader_from_paths(SecretBackendLoader::new(), config_paths)
240}
241
242pub fn load_from_str(input: &str, format: Format) -> Result<Config, Vec<String>> {
243    let builder = load_from_inputs(std::iter::once((input.as_bytes(), format)))?;
244    let (config, build_warnings) = builder.build_with_warnings()?;
245
246    for warning in build_warnings {
247        warn!("{}", warning);
248    }
249
250    Ok(config)
251}
252
253fn load_from_inputs(
254    inputs: impl IntoIterator<Item = (impl std::io::Read, Format)>,
255) -> Result<ConfigBuilder, Vec<String>> {
256    let mut config = Config::builder();
257    let mut errors = Vec::new();
258
259    for (input, format) in inputs {
260        if let Err(errs) = load(input, format).and_then(|n| config.append(n)) {
261            // TODO: add back paths
262            errors.extend(errs.iter().map(|e| e.to_string()));
263        }
264    }
265
266    if errors.is_empty() {
267        Ok(config)
268    } else {
269        Err(errors)
270    }
271}
272
273pub fn prepare_input<R: std::io::Read>(mut input: R) -> Result<String, Vec<String>> {
274    let mut source_string = String::new();
275    input
276        .read_to_string(&mut source_string)
277        .map_err(|e| vec![e.to_string()])?;
278
279    let mut vars: HashMap<String, String> = std::env::vars_os()
280        .filter_map(|(k, v)| match (k.into_string(), v.into_string()) {
281            (Ok(k), Ok(v)) => Some((k, v)),
282            _ => None,
283        })
284        .collect();
285
286    if !vars.contains_key("HOSTNAME") {
287        if let Ok(hostname) = crate::get_hostname() {
288            vars.insert("HOSTNAME".into(), hostname);
289        }
290    }
291    vars::interpolate(&source_string, &vars)
292}
293
294pub fn load<R: std::io::Read, T>(input: R, format: Format) -> Result<T, Vec<String>>
295where
296    T: serde::de::DeserializeOwned,
297{
298    let with_vars = prepare_input(input)?;
299
300    format::deserialize(&with_vars, format)
301}
302
303#[cfg(not(windows))]
304fn default_path() -> PathBuf {
305    "/etc/vector/vector.yaml".into()
306}
307
308#[cfg(windows)]
309fn default_path() -> PathBuf {
310    let program_files =
311        std::env::var("ProgramFiles").expect("%ProgramFiles% environment variable must be defined");
312    format!("{}\\Vector\\config\\vector.yaml", program_files).into()
313}
314
315fn default_config_paths() -> Vec<ConfigPath> {
316    #[cfg(not(windows))]
317    let default_path = default_path();
318    #[cfg(windows)]
319    let default_path = default_path();
320
321    vec![ConfigPath::File(default_path, Some(Format::Yaml))]
322}
323
324#[cfg(all(
325    test,
326    feature = "sinks-elasticsearch",
327    feature = "transforms-sample",
328    feature = "sources-demo_logs",
329    feature = "sinks-console"
330))]
331mod tests {
332    use std::path::PathBuf;
333
334    use super::load_builder_from_paths;
335    use crate::config::{ComponentKey, ConfigPath};
336
337    #[test]
338    fn load_namespacing_folder() {
339        let path = PathBuf::from(".")
340            .join("tests")
341            .join("namespacing")
342            .join("success");
343        let configs = vec![ConfigPath::Dir(path)];
344        let builder = load_builder_from_paths(&configs).unwrap();
345        assert!(builder
346            .transforms
347            .contains_key(&ComponentKey::from("apache_parser")));
348        assert!(builder
349            .sources
350            .contains_key(&ComponentKey::from("apache_logs")));
351        assert!(builder
352            .sinks
353            .contains_key(&ComponentKey::from("es_cluster")));
354        assert_eq!(builder.tests.len(), 2);
355    }
356
357    #[test]
358    fn load_namespacing_ignore_invalid() {
359        let path = PathBuf::from(".")
360            .join("tests")
361            .join("namespacing")
362            .join("ignore-invalid");
363        let configs = vec![ConfigPath::Dir(path)];
364        load_builder_from_paths(&configs).unwrap();
365    }
366
367    #[test]
368    fn load_directory_ignores_unknown_file_formats() {
369        let path = PathBuf::from(".")
370            .join("tests")
371            .join("config-dir")
372            .join("ignore-unknown");
373        let configs = vec![ConfigPath::Dir(path)];
374        load_builder_from_paths(&configs).unwrap();
375    }
376
377    #[test]
378    fn load_directory_globals() {
379        let path = PathBuf::from(".")
380            .join("tests")
381            .join("config-dir")
382            .join("globals");
383        let configs = vec![ConfigPath::Dir(path)];
384        load_builder_from_paths(&configs).unwrap();
385    }
386
387    #[test]
388    fn load_directory_globals_duplicates() {
389        let path = PathBuf::from(".")
390            .join("tests")
391            .join("config-dir")
392            .join("globals-duplicate");
393        let configs = vec![ConfigPath::Dir(path)];
394        load_builder_from_paths(&configs).unwrap();
395    }
396}