vector/components/validation/
mod.rsmod resources;
#[cfg(feature = "component-validation-runner")]
mod runner;
mod sync;
mod test_case;
pub mod util;
mod validators;
use vector_lib::config::LogNamespace;
use crate::config::{BoxedSink, BoxedSource, BoxedTransform};
pub mod prelude {
pub use super::ComponentTestCaseConfig;
pub use super::ExternalResource;
pub use super::HttpResourceConfig;
pub use super::ResourceDirection;
pub use super::ValidatableComponent;
pub use super::ValidationConfiguration;
pub use crate::register_validatable_component;
}
pub use self::resources::*;
#[cfg(feature = "component-validation-runner")]
pub use self::runner::*;
pub use self::test_case::{TestCase, TestCaseExpectation};
pub use self::validators::*;
pub mod component_names {
pub const TEST_SOURCE_NAME: &str = "test_source";
pub const TEST_SINK_NAME: &str = "test_sink";
pub const TEST_TRANSFORM_NAME: &str = "test_transform";
pub const TEST_INPUT_SOURCE_NAME: &str = "input_source";
pub const TEST_OUTPUT_SINK_NAME: &str = "output_sink";
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ComponentType {
Source,
Transform,
Sink,
}
impl ComponentType {
pub const fn as_str(&self) -> &'static str {
match self {
Self::Source => "source",
Self::Transform => "transform",
Self::Sink => "sink",
}
}
}
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug)]
pub enum ComponentConfiguration {
Source(BoxedSource),
Transform(BoxedTransform),
Sink(BoxedSink),
}
#[derive(Clone)]
pub struct ComponentTestCaseConfig {
config: ComponentConfiguration,
test_case: Option<String>,
external_resource: Option<ExternalResource>,
}
impl ComponentTestCaseConfig {
pub fn from_source<C: Into<BoxedSource>>(
config: C,
test_case: Option<String>,
external_resource: Option<ExternalResource>,
) -> Self {
Self {
config: ComponentConfiguration::Source(config.into()),
test_case,
external_resource,
}
}
pub fn from_transform<C: Into<BoxedTransform>>(
config: C,
test_case: Option<String>,
external_resource: Option<ExternalResource>,
) -> Self {
Self {
config: ComponentConfiguration::Transform(config.into()),
test_case,
external_resource,
}
}
pub fn from_sink<C: Into<BoxedSink>>(
config: C,
test_case: Option<String>,
external_resource: Option<ExternalResource>,
) -> Self {
Self {
config: ComponentConfiguration::Sink(config.into()),
test_case,
external_resource,
}
}
}
#[derive(Clone)]
pub struct ValidationConfiguration {
component_name: &'static str,
component_type: ComponentType,
component_configurations: Vec<ComponentTestCaseConfig>,
log_namespace: LogNamespace,
}
impl ValidationConfiguration {
pub const fn from_source(
component_name: &'static str,
log_namespace: LogNamespace,
component_configurations: Vec<ComponentTestCaseConfig>,
) -> Self {
Self {
component_name,
component_type: ComponentType::Source,
component_configurations,
log_namespace,
}
}
pub const fn from_transform(
component_name: &'static str,
log_namespace: LogNamespace,
component_configurations: Vec<ComponentTestCaseConfig>,
) -> Self {
Self {
component_name,
component_type: ComponentType::Transform,
component_configurations,
log_namespace,
}
}
pub const fn from_sink(
component_name: &'static str,
log_namespace: LogNamespace,
component_configurations: Vec<ComponentTestCaseConfig>,
) -> Self {
Self {
component_name,
component_type: ComponentType::Sink,
component_configurations,
log_namespace,
}
}
pub const fn component_name(&self) -> &'static str {
self.component_name
}
pub const fn component_type(&self) -> ComponentType {
self.component_type
}
pub fn component_configurations(&self) -> Vec<ComponentTestCaseConfig> {
self.component_configurations.clone()
}
pub const fn log_namespace(&self) -> LogNamespace {
self.log_namespace
}
fn get_comp_test_case(&self, test_case: Option<&String>) -> Option<ComponentTestCaseConfig> {
let empty = String::from("");
let test_case = test_case.unwrap_or(&empty);
self.component_configurations
.clone()
.into_iter()
.find(|c| c.test_case.as_ref().unwrap_or(&String::from("")) == test_case)
}
pub fn component_configuration_for_test_case(
&self,
test_case: Option<&String>,
) -> Option<ComponentConfiguration> {
self.get_comp_test_case(test_case).map(|c| c.config)
}
pub fn external_resource(&self, test_case: Option<&String>) -> Option<ExternalResource> {
self.get_comp_test_case(test_case)
.and_then(|c| c.external_resource)
}
}
pub trait ValidatableComponent: Send + Sync {
fn validation_configuration() -> ValidationConfiguration;
}
pub struct ValidatableComponentDescription {
validation_configuration: fn() -> ValidationConfiguration,
}
impl ValidatableComponentDescription {
pub const fn new<V: ValidatableComponent>() -> Self {
Self {
validation_configuration: <V as ValidatableComponent>::validation_configuration,
}
}
pub fn query(
component_name: &str,
component_type: ComponentType,
) -> Option<ValidationConfiguration> {
inventory::iter::<Self>
.into_iter()
.map(|v| (v.validation_configuration)())
.find(|v| v.component_name() == component_name && v.component_type() == component_type)
}
}
inventory::collect!(ValidatableComponentDescription);
#[macro_export]
macro_rules! register_validatable_component {
($ty:ty) => {
::inventory::submit! {
$crate::components::validation::ValidatableComponentDescription::new::<$ty>()
}
};
}
#[derive(Default, Debug)]
pub struct RunnerMetrics {
pub received_events_total: u64,
pub received_event_bytes_total: u64,
pub received_bytes_total: u64,
pub sent_bytes_total: u64,
pub sent_event_bytes_total: u64,
pub sent_events_total: u64,
pub errors_total: u64,
pub discarded_events_total: u64,
}
#[cfg(feature = "component-validation-runner")]
fn run_validation(configuration: ValidationConfiguration, test_case_data_path: std::path::PathBuf) {
let component_name = configuration.component_name();
info!(
"Running validation for component '{}' (type: {:?})...",
component_name,
configuration.component_type()
);
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
let mut runner = Runner::from_configuration(
configuration,
test_case_data_path,
crate::extra_context::ExtraContext::default(),
);
runner.add_validator(StandardValidators::ComponentSpec);
match runner.run_validation().await {
Ok(test_case_results) => {
let mut details = Vec::new();
let mut had_failures = false;
for test_case_result in test_case_results.into_iter() {
for validator_result in test_case_result.validator_results() {
match validator_result {
Ok(success) => {
if success.is_empty() {
details.push(format!(
" test case '{}': passed",
test_case_result.test_name()
));
} else {
let formatted = success
.iter()
.map(|s| format!(" - {}\n", s))
.collect::<Vec<_>>();
details.push(format!(
" test case '{}': passed\n{}",
test_case_result.test_name(),
formatted.join("")
));
}
}
Err(failure) => {
had_failures = true;
if failure.is_empty() {
details.push(format!(
" test case '{}': failed",
test_case_result.test_name()
));
} else {
let formatted = failure
.iter()
.map(|s| format!(" - {}\n", s))
.collect::<Vec<_>>();
details.push(format!(
" test case '{}': failed\n{}",
test_case_result.test_name(),
formatted.join("")
));
}
}
}
}
}
if had_failures {
panic!(
"Failed to validate component '{}':\n{}",
component_name,
details.join("")
);
} else {
info!(
"Successfully validated component '{}':\n{}",
component_name,
details.join("")
);
}
}
Err(e) => panic!(
"Failed to complete validation run for component '{}': {}",
component_name, e
),
}
});
}
#[cfg(feature = "component-validation-runner")]
fn get_validation_configuration_from_test_case_path(
test_case_data_path: &std::path::Path,
) -> Result<ValidationConfiguration, String> {
let mut path_segments = test_case_data_path
.components()
.filter_map(|c| match c {
std::path::Component::Normal(path) => Some(std::path::Path::new(path)),
_ => None,
})
.collect::<std::collections::VecDeque<_>>();
if path_segments.len() <= 2 {
return Err(format!(
"Test case data path contained {} normal path segment(s), expected at least 2 or more.",
path_segments.len()
));
}
let component_name = path_segments
.pop_back()
.and_then(|segment| segment.file_stem().map(|s| s.to_string_lossy().to_string()))
.ok_or(format!(
"Test case data path '{}' contained unexpected or invalid filename.",
test_case_data_path.as_os_str().to_string_lossy()
))?;
let component_type = path_segments
.pop_back()
.map(|segment| {
segment
.as_os_str()
.to_string_lossy()
.to_string()
.to_ascii_lowercase()
})
.and_then(|segment| match segment.as_str() {
"sources" => Some(ComponentType::Source),
"transforms" => Some(ComponentType::Transform),
"sinks" => Some(ComponentType::Sink),
_ => None,
})
.ok_or(format!(
"Test case data path '{}' contained unexpected or invalid component type.",
test_case_data_path.as_os_str().to_string_lossy()
))?;
ValidatableComponentDescription::query(&component_name, component_type).ok_or(format!(
"No validation configuration for component '{}' with component type '{}'.",
component_name,
component_type.as_str()
))
}
#[cfg(feature = "component-validation-runner")]
pub fn validate_component(test_case_data_path: std::path::PathBuf) {
if !test_case_data_path.exists() {
panic!("Component validation test invoked with path to test case data that could not be found: {}", test_case_data_path.to_string_lossy());
}
let configuration = get_validation_configuration_from_test_case_path(&test_case_data_path)
.expect("Failed to find validation configuration from given test case data path.");
run_validation(configuration, test_case_data_path);
}
#[cfg(all(test, feature = "component-validation-tests"))]
mod tests {
#[test_generator::test_resources("tests/validation/components/**/*.yaml")]
pub fn validate_component(test_case_data_path: &str) {
crate::test_util::trace_init();
let test_case_data_path = std::path::PathBuf::from(test_case_data_path.to_string());
super::validate_component(test_case_data_path);
}
}