vector/config/unit_test/
mod.rs

1// should match vector-unit-test-tests feature
2#[cfg(all(
3    test,
4    feature = "sources-demo_logs",
5    feature = "transforms-remap",
6    feature = "transforms-route",
7    feature = "transforms-filter",
8    feature = "transforms-reduce",
9    feature = "sinks-console"
10))]
11mod tests;
12mod unit_test_components;
13
14use std::{
15    collections::{BTreeMap, HashMap, HashSet},
16    sync::Arc,
17};
18
19use futures_util::{StreamExt, stream::FuturesUnordered};
20use indexmap::IndexMap;
21use tokio::sync::{
22    Mutex,
23    oneshot::{self, Receiver},
24};
25use uuid::Uuid;
26use vrl::{
27    compiler::{Context, TargetValue, TimeZone, state::RuntimeState},
28    diagnostic::Formatter,
29    value,
30};
31
32pub use self::unit_test_components::{
33    UnitTestSinkCheck, UnitTestSinkConfig, UnitTestSinkResult, UnitTestSourceConfig,
34    UnitTestStreamSinkConfig, UnitTestStreamSourceConfig,
35};
36use super::{OutputId, compiler::expand_globs, graph::Graph, transform::get_transform_output_ids};
37use crate::{
38    conditions::Condition,
39    config::{
40        self, ComponentKey, Config, ConfigBuilder, ConfigPath, SinkOuter, SourceOuter,
41        TestDefinition, TestInput, TestOutput, loading, loading::ConfigBuilderLoader,
42    },
43    event::{Event, EventMetadata, LogEvent},
44    signal,
45    topology::{
46        RunningTopology,
47        builder::{TopologyPieces, TopologyPiecesBuilder},
48    },
49};
50
/// A single, fully built unit test that can be executed with [`UnitTest::run`].
pub struct UnitTest {
    /// Name of the test, taken from its test definition.
    pub name: String,
    // Configuration the test topology is started with.
    config: Config,
    // Topology pieces built from `config` ahead of time.
    pieces: TopologyPieces,
    // Receivers through which the unit test sinks report their results.
    test_result_rxs: Vec<Receiver<UnitTestSinkResult>>,
}
57
/// Outcome of running a [`UnitTest`].
pub struct UnitTestResult {
    /// Test errors gathered from the results reported by the unit test sinks.
    pub errors: Vec<String>,
}
61
62impl UnitTest {
63    pub async fn run(self) -> UnitTestResult {
64        let diff = config::ConfigDiff::initial(&self.config);
65        let (topology, _) = RunningTopology::start_validated(self.config, diff, self.pieces)
66            .await
67            .unwrap();
68        topology.sources_finished().await;
69        let _stop_complete = topology.stop();
70
71        let mut in_flight = self
72            .test_result_rxs
73            .into_iter()
74            .collect::<FuturesUnordered<_>>();
75
76        let mut errors = Vec::new();
77        while let Some(partial_result) = in_flight.next().await {
78            let partial_result = partial_result.expect(
79                "An unexpected error occurred while executing unit tests. Please try again.",
80            );
81            errors.extend(partial_result.test_errors);
82        }
83
84        UnitTestResult { errors }
85    }
86}
87
88/// Loads Log Schema from configurations and sets global schema.
89/// Once this is done, configurations can be correctly loaded using
90/// configured log schema defaults.
91/// If deny is set, will panic if schema has already been set.
92fn init_log_schema_from_paths(
93    config_paths: &[ConfigPath],
94    deny_if_set: bool,
95) -> Result<(), Vec<String>> {
96    let builder = ConfigBuilderLoader::default()
97        .interpolate_env(true)
98        .load_from_paths(config_paths)?;
99    vector_lib::config::init_log_schema(builder.global.log_schema, deny_if_set);
100    Ok(())
101}
102
103pub async fn build_unit_tests_main(
104    paths: &[ConfigPath],
105    signal_handler: &mut signal::SignalHandler,
106) -> Result<Vec<UnitTest>, Vec<String>> {
107    init_log_schema_from_paths(paths, false)?;
108    let secrets_backends_loader = loading::loader_from_paths(
109        loading::SecretBackendLoader::default().interpolate_env(true),
110        paths,
111    )?;
112    let secrets = secrets_backends_loader
113        .retrieve_secrets(signal_handler)
114        .await
115        .map_err(|e| vec![e])?;
116
117    let config_builder = ConfigBuilderLoader::default()
118        .interpolate_env(true)
119        .secrets(secrets)
120        .load_from_paths(paths)?;
121
122    build_unit_tests(config_builder).await
123}
124
125pub async fn build_unit_tests(
126    mut config_builder: ConfigBuilder,
127) -> Result<Vec<UnitTest>, Vec<String>> {
128    // Sanitize config by removing existing sources and sinks
129    config_builder.sources = Default::default();
130    config_builder.sinks = Default::default();
131
132    let test_definitions = std::mem::take(&mut config_builder.tests);
133    let mut tests = Vec::new();
134    let mut build_errors = Vec::new();
135    let metadata = UnitTestBuildMetadata::initialize(&mut config_builder)?;
136
137    for mut test_definition in test_definitions {
138        let test_name = test_definition.name.clone();
139        // Move the legacy single test input into the inputs list if it exists
140        let legacy_input = std::mem::take(&mut test_definition.input);
141        if let Some(input) = legacy_input {
142            test_definition.inputs.push(input);
143        }
144        match build_unit_test(&metadata, test_definition, config_builder.clone()).await {
145            Ok(test) => tests.push(test),
146            Err(errors) => {
147                let mut test_error = errors.join("\n");
148                // Indent all line breaks
149                test_error = test_error.replace('\n', "\n  ");
150                test_error.insert_str(0, &format!("Failed to build test '{test_name}':\n  "));
151                build_errors.push(test_error);
152            }
153        }
154    }
155
156    if build_errors.is_empty() {
157        Ok(tests)
158    } else {
159        Err(build_errors)
160    }
161}
162
/// Information computed once per configuration and shared by every unit test
/// built from it.
pub struct UnitTestBuildMetadata {
    // A set of all valid insert_at targets, used to validate test inputs.
    available_insert_targets: HashSet<ComponentKey>,
    // A mapping from transform name to unit test source name.
    source_ids: HashMap<ComponentKey, String>,
    // A base setup of all necessary unit test sources that can be "hydrated"
    // with test input events to produce the sources used in a particular test.
    template_sources: IndexMap<ComponentKey, UnitTestSourceConfig>,
    // A mapping from transform output id to unit test sink name.
    sink_ids: HashMap<OutputId, String>,
}
174
175impl UnitTestBuildMetadata {
176    pub fn initialize(config_builder: &mut ConfigBuilder) -> Result<Self, Vec<String>> {
177        // A unique id used to name test sources and sinks to avoid name clashes
178        let random_id = Uuid::new_v4().to_string();
179
180        let available_insert_targets = config_builder
181            .transforms
182            .keys()
183            .cloned()
184            .collect::<HashSet<_>>();
185
186        let source_ids = available_insert_targets
187            .iter()
188            .map(|key| (key.clone(), format!("{}-{}-{}", key, "source", random_id)))
189            .collect::<HashMap<_, _>>();
190
191        // Map a test source to every transform
192        let mut template_sources = IndexMap::new();
193        for (key, transform) in config_builder.transforms.iter_mut() {
194            let test_source_id = source_ids
195                .get(key)
196                .expect("Missing test source for a transform")
197                .clone();
198            transform.inputs.extend(Some(test_source_id));
199
200            template_sources.insert(key.clone(), UnitTestSourceConfig::default());
201        }
202
203        let builder = config_builder.clone();
204        let available_extract_targets = builder
205            .transforms
206            .iter()
207            .flat_map(|(key, transform)| {
208                get_transform_output_ids(
209                    transform.inner.as_ref(),
210                    key.clone(),
211                    builder.schema.log_namespace(),
212                )
213            })
214            .collect::<HashSet<_>>();
215
216        let sink_ids = available_extract_targets
217            .iter()
218            .map(|key| {
219                (
220                    key.clone(),
221                    format!(
222                        "{}-{}-{}",
223                        key.to_string().replace('.', "-"),
224                        "sink",
225                        random_id
226                    ),
227                )
228            })
229            .collect::<HashMap<_, _>>();
230
231        Ok(Self {
232            available_insert_targets,
233            source_ids,
234            template_sources,
235            sink_ids,
236        })
237    }
238
239    /// Convert test inputs into sources for use in a unit testing topology
240    pub fn hydrate_into_sources(
241        &self,
242        inputs: &[TestInput],
243    ) -> Result<IndexMap<ComponentKey, SourceOuter>, Vec<String>> {
244        let inputs = build_and_validate_inputs(inputs, &self.available_insert_targets)?;
245        let mut template_sources = self.template_sources.clone();
246        Ok(inputs
247            .into_iter()
248            .map(|(insert_at, events)| {
249                let mut source_config =
250                    template_sources
251                        .shift_remove(&insert_at)
252                        .unwrap_or_else(|| {
253                            // At this point, all inputs should have been validated to
254                            // correspond with valid transforms, and all valid transforms
255                            // have a source attached.
256                            panic!(
257                                "Invalid input: cannot insert at {:?}",
258                                insert_at.to_string()
259                            )
260                        });
261                source_config.events.extend(events);
262                let id: &str = self
263                    .source_ids
264                    .get(&insert_at)
265                    .expect("Corresponding source must exist")
266                    .as_ref();
267                (ComponentKey::from(id), SourceOuter::new(source_config))
268            })
269            .collect::<IndexMap<_, _>>())
270    }
271
272    /// Convert test outputs into sinks for use in a unit testing topology
273    pub fn hydrate_into_sinks(
274        &self,
275        test_name: &str,
276        outputs: &[TestOutput],
277        no_outputs_from: &[OutputId],
278    ) -> Result<
279        (
280            Vec<Receiver<UnitTestSinkResult>>,
281            IndexMap<ComponentKey, SinkOuter<String>>,
282        ),
283        Vec<String>,
284    > {
285        if outputs.is_empty() && no_outputs_from.is_empty() {
286            return Err(vec![
287                "unit test must contain at least one of `outputs` or `no_outputs_from`."
288                    .to_string(),
289            ]);
290        }
291        let outputs = build_outputs(outputs)?;
292
293        let mut template_sinks = IndexMap::new();
294        let mut test_result_rxs = Vec::new();
295        // Add sinks with checks
296        for (ids, checks) in outputs {
297            let (tx, rx) = oneshot::channel();
298            let sink_ids = ids.clone();
299            let sink_config = UnitTestSinkConfig {
300                test_name: test_name.to_string(),
301                transform_ids: ids.iter().map(|id| id.to_string()).collect(),
302                result_tx: Arc::new(Mutex::new(Some(tx))),
303                check: UnitTestSinkCheck::Checks(checks),
304            };
305
306            test_result_rxs.push(rx);
307            template_sinks.insert(sink_ids, sink_config);
308        }
309
310        // Add sinks with no outputs check
311        for id in no_outputs_from {
312            let (tx, rx) = oneshot::channel();
313            let sink_config = UnitTestSinkConfig {
314                test_name: test_name.to_string(),
315                transform_ids: vec![id.to_string()],
316                result_tx: Arc::new(Mutex::new(Some(tx))),
317                check: UnitTestSinkCheck::NoOutputs,
318            };
319
320            test_result_rxs.push(rx);
321            template_sinks.insert(vec![id.clone()], sink_config);
322        }
323
324        let sinks = template_sinks
325            .into_iter()
326            .map(|(transform_ids, sink_config)| {
327                let transform_ids_str = transform_ids
328                    .iter()
329                    .map(|s| s.to_string())
330                    .collect::<Vec<_>>();
331                let sink_ids = transform_ids
332                    .iter()
333                    .map(|transform_id| {
334                        self.sink_ids
335                            .get(transform_id)
336                            .expect("Sink does not exist")
337                            .as_str()
338                    })
339                    .collect::<Vec<_>>();
340                let sink_id = sink_ids.join(",");
341                (
342                    ComponentKey::from(sink_id),
343                    SinkOuter::new(transform_ids_str, sink_config),
344                )
345            })
346            .collect::<IndexMap<_, _>>();
347
348        Ok((test_result_rxs, sinks))
349    }
350}
351
352// Find all components that participate in the test
353fn get_relevant_test_components(
354    sources: &[&ComponentKey],
355    graph: &Graph,
356) -> Result<HashSet<String>, Vec<String>> {
357    graph.check_for_cycles().map_err(|error| vec![error])?;
358    let mut errors = Vec::new();
359    let mut components = HashSet::new();
360    for source in sources {
361        let paths = graph.paths_to_sink_from(source);
362        if paths.is_empty() {
363            errors.push(format!(
364                "Unable to complete topology between input target '{}' and output target(s)",
365                source
366                    .to_string()
367                    .rsplit_once("-source-")
368                    .unwrap_or(("", ""))
369                    .0
370            ));
371        } else {
372            for path in paths {
373                components.extend(path.into_iter().map(|key| key.to_string()));
374            }
375        }
376    }
377
378    if errors.is_empty() {
379        Ok(components)
380    } else {
381        Err(errors)
382    }
383}
384
385async fn build_unit_test(
386    metadata: &UnitTestBuildMetadata,
387    test: TestDefinition<String>,
388    mut config_builder: ConfigBuilder,
389) -> Result<UnitTest, Vec<String>> {
390    let transform_only_config = config_builder.clone();
391    let transform_only_graph = Graph::new_unchecked(
392        &transform_only_config.sources,
393        &transform_only_config.transforms,
394        &transform_only_config.sinks,
395        transform_only_config.schema,
396        transform_only_config
397            .global
398            .wildcard_matching
399            .unwrap_or_default(),
400    );
401    let test = test.resolve_outputs(&transform_only_graph)?;
402
403    let sources = metadata.hydrate_into_sources(&test.inputs)?;
404    let (test_result_rxs, sinks) =
405        metadata.hydrate_into_sinks(&test.name, &test.outputs, &test.no_outputs_from)?;
406
407    config_builder.sources = sources;
408    config_builder.sinks = sinks;
409    expand_globs(&mut config_builder);
410
411    let graph = Graph::new_unchecked(
412        &config_builder.sources,
413        &config_builder.transforms,
414        &config_builder.sinks,
415        config_builder.schema,
416        config_builder.global.wildcard_matching.unwrap_or_default(),
417    );
418
419    let mut valid_components = get_relevant_test_components(
420        config_builder.sources.keys().collect::<Vec<_>>().as_ref(),
421        &graph,
422    )?;
423
424    // Preserve the original unexpanded transform(s) which are valid test insertion points
425    let unexpanded_transforms = valid_components
426        .iter()
427        .filter_map(|component| {
428            component
429                .split_once('.')
430                .map(|(original_name, _)| original_name.to_string())
431        })
432        .collect::<Vec<_>>();
433    valid_components.extend(unexpanded_transforms);
434
435    // Enrichment tables consume inputs but are referenced dynamically in VRL transforms
436    // (via get_enrichment_table_record). Since we can't statically analyze VRL usage,
437    // we conservatively include all enrichment table inputs as valid components.
438    config_builder
439        .enrichment_tables
440        .iter()
441        .filter_map(|(key, c)| c.as_sink(key).map(|(_, sink)| sink.inputs))
442        .for_each(|i| valid_components.extend(i.into_iter()));
443
444    // Remove all transforms that are not relevant to the current test
445    config_builder.transforms = config_builder
446        .transforms
447        .into_iter()
448        .filter(|(key, _)| valid_components.contains(&key.to_string()))
449        .collect();
450
451    // Sanitize the inputs of all relevant transforms
452    let graph = Graph::new_unchecked(
453        &config_builder.sources,
454        &config_builder.transforms,
455        &config_builder.sinks,
456        config_builder.schema,
457        config_builder.global.wildcard_matching.unwrap_or_default(),
458    );
459    let valid_inputs = graph.input_map()?;
460    for (_, transform) in config_builder.transforms.iter_mut() {
461        let inputs = std::mem::take(&mut transform.inputs);
462        transform.inputs = inputs
463            .into_iter()
464            .filter(|input| valid_inputs.contains_key(input))
465            .collect();
466    }
467
468    if let Some(sink) = get_loose_end_outputs_sink(&config_builder) {
469        config_builder
470            .sinks
471            .insert(ComponentKey::from(Uuid::new_v4().to_string()), sink);
472    }
473    let config = config_builder.build()?;
474    let diff = config::ConfigDiff::initial(&config);
475    let pieces = TopologyPiecesBuilder::new(&config, &diff).build().await?;
476
477    Ok(UnitTest {
478        name: test.name,
479        config,
480        pieces,
481        test_result_rxs,
482    })
483}
484
485/// Near the end of building a unit test, it's possible that we've included a
486/// transform(s) with multiple outputs where at least one of its output is
487/// consumed but its other outputs are left unconsumed.
488///
489/// To avoid warning logs that occur when building such topologies, we construct
490/// a NoOp sink here whose sole purpose is to consume any "loose end" outputs.
491fn get_loose_end_outputs_sink(config: &ConfigBuilder) -> Option<SinkOuter<String>> {
492    let config = config.clone();
493    let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
494        get_transform_output_ids(
495            transform.inner.as_ref(),
496            key.clone(),
497            config.schema.log_namespace(),
498        )
499        .map(|output| output.to_string())
500        .collect::<Vec<_>>()
501    });
502
503    let mut loose_end_outputs = Vec::new();
504    for id in transform_ids {
505        if !config
506            .transforms
507            .iter()
508            .any(|(_, transform)| transform.inputs.contains(&id))
509            && !config
510                .sinks
511                .iter()
512                .any(|(_, sink)| sink.inputs.contains(&id))
513        {
514            loose_end_outputs.push(id);
515        }
516    }
517
518    if loose_end_outputs.is_empty() {
519        None
520    } else {
521        let noop_sink = UnitTestSinkConfig {
522            test_name: "".to_string(),
523            transform_ids: vec![],
524            result_tx: Arc::new(Mutex::new(None)),
525            check: UnitTestSinkCheck::NoOp,
526        };
527        Some(SinkOuter::new(loose_end_outputs, noop_sink))
528    }
529}
530
531fn build_and_validate_inputs(
532    test_inputs: &[TestInput],
533    available_insert_targets: &HashSet<ComponentKey>,
534) -> Result<HashMap<ComponentKey, Vec<Event>>, Vec<String>> {
535    let mut inputs = HashMap::new();
536    let mut errors = Vec::new();
537    if test_inputs.is_empty() {
538        errors.push("must specify at least one input.".to_string());
539        return Err(errors);
540    }
541
542    for (index, input) in test_inputs.iter().enumerate() {
543        if available_insert_targets.contains(&input.insert_at) {
544            match build_input_event(input) {
545                Ok(input_event) => {
546                    inputs
547                        .entry(input.insert_at.clone())
548                        .and_modify(|events: &mut Vec<Event>| {
549                            events.push(input_event.clone());
550                        })
551                        .or_insert_with(|| vec![input_event]);
552                }
553                Err(error) => errors.push(error),
554            }
555        } else {
556            errors.push(format!(
557                "inputs[{}]: unable to locate target transform '{}'",
558                index, input.insert_at
559            ))
560        }
561    }
562
563    if errors.is_empty() {
564        Ok(inputs)
565    } else {
566        Err(errors)
567    }
568}
569
570fn build_outputs(
571    test_outputs: &[TestOutput],
572) -> Result<IndexMap<Vec<OutputId>, Vec<Vec<Condition>>>, Vec<String>> {
573    let mut outputs: IndexMap<Vec<OutputId>, Vec<Vec<Condition>>> = IndexMap::new();
574    let mut errors = Vec::new();
575
576    for output in test_outputs {
577        let mut conditions = Vec::new();
578        for (index, condition) in output
579            .conditions
580            .clone()
581            .unwrap_or_default()
582            .iter()
583            .enumerate()
584        {
585            match condition.build(&Default::default()) {
586                Ok(condition) => conditions.push(condition),
587                Err(error) => errors.push(format!(
588                    "failed to create test condition '{index}': {error}"
589                )),
590            }
591        }
592
593        outputs
594            .entry(output.extract_from.clone().to_vec())
595            .and_modify(|existing_conditions| existing_conditions.push(conditions.clone()))
596            .or_insert(vec![conditions.clone()]);
597    }
598
599    if errors.is_empty() {
600        Ok(outputs)
601    } else {
602        Err(errors)
603    }
604}
605
606fn build_input_event(input: &TestInput) -> Result<Event, String> {
607    match input.type_str.as_ref() {
608        "raw" => match input.value.as_ref() {
609            Some(v) => Ok(Event::Log(LogEvent::from_str_legacy(v.clone()))),
610            None => Err("input type 'raw' requires the field 'value'".to_string()),
611        },
612        "vrl" => {
613            if let Some(source) = &input.source {
614                let fns = vrl::stdlib::all();
615                let result = vrl::compiler::compile(source, &fns)
616                    .map_err(|e| Formatter::new(source, e.clone()).to_string())?;
617
618                let mut target = TargetValue {
619                    value: value!({}),
620                    metadata: value::Value::Object(BTreeMap::new()),
621                    secrets: value::Secrets::default(),
622                };
623
624                let mut state = RuntimeState::default();
625                let timezone = TimeZone::default();
626                let mut ctx = Context::new(&mut target, &mut state, &timezone);
627
628                result
629                    .program
630                    .resolve(&mut ctx)
631                    .map(|_| {
632                        Event::Log(LogEvent::from_parts(
633                            target.value.clone(),
634                            EventMetadata::default_with_value(target.metadata.clone()),
635                        ))
636                    })
637                    .map_err(|e| e.to_string())
638            } else {
639                Err("input type 'vrl' requires the field 'source'".to_string())
640            }
641        }
642        "log" => {
643            if let Some(log_fields) = &input.log_fields {
644                let mut event = LogEvent::from_str_legacy("");
645                for (path, value) in log_fields {
646                    event
647                        .parse_path_and_insert(path, value.clone())
648                        .map_err(|e| e.to_string())?;
649                }
650                Ok(event.into())
651            } else {
652                Err("input type 'log' requires the field 'log_fields'".to_string())
653            }
654        }
655        "metric" => {
656            if let Some(metric) = &input.metric {
657                Ok(Event::Metric(metric.clone()))
658            } else {
659                Err("input type 'metric' requires the field 'metric'".to_string())
660            }
661        }
662        _ => Err(format!(
663            "unrecognized input type '{}', expected one of: 'raw', 'log' or 'metric'",
664            input.type_str
665        )),
666    }
667}