1mod config_builder;
2mod loader;
3mod secret;
4mod source;
5
6use std::{
7 collections::HashMap,
8 fmt::Debug,
9 fs::{File, ReadDir},
10 path::{Path, PathBuf},
11 sync::Mutex,
12};
13
14use config_builder::ConfigBuilderLoader;
15use glob::glob;
16use loader::process::Process;
17pub use loader::*;
18pub use secret::*;
19pub use source::*;
20use vector_lib::configurable::NamedComponent;
21
22use super::{
23 Config, ConfigPath, Format, FormatHint, builder::ConfigBuilder, format, validation, vars,
24};
25use crate::{config::ProviderConfig, signal};
26
/// Process-wide record of the most recently resolved config paths.
///
/// `process_paths` overwrites the contents with its sorted, de-duplicated
/// result right before returning it (the update is skipped if the mutex is
/// poisoned). It starts out empty.
pub static CONFIG_PATHS: Mutex<Vec<ConfigPath>> = Mutex::new(Vec::new());
28
29pub(super) fn read_dir<P: AsRef<Path> + Debug>(path: P) -> Result<ReadDir, Vec<String>> {
30 path.as_ref()
31 .read_dir()
32 .map_err(|err| vec![format!("Could not read config dir: {:?}, {}.", path, err)])
33}
34
35pub(super) fn component_name<P: AsRef<Path> + Debug>(path: P) -> Result<String, Vec<String>> {
36 path.as_ref()
37 .file_stem()
38 .and_then(|name| name.to_str())
39 .map(|name| name.to_string())
40 .ok_or_else(|| vec![format!("Couldn't get component name for file: {:?}", path)])
41}
42
43pub(super) fn open_file<P: AsRef<Path> + Debug>(path: P) -> Option<File> {
44 match File::open(&path) {
45 Ok(f) => Some(f),
46 Err(error) => {
47 if let std::io::ErrorKind::NotFound = error.kind() {
48 error!(
49 message = "Config file not found in path.",
50 ?path,
51 internal_log_rate_limit = false
52 );
53 None
54 } else {
55 error!(message = "Error opening config file.", %error, ?path, internal_log_rate_limit = false);
56 None
57 }
58 }
59 }
60}
61
62pub fn merge_path_lists(
65 path_lists: Vec<(&[PathBuf], FormatHint)>,
66) -> impl Iterator<Item = (PathBuf, FormatHint)> + '_ {
67 path_lists
68 .into_iter()
69 .flat_map(|(paths, format)| paths.iter().cloned().map(move |path| (path, format)))
70}
71
72pub fn process_paths(config_paths: &[ConfigPath]) -> Option<Vec<ConfigPath>> {
75 let starting_paths = if !config_paths.is_empty() {
76 config_paths.to_owned()
77 } else {
78 default_config_paths()
79 };
80
81 let mut paths = Vec::new();
82
83 for config_path in &starting_paths {
84 let config_pattern: &PathBuf = config_path.into();
85
86 let matches: Vec<PathBuf> = match glob(config_pattern.to_str().expect("No ability to glob"))
87 {
88 Ok(glob_paths) => glob_paths.filter_map(Result::ok).collect(),
89 Err(err) => {
90 error!(message = "Failed to read glob pattern.", path = ?config_pattern, error = ?err);
91 return None;
92 }
93 };
94
95 if matches.is_empty() {
96 error!(message = "Config file not found in path.", path = ?config_pattern, internal_log_rate_limit = false);
97 std::process::exit(exitcode::CONFIG);
98 }
99
100 match config_path {
101 ConfigPath::File(_, format) => {
102 for path in matches {
103 paths.push(ConfigPath::File(path, *format));
104 }
105 }
106 ConfigPath::Dir(_) => {
107 for path in matches {
108 paths.push(ConfigPath::Dir(path))
109 }
110 }
111 }
112 }
113
114 paths.sort();
115 paths.dedup();
116 drop(
118 CONFIG_PATHS
119 .lock()
120 .map(|mut guard| guard.clone_from(&paths)),
121 );
122
123 Some(paths)
124}
125
126pub fn load_from_paths(config_paths: &[ConfigPath]) -> Result<Config, Vec<String>> {
127 let builder = load_builder_from_paths(config_paths)?;
128 let (config, build_warnings) = builder.build_with_warnings()?;
129
130 for warning in build_warnings {
131 warn!("{}", warning);
132 }
133
134 Ok(config)
135}
136
/// Loads a [`Config`] from `config_paths`, resolving secrets first and then
/// letting a configured provider (if any) supply the final builder.
///
/// Steps, in order:
/// 1. Load the secret backends from the paths.
/// 2. If there are secrets to retrieve, fetch them (using a subscription
///    obtained from `signal_handler`) and load the builder with them;
///    otherwise load the builder directly.
/// 3. Set `allow_empty` on the builder and run the provider validation.
/// 4. Clear `signal_handler`, then, if the builder has a provider, replace the
///    builder with the one the provider builds.
/// 5. Build the config, check buffer preconditions, and log build warnings at
///    warn level.
///
/// # Errors
/// Returns the error messages of whichever step failed. A secret retrieval
/// error is wrapped into a single-element list.
pub async fn load_from_paths_with_provider_and_secrets(
    config_paths: &[ConfigPath],
    signal_handler: &mut signal::SignalHandler,
    allow_empty: bool,
) -> Result<Config, Vec<String>> {
    // First pass over the paths: only collects the secret backends.
    let mut secrets_backends_loader = load_secret_backends_from_paths(config_paths)?;
    // Second pass: load the actual builder, with resolved secrets when needed.
    let mut builder = if secrets_backends_loader.has_secrets_to_retrieve() {
        debug!(message = "Secret placeholders found, retrieving secrets from configured backends.");
        let resolved_secrets = secrets_backends_loader
            .retrieve(&mut signal_handler.subscribe())
            .await
            // Wrap the single error so it fits the `Vec<String>` error type.
            .map_err(|e| vec![e])?;
        load_builder_from_paths_with_secrets(config_paths, resolved_secrets)?
    } else {
        debug!(message = "No secret placeholder found, skipping secret resolution.");
        load_builder_from_paths(config_paths)?
    };

    builder.allow_empty = allow_empty;

    validation::check_provider(&builder)?;
    // NOTE(review): presumably resets signal handling state set up during
    // secret retrieval before the provider registers its own; confirm against
    // `SignalHandler::clear`.
    signal_handler.clear();

    // A provider, when configured, produces the builder that is actually used.
    if let Some(mut provider) = builder.provider {
        builder = provider.build(signal_handler).await?;
        debug!(message = "Provider configured.", provider = ?provider.get_component_name());
    }

    let (new_config, build_warnings) = builder.build_with_warnings()?;

    validation::check_buffer_preconditions(&new_config).await?;

    for warning in build_warnings {
        warn!("{}", warning);
    }

    Ok(new_config)
}
181
/// Loads a [`Config`] from an in-memory string in the given `format`,
/// resolving secrets first.
///
/// Steps, in order:
/// 1. Load the secret backends from `input`.
/// 2. If there are secrets to retrieve, fetch them (using a subscription
///    obtained from `signal_handler`) and load the builder with them;
///    otherwise load the builder directly.
/// 3. Set `allow_empty` on the builder and clear `signal_handler`.
/// 4. Build the config, check buffer preconditions, and log build warnings at
///    warn level.
///
/// # Errors
/// Returns the error messages of whichever step failed. A secret retrieval
/// error is wrapped into a single-element list.
pub async fn load_from_str_with_secrets(
    input: &str,
    format: Format,
    signal_handler: &mut signal::SignalHandler,
    allow_empty: bool,
) -> Result<Config, Vec<String>> {
    // First pass over the input: only collects the secret backends.
    let mut secrets_backends_loader = load_secret_backends_from_input(input.as_bytes(), format)?;
    // Second pass: load the actual builder, with resolved secrets when needed.
    let mut builder = if secrets_backends_loader.has_secrets_to_retrieve() {
        debug!(message = "Secret placeholders found, retrieving secrets from configured backends.");
        let resolved_secrets = secrets_backends_loader
            .retrieve(&mut signal_handler.subscribe())
            .await
            // Wrap the single error so it fits the `Vec<String>` error type.
            .map_err(|e| vec![e])?;
        load_builder_from_input_with_secrets(input.as_bytes(), format, resolved_secrets)?
    } else {
        debug!(message = "No secret placeholder found, skipping secret resolution.");
        load_builder_from_input(input.as_bytes(), format)?
    };

    builder.allow_empty = allow_empty;
    // NOTE(review): presumably resets signal handling state set up during
    // secret retrieval; confirm against `SignalHandler::clear`.
    signal_handler.clear();
    let (new_config, build_warnings) = builder.build_with_warnings()?;

    validation::check_buffer_preconditions(&new_config).await?;

    for warning in build_warnings {
        warn!("{}", warning);
    }

    Ok(new_config)
}
215
216fn loader_from_input<T, L, R>(mut loader: L, input: R, format: Format) -> Result<T, Vec<String>>
217where
218 T: serde::de::DeserializeOwned,
219 L: Loader<T> + Process,
220 R: std::io::Read,
221{
222 loader.load_from_str(input, format).map(|_| loader.take())
223}
224
225fn loader_from_paths<T, L>(mut loader: L, config_paths: &[ConfigPath]) -> Result<T, Vec<String>>
227where
228 T: serde::de::DeserializeOwned,
229 L: Loader<T> + Process,
230{
231 let mut errors = Vec::new();
232
233 for config_path in config_paths {
234 match config_path {
235 ConfigPath::File(path, format_hint) => {
236 match loader.load_from_file(
237 path,
238 format_hint
239 .or_else(move || Format::from_path(&path).ok())
240 .unwrap_or_default(),
241 ) {
242 Ok(()) => {}
243 Err(errs) => errors.extend(errs),
244 };
245 }
246 ConfigPath::Dir(path) => {
247 match loader.load_from_dir(path) {
248 Ok(()) => {}
249 Err(errs) => errors.extend(errs),
250 };
251 }
252 }
253 }
254
255 if errors.is_empty() {
256 Ok(loader.take())
257 } else {
258 Err(errors)
259 }
260}
261
262pub fn load_builder_from_paths(config_paths: &[ConfigPath]) -> Result<ConfigBuilder, Vec<String>> {
264 loader_from_paths(ConfigBuilderLoader::new(), config_paths)
265}
266
267fn load_builder_from_input<R: std::io::Read>(
268 input: R,
269 format: Format,
270) -> Result<ConfigBuilder, Vec<String>> {
271 loader_from_input(ConfigBuilderLoader::new(), input, format)
272}
273
274pub fn load_builder_from_paths_with_secrets(
276 config_paths: &[ConfigPath],
277 secrets: HashMap<String, String>,
278) -> Result<ConfigBuilder, Vec<String>> {
279 loader_from_paths(ConfigBuilderLoader::with_secrets(secrets), config_paths)
280}
281
282fn load_builder_from_input_with_secrets<R: std::io::Read>(
283 input: R,
284 format: Format,
285 secrets: HashMap<String, String>,
286) -> Result<ConfigBuilder, Vec<String>> {
287 loader_from_input(ConfigBuilderLoader::with_secrets(secrets), input, format)
288}
289
290pub fn load_source_from_paths(
292 config_paths: &[ConfigPath],
293) -> Result<toml::value::Table, Vec<String>> {
294 loader_from_paths(SourceLoader::new(), config_paths)
295}
296
297pub fn load_secret_backends_from_paths(
299 config_paths: &[ConfigPath],
300) -> Result<SecretBackendLoader, Vec<String>> {
301 loader_from_paths(SecretBackendLoader::new(), config_paths)
302}
303
304fn load_secret_backends_from_input<R: std::io::Read>(
305 input: R,
306 format: Format,
307) -> Result<SecretBackendLoader, Vec<String>> {
308 loader_from_input(SecretBackendLoader::new(), input, format)
309}
310
311pub fn load_from_str(input: &str, format: Format) -> Result<Config, Vec<String>> {
312 let builder = load_from_inputs(std::iter::once((input.as_bytes(), format)))?;
313 let (config, build_warnings) = builder.build_with_warnings()?;
314
315 for warning in build_warnings {
316 warn!("{}", warning);
317 }
318
319 Ok(config)
320}
321
322fn load_from_inputs(
323 inputs: impl IntoIterator<Item = (impl std::io::Read, Format)>,
324) -> Result<ConfigBuilder, Vec<String>> {
325 let mut config = Config::builder();
326 let mut errors = Vec::new();
327
328 for (input, format) in inputs {
329 if let Err(errs) = load(input, format).and_then(|n| config.append(n)) {
330 errors.extend(errs.iter().map(|e| e.to_string()));
332 }
333 }
334
335 if errors.is_empty() {
336 Ok(config)
337 } else {
338 Err(errors)
339 }
340}
341
342pub fn prepare_input<R: std::io::Read>(mut input: R) -> Result<String, Vec<String>> {
343 let mut source_string = String::new();
344 input
345 .read_to_string(&mut source_string)
346 .map_err(|e| vec![e.to_string()])?;
347
348 let mut vars: HashMap<String, String> = std::env::vars_os()
349 .filter_map(|(k, v)| match (k.into_string(), v.into_string()) {
350 (Ok(k), Ok(v)) => Some((k, v)),
351 _ => None,
352 })
353 .collect();
354
355 if !vars.contains_key("HOSTNAME")
356 && let Ok(hostname) = crate::get_hostname()
357 {
358 vars.insert("HOSTNAME".into(), hostname);
359 }
360 vars::interpolate(&source_string, &vars)
361}
362
363pub fn load<R: std::io::Read, T>(input: R, format: Format) -> Result<T, Vec<String>>
364where
365 T: serde::de::DeserializeOwned,
366{
367 let with_vars = prepare_input(input)?;
368
369 format::deserialize(&with_vars, format)
370}
371
/// Default config file location on non-Windows platforms.
#[cfg(not(windows))]
fn default_path() -> PathBuf {
    PathBuf::from("/etc/vector/vector.yaml")
}
376
/// Default config file location on Windows, below the directory named by the
/// `ProgramFiles` environment variable.
///
/// # Panics
/// Panics when `ProgramFiles` cannot be read from the environment.
#[cfg(windows)]
fn default_path() -> PathBuf {
    let program_files =
        std::env::var("ProgramFiles").expect("%ProgramFiles% environment variable must be defined");

    PathBuf::from(format!(
        "{}\\Vector\\config\\vector.yaml",
        program_files
    ))
}
383
384fn default_config_paths() -> Vec<ConfigPath> {
385 #[cfg(not(windows))]
386 let default_path = default_path();
387 #[cfg(windows)]
388 let default_path = default_path();
389
390 vec![ConfigPath::File(default_path, Some(Format::Yaml))]
391}
392
#[cfg(all(
    test,
    feature = "sinks-elasticsearch",
    feature = "transforms-sample",
    feature = "sources-demo_logs",
    feature = "sinks-console"
))]
mod tests {
    use std::path::PathBuf;

    use super::load_builder_from_paths;
    use crate::config::{ComponentKey, ConfigPath};

    /// Returns the one-entry path list pointing at the fixture directory
    /// `./tests/<group>/<case>`.
    fn fixture_dir(group: &str, case: &str) -> Vec<ConfigPath> {
        let path: PathBuf = [".", "tests", group, case].iter().collect();
        vec![ConfigPath::Dir(path)]
    }

    /// Components from a namespaced directory end up in the matching builder
    /// sections.
    #[test]
    fn load_namespacing_folder() {
        let configs = fixture_dir("namespacing", "success");
        let builder = load_builder_from_paths(&configs).unwrap();

        let parser = ComponentKey::from("apache_parser");
        assert!(builder.transforms.contains_key(&parser));

        let logs = ComponentKey::from("apache_logs");
        assert!(builder.sources.contains_key(&logs));

        let cluster = ComponentKey::from("es_cluster");
        assert!(builder.sinks.contains_key(&cluster));

        assert_eq!(builder.tests.len(), 2);
    }

    /// The `ignore-invalid` fixture loads without error.
    #[test]
    fn load_namespacing_ignore_invalid() {
        let configs = fixture_dir("namespacing", "ignore-invalid");
        load_builder_from_paths(&configs).unwrap();
    }

    /// The `ignore-unknown` fixture loads without error.
    #[test]
    fn load_directory_ignores_unknown_file_formats() {
        let configs = fixture_dir("config-dir", "ignore-unknown");
        load_builder_from_paths(&configs).unwrap();
    }

    /// The `globals` fixture loads without error.
    #[test]
    fn load_directory_globals() {
        let configs = fixture_dir("config-dir", "globals");
        load_builder_from_paths(&configs).unwrap();
    }

    /// The `globals-duplicate` fixture loads without error.
    #[test]
    fn load_directory_globals_duplicates() {
        let configs = fixture_dir("config-dir", "globals-duplicate");
        load_builder_from_paths(&configs).unwrap();
    }
}
471}