1use std::collections::HashMap;
2
3use snafu::Snafu;
4use vector_lib::config::SourceOutput;
5
6pub(super) use crate::schema::Definition;
7
8use crate::{
9 config::{ComponentKey, Config, OutputId, SinkOuter, TransformOutput},
10 topology,
11};
12
/// Errors that can occur while resolving schema definitions for a topology.
#[derive(Debug, Snafu)]
pub enum Error {
    // Returned when a resolved definition's event kind reports `is_never()`
    // (see `contains_never`).
    //
    // NOTE(review): intentionally a plain `//` comment rather than `///`; Snafu may use a
    // variant's doc comment as its `Display` text, and this error is rendered with
    // `to_string()` elsewhere in this file. Confirm before converting it to a doc comment.
    ContainsNever,
}
17
/// Memoization table for resolved definitions.
///
/// The key is `(schema_enabled, inputs)`: the value of `ComponentContainer::schema_enabled` at
/// lookup time together with the list of inputs being resolved. The value is the list of
/// `(output, definition)` pairs resolved for those inputs.
type Cache = HashMap<(bool, Vec<OutputId>), Vec<(OutputId, Definition)>>;
21
22pub fn possible_definitions(
23 inputs: &[OutputId],
24 config: &dyn ComponentContainer,
25 enrichment_tables: vector_lib::enrichment::TableRegistry,
26 cache: &mut Cache,
27) -> Result<Vec<(OutputId, Definition)>, Error> {
28 if inputs.is_empty() {
29 return Ok(vec![]);
30 }
31
32 if let Some(definition) = cache.get(&(config.schema_enabled(), inputs.to_vec())) {
34 return Ok(definition.clone());
35 }
36
37 let mut definitions = Vec::new();
38
39 for input in inputs {
40 let key = &input.component;
41
42 if let Ok(maybe_output) = config.source_output_for_port(key, &input.port) {
44 let mut source_definition = input.with_definitions(
45 maybe_output
46 .unwrap_or_else(|| {
47 unreachable!(
48 "source output mis-configured - output for port {:?} missing",
49 &input.port
50 )
51 })
52 .schema_definition(config.schema_enabled()),
53 );
54
55 if contains_never(&source_definition) {
56 return Err(Error::ContainsNever);
57 }
58
59 definitions.append(&mut source_definition);
60 }
61
62 if let Some(inputs) = config.transform_inputs(key) {
64 let input_definitions =
65 possible_definitions(inputs, config, enrichment_tables.clone(), cache)?;
66
67 let mut transform_definition = input.with_definitions(
68 config
69 .transform_output_for_port(
70 key,
71 &input.port,
72 enrichment_tables.clone(),
73 &input_definitions,
74 )
75 .expect("transform must exist - already found inputs")
76 .unwrap_or_else(|| {
77 unreachable!(
78 "transform output mis-configured - output for port {:?} missing",
79 &input.port
80 )
81 })
82 .schema_definitions(config.schema_enabled())
83 .values()
84 .cloned(),
85 );
86
87 if contains_never(&transform_definition) {
88 return Err(Error::ContainsNever);
89 }
90
91 definitions.append(&mut transform_definition);
92 }
93 }
94
95 Ok(definitions)
96}
97
98pub(super) fn expanded_definitions(
112 enrichment_tables: vector_lib::enrichment::TableRegistry,
113 inputs: &[OutputId],
114 config: &dyn ComponentContainer,
115 cache: &mut Cache,
116) -> Result<Vec<(OutputId, Definition)>, Error> {
117 if let Some(definitions) = cache.get(&(config.schema_enabled(), inputs.to_vec())) {
119 return Ok(definitions.clone());
120 }
121
122 let mut definitions: Vec<(OutputId, Definition)> = vec![];
123 let mut merged_cache = HashMap::default();
124
125 for input in inputs {
126 let key = &input.component;
127
128 if let Some(outputs) = config.source_outputs(key) {
134 let mut source_definitions =
138 outputs
139 .into_iter()
140 .find_map(|output| {
141 if output.port == input.port {
142 Some(input.with_definitions(
143 output.schema_definition(config.schema_enabled()),
144 ))
145 } else {
146 None
147 }
148 })
149 .unwrap_or_else(|| {
150 unreachable!("source output mis-configured")
153 });
154
155 if contains_never(&source_definitions) {
156 return Err(Error::ContainsNever);
157 }
158
159 definitions.append(&mut source_definitions);
160
161 } else if let Some(inputs) = config.transform_inputs(key) {
164 let input_definitions =
165 possible_definitions(inputs, config, enrichment_tables.clone(), &mut merged_cache)?;
166
167 let mut transform_definition = config
168 .transform_outputs(key, enrichment_tables.clone(), &input_definitions)
169 .expect("already found inputs")
170 .iter()
171 .find_map(|output| {
172 if output.port == input.port {
173 Some(
174 input.with_definitions(
175 output
176 .schema_definitions(config.schema_enabled())
177 .values()
178 .cloned(),
179 ),
180 )
181 } else {
182 None
183 }
184 })
185 .expect("transform output misconfigured");
188
189 if contains_never(&transform_definition) {
190 return Err(Error::ContainsNever);
191 }
192
193 definitions.append(&mut transform_definition);
196 }
197 }
198
199 cache.insert(
200 (config.schema_enabled(), inputs.to_vec()),
201 definitions.clone(),
202 );
203
204 Ok(definitions)
205}
206
207pub(crate) fn input_definitions(
211 inputs: &[OutputId],
212 config: &Config,
213 enrichment_tables: vector_lib::enrichment::TableRegistry,
214 cache: &mut Cache,
215) -> Result<Vec<(OutputId, Definition)>, Error> {
216 if inputs.is_empty() {
217 return Ok(vec![]);
218 }
219
220 if let Some(definitions) = cache.get(&(config.schema_enabled(), inputs.to_vec())) {
221 return Ok(definitions.clone());
222 }
223
224 let mut definitions = Vec::new();
225
226 for input in inputs {
227 let key = &input.component;
228
229 if let Ok(maybe_output) = config.source_output_for_port(key, &input.port) {
232 let mut source_definitions = input.with_definitions(
233 maybe_output
234 .unwrap_or_else(|| {
235 unreachable!(
236 "source output mis-configured - output for port {:?} missing",
237 &input.port
238 )
239 })
240 .schema_definition(config.schema_enabled()),
241 );
242
243 if contains_never(&source_definitions) {
244 return Err(Error::ContainsNever);
245 }
246
247 definitions.append(&mut source_definitions);
248 }
249
250 if let Some(inputs) = config.transform_inputs(key) {
253 let transform_definitions =
254 input_definitions(inputs, config, enrichment_tables.clone(), cache)?;
255
256 if contains_never(&transform_definitions) {
257 return Err(Error::ContainsNever);
258 }
259
260 let mut transform_definitions = input.with_definitions(
261 config
262 .transform_output_for_port(
263 key,
264 &input.port,
265 enrichment_tables.clone(),
266 &transform_definitions,
267 )
268 .expect("transform must exist")
269 .unwrap_or_else(|| {
270 unreachable!(
271 "transform output mis-configured - output for port {:?} missing",
272 &input.port
273 )
274 })
275 .schema_definitions(config.schema_enabled())
276 .values()
277 .cloned(),
278 );
279
280 if contains_never(&transform_definitions) {
281 return Err(Error::ContainsNever);
282 }
283
284 definitions.append(&mut transform_definitions);
285 }
286 }
287
288 Ok(definitions)
289}
290
291fn contains_never(transform_definitions: &[(OutputId, Definition)]) -> bool {
295 transform_definitions
296 .iter()
297 .any(|(_, definition)| definition.event_kind().is_never())
298}
299
300pub(super) fn validate_sink_expectations(
301 key: &ComponentKey,
302 sink: &SinkOuter<OutputId>,
303 config: &topology::Config,
304 enrichment_tables: vector_lib::enrichment::TableRegistry,
305) -> Result<(), Vec<String>> {
306 let mut errors = vec![];
307
308 let input = sink.inner.input();
311 let requirement = input.schema_requirement();
312
313 let mut cache = HashMap::default();
315 let definitions =
316 match expanded_definitions(enrichment_tables, &sink.inputs, config, &mut cache) {
317 Ok(definitions) => definitions,
318 Err(err) => {
319 errors.push(err.to_string());
320 return Err(errors);
321 }
322 };
323
324 for (_output, definition) in definitions {
326 if let Err(err) = requirement.validate(&definition, config.schema.validation) {
327 errors.append(
328 &mut err
329 .errors()
330 .iter()
331 .cloned()
332 .map(|err| format!("schema error in component {key}: {err}"))
333 .collect(),
334 );
335 }
336 }
337
338 if !errors.is_empty() {
339 return Err(errors);
340 }
341
342 Ok(())
343}
344
345pub trait ComponentContainer {
346 fn schema_enabled(&self) -> bool;
347
348 fn source_outputs(&self, key: &ComponentKey) -> Option<Vec<SourceOutput>>;
349
350 fn transform_inputs(&self, key: &ComponentKey) -> Option<&[OutputId]>;
351
352 fn transform_outputs(
353 &self,
354 key: &ComponentKey,
355 enrichment_tables: vector_lib::enrichment::TableRegistry,
356 input_definitions: &[(OutputId, Definition)],
357 ) -> Option<Vec<TransformOutput>>;
358
359 #[allow(clippy::result_unit_err)]
364 fn transform_output_for_port(
365 &self,
366 key: &ComponentKey,
367 port: &Option<String>,
368 enrichment_tables: vector_lib::enrichment::TableRegistry,
369 input_definitions: &[(OutputId, Definition)],
370 ) -> Result<Option<TransformOutput>, ()> {
371 if let Some(outputs) = self.transform_outputs(key, enrichment_tables, input_definitions) {
372 Ok(get_output_for_port(outputs, port))
373 } else {
374 Err(())
375 }
376 }
377
378 #[allow(clippy::result_unit_err)]
383 fn source_output_for_port(
384 &self,
385 key: &ComponentKey,
386 port: &Option<String>,
387 ) -> Result<Option<SourceOutput>, ()> {
388 if let Some(outputs) = self.source_outputs(key) {
389 Ok(get_source_output_for_port(outputs, port))
390 } else {
391 Err(())
392 }
393 }
394}
395
396fn get_output_for_port(
397 outputs: Vec<TransformOutput>,
398 port: &Option<String>,
399) -> Option<TransformOutput> {
400 outputs.into_iter().find(|output| &output.port == port)
401}
402
403fn get_source_output_for_port(
404 outputs: Vec<SourceOutput>,
405 port: &Option<String>,
406) -> Option<SourceOutput> {
407 outputs.into_iter().find(|output| &output.port == port)
408}
409
410impl ComponentContainer for Config {
411 fn schema_enabled(&self) -> bool {
412 self.schema.enabled
413 }
414
415 fn source_outputs(&self, key: &ComponentKey) -> Option<Vec<SourceOutput>> {
416 self.source(key)
417 .map(|source| source.inner.outputs(self.schema.log_namespace()))
418 }
419
420 fn transform_inputs(&self, key: &ComponentKey) -> Option<&[OutputId]> {
421 self.transform(key).map(|transform| &transform.inputs[..])
422 }
423
424 fn transform_outputs(
425 &self,
426 key: &ComponentKey,
427 enrichment_tables: vector_lib::enrichment::TableRegistry,
428 input_definitions: &[(OutputId, Definition)],
429 ) -> Option<Vec<TransformOutput>> {
430 self.transform(key).map(|source| {
431 source.inner.outputs(
432 enrichment_tables,
433 input_definitions,
434 self.schema.log_namespace(),
435 )
436 })
437 }
438}
439
// Unit tests for `expanded_definitions`, driven by an in-memory `ComponentContainer`.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use indexmap::IndexMap;
    use similar_asserts::assert_eq;
    use vector_lib::config::{DataType, SourceOutput, TransformOutput};
    use vector_lib::lookup::owned_value_path;
    use vrl::value::Kind;

    use super::*;

    /// Runs `expanded_definitions` over a table of small topologies and compares the result
    /// with the expected `(output, definition)` pairs.
    #[test]
    fn test_expanded_definition() {
        // One table entry: the inputs to resolve, the available sources and transforms, and
        // the expected result.
        struct TestCase {
            // `(component name, port)` pairs to resolve.
            inputs: Vec<(&'static str, Option<String>)>,
            // Source name -> its outputs.
            sources: IndexMap<&'static str, Vec<SourceOutput>>,
            // Transform name -> (its inputs, its outputs).
            transforms: IndexMap<&'static str, (Vec<OutputId>, Vec<TransformOutput>)>,
            // Expected return value of `expanded_definitions`.
            want: Vec<(OutputId, Definition)>,
        }

        // Serves the fixture maps through the trait; transform outputs are fixed and ignore
        // the enrichment tables and input definitions.
        impl ComponentContainer for TestCase {
            fn schema_enabled(&self) -> bool {
                true
            }

            fn source_outputs(&self, key: &ComponentKey) -> Option<Vec<SourceOutput>> {
                self.sources.get(key.id()).cloned()
            }

            fn transform_inputs(&self, key: &ComponentKey) -> Option<&[OutputId]> {
                self.transforms.get(key.id()).map(|v| v.0.as_slice())
            }

            fn transform_outputs(
                &self,
                key: &ComponentKey,
                _: vector_lib::enrichment::TableRegistry,
                _: &[(OutputId, Definition)],
            ) -> Option<Vec<TransformOutput>> {
                self.transforms.get(key.id()).cloned().map(|v| v.1)
            }
        }

        // Cases are keyed by title; each case is independent and uses a fresh cache.
        for (title, case) in HashMap::from([
            (
                // Nothing to resolve yields an empty result.
                "no inputs",
                TestCase {
                    inputs: vec![],
                    sources: IndexMap::default(),
                    transforms: IndexMap::default(),
                    want: vec![],
                },
            ),
            (
                // A single source input passes its definition through unchanged.
                "single input, source with default schema",
                TestCase {
                    inputs: vec![("foo", None)],
                    sources: IndexMap::from([(
                        "foo",
                        vec![SourceOutput::new_maybe_logs(
                            DataType::all_bits(),
                            Definition::default_legacy_namespace(),
                        )],
                    )]),
                    transforms: IndexMap::default(),
                    want: vec![("foo".into(), Definition::default_legacy_namespace())],
                },
            ),
            (
                // A source with a custom field definition is returned as-is.
                "single input, source with schema",
                TestCase {
                    inputs: vec![("source-foo", None)],
                    sources: IndexMap::from([(
                        "source-foo",
                        vec![SourceOutput::new_maybe_logs(
                            DataType::all_bits(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("foo"),
                                Kind::integer().or_bytes(),
                                Some("foo bar"),
                            ),
                        )],
                    )]),
                    transforms: IndexMap::default(),
                    want: vec![(
                        "source-foo".into(),
                        Definition::empty_legacy_namespace().with_event_field(
                            &owned_value_path!("foo"),
                            Kind::integer().or_bytes(),
                            Some("foo bar"),
                        ),
                    )],
                },
            ),
            (
                // Two sources defining the same field differently stay separate entries,
                // in input order.
                "multiple inputs, sources with schema",
                TestCase {
                    inputs: vec![("source-foo", None), ("source-bar", None)],
                    sources: IndexMap::from([
                        (
                            "source-foo",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("foo"),
                                    Kind::integer().or_bytes(),
                                    Some("foo bar"),
                                ),
                            )],
                        ),
                        (
                            "source-bar",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("foo"),
                                    Kind::timestamp(),
                                    Some("baz qux"),
                                ),
                            )],
                        ),
                    ]),
                    transforms: IndexMap::default(),
                    want: vec![
                        (
                            "source-foo".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("foo"),
                                Kind::integer().or_bytes(),
                                Some("foo bar"),
                            ),
                        ),
                        (
                            "source-bar".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("foo"),
                                Kind::timestamp(),
                                Some("baz qux"),
                            ),
                        ),
                    ],
                },
            ),
            (
                // `transform-baz` reads from `source-foo`; only the transform's own output
                // definition is expected, not the definition of the source feeding it.
                "transform overrides source",
                TestCase {
                    inputs: vec![("source-bar", None), ("transform-baz", None)],
                    sources: IndexMap::from([
                        (
                            "source-foo",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("foo"),
                                    Kind::boolean(),
                                    Some("foo"),
                                ),
                            )],
                        ),
                        (
                            "source-bar",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("bar"),
                                    Kind::integer(),
                                    Some("bar"),
                                ),
                            )],
                        ),
                    ]),
                    transforms: IndexMap::from([(
                        "transform-baz",
                        (
                            vec![OutputId::from("source-foo")],
                            vec![TransformOutput::new(
                                DataType::all_bits(),
                                [(
                                    "source-foo".into(),
                                    Definition::empty_legacy_namespace().with_event_field(
                                        &owned_value_path!("baz"),
                                        Kind::regex(),
                                        Some("baz"),
                                    ),
                                )]
                                .into(),
                            )],
                        ),
                    )]),
                    want: vec![
                        (
                            "source-bar".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("bar"),
                                Kind::integer(),
                                Some("bar"),
                            ),
                        ),
                        (
                            "transform-baz".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("baz"),
                                Kind::regex(),
                                Some("baz"),
                            ),
                        ),
                    ],
                },
            ),
            (
                // Topology of this case (the resolved inputs are Transform 1, 2 and 5):
                //
                //   Source 1 ---> Transform 1
                //   Source 2 -+-> Transform 2
                //             +-> Transform 3 -+
                //             +-> Transform 4 -+-> Transform 5
                "complex topology",
                TestCase {
                    inputs: vec![
                        ("Transform 1", None),
                        ("Transform 2", None),
                        ("Transform 5", None),
                    ],
                    sources: IndexMap::from([
                        (
                            "Source 1",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("source-1"),
                                    Kind::boolean(),
                                    Some("source-1"),
                                ),
                            )],
                        ),
                        (
                            "Source 2",
                            vec![SourceOutput::new_maybe_logs(
                                DataType::all_bits(),
                                Definition::empty_legacy_namespace().with_event_field(
                                    &owned_value_path!("source-2"),
                                    Kind::integer(),
                                    Some("source-2"),
                                ),
                            )],
                        ),
                    ]),
                    transforms: IndexMap::from([
                        (
                            "Transform 1",
                            (
                                vec![OutputId::from("Source 1")],
                                vec![TransformOutput::new(
                                    DataType::all_bits(),
                                    [(
                                        "Source 1".into(),
                                        Definition::empty_legacy_namespace().with_event_field(
                                            &owned_value_path!("transform-1"),
                                            Kind::regex(),
                                            None,
                                        ),
                                    )]
                                    .into(),
                                )],
                            ),
                        ),
                        (
                            "Transform 2",
                            (
                                vec![OutputId::from("Source 2")],
                                vec![TransformOutput::new(
                                    DataType::all_bits(),
                                    [(
                                        "Source 2".into(),
                                        Definition::empty_legacy_namespace().with_event_field(
                                            &owned_value_path!("transform-2"),
                                            Kind::float().or_null(),
                                            Some("transform-2"),
                                        ),
                                    )]
                                    .into(),
                                )],
                            ),
                        ),
                        (
                            "Transform 3",
                            (
                                vec![OutputId::from("Source 2")],
                                vec![TransformOutput::new(
                                    DataType::all_bits(),
                                    [(
                                        "Source 2".into(),
                                        Definition::empty_legacy_namespace().with_event_field(
                                            &owned_value_path!("transform-3"),
                                            Kind::integer(),
                                            Some("transform-3"),
                                        ),
                                    )]
                                    .into(),
                                )],
                            ),
                        ),
                        (
                            "Transform 4",
                            (
                                vec![OutputId::from("Source 2")],
                                vec![TransformOutput::new(
                                    DataType::all_bits(),
                                    [(
                                        "Source 2".into(),
                                        Definition::empty_legacy_namespace().with_event_field(
                                            &owned_value_path!("transform-4"),
                                            Kind::timestamp().or_bytes(),
                                            Some("transform-4"),
                                        ),
                                    )]
                                    .into(),
                                )],
                            ),
                        ),
                        (
                            "Transform 5",
                            (
                                vec![OutputId::from("Transform 3"), OutputId::from("Transform 4")],
                                vec![TransformOutput::new(
                                    DataType::all_bits(),
                                    [(
                                        "Transform 3".into(),
                                        Definition::empty_legacy_namespace().with_event_field(
                                            &owned_value_path!("transform-5"),
                                            Kind::boolean(),
                                            Some("transform-5"),
                                        ),
                                    )]
                                    .into(),
                                )],
                            ),
                        ),
                    ]),
                    want: vec![
                        // Definition output by Transform 1.
                        (
                            "Transform 1".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("transform-1"),
                                Kind::regex(),
                                None,
                            ),
                        ),
                        // Definition output by Transform 2.
                        (
                            "Transform 2".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("transform-2"),
                                Kind::float().or_null(),
                                Some("transform-2"),
                            ),
                        ),
                        // Definition output by Transform 5.
                        (
                            "Transform 5".into(),
                            Definition::empty_legacy_namespace().with_event_field(
                                &owned_value_path!("transform-5"),
                                Kind::boolean(),
                                Some("transform-5"),
                            ),
                        ),
                    ],
                },
            ),
        ]) {
            // Turn the `(name, port)` pairs into the `OutputId`s the function expects.
            let inputs = case
                .inputs
                .iter()
                .cloned()
                .map(|(key, port)| OutputId {
                    component: key.into(),
                    port,
                })
                .collect::<Vec<_>>();

            let got = expanded_definitions(
                vector_lib::enrichment::TableRegistry::default(),
                &inputs,
                &case,
                &mut HashMap::default(),
            )
            .unwrap();
            assert_eq!(got, case.want, "{}", title);
        }
    }
}
830}