1#[cfg(all(
3 test,
4 feature = "sources-demo_logs",
5 feature = "transforms-remap",
6 feature = "transforms-route",
7 feature = "transforms-filter",
8 feature = "transforms-reduce",
9 feature = "sinks-console"
10))]
11mod tests;
12mod unit_test_components;
13
14use std::{
15 collections::{BTreeMap, HashMap, HashSet},
16 sync::Arc,
17};
18
19use futures_util::{stream::FuturesUnordered, StreamExt};
20use indexmap::IndexMap;
21use tokio::sync::{
22 oneshot::{self, Receiver},
23 Mutex,
24};
25use uuid::Uuid;
26use vrl::{
27 compiler::{state::RuntimeState, Context, TargetValue, TimeZone},
28 diagnostic::Formatter,
29 value,
30};
31
32pub use self::unit_test_components::{
33 UnitTestSinkCheck, UnitTestSinkConfig, UnitTestSinkResult, UnitTestSourceConfig,
34 UnitTestStreamSinkConfig, UnitTestStreamSourceConfig,
35};
36use super::{compiler::expand_globs, graph::Graph, transform::get_transform_output_ids, OutputId};
37use crate::{
38 conditions::Condition,
39 config::{
40 self, loading, ComponentKey, Config, ConfigBuilder, ConfigPath, SinkOuter, SourceOuter,
41 TestDefinition, TestInput, TestOutput,
42 },
43 event::{Event, EventMetadata, LogEvent},
44 signal,
45 topology::{builder::TopologyPieces, RunningTopology},
46};
47
48pub struct UnitTest {
49 pub name: String,
50 config: Config,
51 pieces: TopologyPieces,
52 test_result_rxs: Vec<Receiver<UnitTestSinkResult>>,
53}
54
55pub struct UnitTestResult {
56 pub errors: Vec<String>,
57}
58
59impl UnitTest {
60 pub async fn run(self) -> UnitTestResult {
61 let diff = config::ConfigDiff::initial(&self.config);
62 let (topology, _) = RunningTopology::start_validated(self.config, diff, self.pieces)
63 .await
64 .unwrap();
65 topology.sources_finished().await;
66 let _stop_complete = topology.stop();
67
68 let mut in_flight = self
69 .test_result_rxs
70 .into_iter()
71 .collect::<FuturesUnordered<_>>();
72
73 let mut errors = Vec::new();
74 while let Some(partial_result) = in_flight.next().await {
75 let partial_result = partial_result.expect(
76 "An unexpected error occurred while executing unit tests. Please try again.",
77 );
78 errors.extend(partial_result.test_errors);
79 }
80
81 UnitTestResult { errors }
82 }
83}
84
85fn init_log_schema_from_paths(
90 config_paths: &[ConfigPath],
91 deny_if_set: bool,
92) -> Result<(), Vec<String>> {
93 let builder = config::loading::load_builder_from_paths(config_paths)?;
94 vector_lib::config::init_log_schema(builder.global.log_schema, deny_if_set);
95 Ok(())
96}
97
98pub async fn build_unit_tests_main(
99 paths: &[ConfigPath],
100 signal_handler: &mut signal::SignalHandler,
101) -> Result<Vec<UnitTest>, Vec<String>> {
102 init_log_schema_from_paths(paths, false)?;
103 let mut secrets_backends_loader = loading::load_secret_backends_from_paths(paths)?;
104 let config_builder = if secrets_backends_loader.has_secrets_to_retrieve() {
105 let resolved_secrets = secrets_backends_loader
106 .retrieve(&mut signal_handler.subscribe())
107 .await
108 .map_err(|e| vec![e])?;
109 loading::load_builder_from_paths_with_secrets(paths, resolved_secrets)?
110 } else {
111 loading::load_builder_from_paths(paths)?
112 };
113
114 build_unit_tests(config_builder).await
115}
116
117pub async fn build_unit_tests(
118 mut config_builder: ConfigBuilder,
119) -> Result<Vec<UnitTest>, Vec<String>> {
120 config_builder.sources = Default::default();
122 config_builder.sinks = Default::default();
123
124 let test_definitions = std::mem::take(&mut config_builder.tests);
125 let mut tests = Vec::new();
126 let mut build_errors = Vec::new();
127 let metadata = UnitTestBuildMetadata::initialize(&mut config_builder)?;
128
129 for mut test_definition in test_definitions {
130 let test_name = test_definition.name.clone();
131 let legacy_input = std::mem::take(&mut test_definition.input);
133 if let Some(input) = legacy_input {
134 test_definition.inputs.push(input);
135 }
136 match build_unit_test(&metadata, test_definition, config_builder.clone()).await {
137 Ok(test) => tests.push(test),
138 Err(errors) => {
139 let mut test_error = errors.join("\n");
140 test_error = test_error.replace('\n', "\n ");
142 test_error.insert_str(0, &format!("Failed to build test '{test_name}':\n "));
143 build_errors.push(test_error);
144 }
145 }
146 }
147
148 if build_errors.is_empty() {
149 Ok(tests)
150 } else {
151 Err(build_errors)
152 }
153}
154
155pub struct UnitTestBuildMetadata {
156 available_insert_targets: HashSet<ComponentKey>,
158 source_ids: HashMap<ComponentKey, String>,
160 template_sources: IndexMap<ComponentKey, UnitTestSourceConfig>,
163 sink_ids: HashMap<OutputId, String>,
165}
166
167impl UnitTestBuildMetadata {
168 pub fn initialize(config_builder: &mut ConfigBuilder) -> Result<Self, Vec<String>> {
169 let random_id = Uuid::new_v4().to_string();
171
172 let available_insert_targets = config_builder
173 .transforms
174 .keys()
175 .cloned()
176 .collect::<HashSet<_>>();
177
178 let source_ids = available_insert_targets
179 .iter()
180 .map(|key| (key.clone(), format!("{}-{}-{}", key, "source", random_id)))
181 .collect::<HashMap<_, _>>();
182
183 let mut template_sources = IndexMap::new();
185 for (key, transform) in config_builder.transforms.iter_mut() {
186 let test_source_id = source_ids
187 .get(key)
188 .expect("Missing test source for a transform")
189 .clone();
190 transform.inputs.extend(Some(test_source_id));
191
192 template_sources.insert(key.clone(), UnitTestSourceConfig::default());
193 }
194
195 let builder = config_builder.clone();
196 let available_extract_targets = builder
197 .transforms
198 .iter()
199 .flat_map(|(key, transform)| {
200 get_transform_output_ids(
201 transform.inner.as_ref(),
202 key.clone(),
203 builder.schema.log_namespace(),
204 )
205 })
206 .collect::<HashSet<_>>();
207
208 let sink_ids = available_extract_targets
209 .iter()
210 .map(|key| {
211 (
212 key.clone(),
213 format!(
214 "{}-{}-{}",
215 key.to_string().replace('.', "-"),
216 "sink",
217 random_id
218 ),
219 )
220 })
221 .collect::<HashMap<_, _>>();
222
223 Ok(Self {
224 available_insert_targets,
225 source_ids,
226 template_sources,
227 sink_ids,
228 })
229 }
230
231 pub fn hydrate_into_sources(
233 &self,
234 inputs: &[TestInput],
235 ) -> Result<IndexMap<ComponentKey, SourceOuter>, Vec<String>> {
236 let inputs = build_and_validate_inputs(inputs, &self.available_insert_targets)?;
237 let mut template_sources = self.template_sources.clone();
238 Ok(inputs
239 .into_iter()
240 .map(|(insert_at, events)| {
241 let mut source_config =
242 template_sources
243 .shift_remove(&insert_at)
244 .unwrap_or_else(|| {
245 panic!(
249 "Invalid input: cannot insert at {:?}",
250 insert_at.to_string()
251 )
252 });
253 source_config.events.extend(events);
254 let id: &str = self
255 .source_ids
256 .get(&insert_at)
257 .expect("Corresponding source must exist")
258 .as_ref();
259 (ComponentKey::from(id), SourceOuter::new(source_config))
260 })
261 .collect::<IndexMap<_, _>>())
262 }
263
264 pub fn hydrate_into_sinks(
266 &self,
267 test_name: &str,
268 outputs: &[TestOutput],
269 no_outputs_from: &[OutputId],
270 ) -> Result<
271 (
272 Vec<Receiver<UnitTestSinkResult>>,
273 IndexMap<ComponentKey, SinkOuter<String>>,
274 ),
275 Vec<String>,
276 > {
277 if outputs.is_empty() && no_outputs_from.is_empty() {
278 return Err(vec![
279 "unit test must contain at least one of `outputs` or `no_outputs_from`."
280 .to_string(),
281 ]);
282 }
283 let outputs = build_outputs(outputs)?;
284
285 let mut template_sinks = IndexMap::new();
286 let mut test_result_rxs = Vec::new();
287 for (ids, checks) in outputs {
289 let (tx, rx) = oneshot::channel();
290 let sink_ids = ids.clone();
291 let sink_config = UnitTestSinkConfig {
292 test_name: test_name.to_string(),
293 transform_ids: ids.iter().map(|id| id.to_string()).collect(),
294 result_tx: Arc::new(Mutex::new(Some(tx))),
295 check: UnitTestSinkCheck::Checks(checks),
296 };
297
298 test_result_rxs.push(rx);
299 template_sinks.insert(sink_ids, sink_config);
300 }
301
302 for id in no_outputs_from {
304 let (tx, rx) = oneshot::channel();
305 let sink_config = UnitTestSinkConfig {
306 test_name: test_name.to_string(),
307 transform_ids: vec![id.to_string()],
308 result_tx: Arc::new(Mutex::new(Some(tx))),
309 check: UnitTestSinkCheck::NoOutputs,
310 };
311
312 test_result_rxs.push(rx);
313 template_sinks.insert(vec![id.clone()], sink_config);
314 }
315
316 let sinks = template_sinks
317 .into_iter()
318 .map(|(transform_ids, sink_config)| {
319 let transform_ids_str = transform_ids
320 .iter()
321 .map(|s| s.to_string())
322 .collect::<Vec<_>>();
323 let sink_ids = transform_ids
324 .iter()
325 .map(|transform_id| {
326 self.sink_ids
327 .get(transform_id)
328 .expect("Sink does not exist")
329 .as_str()
330 })
331 .collect::<Vec<_>>();
332 let sink_id = sink_ids.join(",");
333 (
334 ComponentKey::from(sink_id),
335 SinkOuter::new(transform_ids_str, sink_config),
336 )
337 })
338 .collect::<IndexMap<_, _>>();
339
340 Ok((test_result_rxs, sinks))
341 }
342}
343
344fn get_relevant_test_components(
346 sources: &[&ComponentKey],
347 graph: &Graph,
348) -> Result<HashSet<String>, Vec<String>> {
349 graph.check_for_cycles().map_err(|error| vec![error])?;
350 let mut errors = Vec::new();
351 let mut components = HashSet::new();
352 for source in sources {
353 let paths = graph.paths_to_sink_from(source);
354 if paths.is_empty() {
355 errors.push(format!(
356 "Unable to complete topology between input target '{}' and output target(s)",
357 source
358 .to_string()
359 .rsplit_once("-source-")
360 .unwrap_or(("", ""))
361 .0
362 ));
363 } else {
364 for path in paths {
365 components.extend(path.into_iter().map(|key| key.to_string()));
366 }
367 }
368 }
369
370 if errors.is_empty() {
371 Ok(components)
372 } else {
373 Err(errors)
374 }
375}
376
377async fn build_unit_test(
378 metadata: &UnitTestBuildMetadata,
379 test: TestDefinition<String>,
380 mut config_builder: ConfigBuilder,
381) -> Result<UnitTest, Vec<String>> {
382 let transform_only_config = config_builder.clone();
383 let transform_only_graph = Graph::new_unchecked(
384 &transform_only_config.sources,
385 &transform_only_config.transforms,
386 &transform_only_config.sinks,
387 transform_only_config.schema,
388 transform_only_config
389 .global
390 .wildcard_matching
391 .unwrap_or_default(),
392 );
393 let test = test.resolve_outputs(&transform_only_graph)?;
394
395 let sources = metadata.hydrate_into_sources(&test.inputs)?;
396 let (test_result_rxs, sinks) =
397 metadata.hydrate_into_sinks(&test.name, &test.outputs, &test.no_outputs_from)?;
398
399 config_builder.sources = sources;
400 config_builder.sinks = sinks;
401 expand_globs(&mut config_builder);
402
403 let graph = Graph::new_unchecked(
404 &config_builder.sources,
405 &config_builder.transforms,
406 &config_builder.sinks,
407 config_builder.schema,
408 config_builder.global.wildcard_matching.unwrap_or_default(),
409 );
410
411 let mut valid_components = get_relevant_test_components(
412 config_builder.sources.keys().collect::<Vec<_>>().as_ref(),
413 &graph,
414 )?;
415
416 let unexpanded_transforms = valid_components
418 .iter()
419 .filter_map(|component| {
420 component
421 .split_once('.')
422 .map(|(original_name, _)| original_name.to_string())
423 })
424 .collect::<Vec<_>>();
425 valid_components.extend(unexpanded_transforms);
426
427 config_builder.transforms = config_builder
429 .transforms
430 .into_iter()
431 .filter(|(key, _)| valid_components.contains(&key.to_string()))
432 .collect();
433
434 let graph = Graph::new_unchecked(
436 &config_builder.sources,
437 &config_builder.transforms,
438 &config_builder.sinks,
439 config_builder.schema,
440 config_builder.global.wildcard_matching.unwrap_or_default(),
441 );
442 let valid_inputs = graph.input_map()?;
443 for (_, transform) in config_builder.transforms.iter_mut() {
444 let inputs = std::mem::take(&mut transform.inputs);
445 transform.inputs = inputs
446 .into_iter()
447 .filter(|input| valid_inputs.contains_key(input))
448 .collect();
449 }
450
451 if let Some(sink) = get_loose_end_outputs_sink(&config_builder) {
452 config_builder
453 .sinks
454 .insert(ComponentKey::from(Uuid::new_v4().to_string()), sink);
455 }
456 let config = config_builder.build()?;
457 let diff = config::ConfigDiff::initial(&config);
458 let pieces = TopologyPieces::build(&config, &diff, HashMap::new(), Default::default()).await?;
459
460 Ok(UnitTest {
461 name: test.name,
462 config,
463 pieces,
464 test_result_rxs,
465 })
466}
467
468fn get_loose_end_outputs_sink(config: &ConfigBuilder) -> Option<SinkOuter<String>> {
475 let config = config.clone();
476 let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
477 get_transform_output_ids(
478 transform.inner.as_ref(),
479 key.clone(),
480 config.schema.log_namespace(),
481 )
482 .map(|output| output.to_string())
483 .collect::<Vec<_>>()
484 });
485
486 let mut loose_end_outputs = Vec::new();
487 for id in transform_ids {
488 if !config
489 .transforms
490 .iter()
491 .any(|(_, transform)| transform.inputs.contains(&id))
492 && !config
493 .sinks
494 .iter()
495 .any(|(_, sink)| sink.inputs.contains(&id))
496 {
497 loose_end_outputs.push(id);
498 }
499 }
500
501 if loose_end_outputs.is_empty() {
502 None
503 } else {
504 let noop_sink = UnitTestSinkConfig {
505 test_name: "".to_string(),
506 transform_ids: vec![],
507 result_tx: Arc::new(Mutex::new(None)),
508 check: UnitTestSinkCheck::NoOp,
509 };
510 Some(SinkOuter::new(loose_end_outputs, noop_sink))
511 }
512}
513
514fn build_and_validate_inputs(
515 test_inputs: &[TestInput],
516 available_insert_targets: &HashSet<ComponentKey>,
517) -> Result<HashMap<ComponentKey, Vec<Event>>, Vec<String>> {
518 let mut inputs = HashMap::new();
519 let mut errors = Vec::new();
520 if test_inputs.is_empty() {
521 errors.push("must specify at least one input.".to_string());
522 return Err(errors);
523 }
524
525 for (index, input) in test_inputs.iter().enumerate() {
526 if available_insert_targets.contains(&input.insert_at) {
527 match build_input_event(input) {
528 Ok(input_event) => {
529 inputs
530 .entry(input.insert_at.clone())
531 .and_modify(|events: &mut Vec<Event>| {
532 events.push(input_event.clone());
533 })
534 .or_insert_with(|| vec![input_event]);
535 }
536 Err(error) => errors.push(error),
537 }
538 } else {
539 errors.push(format!(
540 "inputs[{}]: unable to locate target transform '{}'",
541 index, input.insert_at
542 ))
543 }
544 }
545
546 if errors.is_empty() {
547 Ok(inputs)
548 } else {
549 Err(errors)
550 }
551}
552
553fn build_outputs(
554 test_outputs: &[TestOutput],
555) -> Result<IndexMap<Vec<OutputId>, Vec<Vec<Condition>>>, Vec<String>> {
556 let mut outputs: IndexMap<Vec<OutputId>, Vec<Vec<Condition>>> = IndexMap::new();
557 let mut errors = Vec::new();
558
559 for output in test_outputs {
560 let mut conditions = Vec::new();
561 for (index, condition) in output
562 .conditions
563 .clone()
564 .unwrap_or_default()
565 .iter()
566 .enumerate()
567 {
568 match condition.build(&Default::default()) {
569 Ok(condition) => conditions.push(condition),
570 Err(error) => errors.push(format!(
571 "failed to create test condition '{index}': {error}"
572 )),
573 }
574 }
575
576 outputs
577 .entry(output.extract_from.clone().to_vec())
578 .and_modify(|existing_conditions| existing_conditions.push(conditions.clone()))
579 .or_insert(vec![conditions.clone()]);
580 }
581
582 if errors.is_empty() {
583 Ok(outputs)
584 } else {
585 Err(errors)
586 }
587}
588
589fn build_input_event(input: &TestInput) -> Result<Event, String> {
590 match input.type_str.as_ref() {
591 "raw" => match input.value.as_ref() {
592 Some(v) => Ok(Event::Log(LogEvent::from_str_legacy(v.clone()))),
593 None => Err("input type 'raw' requires the field 'value'".to_string()),
594 },
595 "vrl" => {
596 if let Some(source) = &input.source {
597 let fns = vrl::stdlib::all();
598 let result = vrl::compiler::compile(source, &fns)
599 .map_err(|e| Formatter::new(source, e.clone()).to_string())?;
600
601 let mut target = TargetValue {
602 value: value!({}),
603 metadata: value::Value::Object(BTreeMap::new()),
604 secrets: value::Secrets::default(),
605 };
606
607 let mut state = RuntimeState::default();
608 let timezone = TimeZone::default();
609 let mut ctx = Context::new(&mut target, &mut state, &timezone);
610
611 result
612 .program
613 .resolve(&mut ctx)
614 .map(|_| {
615 Event::Log(LogEvent::from_parts(
616 target.value.clone(),
617 EventMetadata::default_with_value(target.metadata.clone()),
618 ))
619 })
620 .map_err(|e| e.to_string())
621 } else {
622 Err("input type 'vrl' requires the field 'source'".to_string())
623 }
624 }
625 "log" => {
626 if let Some(log_fields) = &input.log_fields {
627 let mut event = LogEvent::from_str_legacy("");
628 for (path, value) in log_fields {
629 event
630 .parse_path_and_insert(path, value.clone())
631 .map_err(|e| e.to_string())?;
632 }
633 Ok(event.into())
634 } else {
635 Err("input type 'log' requires the field 'log_fields'".to_string())
636 }
637 }
638 "metric" => {
639 if let Some(metric) = &input.metric {
640 Ok(Event::Metric(metric.clone()))
641 } else {
642 Err("input type 'metric' requires the field 'metric'".to_string())
643 }
644 }
645 _ => Err(format!(
646 "unrecognized input type '{}', expected one of: 'raw', 'log' or 'metric'",
647 input.type_str
648 )),
649 }
650}