vector/topology/
schema.rs

1use std::collections::HashMap;
2
3use snafu::Snafu;
4use vector_lib::config::SourceOutput;
5
6pub(super) use crate::schema::Definition;
7use crate::{
8    config::{ComponentKey, Config, OutputId, SinkOuter, TransformOutput},
9    topology,
10};
11
/// Errors reported while resolving the schema definitions that flow through a topology.
#[derive(Debug, Snafu)]
pub enum Error {
    // Returned by the resolution functions in this module as soon as `contains_never` reports
    // that a resolved definition has an event kind of `never`.
    //
    // NOTE(review): deliberately a plain comment rather than a doc comment - SNAFU can use a
    // variant's doc comment as its `Display` text, and `validate_sink_expectations` surfaces
    // `err.to_string()`; confirm against the SNAFU version in use before converting to `///`.
    ContainsNever,
}
16
/// Memoization table used whilst building up the topology.
///
/// The key pairs the value of `ComponentContainer::schema_enabled` with the exact list of inputs
/// whose definitions were resolved; the value is the resolved `(OutputId, Definition)` list for
/// those inputs.
///
/// The `bool` is part of the key because every resolution in this module passes that flag on to
/// `schema_definition` / `schema_definitions`, so a stored result is presumably only valid for
/// the setting it was computed with.
type Cache = HashMap<(bool, Vec<OutputId>), Vec<(OutputId, Definition)>>;
20
21pub fn possible_definitions(
22    inputs: &[OutputId],
23    config: &dyn ComponentContainer,
24    enrichment_tables: vector_lib::enrichment::TableRegistry,
25    cache: &mut Cache,
26) -> Result<Vec<(OutputId, Definition)>, Error> {
27    if inputs.is_empty() {
28        return Ok(vec![]);
29    }
30
31    // Try to get the definition from the cache.
32    if let Some(definition) = cache.get(&(config.schema_enabled(), inputs.to_vec())) {
33        return Ok(definition.clone());
34    }
35
36    let mut definitions = Vec::new();
37
38    for input in inputs {
39        let key = &input.component;
40
41        // If the input is a source, the output is merged into the top-level schema.
42        if let Ok(maybe_output) = config.source_output_for_port(key, &input.port) {
43            let mut source_definition = input.with_definitions(
44                maybe_output
45                    .unwrap_or_else(|| {
46                        unreachable!(
47                            "source output mis-configured - output for port {:?} missing",
48                            &input.port
49                        )
50                    })
51                    .schema_definition(config.schema_enabled()),
52            );
53
54            if contains_never(&source_definition) {
55                return Err(Error::ContainsNever);
56            }
57
58            definitions.append(&mut source_definition);
59        }
60
61        // If the input is a transform, the output is merged into the top-level schema
62        if let Some(inputs) = config.transform_inputs(key) {
63            let input_definitions =
64                possible_definitions(inputs, config, enrichment_tables.clone(), cache)?;
65
66            let mut transform_definition = input.with_definitions(
67                config
68                    .transform_output_for_port(
69                        key,
70                        &input.port,
71                        enrichment_tables.clone(),
72                        &input_definitions,
73                    )
74                    .expect("transform must exist - already found inputs")
75                    .unwrap_or_else(|| {
76                        unreachable!(
77                            "transform output mis-configured - output for port {:?} missing",
78                            &input.port
79                        )
80                    })
81                    .schema_definitions(config.schema_enabled())
82                    .values()
83                    .cloned(),
84            );
85
86            if contains_never(&transform_definition) {
87                return Err(Error::ContainsNever);
88            }
89
90            definitions.append(&mut transform_definition);
91        }
92    }
93
94    Ok(definitions)
95}
96
97/// Get a list of definitions from individual pipelines feeding into a component.
98///
99/// For example, given the following topology:
100///
101///   Source 1 -> Transform 1                ->
102///   Source 2 -> Transform 2                ->
103///            -> Transform 3 ->
104///            -> Transform 4 -> Transform 5 -> Sink 1
105///
106/// In this example, if we ask for the definitions received by `Sink 1`, we'd receive four
107/// definitions, one for each route leading into `Sink 1`, with the route going through `Transform
108/// 5` being expanded into two individual routes (So1 -> T3 -> T5 -> Si1 AND So1 -> T4 -> T5 ->
109/// Si1).
110pub(super) fn expanded_definitions(
111    enrichment_tables: vector_lib::enrichment::TableRegistry,
112    inputs: &[OutputId],
113    config: &dyn ComponentContainer,
114    cache: &mut Cache,
115) -> Result<Vec<(OutputId, Definition)>, Error> {
116    // Try to get the definition from the cache.
117    if let Some(definitions) = cache.get(&(config.schema_enabled(), inputs.to_vec())) {
118        return Ok(definitions.clone());
119    }
120
121    let mut definitions: Vec<(OutputId, Definition)> = vec![];
122    let mut merged_cache = HashMap::default();
123
124    for input in inputs {
125        let key = &input.component;
126
127        // If the input is a source, it'll always have schema definition attached, even if it is an
128        // "empty" schema.
129        // We take the full schema definition regardless of `config.schema_enabled()`, the assumption
130        // being that the receiving component will not be validating the schema if schema checking is
131        // not enabled.
132        if let Some(outputs) = config.source_outputs(key) {
133            // After getting the source matching to the given input, we need to further narrow the
134            // actual output of the source feeding into this input, and then get the definition
135            // belonging to that output.
136            let mut source_definitions =
137                outputs
138                    .into_iter()
139                    .find_map(|output| {
140                        if output.port == input.port {
141                            Some(input.with_definitions(
142                                output.schema_definition(config.schema_enabled()),
143                            ))
144                        } else {
145                            None
146                        }
147                    })
148                    .unwrap_or_else(|| {
149                        // If we find no match, it means the topology is misconfigured. This is a fatal
150                        // error, but other parts of the topology builder deal with this state.
151                        unreachable!("source output mis-configured")
152                    });
153
154            if contains_never(&source_definitions) {
155                return Err(Error::ContainsNever);
156            }
157
158            definitions.append(&mut source_definitions);
159
160        // A transform can receive from multiple inputs, and each input needs to be expanded to
161        // a new pipeline.
162        } else if let Some(inputs) = config.transform_inputs(key) {
163            let input_definitions =
164                possible_definitions(inputs, config, enrichment_tables.clone(), &mut merged_cache)?;
165
166            let mut transform_definition = config
167                .transform_outputs(key, enrichment_tables.clone(), &input_definitions)
168                .expect("already found inputs")
169                .iter()
170                .find_map(|output| {
171                    if output.port == input.port {
172                        Some(
173                            input.with_definitions(
174                                output
175                                    .schema_definitions(config.schema_enabled())
176                                    .values()
177                                    .cloned(),
178                            ),
179                        )
180                    } else {
181                        None
182                    }
183                })
184                // If we find no match, it means the topology is misconfigured. This is a fatal
185                // error, but other parts of the topology builder deal with this state.
186                .expect("transform output misconfigured");
187
188            if contains_never(&transform_definition) {
189                return Err(Error::ContainsNever);
190            }
191
192            // Append whatever number of additional pipelines we created to the existing
193            // pipeline definitions.
194            definitions.append(&mut transform_definition);
195        }
196    }
197
198    cache.insert(
199        (config.schema_enabled(), inputs.to_vec()),
200        definitions.clone(),
201    );
202
203    Ok(definitions)
204}
205
206/// Returns a list of definitions from the given inputs.
207/// Errors if any of the definitions are [`Kind::never`] implying that
208/// an error condition has been reached.
209pub(crate) fn input_definitions(
210    inputs: &[OutputId],
211    config: &Config,
212    enrichment_tables: vector_lib::enrichment::TableRegistry,
213    cache: &mut Cache,
214) -> Result<Vec<(OutputId, Definition)>, Error> {
215    if inputs.is_empty() {
216        return Ok(vec![]);
217    }
218
219    if let Some(definitions) = cache.get(&(config.schema_enabled(), inputs.to_vec())) {
220        return Ok(definitions.clone());
221    }
222
223    let mut definitions = Vec::new();
224
225    for input in inputs {
226        let key = &input.component;
227
228        // If the input is a source we retrieve the definitions from the source
229        // (there should only be one) and add it to the return.
230        if let Ok(maybe_output) = config.source_output_for_port(key, &input.port) {
231            let mut source_definitions = input.with_definitions(
232                maybe_output
233                    .unwrap_or_else(|| {
234                        unreachable!(
235                            "source output mis-configured - output for port {:?} missing",
236                            &input.port
237                        )
238                    })
239                    .schema_definition(config.schema_enabled()),
240            );
241
242            if contains_never(&source_definitions) {
243                return Err(Error::ContainsNever);
244            }
245
246            definitions.append(&mut source_definitions);
247        }
248
249        // If the input is a transform we recurse to the upstream components to retrieve
250        // their definitions and pass it through the transform to get the new definitions.
251        if let Some(inputs) = config.transform_inputs(key) {
252            let transform_definitions =
253                input_definitions(inputs, config, enrichment_tables.clone(), cache)?;
254
255            if contains_never(&transform_definitions) {
256                return Err(Error::ContainsNever);
257            }
258
259            let mut transform_definitions = input.with_definitions(
260                config
261                    .transform_output_for_port(
262                        key,
263                        &input.port,
264                        enrichment_tables.clone(),
265                        &transform_definitions,
266                    )
267                    .expect("transform must exist")
268                    .unwrap_or_else(|| {
269                        unreachable!(
270                            "transform output mis-configured - output for port {:?} missing",
271                            &input.port
272                        )
273                    })
274                    .schema_definitions(config.schema_enabled())
275                    .values()
276                    .cloned(),
277            );
278
279            if contains_never(&transform_definitions) {
280                return Err(Error::ContainsNever);
281            }
282
283            definitions.append(&mut transform_definitions);
284        }
285    }
286
287    Ok(definitions)
288}
289
290/// Checks if any of the definitions in the list contain `Kind::never()`. This
291/// implies the definition cannot contain any output and thus we should stop
292/// processing further.
293fn contains_never(transform_definitions: &[(OutputId, Definition)]) -> bool {
294    transform_definitions
295        .iter()
296        .any(|(_, definition)| definition.event_kind().is_never())
297}
298
299pub(super) fn validate_sink_expectations(
300    key: &ComponentKey,
301    sink: &SinkOuter<OutputId>,
302    config: &topology::Config,
303    enrichment_tables: vector_lib::enrichment::TableRegistry,
304) -> Result<(), Vec<String>> {
305    let mut errors = vec![];
306
307    // Get the schema against which we need to validate the schemas of the components feeding into
308    // this sink.
309    let input = sink.inner.input();
310    let requirement = input.schema_requirement();
311
312    // Get all pipeline definitions feeding into this sink.
313    let mut cache = HashMap::default();
314    let definitions =
315        match expanded_definitions(enrichment_tables, &sink.inputs, config, &mut cache) {
316            Ok(definitions) => definitions,
317            Err(err) => {
318                errors.push(err.to_string());
319                return Err(errors);
320            }
321        };
322
323    // Validate each individual definition against the sink requirement.
324    for (_output, definition) in definitions {
325        if let Err(err) = requirement.validate(&definition, config.schema.validation) {
326            errors.append(
327                &mut err
328                    .errors()
329                    .iter()
330                    .cloned()
331                    .map(|err| format!("schema error in component {key}: {err}"))
332                    .collect(),
333            );
334        }
335    }
336
337    if !errors.is_empty() {
338        return Err(errors);
339    }
340
341    Ok(())
342}
343
344pub trait ComponentContainer {
345    fn schema_enabled(&self) -> bool;
346
347    fn source_outputs(&self, key: &ComponentKey) -> Option<Vec<SourceOutput>>;
348
349    fn transform_inputs(&self, key: &ComponentKey) -> Option<&[OutputId]>;
350
351    fn transform_outputs(
352        &self,
353        key: &ComponentKey,
354        enrichment_tables: vector_lib::enrichment::TableRegistry,
355        input_definitions: &[(OutputId, Definition)],
356    ) -> Option<Vec<TransformOutput>>;
357
358    /// Gets the transform output for the given port.
359    ///
360    /// Returns Err(()) if there is no transform with the given key
361    /// Returns Some(None) if the source does not have an output for the port given
362    #[allow(clippy::result_unit_err)]
363    fn transform_output_for_port(
364        &self,
365        key: &ComponentKey,
366        port: &Option<String>,
367        enrichment_tables: vector_lib::enrichment::TableRegistry,
368        input_definitions: &[(OutputId, Definition)],
369    ) -> Result<Option<TransformOutput>, ()> {
370        if let Some(outputs) = self.transform_outputs(key, enrichment_tables, input_definitions) {
371            Ok(get_output_for_port(outputs, port))
372        } else {
373            Err(())
374        }
375    }
376
377    /// Gets the source output for the given port.
378    ///
379    /// Returns Err(()) if there is no source with the given key
380    /// Returns Some(None) if the source does not have an output for the port given
381    #[allow(clippy::result_unit_err)]
382    fn source_output_for_port(
383        &self,
384        key: &ComponentKey,
385        port: &Option<String>,
386    ) -> Result<Option<SourceOutput>, ()> {
387        if let Some(outputs) = self.source_outputs(key) {
388            Ok(get_source_output_for_port(outputs, port))
389        } else {
390            Err(())
391        }
392    }
393}
394
395fn get_output_for_port(
396    outputs: Vec<TransformOutput>,
397    port: &Option<String>,
398) -> Option<TransformOutput> {
399    outputs.into_iter().find(|output| &output.port == port)
400}
401
402fn get_source_output_for_port(
403    outputs: Vec<SourceOutput>,
404    port: &Option<String>,
405) -> Option<SourceOutput> {
406    outputs.into_iter().find(|output| &output.port == port)
407}
408
409impl ComponentContainer for Config {
410    fn schema_enabled(&self) -> bool {
411        self.schema.enabled
412    }
413
414    fn source_outputs(&self, key: &ComponentKey) -> Option<Vec<SourceOutput>> {
415        self.source(key)
416            .map(|source| source.inner.outputs(self.schema.log_namespace()))
417    }
418
419    fn transform_inputs(&self, key: &ComponentKey) -> Option<&[OutputId]> {
420        self.transform(key).map(|transform| &transform.inputs[..])
421    }
422
423    fn transform_outputs(
424        &self,
425        key: &ComponentKey,
426        enrichment_tables: vector_lib::enrichment::TableRegistry,
427        input_definitions: &[(OutputId, Definition)],
428    ) -> Option<Vec<TransformOutput>> {
429        self.transform(key).map(|source| {
430            source.inner.outputs(
431                enrichment_tables,
432                input_definitions,
433                self.schema.log_namespace(),
434            )
435        })
436    }
437}
438
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use indexmap::IndexMap;
    use similar_asserts::assert_eq;
    use vector_lib::{
        config::{DataType, SourceOutput, TransformOutput},
        lookup::owned_value_path,
    };
    use vrl::value::Kind;

    use super::*;

    /// Table-driven test for `expanded_definitions`.
    ///
    /// Every case describes a small topology (sources, transforms and the inputs of the
    /// component under inspection) together with the definitions expected to reach it. The cases
    /// are kept in an array so that they always run in declaration order.
    #[test]
    fn test_expanded_definition() {
        /// One topology plus the definitions expected for `inputs`.
        struct TestCase {
            /// `(component, port)` pairs feeding the component under inspection.
            inputs: Vec<(&'static str, Option<String>)>,
            /// Source outputs, keyed by source id.
            sources: IndexMap<&'static str, Vec<SourceOutput>>,
            /// Transform inputs and outputs, keyed by transform id.
            transforms: IndexMap<&'static str, (Vec<OutputId>, Vec<TransformOutput>)>,
            /// Expected result of `expanded_definitions`.
            want: Vec<(OutputId, Definition)>,
        }

        // The test case doubles as the component container: it serves the fixed outputs it was
        // given and ignores enrichment tables and input definitions.
        impl ComponentContainer for TestCase {
            fn schema_enabled(&self) -> bool {
                true
            }

            fn source_outputs(&self, key: &ComponentKey) -> Option<Vec<SourceOutput>> {
                self.sources.get(key.id()).cloned()
            }

            fn transform_inputs(&self, key: &ComponentKey) -> Option<&[OutputId]> {
                self.transforms.get(key.id()).map(|v| v.0.as_slice())
            }

            fn transform_outputs(
                &self,
                key: &ComponentKey,
                _: vector_lib::enrichment::TableRegistry,
                _: &[(OutputId, Definition)],
            ) -> Option<Vec<TransformOutput>> {
                self.transforms.get(key.id()).cloned().map(|v| v.1)
            }
        }

        for (title, case) in [
            (
                "no inputs",
                TestCase {
                    inputs: vec![],
                    sources: IndexMap::default(),
                    transforms: IndexMap::default(),
                    want: vec![],
                },
            ),
            (
                "single input, source with default schema",
                TestCase {
                    inputs: vec![("foo", None)],
                    sources: IndexMap::from([(
                        "foo",
                        vec![SourceOutput::new_maybe_logs(
                            DataType::all_bits(),
                            Definition::default_legacy_namespace(),
                        )],
                    )]),
                    transforms: IndexMap::default(),
                    want: vec![("foo".into(), Definition::default_legacy_namespace())],
                },
            ),
            (
                "single input, source with schema",
                TestCase {
                    inputs: vec![("source-foo", None)],
                    sources: IndexMap::from([(
                        "source-foo",
                        vec![SourceOutput::new_maybe_logs(
                            DataType::all_bits(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("foo"),
                                Kind::integer().or_bytes(),
                                Some("foo bar"),
                            ),
                        )],
                    )]),
                    transforms: IndexMap::default(),
                    want: vec![(
                        "source-foo".into(),
                        Definition::empty_legacy_namespace().with_event_field(
                            &owned_value_path!("foo"),
                            Kind::integer().or_bytes(),
                            Some("foo bar"),
                        ),
                    )],
                },
            ),
            (
                "multiple inputs, sources with schema",
                TestCase {
                    inputs: vec![("source-foo", None), ("source-bar", None)],
                    sources: IndexMap::from([
                        (
                            "source-foo",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("foo"),
                                    Kind::integer().or_bytes(),
                                    Some("foo bar"),
                                ),
                            )],
                        ),
                        (
                            "source-bar",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("foo"),
                                    Kind::timestamp(),
                                    Some("baz qux"),
                                ),
                            )],
                        ),
                    ]),
                    transforms: IndexMap::default(),
                    want: vec![
                        (
                            "source-foo".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("foo"),
                                Kind::integer().or_bytes(),
                                Some("foo bar"),
                            ),
                        ),
                        (
                            "source-bar".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("foo"),
                                Kind::timestamp(),
                                Some("baz qux"),
                            ),
                        ),
                    ],
                },
            ),
            (
                "transform overrides source",
                TestCase {
                    inputs: vec![("source-bar", None), ("transform-baz", None)],
                    sources: IndexMap::from([
                        (
                            "source-foo",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("foo"),
                                    Kind::boolean(),
                                    Some("foo"),
                                ),
                            )],
                        ),
                        (
                            "source-bar",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("bar"),
                                    Kind::integer(),
                                    Some("bar"),
                                ),
                            )],
                        ),
                    ]),
                    transforms: IndexMap::from([(
                        "transform-baz",
                        (
                            vec![OutputId::from("source-foo")],
                            vec![TransformOutput::new(
                                DataType::all_bits(),
                                [(
                                    "source-foo".into(),
                                    Definition::empty_legacy_namespace().with_event_field(
                                        &owned_value_path!("baz"),
                                        Kind::regex(),
                                        Some("baz"),
                                    ),
                                )]
                                .into(),
                            )],
                        ),
                    )]),
                    want: vec![
                        (
                            "source-bar".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("bar"),
                                Kind::integer(),
                                Some("bar"),
                            ),
                        ),
                        (
                            "transform-baz".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("baz"),
                                Kind::regex(),
                                Some("baz"),
                            ),
                        ),
                    ],
                },
            ),
            //   Source 1 -> Transform 1                ->
            //   Source 2 -> Transform 2                ->
            //            -> Transform 3 ->
            //            -> Transform 4 -> Transform 5 -> Sink 1
            //
            // `Transform 5` only declares a definition for its `Transform 3` input below, so a
            // single definition is expected for it.
            (
                "complex topology",
                TestCase {
                    inputs: vec![
                        ("Transform 1", None),
                        ("Transform 2", None),
                        ("Transform 5", None),
                    ],
                    sources: IndexMap::from([
                        (
                            "Source 1",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("source-1"),
                                    Kind::boolean(),
                                    Some("source-1"),
                                ),
                            )],
                        ),
                        (
                            "Source 2",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("source-2"),
                                    Kind::integer(),
                                    Some("source-2"),
                                ),
                            )],
                        ),
                    ]),
                    transforms: IndexMap::from([
                        (
                            "Transform 1",
                            (
                                vec![OutputId::from("Source 1")],
                                vec![TransformOutput::new(
                                    DataType::all_bits(),
                                    [(
                                        "Source 1".into(),
                                        Definition::empty_legacy_namespace().with_event_field(
                                            &owned_value_path!("transform-1"),
                                            Kind::regex(),
                                            None,
                                        ),
                                    )]
                                    .into(),
                                )],
                            ),
                        ),
                        (
                            "Transform 2",
                            (
                                vec![OutputId::from("Source 2")],
                                vec![TransformOutput::new(
                                    DataType::all_bits(),
                                    [(
                                        "Source 2".into(),
                                        Definition::empty_legacy_namespace().with_event_field(
                                            &owned_value_path!("transform-2"),
                                            Kind::float().or_null(),
                                            Some("transform-2"),
                                        ),
                                    )]
                                    .into(),
                                )],
                            ),
                        ),
                        (
                            "Transform 3",
                            (
                                vec![OutputId::from("Source 2")],
                                vec![TransformOutput::new(
                                    DataType::all_bits(),
                                    [(
                                        "Source 2".into(),
                                        Definition::empty_legacy_namespace().with_event_field(
                                            &owned_value_path!("transform-3"),
                                            Kind::integer(),
                                            Some("transform-3"),
                                        ),
                                    )]
                                    .into(),
                                )],
                            ),
                        ),
                        (
                            "Transform 4",
                            (
                                vec![OutputId::from("Source 2")],
                                vec![TransformOutput::new(
                                    DataType::all_bits(),
                                    [(
                                        "Source 2".into(),
                                        Definition::empty_legacy_namespace().with_event_field(
                                            &owned_value_path!("transform-4"),
                                            Kind::timestamp().or_bytes(),
                                            Some("transform-4"),
                                        ),
                                    )]
                                    .into(),
                                )],
                            ),
                        ),
                        (
                            "Transform 5",
                            (
                                vec![OutputId::from("Transform 3"), OutputId::from("Transform 4")],
                                vec![TransformOutput::new(
                                    DataType::all_bits(),
                                    [(
                                        "Transform 3".into(),
                                        Definition::empty_legacy_namespace().with_event_field(
                                            &owned_value_path!("transform-5"),
                                            Kind::boolean(),
                                            Some("transform-5"),
                                        ),
                                    )]
                                    .into(),
                                )],
                            ),
                        ),
                    ]),
                    want: vec![
                        // Pipeline 1
                        (
                            "Transform 1".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("transform-1"),
                                Kind::regex(),
                                None,
                            ),
                        ),
                        // Pipeline 2
                        (
                            "Transform 2".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("transform-2"),
                                Kind::float().or_null(),
                                Some("transform-2"),
                            ),
                        ),
                        // Pipeline 3
                        (
                            "Transform 5".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("transform-5"),
                                Kind::boolean(),
                                Some("transform-5"),
                            ),
                        ),
                    ],
                },
            ),
        ] {
            // Turn the `(component, port)` pairs into the ids handed to the function under test.
            let inputs = case
                .inputs
                .iter()
                .cloned()
                .map(|(key, port)| OutputId {
                    component: key.into(),
                    port,
                })
                .collect::<Vec<_>>();

            // Every case starts from an empty cache so that cases cannot influence each other.
            let got = expanded_definitions(
                vector_lib::enrichment::TableRegistry::default(),
                &inputs,
                &case,
                &mut HashMap::default(),
            )
            .unwrap();
            assert_eq!(got, case.want, "{}", title);
        }
    }
}