1#![allow(missing_docs)]
2
3use std::{fmt, fs::remove_dir_all, path::PathBuf};
4
5use clap::Parser;
6use colored::*;
7use exitcode::ExitCode;
8
9use crate::{
10 config::{self, Config, ConfigDiff, loading::ConfigBuilderLoader},
11 topology::{
12 self,
13 builder::{TopologyPieces, TopologyPiecesBuilder},
14 },
15};
16
/// Subdirectory name handed to `resolve_and_make_data_subdir` when creating
/// the scratch directory that replaces `data_dir` while the environment checks
/// run; that directory is removed again once the checks have finished.
const TEMPORARY_DIRECTORY: &str = "validate_tmp";
18
19#[derive(Parser, Debug)]
20#[command(rename_all = "kebab-case")]
21pub struct Opts {
22 #[arg(long)]
24 pub no_environment: bool,
25
26 #[arg(long)]
28 pub skip_healthchecks: bool,
29
30 #[arg(short, long)]
33 pub deny_warnings: bool,
34
35 #[arg(
37 id = "config-toml",
38 long,
39 env = "VECTOR_CONFIG_TOML",
40 value_delimiter(',')
41 )]
42 pub paths_toml: Vec<PathBuf>,
43
44 #[arg(
46 id = "config-json",
47 long,
48 env = "VECTOR_CONFIG_JSON",
49 value_delimiter(',')
50 )]
51 pub paths_json: Vec<PathBuf>,
52
53 #[arg(
55 id = "config-yaml",
56 long,
57 env = "VECTOR_CONFIG_YAML",
58 value_delimiter(',')
59 )]
60 pub paths_yaml: Vec<PathBuf>,
61
62 #[arg(env = "VECTOR_CONFIG", value_delimiter(','))]
67 pub paths: Vec<PathBuf>,
68
69 #[arg(
74 id = "config-dir",
75 short = 'C',
76 long,
77 env = "VECTOR_CONFIG_DIR",
78 value_delimiter(',')
79 )]
80 pub config_dirs: Vec<PathBuf>,
81
82 #[arg(
84 long,
85 env = "VECTOR_DISABLE_ENV_VAR_INTERPOLATION",
86 default_value = "false"
87 )]
88 pub disable_env_var_interpolation: bool,
89}
90
91impl Opts {
92 fn paths_with_formats(&self) -> Vec<config::ConfigPath> {
93 config::merge_path_lists(vec![
94 (&self.paths, None),
95 (&self.paths_toml, Some(config::Format::Toml)),
96 (&self.paths_json, Some(config::Format::Json)),
97 (&self.paths_yaml, Some(config::Format::Yaml)),
98 ])
99 .map(|(path, hint)| config::ConfigPath::File(path, hint))
100 .chain(
101 self.config_dirs
102 .iter()
103 .map(|dir| config::ConfigPath::Dir(dir.to_path_buf())),
104 )
105 .collect()
106 }
107}
108
109pub async fn validate(opts: &Opts, color: bool) -> ExitCode {
111 let mut fmt = Formatter::new(color);
112
113 let mut validated = true;
114
115 let mut config = match validate_config(opts, &mut fmt) {
116 Some(config) => config,
117 None => return exitcode::CONFIG,
118 };
119
120 if !opts.no_environment {
121 if let Some(tmp_directory) = create_tmp_directory(&mut config, &mut fmt) {
122 validated &= validate_environment(opts, &config, &mut fmt).await;
123 remove_tmp_directory(tmp_directory);
124 } else {
125 validated = false;
126 }
127 }
128
129 if validated {
130 fmt.validated();
131 exitcode::OK
132 } else {
133 exitcode::CONFIG
134 }
135}
136
137pub fn validate_config(opts: &Opts, fmt: &mut Formatter) -> Option<Config> {
138 let paths = opts.paths_with_formats();
140 let paths = if let Some(paths) = config::process_paths(&paths) {
141 paths
142 } else {
143 fmt.error("No config file paths");
144 return None;
145 };
146
147 let paths_list: Vec<_> = paths.iter().map(<&PathBuf>::from).collect();
149
150 let mut report_error = |errors| {
151 fmt.title(format!("Failed to load {:?}", &paths_list));
152 fmt.sub_error(errors);
153 };
154 let builder = ConfigBuilderLoader::default()
155 .interpolate_env(!opts.disable_env_var_interpolation)
156 .load_from_paths(&paths)
157 .map_err(&mut report_error)
158 .ok()?;
159 config::init_log_schema(builder.global.log_schema.clone(), true);
160
161 let (config, warnings) = builder
163 .build_with_warnings()
164 .map_err(&mut report_error)
165 .ok()?;
166
167 if !warnings.is_empty() {
169 if opts.deny_warnings {
170 report_error(warnings);
171 return None;
172 }
173
174 fmt.title(format!("Loaded with warnings {:?}", &paths_list));
175 fmt.sub_warning(warnings);
176 } else {
177 fmt.success(format!("Loaded {:?}", &paths_list));
178 }
179
180 Some(config)
181}
182
183async fn validate_environment(opts: &Opts, config: &Config, fmt: &mut Formatter) -> bool {
184 let diff = ConfigDiff::initial(config);
185
186 let mut pieces = match validate_components(config, &diff, fmt).await {
187 Some(pieces) => pieces,
188 _ => {
189 return false;
190 }
191 };
192 opts.skip_healthchecks || validate_healthchecks(opts, config, &diff, &mut pieces, fmt).await
193}
194
195async fn validate_components(
196 config: &Config,
197 diff: &ConfigDiff,
198 fmt: &mut Formatter,
199) -> Option<TopologyPieces> {
200 match TopologyPiecesBuilder::new(config, diff).build().await {
201 Ok(pieces) => {
202 fmt.success("Component configuration");
203 Some(pieces)
204 }
205 Err(errors) => {
206 fmt.title("Component errors");
207 fmt.sub_error(errors);
208 None
209 }
210 }
211}
212
213async fn validate_healthchecks(
214 opts: &Opts,
215 config: &Config,
216 diff: &ConfigDiff,
217 pieces: &mut TopologyPieces,
218 fmt: &mut Formatter,
219) -> bool {
220 if !config.healthchecks.enabled {
221 fmt.warning("Health checks are disabled");
222 return !opts.deny_warnings;
223 }
224
225 let healthchecks = topology::take_healthchecks(diff, pieces);
226 let mut validated = true;
229 for (id, healthcheck) in healthchecks {
230 let mut failed = |error| {
231 validated = false;
232 fmt.error(error);
233 };
234
235 trace!("Healthcheck for {id} starting.");
236 match tokio::spawn(healthcheck).await {
237 Ok(Ok(_)) => {
238 if config
239 .sink(&id)
240 .expect("Sink not present")
241 .healthcheck()
242 .enabled
243 {
244 fmt.success(format!("Health check \"{id}\""));
245 } else {
246 fmt.warning(format!("Health check disabled for \"{id}\""));
247 validated &= !opts.deny_warnings;
248 }
249 }
250 Ok(Err(e)) => failed(format!("Health check for \"{id}\" failed: {e}")),
251 Err(error) if error.is_cancelled() => {
252 failed(format!("Health check for \"{id}\" was cancelled"))
253 }
254 Err(_) => failed(format!("Health check for \"{id}\" panicked")),
255 }
256 trace!("Healthcheck for {id} done.");
257 }
258
259 validated
260}
261
262fn create_tmp_directory(config: &mut Config, fmt: &mut Formatter) -> Option<PathBuf> {
266 match config
267 .global
268 .resolve_and_make_data_subdir(None, TEMPORARY_DIRECTORY)
269 {
270 Ok(path) => {
271 config.global.data_dir = Some(path.clone());
272 Some(path)
273 }
274 Err(error) => {
275 fmt.error(error.to_string());
276 None
277 }
278 }
279}
280
281fn remove_tmp_directory(path: PathBuf) {
282 if let Err(error) = remove_dir_all(&path) {
283 error!(message = "Failed to remove temporary directory.", path = ?path, %error);
284 }
285}
286
/// Prints validation progress to stdout and keeps the layout state needed
/// between calls.
pub struct Formatter {
    /// Widest line printed so far, counted in characters after ANSI escapes
    /// are stripped; used as the width of the closing rule and banner.
    max_line_width: usize,
    /// Set whenever something is printed; while set, the next `space()` call
    /// emits one blank line and clears it.
    print_space: bool,
    /// Whether the final "Validated" banner is colored.
    color: bool,
    /// Marker placed in front of error lines.
    error_intro: String,
    /// Marker placed in front of warning lines.
    warning_intro: String,
    /// Marker placed in front of success lines.
    success_intro: String,
}
298
299impl Formatter {
300 pub fn new(color: bool) -> Self {
301 Self {
302 max_line_width: 0,
303 print_space: false,
304 error_intro: if color {
305 "x".red().to_string()
306 } else {
307 "x".to_owned()
308 },
309 warning_intro: if color {
310 "~".yellow().to_string()
311 } else {
312 "~".to_owned()
313 },
314 success_intro: if color {
315 "√".green().to_string()
316 } else {
317 "√".to_owned()
318 },
319 color,
320 }
321 }
322
323 fn validated(&self) {
325 #[allow(clippy::print_stdout)]
326 {
327 println!("{:-^width$}", "", width = self.max_line_width);
328 }
329 if self.color {
330 #[allow(clippy::print_stdout)]
335 {
336 println!(
337 "{:>width$}",
338 "Validated".green(),
339 width = self.max_line_width
340 );
341 }
342 } else {
343 #[allow(clippy::print_stdout)]
344 {
345 println!("{:>width$}", "Validated", width = self.max_line_width)
346 }
347 }
348 }
349
350 fn success(&mut self, msg: impl AsRef<str>) {
352 self.print(format!("{} {}\n", self.success_intro, msg.as_ref()))
353 }
354
355 fn warning(&mut self, warning: impl AsRef<str>) {
357 self.print(format!("{} {}\n", self.warning_intro, warning.as_ref()))
358 }
359
360 fn error(&mut self, error: impl AsRef<str>) {
362 self.print(format!("{} {}\n", self.error_intro, error.as_ref()))
363 }
364
365 fn title(&mut self, title: impl AsRef<str>) {
367 self.space();
368 self.print(format!(
369 "{}\n{:-<width$}\n",
370 title.as_ref(),
371 "",
372 width = title.as_ref().len()
373 ))
374 }
375
376 fn sub_warning<I: IntoIterator>(&mut self, warnings: I)
378 where
379 I::Item: fmt::Display,
380 {
381 self.sub(self.warning_intro.clone(), warnings)
382 }
383
384 fn sub_error<I: IntoIterator>(&mut self, errors: I)
386 where
387 I::Item: fmt::Display,
388 {
389 self.sub(self.error_intro.clone(), errors)
390 }
391
392 fn sub<I: IntoIterator>(&mut self, intro: impl AsRef<str>, msgs: I)
393 where
394 I::Item: fmt::Display,
395 {
396 for msg in msgs {
397 self.print(format!("{} {}\n", intro.as_ref(), msg));
398 }
399 self.space();
400 }
401
402 fn space(&mut self) {
404 if self.print_space {
405 self.print_space = false;
406 #[allow(clippy::print_stdout)]
407 {
408 println!();
409 }
410 }
411 }
412
413 fn print(&mut self, print: impl AsRef<str>) {
414 let width = print
415 .as_ref()
416 .lines()
417 .map(|line| {
418 String::from_utf8_lossy(&strip_ansi_escapes::strip(line))
419 .chars()
420 .count()
421 })
422 .max()
423 .unwrap_or(0);
424 self.max_line_width = width.max(self.max_line_width);
425 self.print_space = true;
426 #[allow(clippy::print_stdout)]
427 {
428 print!("{}", print.as_ref())
429 }
430 }
431}