vector/config/unit_test/
mod.rs

1// should match vector-unit-test-tests feature
2#[cfg(all(
3    test,
4    feature = "sources-demo_logs",
5    feature = "transforms-remap",
6    feature = "transforms-route",
7    feature = "transforms-filter",
8    feature = "transforms-reduce",
9    feature = "sinks-console"
10))]
11mod tests;
12mod unit_test_components;
13
14use std::{
15    collections::{BTreeMap, HashMap, HashSet},
16    sync::Arc,
17};
18
19use futures_util::{stream::FuturesUnordered, StreamExt};
20use indexmap::IndexMap;
21use tokio::sync::{
22    oneshot::{self, Receiver},
23    Mutex,
24};
25use uuid::Uuid;
26use vrl::{
27    compiler::{state::RuntimeState, Context, TargetValue, TimeZone},
28    diagnostic::Formatter,
29    value,
30};
31
32pub use self::unit_test_components::{
33    UnitTestSinkCheck, UnitTestSinkConfig, UnitTestSinkResult, UnitTestSourceConfig,
34    UnitTestStreamSinkConfig, UnitTestStreamSourceConfig,
35};
36use super::{compiler::expand_globs, graph::Graph, transform::get_transform_output_ids, OutputId};
37use crate::{
38    conditions::Condition,
39    config::{
40        self, loading, ComponentKey, Config, ConfigBuilder, ConfigPath, SinkOuter, SourceOuter,
41        TestDefinition, TestInput, TestOutput,
42    },
43    event::{Event, EventMetadata, LogEvent},
44    signal,
45    topology::{builder::TopologyPieces, RunningTopology},
46};
47
/// A single unit test that has been fully built and is ready to be executed
/// with [`UnitTest::run`].
pub struct UnitTest {
    /// Name of the test, copied from its test definition.
    pub name: String,
    /// Configuration of the topology that is started when the test runs.
    config: Config,
    /// Topology pieces that were built from `config`.
    pieces: TopologyPieces,
    /// One receiver per checking test sink; each yields that sink's result.
    test_result_rxs: Vec<Receiver<UnitTestSinkResult>>,
}
54
/// Outcome of running a single [`UnitTest`].
///
/// The standard traits are derived so callers can log, copy, and compare
/// results.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct UnitTestResult {
    /// Every error reported by the test's sinks; empty when the test passed.
    pub errors: Vec<String>,
}
58
59impl UnitTest {
60    pub async fn run(self) -> UnitTestResult {
61        let diff = config::ConfigDiff::initial(&self.config);
62        let (topology, _) = RunningTopology::start_validated(self.config, diff, self.pieces)
63            .await
64            .unwrap();
65        topology.sources_finished().await;
66        let _stop_complete = topology.stop();
67
68        let mut in_flight = self
69            .test_result_rxs
70            .into_iter()
71            .collect::<FuturesUnordered<_>>();
72
73        let mut errors = Vec::new();
74        while let Some(partial_result) = in_flight.next().await {
75            let partial_result = partial_result.expect(
76                "An unexpected error occurred while executing unit tests. Please try again.",
77            );
78            errors.extend(partial_result.test_errors);
79        }
80
81        UnitTestResult { errors }
82    }
83}
84
85/// Loads Log Schema from configurations and sets global schema.
86/// Once this is done, configurations can be correctly loaded using
87/// configured log schema defaults.
88/// If deny is set, will panic if schema has already been set.
89fn init_log_schema_from_paths(
90    config_paths: &[ConfigPath],
91    deny_if_set: bool,
92) -> Result<(), Vec<String>> {
93    let builder = config::loading::load_builder_from_paths(config_paths)?;
94    vector_lib::config::init_log_schema(builder.global.log_schema, deny_if_set);
95    Ok(())
96}
97
98pub async fn build_unit_tests_main(
99    paths: &[ConfigPath],
100    signal_handler: &mut signal::SignalHandler,
101) -> Result<Vec<UnitTest>, Vec<String>> {
102    init_log_schema_from_paths(paths, false)?;
103    let mut secrets_backends_loader = loading::load_secret_backends_from_paths(paths)?;
104    let config_builder = if secrets_backends_loader.has_secrets_to_retrieve() {
105        let resolved_secrets = secrets_backends_loader
106            .retrieve(&mut signal_handler.subscribe())
107            .await
108            .map_err(|e| vec![e])?;
109        loading::load_builder_from_paths_with_secrets(paths, resolved_secrets)?
110    } else {
111        loading::load_builder_from_paths(paths)?
112    };
113
114    build_unit_tests(config_builder).await
115}
116
117pub async fn build_unit_tests(
118    mut config_builder: ConfigBuilder,
119) -> Result<Vec<UnitTest>, Vec<String>> {
120    // Sanitize config by removing existing sources and sinks
121    config_builder.sources = Default::default();
122    config_builder.sinks = Default::default();
123
124    let test_definitions = std::mem::take(&mut config_builder.tests);
125    let mut tests = Vec::new();
126    let mut build_errors = Vec::new();
127    let metadata = UnitTestBuildMetadata::initialize(&mut config_builder)?;
128
129    for mut test_definition in test_definitions {
130        let test_name = test_definition.name.clone();
131        // Move the legacy single test input into the inputs list if it exists
132        let legacy_input = std::mem::take(&mut test_definition.input);
133        if let Some(input) = legacy_input {
134            test_definition.inputs.push(input);
135        }
136        match build_unit_test(&metadata, test_definition, config_builder.clone()).await {
137            Ok(test) => tests.push(test),
138            Err(errors) => {
139                let mut test_error = errors.join("\n");
140                // Indent all line breaks
141                test_error = test_error.replace('\n', "\n  ");
142                test_error.insert_str(0, &format!("Failed to build test '{test_name}':\n  "));
143                build_errors.push(test_error);
144            }
145        }
146    }
147
148    if build_errors.is_empty() {
149        Ok(tests)
150    } else {
151        Err(build_errors)
152    }
153}
154
/// Data computed once per configuration and shared by every unit test that is
/// built from it.
pub struct UnitTestBuildMetadata {
    /// A set of all valid `insert_at` targets, used to validate test inputs.
    available_insert_targets: HashSet<ComponentKey>,
    /// A mapping from transform name to unit test source name.
    source_ids: HashMap<ComponentKey, String>,
    /// A base setup of all necessary unit test sources that can be "hydrated"
    /// with test input events to produce the sources used in a particular test.
    template_sources: IndexMap<ComponentKey, UnitTestSourceConfig>,
    /// A mapping from transform output id to unit test sink name.
    sink_ids: HashMap<OutputId, String>,
}
166
167impl UnitTestBuildMetadata {
168    pub fn initialize(config_builder: &mut ConfigBuilder) -> Result<Self, Vec<String>> {
169        // A unique id used to name test sources and sinks to avoid name clashes
170        let random_id = Uuid::new_v4().to_string();
171
172        let available_insert_targets = config_builder
173            .transforms
174            .keys()
175            .cloned()
176            .collect::<HashSet<_>>();
177
178        let source_ids = available_insert_targets
179            .iter()
180            .map(|key| (key.clone(), format!("{}-{}-{}", key, "source", random_id)))
181            .collect::<HashMap<_, _>>();
182
183        // Map a test source to every transform
184        let mut template_sources = IndexMap::new();
185        for (key, transform) in config_builder.transforms.iter_mut() {
186            let test_source_id = source_ids
187                .get(key)
188                .expect("Missing test source for a transform")
189                .clone();
190            transform.inputs.extend(Some(test_source_id));
191
192            template_sources.insert(key.clone(), UnitTestSourceConfig::default());
193        }
194
195        let builder = config_builder.clone();
196        let available_extract_targets = builder
197            .transforms
198            .iter()
199            .flat_map(|(key, transform)| {
200                get_transform_output_ids(
201                    transform.inner.as_ref(),
202                    key.clone(),
203                    builder.schema.log_namespace(),
204                )
205            })
206            .collect::<HashSet<_>>();
207
208        let sink_ids = available_extract_targets
209            .iter()
210            .map(|key| {
211                (
212                    key.clone(),
213                    format!(
214                        "{}-{}-{}",
215                        key.to_string().replace('.', "-"),
216                        "sink",
217                        random_id
218                    ),
219                )
220            })
221            .collect::<HashMap<_, _>>();
222
223        Ok(Self {
224            available_insert_targets,
225            source_ids,
226            template_sources,
227            sink_ids,
228        })
229    }
230
231    /// Convert test inputs into sources for use in a unit testing topology
232    pub fn hydrate_into_sources(
233        &self,
234        inputs: &[TestInput],
235    ) -> Result<IndexMap<ComponentKey, SourceOuter>, Vec<String>> {
236        let inputs = build_and_validate_inputs(inputs, &self.available_insert_targets)?;
237        let mut template_sources = self.template_sources.clone();
238        Ok(inputs
239            .into_iter()
240            .map(|(insert_at, events)| {
241                let mut source_config =
242                    template_sources
243                        .shift_remove(&insert_at)
244                        .unwrap_or_else(|| {
245                            // At this point, all inputs should have been validated to
246                            // correspond with valid transforms, and all valid transforms
247                            // have a source attached.
248                            panic!(
249                                "Invalid input: cannot insert at {:?}",
250                                insert_at.to_string()
251                            )
252                        });
253                source_config.events.extend(events);
254                let id: &str = self
255                    .source_ids
256                    .get(&insert_at)
257                    .expect("Corresponding source must exist")
258                    .as_ref();
259                (ComponentKey::from(id), SourceOuter::new(source_config))
260            })
261            .collect::<IndexMap<_, _>>())
262    }
263
264    /// Convert test outputs into sinks for use in a unit testing topology
265    pub fn hydrate_into_sinks(
266        &self,
267        test_name: &str,
268        outputs: &[TestOutput],
269        no_outputs_from: &[OutputId],
270    ) -> Result<
271        (
272            Vec<Receiver<UnitTestSinkResult>>,
273            IndexMap<ComponentKey, SinkOuter<String>>,
274        ),
275        Vec<String>,
276    > {
277        if outputs.is_empty() && no_outputs_from.is_empty() {
278            return Err(vec![
279                "unit test must contain at least one of `outputs` or `no_outputs_from`."
280                    .to_string(),
281            ]);
282        }
283        let outputs = build_outputs(outputs)?;
284
285        let mut template_sinks = IndexMap::new();
286        let mut test_result_rxs = Vec::new();
287        // Add sinks with checks
288        for (ids, checks) in outputs {
289            let (tx, rx) = oneshot::channel();
290            let sink_ids = ids.clone();
291            let sink_config = UnitTestSinkConfig {
292                test_name: test_name.to_string(),
293                transform_ids: ids.iter().map(|id| id.to_string()).collect(),
294                result_tx: Arc::new(Mutex::new(Some(tx))),
295                check: UnitTestSinkCheck::Checks(checks),
296            };
297
298            test_result_rxs.push(rx);
299            template_sinks.insert(sink_ids, sink_config);
300        }
301
302        // Add sinks with no outputs check
303        for id in no_outputs_from {
304            let (tx, rx) = oneshot::channel();
305            let sink_config = UnitTestSinkConfig {
306                test_name: test_name.to_string(),
307                transform_ids: vec![id.to_string()],
308                result_tx: Arc::new(Mutex::new(Some(tx))),
309                check: UnitTestSinkCheck::NoOutputs,
310            };
311
312            test_result_rxs.push(rx);
313            template_sinks.insert(vec![id.clone()], sink_config);
314        }
315
316        let sinks = template_sinks
317            .into_iter()
318            .map(|(transform_ids, sink_config)| {
319                let transform_ids_str = transform_ids
320                    .iter()
321                    .map(|s| s.to_string())
322                    .collect::<Vec<_>>();
323                let sink_ids = transform_ids
324                    .iter()
325                    .map(|transform_id| {
326                        self.sink_ids
327                            .get(transform_id)
328                            .expect("Sink does not exist")
329                            .as_str()
330                    })
331                    .collect::<Vec<_>>();
332                let sink_id = sink_ids.join(",");
333                (
334                    ComponentKey::from(sink_id),
335                    SinkOuter::new(transform_ids_str, sink_config),
336                )
337            })
338            .collect::<IndexMap<_, _>>();
339
340        Ok((test_result_rxs, sinks))
341    }
342}
343
344// Find all components that participate in the test
345fn get_relevant_test_components(
346    sources: &[&ComponentKey],
347    graph: &Graph,
348) -> Result<HashSet<String>, Vec<String>> {
349    graph.check_for_cycles().map_err(|error| vec![error])?;
350    let mut errors = Vec::new();
351    let mut components = HashSet::new();
352    for source in sources {
353        let paths = graph.paths_to_sink_from(source);
354        if paths.is_empty() {
355            errors.push(format!(
356                "Unable to complete topology between input target '{}' and output target(s)",
357                source
358                    .to_string()
359                    .rsplit_once("-source-")
360                    .unwrap_or(("", ""))
361                    .0
362            ));
363        } else {
364            for path in paths {
365                components.extend(path.into_iter().map(|key| key.to_string()));
366            }
367        }
368    }
369
370    if errors.is_empty() {
371        Ok(components)
372    } else {
373        Err(errors)
374    }
375}
376
377async fn build_unit_test(
378    metadata: &UnitTestBuildMetadata,
379    test: TestDefinition<String>,
380    mut config_builder: ConfigBuilder,
381) -> Result<UnitTest, Vec<String>> {
382    let transform_only_config = config_builder.clone();
383    let transform_only_graph = Graph::new_unchecked(
384        &transform_only_config.sources,
385        &transform_only_config.transforms,
386        &transform_only_config.sinks,
387        transform_only_config.schema,
388        transform_only_config
389            .global
390            .wildcard_matching
391            .unwrap_or_default(),
392    );
393    let test = test.resolve_outputs(&transform_only_graph)?;
394
395    let sources = metadata.hydrate_into_sources(&test.inputs)?;
396    let (test_result_rxs, sinks) =
397        metadata.hydrate_into_sinks(&test.name, &test.outputs, &test.no_outputs_from)?;
398
399    config_builder.sources = sources;
400    config_builder.sinks = sinks;
401    expand_globs(&mut config_builder);
402
403    let graph = Graph::new_unchecked(
404        &config_builder.sources,
405        &config_builder.transforms,
406        &config_builder.sinks,
407        config_builder.schema,
408        config_builder.global.wildcard_matching.unwrap_or_default(),
409    );
410
411    let mut valid_components = get_relevant_test_components(
412        config_builder.sources.keys().collect::<Vec<_>>().as_ref(),
413        &graph,
414    )?;
415
416    // Preserve the original unexpanded transform(s) which are valid test insertion points
417    let unexpanded_transforms = valid_components
418        .iter()
419        .filter_map(|component| {
420            component
421                .split_once('.')
422                .map(|(original_name, _)| original_name.to_string())
423        })
424        .collect::<Vec<_>>();
425    valid_components.extend(unexpanded_transforms);
426
427    // Remove all transforms that are not relevant to the current test
428    config_builder.transforms = config_builder
429        .transforms
430        .into_iter()
431        .filter(|(key, _)| valid_components.contains(&key.to_string()))
432        .collect();
433
434    // Sanitize the inputs of all relevant transforms
435    let graph = Graph::new_unchecked(
436        &config_builder.sources,
437        &config_builder.transforms,
438        &config_builder.sinks,
439        config_builder.schema,
440        config_builder.global.wildcard_matching.unwrap_or_default(),
441    );
442    let valid_inputs = graph.input_map()?;
443    for (_, transform) in config_builder.transforms.iter_mut() {
444        let inputs = std::mem::take(&mut transform.inputs);
445        transform.inputs = inputs
446            .into_iter()
447            .filter(|input| valid_inputs.contains_key(input))
448            .collect();
449    }
450
451    if let Some(sink) = get_loose_end_outputs_sink(&config_builder) {
452        config_builder
453            .sinks
454            .insert(ComponentKey::from(Uuid::new_v4().to_string()), sink);
455    }
456    let config = config_builder.build()?;
457    let diff = config::ConfigDiff::initial(&config);
458    let pieces = TopologyPieces::build(&config, &diff, HashMap::new(), Default::default()).await?;
459
460    Ok(UnitTest {
461        name: test.name,
462        config,
463        pieces,
464        test_result_rxs,
465    })
466}
467
468/// Near the end of building a unit test, it's possible that we've included a
469/// transform(s) with multiple outputs where at least one of its output is
470/// consumed but its other outputs are left unconsumed.
471///
472/// To avoid warning logs that occur when building such topologies, we construct
473/// a NoOp sink here whose sole purpose is to consume any "loose end" outputs.
474fn get_loose_end_outputs_sink(config: &ConfigBuilder) -> Option<SinkOuter<String>> {
475    let config = config.clone();
476    let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
477        get_transform_output_ids(
478            transform.inner.as_ref(),
479            key.clone(),
480            config.schema.log_namespace(),
481        )
482        .map(|output| output.to_string())
483        .collect::<Vec<_>>()
484    });
485
486    let mut loose_end_outputs = Vec::new();
487    for id in transform_ids {
488        if !config
489            .transforms
490            .iter()
491            .any(|(_, transform)| transform.inputs.contains(&id))
492            && !config
493                .sinks
494                .iter()
495                .any(|(_, sink)| sink.inputs.contains(&id))
496        {
497            loose_end_outputs.push(id);
498        }
499    }
500
501    if loose_end_outputs.is_empty() {
502        None
503    } else {
504        let noop_sink = UnitTestSinkConfig {
505            test_name: "".to_string(),
506            transform_ids: vec![],
507            result_tx: Arc::new(Mutex::new(None)),
508            check: UnitTestSinkCheck::NoOp,
509        };
510        Some(SinkOuter::new(loose_end_outputs, noop_sink))
511    }
512}
513
514fn build_and_validate_inputs(
515    test_inputs: &[TestInput],
516    available_insert_targets: &HashSet<ComponentKey>,
517) -> Result<HashMap<ComponentKey, Vec<Event>>, Vec<String>> {
518    let mut inputs = HashMap::new();
519    let mut errors = Vec::new();
520    if test_inputs.is_empty() {
521        errors.push("must specify at least one input.".to_string());
522        return Err(errors);
523    }
524
525    for (index, input) in test_inputs.iter().enumerate() {
526        if available_insert_targets.contains(&input.insert_at) {
527            match build_input_event(input) {
528                Ok(input_event) => {
529                    inputs
530                        .entry(input.insert_at.clone())
531                        .and_modify(|events: &mut Vec<Event>| {
532                            events.push(input_event.clone());
533                        })
534                        .or_insert_with(|| vec![input_event]);
535                }
536                Err(error) => errors.push(error),
537            }
538        } else {
539            errors.push(format!(
540                "inputs[{}]: unable to locate target transform '{}'",
541                index, input.insert_at
542            ))
543        }
544    }
545
546    if errors.is_empty() {
547        Ok(inputs)
548    } else {
549        Err(errors)
550    }
551}
552
553fn build_outputs(
554    test_outputs: &[TestOutput],
555) -> Result<IndexMap<Vec<OutputId>, Vec<Vec<Condition>>>, Vec<String>> {
556    let mut outputs: IndexMap<Vec<OutputId>, Vec<Vec<Condition>>> = IndexMap::new();
557    let mut errors = Vec::new();
558
559    for output in test_outputs {
560        let mut conditions = Vec::new();
561        for (index, condition) in output
562            .conditions
563            .clone()
564            .unwrap_or_default()
565            .iter()
566            .enumerate()
567        {
568            match condition.build(&Default::default()) {
569                Ok(condition) => conditions.push(condition),
570                Err(error) => errors.push(format!(
571                    "failed to create test condition '{index}': {error}"
572                )),
573            }
574        }
575
576        outputs
577            .entry(output.extract_from.clone().to_vec())
578            .and_modify(|existing_conditions| existing_conditions.push(conditions.clone()))
579            .or_insert(vec![conditions.clone()]);
580    }
581
582    if errors.is_empty() {
583        Ok(outputs)
584    } else {
585        Err(errors)
586    }
587}
588
589fn build_input_event(input: &TestInput) -> Result<Event, String> {
590    match input.type_str.as_ref() {
591        "raw" => match input.value.as_ref() {
592            Some(v) => Ok(Event::Log(LogEvent::from_str_legacy(v.clone()))),
593            None => Err("input type 'raw' requires the field 'value'".to_string()),
594        },
595        "vrl" => {
596            if let Some(source) = &input.source {
597                let fns = vrl::stdlib::all();
598                let result = vrl::compiler::compile(source, &fns)
599                    .map_err(|e| Formatter::new(source, e.clone()).to_string())?;
600
601                let mut target = TargetValue {
602                    value: value!({}),
603                    metadata: value::Value::Object(BTreeMap::new()),
604                    secrets: value::Secrets::default(),
605                };
606
607                let mut state = RuntimeState::default();
608                let timezone = TimeZone::default();
609                let mut ctx = Context::new(&mut target, &mut state, &timezone);
610
611                result
612                    .program
613                    .resolve(&mut ctx)
614                    .map(|_| {
615                        Event::Log(LogEvent::from_parts(
616                            target.value.clone(),
617                            EventMetadata::default_with_value(target.metadata.clone()),
618                        ))
619                    })
620                    .map_err(|e| e.to_string())
621            } else {
622                Err("input type 'vrl' requires the field 'source'".to_string())
623            }
624        }
625        "log" => {
626            if let Some(log_fields) = &input.log_fields {
627                let mut event = LogEvent::from_str_legacy("");
628                for (path, value) in log_fields {
629                    event
630                        .parse_path_and_insert(path, value.clone())
631                        .map_err(|e| e.to_string())?;
632                }
633                Ok(event.into())
634            } else {
635                Err("input type 'log' requires the field 'log_fields'".to_string())
636            }
637        }
638        "metric" => {
639            if let Some(metric) = &input.metric {
640                Ok(Event::Metric(metric.clone()))
641            } else {
642                Err("input type 'metric' requires the field 'metric'".to_string())
643            }
644        }
645        _ => Err(format!(
646            "unrecognized input type '{}', expected one of: 'raw', 'log' or 'metric'",
647            input.type_str
648        )),
649    }
650}