1#[cfg(all(
3 test,
4 feature = "sources-demo_logs",
5 feature = "transforms-remap",
6 feature = "transforms-route",
7 feature = "transforms-filter",
8 feature = "transforms-reduce",
9 feature = "sinks-console"
10))]
11mod tests;
12mod unit_test_components;
13
14use std::{
15 collections::{BTreeMap, HashMap, HashSet},
16 sync::Arc,
17};
18
19use futures_util::{StreamExt, stream::FuturesUnordered};
20use indexmap::IndexMap;
21use tokio::sync::{
22 Mutex,
23 oneshot::{self, Receiver},
24};
25use uuid::Uuid;
26use vrl::{
27 compiler::{Context, TargetValue, TimeZone, state::RuntimeState},
28 diagnostic::Formatter,
29 value,
30};
31
32pub use self::unit_test_components::{
33 UnitTestSinkCheck, UnitTestSinkConfig, UnitTestSinkResult, UnitTestSourceConfig,
34 UnitTestStreamSinkConfig, UnitTestStreamSourceConfig,
35};
36use super::{OutputId, compiler::expand_globs, graph::Graph, transform::get_transform_output_ids};
37use crate::{
38 conditions::Condition,
39 config::{
40 self, ComponentKey, Config, ConfigBuilder, ConfigPath, SinkOuter, SourceOuter,
41 TestDefinition, TestInput, TestOutput, loading, loading::ConfigBuilderLoader,
42 },
43 event::{Event, EventMetadata, LogEvent},
44 signal,
45 topology::{
46 RunningTopology,
47 builder::{TopologyPieces, TopologyPiecesBuilder},
48 },
49};
50
/// A single, fully built unit test, ready to be executed with [`UnitTest::run`].
pub struct UnitTest {
    /// Name of the test, taken from its test definition.
    pub name: String,
    /// Validated configuration containing the generated test sources and sinks.
    config: Config,
    /// Topology pieces built from `config`.
    pieces: TopologyPieces,
    /// One receiver per generated test sink; each sink reports its result on it.
    test_result_rxs: Vec<Receiver<UnitTestSinkResult>>,
}
57
/// Outcome of running a [`UnitTest`].
pub struct UnitTestResult {
    /// Error messages collected from all of the test's sinks.
    /// NOTE(review): presumably an empty list means the test passed — confirm with callers.
    pub errors: Vec<String>,
}
61
62impl UnitTest {
63 pub async fn run(self) -> UnitTestResult {
64 let diff = config::ConfigDiff::initial(&self.config);
65 let (topology, _) = RunningTopology::start_validated(self.config, diff, self.pieces)
66 .await
67 .unwrap();
68 topology.sources_finished().await;
69 let _stop_complete = topology.stop();
70
71 let mut in_flight = self
72 .test_result_rxs
73 .into_iter()
74 .collect::<FuturesUnordered<_>>();
75
76 let mut errors = Vec::new();
77 while let Some(partial_result) = in_flight.next().await {
78 let partial_result = partial_result.expect(
79 "An unexpected error occurred while executing unit tests. Please try again.",
80 );
81 errors.extend(partial_result.test_errors);
82 }
83
84 UnitTestResult { errors }
85 }
86}
87
88fn init_log_schema_from_paths(
93 config_paths: &[ConfigPath],
94 deny_if_set: bool,
95) -> Result<(), Vec<String>> {
96 let builder = ConfigBuilderLoader::default()
97 .interpolate_env(true)
98 .load_from_paths(config_paths)?;
99 vector_lib::config::init_log_schema(builder.global.log_schema, deny_if_set);
100 Ok(())
101}
102
103pub async fn build_unit_tests_main(
104 paths: &[ConfigPath],
105 signal_handler: &mut signal::SignalHandler,
106) -> Result<Vec<UnitTest>, Vec<String>> {
107 init_log_schema_from_paths(paths, false)?;
108 let secrets_backends_loader = loading::loader_from_paths(
109 loading::SecretBackendLoader::default().interpolate_env(true),
110 paths,
111 )?;
112 let secrets = secrets_backends_loader
113 .retrieve_secrets(signal_handler)
114 .await
115 .map_err(|e| vec![e])?;
116
117 let config_builder = ConfigBuilderLoader::default()
118 .interpolate_env(true)
119 .secrets(secrets)
120 .load_from_paths(paths)?;
121
122 build_unit_tests(config_builder).await
123}
124
125pub async fn build_unit_tests(
126 mut config_builder: ConfigBuilder,
127) -> Result<Vec<UnitTest>, Vec<String>> {
128 config_builder.sources = Default::default();
130 config_builder.sinks = Default::default();
131
132 let test_definitions = std::mem::take(&mut config_builder.tests);
133 let mut tests = Vec::new();
134 let mut build_errors = Vec::new();
135 let metadata = UnitTestBuildMetadata::initialize(&mut config_builder)?;
136
137 for mut test_definition in test_definitions {
138 let test_name = test_definition.name.clone();
139 let legacy_input = std::mem::take(&mut test_definition.input);
141 if let Some(input) = legacy_input {
142 test_definition.inputs.push(input);
143 }
144 match build_unit_test(&metadata, test_definition, config_builder.clone()).await {
145 Ok(test) => tests.push(test),
146 Err(errors) => {
147 let mut test_error = errors.join("\n");
148 test_error = test_error.replace('\n', "\n ");
150 test_error.insert_str(0, &format!("Failed to build test '{test_name}':\n "));
151 build_errors.push(test_error);
152 }
153 }
154 }
155
156 if build_errors.is_empty() {
157 Ok(tests)
158 } else {
159 Err(build_errors)
160 }
161}
162
/// Component ids and templates shared by every unit test built from one configuration.
///
/// Produced by `UnitTestBuildMetadata::initialize`.
pub struct UnitTestBuildMetadata {
    /// Keys of all transforms, i.e. the components test inputs may be inserted at.
    available_insert_targets: HashSet<ComponentKey>,
    /// Generated test source id for each insertion target.
    source_ids: HashMap<ComponentKey, String>,
    /// Empty test source config for each insertion target; filled with events per test.
    template_sources: IndexMap<ComponentKey, UnitTestSourceConfig>,
    /// Generated test sink id for each transform output that can be extracted from.
    sink_ids: HashMap<OutputId, String>,
}
174
175impl UnitTestBuildMetadata {
176 pub fn initialize(config_builder: &mut ConfigBuilder) -> Result<Self, Vec<String>> {
177 let random_id = Uuid::new_v4().to_string();
179
180 let available_insert_targets = config_builder
181 .transforms
182 .keys()
183 .cloned()
184 .collect::<HashSet<_>>();
185
186 let source_ids = available_insert_targets
187 .iter()
188 .map(|key| (key.clone(), format!("{}-{}-{}", key, "source", random_id)))
189 .collect::<HashMap<_, _>>();
190
191 let mut template_sources = IndexMap::new();
193 for (key, transform) in config_builder.transforms.iter_mut() {
194 let test_source_id = source_ids
195 .get(key)
196 .expect("Missing test source for a transform")
197 .clone();
198 transform.inputs.extend(Some(test_source_id));
199
200 template_sources.insert(key.clone(), UnitTestSourceConfig::default());
201 }
202
203 let builder = config_builder.clone();
204 let available_extract_targets = builder
205 .transforms
206 .iter()
207 .flat_map(|(key, transform)| {
208 get_transform_output_ids(
209 transform.inner.as_ref(),
210 key.clone(),
211 builder.schema.log_namespace(),
212 )
213 })
214 .collect::<HashSet<_>>();
215
216 let sink_ids = available_extract_targets
217 .iter()
218 .map(|key| {
219 (
220 key.clone(),
221 format!(
222 "{}-{}-{}",
223 key.to_string().replace('.', "-"),
224 "sink",
225 random_id
226 ),
227 )
228 })
229 .collect::<HashMap<_, _>>();
230
231 Ok(Self {
232 available_insert_targets,
233 source_ids,
234 template_sources,
235 sink_ids,
236 })
237 }
238
239 pub fn hydrate_into_sources(
241 &self,
242 inputs: &[TestInput],
243 ) -> Result<IndexMap<ComponentKey, SourceOuter>, Vec<String>> {
244 let inputs = build_and_validate_inputs(inputs, &self.available_insert_targets)?;
245 let mut template_sources = self.template_sources.clone();
246 Ok(inputs
247 .into_iter()
248 .map(|(insert_at, events)| {
249 let mut source_config =
250 template_sources
251 .shift_remove(&insert_at)
252 .unwrap_or_else(|| {
253 panic!(
257 "Invalid input: cannot insert at {:?}",
258 insert_at.to_string()
259 )
260 });
261 source_config.events.extend(events);
262 let id: &str = self
263 .source_ids
264 .get(&insert_at)
265 .expect("Corresponding source must exist")
266 .as_ref();
267 (ComponentKey::from(id), SourceOuter::new(source_config))
268 })
269 .collect::<IndexMap<_, _>>())
270 }
271
272 pub fn hydrate_into_sinks(
274 &self,
275 test_name: &str,
276 outputs: &[TestOutput],
277 no_outputs_from: &[OutputId],
278 ) -> Result<
279 (
280 Vec<Receiver<UnitTestSinkResult>>,
281 IndexMap<ComponentKey, SinkOuter<String>>,
282 ),
283 Vec<String>,
284 > {
285 if outputs.is_empty() && no_outputs_from.is_empty() {
286 return Err(vec![
287 "unit test must contain at least one of `outputs` or `no_outputs_from`."
288 .to_string(),
289 ]);
290 }
291 let outputs = build_outputs(outputs)?;
292
293 let mut template_sinks = IndexMap::new();
294 let mut test_result_rxs = Vec::new();
295 for (ids, checks) in outputs {
297 let (tx, rx) = oneshot::channel();
298 let sink_ids = ids.clone();
299 let sink_config = UnitTestSinkConfig {
300 test_name: test_name.to_string(),
301 transform_ids: ids.iter().map(|id| id.to_string()).collect(),
302 result_tx: Arc::new(Mutex::new(Some(tx))),
303 check: UnitTestSinkCheck::Checks(checks),
304 };
305
306 test_result_rxs.push(rx);
307 template_sinks.insert(sink_ids, sink_config);
308 }
309
310 for id in no_outputs_from {
312 let (tx, rx) = oneshot::channel();
313 let sink_config = UnitTestSinkConfig {
314 test_name: test_name.to_string(),
315 transform_ids: vec![id.to_string()],
316 result_tx: Arc::new(Mutex::new(Some(tx))),
317 check: UnitTestSinkCheck::NoOutputs,
318 };
319
320 test_result_rxs.push(rx);
321 template_sinks.insert(vec![id.clone()], sink_config);
322 }
323
324 let sinks = template_sinks
325 .into_iter()
326 .map(|(transform_ids, sink_config)| {
327 let transform_ids_str = transform_ids
328 .iter()
329 .map(|s| s.to_string())
330 .collect::<Vec<_>>();
331 let sink_ids = transform_ids
332 .iter()
333 .map(|transform_id| {
334 self.sink_ids
335 .get(transform_id)
336 .expect("Sink does not exist")
337 .as_str()
338 })
339 .collect::<Vec<_>>();
340 let sink_id = sink_ids.join(",");
341 (
342 ComponentKey::from(sink_id),
343 SinkOuter::new(transform_ids_str, sink_config),
344 )
345 })
346 .collect::<IndexMap<_, _>>();
347
348 Ok((test_result_rxs, sinks))
349 }
350}
351
352fn get_relevant_test_components(
354 sources: &[&ComponentKey],
355 graph: &Graph,
356) -> Result<HashSet<String>, Vec<String>> {
357 graph.check_for_cycles().map_err(|error| vec![error])?;
358 let mut errors = Vec::new();
359 let mut components = HashSet::new();
360 for source in sources {
361 let paths = graph.paths_to_sink_from(source);
362 if paths.is_empty() {
363 errors.push(format!(
364 "Unable to complete topology between input target '{}' and output target(s)",
365 source
366 .to_string()
367 .rsplit_once("-source-")
368 .unwrap_or(("", ""))
369 .0
370 ));
371 } else {
372 for path in paths {
373 components.extend(path.into_iter().map(|key| key.to_string()));
374 }
375 }
376 }
377
378 if errors.is_empty() {
379 Ok(components)
380 } else {
381 Err(errors)
382 }
383}
384
385async fn build_unit_test(
386 metadata: &UnitTestBuildMetadata,
387 test: TestDefinition<String>,
388 mut config_builder: ConfigBuilder,
389) -> Result<UnitTest, Vec<String>> {
390 let transform_only_config = config_builder.clone();
391 let transform_only_graph = Graph::new_unchecked(
392 &transform_only_config.sources,
393 &transform_only_config.transforms,
394 &transform_only_config.sinks,
395 transform_only_config.schema,
396 transform_only_config
397 .global
398 .wildcard_matching
399 .unwrap_or_default(),
400 );
401 let test = test.resolve_outputs(&transform_only_graph)?;
402
403 let sources = metadata.hydrate_into_sources(&test.inputs)?;
404 let (test_result_rxs, sinks) =
405 metadata.hydrate_into_sinks(&test.name, &test.outputs, &test.no_outputs_from)?;
406
407 config_builder.sources = sources;
408 config_builder.sinks = sinks;
409 expand_globs(&mut config_builder);
410
411 let graph = Graph::new_unchecked(
412 &config_builder.sources,
413 &config_builder.transforms,
414 &config_builder.sinks,
415 config_builder.schema,
416 config_builder.global.wildcard_matching.unwrap_or_default(),
417 );
418
419 let mut valid_components = get_relevant_test_components(
420 config_builder.sources.keys().collect::<Vec<_>>().as_ref(),
421 &graph,
422 )?;
423
424 let unexpanded_transforms = valid_components
426 .iter()
427 .filter_map(|component| {
428 component
429 .split_once('.')
430 .map(|(original_name, _)| original_name.to_string())
431 })
432 .collect::<Vec<_>>();
433 valid_components.extend(unexpanded_transforms);
434
435 config_builder
439 .enrichment_tables
440 .iter()
441 .filter_map(|(key, c)| c.as_sink(key).map(|(_, sink)| sink.inputs))
442 .for_each(|i| valid_components.extend(i.into_iter()));
443
444 config_builder.transforms = config_builder
446 .transforms
447 .into_iter()
448 .filter(|(key, _)| valid_components.contains(&key.to_string()))
449 .collect();
450
451 let graph = Graph::new_unchecked(
453 &config_builder.sources,
454 &config_builder.transforms,
455 &config_builder.sinks,
456 config_builder.schema,
457 config_builder.global.wildcard_matching.unwrap_or_default(),
458 );
459 let valid_inputs = graph.input_map()?;
460 for (_, transform) in config_builder.transforms.iter_mut() {
461 let inputs = std::mem::take(&mut transform.inputs);
462 transform.inputs = inputs
463 .into_iter()
464 .filter(|input| valid_inputs.contains_key(input))
465 .collect();
466 }
467
468 if let Some(sink) = get_loose_end_outputs_sink(&config_builder) {
469 config_builder
470 .sinks
471 .insert(ComponentKey::from(Uuid::new_v4().to_string()), sink);
472 }
473 let config = config_builder.build()?;
474 let diff = config::ConfigDiff::initial(&config);
475 let pieces = TopologyPiecesBuilder::new(&config, &diff).build().await?;
476
477 Ok(UnitTest {
478 name: test.name,
479 config,
480 pieces,
481 test_result_rxs,
482 })
483}
484
485fn get_loose_end_outputs_sink(config: &ConfigBuilder) -> Option<SinkOuter<String>> {
492 let config = config.clone();
493 let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
494 get_transform_output_ids(
495 transform.inner.as_ref(),
496 key.clone(),
497 config.schema.log_namespace(),
498 )
499 .map(|output| output.to_string())
500 .collect::<Vec<_>>()
501 });
502
503 let mut loose_end_outputs = Vec::new();
504 for id in transform_ids {
505 if !config
506 .transforms
507 .iter()
508 .any(|(_, transform)| transform.inputs.contains(&id))
509 && !config
510 .sinks
511 .iter()
512 .any(|(_, sink)| sink.inputs.contains(&id))
513 {
514 loose_end_outputs.push(id);
515 }
516 }
517
518 if loose_end_outputs.is_empty() {
519 None
520 } else {
521 let noop_sink = UnitTestSinkConfig {
522 test_name: "".to_string(),
523 transform_ids: vec![],
524 result_tx: Arc::new(Mutex::new(None)),
525 check: UnitTestSinkCheck::NoOp,
526 };
527 Some(SinkOuter::new(loose_end_outputs, noop_sink))
528 }
529}
530
531fn build_and_validate_inputs(
532 test_inputs: &[TestInput],
533 available_insert_targets: &HashSet<ComponentKey>,
534) -> Result<HashMap<ComponentKey, Vec<Event>>, Vec<String>> {
535 let mut inputs = HashMap::new();
536 let mut errors = Vec::new();
537 if test_inputs.is_empty() {
538 errors.push("must specify at least one input.".to_string());
539 return Err(errors);
540 }
541
542 for (index, input) in test_inputs.iter().enumerate() {
543 if available_insert_targets.contains(&input.insert_at) {
544 match build_input_event(input) {
545 Ok(input_event) => {
546 inputs
547 .entry(input.insert_at.clone())
548 .and_modify(|events: &mut Vec<Event>| {
549 events.push(input_event.clone());
550 })
551 .or_insert_with(|| vec![input_event]);
552 }
553 Err(error) => errors.push(error),
554 }
555 } else {
556 errors.push(format!(
557 "inputs[{}]: unable to locate target transform '{}'",
558 index, input.insert_at
559 ))
560 }
561 }
562
563 if errors.is_empty() {
564 Ok(inputs)
565 } else {
566 Err(errors)
567 }
568}
569
570fn build_outputs(
571 test_outputs: &[TestOutput],
572) -> Result<IndexMap<Vec<OutputId>, Vec<Vec<Condition>>>, Vec<String>> {
573 let mut outputs: IndexMap<Vec<OutputId>, Vec<Vec<Condition>>> = IndexMap::new();
574 let mut errors = Vec::new();
575
576 for output in test_outputs {
577 let mut conditions = Vec::new();
578 for (index, condition) in output
579 .conditions
580 .clone()
581 .unwrap_or_default()
582 .iter()
583 .enumerate()
584 {
585 match condition.build(&Default::default()) {
586 Ok(condition) => conditions.push(condition),
587 Err(error) => errors.push(format!(
588 "failed to create test condition '{index}': {error}"
589 )),
590 }
591 }
592
593 outputs
594 .entry(output.extract_from.clone().to_vec())
595 .and_modify(|existing_conditions| existing_conditions.push(conditions.clone()))
596 .or_insert(vec![conditions.clone()]);
597 }
598
599 if errors.is_empty() {
600 Ok(outputs)
601 } else {
602 Err(errors)
603 }
604}
605
606fn build_input_event(input: &TestInput) -> Result<Event, String> {
607 match input.type_str.as_ref() {
608 "raw" => match input.value.as_ref() {
609 Some(v) => Ok(Event::Log(LogEvent::from_str_legacy(v.clone()))),
610 None => Err("input type 'raw' requires the field 'value'".to_string()),
611 },
612 "vrl" => {
613 if let Some(source) = &input.source {
614 let fns = vrl::stdlib::all();
615 let result = vrl::compiler::compile(source, &fns)
616 .map_err(|e| Formatter::new(source, e.clone()).to_string())?;
617
618 let mut target = TargetValue {
619 value: value!({}),
620 metadata: value::Value::Object(BTreeMap::new()),
621 secrets: value::Secrets::default(),
622 };
623
624 let mut state = RuntimeState::default();
625 let timezone = TimeZone::default();
626 let mut ctx = Context::new(&mut target, &mut state, &timezone);
627
628 result
629 .program
630 .resolve(&mut ctx)
631 .map(|_| {
632 Event::Log(LogEvent::from_parts(
633 target.value.clone(),
634 EventMetadata::default_with_value(target.metadata.clone()),
635 ))
636 })
637 .map_err(|e| e.to_string())
638 } else {
639 Err("input type 'vrl' requires the field 'source'".to_string())
640 }
641 }
642 "log" => {
643 if let Some(log_fields) = &input.log_fields {
644 let mut event = LogEvent::from_str_legacy("");
645 for (path, value) in log_fields {
646 event
647 .parse_path_and_insert(path, value.clone())
648 .map_err(|e| e.to_string())?;
649 }
650 Ok(event.into())
651 } else {
652 Err("input type 'log' requires the field 'log_fields'".to_string())
653 }
654 }
655 "metric" => {
656 if let Some(metric) = &input.metric {
657 Ok(Event::Metric(metric.clone()))
658 } else {
659 Err("input type 'metric' requires the field 'metric'".to_string())
660 }
661 }
662 _ => Err(format!(
663 "unrecognized input type '{}', expected one of: 'raw', 'log' or 'metric'",
664 input.type_str
665 )),
666 }
667}