1#[cfg(all(
3 test,
4 feature = "sources-demo_logs",
5 feature = "transforms-remap",
6 feature = "transforms-route",
7 feature = "transforms-filter",
8 feature = "transforms-reduce",
9 feature = "sinks-console"
10))]
11mod tests;
12mod unit_test_components;
13
14use std::{
15 collections::{BTreeMap, HashMap, HashSet},
16 sync::Arc,
17};
18
19use futures_util::{StreamExt, stream::FuturesUnordered};
20use indexmap::IndexMap;
21use tokio::sync::{
22 Mutex,
23 oneshot::{self, Receiver},
24};
25use uuid::Uuid;
26use vrl::{
27 compiler::{Context, TargetValue, TimeZone, state::RuntimeState},
28 diagnostic::Formatter,
29 value,
30};
31
32pub use self::unit_test_components::{
33 UnitTestSinkCheck, UnitTestSinkConfig, UnitTestSinkResult, UnitTestSourceConfig,
34 UnitTestStreamSinkConfig, UnitTestStreamSourceConfig,
35};
36use super::{OutputId, compiler::expand_globs, graph::Graph, transform::get_transform_output_ids};
37use crate::{
38 conditions::Condition,
39 config::{
40 self, ComponentKey, Config, ConfigBuilder, ConfigPath, SinkOuter, SourceOuter,
41 TestDefinition, TestInput, TestOutput, loading,
42 },
43 event::{Event, EventMetadata, LogEvent},
44 signal,
45 topology::{
46 RunningTopology,
47 builder::{TopologyPieces, TopologyPiecesBuilder},
48 },
49};
50
51pub struct UnitTest {
52 pub name: String,
53 config: Config,
54 pieces: TopologyPieces,
55 test_result_rxs: Vec<Receiver<UnitTestSinkResult>>,
56}
57
/// The outcome of running a single [`UnitTest`].
pub struct UnitTestResult {
    /// Error messages gathered from the `test_errors` of every sink result.
    pub errors: Vec<String>,
}
61
62impl UnitTest {
63 pub async fn run(self) -> UnitTestResult {
64 let diff = config::ConfigDiff::initial(&self.config);
65 let (topology, _) = RunningTopology::start_validated(self.config, diff, self.pieces)
66 .await
67 .unwrap();
68 topology.sources_finished().await;
69 let _stop_complete = topology.stop();
70
71 let mut in_flight = self
72 .test_result_rxs
73 .into_iter()
74 .collect::<FuturesUnordered<_>>();
75
76 let mut errors = Vec::new();
77 while let Some(partial_result) = in_flight.next().await {
78 let partial_result = partial_result.expect(
79 "An unexpected error occurred while executing unit tests. Please try again.",
80 );
81 errors.extend(partial_result.test_errors);
82 }
83
84 UnitTestResult { errors }
85 }
86}
87
88fn init_log_schema_from_paths(
93 config_paths: &[ConfigPath],
94 deny_if_set: bool,
95) -> Result<(), Vec<String>> {
96 let builder = config::loading::load_builder_from_paths(config_paths)?;
97 vector_lib::config::init_log_schema(builder.global.log_schema, deny_if_set);
98 Ok(())
99}
100
101pub async fn build_unit_tests_main(
102 paths: &[ConfigPath],
103 signal_handler: &mut signal::SignalHandler,
104) -> Result<Vec<UnitTest>, Vec<String>> {
105 init_log_schema_from_paths(paths, false)?;
106 let mut secrets_backends_loader = loading::load_secret_backends_from_paths(paths)?;
107 let config_builder = if secrets_backends_loader.has_secrets_to_retrieve() {
108 let resolved_secrets = secrets_backends_loader
109 .retrieve(&mut signal_handler.subscribe())
110 .await
111 .map_err(|e| vec![e])?;
112 loading::load_builder_from_paths_with_secrets(paths, resolved_secrets)?
113 } else {
114 loading::load_builder_from_paths(paths)?
115 };
116
117 build_unit_tests(config_builder).await
118}
119
120pub async fn build_unit_tests(
121 mut config_builder: ConfigBuilder,
122) -> Result<Vec<UnitTest>, Vec<String>> {
123 config_builder.sources = Default::default();
125 config_builder.sinks = Default::default();
126
127 let test_definitions = std::mem::take(&mut config_builder.tests);
128 let mut tests = Vec::new();
129 let mut build_errors = Vec::new();
130 let metadata = UnitTestBuildMetadata::initialize(&mut config_builder)?;
131
132 for mut test_definition in test_definitions {
133 let test_name = test_definition.name.clone();
134 let legacy_input = std::mem::take(&mut test_definition.input);
136 if let Some(input) = legacy_input {
137 test_definition.inputs.push(input);
138 }
139 match build_unit_test(&metadata, test_definition, config_builder.clone()).await {
140 Ok(test) => tests.push(test),
141 Err(errors) => {
142 let mut test_error = errors.join("\n");
143 test_error = test_error.replace('\n', "\n ");
145 test_error.insert_str(0, &format!("Failed to build test '{test_name}':\n "));
146 build_errors.push(test_error);
147 }
148 }
149 }
150
151 if build_errors.is_empty() {
152 Ok(tests)
153 } else {
154 Err(build_errors)
155 }
156}
157
158pub struct UnitTestBuildMetadata {
159 available_insert_targets: HashSet<ComponentKey>,
161 source_ids: HashMap<ComponentKey, String>,
163 template_sources: IndexMap<ComponentKey, UnitTestSourceConfig>,
166 sink_ids: HashMap<OutputId, String>,
168}
169
170impl UnitTestBuildMetadata {
171 pub fn initialize(config_builder: &mut ConfigBuilder) -> Result<Self, Vec<String>> {
172 let random_id = Uuid::new_v4().to_string();
174
175 let available_insert_targets = config_builder
176 .transforms
177 .keys()
178 .cloned()
179 .collect::<HashSet<_>>();
180
181 let source_ids = available_insert_targets
182 .iter()
183 .map(|key| (key.clone(), format!("{}-{}-{}", key, "source", random_id)))
184 .collect::<HashMap<_, _>>();
185
186 let mut template_sources = IndexMap::new();
188 for (key, transform) in config_builder.transforms.iter_mut() {
189 let test_source_id = source_ids
190 .get(key)
191 .expect("Missing test source for a transform")
192 .clone();
193 transform.inputs.extend(Some(test_source_id));
194
195 template_sources.insert(key.clone(), UnitTestSourceConfig::default());
196 }
197
198 let builder = config_builder.clone();
199 let available_extract_targets = builder
200 .transforms
201 .iter()
202 .flat_map(|(key, transform)| {
203 get_transform_output_ids(
204 transform.inner.as_ref(),
205 key.clone(),
206 builder.schema.log_namespace(),
207 )
208 })
209 .collect::<HashSet<_>>();
210
211 let sink_ids = available_extract_targets
212 .iter()
213 .map(|key| {
214 (
215 key.clone(),
216 format!(
217 "{}-{}-{}",
218 key.to_string().replace('.', "-"),
219 "sink",
220 random_id
221 ),
222 )
223 })
224 .collect::<HashMap<_, _>>();
225
226 Ok(Self {
227 available_insert_targets,
228 source_ids,
229 template_sources,
230 sink_ids,
231 })
232 }
233
234 pub fn hydrate_into_sources(
236 &self,
237 inputs: &[TestInput],
238 ) -> Result<IndexMap<ComponentKey, SourceOuter>, Vec<String>> {
239 let inputs = build_and_validate_inputs(inputs, &self.available_insert_targets)?;
240 let mut template_sources = self.template_sources.clone();
241 Ok(inputs
242 .into_iter()
243 .map(|(insert_at, events)| {
244 let mut source_config =
245 template_sources
246 .shift_remove(&insert_at)
247 .unwrap_or_else(|| {
248 panic!(
252 "Invalid input: cannot insert at {:?}",
253 insert_at.to_string()
254 )
255 });
256 source_config.events.extend(events);
257 let id: &str = self
258 .source_ids
259 .get(&insert_at)
260 .expect("Corresponding source must exist")
261 .as_ref();
262 (ComponentKey::from(id), SourceOuter::new(source_config))
263 })
264 .collect::<IndexMap<_, _>>())
265 }
266
267 pub fn hydrate_into_sinks(
269 &self,
270 test_name: &str,
271 outputs: &[TestOutput],
272 no_outputs_from: &[OutputId],
273 ) -> Result<
274 (
275 Vec<Receiver<UnitTestSinkResult>>,
276 IndexMap<ComponentKey, SinkOuter<String>>,
277 ),
278 Vec<String>,
279 > {
280 if outputs.is_empty() && no_outputs_from.is_empty() {
281 return Err(vec![
282 "unit test must contain at least one of `outputs` or `no_outputs_from`."
283 .to_string(),
284 ]);
285 }
286 let outputs = build_outputs(outputs)?;
287
288 let mut template_sinks = IndexMap::new();
289 let mut test_result_rxs = Vec::new();
290 for (ids, checks) in outputs {
292 let (tx, rx) = oneshot::channel();
293 let sink_ids = ids.clone();
294 let sink_config = UnitTestSinkConfig {
295 test_name: test_name.to_string(),
296 transform_ids: ids.iter().map(|id| id.to_string()).collect(),
297 result_tx: Arc::new(Mutex::new(Some(tx))),
298 check: UnitTestSinkCheck::Checks(checks),
299 };
300
301 test_result_rxs.push(rx);
302 template_sinks.insert(sink_ids, sink_config);
303 }
304
305 for id in no_outputs_from {
307 let (tx, rx) = oneshot::channel();
308 let sink_config = UnitTestSinkConfig {
309 test_name: test_name.to_string(),
310 transform_ids: vec![id.to_string()],
311 result_tx: Arc::new(Mutex::new(Some(tx))),
312 check: UnitTestSinkCheck::NoOutputs,
313 };
314
315 test_result_rxs.push(rx);
316 template_sinks.insert(vec![id.clone()], sink_config);
317 }
318
319 let sinks = template_sinks
320 .into_iter()
321 .map(|(transform_ids, sink_config)| {
322 let transform_ids_str = transform_ids
323 .iter()
324 .map(|s| s.to_string())
325 .collect::<Vec<_>>();
326 let sink_ids = transform_ids
327 .iter()
328 .map(|transform_id| {
329 self.sink_ids
330 .get(transform_id)
331 .expect("Sink does not exist")
332 .as_str()
333 })
334 .collect::<Vec<_>>();
335 let sink_id = sink_ids.join(",");
336 (
337 ComponentKey::from(sink_id),
338 SinkOuter::new(transform_ids_str, sink_config),
339 )
340 })
341 .collect::<IndexMap<_, _>>();
342
343 Ok((test_result_rxs, sinks))
344 }
345}
346
347fn get_relevant_test_components(
349 sources: &[&ComponentKey],
350 graph: &Graph,
351) -> Result<HashSet<String>, Vec<String>> {
352 graph.check_for_cycles().map_err(|error| vec![error])?;
353 let mut errors = Vec::new();
354 let mut components = HashSet::new();
355 for source in sources {
356 let paths = graph.paths_to_sink_from(source);
357 if paths.is_empty() {
358 errors.push(format!(
359 "Unable to complete topology between input target '{}' and output target(s)",
360 source
361 .to_string()
362 .rsplit_once("-source-")
363 .unwrap_or(("", ""))
364 .0
365 ));
366 } else {
367 for path in paths {
368 components.extend(path.into_iter().map(|key| key.to_string()));
369 }
370 }
371 }
372
373 if errors.is_empty() {
374 Ok(components)
375 } else {
376 Err(errors)
377 }
378}
379
380async fn build_unit_test(
381 metadata: &UnitTestBuildMetadata,
382 test: TestDefinition<String>,
383 mut config_builder: ConfigBuilder,
384) -> Result<UnitTest, Vec<String>> {
385 let transform_only_config = config_builder.clone();
386 let transform_only_graph = Graph::new_unchecked(
387 &transform_only_config.sources,
388 &transform_only_config.transforms,
389 &transform_only_config.sinks,
390 transform_only_config.schema,
391 transform_only_config
392 .global
393 .wildcard_matching
394 .unwrap_or_default(),
395 );
396 let test = test.resolve_outputs(&transform_only_graph)?;
397
398 let sources = metadata.hydrate_into_sources(&test.inputs)?;
399 let (test_result_rxs, sinks) =
400 metadata.hydrate_into_sinks(&test.name, &test.outputs, &test.no_outputs_from)?;
401
402 config_builder.sources = sources;
403 config_builder.sinks = sinks;
404 expand_globs(&mut config_builder);
405
406 let graph = Graph::new_unchecked(
407 &config_builder.sources,
408 &config_builder.transforms,
409 &config_builder.sinks,
410 config_builder.schema,
411 config_builder.global.wildcard_matching.unwrap_or_default(),
412 );
413
414 let mut valid_components = get_relevant_test_components(
415 config_builder.sources.keys().collect::<Vec<_>>().as_ref(),
416 &graph,
417 )?;
418
419 let unexpanded_transforms = valid_components
421 .iter()
422 .filter_map(|component| {
423 component
424 .split_once('.')
425 .map(|(original_name, _)| original_name.to_string())
426 })
427 .collect::<Vec<_>>();
428 valid_components.extend(unexpanded_transforms);
429
430 config_builder.transforms = config_builder
432 .transforms
433 .into_iter()
434 .filter(|(key, _)| valid_components.contains(&key.to_string()))
435 .collect();
436
437 let graph = Graph::new_unchecked(
439 &config_builder.sources,
440 &config_builder.transforms,
441 &config_builder.sinks,
442 config_builder.schema,
443 config_builder.global.wildcard_matching.unwrap_or_default(),
444 );
445 let valid_inputs = graph.input_map()?;
446 for (_, transform) in config_builder.transforms.iter_mut() {
447 let inputs = std::mem::take(&mut transform.inputs);
448 transform.inputs = inputs
449 .into_iter()
450 .filter(|input| valid_inputs.contains_key(input))
451 .collect();
452 }
453
454 if let Some(sink) = get_loose_end_outputs_sink(&config_builder) {
455 config_builder
456 .sinks
457 .insert(ComponentKey::from(Uuid::new_v4().to_string()), sink);
458 }
459 let config = config_builder.build()?;
460 let diff = config::ConfigDiff::initial(&config);
461 let pieces = TopologyPiecesBuilder::new(&config, &diff).build().await?;
462
463 Ok(UnitTest {
464 name: test.name,
465 config,
466 pieces,
467 test_result_rxs,
468 })
469}
470
471fn get_loose_end_outputs_sink(config: &ConfigBuilder) -> Option<SinkOuter<String>> {
478 let config = config.clone();
479 let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
480 get_transform_output_ids(
481 transform.inner.as_ref(),
482 key.clone(),
483 config.schema.log_namespace(),
484 )
485 .map(|output| output.to_string())
486 .collect::<Vec<_>>()
487 });
488
489 let mut loose_end_outputs = Vec::new();
490 for id in transform_ids {
491 if !config
492 .transforms
493 .iter()
494 .any(|(_, transform)| transform.inputs.contains(&id))
495 && !config
496 .sinks
497 .iter()
498 .any(|(_, sink)| sink.inputs.contains(&id))
499 {
500 loose_end_outputs.push(id);
501 }
502 }
503
504 if loose_end_outputs.is_empty() {
505 None
506 } else {
507 let noop_sink = UnitTestSinkConfig {
508 test_name: "".to_string(),
509 transform_ids: vec![],
510 result_tx: Arc::new(Mutex::new(None)),
511 check: UnitTestSinkCheck::NoOp,
512 };
513 Some(SinkOuter::new(loose_end_outputs, noop_sink))
514 }
515}
516
517fn build_and_validate_inputs(
518 test_inputs: &[TestInput],
519 available_insert_targets: &HashSet<ComponentKey>,
520) -> Result<HashMap<ComponentKey, Vec<Event>>, Vec<String>> {
521 let mut inputs = HashMap::new();
522 let mut errors = Vec::new();
523 if test_inputs.is_empty() {
524 errors.push("must specify at least one input.".to_string());
525 return Err(errors);
526 }
527
528 for (index, input) in test_inputs.iter().enumerate() {
529 if available_insert_targets.contains(&input.insert_at) {
530 match build_input_event(input) {
531 Ok(input_event) => {
532 inputs
533 .entry(input.insert_at.clone())
534 .and_modify(|events: &mut Vec<Event>| {
535 events.push(input_event.clone());
536 })
537 .or_insert_with(|| vec![input_event]);
538 }
539 Err(error) => errors.push(error),
540 }
541 } else {
542 errors.push(format!(
543 "inputs[{}]: unable to locate target transform '{}'",
544 index, input.insert_at
545 ))
546 }
547 }
548
549 if errors.is_empty() {
550 Ok(inputs)
551 } else {
552 Err(errors)
553 }
554}
555
556fn build_outputs(
557 test_outputs: &[TestOutput],
558) -> Result<IndexMap<Vec<OutputId>, Vec<Vec<Condition>>>, Vec<String>> {
559 let mut outputs: IndexMap<Vec<OutputId>, Vec<Vec<Condition>>> = IndexMap::new();
560 let mut errors = Vec::new();
561
562 for output in test_outputs {
563 let mut conditions = Vec::new();
564 for (index, condition) in output
565 .conditions
566 .clone()
567 .unwrap_or_default()
568 .iter()
569 .enumerate()
570 {
571 match condition.build(&Default::default()) {
572 Ok(condition) => conditions.push(condition),
573 Err(error) => errors.push(format!(
574 "failed to create test condition '{index}': {error}"
575 )),
576 }
577 }
578
579 outputs
580 .entry(output.extract_from.clone().to_vec())
581 .and_modify(|existing_conditions| existing_conditions.push(conditions.clone()))
582 .or_insert(vec![conditions.clone()]);
583 }
584
585 if errors.is_empty() {
586 Ok(outputs)
587 } else {
588 Err(errors)
589 }
590}
591
592fn build_input_event(input: &TestInput) -> Result<Event, String> {
593 match input.type_str.as_ref() {
594 "raw" => match input.value.as_ref() {
595 Some(v) => Ok(Event::Log(LogEvent::from_str_legacy(v.clone()))),
596 None => Err("input type 'raw' requires the field 'value'".to_string()),
597 },
598 "vrl" => {
599 if let Some(source) = &input.source {
600 let fns = vrl::stdlib::all();
601 let result = vrl::compiler::compile(source, &fns)
602 .map_err(|e| Formatter::new(source, e.clone()).to_string())?;
603
604 let mut target = TargetValue {
605 value: value!({}),
606 metadata: value::Value::Object(BTreeMap::new()),
607 secrets: value::Secrets::default(),
608 };
609
610 let mut state = RuntimeState::default();
611 let timezone = TimeZone::default();
612 let mut ctx = Context::new(&mut target, &mut state, &timezone);
613
614 result
615 .program
616 .resolve(&mut ctx)
617 .map(|_| {
618 Event::Log(LogEvent::from_parts(
619 target.value.clone(),
620 EventMetadata::default_with_value(target.metadata.clone()),
621 ))
622 })
623 .map_err(|e| e.to_string())
624 } else {
625 Err("input type 'vrl' requires the field 'source'".to_string())
626 }
627 }
628 "log" => {
629 if let Some(log_fields) = &input.log_fields {
630 let mut event = LogEvent::from_str_legacy("");
631 for (path, value) in log_fields {
632 event
633 .parse_path_and_insert(path, value.clone())
634 .map_err(|e| e.to_string())?;
635 }
636 Ok(event.into())
637 } else {
638 Err("input type 'log' requires the field 'log_fields'".to_string())
639 }
640 }
641 "metric" => {
642 if let Some(metric) = &input.metric {
643 Ok(Event::Metric(metric.clone()))
644 } else {
645 Err("input type 'metric' requires the field 'metric'".to_string())
646 }
647 }
648 _ => Err(format!(
649 "unrecognized input type '{}', expected one of: 'raw', 'log' or 'metric'",
650 input.type_str
651 )),
652 }
653}