1#![allow(missing_docs)]
2
3use std::{fmt, fs::remove_dir_all, path::PathBuf};
4
5use clap::Parser;
6use colored::*;
7use exitcode::ExitCode;
8
9use crate::{
10 config::{self, Config, ConfigDiff},
11 topology::{
12 self,
13 builder::{TopologyPieces, TopologyPiecesBuilder},
14 },
15};
16
17const TEMPORARY_DIRECTORY: &str = "validate_tmp";
18
19#[derive(Parser, Debug)]
20#[command(rename_all = "kebab-case")]
21pub struct Opts {
22 #[arg(long)]
24 pub no_environment: bool,
25
26 #[arg(long)]
28 pub skip_healthchecks: bool,
29
30 #[arg(short, long)]
33 pub deny_warnings: bool,
34
35 #[arg(
37 id = "config-toml",
38 long,
39 env = "VECTOR_CONFIG_TOML",
40 value_delimiter(',')
41 )]
42 pub paths_toml: Vec<PathBuf>,
43
44 #[arg(
46 id = "config-json",
47 long,
48 env = "VECTOR_CONFIG_JSON",
49 value_delimiter(',')
50 )]
51 pub paths_json: Vec<PathBuf>,
52
53 #[arg(
55 id = "config-yaml",
56 long,
57 env = "VECTOR_CONFIG_YAML",
58 value_delimiter(',')
59 )]
60 pub paths_yaml: Vec<PathBuf>,
61
62 #[arg(env = "VECTOR_CONFIG", value_delimiter(','))]
67 pub paths: Vec<PathBuf>,
68
69 #[arg(
74 id = "config-dir",
75 short = 'C',
76 long,
77 env = "VECTOR_CONFIG_DIR",
78 value_delimiter(',')
79 )]
80 pub config_dirs: Vec<PathBuf>,
81}
82
83impl Opts {
84 fn paths_with_formats(&self) -> Vec<config::ConfigPath> {
85 config::merge_path_lists(vec![
86 (&self.paths, None),
87 (&self.paths_toml, Some(config::Format::Toml)),
88 (&self.paths_json, Some(config::Format::Json)),
89 (&self.paths_yaml, Some(config::Format::Yaml)),
90 ])
91 .map(|(path, hint)| config::ConfigPath::File(path, hint))
92 .chain(
93 self.config_dirs
94 .iter()
95 .map(|dir| config::ConfigPath::Dir(dir.to_path_buf())),
96 )
97 .collect()
98 }
99}
100
101pub async fn validate(opts: &Opts, color: bool) -> ExitCode {
103 let mut fmt = Formatter::new(color);
104
105 let mut validated = true;
106
107 let mut config = match validate_config(opts, &mut fmt) {
108 Some(config) => config,
109 None => return exitcode::CONFIG,
110 };
111
112 if !opts.no_environment {
113 if let Some(tmp_directory) = create_tmp_directory(&mut config, &mut fmt) {
114 validated &= validate_environment(opts, &config, &mut fmt).await;
115 remove_tmp_directory(tmp_directory);
116 } else {
117 validated = false;
118 }
119 }
120
121 if validated {
122 fmt.validated();
123 exitcode::OK
124 } else {
125 exitcode::CONFIG
126 }
127}
128
129pub fn validate_config(opts: &Opts, fmt: &mut Formatter) -> Option<Config> {
130 let paths = opts.paths_with_formats();
132 let paths = if let Some(paths) = config::process_paths(&paths) {
133 paths
134 } else {
135 fmt.error("No config file paths");
136 return None;
137 };
138
139 let paths_list: Vec<_> = paths.iter().map(<&PathBuf>::from).collect();
141
142 let mut report_error = |errors| {
143 fmt.title(format!("Failed to load {:?}", &paths_list));
144 fmt.sub_error(errors);
145 };
146 let builder = config::load_builder_from_paths(&paths)
147 .map_err(&mut report_error)
148 .ok()?;
149 config::init_log_schema(builder.global.log_schema.clone(), true);
150
151 let (config, warnings) = builder
153 .build_with_warnings()
154 .map_err(&mut report_error)
155 .ok()?;
156
157 if !warnings.is_empty() {
159 if opts.deny_warnings {
160 report_error(warnings);
161 return None;
162 }
163
164 fmt.title(format!("Loaded with warnings {:?}", &paths_list));
165 fmt.sub_warning(warnings);
166 } else {
167 fmt.success(format!("Loaded {:?}", &paths_list));
168 }
169
170 Some(config)
171}
172
173async fn validate_environment(opts: &Opts, config: &Config, fmt: &mut Formatter) -> bool {
174 let diff = ConfigDiff::initial(config);
175
176 let mut pieces = match validate_components(config, &diff, fmt).await {
177 Some(pieces) => pieces,
178 _ => {
179 return false;
180 }
181 };
182 opts.skip_healthchecks || validate_healthchecks(opts, config, &diff, &mut pieces, fmt).await
183}
184
185async fn validate_components(
186 config: &Config,
187 diff: &ConfigDiff,
188 fmt: &mut Formatter,
189) -> Option<TopologyPieces> {
190 match TopologyPiecesBuilder::new(config, diff).build().await {
191 Ok(pieces) => {
192 fmt.success("Component configuration");
193 Some(pieces)
194 }
195 Err(errors) => {
196 fmt.title("Component errors");
197 fmt.sub_error(errors);
198 None
199 }
200 }
201}
202
203async fn validate_healthchecks(
204 opts: &Opts,
205 config: &Config,
206 diff: &ConfigDiff,
207 pieces: &mut TopologyPieces,
208 fmt: &mut Formatter,
209) -> bool {
210 if !config.healthchecks.enabled {
211 fmt.warning("Health checks are disabled");
212 return !opts.deny_warnings;
213 }
214
215 let healthchecks = topology::take_healthchecks(diff, pieces);
216 let mut validated = true;
219 for (id, healthcheck) in healthchecks {
220 let mut failed = |error| {
221 validated = false;
222 fmt.error(error);
223 };
224
225 trace!("Healthcheck for {id} starting.");
226 match tokio::spawn(healthcheck).await {
227 Ok(Ok(_)) => {
228 if config
229 .sink(&id)
230 .expect("Sink not present")
231 .healthcheck()
232 .enabled
233 {
234 fmt.success(format!("Health check \"{id}\""));
235 } else {
236 fmt.warning(format!("Health check disabled for \"{id}\""));
237 validated &= !opts.deny_warnings;
238 }
239 }
240 Ok(Err(e)) => failed(format!("Health check for \"{id}\" failed: {e}")),
241 Err(error) if error.is_cancelled() => {
242 failed(format!("Health check for \"{id}\" was cancelled"))
243 }
244 Err(_) => failed(format!("Health check for \"{id}\" panicked")),
245 }
246 trace!("Healthcheck for {id} done.");
247 }
248
249 validated
250}
251
252fn create_tmp_directory(config: &mut Config, fmt: &mut Formatter) -> Option<PathBuf> {
256 match config
257 .global
258 .resolve_and_make_data_subdir(None, TEMPORARY_DIRECTORY)
259 {
260 Ok(path) => {
261 config.global.data_dir = Some(path.clone());
262 Some(path)
263 }
264 Err(error) => {
265 fmt.error(error.to_string());
266 None
267 }
268 }
269}
270
271fn remove_tmp_directory(path: PathBuf) {
272 if let Err(error) = remove_dir_all(&path) {
273 error!(message = "Failed to remove temporary directory.", path = ?path, %error);
274 }
275}
276
277pub struct Formatter {
278 max_line_width: usize,
280 print_space: bool,
282 color: bool,
283 error_intro: String,
285 warning_intro: String,
286 success_intro: String,
287}
288
289impl Formatter {
290 pub fn new(color: bool) -> Self {
291 Self {
292 max_line_width: 0,
293 print_space: false,
294 error_intro: if color {
295 "x".red().to_string()
296 } else {
297 "x".to_owned()
298 },
299 warning_intro: if color {
300 "~".yellow().to_string()
301 } else {
302 "~".to_owned()
303 },
304 success_intro: if color {
305 "√".green().to_string()
306 } else {
307 "√".to_owned()
308 },
309 color,
310 }
311 }
312
313 fn validated(&self) {
315 #[allow(clippy::print_stdout)]
316 {
317 println!("{:-^width$}", "", width = self.max_line_width);
318 }
319 if self.color {
320 #[allow(clippy::print_stdout)]
325 {
326 println!(
327 "{:>width$}",
328 "Validated".green(),
329 width = self.max_line_width
330 );
331 }
332 } else {
333 #[allow(clippy::print_stdout)]
334 {
335 println!("{:>width$}", "Validated", width = self.max_line_width)
336 }
337 }
338 }
339
340 fn success(&mut self, msg: impl AsRef<str>) {
342 self.print(format!("{} {}\n", self.success_intro, msg.as_ref()))
343 }
344
345 fn warning(&mut self, warning: impl AsRef<str>) {
347 self.print(format!("{} {}\n", self.warning_intro, warning.as_ref()))
348 }
349
350 fn error(&mut self, error: impl AsRef<str>) {
352 self.print(format!("{} {}\n", self.error_intro, error.as_ref()))
353 }
354
355 fn title(&mut self, title: impl AsRef<str>) {
357 self.space();
358 self.print(format!(
359 "{}\n{:-<width$}\n",
360 title.as_ref(),
361 "",
362 width = title.as_ref().len()
363 ))
364 }
365
366 fn sub_warning<I: IntoIterator>(&mut self, warnings: I)
368 where
369 I::Item: fmt::Display,
370 {
371 self.sub(self.warning_intro.clone(), warnings)
372 }
373
374 fn sub_error<I: IntoIterator>(&mut self, errors: I)
376 where
377 I::Item: fmt::Display,
378 {
379 self.sub(self.error_intro.clone(), errors)
380 }
381
382 fn sub<I: IntoIterator>(&mut self, intro: impl AsRef<str>, msgs: I)
383 where
384 I::Item: fmt::Display,
385 {
386 for msg in msgs {
387 self.print(format!("{} {}\n", intro.as_ref(), msg));
388 }
389 self.space();
390 }
391
392 fn space(&mut self) {
394 if self.print_space {
395 self.print_space = false;
396 #[allow(clippy::print_stdout)]
397 {
398 println!();
399 }
400 }
401 }
402
403 fn print(&mut self, print: impl AsRef<str>) {
404 let width = print
405 .as_ref()
406 .lines()
407 .map(|line| {
408 String::from_utf8_lossy(&strip_ansi_escapes::strip(line))
409 .chars()
410 .count()
411 })
412 .max()
413 .unwrap_or(0);
414 self.max_line_width = width.max(self.max_line_width);
415 self.print_space = true;
416 #[allow(clippy::print_stdout)]
417 {
418 print!("{}", print.as_ref())
419 }
420 }
421}