1use std::collections::HashMap;
2
3use snafu::Snafu;
4use vector_lib::config::SourceOutput;
5
6pub(super) use crate::schema::Definition;
7use crate::{
8 config::{ComponentKey, Config, OutputId, SinkOuter, TransformContext, TransformOutput},
9 topology,
10};
11
12#[derive(Debug, Snafu)]
13pub enum Error {
14 ContainsNever,
15}
16
17type Cache = HashMap<(bool, Vec<OutputId>), Vec<(OutputId, Definition)>>;
20
21pub fn possible_definitions(
22 inputs: &[OutputId],
23 config: &dyn ComponentContainer,
24 enrichment_tables: vector_lib::enrichment::TableRegistry,
25 cache: &mut Cache,
26) -> Result<Vec<(OutputId, Definition)>, Error> {
27 if inputs.is_empty() {
28 return Ok(vec![]);
29 }
30
31 if let Some(definition) = cache.get(&(config.schema_enabled(), inputs.to_vec())) {
33 return Ok(definition.clone());
34 }
35
36 let mut definitions = Vec::new();
37
38 for input in inputs {
39 let key = &input.component;
40
41 if let Ok(maybe_output) = config.source_output_for_port(key, &input.port) {
43 let mut source_definition = input.with_definitions(
44 maybe_output
45 .unwrap_or_else(|| {
46 unreachable!(
47 "source output mis-configured - output for port {:?} missing",
48 &input.port
49 )
50 })
51 .schema_definition(config.schema_enabled()),
52 );
53
54 if contains_never(&source_definition) {
55 return Err(Error::ContainsNever);
56 }
57
58 definitions.append(&mut source_definition);
59 }
60
61 if let Some(inputs) = config.transform_inputs(key) {
63 let input_definitions =
64 possible_definitions(inputs, config, enrichment_tables.clone(), cache)?;
65
66 let mut transform_definition = input.with_definitions(
67 config
68 .transform_output_for_port(
69 key,
70 &input.port,
71 enrichment_tables.clone(),
72 &input_definitions,
73 )
74 .expect("transform must exist - already found inputs")
75 .unwrap_or_else(|| {
76 unreachable!(
77 "transform output mis-configured - output for port {:?} missing",
78 &input.port
79 )
80 })
81 .schema_definitions(config.schema_enabled())
82 .values()
83 .cloned(),
84 );
85
86 if contains_never(&transform_definition) {
87 return Err(Error::ContainsNever);
88 }
89
90 definitions.append(&mut transform_definition);
91 }
92 }
93
94 Ok(definitions)
95}
96
97pub(super) fn expanded_definitions(
111 enrichment_tables: vector_lib::enrichment::TableRegistry,
112 inputs: &[OutputId],
113 config: &dyn ComponentContainer,
114 cache: &mut Cache,
115) -> Result<Vec<(OutputId, Definition)>, Error> {
116 if let Some(definitions) = cache.get(&(config.schema_enabled(), inputs.to_vec())) {
118 return Ok(definitions.clone());
119 }
120
121 let mut definitions: Vec<(OutputId, Definition)> = vec![];
122 let mut merged_cache = HashMap::default();
123
124 for input in inputs {
125 let key = &input.component;
126
127 if let Some(outputs) = config.source_outputs(key) {
133 let mut source_definitions =
137 outputs
138 .into_iter()
139 .find_map(|output| {
140 if output.port == input.port {
141 Some(input.with_definitions(
142 output.schema_definition(config.schema_enabled()),
143 ))
144 } else {
145 None
146 }
147 })
148 .unwrap_or_else(|| {
149 unreachable!("source output mis-configured")
152 });
153
154 if contains_never(&source_definitions) {
155 return Err(Error::ContainsNever);
156 }
157
158 definitions.append(&mut source_definitions);
159
160 } else if let Some(inputs) = config.transform_inputs(key) {
163 let input_definitions =
164 possible_definitions(inputs, config, enrichment_tables.clone(), &mut merged_cache)?;
165
166 let mut transform_definition = config
167 .transform_outputs(key, enrichment_tables.clone(), &input_definitions)
168 .expect("already found inputs")
169 .iter()
170 .find_map(|output| {
171 if output.port == input.port {
172 Some(
173 input.with_definitions(
174 output
175 .schema_definitions(config.schema_enabled())
176 .values()
177 .cloned(),
178 ),
179 )
180 } else {
181 None
182 }
183 })
184 .expect("transform output misconfigured");
187
188 if contains_never(&transform_definition) {
189 return Err(Error::ContainsNever);
190 }
191
192 definitions.append(&mut transform_definition);
195 }
196 }
197
198 cache.insert(
199 (config.schema_enabled(), inputs.to_vec()),
200 definitions.clone(),
201 );
202
203 Ok(definitions)
204}
205
206pub(crate) fn input_definitions(
210 inputs: &[OutputId],
211 config: &Config,
212 enrichment_tables: vector_lib::enrichment::TableRegistry,
213 cache: &mut Cache,
214) -> Result<Vec<(OutputId, Definition)>, Error> {
215 if inputs.is_empty() {
216 return Ok(vec![]);
217 }
218
219 if let Some(definitions) = cache.get(&(config.schema_enabled(), inputs.to_vec())) {
220 return Ok(definitions.clone());
221 }
222
223 let mut definitions = Vec::new();
224
225 for input in inputs {
226 let key = &input.component;
227
228 if let Ok(maybe_output) = config.source_output_for_port(key, &input.port) {
231 let mut source_definitions = input.with_definitions(
232 maybe_output
233 .unwrap_or_else(|| {
234 unreachable!(
235 "source output mis-configured - output for port {:?} missing",
236 &input.port
237 )
238 })
239 .schema_definition(config.schema_enabled()),
240 );
241
242 if contains_never(&source_definitions) {
243 return Err(Error::ContainsNever);
244 }
245
246 definitions.append(&mut source_definitions);
247 }
248
249 if let Some(inputs) = config.transform_inputs(key) {
252 let transform_definitions =
253 input_definitions(inputs, config, enrichment_tables.clone(), cache)?;
254
255 if contains_never(&transform_definitions) {
256 return Err(Error::ContainsNever);
257 }
258
259 let mut transform_definitions = input.with_definitions(
260 config
261 .transform_output_for_port(
262 key,
263 &input.port,
264 enrichment_tables.clone(),
265 &transform_definitions,
266 )
267 .expect("transform must exist")
268 .unwrap_or_else(|| {
269 unreachable!(
270 "transform output mis-configured - output for port {:?} missing",
271 &input.port
272 )
273 })
274 .schema_definitions(config.schema_enabled())
275 .values()
276 .cloned(),
277 );
278
279 if contains_never(&transform_definitions) {
280 return Err(Error::ContainsNever);
281 }
282
283 definitions.append(&mut transform_definitions);
284 }
285 }
286
287 Ok(definitions)
288}
289
290fn contains_never(transform_definitions: &[(OutputId, Definition)]) -> bool {
294 transform_definitions
295 .iter()
296 .any(|(_, definition)| definition.event_kind().is_never())
297}
298
299pub(super) fn validate_sink_expectations(
300 key: &ComponentKey,
301 sink: &SinkOuter<OutputId>,
302 config: &topology::Config,
303 enrichment_tables: vector_lib::enrichment::TableRegistry,
304) -> Result<(), Vec<String>> {
305 let mut errors = vec![];
306
307 let input = sink.inner.input();
310 let requirement = input.schema_requirement();
311
312 let mut cache = HashMap::default();
314 let definitions =
315 match expanded_definitions(enrichment_tables, &sink.inputs, config, &mut cache) {
316 Ok(definitions) => definitions,
317 Err(err) => {
318 errors.push(err.to_string());
319 return Err(errors);
320 }
321 };
322
323 for (_output, definition) in definitions {
325 if let Err(err) = requirement.validate(&definition, config.schema.validation) {
326 errors.append(
327 &mut err
328 .errors()
329 .iter()
330 .map(|err| format!("schema error in component {key}: {err}"))
331 .collect(),
332 );
333 }
334 }
335
336 if !errors.is_empty() {
337 return Err(errors);
338 }
339
340 Ok(())
341}
342
343pub trait ComponentContainer {
344 fn schema_enabled(&self) -> bool;
345
346 fn source_outputs(&self, key: &ComponentKey) -> Option<Vec<SourceOutput>>;
347
348 fn transform_inputs(&self, key: &ComponentKey) -> Option<&[OutputId]>;
349
350 fn transform_outputs(
351 &self,
352 key: &ComponentKey,
353 enrichment_tables: vector_lib::enrichment::TableRegistry,
354 input_definitions: &[(OutputId, Definition)],
355 ) -> Option<Vec<TransformOutput>>;
356
357 #[allow(clippy::result_unit_err)]
362 fn transform_output_for_port(
363 &self,
364 key: &ComponentKey,
365 port: &Option<String>,
366 enrichment_tables: vector_lib::enrichment::TableRegistry,
367 input_definitions: &[(OutputId, Definition)],
368 ) -> Result<Option<TransformOutput>, ()> {
369 if let Some(outputs) = self.transform_outputs(key, enrichment_tables, input_definitions) {
370 Ok(get_output_for_port(outputs, port))
371 } else {
372 Err(())
373 }
374 }
375
376 #[allow(clippy::result_unit_err)]
381 fn source_output_for_port(
382 &self,
383 key: &ComponentKey,
384 port: &Option<String>,
385 ) -> Result<Option<SourceOutput>, ()> {
386 if let Some(outputs) = self.source_outputs(key) {
387 Ok(get_source_output_for_port(outputs, port))
388 } else {
389 Err(())
390 }
391 }
392}
393
394fn get_output_for_port(
395 outputs: Vec<TransformOutput>,
396 port: &Option<String>,
397) -> Option<TransformOutput> {
398 outputs.into_iter().find(|output| &output.port == port)
399}
400
401fn get_source_output_for_port(
402 outputs: Vec<SourceOutput>,
403 port: &Option<String>,
404) -> Option<SourceOutput> {
405 outputs.into_iter().find(|output| &output.port == port)
406}
407
408impl ComponentContainer for Config {
409 fn schema_enabled(&self) -> bool {
410 self.schema.enabled
411 }
412
413 fn source_outputs(&self, key: &ComponentKey) -> Option<Vec<SourceOutput>> {
414 self.source(key)
415 .map(|source| source.inner.outputs(self.schema.log_namespace()))
416 }
417
418 fn transform_inputs(&self, key: &ComponentKey) -> Option<&[OutputId]> {
419 self.transform(key).map(|transform| &transform.inputs[..])
420 }
421
422 fn transform_outputs(
423 &self,
424 key: &ComponentKey,
425 enrichment_tables: vector_lib::enrichment::TableRegistry,
426 input_definitions: &[(OutputId, Definition)],
427 ) -> Option<Vec<TransformOutput>> {
428 self.transform(key).map(|source| {
429 source.inner.outputs(
430 &TransformContext {
431 schema: self.schema,
432 enrichment_tables,
433 ..Default::default()
434 },
435 input_definitions,
436 )
437 })
438 }
439}
440
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use indexmap::IndexMap;
    use similar_asserts::assert_eq;
    use vector_lib::{
        config::{DataType, SourceOutput, TransformOutput},
        lookup::owned_value_path,
    };
    use vrl::value::Kind;

    use super::*;

    /// Table-driven check of `expanded_definitions` against small in-memory
    /// topologies.
    #[test]
    fn test_expanded_definition() {
        /// A self-contained topology plus the definitions expected for `inputs`.
        struct TestCase {
            inputs: Vec<(&'static str, Option<String>)>,
            sources: IndexMap<&'static str, Vec<SourceOutput>>,
            transforms: IndexMap<&'static str, (Vec<OutputId>, Vec<TransformOutput>)>,
            want: Vec<(OutputId, Definition)>,
        }

        impl ComponentContainer for TestCase {
            fn schema_enabled(&self) -> bool {
                true
            }

            fn source_outputs(&self, key: &ComponentKey) -> Option<Vec<SourceOutput>> {
                self.sources.get(key.id()).cloned()
            }

            fn transform_inputs(&self, key: &ComponentKey) -> Option<&[OutputId]> {
                self.transforms.get(key.id()).map(|v| v.0.as_slice())
            }

            fn transform_outputs(
                &self,
                key: &ComponentKey,
                _: vector_lib::enrichment::TableRegistry,
                _: &[(OutputId, Definition)],
            ) -> Option<Vec<TransformOutput>> {
                self.transforms.get(key.id()).cloned().map(|v| v.1)
            }
        }

        // A plain array keeps the cases in a fixed order, so a failing run is
        // reproducible (a `HashMap` iterates in an unspecified order).
        let cases = [
            (
                "no inputs",
                TestCase {
                    inputs: vec![],
                    sources: IndexMap::default(),
                    transforms: IndexMap::default(),
                    want: vec![],
                },
            ),
            (
                "single input, source with default schema",
                TestCase {
                    inputs: vec![("foo", None)],
                    sources: IndexMap::from([(
                        "foo",
                        vec![SourceOutput::new_maybe_logs(
                            DataType::all_bits(),
                            Definition::default_legacy_namespace(),
                        )],
                    )]),
                    transforms: IndexMap::default(),
                    want: vec![("foo".into(), Definition::default_legacy_namespace())],
                },
            ),
            (
                "single input, source with schema",
                TestCase {
                    inputs: vec![("source-foo", None)],
                    sources: IndexMap::from([(
                        "source-foo",
                        vec![SourceOutput::new_maybe_logs(
                            DataType::all_bits(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("foo"),
                                Kind::integer().or_bytes(),
                                Some("foo bar"),
                            ),
                        )],
                    )]),
                    transforms: IndexMap::default(),
                    want: vec![(
                        "source-foo".into(),
                        Definition::empty_legacy_namespace().with_event_field(
                            &owned_value_path!("foo"),
                            Kind::integer().or_bytes(),
                            Some("foo bar"),
                        ),
                    )],
                },
            ),
            (
                "multiple inputs, sources with schema",
                TestCase {
                    inputs: vec![("source-foo", None), ("source-bar", None)],
                    sources: IndexMap::from([
                        (
                            "source-foo",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("foo"),
                                    Kind::integer().or_bytes(),
                                    Some("foo bar"),
                                ),
                            )],
                        ),
                        (
                            "source-bar",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("foo"),
                                    Kind::timestamp(),
                                    Some("baz qux"),
                                ),
                            )],
                        ),
                    ]),
                    transforms: IndexMap::default(),
                    want: vec![
                        (
                            "source-foo".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("foo"),
                                Kind::integer().or_bytes(),
                                Some("foo bar"),
                            ),
                        ),
                        (
                            "source-bar".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("foo"),
                                Kind::timestamp(),
                                Some("baz qux"),
                            ),
                        ),
                    ],
                },
            ),
            (
                "transform overrides source",
                TestCase {
                    inputs: vec![("source-bar", None), ("transform-baz", None)],
                    sources: IndexMap::from([
                        (
                            "source-foo",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("foo"),
                                    Kind::boolean(),
                                    Some("foo"),
                                ),
                            )],
                        ),
                        (
                            "source-bar",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("bar"),
                                    Kind::integer(),
                                    Some("bar"),
                                ),
                            )],
                        ),
                    ]),
                    transforms: IndexMap::from([(
                        "transform-baz",
                        (
                            vec![OutputId::from("source-foo")],
                            vec![TransformOutput::new(
                                DataType::all_bits(),
                                [(
                                    "source-foo".into(),
                                    Definition::empty_legacy_namespace().with_event_field(
                                        &owned_value_path!("baz"),
                                        Kind::regex(),
                                        Some("baz"),
                                    ),
                                )]
                                .into(),
                            )],
                        ),
                    )]),
                    want: vec![
                        (
                            "source-bar".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("bar"),
                                Kind::integer(),
                                Some("bar"),
                            ),
                        ),
                        (
                            "transform-baz".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("baz"),
                                Kind::regex(),
                                Some("baz"),
                            ),
                        ),
                    ],
                },
            ),
            (
                // Source 1 feeds Transform 1; Source 2 feeds Transforms 2, 3 and 4;
                // Transforms 3 and 4 feed Transform 5.
                "complex topology",
                TestCase {
                    inputs: vec![
                        ("Transform 1", None),
                        ("Transform 2", None),
                        ("Transform 5", None),
                    ],
                    sources: IndexMap::from([
                        (
                            "Source 1",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("source-1"),
                                    Kind::boolean(),
                                    Some("source-1"),
                                ),
                            )],
                        ),
                        (
                            "Source 2",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("source-2"),
                                    Kind::integer(),
                                    Some("source-2"),
                                ),
                            )],
                        ),
                    ]),
                    transforms: IndexMap::from([
                        (
                            "Transform 1",
                            (
                                vec![OutputId::from("Source 1")],
                                vec![TransformOutput::new(
                                    DataType::all_bits(),
                                    [(
                                        "Source 1".into(),
                                        Definition::empty_legacy_namespace().with_event_field(
                                            &owned_value_path!("transform-1"),
                                            Kind::regex(),
                                            None,
                                        ),
                                    )]
                                    .into(),
                                )],
                            ),
                        ),
                        (
                            "Transform 2",
                            (
                                vec![OutputId::from("Source 2")],
                                vec![TransformOutput::new(
                                    DataType::all_bits(),
                                    [(
                                        "Source 2".into(),
                                        Definition::empty_legacy_namespace().with_event_field(
                                            &owned_value_path!("transform-2"),
                                            Kind::float().or_null(),
                                            Some("transform-2"),
                                        ),
                                    )]
                                    .into(),
                                )],
                            ),
                        ),
                        (
                            "Transform 3",
                            (
                                vec![OutputId::from("Source 2")],
                                vec![TransformOutput::new(
                                    DataType::all_bits(),
                                    [(
                                        "Source 2".into(),
                                        Definition::empty_legacy_namespace().with_event_field(
                                            &owned_value_path!("transform-3"),
                                            Kind::integer(),
                                            Some("transform-3"),
                                        ),
                                    )]
                                    .into(),
                                )],
                            ),
                        ),
                        (
                            "Transform 4",
                            (
                                vec![OutputId::from("Source 2")],
                                vec![TransformOutput::new(
                                    DataType::all_bits(),
                                    [(
                                        "Source 2".into(),
                                        Definition::empty_legacy_namespace().with_event_field(
                                            &owned_value_path!("transform-4"),
                                            Kind::timestamp().or_bytes(),
                                            Some("transform-4"),
                                        ),
                                    )]
                                    .into(),
                                )],
                            ),
                        ),
                        (
                            "Transform 5",
                            (
                                vec![OutputId::from("Transform 3"), OutputId::from("Transform 4")],
                                vec![TransformOutput::new(
                                    DataType::all_bits(),
                                    [(
                                        "Transform 3".into(),
                                        Definition::empty_legacy_namespace().with_event_field(
                                            &owned_value_path!("transform-5"),
                                            Kind::boolean(),
                                            Some("transform-5"),
                                        ),
                                    )]
                                    .into(),
                                )],
                            ),
                        ),
                    ]),
                    want: vec![
                        (
                            "Transform 1".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("transform-1"),
                                Kind::regex(),
                                None,
                            ),
                        ),
                        (
                            "Transform 2".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("transform-2"),
                                Kind::float().or_null(),
                                Some("transform-2"),
                            ),
                        ),
                        (
                            "Transform 5".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("transform-5"),
                                Kind::boolean(),
                                Some("transform-5"),
                            ),
                        ),
                    ],
                },
            ),
        ];

        for (title, case) in cases {
            let inputs = case
                .inputs
                .iter()
                .cloned()
                .map(|(key, port)| OutputId {
                    component: key.into(),
                    port,
                })
                .collect::<Vec<_>>();

            let got = expanded_definitions(
                vector_lib::enrichment::TableRegistry::default(),
                &inputs,
                &case,
                &mut HashMap::default(),
            )
            .unwrap();
            assert_eq!(got, case.want, "{}", title);
        }
    }
}