vector/
validate.rs

1#![allow(missing_docs)]
2
3use std::{fmt, fs::remove_dir_all, path::PathBuf};
4
5use clap::Parser;
6use colored::*;
7use exitcode::ExitCode;
8
9use crate::{
10    config::{self, Config, ConfigDiff, loading::ConfigBuilderLoader},
11    topology::{
12        self,
13        builder::{TopologyPieces, TopologyPiecesBuilder},
14    },
15};
16
17const TEMPORARY_DIRECTORY: &str = "validate_tmp";
18
19#[derive(Parser, Debug)]
20#[command(rename_all = "kebab-case")]
21pub struct Opts {
22    /// Disables environment checks. That includes component checks and health checks.
23    #[arg(long)]
24    pub no_environment: bool,
25
26    /// Disables health checks during validation.
27    #[arg(long)]
28    pub skip_healthchecks: bool,
29
30    /// Fail validation on warnings that are probably a mistake in the configuration
31    /// or are recommended to be fixed.
32    #[arg(short, long)]
33    pub deny_warnings: bool,
34
35    /// Vector config files in TOML format to validate.
36    #[arg(
37        id = "config-toml",
38        long,
39        env = "VECTOR_CONFIG_TOML",
40        value_delimiter(',')
41    )]
42    pub paths_toml: Vec<PathBuf>,
43
44    /// Vector config files in JSON format to validate.
45    #[arg(
46        id = "config-json",
47        long,
48        env = "VECTOR_CONFIG_JSON",
49        value_delimiter(',')
50    )]
51    pub paths_json: Vec<PathBuf>,
52
53    /// Vector config files in YAML format to validate.
54    #[arg(
55        id = "config-yaml",
56        long,
57        env = "VECTOR_CONFIG_YAML",
58        value_delimiter(',')
59    )]
60    pub paths_yaml: Vec<PathBuf>,
61
62    /// Any number of Vector config files to validate.
63    /// Format is detected from the file name.
64    /// If none are specified, the default config path `/etc/vector/vector.yaml`
65    /// is targeted.
66    #[arg(env = "VECTOR_CONFIG", value_delimiter(','))]
67    pub paths: Vec<PathBuf>,
68
69    /// Read configuration from files in one or more directories.
70    /// File format is detected from the file name.
71    ///
72    /// Files not ending in .toml, .json, .yaml, or .yml will be ignored.
73    #[arg(
74        id = "config-dir",
75        short = 'C',
76        long,
77        env = "VECTOR_CONFIG_DIR",
78        value_delimiter(',')
79    )]
80    pub config_dirs: Vec<PathBuf>,
81
82    /// Disable interpolation of environment variables in configuration files.
83    #[arg(
84        long,
85        env = "VECTOR_DISABLE_ENV_VAR_INTERPOLATION",
86        default_value = "false"
87    )]
88    pub disable_env_var_interpolation: bool,
89}
90
91impl Opts {
92    fn paths_with_formats(&self) -> Vec<config::ConfigPath> {
93        config::merge_path_lists(vec![
94            (&self.paths, None),
95            (&self.paths_toml, Some(config::Format::Toml)),
96            (&self.paths_json, Some(config::Format::Json)),
97            (&self.paths_yaml, Some(config::Format::Yaml)),
98        ])
99        .map(|(path, hint)| config::ConfigPath::File(path, hint))
100        .chain(
101            self.config_dirs
102                .iter()
103                .map(|dir| config::ConfigPath::Dir(dir.to_path_buf())),
104        )
105        .collect()
106    }
107}
108
109/// Performs topology, component, and health checks.
110pub async fn validate(opts: &Opts, color: bool) -> ExitCode {
111    let mut fmt = Formatter::new(color);
112
113    let mut validated = true;
114
115    let mut config = match validate_config(opts, &mut fmt) {
116        Some(config) => config,
117        None => return exitcode::CONFIG,
118    };
119
120    if !opts.no_environment {
121        if let Some(tmp_directory) = create_tmp_directory(&mut config, &mut fmt) {
122            validated &= validate_environment(opts, &config, &mut fmt).await;
123            remove_tmp_directory(tmp_directory);
124        } else {
125            validated = false;
126        }
127    }
128
129    if validated {
130        fmt.validated();
131        exitcode::OK
132    } else {
133        exitcode::CONFIG
134    }
135}
136
137pub fn validate_config(opts: &Opts, fmt: &mut Formatter) -> Option<Config> {
138    // Prepare paths
139    let paths = opts.paths_with_formats();
140    let paths = if let Some(paths) = config::process_paths(&paths) {
141        paths
142    } else {
143        fmt.error("No config file paths");
144        return None;
145    };
146
147    // Load
148    let paths_list: Vec<_> = paths.iter().map(<&PathBuf>::from).collect();
149
150    let mut report_error = |errors| {
151        fmt.title(format!("Failed to load {:?}", &paths_list));
152        fmt.sub_error(errors);
153    };
154    let builder = ConfigBuilderLoader::default()
155        .interpolate_env(!opts.disable_env_var_interpolation)
156        .load_from_paths(&paths)
157        .map_err(&mut report_error)
158        .ok()?;
159    config::init_log_schema(builder.global.log_schema.clone(), true);
160
161    // Build
162    let (config, warnings) = builder
163        .build_with_warnings()
164        .map_err(&mut report_error)
165        .ok()?;
166
167    // Warnings
168    if !warnings.is_empty() {
169        if opts.deny_warnings {
170            report_error(warnings);
171            return None;
172        }
173
174        fmt.title(format!("Loaded with warnings {:?}", &paths_list));
175        fmt.sub_warning(warnings);
176    } else {
177        fmt.success(format!("Loaded {:?}", &paths_list));
178    }
179
180    Some(config)
181}
182
183async fn validate_environment(opts: &Opts, config: &Config, fmt: &mut Formatter) -> bool {
184    let diff = ConfigDiff::initial(config);
185
186    let mut pieces = match validate_components(config, &diff, fmt).await {
187        Some(pieces) => pieces,
188        _ => {
189            return false;
190        }
191    };
192    opts.skip_healthchecks || validate_healthchecks(opts, config, &diff, &mut pieces, fmt).await
193}
194
195async fn validate_components(
196    config: &Config,
197    diff: &ConfigDiff,
198    fmt: &mut Formatter,
199) -> Option<TopologyPieces> {
200    match TopologyPiecesBuilder::new(config, diff).build().await {
201        Ok(pieces) => {
202            fmt.success("Component configuration");
203            Some(pieces)
204        }
205        Err(errors) => {
206            fmt.title("Component errors");
207            fmt.sub_error(errors);
208            None
209        }
210    }
211}
212
213async fn validate_healthchecks(
214    opts: &Opts,
215    config: &Config,
216    diff: &ConfigDiff,
217    pieces: &mut TopologyPieces,
218    fmt: &mut Formatter,
219) -> bool {
220    if !config.healthchecks.enabled {
221        fmt.warning("Health checks are disabled");
222        return !opts.deny_warnings;
223    }
224
225    let healthchecks = topology::take_healthchecks(diff, pieces);
226    // We are running health checks in serial so it's easier for the users
227    // to parse which errors/warnings/etc. belong to which healthcheck.
228    let mut validated = true;
229    for (id, healthcheck) in healthchecks {
230        let mut failed = |error| {
231            validated = false;
232            fmt.error(error);
233        };
234
235        trace!("Healthcheck for {id} starting.");
236        match tokio::spawn(healthcheck).await {
237            Ok(Ok(_)) => {
238                if config
239                    .sink(&id)
240                    .expect("Sink not present")
241                    .healthcheck()
242                    .enabled
243                {
244                    fmt.success(format!("Health check \"{id}\""));
245                } else {
246                    fmt.warning(format!("Health check disabled for \"{id}\""));
247                    validated &= !opts.deny_warnings;
248                }
249            }
250            Ok(Err(e)) => failed(format!("Health check for \"{id}\" failed: {e}")),
251            Err(error) if error.is_cancelled() => {
252                failed(format!("Health check for \"{id}\" was cancelled"))
253            }
254            Err(_) => failed(format!("Health check for \"{id}\" panicked")),
255        }
256        trace!("Healthcheck for {id} done.");
257    }
258
259    validated
260}
261
262/// For data directory that we write to:
263/// 1. Create a tmp directory in it.
264/// 2. Change config to point to that tmp directory.
265fn create_tmp_directory(config: &mut Config, fmt: &mut Formatter) -> Option<PathBuf> {
266    match config
267        .global
268        .resolve_and_make_data_subdir(None, TEMPORARY_DIRECTORY)
269    {
270        Ok(path) => {
271            config.global.data_dir = Some(path.clone());
272            Some(path)
273        }
274        Err(error) => {
275            fmt.error(error.to_string());
276            None
277        }
278    }
279}
280
281fn remove_tmp_directory(path: PathBuf) {
282    if let Err(error) = remove_dir_all(&path) {
283        error!(message = "Failed to remove temporary directory.", path = ?path, %error);
284    }
285}
286
287pub struct Formatter {
288    /// Width of largest printed line
289    max_line_width: usize,
290    /// Can empty line be printed
291    print_space: bool,
292    color: bool,
293    // Intros
294    error_intro: String,
295    warning_intro: String,
296    success_intro: String,
297}
298
299impl Formatter {
300    pub fn new(color: bool) -> Self {
301        Self {
302            max_line_width: 0,
303            print_space: false,
304            error_intro: if color {
305                "x".red().to_string()
306            } else {
307                "x".to_owned()
308            },
309            warning_intro: if color {
310                "~".yellow().to_string()
311            } else {
312                "~".to_owned()
313            },
314            success_intro: if color {
315                "√".green().to_string()
316            } else {
317                "√".to_owned()
318            },
319            color,
320        }
321    }
322
323    /// Final confirmation that validation process was successful.
324    fn validated(&self) {
325        #[allow(clippy::print_stdout)]
326        {
327            println!("{:-^width$}", "", width = self.max_line_width);
328        }
329        if self.color {
330            // Coloring needs to be used directly so that print
331            // infrastructure correctly determines length of the
332            // "Validated". Otherwise, ansi escape coloring is
333            // calculated into the length.
334            #[allow(clippy::print_stdout)]
335            {
336                println!(
337                    "{:>width$}",
338                    "Validated".green(),
339                    width = self.max_line_width
340                );
341            }
342        } else {
343            #[allow(clippy::print_stdout)]
344            {
345                println!("{:>width$}", "Validated", width = self.max_line_width)
346            }
347        }
348    }
349
350    /// Standalone line
351    fn success(&mut self, msg: impl AsRef<str>) {
352        self.print(format!("{} {}\n", self.success_intro, msg.as_ref()))
353    }
354
355    /// Standalone line
356    fn warning(&mut self, warning: impl AsRef<str>) {
357        self.print(format!("{} {}\n", self.warning_intro, warning.as_ref()))
358    }
359
360    /// Standalone line
361    fn error(&mut self, error: impl AsRef<str>) {
362        self.print(format!("{} {}\n", self.error_intro, error.as_ref()))
363    }
364
365    /// Marks sub
366    fn title(&mut self, title: impl AsRef<str>) {
367        self.space();
368        self.print(format!(
369            "{}\n{:-<width$}\n",
370            title.as_ref(),
371            "",
372            width = title.as_ref().len()
373        ))
374    }
375
376    /// A list of warnings that go with a title.
377    fn sub_warning<I: IntoIterator>(&mut self, warnings: I)
378    where
379        I::Item: fmt::Display,
380    {
381        self.sub(self.warning_intro.clone(), warnings)
382    }
383
384    /// A list of errors that go with a title.
385    fn sub_error<I: IntoIterator>(&mut self, errors: I)
386    where
387        I::Item: fmt::Display,
388    {
389        self.sub(self.error_intro.clone(), errors)
390    }
391
392    fn sub<I: IntoIterator>(&mut self, intro: impl AsRef<str>, msgs: I)
393    where
394        I::Item: fmt::Display,
395    {
396        for msg in msgs {
397            self.print(format!("{} {}\n", intro.as_ref(), msg));
398        }
399        self.space();
400    }
401
402    /// Prints empty space if necessary.
403    fn space(&mut self) {
404        if self.print_space {
405            self.print_space = false;
406            #[allow(clippy::print_stdout)]
407            {
408                println!();
409            }
410        }
411    }
412
413    fn print(&mut self, print: impl AsRef<str>) {
414        let width = print
415            .as_ref()
416            .lines()
417            .map(|line| {
418                String::from_utf8_lossy(&strip_ansi_escapes::strip(line))
419                    .chars()
420                    .count()
421            })
422            .max()
423            .unwrap_or(0);
424        self.max_line_width = width.max(self.max_line_width);
425        self.print_space = true;
426        #[allow(clippy::print_stdout)]
427        {
428            print!("{}", print.as_ref())
429        }
430    }
431}