vector/config/loading/
mod.rs

1mod config_builder;
2mod loader;
3mod secret;
4mod source;
5
6use std::{
7    collections::HashMap,
8    fmt::Debug,
9    fs::{File, ReadDir},
10    path::{Path, PathBuf},
11    sync::Mutex,
12};
13
14pub use config_builder::ConfigBuilderLoader;
15use glob::glob;
16use loader::process::Process;
17pub use loader::*;
18pub use secret::*;
19pub use source::*;
20use vector_lib::configurable::NamedComponent;
21
22use super::{
23    Config, ConfigPath, Format, FormatHint, ProviderConfig, builder::ConfigBuilder, format,
24    validation, vars,
25};
26use crate::signal;
27
/// Process-wide record of the most recently expanded config paths.
///
/// `process_paths` overwrites the contents with its sorted, de-duplicated
/// result each time it completes successfully.
pub static CONFIG_PATHS: Mutex<Vec<ConfigPath>> = Mutex::new(Vec::new());
29
30pub(super) fn read_dir<P: AsRef<Path> + Debug>(path: P) -> Result<ReadDir, Vec<String>> {
31    path.as_ref()
32        .read_dir()
33        .map_err(|err| vec![format!("Could not read config dir: {:?}, {}.", path, err)])
34}
35
36pub(super) fn component_name<P: AsRef<Path> + Debug>(path: P) -> Result<String, Vec<String>> {
37    path.as_ref()
38        .file_stem()
39        .and_then(|name| name.to_str())
40        .map(|name| name.to_string())
41        .ok_or_else(|| vec![format!("Couldn't get component name for file: {:?}", path)])
42}
43
44pub(super) fn open_file<P: AsRef<Path> + Debug>(path: P) -> Option<File> {
45    match File::open(&path) {
46        Ok(f) => Some(f),
47        Err(error) => {
48            if let std::io::ErrorKind::NotFound = error.kind() {
49                error!(
50                    message = "Config file not found in path.",
51                    ?path,
52                    internal_log_rate_limit = false
53                );
54                None
55            } else {
56                error!(message = "Error opening config file.", %error, ?path, internal_log_rate_limit = false);
57                None
58            }
59        }
60    }
61}
62
63/// Merge the paths coming from different cli flags with different formats into
64/// a unified list of paths with formats.
65pub fn merge_path_lists(
66    path_lists: Vec<(&[PathBuf], FormatHint)>,
67) -> impl Iterator<Item = (PathBuf, FormatHint)> + '_ {
68    path_lists
69        .into_iter()
70        .flat_map(|(paths, format)| paths.iter().cloned().map(move |path| (path, format)))
71}
72
73/// Expand a list of paths (potentially containing glob patterns) into real
74/// config paths, replacing it with the default paths when empty.
75pub fn process_paths(config_paths: &[ConfigPath]) -> Option<Vec<ConfigPath>> {
76    let starting_paths = if !config_paths.is_empty() {
77        config_paths.to_owned()
78    } else {
79        default_config_paths()
80    };
81
82    let mut paths = Vec::new();
83
84    for config_path in &starting_paths {
85        let config_pattern: &PathBuf = config_path.into();
86
87        let matches: Vec<PathBuf> = match glob(config_pattern.to_str().expect("No ability to glob"))
88        {
89            Ok(glob_paths) => glob_paths.filter_map(Result::ok).collect(),
90            Err(err) => {
91                error!(message = "Failed to read glob pattern.", path = ?config_pattern, error = ?err);
92                return None;
93            }
94        };
95
96        if matches.is_empty() {
97            error!(message = "Config file not found in path.", path = ?config_pattern, internal_log_rate_limit = false);
98            std::process::exit(exitcode::CONFIG);
99        }
100
101        match config_path {
102            ConfigPath::File(_, format) => {
103                for path in matches {
104                    paths.push(ConfigPath::File(path, *format));
105                }
106            }
107            ConfigPath::Dir(_) => {
108                for path in matches {
109                    paths.push(ConfigPath::Dir(path))
110                }
111            }
112        }
113    }
114
115    paths.sort();
116    paths.dedup();
117    // Ignore poison error and let the current main thread continue running to do the cleanup.
118    drop(
119        CONFIG_PATHS
120            .lock()
121            .map(|mut guard| guard.clone_from(&paths)),
122    );
123
124    Some(paths)
125}
126
127pub fn load_from_paths(
128    config_paths: &[ConfigPath],
129    interpolate_env: bool,
130) -> Result<Config, Vec<String>> {
131    let builder = ConfigBuilderLoader::default()
132        .interpolate_env(interpolate_env)
133        .load_from_paths(config_paths)?;
134    let (config, build_warnings) = builder.build_with_warnings()?;
135
136    for warning in build_warnings {
137        warn!("{}", warning);
138    }
139
140    Ok(config)
141}
142
143/// Loads a configuration from paths. Handle secret replacement and if a provider is present
144/// in the builder, the config is used as bootstrapping for a remote source. Otherwise,
145/// provider instantiation is skipped.
146pub async fn load_from_paths_with_provider_and_secrets(
147    config_paths: &[ConfigPath],
148    signal_handler: &mut signal::SignalHandler,
149    allow_empty: bool,
150    interpolate_env: bool,
151) -> Result<Config, Vec<String>> {
152    let secrets_backends_loader = loader_from_paths(
153        SecretBackendLoader::default().interpolate_env(interpolate_env),
154        config_paths,
155    )?;
156    let secrets = secrets_backends_loader
157        .retrieve_secrets(signal_handler)
158        .await
159        .map_err(|e| vec![e])?;
160
161    let mut builder = ConfigBuilderLoader::default()
162        .interpolate_env(interpolate_env)
163        .allow_empty(allow_empty)
164        .secrets(secrets)
165        .load_from_paths(config_paths)?;
166
167    validation::check_provider(&builder)?;
168    signal_handler.clear();
169
170    // If there's a provider, overwrite the existing config builder with the remote variant.
171    if let Some(mut provider) = builder.provider {
172        builder = provider.build(signal_handler).await?;
173        debug!(message = "Provider configured.", provider = ?provider.get_component_name());
174    }
175
176    finalize_config(builder).await
177}
178
179pub async fn load_from_str_with_secrets(
180    input: &str,
181    format: Format,
182    signal_handler: &mut signal::SignalHandler,
183    allow_empty: bool,
184    interpolate_env: bool,
185) -> Result<Config, Vec<String>> {
186    let secrets_backends_loader = loader_from_input(
187        SecretBackendLoader::default().interpolate_env(interpolate_env),
188        input.as_bytes(),
189        format,
190    )?;
191    let secrets = secrets_backends_loader
192        .retrieve_secrets(signal_handler)
193        .await
194        .map_err(|e| vec![e])?;
195
196    let builder = ConfigBuilderLoader::default()
197        .interpolate_env(interpolate_env)
198        .allow_empty(allow_empty)
199        .secrets(secrets)
200        .load_from_input(input.as_bytes(), format)?;
201    signal_handler.clear();
202
203    finalize_config(builder).await
204}
205
206async fn finalize_config(builder: ConfigBuilder) -> Result<Config, Vec<String>> {
207    let (new_config, build_warnings) = builder.build_with_warnings()?;
208
209    validation::check_buffer_preconditions(&new_config).await?;
210
211    for warning in build_warnings {
212        warn!("{}", warning);
213    }
214
215    Ok(new_config)
216}
217
218pub(super) fn loader_from_input<T, L, R>(
219    mut loader: L,
220    input: R,
221    format: Format,
222) -> Result<T, Vec<String>>
223where
224    T: serde::de::DeserializeOwned,
225    L: Loader<T> + Process,
226    R: std::io::Read,
227{
228    loader.load_from_str(input, format).map(|_| loader.take())
229}
230
231/// Iterators over `ConfigPaths`, and processes a file/dir according to a provided `Loader`.
232pub(super) fn loader_from_paths<T, L>(
233    mut loader: L,
234    config_paths: &[ConfigPath],
235) -> Result<T, Vec<String>>
236where
237    T: serde::de::DeserializeOwned,
238    L: Loader<T> + Process,
239{
240    let mut errors = Vec::new();
241
242    for config_path in config_paths {
243        match config_path {
244            ConfigPath::File(path, format_hint) => {
245                match loader.load_from_file(
246                    path,
247                    format_hint
248                        .or_else(move || Format::from_path(&path).ok())
249                        .unwrap_or_default(),
250                ) {
251                    Ok(()) => {}
252                    Err(errs) => errors.extend(errs),
253                };
254            }
255            ConfigPath::Dir(path) => {
256                match loader.load_from_dir(path) {
257                    Ok(()) => {}
258                    Err(errs) => errors.extend(errs),
259                };
260            }
261        }
262    }
263
264    if errors.is_empty() {
265        Ok(loader.take())
266    } else {
267        Err(errors)
268    }
269}
270
271/// Uses `SourceLoader` to process `ConfigPaths`, deserializing to a toml `SourceMap`.
272pub fn load_source_from_paths(
273    config_paths: &[ConfigPath],
274) -> Result<toml::value::Table, Vec<String>> {
275    loader_from_paths(SourceLoader::new(), config_paths)
276}
277
278pub fn load_from_str(input: &str, format: Format) -> Result<Config, Vec<String>> {
279    let builder = load_from_inputs(std::iter::once((input.as_bytes(), format)))?;
280    let (config, build_warnings) = builder.build_with_warnings()?;
281
282    for warning in build_warnings {
283        warn!("{}", warning);
284    }
285
286    Ok(config)
287}
288
289fn load_from_inputs(
290    inputs: impl IntoIterator<Item = (impl std::io::Read, Format)>,
291) -> Result<ConfigBuilder, Vec<String>> {
292    let mut config = Config::builder();
293    let mut errors = Vec::new();
294
295    for (input, format) in inputs {
296        if let Err(errs) = load(input, format).and_then(|n| config.append(n)) {
297            // TODO: add back paths
298            errors.extend(errs.iter().map(|e| e.to_string()));
299        }
300    }
301
302    if errors.is_empty() {
303        Ok(config)
304    } else {
305        Err(errors)
306    }
307}
308
309pub fn prepare_input<R: std::io::Read>(
310    mut input: R,
311    interpolate_env: bool,
312) -> Result<String, Vec<String>> {
313    let mut source_string = String::new();
314    input
315        .read_to_string(&mut source_string)
316        .map_err(|e| vec![e.to_string()])?;
317
318    if interpolate_env {
319        let mut vars: HashMap<String, String> = std::env::vars_os()
320            .filter_map(|(k, v)| match (k.into_string(), v.into_string()) {
321                (Ok(k), Ok(v)) => Some((k, v)),
322                _ => None,
323            })
324            .collect();
325
326        if !vars.contains_key("HOSTNAME")
327            && let Ok(hostname) = crate::get_hostname()
328        {
329            vars.insert("HOSTNAME".into(), hostname);
330        }
331        vars::interpolate(&source_string, &vars)
332    } else {
333        Ok(source_string)
334    }
335}
336
337pub fn load<R: std::io::Read, T>(input: R, format: Format) -> Result<T, Vec<String>>
338where
339    T: serde::de::DeserializeOwned,
340{
341    // Via configurations that load from raw string, skip interpolation of env
342    let with_vars = prepare_input(input, false)?;
343
344    format::deserialize(&with_vars, format)
345}
346
/// Location of the config file used when no path is given (non-Windows
/// targets).
#[cfg(not(windows))]
fn default_path() -> PathBuf {
    PathBuf::from("/etc/vector/vector.yaml")
}
351
/// Location of the config file used when no path is given (Windows targets):
/// `Vector\config\vector.yaml` below the `%ProgramFiles%` directory.
///
/// # Panics
///
/// Panics when the `ProgramFiles` environment variable cannot be read.
#[cfg(windows)]
fn default_path() -> PathBuf {
    let install_root =
        std::env::var("ProgramFiles").expect("%ProgramFiles% environment variable must be defined");

    PathBuf::from(format!("{}\\Vector\\config\\vector.yaml", install_root))
}
358
359fn default_config_paths() -> Vec<ConfigPath> {
360    #[cfg(not(windows))]
361    let default_path = default_path();
362    #[cfg(windows)]
363    let default_path = default_path();
364
365    vec![ConfigPath::File(default_path, Some(Format::Yaml))]
366}