1use std::collections::HashMap;
2
3use snafu::Snafu;
4use vector_lib::config::SourceOutput;
5
6pub(super) use crate::schema::Definition;
7use crate::{
8 config::{ComponentKey, Config, OutputId, SinkOuter, TransformOutput},
9 topology,
10};
11
/// Errors raised while resolving schema definitions through the topology.
#[derive(Debug, Snafu)]
pub enum Error {
    // Raised when a resolved schema definition has an event kind of `never` (see
    // `contains_never`). This is intentionally a `//` comment rather than a `///` doc comment,
    // because snafu may use a variant's doc comment as its `Display` text.
    // NOTE(review): with no `#[snafu(display(...))]` attribute this presumably displays as the
    // bare variant name, which `validate_sink_expectations` forwards verbatim — consider a more
    // descriptive message.
    ContainsNever,
}
16
/// Memoized schema definitions, keyed by whether schema support is enabled and by the exact
/// list of inputs that were resolved.
type Cache = HashMap<(bool, Vec<OutputId>), Vec<(OutputId, Definition)>>;
20
21pub fn possible_definitions(
22 inputs: &[OutputId],
23 config: &dyn ComponentContainer,
24 enrichment_tables: vector_lib::enrichment::TableRegistry,
25 cache: &mut Cache,
26) -> Result<Vec<(OutputId, Definition)>, Error> {
27 if inputs.is_empty() {
28 return Ok(vec![]);
29 }
30
31 if let Some(definition) = cache.get(&(config.schema_enabled(), inputs.to_vec())) {
33 return Ok(definition.clone());
34 }
35
36 let mut definitions = Vec::new();
37
38 for input in inputs {
39 let key = &input.component;
40
41 if let Ok(maybe_output) = config.source_output_for_port(key, &input.port) {
43 let mut source_definition = input.with_definitions(
44 maybe_output
45 .unwrap_or_else(|| {
46 unreachable!(
47 "source output mis-configured - output for port {:?} missing",
48 &input.port
49 )
50 })
51 .schema_definition(config.schema_enabled()),
52 );
53
54 if contains_never(&source_definition) {
55 return Err(Error::ContainsNever);
56 }
57
58 definitions.append(&mut source_definition);
59 }
60
61 if let Some(inputs) = config.transform_inputs(key) {
63 let input_definitions =
64 possible_definitions(inputs, config, enrichment_tables.clone(), cache)?;
65
66 let mut transform_definition = input.with_definitions(
67 config
68 .transform_output_for_port(
69 key,
70 &input.port,
71 enrichment_tables.clone(),
72 &input_definitions,
73 )
74 .expect("transform must exist - already found inputs")
75 .unwrap_or_else(|| {
76 unreachable!(
77 "transform output mis-configured - output for port {:?} missing",
78 &input.port
79 )
80 })
81 .schema_definitions(config.schema_enabled())
82 .values()
83 .cloned(),
84 );
85
86 if contains_never(&transform_definition) {
87 return Err(Error::ContainsNever);
88 }
89
90 definitions.append(&mut transform_definition);
91 }
92 }
93
94 Ok(definitions)
95}
96
97pub(super) fn expanded_definitions(
111 enrichment_tables: vector_lib::enrichment::TableRegistry,
112 inputs: &[OutputId],
113 config: &dyn ComponentContainer,
114 cache: &mut Cache,
115) -> Result<Vec<(OutputId, Definition)>, Error> {
116 if let Some(definitions) = cache.get(&(config.schema_enabled(), inputs.to_vec())) {
118 return Ok(definitions.clone());
119 }
120
121 let mut definitions: Vec<(OutputId, Definition)> = vec![];
122 let mut merged_cache = HashMap::default();
123
124 for input in inputs {
125 let key = &input.component;
126
127 if let Some(outputs) = config.source_outputs(key) {
133 let mut source_definitions =
137 outputs
138 .into_iter()
139 .find_map(|output| {
140 if output.port == input.port {
141 Some(input.with_definitions(
142 output.schema_definition(config.schema_enabled()),
143 ))
144 } else {
145 None
146 }
147 })
148 .unwrap_or_else(|| {
149 unreachable!("source output mis-configured")
152 });
153
154 if contains_never(&source_definitions) {
155 return Err(Error::ContainsNever);
156 }
157
158 definitions.append(&mut source_definitions);
159
160 } else if let Some(inputs) = config.transform_inputs(key) {
163 let input_definitions =
164 possible_definitions(inputs, config, enrichment_tables.clone(), &mut merged_cache)?;
165
166 let mut transform_definition = config
167 .transform_outputs(key, enrichment_tables.clone(), &input_definitions)
168 .expect("already found inputs")
169 .iter()
170 .find_map(|output| {
171 if output.port == input.port {
172 Some(
173 input.with_definitions(
174 output
175 .schema_definitions(config.schema_enabled())
176 .values()
177 .cloned(),
178 ),
179 )
180 } else {
181 None
182 }
183 })
184 .expect("transform output misconfigured");
187
188 if contains_never(&transform_definition) {
189 return Err(Error::ContainsNever);
190 }
191
192 definitions.append(&mut transform_definition);
195 }
196 }
197
198 cache.insert(
199 (config.schema_enabled(), inputs.to_vec()),
200 definitions.clone(),
201 );
202
203 Ok(definitions)
204}
205
206pub(crate) fn input_definitions(
210 inputs: &[OutputId],
211 config: &Config,
212 enrichment_tables: vector_lib::enrichment::TableRegistry,
213 cache: &mut Cache,
214) -> Result<Vec<(OutputId, Definition)>, Error> {
215 if inputs.is_empty() {
216 return Ok(vec![]);
217 }
218
219 if let Some(definitions) = cache.get(&(config.schema_enabled(), inputs.to_vec())) {
220 return Ok(definitions.clone());
221 }
222
223 let mut definitions = Vec::new();
224
225 for input in inputs {
226 let key = &input.component;
227
228 if let Ok(maybe_output) = config.source_output_for_port(key, &input.port) {
231 let mut source_definitions = input.with_definitions(
232 maybe_output
233 .unwrap_or_else(|| {
234 unreachable!(
235 "source output mis-configured - output for port {:?} missing",
236 &input.port
237 )
238 })
239 .schema_definition(config.schema_enabled()),
240 );
241
242 if contains_never(&source_definitions) {
243 return Err(Error::ContainsNever);
244 }
245
246 definitions.append(&mut source_definitions);
247 }
248
249 if let Some(inputs) = config.transform_inputs(key) {
252 let transform_definitions =
253 input_definitions(inputs, config, enrichment_tables.clone(), cache)?;
254
255 if contains_never(&transform_definitions) {
256 return Err(Error::ContainsNever);
257 }
258
259 let mut transform_definitions = input.with_definitions(
260 config
261 .transform_output_for_port(
262 key,
263 &input.port,
264 enrichment_tables.clone(),
265 &transform_definitions,
266 )
267 .expect("transform must exist")
268 .unwrap_or_else(|| {
269 unreachable!(
270 "transform output mis-configured - output for port {:?} missing",
271 &input.port
272 )
273 })
274 .schema_definitions(config.schema_enabled())
275 .values()
276 .cloned(),
277 );
278
279 if contains_never(&transform_definitions) {
280 return Err(Error::ContainsNever);
281 }
282
283 definitions.append(&mut transform_definitions);
284 }
285 }
286
287 Ok(definitions)
288}
289
290fn contains_never(transform_definitions: &[(OutputId, Definition)]) -> bool {
294 transform_definitions
295 .iter()
296 .any(|(_, definition)| definition.event_kind().is_never())
297}
298
299pub(super) fn validate_sink_expectations(
300 key: &ComponentKey,
301 sink: &SinkOuter<OutputId>,
302 config: &topology::Config,
303 enrichment_tables: vector_lib::enrichment::TableRegistry,
304) -> Result<(), Vec<String>> {
305 let mut errors = vec![];
306
307 let input = sink.inner.input();
310 let requirement = input.schema_requirement();
311
312 let mut cache = HashMap::default();
314 let definitions =
315 match expanded_definitions(enrichment_tables, &sink.inputs, config, &mut cache) {
316 Ok(definitions) => definitions,
317 Err(err) => {
318 errors.push(err.to_string());
319 return Err(errors);
320 }
321 };
322
323 for (_output, definition) in definitions {
325 if let Err(err) = requirement.validate(&definition, config.schema.validation) {
326 errors.append(
327 &mut err
328 .errors()
329 .iter()
330 .cloned()
331 .map(|err| format!("schema error in component {key}: {err}"))
332 .collect(),
333 );
334 }
335 }
336
337 if !errors.is_empty() {
338 return Err(errors);
339 }
340
341 Ok(())
342}
343
344pub trait ComponentContainer {
345 fn schema_enabled(&self) -> bool;
346
347 fn source_outputs(&self, key: &ComponentKey) -> Option<Vec<SourceOutput>>;
348
349 fn transform_inputs(&self, key: &ComponentKey) -> Option<&[OutputId]>;
350
351 fn transform_outputs(
352 &self,
353 key: &ComponentKey,
354 enrichment_tables: vector_lib::enrichment::TableRegistry,
355 input_definitions: &[(OutputId, Definition)],
356 ) -> Option<Vec<TransformOutput>>;
357
358 #[allow(clippy::result_unit_err)]
363 fn transform_output_for_port(
364 &self,
365 key: &ComponentKey,
366 port: &Option<String>,
367 enrichment_tables: vector_lib::enrichment::TableRegistry,
368 input_definitions: &[(OutputId, Definition)],
369 ) -> Result<Option<TransformOutput>, ()> {
370 if let Some(outputs) = self.transform_outputs(key, enrichment_tables, input_definitions) {
371 Ok(get_output_for_port(outputs, port))
372 } else {
373 Err(())
374 }
375 }
376
377 #[allow(clippy::result_unit_err)]
382 fn source_output_for_port(
383 &self,
384 key: &ComponentKey,
385 port: &Option<String>,
386 ) -> Result<Option<SourceOutput>, ()> {
387 if let Some(outputs) = self.source_outputs(key) {
388 Ok(get_source_output_for_port(outputs, port))
389 } else {
390 Err(())
391 }
392 }
393}
394
395fn get_output_for_port(
396 outputs: Vec<TransformOutput>,
397 port: &Option<String>,
398) -> Option<TransformOutput> {
399 outputs.into_iter().find(|output| &output.port == port)
400}
401
402fn get_source_output_for_port(
403 outputs: Vec<SourceOutput>,
404 port: &Option<String>,
405) -> Option<SourceOutput> {
406 outputs.into_iter().find(|output| &output.port == port)
407}
408
409impl ComponentContainer for Config {
410 fn schema_enabled(&self) -> bool {
411 self.schema.enabled
412 }
413
414 fn source_outputs(&self, key: &ComponentKey) -> Option<Vec<SourceOutput>> {
415 self.source(key)
416 .map(|source| source.inner.outputs(self.schema.log_namespace()))
417 }
418
419 fn transform_inputs(&self, key: &ComponentKey) -> Option<&[OutputId]> {
420 self.transform(key).map(|transform| &transform.inputs[..])
421 }
422
423 fn transform_outputs(
424 &self,
425 key: &ComponentKey,
426 enrichment_tables: vector_lib::enrichment::TableRegistry,
427 input_definitions: &[(OutputId, Definition)],
428 ) -> Option<Vec<TransformOutput>> {
429 self.transform(key).map(|source| {
430 source.inner.outputs(
431 enrichment_tables,
432 input_definitions,
433 self.schema.log_namespace(),
434 )
435 })
436 }
437}
438
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use indexmap::IndexMap;
    use similar_asserts::assert_eq;
    use vector_lib::{
        config::{DataType, SourceOutput, TransformOutput},
        lookup::owned_value_path,
    };
    use vrl::value::Kind;

    use super::*;

    // Table-driven test for `expanded_definitions`, run against small in-memory topologies.
    #[test]
    fn test_expanded_definition() {
        // One scenario: the inputs of the component under test plus the topology feeding them.
        struct TestCase {
            // (component id, port) pairs; converted to `OutputId`s before the call.
            inputs: Vec<(&'static str, Option<String>)>,
            // Source id -> the outputs that source declares.
            sources: IndexMap<&'static str, Vec<SourceOutput>>,
            // Transform id -> (its inputs, the outputs it declares).
            transforms: IndexMap<&'static str, (Vec<OutputId>, Vec<TransformOutput>)>,
            // Definitions `expanded_definitions` is expected to return, in order.
            want: Vec<(OutputId, Definition)>,
        }

        // Topology backed by the test case's maps. Schema support is always enabled, and
        // transforms return their fixed outputs regardless of the enrichment tables or the
        // definitions flowing into them.
        impl ComponentContainer for TestCase {
            fn schema_enabled(&self) -> bool {
                true
            }

            fn source_outputs(&self, key: &ComponentKey) -> Option<Vec<SourceOutput>> {
                self.sources.get(key.id()).cloned()
            }

            fn transform_inputs(&self, key: &ComponentKey) -> Option<&[OutputId]> {
                self.transforms.get(key.id()).map(|v| v.0.as_slice())
            }

            fn transform_outputs(
                &self,
                key: &ComponentKey,
                _: vector_lib::enrichment::TableRegistry,
                _: &[(OutputId, Definition)],
            ) -> Option<Vec<TransformOutput>> {
                self.transforms.get(key.id()).cloned().map(|v| v.1)
            }
        }

        // Cases are independent of each other, so `HashMap` iteration order does not matter.
        for (title, case) in HashMap::from([
            (
                "no inputs",
                TestCase {
                    inputs: vec![],
                    sources: IndexMap::default(),
                    transforms: IndexMap::default(),
                    want: vec![],
                },
            ),
            (
                "single input, source with default schema",
                TestCase {
                    inputs: vec![("foo", None)],
                    sources: IndexMap::from([(
                        "foo",
                        vec![SourceOutput::new_maybe_logs(
                            DataType::all_bits(),
                            Definition::default_legacy_namespace(),
                        )],
                    )]),
                    transforms: IndexMap::default(),
                    want: vec![("foo".into(), Definition::default_legacy_namespace())],
                },
            ),
            (
                "single input, source with schema",
                TestCase {
                    inputs: vec![("source-foo", None)],
                    sources: IndexMap::from([(
                        "source-foo",
                        vec![SourceOutput::new_maybe_logs(
                            DataType::all_bits(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("foo"),
                                Kind::integer().or_bytes(),
                                Some("foo bar"),
                            ),
                        )],
                    )]),
                    transforms: IndexMap::default(),
                    want: vec![(
                        "source-foo".into(),
                        Definition::empty_legacy_namespace().with_event_field(
                            &owned_value_path!("foo"),
                            Kind::integer().or_bytes(),
                            Some("foo bar"),
                        ),
                    )],
                },
            ),
            (
                "multiple inputs, sources with schema",
                TestCase {
                    inputs: vec![("source-foo", None), ("source-bar", None)],
                    sources: IndexMap::from([
                        (
                            "source-foo",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("foo"),
                                    Kind::integer().or_bytes(),
                                    Some("foo bar"),
                                ),
                            )],
                        ),
                        (
                            "source-bar",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("foo"),
                                    Kind::timestamp(),
                                    Some("baz qux"),
                                ),
                            )],
                        ),
                    ]),
                    transforms: IndexMap::default(),
                    want: vec![
                        (
                            "source-foo".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("foo"),
                                Kind::integer().or_bytes(),
                                Some("foo bar"),
                            ),
                        ),
                        (
                            "source-bar".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("foo"),
                                Kind::timestamp(),
                                Some("baz qux"),
                            ),
                        ),
                    ],
                },
            ),
            (
                // `source-foo` only feeds `transform-baz`; the expected output contains the
                // transform's own definition, not the definition of the source behind it.
                "transform overrides source",
                TestCase {
                    inputs: vec![("source-bar", None), ("transform-baz", None)],
                    sources: IndexMap::from([
                        (
                            "source-foo",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("foo"),
                                    Kind::boolean(),
                                    Some("foo"),
                                ),
                            )],
                        ),
                        (
                            "source-bar",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("bar"),
                                    Kind::integer(),
                                    Some("bar"),
                                ),
                            )],
                        ),
                    ]),
                    transforms: IndexMap::from([(
                        "transform-baz",
                        (
                            vec![OutputId::from("source-foo")],
                            vec![TransformOutput::new(
                                DataType::all_bits(),
                                [(
                                    "source-foo".into(),
                                    Definition::empty_legacy_namespace().with_event_field(
                                        &owned_value_path!("baz"),
                                        Kind::regex(),
                                        Some("baz"),
                                    ),
                                )]
                                .into(),
                            )],
                        ),
                    )]),
                    want: vec![
                        (
                            "source-bar".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("bar"),
                                Kind::integer(),
                                Some("bar"),
                            ),
                        ),
                        (
                            "transform-baz".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("baz"),
                                Kind::regex(),
                                Some("baz"),
                            ),
                        ),
                    ],
                },
            ),
            (
                // Topology feeding the component under test:
                //
                //   Source 1 -> Transform 1 ----------------> (input)
                //   Source 2 -> Transform 2 ----------------> (input)
                //   Source 2 -> Transform 3 -> Transform 5 -> (input)
                //   Source 2 -> Transform 4 -> Transform 5
                //
                // Only the definitions declared by the directly connected transforms are
                // expected.
                "complex topology",
                TestCase {
                    inputs: vec![
                        ("Transform 1", None),
                        ("Transform 2", None),
                        ("Transform 5", None),
                    ],
                    sources: IndexMap::from([
                        (
                            "Source 1",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("source-1"),
                                    Kind::boolean(),
                                    Some("source-1"),
                                ),
                            )],
                        ),
                        (
                            "Source 2",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("source-2"),
                                    Kind::integer(),
                                    Some("source-2"),
                                ),
                            )],
                        ),
                    ]),
                    transforms: IndexMap::from([
                        (
                            "Transform 1",
                            (
                                vec![OutputId::from("Source 1")],
                                vec![TransformOutput::new(
                                    DataType::all_bits(),
                                    [(
                                        "Source 1".into(),
                                        Definition::empty_legacy_namespace().with_event_field(
                                            &owned_value_path!("transform-1"),
                                            Kind::regex(),
                                            None,
                                        ),
                                    )]
                                    .into(),
                                )],
                            ),
                        ),
                        (
                            "Transform 2",
                            (
                                vec![OutputId::from("Source 2")],
                                vec![TransformOutput::new(
                                    DataType::all_bits(),
                                    [(
                                        "Source 2".into(),
                                        Definition::empty_legacy_namespace().with_event_field(
                                            &owned_value_path!("transform-2"),
                                            Kind::float().or_null(),
                                            Some("transform-2"),
                                        ),
                                    )]
                                    .into(),
                                )],
                            ),
                        ),
                        (
                            "Transform 3",
                            (
                                vec![OutputId::from("Source 2")],
                                vec![TransformOutput::new(
                                    DataType::all_bits(),
                                    [(
                                        "Source 2".into(),
                                        Definition::empty_legacy_namespace().with_event_field(
                                            &owned_value_path!("transform-3"),
                                            Kind::integer(),
                                            Some("transform-3"),
                                        ),
                                    )]
                                    .into(),
                                )],
                            ),
                        ),
                        (
                            "Transform 4",
                            (
                                vec![OutputId::from("Source 2")],
                                vec![TransformOutput::new(
                                    DataType::all_bits(),
                                    [(
                                        "Source 2".into(),
                                        Definition::empty_legacy_namespace().with_event_field(
                                            &owned_value_path!("transform-4"),
                                            Kind::timestamp().or_bytes(),
                                            Some("transform-4"),
                                        ),
                                    )]
                                    .into(),
                                )],
                            ),
                        ),
                        (
                            "Transform 5",
                            (
                                vec![OutputId::from("Transform 3"), OutputId::from("Transform 4")],
                                vec![TransformOutput::new(
                                    DataType::all_bits(),
                                    [(
                                        "Transform 3".into(),
                                        Definition::empty_legacy_namespace().with_event_field(
                                            &owned_value_path!("transform-5"),
                                            Kind::boolean(),
                                            Some("transform-5"),
                                        ),
                                    )]
                                    .into(),
                                )],
                            ),
                        ),
                    ]),
                    want: vec![
                        (
                            // Transform 1's own definition replaces Source 1's.
                            "Transform 1".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("transform-1"),
                                Kind::regex(),
                                None,
                            ),
                        ),
                        (
                            // Transform 2's own definition replaces Source 2's.
                            "Transform 2".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("transform-2"),
                                Kind::float().or_null(),
                                Some("transform-2"),
                            ),
                        ),
                        (
                            // Transform 5 declares a single definition (keyed by its
                            // `Transform 3` input), so exactly one entry is expected.
                            "Transform 5".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("transform-5"),
                                Kind::boolean(),
                                Some("transform-5"),
                            ),
                        ),
                    ],
                },
            ),
        ]) {
            // Turn the (id, port) pairs into the `OutputId`s `expanded_definitions` expects.
            let inputs = case
                .inputs
                .iter()
                .cloned()
                .map(|(key, port)| OutputId {
                    component: key.into(),
                    port,
                })
                .collect::<Vec<_>>();

            // A fresh cache per case keeps cases from influencing each other.
            let got = expanded_definitions(
                vector_lib::enrichment::TableRegistry::default(),
                &inputs,
                &case,
                &mut HashMap::default(),
            )
            .unwrap();
            assert_eq!(got, case.want, "{}", title);
        }
    }
}