vector/topology/
schema.rs

1use std::collections::HashMap;
2
3use snafu::Snafu;
4use vector_lib::config::SourceOutput;
5
6pub(super) use crate::schema::Definition;
7
8use crate::{
9    config::{ComponentKey, Config, OutputId, SinkOuter, TransformOutput},
10    topology,
11};
12
13#[derive(Debug, Snafu)]
14pub enum Error {
15    ContainsNever,
16}
17
18/// The cache is used whilst building up the topology.
19/// TODO: Describe more, especially why we have a bool in the key.
20type Cache = HashMap<(bool, Vec<OutputId>), Vec<(OutputId, Definition)>>;
21
22pub fn possible_definitions(
23    inputs: &[OutputId],
24    config: &dyn ComponentContainer,
25    enrichment_tables: vector_lib::enrichment::TableRegistry,
26    cache: &mut Cache,
27) -> Result<Vec<(OutputId, Definition)>, Error> {
28    if inputs.is_empty() {
29        return Ok(vec![]);
30    }
31
32    // Try to get the definition from the cache.
33    if let Some(definition) = cache.get(&(config.schema_enabled(), inputs.to_vec())) {
34        return Ok(definition.clone());
35    }
36
37    let mut definitions = Vec::new();
38
39    for input in inputs {
40        let key = &input.component;
41
42        // If the input is a source, the output is merged into the top-level schema.
43        if let Ok(maybe_output) = config.source_output_for_port(key, &input.port) {
44            let mut source_definition = input.with_definitions(
45                maybe_output
46                    .unwrap_or_else(|| {
47                        unreachable!(
48                            "source output mis-configured - output for port {:?} missing",
49                            &input.port
50                        )
51                    })
52                    .schema_definition(config.schema_enabled()),
53            );
54
55            if contains_never(&source_definition) {
56                return Err(Error::ContainsNever);
57            }
58
59            definitions.append(&mut source_definition);
60        }
61
62        // If the input is a transform, the output is merged into the top-level schema
63        if let Some(inputs) = config.transform_inputs(key) {
64            let input_definitions =
65                possible_definitions(inputs, config, enrichment_tables.clone(), cache)?;
66
67            let mut transform_definition = input.with_definitions(
68                config
69                    .transform_output_for_port(
70                        key,
71                        &input.port,
72                        enrichment_tables.clone(),
73                        &input_definitions,
74                    )
75                    .expect("transform must exist - already found inputs")
76                    .unwrap_or_else(|| {
77                        unreachable!(
78                            "transform output mis-configured - output for port {:?} missing",
79                            &input.port
80                        )
81                    })
82                    .schema_definitions(config.schema_enabled())
83                    .values()
84                    .cloned(),
85            );
86
87            if contains_never(&transform_definition) {
88                return Err(Error::ContainsNever);
89            }
90
91            definitions.append(&mut transform_definition);
92        }
93    }
94
95    Ok(definitions)
96}
97
98/// Get a list of definitions from individual pipelines feeding into a component.
99///
100/// For example, given the following topology:
101///
102///   Source 1 -> Transform 1                ->
103///   Source 2 -> Transform 2                ->
104///            -> Transform 3 ->
105///            -> Transform 4 -> Transform 5 -> Sink 1
106///
107/// In this example, if we ask for the definitions received by `Sink 1`, we'd receive four
108/// definitions, one for each route leading into `Sink 1`, with the route going through `Transform
109/// 5` being expanded into two individual routes (So1 -> T3 -> T5 -> Si1 AND So1 -> T4 -> T5 ->
110/// Si1).
111pub(super) fn expanded_definitions(
112    enrichment_tables: vector_lib::enrichment::TableRegistry,
113    inputs: &[OutputId],
114    config: &dyn ComponentContainer,
115    cache: &mut Cache,
116) -> Result<Vec<(OutputId, Definition)>, Error> {
117    // Try to get the definition from the cache.
118    if let Some(definitions) = cache.get(&(config.schema_enabled(), inputs.to_vec())) {
119        return Ok(definitions.clone());
120    }
121
122    let mut definitions: Vec<(OutputId, Definition)> = vec![];
123    let mut merged_cache = HashMap::default();
124
125    for input in inputs {
126        let key = &input.component;
127
128        // If the input is a source, it'll always have schema definition attached, even if it is an
129        // "empty" schema.
130        // We take the full schema definition regardless of `config.schema_enabled()`, the assumption
131        // being that the receiving component will not be validating the schema if schema checking is
132        // not enabled.
133        if let Some(outputs) = config.source_outputs(key) {
134            // After getting the source matching to the given input, we need to further narrow the
135            // actual output of the source feeding into this input, and then get the definition
136            // belonging to that output.
137            let mut source_definitions =
138                outputs
139                    .into_iter()
140                    .find_map(|output| {
141                        if output.port == input.port {
142                            Some(input.with_definitions(
143                                output.schema_definition(config.schema_enabled()),
144                            ))
145                        } else {
146                            None
147                        }
148                    })
149                    .unwrap_or_else(|| {
150                        // If we find no match, it means the topology is misconfigured. This is a fatal
151                        // error, but other parts of the topology builder deal with this state.
152                        unreachable!("source output mis-configured")
153                    });
154
155            if contains_never(&source_definitions) {
156                return Err(Error::ContainsNever);
157            }
158
159            definitions.append(&mut source_definitions);
160
161        // A transform can receive from multiple inputs, and each input needs to be expanded to
162        // a new pipeline.
163        } else if let Some(inputs) = config.transform_inputs(key) {
164            let input_definitions =
165                possible_definitions(inputs, config, enrichment_tables.clone(), &mut merged_cache)?;
166
167            let mut transform_definition = config
168                .transform_outputs(key, enrichment_tables.clone(), &input_definitions)
169                .expect("already found inputs")
170                .iter()
171                .find_map(|output| {
172                    if output.port == input.port {
173                        Some(
174                            input.with_definitions(
175                                output
176                                    .schema_definitions(config.schema_enabled())
177                                    .values()
178                                    .cloned(),
179                            ),
180                        )
181                    } else {
182                        None
183                    }
184                })
185                // If we find no match, it means the topology is misconfigured. This is a fatal
186                // error, but other parts of the topology builder deal with this state.
187                .expect("transform output misconfigured");
188
189            if contains_never(&transform_definition) {
190                return Err(Error::ContainsNever);
191            }
192
193            // Append whatever number of additional pipelines we created to the existing
194            // pipeline definitions.
195            definitions.append(&mut transform_definition);
196        }
197    }
198
199    cache.insert(
200        (config.schema_enabled(), inputs.to_vec()),
201        definitions.clone(),
202    );
203
204    Ok(definitions)
205}
206
207/// Returns a list of definitions from the given inputs.
208/// Errors if any of the definitions are [`Kind::never`] implying that
209/// an error condition has been reached.
210pub(crate) fn input_definitions(
211    inputs: &[OutputId],
212    config: &Config,
213    enrichment_tables: vector_lib::enrichment::TableRegistry,
214    cache: &mut Cache,
215) -> Result<Vec<(OutputId, Definition)>, Error> {
216    if inputs.is_empty() {
217        return Ok(vec![]);
218    }
219
220    if let Some(definitions) = cache.get(&(config.schema_enabled(), inputs.to_vec())) {
221        return Ok(definitions.clone());
222    }
223
224    let mut definitions = Vec::new();
225
226    for input in inputs {
227        let key = &input.component;
228
229        // If the input is a source we retrieve the definitions from the source
230        // (there should only be one) and add it to the return.
231        if let Ok(maybe_output) = config.source_output_for_port(key, &input.port) {
232            let mut source_definitions = input.with_definitions(
233                maybe_output
234                    .unwrap_or_else(|| {
235                        unreachable!(
236                            "source output mis-configured - output for port {:?} missing",
237                            &input.port
238                        )
239                    })
240                    .schema_definition(config.schema_enabled()),
241            );
242
243            if contains_never(&source_definitions) {
244                return Err(Error::ContainsNever);
245            }
246
247            definitions.append(&mut source_definitions);
248        }
249
250        // If the input is a transform we recurse to the upstream components to retrieve
251        // their definitions and pass it through the transform to get the new definitions.
252        if let Some(inputs) = config.transform_inputs(key) {
253            let transform_definitions =
254                input_definitions(inputs, config, enrichment_tables.clone(), cache)?;
255
256            if contains_never(&transform_definitions) {
257                return Err(Error::ContainsNever);
258            }
259
260            let mut transform_definitions = input.with_definitions(
261                config
262                    .transform_output_for_port(
263                        key,
264                        &input.port,
265                        enrichment_tables.clone(),
266                        &transform_definitions,
267                    )
268                    .expect("transform must exist")
269                    .unwrap_or_else(|| {
270                        unreachable!(
271                            "transform output mis-configured - output for port {:?} missing",
272                            &input.port
273                        )
274                    })
275                    .schema_definitions(config.schema_enabled())
276                    .values()
277                    .cloned(),
278            );
279
280            if contains_never(&transform_definitions) {
281                return Err(Error::ContainsNever);
282            }
283
284            definitions.append(&mut transform_definitions);
285        }
286    }
287
288    Ok(definitions)
289}
290
291/// Checks if any of the definitions in the list contain `Kind::never()`. This
292/// implies the definition cannot contain any output and thus we should stop
293/// processing further.
294fn contains_never(transform_definitions: &[(OutputId, Definition)]) -> bool {
295    transform_definitions
296        .iter()
297        .any(|(_, definition)| definition.event_kind().is_never())
298}
299
300pub(super) fn validate_sink_expectations(
301    key: &ComponentKey,
302    sink: &SinkOuter<OutputId>,
303    config: &topology::Config,
304    enrichment_tables: vector_lib::enrichment::TableRegistry,
305) -> Result<(), Vec<String>> {
306    let mut errors = vec![];
307
308    // Get the schema against which we need to validate the schemas of the components feeding into
309    // this sink.
310    let input = sink.inner.input();
311    let requirement = input.schema_requirement();
312
313    // Get all pipeline definitions feeding into this sink.
314    let mut cache = HashMap::default();
315    let definitions =
316        match expanded_definitions(enrichment_tables, &sink.inputs, config, &mut cache) {
317            Ok(definitions) => definitions,
318            Err(err) => {
319                errors.push(err.to_string());
320                return Err(errors);
321            }
322        };
323
324    // Validate each individual definition against the sink requirement.
325    for (_output, definition) in definitions {
326        if let Err(err) = requirement.validate(&definition, config.schema.validation) {
327            errors.append(
328                &mut err
329                    .errors()
330                    .iter()
331                    .cloned()
332                    .map(|err| format!("schema error in component {key}: {err}"))
333                    .collect(),
334            );
335        }
336    }
337
338    if !errors.is_empty() {
339        return Err(errors);
340    }
341
342    Ok(())
343}
344
345pub trait ComponentContainer {
346    fn schema_enabled(&self) -> bool;
347
348    fn source_outputs(&self, key: &ComponentKey) -> Option<Vec<SourceOutput>>;
349
350    fn transform_inputs(&self, key: &ComponentKey) -> Option<&[OutputId]>;
351
352    fn transform_outputs(
353        &self,
354        key: &ComponentKey,
355        enrichment_tables: vector_lib::enrichment::TableRegistry,
356        input_definitions: &[(OutputId, Definition)],
357    ) -> Option<Vec<TransformOutput>>;
358
359    /// Gets the transform output for the given port.
360    ///
361    /// Returns Err(()) if there is no transform with the given key
362    /// Returns Some(None) if the source does not have an output for the port given
363    #[allow(clippy::result_unit_err)]
364    fn transform_output_for_port(
365        &self,
366        key: &ComponentKey,
367        port: &Option<String>,
368        enrichment_tables: vector_lib::enrichment::TableRegistry,
369        input_definitions: &[(OutputId, Definition)],
370    ) -> Result<Option<TransformOutput>, ()> {
371        if let Some(outputs) = self.transform_outputs(key, enrichment_tables, input_definitions) {
372            Ok(get_output_for_port(outputs, port))
373        } else {
374            Err(())
375        }
376    }
377
378    /// Gets the source output for the given port.
379    ///
380    /// Returns Err(()) if there is no source with the given key
381    /// Returns Some(None) if the source does not have an output for the port given
382    #[allow(clippy::result_unit_err)]
383    fn source_output_for_port(
384        &self,
385        key: &ComponentKey,
386        port: &Option<String>,
387    ) -> Result<Option<SourceOutput>, ()> {
388        if let Some(outputs) = self.source_outputs(key) {
389            Ok(get_source_output_for_port(outputs, port))
390        } else {
391            Err(())
392        }
393    }
394}
395
396fn get_output_for_port(
397    outputs: Vec<TransformOutput>,
398    port: &Option<String>,
399) -> Option<TransformOutput> {
400    outputs.into_iter().find(|output| &output.port == port)
401}
402
403fn get_source_output_for_port(
404    outputs: Vec<SourceOutput>,
405    port: &Option<String>,
406) -> Option<SourceOutput> {
407    outputs.into_iter().find(|output| &output.port == port)
408}
409
410impl ComponentContainer for Config {
411    fn schema_enabled(&self) -> bool {
412        self.schema.enabled
413    }
414
415    fn source_outputs(&self, key: &ComponentKey) -> Option<Vec<SourceOutput>> {
416        self.source(key)
417            .map(|source| source.inner.outputs(self.schema.log_namespace()))
418    }
419
420    fn transform_inputs(&self, key: &ComponentKey) -> Option<&[OutputId]> {
421        self.transform(key).map(|transform| &transform.inputs[..])
422    }
423
424    fn transform_outputs(
425        &self,
426        key: &ComponentKey,
427        enrichment_tables: vector_lib::enrichment::TableRegistry,
428        input_definitions: &[(OutputId, Definition)],
429    ) -> Option<Vec<TransformOutput>> {
430        self.transform(key).map(|source| {
431            source.inner.outputs(
432                enrichment_tables,
433                input_definitions,
434                self.schema.log_namespace(),
435            )
436        })
437    }
438}
439
440#[cfg(test)]
441mod tests {
442    use std::collections::HashMap;
443
444    use indexmap::IndexMap;
445    use similar_asserts::assert_eq;
446    use vector_lib::config::{DataType, SourceOutput, TransformOutput};
447    use vector_lib::lookup::owned_value_path;
448    use vrl::value::Kind;
449
450    use super::*;
451
452    #[test]
453    fn test_expanded_definition() {
454        struct TestCase {
455            inputs: Vec<(&'static str, Option<String>)>,
456            sources: IndexMap<&'static str, Vec<SourceOutput>>,
457            transforms: IndexMap<&'static str, (Vec<OutputId>, Vec<TransformOutput>)>,
458            want: Vec<(OutputId, Definition)>,
459        }
460
461        impl ComponentContainer for TestCase {
462            fn schema_enabled(&self) -> bool {
463                true
464            }
465
466            fn source_outputs(&self, key: &ComponentKey) -> Option<Vec<SourceOutput>> {
467                self.sources.get(key.id()).cloned()
468            }
469
470            fn transform_inputs(&self, key: &ComponentKey) -> Option<&[OutputId]> {
471                self.transforms.get(key.id()).map(|v| v.0.as_slice())
472            }
473
474            fn transform_outputs(
475                &self,
476                key: &ComponentKey,
477                _: vector_lib::enrichment::TableRegistry,
478                _: &[(OutputId, Definition)],
479            ) -> Option<Vec<TransformOutput>> {
480                self.transforms.get(key.id()).cloned().map(|v| v.1)
481            }
482        }
483
484        for (title, case) in HashMap::from([
485            (
486                "no inputs",
487                TestCase {
488                    inputs: vec![],
489                    sources: IndexMap::default(),
490                    transforms: IndexMap::default(),
491                    want: vec![],
492                },
493            ),
494            (
495                "single input, source with default schema",
496                TestCase {
497                    inputs: vec![("foo", None)],
498                    sources: IndexMap::from([(
499                        "foo",
500                        vec![SourceOutput::new_maybe_logs(
501                            DataType::all_bits(),
502                            Definition::default_legacy_namespace(),
503                        )],
504                    )]),
505                    transforms: IndexMap::default(),
506                    want: vec![("foo".into(), Definition::default_legacy_namespace())],
507                },
508            ),
509            (
510                "single input, source with schema",
511                TestCase {
512                    inputs: vec![("source-foo", None)],
513                    sources: IndexMap::from([(
514                        "source-foo",
515                        vec![SourceOutput::new_maybe_logs(
516                            DataType::all_bits(),
517                            Definition::empty_legacy_namespace().with_event_field(
518                                &owned_value_path!("foo"),
519                                Kind::integer().or_bytes(),
520                                Some("foo bar"),
521                            ),
522                        )],
523                    )]),
524                    transforms: IndexMap::default(),
525                    want: vec![(
526                        "source-foo".into(),
527                        Definition::empty_legacy_namespace().with_event_field(
528                            &owned_value_path!("foo"),
529                            Kind::integer().or_bytes(),
530                            Some("foo bar"),
531                        ),
532                    )],
533                },
534            ),
535            (
536                "multiple inputs, sources with schema",
537                TestCase {
538                    inputs: vec![("source-foo", None), ("source-bar", None)],
539                    sources: IndexMap::from([
540                        (
541                            "source-foo",
542                            vec![SourceOutput::new_maybe_logs(
543                                DataType::all_bits(),
544                                Definition::empty_legacy_namespace().with_event_field(
545                                    &owned_value_path!("foo"),
546                                    Kind::integer().or_bytes(),
547                                    Some("foo bar"),
548                                ),
549                            )],
550                        ),
551                        (
552                            "source-bar",
553                            vec![SourceOutput::new_maybe_logs(
554                                DataType::all_bits(),
555                                Definition::empty_legacy_namespace().with_event_field(
556                                    &owned_value_path!("foo"),
557                                    Kind::timestamp(),
558                                    Some("baz qux"),
559                                ),
560                            )],
561                        ),
562                    ]),
563                    transforms: IndexMap::default(),
564                    want: vec![
565                        (
566                            "source-foo".into(),
567                            Definition::empty_legacy_namespace().with_event_field(
568                                &owned_value_path!("foo"),
569                                Kind::integer().or_bytes(),
570                                Some("foo bar"),
571                            ),
572                        ),
573                        (
574                            "source-bar".into(),
575                            Definition::empty_legacy_namespace().with_event_field(
576                                &owned_value_path!("foo"),
577                                Kind::timestamp(),
578                                Some("baz qux"),
579                            ),
580                        ),
581                    ],
582                },
583            ),
584            (
585                "transform overrides source",
586                TestCase {
587                    inputs: vec![("source-bar", None), ("transform-baz", None)],
588                    sources: IndexMap::from([
589                        (
590                            "source-foo",
591                            vec![SourceOutput::new_maybe_logs(
592                                DataType::all_bits(),
593                                Definition::empty_legacy_namespace().with_event_field(
594                                    &owned_value_path!("foo"),
595                                    Kind::boolean(),
596                                    Some("foo"),
597                                ),
598                            )],
599                        ),
600                        (
601                            "source-bar",
602                            vec![SourceOutput::new_maybe_logs(
603                                DataType::all_bits(),
604                                Definition::empty_legacy_namespace().with_event_field(
605                                    &owned_value_path!("bar"),
606                                    Kind::integer(),
607                                    Some("bar"),
608                                ),
609                            )],
610                        ),
611                    ]),
612                    transforms: IndexMap::from([(
613                        "transform-baz",
614                        (
615                            vec![OutputId::from("source-foo")],
616                            vec![TransformOutput::new(
617                                DataType::all_bits(),
618                                [(
619                                    "source-foo".into(),
620                                    Definition::empty_legacy_namespace().with_event_field(
621                                        &owned_value_path!("baz"),
622                                        Kind::regex(),
623                                        Some("baz"),
624                                    ),
625                                )]
626                                .into(),
627                            )],
628                        ),
629                    )]),
630                    want: vec![
631                        (
632                            "source-bar".into(),
633                            Definition::empty_legacy_namespace().with_event_field(
634                                &owned_value_path!("bar"),
635                                Kind::integer(),
636                                Some("bar"),
637                            ),
638                        ),
639                        (
640                            "transform-baz".into(),
641                            Definition::empty_legacy_namespace().with_event_field(
642                                &owned_value_path!("baz"),
643                                Kind::regex(),
644                                Some("baz"),
645                            ),
646                        ),
647                    ],
648                },
649            ),
650            //   Source 1 -> Transform 1                ->
651            //   Source 2 -> Transform 2                ->
652            //            -> Transform 3 ->
653            //            -> Transform 4 -> Transform 5 -> Sink 1
654            (
655                "complex topology",
656                TestCase {
657                    inputs: vec![
658                        ("Transform 1", None),
659                        ("Transform 2", None),
660                        ("Transform 5", None),
661                    ],
662                    sources: IndexMap::from([
663                        (
664                            "Source 1",
665                            vec![SourceOutput::new_maybe_logs(
666                                DataType::all_bits(),
667                                Definition::empty_legacy_namespace().with_event_field(
668                                    &owned_value_path!("source-1"),
669                                    Kind::boolean(),
670                                    Some("source-1"),
671                                ),
672                            )],
673                        ),
674                        (
675                            "Source 2",
676                            vec![SourceOutput::new_maybe_logs(
677                                DataType::all_bits(),
678                                Definition::empty_legacy_namespace().with_event_field(
679                                    &owned_value_path!("source-2"),
680                                    Kind::integer(),
681                                    Some("source-2"),
682                                ),
683                            )],
684                        ),
685                    ]),
686                    transforms: IndexMap::from([
687                        (
688                            "Transform 1",
689                            (
690                                vec![OutputId::from("Source 1")],
691                                vec![TransformOutput::new(
692                                    DataType::all_bits(),
693                                    [(
694                                        "Source 1".into(),
695                                        Definition::empty_legacy_namespace().with_event_field(
696                                            &owned_value_path!("transform-1"),
697                                            Kind::regex(),
698                                            None,
699                                        ),
700                                    )]
701                                    .into(),
702                                )],
703                            ),
704                        ),
705                        (
706                            "Transform 2",
707                            (
708                                vec![OutputId::from("Source 2")],
709                                vec![TransformOutput::new(
710                                    DataType::all_bits(),
711                                    [(
712                                        "Source 2".into(),
713                                        Definition::empty_legacy_namespace().with_event_field(
714                                            &owned_value_path!("transform-2"),
715                                            Kind::float().or_null(),
716                                            Some("transform-2"),
717                                        ),
718                                    )]
719                                    .into(),
720                                )],
721                            ),
722                        ),
723                        (
724                            "Transform 3",
725                            (
726                                vec![OutputId::from("Source 2")],
727                                vec![TransformOutput::new(
728                                    DataType::all_bits(),
729                                    [(
730                                        "Source 2".into(),
731                                        Definition::empty_legacy_namespace().with_event_field(
732                                            &owned_value_path!("transform-3"),
733                                            Kind::integer(),
734                                            Some("transform-3"),
735                                        ),
736                                    )]
737                                    .into(),
738                                )],
739                            ),
740                        ),
741                        (
742                            "Transform 4",
743                            (
744                                vec![OutputId::from("Source 2")],
745                                vec![TransformOutput::new(
746                                    DataType::all_bits(),
747                                    [(
748                                        "Source 2".into(),
749                                        Definition::empty_legacy_namespace().with_event_field(
750                                            &owned_value_path!("transform-4"),
751                                            Kind::timestamp().or_bytes(),
752                                            Some("transform-4"),
753                                        ),
754                                    )]
755                                    .into(),
756                                )],
757                            ),
758                        ),
759                        (
760                            "Transform 5",
761                            (
762                                vec![OutputId::from("Transform 3"), OutputId::from("Transform 4")],
763                                vec![TransformOutput::new(
764                                    DataType::all_bits(),
765                                    [(
766                                        "Transform 3".into(),
767                                        Definition::empty_legacy_namespace().with_event_field(
768                                            &owned_value_path!("transform-5"),
769                                            Kind::boolean(),
770                                            Some("transform-5"),
771                                        ),
772                                    )]
773                                    .into(),
774                                )],
775                            ),
776                        ),
777                    ]),
778                    want: vec![
779                        // Pipeline 1
780                        (
781                            "Transform 1".into(),
782                            Definition::empty_legacy_namespace().with_event_field(
783                                &owned_value_path!("transform-1"),
784                                Kind::regex(),
785                                None,
786                            ),
787                        ),
788                        // Pipeline 2
789                        (
790                            "Transform 2".into(),
791                            Definition::empty_legacy_namespace().with_event_field(
792                                &owned_value_path!("transform-2"),
793                                Kind::float().or_null(),
794                                Some("transform-2"),
795                            ),
796                        ),
797                        // Pipeline 3
798                        (
799                            "Transform 5".into(),
800                            Definition::empty_legacy_namespace().with_event_field(
801                                &owned_value_path!("transform-5"),
802                                Kind::boolean(),
803                                Some("transform-5"),
804                            ),
805                        ),
806                    ],
807                },
808            ),
809        ]) {
810            let inputs = case
811                .inputs
812                .iter()
813                .cloned()
814                .map(|(key, port)| OutputId {
815                    component: key.into(),
816                    port,
817                })
818                .collect::<Vec<_>>();
819
820            let got = expanded_definitions(
821                vector_lib::enrichment::TableRegistry::default(),
822                &inputs,
823                &case,
824                &mut HashMap::default(),
825            )
826            .unwrap();
827            assert_eq!(got, case.want, "{}", title);
828        }
829    }
830}