vector/topology/
schema.rs

1use std::collections::HashMap;
2
3use snafu::Snafu;
4use vector_lib::config::SourceOutput;
5
6pub(super) use crate::schema::Definition;
7use crate::{
8    config::{ComponentKey, Config, OutputId, SinkOuter, TransformContext, TransformOutput},
9    topology,
10};
11
12#[derive(Debug, Snafu)]
13pub enum Error {
14    ContainsNever,
15}
16
17/// The cache is used whilst building up the topology.
18/// TODO: Describe more, especially why we have a bool in the key.
19type Cache = HashMap<(bool, Vec<OutputId>), Vec<(OutputId, Definition)>>;
20
21pub fn possible_definitions(
22    inputs: &[OutputId],
23    config: &dyn ComponentContainer,
24    enrichment_tables: vector_lib::enrichment::TableRegistry,
25    cache: &mut Cache,
26) -> Result<Vec<(OutputId, Definition)>, Error> {
27    if inputs.is_empty() {
28        return Ok(vec![]);
29    }
30
31    // Try to get the definition from the cache.
32    if let Some(definition) = cache.get(&(config.schema_enabled(), inputs.to_vec())) {
33        return Ok(definition.clone());
34    }
35
36    let mut definitions = Vec::new();
37
38    for input in inputs {
39        let key = &input.component;
40
41        // If the input is a source, the output is merged into the top-level schema.
42        if let Ok(maybe_output) = config.source_output_for_port(key, &input.port) {
43            let mut source_definition = input.with_definitions(
44                maybe_output
45                    .unwrap_or_else(|| {
46                        unreachable!(
47                            "source output mis-configured - output for port {:?} missing",
48                            &input.port
49                        )
50                    })
51                    .schema_definition(config.schema_enabled()),
52            );
53
54            if contains_never(&source_definition) {
55                return Err(Error::ContainsNever);
56            }
57
58            definitions.append(&mut source_definition);
59        }
60
61        // If the input is a transform, the output is merged into the top-level schema
62        if let Some(inputs) = config.transform_inputs(key) {
63            let input_definitions =
64                possible_definitions(inputs, config, enrichment_tables.clone(), cache)?;
65
66            let mut transform_definition = input.with_definitions(
67                config
68                    .transform_output_for_port(
69                        key,
70                        &input.port,
71                        enrichment_tables.clone(),
72                        &input_definitions,
73                    )
74                    .expect("transform must exist - already found inputs")
75                    .unwrap_or_else(|| {
76                        unreachable!(
77                            "transform output mis-configured - output for port {:?} missing",
78                            &input.port
79                        )
80                    })
81                    .schema_definitions(config.schema_enabled())
82                    .values()
83                    .cloned(),
84            );
85
86            if contains_never(&transform_definition) {
87                return Err(Error::ContainsNever);
88            }
89
90            definitions.append(&mut transform_definition);
91        }
92    }
93
94    Ok(definitions)
95}
96
97/// Get a list of definitions from individual pipelines feeding into a component.
98///
99/// For example, given the following topology:
100///
101///   Source 1 -> Transform 1                ->
102///   Source 2 -> Transform 2                ->
103///            -> Transform 3 ->
104///            -> Transform 4 -> Transform 5 -> Sink 1
105///
106/// In this example, if we ask for the definitions received by `Sink 1`, we'd receive four
107/// definitions, one for each route leading into `Sink 1`, with the route going through `Transform
108/// 5` being expanded into two individual routes (So1 -> T3 -> T5 -> Si1 AND So1 -> T4 -> T5 ->
109/// Si1).
110pub(super) fn expanded_definitions(
111    enrichment_tables: vector_lib::enrichment::TableRegistry,
112    inputs: &[OutputId],
113    config: &dyn ComponentContainer,
114    cache: &mut Cache,
115) -> Result<Vec<(OutputId, Definition)>, Error> {
116    // Try to get the definition from the cache.
117    if let Some(definitions) = cache.get(&(config.schema_enabled(), inputs.to_vec())) {
118        return Ok(definitions.clone());
119    }
120
121    let mut definitions: Vec<(OutputId, Definition)> = vec![];
122    let mut merged_cache = HashMap::default();
123
124    for input in inputs {
125        let key = &input.component;
126
127        // If the input is a source, it'll always have schema definition attached, even if it is an
128        // "empty" schema.
129        // We take the full schema definition regardless of `config.schema_enabled()`, the assumption
130        // being that the receiving component will not be validating the schema if schema checking is
131        // not enabled.
132        if let Some(outputs) = config.source_outputs(key) {
133            // After getting the source matching to the given input, we need to further narrow the
134            // actual output of the source feeding into this input, and then get the definition
135            // belonging to that output.
136            let mut source_definitions =
137                outputs
138                    .into_iter()
139                    .find_map(|output| {
140                        if output.port == input.port {
141                            Some(input.with_definitions(
142                                output.schema_definition(config.schema_enabled()),
143                            ))
144                        } else {
145                            None
146                        }
147                    })
148                    .unwrap_or_else(|| {
149                        // If we find no match, it means the topology is misconfigured. This is a fatal
150                        // error, but other parts of the topology builder deal with this state.
151                        unreachable!("source output mis-configured")
152                    });
153
154            if contains_never(&source_definitions) {
155                return Err(Error::ContainsNever);
156            }
157
158            definitions.append(&mut source_definitions);
159
160        // A transform can receive from multiple inputs, and each input needs to be expanded to
161        // a new pipeline.
162        } else if let Some(inputs) = config.transform_inputs(key) {
163            let input_definitions =
164                possible_definitions(inputs, config, enrichment_tables.clone(), &mut merged_cache)?;
165
166            let mut transform_definition = config
167                .transform_outputs(key, enrichment_tables.clone(), &input_definitions)
168                .expect("already found inputs")
169                .iter()
170                .find_map(|output| {
171                    if output.port == input.port {
172                        Some(
173                            input.with_definitions(
174                                output
175                                    .schema_definitions(config.schema_enabled())
176                                    .values()
177                                    .cloned(),
178                            ),
179                        )
180                    } else {
181                        None
182                    }
183                })
184                // If we find no match, it means the topology is misconfigured. This is a fatal
185                // error, but other parts of the topology builder deal with this state.
186                .expect("transform output misconfigured");
187
188            if contains_never(&transform_definition) {
189                return Err(Error::ContainsNever);
190            }
191
192            // Append whatever number of additional pipelines we created to the existing
193            // pipeline definitions.
194            definitions.append(&mut transform_definition);
195        }
196    }
197
198    cache.insert(
199        (config.schema_enabled(), inputs.to_vec()),
200        definitions.clone(),
201    );
202
203    Ok(definitions)
204}
205
206/// Returns a list of definitions from the given inputs.
207/// Errors if any of the definitions are [`Kind::never`] implying that
208/// an error condition has been reached.
209pub(crate) fn input_definitions(
210    inputs: &[OutputId],
211    config: &Config,
212    enrichment_tables: vector_lib::enrichment::TableRegistry,
213    cache: &mut Cache,
214) -> Result<Vec<(OutputId, Definition)>, Error> {
215    if inputs.is_empty() {
216        return Ok(vec![]);
217    }
218
219    if let Some(definitions) = cache.get(&(config.schema_enabled(), inputs.to_vec())) {
220        return Ok(definitions.clone());
221    }
222
223    let mut definitions = Vec::new();
224
225    for input in inputs {
226        let key = &input.component;
227
228        // If the input is a source we retrieve the definitions from the source
229        // (there should only be one) and add it to the return.
230        if let Ok(maybe_output) = config.source_output_for_port(key, &input.port) {
231            let mut source_definitions = input.with_definitions(
232                maybe_output
233                    .unwrap_or_else(|| {
234                        unreachable!(
235                            "source output mis-configured - output for port {:?} missing",
236                            &input.port
237                        )
238                    })
239                    .schema_definition(config.schema_enabled()),
240            );
241
242            if contains_never(&source_definitions) {
243                return Err(Error::ContainsNever);
244            }
245
246            definitions.append(&mut source_definitions);
247        }
248
249        // If the input is a transform we recurse to the upstream components to retrieve
250        // their definitions and pass it through the transform to get the new definitions.
251        if let Some(inputs) = config.transform_inputs(key) {
252            let transform_definitions =
253                input_definitions(inputs, config, enrichment_tables.clone(), cache)?;
254
255            if contains_never(&transform_definitions) {
256                return Err(Error::ContainsNever);
257            }
258
259            let mut transform_definitions = input.with_definitions(
260                config
261                    .transform_output_for_port(
262                        key,
263                        &input.port,
264                        enrichment_tables.clone(),
265                        &transform_definitions,
266                    )
267                    .expect("transform must exist")
268                    .unwrap_or_else(|| {
269                        unreachable!(
270                            "transform output mis-configured - output for port {:?} missing",
271                            &input.port
272                        )
273                    })
274                    .schema_definitions(config.schema_enabled())
275                    .values()
276                    .cloned(),
277            );
278
279            if contains_never(&transform_definitions) {
280                return Err(Error::ContainsNever);
281            }
282
283            definitions.append(&mut transform_definitions);
284        }
285    }
286
287    Ok(definitions)
288}
289
290/// Checks if any of the definitions in the list contain `Kind::never()`. This
291/// implies the definition cannot contain any output and thus we should stop
292/// processing further.
293fn contains_never(transform_definitions: &[(OutputId, Definition)]) -> bool {
294    transform_definitions
295        .iter()
296        .any(|(_, definition)| definition.event_kind().is_never())
297}
298
299pub(super) fn validate_sink_expectations(
300    key: &ComponentKey,
301    sink: &SinkOuter<OutputId>,
302    config: &topology::Config,
303    enrichment_tables: vector_lib::enrichment::TableRegistry,
304) -> Result<(), Vec<String>> {
305    let mut errors = vec![];
306
307    // Get the schema against which we need to validate the schemas of the components feeding into
308    // this sink.
309    let input = sink.inner.input();
310    let requirement = input.schema_requirement();
311
312    // Get all pipeline definitions feeding into this sink.
313    let mut cache = HashMap::default();
314    let definitions =
315        match expanded_definitions(enrichment_tables, &sink.inputs, config, &mut cache) {
316            Ok(definitions) => definitions,
317            Err(err) => {
318                errors.push(err.to_string());
319                return Err(errors);
320            }
321        };
322
323    // Validate each individual definition against the sink requirement.
324    for (_output, definition) in definitions {
325        if let Err(err) = requirement.validate(&definition, config.schema.validation) {
326            errors.append(
327                &mut err
328                    .errors()
329                    .iter()
330                    .map(|err| format!("schema error in component {key}: {err}"))
331                    .collect(),
332            );
333        }
334    }
335
336    if !errors.is_empty() {
337        return Err(errors);
338    }
339
340    Ok(())
341}
342
343pub trait ComponentContainer {
344    fn schema_enabled(&self) -> bool;
345
346    fn source_outputs(&self, key: &ComponentKey) -> Option<Vec<SourceOutput>>;
347
348    fn transform_inputs(&self, key: &ComponentKey) -> Option<&[OutputId]>;
349
350    fn transform_outputs(
351        &self,
352        key: &ComponentKey,
353        enrichment_tables: vector_lib::enrichment::TableRegistry,
354        input_definitions: &[(OutputId, Definition)],
355    ) -> Option<Vec<TransformOutput>>;
356
357    /// Gets the transform output for the given port.
358    ///
359    /// Returns Err(()) if there is no transform with the given key
360    /// Returns Some(None) if the source does not have an output for the port given
361    #[allow(clippy::result_unit_err)]
362    fn transform_output_for_port(
363        &self,
364        key: &ComponentKey,
365        port: &Option<String>,
366        enrichment_tables: vector_lib::enrichment::TableRegistry,
367        input_definitions: &[(OutputId, Definition)],
368    ) -> Result<Option<TransformOutput>, ()> {
369        if let Some(outputs) = self.transform_outputs(key, enrichment_tables, input_definitions) {
370            Ok(get_output_for_port(outputs, port))
371        } else {
372            Err(())
373        }
374    }
375
376    /// Gets the source output for the given port.
377    ///
378    /// Returns Err(()) if there is no source with the given key
379    /// Returns Some(None) if the source does not have an output for the port given
380    #[allow(clippy::result_unit_err)]
381    fn source_output_for_port(
382        &self,
383        key: &ComponentKey,
384        port: &Option<String>,
385    ) -> Result<Option<SourceOutput>, ()> {
386        if let Some(outputs) = self.source_outputs(key) {
387            Ok(get_source_output_for_port(outputs, port))
388        } else {
389            Err(())
390        }
391    }
392}
393
394fn get_output_for_port(
395    outputs: Vec<TransformOutput>,
396    port: &Option<String>,
397) -> Option<TransformOutput> {
398    outputs.into_iter().find(|output| &output.port == port)
399}
400
401fn get_source_output_for_port(
402    outputs: Vec<SourceOutput>,
403    port: &Option<String>,
404) -> Option<SourceOutput> {
405    outputs.into_iter().find(|output| &output.port == port)
406}
407
408impl ComponentContainer for Config {
409    fn schema_enabled(&self) -> bool {
410        self.schema.enabled
411    }
412
413    fn source_outputs(&self, key: &ComponentKey) -> Option<Vec<SourceOutput>> {
414        self.source(key)
415            .map(|source| source.inner.outputs(self.schema.log_namespace()))
416    }
417
418    fn transform_inputs(&self, key: &ComponentKey) -> Option<&[OutputId]> {
419        self.transform(key).map(|transform| &transform.inputs[..])
420    }
421
422    fn transform_outputs(
423        &self,
424        key: &ComponentKey,
425        enrichment_tables: vector_lib::enrichment::TableRegistry,
426        input_definitions: &[(OutputId, Definition)],
427    ) -> Option<Vec<TransformOutput>> {
428        self.transform(key).map(|source| {
429            source.inner.outputs(
430                &TransformContext {
431                    schema: self.schema,
432                    enrichment_tables,
433                    ..Default::default()
434                },
435                input_definitions,
436            )
437        })
438    }
439}
440
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use indexmap::IndexMap;
    use similar_asserts::assert_eq;
    use vector_lib::{
        config::{DataType, SourceOutput, TransformOutput},
        lookup::owned_value_path,
    };
    use vrl::value::Kind;

    use super::*;

    /// A hand-built topology together with the definitions expected for `inputs`.
    struct TestCase {
        /// The inputs (component id and port) whose definitions are requested.
        inputs: Vec<(&'static str, Option<String>)>,
        /// Outputs of every source, keyed by component id.
        sources: IndexMap<&'static str, Vec<SourceOutput>>,
        /// Inputs and outputs of every transform, keyed by component id.
        transforms: IndexMap<&'static str, (Vec<OutputId>, Vec<TransformOutput>)>,
        /// The definitions `expanded_definitions` must return.
        want: Vec<(OutputId, Definition)>,
    }

    impl ComponentContainer for TestCase {
        fn schema_enabled(&self) -> bool {
            true
        }

        fn source_outputs(&self, key: &ComponentKey) -> Option<Vec<SourceOutput>> {
            self.sources.get(key.id()).cloned()
        }

        fn transform_inputs(&self, key: &ComponentKey) -> Option<&[OutputId]> {
            self.transforms.get(key.id()).map(|v| v.0.as_slice())
        }

        // The outputs are fixed per test case; enrichment tables and input definitions are
        // ignored.
        fn transform_outputs(
            &self,
            key: &ComponentKey,
            _: vector_lib::enrichment::TableRegistry,
            _: &[(OutputId, Definition)],
        ) -> Option<Vec<TransformOutput>> {
            self.transforms.get(key.id()).cloned().map(|v| v.1)
        }
    }

    /// Output list of a source with a single output carrying `definition`.
    fn source_with(definition: Definition) -> Vec<SourceOutput> {
        vec![SourceOutput::new_maybe_logs(
            DataType::all_bits(),
            definition,
        )]
    }

    /// Output list of a transform with a single output whose only definition is keyed by
    /// `input`.
    fn transform_with(input: &'static str, definition: Definition) -> Vec<TransformOutput> {
        vec![TransformOutput::new(
            DataType::all_bits(),
            [(OutputId::from(input), definition)].into(),
        )]
    }

    #[test]
    fn test_expanded_definition() {
        // An ordered array (rather than a map) keeps the execution order of the cases stable and
        // cannot silently drop a case that reuses a title.
        let cases = [
            (
                "no inputs",
                TestCase {
                    inputs: vec![],
                    sources: IndexMap::default(),
                    transforms: IndexMap::default(),
                    want: vec![],
                },
            ),
            (
                "single input, source with default schema",
                TestCase {
                    inputs: vec![("foo", None)],
                    sources: IndexMap::from([(
                        "foo",
                        source_with(Definition::default_legacy_namespace()),
                    )]),
                    transforms: IndexMap::default(),
                    want: vec![("foo".into(), Definition::default_legacy_namespace())],
                },
            ),
            (
                "single input, source with schema",
                TestCase {
                    inputs: vec![("source-foo", None)],
                    sources: IndexMap::from([(
                        "source-foo",
                        source_with(Definition::empty_legacy_namespace().with_event_field(
                            &owned_value_path!("foo"),
                            Kind::integer().or_bytes(),
                            Some("foo bar"),
                        )),
                    )]),
                    transforms: IndexMap::default(),
                    want: vec![(
                        "source-foo".into(),
                        Definition::empty_legacy_namespace().with_event_field(
                            &owned_value_path!("foo"),
                            Kind::integer().or_bytes(),
                            Some("foo bar"),
                        ),
                    )],
                },
            ),
            (
                "multiple inputs, sources with schema",
                TestCase {
                    inputs: vec![("source-foo", None), ("source-bar", None)],
                    sources: IndexMap::from([
                        (
                            "source-foo",
                            source_with(Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("foo"),
                                Kind::integer().or_bytes(),
                                Some("foo bar"),
                            )),
                        ),
                        (
                            "source-bar",
                            source_with(Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("foo"),
                                Kind::timestamp(),
                                Some("baz qux"),
                            )),
                        ),
                    ]),
                    transforms: IndexMap::default(),
                    want: vec![
                        (
                            "source-foo".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("foo"),
                                Kind::integer().or_bytes(),
                                Some("foo bar"),
                            ),
                        ),
                        (
                            "source-bar".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("foo"),
                                Kind::timestamp(),
                                Some("baz qux"),
                            ),
                        ),
                    ],
                },
            ),
            (
                "transform overrides source",
                TestCase {
                    inputs: vec![("source-bar", None), ("transform-baz", None)],
                    sources: IndexMap::from([
                        (
                            "source-foo",
                            source_with(Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("foo"),
                                Kind::boolean(),
                                Some("foo"),
                            )),
                        ),
                        (
                            "source-bar",
                            source_with(Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("bar"),
                                Kind::integer(),
                                Some("bar"),
                            )),
                        ),
                    ]),
                    transforms: IndexMap::from([(
                        "transform-baz",
                        (
                            vec![OutputId::from("source-foo")],
                            transform_with(
                                "source-foo",
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("baz"),
                                    Kind::regex(),
                                    Some("baz"),
                                ),
                            ),
                        ),
                    )]),
                    want: vec![
                        (
                            "source-bar".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("bar"),
                                Kind::integer(),
                                Some("bar"),
                            ),
                        ),
                        (
                            "transform-baz".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("baz"),
                                Kind::regex(),
                                Some("baz"),
                            ),
                        ),
                    ],
                },
            ),
            //   Source 1 -> Transform 1                ->
            //   Source 2 -> Transform 2                ->
            //            -> Transform 3 ->
            //            -> Transform 4 -> Transform 5 -> Sink 1
            (
                "complex topology",
                TestCase {
                    inputs: vec![
                        ("Transform 1", None),
                        ("Transform 2", None),
                        ("Transform 5", None),
                    ],
                    sources: IndexMap::from([
                        (
                            "Source 1",
                            source_with(Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("source-1"),
                                Kind::boolean(),
                                Some("source-1"),
                            )),
                        ),
                        (
                            "Source 2",
                            source_with(Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("source-2"),
                                Kind::integer(),
                                Some("source-2"),
                            )),
                        ),
                    ]),
                    transforms: IndexMap::from([
                        (
                            "Transform 1",
                            (
                                vec![OutputId::from("Source 1")],
                                transform_with(
                                    "Source 1",
                                    Definition::empty_legacy_namespace().with_event_field(
                                        &owned_value_path!("transform-1"),
                                        Kind::regex(),
                                        None,
                                    ),
                                ),
                            ),
                        ),
                        (
                            "Transform 2",
                            (
                                vec![OutputId::from("Source 2")],
                                transform_with(
                                    "Source 2",
                                    Definition::empty_legacy_namespace().with_event_field(
                                        &owned_value_path!("transform-2"),
                                        Kind::float().or_null(),
                                        Some("transform-2"),
                                    ),
                                ),
                            ),
                        ),
                        (
                            "Transform 3",
                            (
                                vec![OutputId::from("Source 2")],
                                transform_with(
                                    "Source 2",
                                    Definition::empty_legacy_namespace().with_event_field(
                                        &owned_value_path!("transform-3"),
                                        Kind::integer(),
                                        Some("transform-3"),
                                    ),
                                ),
                            ),
                        ),
                        (
                            "Transform 4",
                            (
                                vec![OutputId::from("Source 2")],
                                transform_with(
                                    "Source 2",
                                    Definition::empty_legacy_namespace().with_event_field(
                                        &owned_value_path!("transform-4"),
                                        Kind::timestamp().or_bytes(),
                                        Some("transform-4"),
                                    ),
                                ),
                            ),
                        ),
                        (
                            "Transform 5",
                            (
                                vec![OutputId::from("Transform 3"), OutputId::from("Transform 4")],
                                transform_with(
                                    "Transform 3",
                                    Definition::empty_legacy_namespace().with_event_field(
                                        &owned_value_path!("transform-5"),
                                        Kind::boolean(),
                                        Some("transform-5"),
                                    ),
                                ),
                            ),
                        ),
                    ]),
                    want: vec![
                        // Pipeline 1
                        (
                            "Transform 1".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("transform-1"),
                                Kind::regex(),
                                None,
                            ),
                        ),
                        // Pipeline 2
                        (
                            "Transform 2".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("transform-2"),
                                Kind::float().or_null(),
                                Some("transform-2"),
                            ),
                        ),
                        // Pipeline 3
                        (
                            "Transform 5".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("transform-5"),
                                Kind::boolean(),
                                Some("transform-5"),
                            ),
                        ),
                    ],
                },
            ),
        ];

        for (title, case) in cases {
            let inputs = case
                .inputs
                .iter()
                .cloned()
                .map(|(key, port)| OutputId {
                    component: key.into(),
                    port,
                })
                .collect::<Vec<_>>();

            // Every case starts from an empty cache.
            let got = expanded_definitions(
                vector_lib::enrichment::TableRegistry::default(),
                &inputs,
                &case,
                &mut HashMap::default(),
            )
            .unwrap();
            assert_eq!(got, case.want, "{}", title);
        }
    }
}