vector/components/validation/
mod.rs1mod resources;
2#[cfg(feature = "component-validation-runner")]
3mod runner;
4mod sync;
5mod test_case;
6pub mod util;
7mod validators;
8
9use vector_lib::config::LogNamespace;
10
11use crate::config::{BoxedSink, BoxedSource, BoxedTransform};
12
13pub mod prelude {
15 pub use super::ComponentTestCaseConfig;
16 pub use super::ExternalResource;
17 pub use super::HttpResourceConfig;
18 pub use super::ResourceDirection;
19 pub use super::ValidatableComponent;
20 pub use super::ValidationConfiguration;
21 pub use crate::register_validatable_component;
22}
23
24pub use self::resources::*;
25#[cfg(feature = "component-validation-runner")]
26pub use self::runner::*;
27pub use self::test_case::{TestCase, TestCaseExpectation};
28pub use self::validators::*;
29
/// Fixed component identifiers shared by the validation machinery.
///
/// NOTE(review): presumably these are the keys under which components are
/// registered when a validation topology is assembled; the consumers are not
/// visible in this file.
pub mod component_names {
    /// Identifier for the source under test.
    pub const TEST_SOURCE_NAME: &str = "test_source";

    /// Identifier for the sink under test.
    pub const TEST_SINK_NAME: &str = "test_sink";

    /// Identifier for the transform under test.
    pub const TEST_TRANSFORM_NAME: &str = "test_transform";

    /// Identifier for the auxiliary input source.
    pub const TEST_INPUT_SOURCE_NAME: &str = "input_source";

    /// Identifier for the auxiliary output sink.
    pub const TEST_OUTPUT_SINK_NAME: &str = "output_sink";
}
37
/// The kinds of component that can be validated.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ComponentType {
    /// A source component.
    Source,
    /// A transform component.
    Transform,
    /// A sink component.
    Sink,
}

impl ComponentType {
    /// Returns the lowercase, singular name of this component type
    /// (`"source"`, `"transform"` or `"sink"`).
    pub const fn as_str(&self) -> &'static str {
        match *self {
            ComponentType::Source => "source",
            ComponentType::Transform => "transform",
            ComponentType::Sink => "sink",
        }
    }
}
58
59#[allow(clippy::large_enum_variant)]
61#[derive(Clone, Debug)]
62pub enum ComponentConfiguration {
63 Source(BoxedSource),
65
66 Transform(BoxedTransform),
68
69 Sink(BoxedSink),
71}
72
73#[derive(Clone)]
75pub struct ComponentTestCaseConfig {
76 config: ComponentConfiguration,
77 test_case: Option<String>,
79 external_resource: Option<ExternalResource>,
80}
81
82impl ComponentTestCaseConfig {
83 pub fn from_source<C: Into<BoxedSource>>(
84 config: C,
85 test_case: Option<String>,
86 external_resource: Option<ExternalResource>,
87 ) -> Self {
88 Self {
89 config: ComponentConfiguration::Source(config.into()),
90 test_case,
91 external_resource,
92 }
93 }
94 pub fn from_transform<C: Into<BoxedTransform>>(
95 config: C,
96 test_case: Option<String>,
97 external_resource: Option<ExternalResource>,
98 ) -> Self {
99 Self {
100 config: ComponentConfiguration::Transform(config.into()),
101 test_case,
102 external_resource,
103 }
104 }
105 pub fn from_sink<C: Into<BoxedSink>>(
106 config: C,
107 test_case: Option<String>,
108 external_resource: Option<ExternalResource>,
109 ) -> Self {
110 Self {
111 config: ComponentConfiguration::Sink(config.into()),
112 test_case,
113 external_resource,
114 }
115 }
116}
117
118#[derive(Clone)]
124pub struct ValidationConfiguration {
125 component_name: &'static str,
126 component_type: ComponentType,
127 component_configurations: Vec<ComponentTestCaseConfig>,
130 log_namespace: LogNamespace,
131}
132
133impl ValidationConfiguration {
134 pub const fn from_source(
136 component_name: &'static str,
137 log_namespace: LogNamespace,
138 component_configurations: Vec<ComponentTestCaseConfig>,
139 ) -> Self {
140 Self {
141 component_name,
142 component_type: ComponentType::Source,
143 component_configurations,
144 log_namespace,
145 }
146 }
147
148 pub const fn from_transform(
150 component_name: &'static str,
151 log_namespace: LogNamespace,
152 component_configurations: Vec<ComponentTestCaseConfig>,
153 ) -> Self {
154 Self {
155 component_name,
156 component_type: ComponentType::Transform,
157 component_configurations,
158 log_namespace,
159 }
160 }
161
162 pub const fn from_sink(
164 component_name: &'static str,
165 log_namespace: LogNamespace,
166 component_configurations: Vec<ComponentTestCaseConfig>,
167 ) -> Self {
168 Self {
169 component_name,
170 component_type: ComponentType::Sink,
171 component_configurations,
172 log_namespace,
173 }
174 }
175
176 pub const fn component_name(&self) -> &'static str {
178 self.component_name
179 }
180
181 pub const fn component_type(&self) -> ComponentType {
183 self.component_type
184 }
185
186 pub fn component_configurations(&self) -> Vec<ComponentTestCaseConfig> {
188 self.component_configurations.clone()
189 }
190
191 pub const fn log_namespace(&self) -> LogNamespace {
193 self.log_namespace
194 }
195
196 fn get_comp_test_case(&self, test_case: Option<&String>) -> Option<ComponentTestCaseConfig> {
197 let empty = String::from("");
198 let test_case = test_case.unwrap_or(&empty);
199 self.component_configurations
200 .clone()
201 .into_iter()
202 .find(|c| c.test_case.as_ref().unwrap_or(&String::from("")) == test_case)
203 }
204
205 pub fn component_configuration_for_test_case(
207 &self,
208 test_case: Option<&String>,
209 ) -> Option<ComponentConfiguration> {
210 self.get_comp_test_case(test_case).map(|c| c.config)
211 }
212
213 pub fn external_resource(&self, test_case: Option<&String>) -> Option<ExternalResource> {
215 self.get_comp_test_case(test_case)
216 .and_then(|c| c.external_resource)
217 }
218}
219
220pub trait ValidatableComponent: Send + Sync {
221 fn validation_configuration() -> ValidationConfiguration;
227}
228
229pub struct ValidatableComponentDescription {
231 validation_configuration: fn() -> ValidationConfiguration,
232}
233
234impl ValidatableComponentDescription {
235 pub const fn new<V: ValidatableComponent>() -> Self {
240 Self {
241 validation_configuration: <V as ValidatableComponent>::validation_configuration,
242 }
243 }
244
245 pub fn query(
247 component_name: &str,
248 component_type: ComponentType,
249 ) -> Option<ValidationConfiguration> {
250 inventory::iter::<Self>
251 .into_iter()
252 .map(|v| (v.validation_configuration)())
253 .find(|v| v.component_name() == component_name && v.component_type() == component_type)
254 }
255}
256
257inventory::collect!(ValidatableComponentDescription);
258
/// Registers a type as a validatable component.
///
/// The type must implement `ValidatableComponent`. The macro submits a
/// `ValidatableComponentDescription` for it to the `inventory` registry, which
/// is what `ValidatableComponentDescription::query` iterates over.
#[macro_export]
macro_rules! register_validatable_component {
    ($component:ty) => {
        ::inventory::submit! {
            $crate::components::validation::ValidatableComponentDescription::new::<$component>()
        }
    };
}
267
/// Plain `u64` counters describing a validation run; every counter starts at
/// zero via `Default`.
///
/// NOTE(review): the code that updates and reads these counters is not visible
/// in this file; the per-field descriptions below are inferred from the field
/// names and should be confirmed against the runner and validators.
#[derive(Debug, Default)]
pub struct RunnerMetrics {
    /// Presumably the number of events received.
    pub received_events_total: u64,
    /// Presumably the total size of the received events.
    pub received_event_bytes_total: u64,
    /// Presumably the number of raw bytes received.
    pub received_bytes_total: u64,
    /// Presumably the number of raw bytes sent.
    pub sent_bytes_total: u64,
    /// Presumably the total size of the sent events.
    pub sent_event_bytes_total: u64,
    /// Presumably the number of events sent.
    pub sent_events_total: u64,
    /// Presumably the number of errors.
    pub errors_total: u64,
    /// Presumably the number of discarded events.
    pub discarded_events_total: u64,
}
282
/// Renders the outcome of one validator for one test case.
///
/// The result is a header line (`test case '<name>': <outcome>`) followed by
/// one ` - <message>` line per message. Every line, including the header when
/// there are no messages, ends with a newline so that entries can be
/// concatenated without running into each other.
#[cfg(feature = "component-validation-runner")]
fn format_validator_detail<N, I>(test_name: N, outcome: &str, messages: I) -> String
where
    N: std::fmt::Display,
    I: IntoIterator,
    I::Item: std::fmt::Display,
{
    use std::fmt::Write as _;

    let mut detail = format!(" test case '{test_name}': {outcome}\n");
    for message in messages {
        // Writing into a `String` cannot fail, so the result is ignored.
        let _ = writeln!(detail, " - {message}");
    }
    detail
}

/// Runs the component-spec validator for `configuration` against the test case
/// data at `test_case_data_path`, blocking until the run completes.
///
/// The run is driven on a dedicated current-thread Tokio runtime. On success a
/// per-test-case summary is logged.
///
/// # Panics
///
/// Panics if the runtime cannot be built, if the validation run itself fails
/// to complete, or if any validator reports a failure. In the last case the
/// panic message contains the full per-test-case summary.
#[cfg(feature = "component-validation-runner")]
fn run_validation(configuration: ValidationConfiguration, test_case_data_path: std::path::PathBuf) {
    let component_name = configuration.component_name();
    info!(
        "Running validation for component '{}' (type: {:?})...",
        component_name,
        configuration.component_type()
    );

    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("failed to build the current-thread Tokio runtime for component validation");

    rt.block_on(async {
        let mut runner = Runner::from_configuration(
            configuration,
            test_case_data_path,
            crate::extra_context::ExtraContext::default(),
        );
        runner.add_validator(StandardValidators::ComponentSpec);

        let test_case_results = match runner.run_validation().await {
            Ok(results) => results,
            Err(e) => {
                panic!("Failed to complete validation run for component '{component_name}': {e}")
            }
        };

        let mut details = String::new();
        let mut had_failures = false;

        for test_case_result in test_case_results.into_iter() {
            for validator_result in test_case_result.validator_results() {
                let detail = match validator_result {
                    Ok(success) => format_validator_detail(
                        test_case_result.test_name(),
                        "passed",
                        success.iter(),
                    ),
                    Err(failure) => {
                        had_failures = true;
                        format_validator_detail(
                            test_case_result.test_name(),
                            "failed",
                            failure.iter(),
                        )
                    }
                };
                details.push_str(&detail);
            }
        }

        if had_failures {
            panic!(
                "Failed to validate component '{}':\n{}",
                component_name, details
            );
        } else {
            info!(
                "Successfully validated component '{}':\n{}",
                component_name, details
            );
        }
    });
}
377
/// Resolves the validation configuration for the component that a test case
/// data file belongs to.
///
/// The path must end in `<component type>/<component name>.<ext>`: the
/// second-to-last segment is the component type directory (`sources`,
/// `transforms` or `sinks`, compared case-insensitively) and the file stem of
/// the last segment is the component name.
///
/// # Errors
///
/// Returns a descriptive message when the path has fewer than two normal
/// segments, when the file name has no stem, when the type directory is not
/// recognized, or when no component is registered under that name and type.
#[cfg(feature = "component-validation-runner")]
fn get_validation_configuration_from_test_case_path(
    test_case_data_path: &std::path::Path,
) -> Result<ValidationConfiguration, String> {
    // Only normal segments matter; root, prefix, `.` and `..` components are skipped.
    let mut path_segments = test_case_data_path
        .components()
        .filter_map(|c| match c {
            std::path::Component::Normal(segment) => Some(std::path::Path::new(segment)),
            _ => None,
        })
        .collect::<Vec<_>>();

    // Exactly two segments (type directory + file name) are sufficient, which
    // is also what the error message promises.
    if path_segments.len() < 2 {
        return Err(format!(
            "Test case data path contained {} normal path segment(s), expected at least 2 or more.",
            path_segments.len()
        ));
    }

    let component_name = path_segments
        .pop()
        .and_then(|segment| {
            segment
                .file_stem()
                .map(|stem| stem.to_string_lossy().into_owned())
        })
        .ok_or_else(|| {
            format!(
                "Test case data path '{}' contained unexpected or invalid filename.",
                test_case_data_path.to_string_lossy()
            )
        })?;

    let component_type = path_segments
        .pop()
        .and_then(|segment| {
            let directory = segment.as_os_str().to_string_lossy().to_ascii_lowercase();
            match directory.as_str() {
                "sources" => Some(ComponentType::Source),
                "transforms" => Some(ComponentType::Transform),
                "sinks" => Some(ComponentType::Sink),
                _ => None,
            }
        })
        .ok_or_else(|| {
            format!(
                "Test case data path '{}' contained unexpected or invalid component type.",
                test_case_data_path.to_string_lossy()
            )
        })?;

    ValidatableComponentDescription::query(&component_name, component_type).ok_or_else(|| {
        format!(
            "No validation configuration for component '{}' with component type '{}'.",
            component_name,
            component_type.as_str()
        )
    })
}
435
/// Validates the component identified by the test case data file at
/// `test_case_data_path`.
///
/// The component name and type are derived from the path, the registered
/// validation configuration is looked up, and the validation run is executed.
///
/// # Panics
///
/// Panics if the path does not exist, if no validation configuration can be
/// resolved from it, or if the validation run fails.
#[cfg(feature = "component-validation-runner")]
pub fn validate_component(test_case_data_path: std::path::PathBuf) {
    assert!(
        test_case_data_path.exists(),
        "Component validation test invoked with path to test case data that could not be found: {}",
        test_case_data_path.to_string_lossy()
    );

    let lookup = get_validation_configuration_from_test_case_path(&test_case_data_path);
    let configuration =
        lookup.expect("Failed to find validation configuration from given test case data path.");

    run_validation(configuration, test_case_data_path);
}
447
#[cfg(all(test, feature = "component-validation-tests"))]
mod tests {
    // One test is generated per YAML file matching the glob below; each
    // receives the path of its file as `test_case_data_path`.
    #[test_generator::test_resources("tests/validation/components/**/*.yaml")]
    pub fn validate_component(test_case_data_path: &str) {
        crate::test_util::trace_init();

        super::validate_component(std::path::PathBuf::from(test_case_data_path));
    }
}