1mod config_builder;
2mod loader;
3mod secret;
4mod source;
5
6use std::{
7 collections::HashMap,
8 fmt::Debug,
9 fs::{File, ReadDir},
10 path::{Path, PathBuf},
11 sync::Mutex,
12};
13
14use config_builder::ConfigBuilderLoader;
15use glob::glob;
16use loader::process::Process;
17pub use loader::*;
18pub use secret::*;
19pub use source::*;
20use vector_lib::configurable::NamedComponent;
21
22use super::{
23 builder::ConfigBuilder, format, validation, vars, Config, ConfigPath, Format, FormatHint,
24};
25use crate::{config::ProviderConfig, signal};
26
27pub static CONFIG_PATHS: Mutex<Vec<ConfigPath>> = Mutex::new(Vec::new());
28
29pub(super) fn read_dir<P: AsRef<Path> + Debug>(path: P) -> Result<ReadDir, Vec<String>> {
30 path.as_ref()
31 .read_dir()
32 .map_err(|err| vec![format!("Could not read config dir: {:?}, {}.", path, err)])
33}
34
35pub(super) fn component_name<P: AsRef<Path> + Debug>(path: P) -> Result<String, Vec<String>> {
36 path.as_ref()
37 .file_stem()
38 .and_then(|name| name.to_str())
39 .map(|name| name.to_string())
40 .ok_or_else(|| vec![format!("Couldn't get component name for file: {:?}", path)])
41}
42
43pub(super) fn open_file<P: AsRef<Path> + Debug>(path: P) -> Option<File> {
44 match File::open(&path) {
45 Ok(f) => Some(f),
46 Err(error) => {
47 if let std::io::ErrorKind::NotFound = error.kind() {
48 error!(message = "Config file not found in path.", ?path);
49 None
50 } else {
51 error!(message = "Error opening config file.", %error, ?path);
52 None
53 }
54 }
55 }
56}
57
58pub fn merge_path_lists(
61 path_lists: Vec<(&[PathBuf], FormatHint)>,
62) -> impl Iterator<Item = (PathBuf, FormatHint)> + '_ {
63 path_lists
64 .into_iter()
65 .flat_map(|(paths, format)| paths.iter().cloned().map(move |path| (path, format)))
66}
67
68pub fn process_paths(config_paths: &[ConfigPath]) -> Option<Vec<ConfigPath>> {
71 let starting_paths = if !config_paths.is_empty() {
72 config_paths.to_owned()
73 } else {
74 default_config_paths()
75 };
76
77 let mut paths = Vec::new();
78
79 for config_path in &starting_paths {
80 let config_pattern: &PathBuf = config_path.into();
81
82 let matches: Vec<PathBuf> = match glob(config_pattern.to_str().expect("No ability to glob"))
83 {
84 Ok(glob_paths) => glob_paths.filter_map(Result::ok).collect(),
85 Err(err) => {
86 error!(message = "Failed to read glob pattern.", path = ?config_pattern, error = ?err);
87 return None;
88 }
89 };
90
91 if matches.is_empty() {
92 error!(message = "Config file not found in path.", path = ?config_pattern);
93 std::process::exit(exitcode::CONFIG);
94 }
95
96 match config_path {
97 ConfigPath::File(_, format) => {
98 for path in matches {
99 paths.push(ConfigPath::File(path, *format));
100 }
101 }
102 ConfigPath::Dir(_) => {
103 for path in matches {
104 paths.push(ConfigPath::Dir(path))
105 }
106 }
107 }
108 }
109
110 paths.sort();
111 paths.dedup();
112 drop(
114 CONFIG_PATHS
115 .lock()
116 .map(|mut guard| guard.clone_from(&paths)),
117 );
118
119 Some(paths)
120}
121
122pub fn load_from_paths(config_paths: &[ConfigPath]) -> Result<Config, Vec<String>> {
123 let builder = load_builder_from_paths(config_paths)?;
124 let (config, build_warnings) = builder.build_with_warnings()?;
125
126 for warning in build_warnings {
127 warn!("{}", warning);
128 }
129
130 Ok(config)
131}
132
133pub async fn load_from_paths_with_provider_and_secrets(
137 config_paths: &[ConfigPath],
138 signal_handler: &mut signal::SignalHandler,
139 allow_empty: bool,
140) -> Result<Config, Vec<String>> {
141 let mut secrets_backends_loader = load_secret_backends_from_paths(config_paths)?;
143 let mut builder = if secrets_backends_loader.has_secrets_to_retrieve() {
145 debug!(message = "Secret placeholders found, retrieving secrets from configured backends.");
146 let resolved_secrets = secrets_backends_loader
147 .retrieve(&mut signal_handler.subscribe())
148 .await
149 .map_err(|e| vec![e])?;
150 load_builder_from_paths_with_secrets(config_paths, resolved_secrets)?
151 } else {
152 debug!(message = "No secret placeholder found, skipping secret resolution.");
153 load_builder_from_paths(config_paths)?
154 };
155
156 builder.allow_empty = allow_empty;
157
158 validation::check_provider(&builder)?;
159 signal_handler.clear();
160
161 if let Some(mut provider) = builder.provider {
163 builder = provider.build(signal_handler).await?;
164 debug!(message = "Provider configured.", provider = ?provider.get_component_name());
165 }
166
167 let (new_config, build_warnings) = builder.build_with_warnings()?;
168
169 validation::check_buffer_preconditions(&new_config).await?;
170
171 for warning in build_warnings {
172 warn!("{}", warning);
173 }
174
175 Ok(new_config)
176}
177
178fn loader_from_paths<T, L>(mut loader: L, config_paths: &[ConfigPath]) -> Result<T, Vec<String>>
180where
181 T: serde::de::DeserializeOwned,
182 L: Loader<T> + Process,
183{
184 let mut errors = Vec::new();
185
186 for config_path in config_paths {
187 match config_path {
188 ConfigPath::File(path, format_hint) => {
189 match loader.load_from_file(
190 path,
191 format_hint
192 .or_else(move || Format::from_path(&path).ok())
193 .unwrap_or_default(),
194 ) {
195 Ok(()) => {}
196 Err(errs) => errors.extend(errs),
197 };
198 }
199 ConfigPath::Dir(path) => {
200 match loader.load_from_dir(path) {
201 Ok(()) => {}
202 Err(errs) => errors.extend(errs),
203 };
204 }
205 }
206 }
207
208 if errors.is_empty() {
209 Ok(loader.take())
210 } else {
211 Err(errors)
212 }
213}
214
215pub fn load_builder_from_paths(config_paths: &[ConfigPath]) -> Result<ConfigBuilder, Vec<String>> {
217 loader_from_paths(ConfigBuilderLoader::new(), config_paths)
218}
219
220pub fn load_builder_from_paths_with_secrets(
222 config_paths: &[ConfigPath],
223 secrets: HashMap<String, String>,
224) -> Result<ConfigBuilder, Vec<String>> {
225 loader_from_paths(ConfigBuilderLoader::with_secrets(secrets), config_paths)
226}
227
228pub fn load_source_from_paths(
230 config_paths: &[ConfigPath],
231) -> Result<toml::value::Table, Vec<String>> {
232 loader_from_paths(SourceLoader::new(), config_paths)
233}
234
235pub fn load_secret_backends_from_paths(
237 config_paths: &[ConfigPath],
238) -> Result<SecretBackendLoader, Vec<String>> {
239 loader_from_paths(SecretBackendLoader::new(), config_paths)
240}
241
242pub fn load_from_str(input: &str, format: Format) -> Result<Config, Vec<String>> {
243 let builder = load_from_inputs(std::iter::once((input.as_bytes(), format)))?;
244 let (config, build_warnings) = builder.build_with_warnings()?;
245
246 for warning in build_warnings {
247 warn!("{}", warning);
248 }
249
250 Ok(config)
251}
252
253fn load_from_inputs(
254 inputs: impl IntoIterator<Item = (impl std::io::Read, Format)>,
255) -> Result<ConfigBuilder, Vec<String>> {
256 let mut config = Config::builder();
257 let mut errors = Vec::new();
258
259 for (input, format) in inputs {
260 if let Err(errs) = load(input, format).and_then(|n| config.append(n)) {
261 errors.extend(errs.iter().map(|e| e.to_string()));
263 }
264 }
265
266 if errors.is_empty() {
267 Ok(config)
268 } else {
269 Err(errors)
270 }
271}
272
273pub fn prepare_input<R: std::io::Read>(mut input: R) -> Result<String, Vec<String>> {
274 let mut source_string = String::new();
275 input
276 .read_to_string(&mut source_string)
277 .map_err(|e| vec![e.to_string()])?;
278
279 let mut vars: HashMap<String, String> = std::env::vars_os()
280 .filter_map(|(k, v)| match (k.into_string(), v.into_string()) {
281 (Ok(k), Ok(v)) => Some((k, v)),
282 _ => None,
283 })
284 .collect();
285
286 if !vars.contains_key("HOSTNAME") {
287 if let Ok(hostname) = crate::get_hostname() {
288 vars.insert("HOSTNAME".into(), hostname);
289 }
290 }
291 vars::interpolate(&source_string, &vars)
292}
293
294pub fn load<R: std::io::Read, T>(input: R, format: Format) -> Result<T, Vec<String>>
295where
296 T: serde::de::DeserializeOwned,
297{
298 let with_vars = prepare_input(input)?;
299
300 format::deserialize(&with_vars, format)
301}
302
/// Default config file location on non-Windows platforms.
#[cfg(not(windows))]
fn default_path() -> PathBuf {
    PathBuf::from("/etc/vector/vector.yaml")
}

/// Default config file location on Windows, under `%ProgramFiles%`.
///
/// # Panics
///
/// Panics if the `ProgramFiles` environment variable cannot be read.
#[cfg(windows)]
fn default_path() -> PathBuf {
    let program_files =
        std::env::var("ProgramFiles").expect("%ProgramFiles% environment variable must be defined");
    let location = format!("{}\\Vector\\config\\vector.yaml", program_files);
    PathBuf::from(location)
}
314
315fn default_config_paths() -> Vec<ConfigPath> {
316 #[cfg(not(windows))]
317 let default_path = default_path();
318 #[cfg(windows)]
319 let default_path = default_path();
320
321 vec![ConfigPath::File(default_path, Some(Format::Yaml))]
322}
323
#[cfg(all(
    test,
    feature = "sinks-elasticsearch",
    feature = "transforms-sample",
    feature = "sources-demo_logs",
    feature = "sinks-console"
))]
mod tests {
    use std::path::PathBuf;

    use super::load_builder_from_paths;
    use crate::config::{ComponentKey, ConfigPath};

    /// Builds a one-element directory config list for `./tests/<segments...>`.
    fn fixture_dir(segments: &[&str]) -> Vec<ConfigPath> {
        let mut dir = PathBuf::from(".");
        dir.push("tests");
        for segment in segments {
            dir.push(segment);
        }
        vec![ConfigPath::Dir(dir)]
    }

    #[test]
    fn load_namespacing_folder() {
        let configs = fixture_dir(&["namespacing", "success"]);
        let builder = load_builder_from_paths(&configs).unwrap();

        let transform = ComponentKey::from("apache_parser");
        let source = ComponentKey::from("apache_logs");
        let sink = ComponentKey::from("es_cluster");

        assert!(builder.transforms.contains_key(&transform));
        assert!(builder.sources.contains_key(&source));
        assert!(builder.sinks.contains_key(&sink));
        assert_eq!(builder.tests.len(), 2);
    }

    #[test]
    fn load_namespacing_ignore_invalid() {
        let configs = fixture_dir(&["namespacing", "ignore-invalid"]);
        load_builder_from_paths(&configs).unwrap();
    }

    #[test]
    fn load_directory_ignores_unknown_file_formats() {
        let configs = fixture_dir(&["config-dir", "ignore-unknown"]);
        load_builder_from_paths(&configs).unwrap();
    }

    #[test]
    fn load_directory_globals() {
        let configs = fixture_dir(&["config-dir", "globals"]);
        load_builder_from_paths(&configs).unwrap();
    }

    #[test]
    fn load_directory_globals_duplicates() {
        let configs = fixture_dir(&["config-dir", "globals-duplicate"]);
        load_builder_from_paths(&configs).unwrap();
    }
}