vector/config/unit_test/
mod.rs

1// should match vector-unit-test-tests feature
2#[cfg(all(
3    test,
4    feature = "sources-demo_logs",
5    feature = "transforms-remap",
6    feature = "transforms-route",
7    feature = "transforms-filter",
8    feature = "transforms-reduce",
9    feature = "sinks-console"
10))]
11mod tests;
12mod unit_test_components;
13
14use std::{
15    collections::{BTreeMap, HashMap, HashSet},
16    sync::Arc,
17};
18
19use futures_util::{StreamExt, stream::FuturesUnordered};
20use indexmap::IndexMap;
21use tokio::sync::{
22    Mutex,
23    oneshot::{self, Receiver},
24};
25use uuid::Uuid;
26use vrl::{
27    compiler::{Context, TargetValue, TimeZone, state::RuntimeState},
28    diagnostic::Formatter,
29    value,
30};
31
32pub use self::unit_test_components::{
33    UnitTestSinkCheck, UnitTestSinkConfig, UnitTestSinkResult, UnitTestSourceConfig,
34    UnitTestStreamSinkConfig, UnitTestStreamSourceConfig,
35};
36use super::{OutputId, compiler::expand_globs, graph::Graph, transform::get_transform_output_ids};
37use crate::{
38    conditions::Condition,
39    config::{
40        self, ComponentKey, Config, ConfigBuilder, ConfigPath, SinkOuter, SourceOuter,
41        TestDefinition, TestInput, TestOutput, loading,
42    },
43    event::{Event, EventMetadata, LogEvent},
44    signal,
45    topology::{
46        RunningTopology,
47        builder::{TopologyPieces, TopologyPiecesBuilder},
48    },
49};
50
51pub struct UnitTest {
52    pub name: String,
53    config: Config,
54    pieces: TopologyPieces,
55    test_result_rxs: Vec<Receiver<UnitTestSinkResult>>,
56}
57
58pub struct UnitTestResult {
59    pub errors: Vec<String>,
60}
61
62impl UnitTest {
63    pub async fn run(self) -> UnitTestResult {
64        let diff = config::ConfigDiff::initial(&self.config);
65        let (topology, _) = RunningTopology::start_validated(self.config, diff, self.pieces)
66            .await
67            .unwrap();
68        topology.sources_finished().await;
69        let _stop_complete = topology.stop();
70
71        let mut in_flight = self
72            .test_result_rxs
73            .into_iter()
74            .collect::<FuturesUnordered<_>>();
75
76        let mut errors = Vec::new();
77        while let Some(partial_result) = in_flight.next().await {
78            let partial_result = partial_result.expect(
79                "An unexpected error occurred while executing unit tests. Please try again.",
80            );
81            errors.extend(partial_result.test_errors);
82        }
83
84        UnitTestResult { errors }
85    }
86}
87
88/// Loads Log Schema from configurations and sets global schema.
89/// Once this is done, configurations can be correctly loaded using
90/// configured log schema defaults.
91/// If deny is set, will panic if schema has already been set.
92fn init_log_schema_from_paths(
93    config_paths: &[ConfigPath],
94    deny_if_set: bool,
95) -> Result<(), Vec<String>> {
96    let builder = config::loading::load_builder_from_paths(config_paths)?;
97    vector_lib::config::init_log_schema(builder.global.log_schema, deny_if_set);
98    Ok(())
99}
100
101pub async fn build_unit_tests_main(
102    paths: &[ConfigPath],
103    signal_handler: &mut signal::SignalHandler,
104) -> Result<Vec<UnitTest>, Vec<String>> {
105    init_log_schema_from_paths(paths, false)?;
106    let mut secrets_backends_loader = loading::load_secret_backends_from_paths(paths)?;
107    let config_builder = if secrets_backends_loader.has_secrets_to_retrieve() {
108        let resolved_secrets = secrets_backends_loader
109            .retrieve(&mut signal_handler.subscribe())
110            .await
111            .map_err(|e| vec![e])?;
112        loading::load_builder_from_paths_with_secrets(paths, resolved_secrets)?
113    } else {
114        loading::load_builder_from_paths(paths)?
115    };
116
117    build_unit_tests(config_builder).await
118}
119
120pub async fn build_unit_tests(
121    mut config_builder: ConfigBuilder,
122) -> Result<Vec<UnitTest>, Vec<String>> {
123    // Sanitize config by removing existing sources and sinks
124    config_builder.sources = Default::default();
125    config_builder.sinks = Default::default();
126
127    let test_definitions = std::mem::take(&mut config_builder.tests);
128    let mut tests = Vec::new();
129    let mut build_errors = Vec::new();
130    let metadata = UnitTestBuildMetadata::initialize(&mut config_builder)?;
131
132    for mut test_definition in test_definitions {
133        let test_name = test_definition.name.clone();
134        // Move the legacy single test input into the inputs list if it exists
135        let legacy_input = std::mem::take(&mut test_definition.input);
136        if let Some(input) = legacy_input {
137            test_definition.inputs.push(input);
138        }
139        match build_unit_test(&metadata, test_definition, config_builder.clone()).await {
140            Ok(test) => tests.push(test),
141            Err(errors) => {
142                let mut test_error = errors.join("\n");
143                // Indent all line breaks
144                test_error = test_error.replace('\n', "\n  ");
145                test_error.insert_str(0, &format!("Failed to build test '{test_name}':\n  "));
146                build_errors.push(test_error);
147            }
148        }
149    }
150
151    if build_errors.is_empty() {
152        Ok(tests)
153    } else {
154        Err(build_errors)
155    }
156}
157
158pub struct UnitTestBuildMetadata {
159    // A set of all valid insert_at targets, used to validate test inputs.
160    available_insert_targets: HashSet<ComponentKey>,
161    // A mapping from transform name to unit test source name.
162    source_ids: HashMap<ComponentKey, String>,
163    // A base setup of all necessary unit test sources that can be "hydrated"
164    // with test input events to produces sources used in a particular test.
165    template_sources: IndexMap<ComponentKey, UnitTestSourceConfig>,
166    // A mapping from transform name to unit test sink name.
167    sink_ids: HashMap<OutputId, String>,
168}
169
170impl UnitTestBuildMetadata {
171    pub fn initialize(config_builder: &mut ConfigBuilder) -> Result<Self, Vec<String>> {
172        // A unique id used to name test sources and sinks to avoid name clashes
173        let random_id = Uuid::new_v4().to_string();
174
175        let available_insert_targets = config_builder
176            .transforms
177            .keys()
178            .cloned()
179            .collect::<HashSet<_>>();
180
181        let source_ids = available_insert_targets
182            .iter()
183            .map(|key| (key.clone(), format!("{}-{}-{}", key, "source", random_id)))
184            .collect::<HashMap<_, _>>();
185
186        // Map a test source to every transform
187        let mut template_sources = IndexMap::new();
188        for (key, transform) in config_builder.transforms.iter_mut() {
189            let test_source_id = source_ids
190                .get(key)
191                .expect("Missing test source for a transform")
192                .clone();
193            transform.inputs.extend(Some(test_source_id));
194
195            template_sources.insert(key.clone(), UnitTestSourceConfig::default());
196        }
197
198        let builder = config_builder.clone();
199        let available_extract_targets = builder
200            .transforms
201            .iter()
202            .flat_map(|(key, transform)| {
203                get_transform_output_ids(
204                    transform.inner.as_ref(),
205                    key.clone(),
206                    builder.schema.log_namespace(),
207                )
208            })
209            .collect::<HashSet<_>>();
210
211        let sink_ids = available_extract_targets
212            .iter()
213            .map(|key| {
214                (
215                    key.clone(),
216                    format!(
217                        "{}-{}-{}",
218                        key.to_string().replace('.', "-"),
219                        "sink",
220                        random_id
221                    ),
222                )
223            })
224            .collect::<HashMap<_, _>>();
225
226        Ok(Self {
227            available_insert_targets,
228            source_ids,
229            template_sources,
230            sink_ids,
231        })
232    }
233
234    /// Convert test inputs into sources for use in a unit testing topology
235    pub fn hydrate_into_sources(
236        &self,
237        inputs: &[TestInput],
238    ) -> Result<IndexMap<ComponentKey, SourceOuter>, Vec<String>> {
239        let inputs = build_and_validate_inputs(inputs, &self.available_insert_targets)?;
240        let mut template_sources = self.template_sources.clone();
241        Ok(inputs
242            .into_iter()
243            .map(|(insert_at, events)| {
244                let mut source_config =
245                    template_sources
246                        .shift_remove(&insert_at)
247                        .unwrap_or_else(|| {
248                            // At this point, all inputs should have been validated to
249                            // correspond with valid transforms, and all valid transforms
250                            // have a source attached.
251                            panic!(
252                                "Invalid input: cannot insert at {:?}",
253                                insert_at.to_string()
254                            )
255                        });
256                source_config.events.extend(events);
257                let id: &str = self
258                    .source_ids
259                    .get(&insert_at)
260                    .expect("Corresponding source must exist")
261                    .as_ref();
262                (ComponentKey::from(id), SourceOuter::new(source_config))
263            })
264            .collect::<IndexMap<_, _>>())
265    }
266
267    /// Convert test outputs into sinks for use in a unit testing topology
268    pub fn hydrate_into_sinks(
269        &self,
270        test_name: &str,
271        outputs: &[TestOutput],
272        no_outputs_from: &[OutputId],
273    ) -> Result<
274        (
275            Vec<Receiver<UnitTestSinkResult>>,
276            IndexMap<ComponentKey, SinkOuter<String>>,
277        ),
278        Vec<String>,
279    > {
280        if outputs.is_empty() && no_outputs_from.is_empty() {
281            return Err(vec![
282                "unit test must contain at least one of `outputs` or `no_outputs_from`."
283                    .to_string(),
284            ]);
285        }
286        let outputs = build_outputs(outputs)?;
287
288        let mut template_sinks = IndexMap::new();
289        let mut test_result_rxs = Vec::new();
290        // Add sinks with checks
291        for (ids, checks) in outputs {
292            let (tx, rx) = oneshot::channel();
293            let sink_ids = ids.clone();
294            let sink_config = UnitTestSinkConfig {
295                test_name: test_name.to_string(),
296                transform_ids: ids.iter().map(|id| id.to_string()).collect(),
297                result_tx: Arc::new(Mutex::new(Some(tx))),
298                check: UnitTestSinkCheck::Checks(checks),
299            };
300
301            test_result_rxs.push(rx);
302            template_sinks.insert(sink_ids, sink_config);
303        }
304
305        // Add sinks with no outputs check
306        for id in no_outputs_from {
307            let (tx, rx) = oneshot::channel();
308            let sink_config = UnitTestSinkConfig {
309                test_name: test_name.to_string(),
310                transform_ids: vec![id.to_string()],
311                result_tx: Arc::new(Mutex::new(Some(tx))),
312                check: UnitTestSinkCheck::NoOutputs,
313            };
314
315            test_result_rxs.push(rx);
316            template_sinks.insert(vec![id.clone()], sink_config);
317        }
318
319        let sinks = template_sinks
320            .into_iter()
321            .map(|(transform_ids, sink_config)| {
322                let transform_ids_str = transform_ids
323                    .iter()
324                    .map(|s| s.to_string())
325                    .collect::<Vec<_>>();
326                let sink_ids = transform_ids
327                    .iter()
328                    .map(|transform_id| {
329                        self.sink_ids
330                            .get(transform_id)
331                            .expect("Sink does not exist")
332                            .as_str()
333                    })
334                    .collect::<Vec<_>>();
335                let sink_id = sink_ids.join(",");
336                (
337                    ComponentKey::from(sink_id),
338                    SinkOuter::new(transform_ids_str, sink_config),
339                )
340            })
341            .collect::<IndexMap<_, _>>();
342
343        Ok((test_result_rxs, sinks))
344    }
345}
346
347// Find all components that participate in the test
348fn get_relevant_test_components(
349    sources: &[&ComponentKey],
350    graph: &Graph,
351) -> Result<HashSet<String>, Vec<String>> {
352    graph.check_for_cycles().map_err(|error| vec![error])?;
353    let mut errors = Vec::new();
354    let mut components = HashSet::new();
355    for source in sources {
356        let paths = graph.paths_to_sink_from(source);
357        if paths.is_empty() {
358            errors.push(format!(
359                "Unable to complete topology between input target '{}' and output target(s)",
360                source
361                    .to_string()
362                    .rsplit_once("-source-")
363                    .unwrap_or(("", ""))
364                    .0
365            ));
366        } else {
367            for path in paths {
368                components.extend(path.into_iter().map(|key| key.to_string()));
369            }
370        }
371    }
372
373    if errors.is_empty() {
374        Ok(components)
375    } else {
376        Err(errors)
377    }
378}
379
380async fn build_unit_test(
381    metadata: &UnitTestBuildMetadata,
382    test: TestDefinition<String>,
383    mut config_builder: ConfigBuilder,
384) -> Result<UnitTest, Vec<String>> {
385    let transform_only_config = config_builder.clone();
386    let transform_only_graph = Graph::new_unchecked(
387        &transform_only_config.sources,
388        &transform_only_config.transforms,
389        &transform_only_config.sinks,
390        transform_only_config.schema,
391        transform_only_config
392            .global
393            .wildcard_matching
394            .unwrap_or_default(),
395    );
396    let test = test.resolve_outputs(&transform_only_graph)?;
397
398    let sources = metadata.hydrate_into_sources(&test.inputs)?;
399    let (test_result_rxs, sinks) =
400        metadata.hydrate_into_sinks(&test.name, &test.outputs, &test.no_outputs_from)?;
401
402    config_builder.sources = sources;
403    config_builder.sinks = sinks;
404    expand_globs(&mut config_builder);
405
406    let graph = Graph::new_unchecked(
407        &config_builder.sources,
408        &config_builder.transforms,
409        &config_builder.sinks,
410        config_builder.schema,
411        config_builder.global.wildcard_matching.unwrap_or_default(),
412    );
413
414    let mut valid_components = get_relevant_test_components(
415        config_builder.sources.keys().collect::<Vec<_>>().as_ref(),
416        &graph,
417    )?;
418
419    // Preserve the original unexpanded transform(s) which are valid test insertion points
420    let unexpanded_transforms = valid_components
421        .iter()
422        .filter_map(|component| {
423            component
424                .split_once('.')
425                .map(|(original_name, _)| original_name.to_string())
426        })
427        .collect::<Vec<_>>();
428    valid_components.extend(unexpanded_transforms);
429
430    // Remove all transforms that are not relevant to the current test
431    config_builder.transforms = config_builder
432        .transforms
433        .into_iter()
434        .filter(|(key, _)| valid_components.contains(&key.to_string()))
435        .collect();
436
437    // Sanitize the inputs of all relevant transforms
438    let graph = Graph::new_unchecked(
439        &config_builder.sources,
440        &config_builder.transforms,
441        &config_builder.sinks,
442        config_builder.schema,
443        config_builder.global.wildcard_matching.unwrap_or_default(),
444    );
445    let valid_inputs = graph.input_map()?;
446    for (_, transform) in config_builder.transforms.iter_mut() {
447        let inputs = std::mem::take(&mut transform.inputs);
448        transform.inputs = inputs
449            .into_iter()
450            .filter(|input| valid_inputs.contains_key(input))
451            .collect();
452    }
453
454    if let Some(sink) = get_loose_end_outputs_sink(&config_builder) {
455        config_builder
456            .sinks
457            .insert(ComponentKey::from(Uuid::new_v4().to_string()), sink);
458    }
459    let config = config_builder.build()?;
460    let diff = config::ConfigDiff::initial(&config);
461    let pieces = TopologyPiecesBuilder::new(&config, &diff).build().await?;
462
463    Ok(UnitTest {
464        name: test.name,
465        config,
466        pieces,
467        test_result_rxs,
468    })
469}
470
471/// Near the end of building a unit test, it's possible that we've included a
472/// transform(s) with multiple outputs where at least one of its output is
473/// consumed but its other outputs are left unconsumed.
474///
475/// To avoid warning logs that occur when building such topologies, we construct
476/// a NoOp sink here whose sole purpose is to consume any "loose end" outputs.
477fn get_loose_end_outputs_sink(config: &ConfigBuilder) -> Option<SinkOuter<String>> {
478    let config = config.clone();
479    let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
480        get_transform_output_ids(
481            transform.inner.as_ref(),
482            key.clone(),
483            config.schema.log_namespace(),
484        )
485        .map(|output| output.to_string())
486        .collect::<Vec<_>>()
487    });
488
489    let mut loose_end_outputs = Vec::new();
490    for id in transform_ids {
491        if !config
492            .transforms
493            .iter()
494            .any(|(_, transform)| transform.inputs.contains(&id))
495            && !config
496                .sinks
497                .iter()
498                .any(|(_, sink)| sink.inputs.contains(&id))
499        {
500            loose_end_outputs.push(id);
501        }
502    }
503
504    if loose_end_outputs.is_empty() {
505        None
506    } else {
507        let noop_sink = UnitTestSinkConfig {
508            test_name: "".to_string(),
509            transform_ids: vec![],
510            result_tx: Arc::new(Mutex::new(None)),
511            check: UnitTestSinkCheck::NoOp,
512        };
513        Some(SinkOuter::new(loose_end_outputs, noop_sink))
514    }
515}
516
517fn build_and_validate_inputs(
518    test_inputs: &[TestInput],
519    available_insert_targets: &HashSet<ComponentKey>,
520) -> Result<HashMap<ComponentKey, Vec<Event>>, Vec<String>> {
521    let mut inputs = HashMap::new();
522    let mut errors = Vec::new();
523    if test_inputs.is_empty() {
524        errors.push("must specify at least one input.".to_string());
525        return Err(errors);
526    }
527
528    for (index, input) in test_inputs.iter().enumerate() {
529        if available_insert_targets.contains(&input.insert_at) {
530            match build_input_event(input) {
531                Ok(input_event) => {
532                    inputs
533                        .entry(input.insert_at.clone())
534                        .and_modify(|events: &mut Vec<Event>| {
535                            events.push(input_event.clone());
536                        })
537                        .or_insert_with(|| vec![input_event]);
538                }
539                Err(error) => errors.push(error),
540            }
541        } else {
542            errors.push(format!(
543                "inputs[{}]: unable to locate target transform '{}'",
544                index, input.insert_at
545            ))
546        }
547    }
548
549    if errors.is_empty() {
550        Ok(inputs)
551    } else {
552        Err(errors)
553    }
554}
555
556fn build_outputs(
557    test_outputs: &[TestOutput],
558) -> Result<IndexMap<Vec<OutputId>, Vec<Vec<Condition>>>, Vec<String>> {
559    let mut outputs: IndexMap<Vec<OutputId>, Vec<Vec<Condition>>> = IndexMap::new();
560    let mut errors = Vec::new();
561
562    for output in test_outputs {
563        let mut conditions = Vec::new();
564        for (index, condition) in output
565            .conditions
566            .clone()
567            .unwrap_or_default()
568            .iter()
569            .enumerate()
570        {
571            match condition.build(&Default::default()) {
572                Ok(condition) => conditions.push(condition),
573                Err(error) => errors.push(format!(
574                    "failed to create test condition '{index}': {error}"
575                )),
576            }
577        }
578
579        outputs
580            .entry(output.extract_from.clone().to_vec())
581            .and_modify(|existing_conditions| existing_conditions.push(conditions.clone()))
582            .or_insert(vec![conditions.clone()]);
583    }
584
585    if errors.is_empty() {
586        Ok(outputs)
587    } else {
588        Err(errors)
589    }
590}
591
592fn build_input_event(input: &TestInput) -> Result<Event, String> {
593    match input.type_str.as_ref() {
594        "raw" => match input.value.as_ref() {
595            Some(v) => Ok(Event::Log(LogEvent::from_str_legacy(v.clone()))),
596            None => Err("input type 'raw' requires the field 'value'".to_string()),
597        },
598        "vrl" => {
599            if let Some(source) = &input.source {
600                let fns = vrl::stdlib::all();
601                let result = vrl::compiler::compile(source, &fns)
602                    .map_err(|e| Formatter::new(source, e.clone()).to_string())?;
603
604                let mut target = TargetValue {
605                    value: value!({}),
606                    metadata: value::Value::Object(BTreeMap::new()),
607                    secrets: value::Secrets::default(),
608                };
609
610                let mut state = RuntimeState::default();
611                let timezone = TimeZone::default();
612                let mut ctx = Context::new(&mut target, &mut state, &timezone);
613
614                result
615                    .program
616                    .resolve(&mut ctx)
617                    .map(|_| {
618                        Event::Log(LogEvent::from_parts(
619                            target.value.clone(),
620                            EventMetadata::default_with_value(target.metadata.clone()),
621                        ))
622                    })
623                    .map_err(|e| e.to_string())
624            } else {
625                Err("input type 'vrl' requires the field 'source'".to_string())
626            }
627        }
628        "log" => {
629            if let Some(log_fields) = &input.log_fields {
630                let mut event = LogEvent::from_str_legacy("");
631                for (path, value) in log_fields {
632                    event
633                        .parse_path_and_insert(path, value.clone())
634                        .map_err(|e| e.to_string())?;
635                }
636                Ok(event.into())
637            } else {
638                Err("input type 'log' requires the field 'log_fields'".to_string())
639            }
640        }
641        "metric" => {
642            if let Some(metric) = &input.metric {
643                Ok(Event::Metric(metric.clone()))
644            } else {
645                Err("input type 'metric' requires the field 'metric'".to_string())
646            }
647        }
648        _ => Err(format!(
649            "unrecognized input type '{}', expected one of: 'raw', 'log' or 'metric'",
650            input.type_str
651        )),
652    }
653}