vector/components/validation/
mod.rs

1mod resources;
2#[cfg(feature = "component-validation-runner")]
3mod runner;
4mod sync;
5mod test_case;
6pub mod util;
7mod validators;
8
9use vector_lib::config::LogNamespace;
10
11use crate::config::{BoxedSink, BoxedSource, BoxedTransform};
12
13/// For components implementing `ValidatableComponent`
14pub mod prelude {
15    pub use super::ComponentTestCaseConfig;
16    pub use super::ExternalResource;
17    pub use super::HttpResourceConfig;
18    pub use super::ResourceDirection;
19    pub use super::ValidatableComponent;
20    pub use super::ValidationConfiguration;
21    pub use crate::register_validatable_component;
22}
23
24pub use self::resources::*;
25#[cfg(feature = "component-validation-runner")]
26pub use self::runner::*;
27pub use self::test_case::{TestCase, TestCaseExpectation};
28pub use self::validators::*;
29
/// Well-known component names used by the validation code.
///
/// NOTE(review): the role described on each constant is inferred from its name only; confirm
/// against the code that assembles the validation topology.
pub mod component_names {
    /// Name used for a source component (presumably the source under test).
    pub const TEST_SOURCE_NAME: &str = "test_source";

    /// Name used for a sink component (presumably the sink under test).
    pub const TEST_SINK_NAME: &str = "test_sink";

    /// Name used for a transform component (presumably the transform under test).
    pub const TEST_TRANSFORM_NAME: &str = "test_transform";

    /// Name used for an input source (presumably the one feeding events to the component).
    pub const TEST_INPUT_SOURCE_NAME: &str = "input_source";

    /// Name used for an output sink (presumably the one collecting the component's events).
    pub const TEST_OUTPUT_SINK_NAME: &str = "output_sink";
}
37
/// Component types that can be validated.
// TODO: We should centralize this in `vector-common` or something, where both this code and the
// configuration schema stuff (namely the proc macros that use this) can share it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ComponentType {
    /// A source component.
    Source,
    /// A transform component.
    Transform,
    /// A sink component.
    Sink,
}

impl ComponentType {
    /// Returns the lowercase, singular name of this component type
    /// (`"source"`, `"transform"` or `"sink"`).
    pub const fn as_str(&self) -> &'static str {
        match *self {
            ComponentType::Source => "source",
            ComponentType::Transform => "transform",
            ComponentType::Sink => "sink",
        }
    }
}
58
59/// Component type-specific configuration.
60#[allow(clippy::large_enum_variant)]
61#[derive(Clone, Debug)]
62pub enum ComponentConfiguration {
63    /// A source component.
64    Source(BoxedSource),
65
66    /// A transform component.
67    Transform(BoxedTransform),
68
69    /// A sink component.
70    Sink(BoxedSink),
71}
72
73/// Component configuration for a test case.
74#[derive(Clone)]
75pub struct ComponentTestCaseConfig {
76    config: ComponentConfiguration,
77    /// If specified, this name must match the `config_name` field of at least one of the test case events.
78    test_case: Option<String>,
79    external_resource: Option<ExternalResource>,
80}
81
82impl ComponentTestCaseConfig {
83    pub fn from_source<C: Into<BoxedSource>>(
84        config: C,
85        test_case: Option<String>,
86        external_resource: Option<ExternalResource>,
87    ) -> Self {
88        Self {
89            config: ComponentConfiguration::Source(config.into()),
90            test_case,
91            external_resource,
92        }
93    }
94    pub fn from_transform<C: Into<BoxedTransform>>(
95        config: C,
96        test_case: Option<String>,
97        external_resource: Option<ExternalResource>,
98    ) -> Self {
99        Self {
100            config: ComponentConfiguration::Transform(config.into()),
101            test_case,
102            external_resource,
103        }
104    }
105    pub fn from_sink<C: Into<BoxedSink>>(
106        config: C,
107        test_case: Option<String>,
108        external_resource: Option<ExternalResource>,
109    ) -> Self {
110        Self {
111            config: ComponentConfiguration::Sink(config.into()),
112            test_case,
113            external_resource,
114        }
115    }
116}
117
118/// Configuration for validating a component.
119///
120/// This type encompasses all of the required information for configuring and validating a
121/// component, including the strongly-typed configuration for building a topology, as well as the
122/// definition of the external resource required to properly interact with the component.
123#[derive(Clone)]
124pub struct ValidationConfiguration {
125    component_name: &'static str,
126    component_type: ComponentType,
127    /// There may be only one `ComponentTestCaseConfig` necessary to execute all test cases, but some cases
128    /// require more advanced configuration in order to hit the code path desired.
129    component_configurations: Vec<ComponentTestCaseConfig>,
130    log_namespace: LogNamespace,
131}
132
133impl ValidationConfiguration {
134    /// Creates a new `ValidationConfiguration` for a source.
135    pub const fn from_source(
136        component_name: &'static str,
137        log_namespace: LogNamespace,
138        component_configurations: Vec<ComponentTestCaseConfig>,
139    ) -> Self {
140        Self {
141            component_name,
142            component_type: ComponentType::Source,
143            component_configurations,
144            log_namespace,
145        }
146    }
147
148    /// Creates a new `ValidationConfiguration` for a transform.
149    pub const fn from_transform(
150        component_name: &'static str,
151        log_namespace: LogNamespace,
152        component_configurations: Vec<ComponentTestCaseConfig>,
153    ) -> Self {
154        Self {
155            component_name,
156            component_type: ComponentType::Transform,
157            component_configurations,
158            log_namespace,
159        }
160    }
161
162    /// Creates a new `ValidationConfiguration` for a sink.
163    pub const fn from_sink(
164        component_name: &'static str,
165        log_namespace: LogNamespace,
166        component_configurations: Vec<ComponentTestCaseConfig>,
167    ) -> Self {
168        Self {
169            component_name,
170            component_type: ComponentType::Sink,
171            component_configurations,
172            log_namespace,
173        }
174    }
175
176    /// Gets the name of the component.
177    pub const fn component_name(&self) -> &'static str {
178        self.component_name
179    }
180
181    /// Gets the type of the component.
182    pub const fn component_type(&self) -> ComponentType {
183        self.component_type
184    }
185
186    /// Gets the configuration of the component.
187    pub fn component_configurations(&self) -> Vec<ComponentTestCaseConfig> {
188        self.component_configurations.clone()
189    }
190
191    /// Gets the LogNamespace that the component is using.
192    pub const fn log_namespace(&self) -> LogNamespace {
193        self.log_namespace
194    }
195
196    fn get_comp_test_case(&self, test_case: Option<&String>) -> Option<ComponentTestCaseConfig> {
197        let empty = String::from("");
198        let test_case = test_case.unwrap_or(&empty);
199        self.component_configurations
200            .clone()
201            .into_iter()
202            .find(|c| c.test_case.as_ref().unwrap_or(&String::from("")) == test_case)
203    }
204
205    /// Gets the configuration of the component.
206    pub fn component_configuration_for_test_case(
207        &self,
208        test_case: Option<&String>,
209    ) -> Option<ComponentConfiguration> {
210        self.get_comp_test_case(test_case).map(|c| c.config)
211    }
212
213    /// Gets the external resource definition for validating the component, if any.
214    pub fn external_resource(&self, test_case: Option<&String>) -> Option<ExternalResource> {
215        self.get_comp_test_case(test_case)
216            .and_then(|c| c.external_resource)
217    }
218}
219
220pub trait ValidatableComponent: Send + Sync {
221    /// Gets the validation configuration for this component.
222    ///
223    /// The validation configuration compromises the two main requirements for validating a
224    /// component: how to configure the component in a topology, and what external resources, if
225    /// any, it depends on.
226    fn validation_configuration() -> ValidationConfiguration;
227}
228
229/// Description of a validatable component.
230pub struct ValidatableComponentDescription {
231    validation_configuration: fn() -> ValidationConfiguration,
232}
233
234impl ValidatableComponentDescription {
235    /// Creates a new `ValidatableComponentDescription`.
236    ///
237    /// This creates a validatable component description for a component identified by the given
238    /// component type `V`.
239    pub const fn new<V: ValidatableComponent>() -> Self {
240        Self {
241            validation_configuration: <V as ValidatableComponent>::validation_configuration,
242        }
243    }
244
245    /// Queries the list of validatable components for a component with the given name and component type.
246    pub fn query(
247        component_name: &str,
248        component_type: ComponentType,
249    ) -> Option<ValidationConfiguration> {
250        inventory::iter::<Self>
251            .into_iter()
252            .map(|v| (v.validation_configuration)())
253            .find(|v| v.component_name() == component_name && v.component_type() == component_type)
254    }
255}
256
257inventory::collect!(ValidatableComponentDescription);
258
/// Registers a type implementing `ValidatableComponent` so that it can be found through
/// `ValidatableComponentDescription::query`.
///
/// Expands to an `inventory::submit!` of a `ValidatableComponentDescription` for the given type.
#[macro_export]
macro_rules! register_validatable_component {
    ($component:ty) => {
        ::inventory::submit! {
            $crate::components::validation::ValidatableComponentDescription::new::<$component>()
        }
    };
}
267
/// Input and Output runners populate this structure as they send and receive events.
/// The structure is passed into the validator to use as the expected values for the
/// metrics that the components under test actually output.
///
/// All counters start at zero via `Default`.
// NOTE(review): the per-field descriptions below are inferred from the field names; confirm
// the exact accounting against the runners that populate them.
#[derive(Debug, Default)]
pub struct RunnerMetrics {
    /// Expected number of events received.
    pub received_events_total: u64,
    /// Expected total event byte size received.
    pub received_event_bytes_total: u64,
    /// Expected number of raw bytes received.
    pub received_bytes_total: u64,
    /// Expected number of raw bytes sent.
    pub sent_bytes_total: u64,
    /// Expected total event byte size sent.
    pub sent_event_bytes_total: u64,
    /// Expected number of events sent.
    pub sent_events_total: u64,
    /// Expected number of errors.
    pub errors_total: u64,
    /// Expected number of discarded events.
    pub discarded_events_total: u64,
}
282
/// Renders the outcome of one validator for one test case as report text.
///
/// The result is a header line (`  test case '<name>': <outcome>`) followed by one
/// `    - <message>` line per message. Every line, including the header, is newline-terminated
/// so that consecutive entries never run together when concatenated.
#[cfg(feature = "component-validation-runner")]
fn format_validator_outcome<N, I>(test_name: N, outcome: &str, messages: I) -> String
where
    N: std::fmt::Display,
    I: IntoIterator,
    I::Item: std::fmt::Display,
{
    let mut rendered = format!("  test case '{test_name}': {outcome}\n");
    for message in messages {
        rendered.push_str(&format!("    - {message}\n"));
    }
    rendered
}

/// Runs the validation for a single component and reports the outcome.
///
/// A current-thread Tokio runtime is built, a `Runner` is created from `configuration` and
/// `test_case_data_path`, the `ComponentSpec` standard validator is added, and the run is
/// driven to completion. On success a per-test-case report is logged.
///
/// # Panics
///
/// Panics if the runtime cannot be built, if the validation run itself returns an error, or if
/// any validator reports a failure (the panic message then contains the full report).
#[cfg(feature = "component-validation-runner")]
fn run_validation(configuration: ValidationConfiguration, test_case_data_path: std::path::PathBuf) {
    let component_name = configuration.component_name();
    info!(
        "Running validation for component '{}' (type: {:?})...",
        component_name,
        configuration.component_type()
    );

    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("failed to build the Tokio runtime for component validation");

    rt.block_on(async {
        let mut runner = Runner::from_configuration(
            configuration,
            test_case_data_path,
            crate::extra_context::ExtraContext::default(),
        );
        runner.add_validator(StandardValidators::ComponentSpec);

        let test_case_results = match runner.run_validation().await {
            Ok(results) => results,
            Err(e) => {
                panic!("Failed to complete validation run for component '{component_name}': {e}")
            }
        };

        let mut details = String::new();
        let mut had_failures = false;

        for test_case_result in test_case_results {
            for validator_result in test_case_result.validator_results() {
                let entry = match validator_result {
                    Ok(success) => format_validator_outcome(
                        test_case_result.test_name(),
                        "passed",
                        success.iter(),
                    ),
                    Err(failure) => {
                        had_failures = true;
                        format_validator_outcome(
                            test_case_result.test_name(),
                            "failed",
                            failure.iter(),
                        )
                    }
                };
                details.push_str(&entry);
            }
        }

        if had_failures {
            panic!(
                "Failed to validate component '{}':\n{}",
                component_name, details
            );
        } else {
            info!(
                "Successfully validated component '{}':\n{}",
                component_name, details
            );
        }
    });
}
377
/// Derives the component to validate from the path of its test case data file and looks up
/// that component's validation configuration.
///
/// Only the "normal" path segments are considered (root, prefix, `.` and `..` components are
/// skipped). The last segment, minus its extension, is taken as the component name; the segment
/// before it must be `sources`, `transforms` or `sinks` (compared case-insensitively) and
/// determines the component type.
///
/// # Errors
///
/// Returns a descriptive message when the path has fewer than two normal segments, when the
/// file name has no usable stem, when the component type segment is not recognized, or when no
/// validatable component is registered under the resulting name and type.
#[cfg(feature = "component-validation-runner")]
fn get_validation_configuration_from_test_case_path(
    test_case_data_path: &std::path::Path,
) -> Result<ValidationConfiguration, String> {
    // The test case data path should follow a fixed structure where the 2nd to last segment is
    // the component type, and the last segment -- when the extension is removed -- is the
    // component name.
    let mut path_segments = test_case_data_path
        .components()
        .filter_map(|c| match c {
            std::path::Component::Normal(segment) => Some(std::path::Path::new(segment)),
            _ => None,
        })
        .collect::<Vec<_>>();

    // Exactly two segments (`<type>/<name>.<ext>`) are enough; only fewer is an error.
    if path_segments.len() < 2 {
        return Err(format!(
            "Test case data path contained {} normal path segment(s), expected at least 2 or more.",
            path_segments.len()
        ));
    }

    let component_name = path_segments
        .pop()
        .and_then(|segment| segment.file_stem())
        .map(|stem| stem.to_string_lossy().into_owned())
        .ok_or_else(|| {
            format!(
                "Test case data path '{}' contained unexpected or invalid filename.",
                test_case_data_path.as_os_str().to_string_lossy()
            )
        })?;

    let component_type = path_segments
        .pop()
        .and_then(|segment| {
            match segment.to_string_lossy().to_ascii_lowercase().as_str() {
                "sources" => Some(ComponentType::Source),
                "transforms" => Some(ComponentType::Transform),
                "sinks" => Some(ComponentType::Sink),
                _ => None,
            }
        })
        .ok_or_else(|| {
            format!(
                "Test case data path '{}' contained unexpected or invalid component type.",
                test_case_data_path.as_os_str().to_string_lossy()
            )
        })?;

    // Now that we've theoretically got the component type and component name, try to query the
    // validatable component descriptions to find it.
    ValidatableComponentDescription::query(&component_name, component_type).ok_or_else(|| {
        format!(
            "No validation configuration for component '{}' with component type '{}'.",
            component_name,
            component_type.as_str()
        )
    })
}
435
/// Validates the component described by the test case data file at `test_case_data_path`.
///
/// The component name and type are derived from the path, the matching validation
/// configuration is looked up, and the validation run is executed.
///
/// # Panics
///
/// Panics if the path does not exist or if no validation configuration can be derived from it.
/// The validation run it then starts may panic as well.
#[cfg(feature = "component-validation-runner")]
pub fn validate_component(test_case_data_path: std::path::PathBuf) {
    assert!(
        test_case_data_path.exists(),
        "Component validation test invoked with path to test case data that could not be found: {}",
        test_case_data_path.to_string_lossy()
    );

    let lookup = get_validation_configuration_from_test_case_path(&test_case_data_path);
    let configuration =
        lookup.expect("Failed to find validation configuration from given test case data path.");

    run_validation(configuration, test_case_data_path);
}
447
#[cfg(all(test, feature = "component-validation-tests"))]
mod tests {
    // The attribute below is given a glob of YAML files under `tests/validation/components/`;
    // the function receives the path of one such file as `test_case_data_path`.
    #[test_generator::test_resources("tests/validation/components/**/*.yaml")]
    pub fn validate_component(test_case_data_path: &str) {
        crate::test_util::trace_init();

        super::validate_component(std::path::PathBuf::from(test_case_data_path));
    }
}