1#![allow(missing_docs)]
2
3use std::{collections::HashMap, fmt, fs::remove_dir_all, path::PathBuf};
4
5use clap::Parser;
6use colored::*;
7use exitcode::ExitCode;
8
9use crate::{
10 config::{self, Config, ConfigDiff},
11 extra_context::ExtraContext,
12 topology::{self, builder::TopologyPieces},
13};
14
15const TEMPORARY_DIRECTORY: &str = "validate_tmp";
16
17#[derive(Parser, Debug)]
18#[command(rename_all = "kebab-case")]
19pub struct Opts {
20 #[arg(long)]
22 pub no_environment: bool,
23
24 #[arg(long)]
26 pub skip_healthchecks: bool,
27
28 #[arg(short, long)]
31 pub deny_warnings: bool,
32
33 #[arg(
35 id = "config-toml",
36 long,
37 env = "VECTOR_CONFIG_TOML",
38 value_delimiter(',')
39 )]
40 pub paths_toml: Vec<PathBuf>,
41
42 #[arg(
44 id = "config-json",
45 long,
46 env = "VECTOR_CONFIG_JSON",
47 value_delimiter(',')
48 )]
49 pub paths_json: Vec<PathBuf>,
50
51 #[arg(
53 id = "config-yaml",
54 long,
55 env = "VECTOR_CONFIG_YAML",
56 value_delimiter(',')
57 )]
58 pub paths_yaml: Vec<PathBuf>,
59
60 #[arg(env = "VECTOR_CONFIG", value_delimiter(','))]
65 pub paths: Vec<PathBuf>,
66
67 #[arg(
72 id = "config-dir",
73 short = 'C',
74 long,
75 env = "VECTOR_CONFIG_DIR",
76 value_delimiter(',')
77 )]
78 pub config_dirs: Vec<PathBuf>,
79}
80
81impl Opts {
82 fn paths_with_formats(&self) -> Vec<config::ConfigPath> {
83 config::merge_path_lists(vec![
84 (&self.paths, None),
85 (&self.paths_toml, Some(config::Format::Toml)),
86 (&self.paths_json, Some(config::Format::Json)),
87 (&self.paths_yaml, Some(config::Format::Yaml)),
88 ])
89 .map(|(path, hint)| config::ConfigPath::File(path, hint))
90 .chain(
91 self.config_dirs
92 .iter()
93 .map(|dir| config::ConfigPath::Dir(dir.to_path_buf())),
94 )
95 .collect()
96 }
97}
98
99pub async fn validate(opts: &Opts, color: bool) -> ExitCode {
101 let mut fmt = Formatter::new(color);
102
103 let mut validated = true;
104
105 let mut config = match validate_config(opts, &mut fmt) {
106 Some(config) => config,
107 None => return exitcode::CONFIG,
108 };
109
110 if !opts.no_environment {
111 if let Some(tmp_directory) = create_tmp_directory(&mut config, &mut fmt) {
112 validated &= validate_environment(opts, &config, &mut fmt).await;
113 remove_tmp_directory(tmp_directory);
114 } else {
115 validated = false;
116 }
117 }
118
119 if validated {
120 fmt.validated();
121 exitcode::OK
122 } else {
123 exitcode::CONFIG
124 }
125}
126
127pub fn validate_config(opts: &Opts, fmt: &mut Formatter) -> Option<Config> {
128 let paths = opts.paths_with_formats();
130 let paths = if let Some(paths) = config::process_paths(&paths) {
131 paths
132 } else {
133 fmt.error("No config file paths");
134 return None;
135 };
136
137 let paths_list: Vec<_> = paths.iter().map(<&PathBuf>::from).collect();
139
140 let mut report_error = |errors| {
141 fmt.title(format!("Failed to load {:?}", &paths_list));
142 fmt.sub_error(errors);
143 };
144 let builder = config::load_builder_from_paths(&paths)
145 .map_err(&mut report_error)
146 .ok()?;
147 config::init_log_schema(builder.global.log_schema.clone(), true);
148
149 let (config, warnings) = builder
151 .build_with_warnings()
152 .map_err(&mut report_error)
153 .ok()?;
154
155 if !warnings.is_empty() {
157 if opts.deny_warnings {
158 report_error(warnings);
159 return None;
160 }
161
162 fmt.title(format!("Loaded with warnings {:?}", &paths_list));
163 fmt.sub_warning(warnings);
164 } else {
165 fmt.success(format!("Loaded {:?}", &paths_list));
166 }
167
168 Some(config)
169}
170
171async fn validate_environment(opts: &Opts, config: &Config, fmt: &mut Formatter) -> bool {
172 let diff = ConfigDiff::initial(config);
173
174 let mut pieces = match validate_components(config, &diff, fmt).await {
175 Some(pieces) => pieces,
176 _ => {
177 return false;
178 }
179 };
180 opts.skip_healthchecks || validate_healthchecks(opts, config, &diff, &mut pieces, fmt).await
181}
182
183async fn validate_components(
184 config: &Config,
185 diff: &ConfigDiff,
186 fmt: &mut Formatter,
187) -> Option<TopologyPieces> {
188 match topology::TopologyPieces::build(config, diff, HashMap::new(), ExtraContext::default())
189 .await
190 {
191 Ok(pieces) => {
192 fmt.success("Component configuration");
193 Some(pieces)
194 }
195 Err(errors) => {
196 fmt.title("Component errors");
197 fmt.sub_error(errors);
198 None
199 }
200 }
201}
202
203async fn validate_healthchecks(
204 opts: &Opts,
205 config: &Config,
206 diff: &ConfigDiff,
207 pieces: &mut TopologyPieces,
208 fmt: &mut Formatter,
209) -> bool {
210 if !config.healthchecks.enabled {
211 fmt.warning("Health checks are disabled");
212 return !opts.deny_warnings;
213 }
214
215 let healthchecks = topology::take_healthchecks(diff, pieces);
216 let mut validated = true;
219 for (id, healthcheck) in healthchecks {
220 let mut failed = |error| {
221 validated = false;
222 fmt.error(error);
223 };
224
225 trace!("Healthcheck for {id} starting.");
226 match tokio::spawn(healthcheck).await {
227 Ok(Ok(_)) => {
228 if config
229 .sink(&id)
230 .expect("Sink not present")
231 .healthcheck()
232 .enabled
233 {
234 fmt.success(format!("Health check \"{id}\""));
235 } else {
236 fmt.warning(format!("Health check disabled for \"{id}\""));
237 validated &= !opts.deny_warnings;
238 }
239 }
240 Ok(Err(e)) => failed(format!("Health check for \"{id}\" failed: {e}")),
241 Err(error) if error.is_cancelled() => {
242 failed(format!("Health check for \"{id}\" was cancelled"))
243 }
244 Err(_) => failed(format!("Health check for \"{id}\" panicked")),
245 }
246 trace!("Healthcheck for {id} done.");
247 }
248
249 validated
250}
251
252fn create_tmp_directory(config: &mut Config, fmt: &mut Formatter) -> Option<PathBuf> {
256 match config
257 .global
258 .resolve_and_make_data_subdir(None, TEMPORARY_DIRECTORY)
259 {
260 Ok(path) => {
261 config.global.data_dir = Some(path.clone());
262 Some(path)
263 }
264 Err(error) => {
265 fmt.error(error.to_string());
266 None
267 }
268 }
269}
270
271fn remove_tmp_directory(path: PathBuf) {
272 if let Err(error) = remove_dir_all(&path) {
273 error!(message = "Failed to remove temporary directory.", path = ?path, %error);
274 }
275}
276
277pub struct Formatter {
278 max_line_width: usize,
280 print_space: bool,
282 color: bool,
283 error_intro: String,
285 warning_intro: String,
286 success_intro: String,
287}
288
289impl Formatter {
290 pub fn new(color: bool) -> Self {
291 Self {
292 max_line_width: 0,
293 print_space: false,
294 error_intro: if color {
295 "x".red().to_string()
296 } else {
297 "x".to_owned()
298 },
299 warning_intro: if color {
300 "~".yellow().to_string()
301 } else {
302 "~".to_owned()
303 },
304 success_intro: if color {
305 "√".green().to_string()
306 } else {
307 "√".to_owned()
308 },
309 color,
310 }
311 }
312
313 fn validated(&self) {
315 #[allow(clippy::print_stdout)]
316 {
317 println!("{:-^width$}", "", width = self.max_line_width);
318 }
319 if self.color {
320 #[allow(clippy::print_stdout)]
325 {
326 println!(
327 "{:>width$}",
328 "Validated".green(),
329 width = self.max_line_width
330 );
331 }
332 } else {
333 #[allow(clippy::print_stdout)]
334 {
335 println!("{:>width$}", "Validated", width = self.max_line_width)
336 }
337 }
338 }
339
340 fn success(&mut self, msg: impl AsRef<str>) {
342 self.print(format!("{} {}\n", self.success_intro, msg.as_ref()))
343 }
344
345 fn warning(&mut self, warning: impl AsRef<str>) {
347 self.print(format!("{} {}\n", self.warning_intro, warning.as_ref()))
348 }
349
350 fn error(&mut self, error: impl AsRef<str>) {
352 self.print(format!("{} {}\n", self.error_intro, error.as_ref()))
353 }
354
355 fn title(&mut self, title: impl AsRef<str>) {
357 self.space();
358 self.print(format!(
359 "{}\n{:-<width$}\n",
360 title.as_ref(),
361 "",
362 width = title.as_ref().len()
363 ))
364 }
365
366 fn sub_warning<I: IntoIterator>(&mut self, warnings: I)
368 where
369 I::Item: fmt::Display,
370 {
371 self.sub(self.warning_intro.clone(), warnings)
372 }
373
374 fn sub_error<I: IntoIterator>(&mut self, errors: I)
376 where
377 I::Item: fmt::Display,
378 {
379 self.sub(self.error_intro.clone(), errors)
380 }
381
382 fn sub<I: IntoIterator>(&mut self, intro: impl AsRef<str>, msgs: I)
383 where
384 I::Item: fmt::Display,
385 {
386 for msg in msgs {
387 self.print(format!("{} {}\n", intro.as_ref(), msg));
388 }
389 self.space();
390 }
391
392 fn space(&mut self) {
394 if self.print_space {
395 self.print_space = false;
396 #[allow(clippy::print_stdout)]
397 {
398 println!();
399 }
400 }
401 }
402
403 fn print(&mut self, print: impl AsRef<str>) {
404 let width = print
405 .as_ref()
406 .lines()
407 .map(|line| {
408 String::from_utf8_lossy(&strip_ansi_escapes::strip(line))
409 .chars()
410 .count()
411 })
412 .max()
413 .unwrap_or(0);
414 self.max_line_width = width.max(self.max_line_width);
415 self.print_space = true;
416 #[allow(clippy::print_stdout)]
417 {
418 print!("{}", print.as_ref())
419 }
420 }
421}