1use std::{
2 collections::{HashMap, HashSet, VecDeque},
3 fmt,
4};
5
6use indexmap::{IndexMap, set::IndexSet};
7
8use super::{
9 ComponentKey, DataType, OutputId, SinkOuter, SourceOuter, SourceOutput, TransformOuter,
10 TransformOutput, WildcardMatching, schema,
11};
12
13#[derive(Debug, Clone)]
14pub enum Node {
15 Source {
16 outputs: Vec<SourceOutput>,
17 },
18 Transform {
19 in_ty: DataType,
20 outputs: Vec<TransformOutput>,
21 },
22 Sink {
23 ty: DataType,
24 },
25}
26
27impl fmt::Display for Node {
28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29 match self {
30 Node::Source { outputs } => {
31 write!(f, "component_kind: source\n outputs:")?;
32 for output in outputs {
33 write!(f, "\n {output}")?;
34 }
35 Ok(())
36 }
37 Node::Transform { in_ty, outputs } => {
38 write!(
39 f,
40 "component_kind: source\n input_types: {in_ty}\n outputs:"
41 )?;
42 for output in outputs {
43 write!(f, "\n {output}")?;
44 }
45 Ok(())
46 }
47 Node::Sink { ty } => {
48 write!(f, "component_kind: sink\n types: {ty}")
49 }
50 }
51 }
52}
53
54#[derive(Debug, Clone)]
55struct Edge {
56 from: OutputId,
57 to: ComponentKey,
58}
59
60#[derive(Default)]
61pub struct Graph {
62 nodes: HashMap<ComponentKey, Node>,
63 edges: Vec<Edge>,
64}
65
66impl Graph {
67 pub fn new(
68 sources: &IndexMap<ComponentKey, SourceOuter>,
69 transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
70 sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
71 schema: schema::Options,
72 wildcard_matching: WildcardMatching,
73 ) -> Result<Self, Vec<String>> {
74 Self::new_inner(sources, transforms, sinks, false, schema, wildcard_matching)
75 }
76
77 pub fn new_unchecked(
78 sources: &IndexMap<ComponentKey, SourceOuter>,
79 transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
80 sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
81 schema: schema::Options,
82 wildcard_matching: WildcardMatching,
83 ) -> Self {
84 Self::new_inner(sources, transforms, sinks, true, schema, wildcard_matching)
85 .expect("errors ignored")
86 }
87
88 fn new_inner(
89 sources: &IndexMap<ComponentKey, SourceOuter>,
90 transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
91 sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
92 ignore_errors: bool,
93 schema: schema::Options,
94 wildcard_matching: WildcardMatching,
95 ) -> Result<Self, Vec<String>> {
96 let mut graph = Graph::default();
97 let mut errors = Vec::new();
98
99 for (id, config) in sources.iter() {
101 graph.nodes.insert(
102 id.clone(),
103 Node::Source {
104 outputs: config.inner.outputs(schema.log_namespace()),
105 },
106 );
107 }
108
109 for (id, transform) in transforms.iter() {
110 graph.nodes.insert(
111 id.clone(),
112 Node::Transform {
113 in_ty: transform.inner.input().data_type(),
114 outputs: transform.inner.outputs(
115 vector_lib::enrichment::TableRegistry::default(),
116 &[(id.into(), schema::Definition::any())],
117 schema.log_namespace(),
118 ),
119 },
120 );
121 }
122
123 for (id, config) in sinks {
124 graph.nodes.insert(
125 id.clone(),
126 Node::Sink {
127 ty: config.inner.input().data_type(),
128 },
129 );
130 }
131
132 let available_inputs = graph.input_map()?;
135
136 for (id, config) in transforms.iter() {
137 for input in config.inputs.iter() {
138 if let Err(e) = graph.add_input(input, id, &available_inputs, wildcard_matching) {
139 errors.push(e);
140 }
141 }
142 }
143
144 for (id, config) in sinks {
145 for input in config.inputs.iter() {
146 if let Err(e) = graph.add_input(input, id, &available_inputs, wildcard_matching) {
147 errors.push(e);
148 }
149 }
150 }
151
152 if ignore_errors || errors.is_empty() {
153 Ok(graph)
154 } else {
155 Err(errors)
156 }
157 }
158
159 fn add_input(
160 &mut self,
161 from: &str,
162 to: &ComponentKey,
163 available_inputs: &HashMap<String, OutputId>,
164 wildcard_matching: WildcardMatching,
165 ) -> Result<(), String> {
166 if let Some(output_id) = available_inputs.get(from) {
167 self.edges.push(Edge {
168 from: output_id.clone(),
169 to: to.clone(),
170 });
171 Ok(())
172 } else {
173 let output_type = match self.nodes.get(to) {
174 Some(Node::Transform { .. }) => "transform",
175 Some(Node::Sink { .. }) => "sink",
176 _ => panic!("only transforms and sinks have inputs"),
177 };
178 match wildcard_matching {
180 WildcardMatching::Relaxed => {
181 if from != glob::Pattern::escape(from) {
184 info!(
185 "Input \"{from}\" for {output_type} \"{to}\" didn’t match any components, but this was ignored because `relaxed_wildcard_matching` is enabled."
186 );
187 return Ok(());
188 }
189 }
190 WildcardMatching::Strict => {}
191 }
192 info!(
193 "Available components:\n{}",
194 self.nodes
195 .iter()
196 .map(|(key, node)| format!("\"{key}\":\n {node}"))
197 .collect::<Vec<_>>()
198 .join("\n")
199 );
200 Err(format!(
201 "Input \"{from}\" for {output_type} \"{to}\" doesn't match any components.",
202 ))
203 }
204 }
205
206 fn get_input_type(&self, key: &ComponentKey) -> DataType {
213 match self.nodes[key] {
214 Node::Source { .. } => panic!("no inputs on sources"),
215 Node::Transform { in_ty, .. } => in_ty,
216 Node::Sink { ty } => ty,
217 }
218 }
219
220 fn get_output_type(&self, id: &OutputId) -> DataType {
227 match &self.nodes[&id.component] {
228 Node::Source { outputs } => outputs
229 .iter()
230 .find(|output| output.port == id.port)
231 .map(|output| output.ty)
232 .expect("output didn't exist"),
233 Node::Transform { outputs, .. } => outputs
234 .iter()
235 .find(|output| output.port == id.port)
236 .map(|output| output.ty)
237 .expect("output didn't exist"),
238 Node::Sink { .. } => panic!("no outputs on sinks"),
239 }
240 }
241
242 pub fn typecheck(&self) -> Result<(), Vec<String>> {
243 let mut errors = Vec::new();
244
245 for edge in &self.edges {
247 let from_ty = self.get_output_type(&edge.from);
248 let to_ty = self.get_input_type(&edge.to);
249
250 if !from_ty.intersects(to_ty) {
251 errors.push(format!(
252 "Data type mismatch between {} ({}) and {} ({})",
253 edge.from, from_ty, edge.to, to_ty
254 ));
255 }
256 }
257
258 if errors.is_empty() {
259 Ok(())
260 } else {
261 errors.sort();
262 errors.dedup();
263 Err(errors)
264 }
265 }
266
267 pub fn check_for_cycles(&self) -> Result<(), String> {
268 let sinks = self.nodes.iter().filter_map(|(name, node)| match node {
270 Node::Sink { .. } => Some(name),
271 _ => None,
272 });
273
274 for s in sinks {
276 let mut traversal: VecDeque<ComponentKey> = VecDeque::new();
277 let mut visited: HashSet<ComponentKey> = HashSet::new();
278 let mut stack: IndexSet<ComponentKey> = IndexSet::new();
279
280 traversal.push_back(s.to_owned());
281 while !traversal.is_empty() {
282 let n = traversal.back().expect("can't be empty").clone();
283 if !visited.contains(&n) {
284 visited.insert(n.clone());
285 stack.insert(n.clone());
286 } else {
287 stack.shift_remove(&n);
289 traversal.pop_back();
290 }
291 let inputs = self
292 .edges
293 .iter()
294 .filter(|e| e.to == n)
295 .map(|e| e.from.clone());
296 for input in inputs {
297 if !visited.contains(&input.component) {
298 traversal.push_back(input.component);
299 } else if stack.contains(&input.component) {
300 let path = stack
302 .iter()
303 .skip(1) .rev()
305 .map(|item| item.to_string())
306 .collect::<Vec<_>>();
307 return Err(format!(
308 "Cyclic dependency detected in the chain [ {} -> {} ]",
309 input.component.id(),
310 path.join(" -> ")
311 ));
312 }
313 }
314 }
315 }
316 Ok(())
317 }
318
319 pub fn valid_inputs(&self) -> HashSet<OutputId> {
320 self.nodes
321 .iter()
322 .flat_map(|(key, node)| match node {
323 Node::Sink { .. } => vec![],
324 Node::Source { outputs } => outputs
325 .iter()
326 .map(|output| OutputId {
327 component: key.clone(),
328 port: output.port.clone(),
329 })
330 .collect(),
331 Node::Transform { outputs, .. } => outputs
332 .iter()
333 .map(|output| OutputId {
334 component: key.clone(),
335 port: output.port.clone(),
336 })
337 .collect(),
338 })
339 .collect()
340 }
341
342 pub fn input_map(&self) -> Result<HashMap<String, OutputId>, Vec<String>> {
357 let mut mapped: HashMap<String, OutputId> = HashMap::new();
358 let mut errors = HashSet::new();
359
360 for id in self.valid_inputs() {
361 if let Some(_other) = mapped.insert(id.to_string(), id.clone()) {
362 errors.insert(format!("Input specifier {id} is ambiguous"));
363 }
364 }
365
366 if errors.is_empty() {
367 Ok(mapped)
368 } else {
369 Err(errors.into_iter().collect())
370 }
371 }
372
373 pub fn inputs_for(&self, node: &ComponentKey) -> Vec<OutputId> {
374 self.edges
375 .iter()
376 .filter(|edge| &edge.to == node)
377 .map(|edge| edge.from.clone())
378 .collect()
379 }
380
381 pub fn paths_to_sink_from(&self, root: &ComponentKey) -> Vec<Vec<ComponentKey>> {
387 let mut traversal: VecDeque<(ComponentKey, Vec<_>)> = VecDeque::new();
388 let mut paths = Vec::new();
389
390 traversal.push_back((root.to_owned(), Vec::new()));
391 while !traversal.is_empty() {
392 let (n, mut path) = traversal.pop_back().expect("can't be empty");
393 path.push(n.clone());
394 let neighbors = self
395 .edges
396 .iter()
397 .filter(|e| e.from.component == n)
398 .map(|e| e.to.clone())
399 .collect::<Vec<_>>();
400
401 if neighbors.is_empty() {
402 paths.push(path.clone());
403 } else {
404 for neighbor in neighbors {
405 traversal.push_back((neighbor, path.clone()));
406 }
407 }
408 }
409
410 paths
412 .into_iter()
413 .filter(|path| {
414 if let Some(key) = path.last() {
415 matches!(self.nodes.get(key), Some(Node::Sink { ty: _ }))
416 } else {
417 false
418 }
419 })
420 .collect()
421 }
422}
423
424#[cfg(test)]
425mod test {
426 use similar_asserts::assert_eq;
427 use vector_lib::schema::Definition;
428
429 use super::*;
430
431 impl Graph {
432 fn add_source(&mut self, id: &str, ty: DataType) {
433 self.nodes.insert(
434 id.into(),
435 Node::Source {
436 outputs: vec![match ty {
437 DataType::Metric => SourceOutput::new_metrics(),
438 DataType::Trace => SourceOutput::new_traces(),
439 _ => SourceOutput::new_maybe_logs(ty, Definition::any()),
440 }],
441 },
442 );
443 }
444
445 fn add_transform(
446 &mut self,
447 id: &str,
448 in_ty: DataType,
449 out_ty: DataType,
450 inputs: Vec<&str>,
451 ) {
452 let id = ComponentKey::from(id);
453 let inputs = clean_inputs(inputs);
454 self.nodes.insert(
455 id.clone(),
456 Node::Transform {
457 in_ty,
458 outputs: vec![TransformOutput::new(
459 out_ty,
460 [("test".into(), Definition::default_legacy_namespace())].into(),
461 )],
462 },
463 );
464 for from in inputs {
465 self.edges.push(Edge {
466 from,
467 to: id.clone(),
468 });
469 }
470 }
471
472 fn add_transform_output(&mut self, id: &str, name: &str, ty: DataType) {
473 let id = id.into();
474 match self.nodes.get_mut(&id) {
475 Some(Node::Transform { outputs, .. }) => outputs.push(
476 TransformOutput::new(
477 ty,
478 [("test".into(), Definition::default_legacy_namespace())].into(),
479 )
480 .with_port(name),
481 ),
482 _ => panic!("invalid transform"),
483 }
484 }
485
486 fn add_sink(&mut self, id: &str, ty: DataType, inputs: Vec<&str>) {
487 let id = ComponentKey::from(id);
488 let inputs = clean_inputs(inputs);
489 self.nodes.insert(id.clone(), Node::Sink { ty });
490 for from in inputs {
491 self.edges.push(Edge {
492 from,
493 to: id.clone(),
494 });
495 }
496 }
497
498 fn test_add_input(
499 &mut self,
500 node: &str,
501 input: &str,
502 wildcard_matching: WildcardMatching,
503 ) -> Result<(), String> {
504 let available_inputs = self.input_map().unwrap();
505 self.add_input(input, &node.into(), &available_inputs, wildcard_matching)
506 }
507 }
508
509 fn clean_inputs(inputs: Vec<&str>) -> Vec<OutputId> {
510 inputs.into_iter().map(Into::into).collect()
511 }
512
513 #[test]
514 fn paths_detects_cycles() {
515 let mut graph = Graph::default();
516 graph.add_source("in", DataType::Log);
517 graph.add_transform("one", DataType::Log, DataType::Log, vec!["in", "three"]);
518 graph.add_transform("two", DataType::Log, DataType::Log, vec!["one"]);
519 graph.add_transform("three", DataType::Log, DataType::Log, vec!["two"]);
520 graph.add_sink("out", DataType::Log, vec!["three"]);
521
522 assert_eq!(
523 Err("Cyclic dependency detected in the chain [ three -> one -> two -> three ]".into()),
524 graph.check_for_cycles()
525 );
526
527 let mut graph = Graph::default();
528 graph.add_source("in", DataType::Log);
529 graph.add_transform("one", DataType::Log, DataType::Log, vec!["in", "three"]);
530 graph.add_transform("two", DataType::Log, DataType::Log, vec!["one"]);
531 graph.add_transform("three", DataType::Log, DataType::Log, vec!["two"]);
532 graph.add_sink("out", DataType::Log, vec!["two"]);
533
534 assert_eq!(
535 Err("Cyclic dependency detected in the chain [ two -> three -> one -> two ]".into()),
536 graph.check_for_cycles()
537 );
538 assert_eq!(
539 Err("Cyclic dependency detected in the chain [ two -> three -> one -> two ]".into()),
540 graph.check_for_cycles()
541 );
542
543 let mut graph = Graph::default();
544 graph.add_source("in", DataType::Log);
545 graph.add_transform("in", DataType::Log, DataType::Log, vec!["in"]);
546 graph.add_sink("out", DataType::Log, vec!["in"]);
547
548 assert_eq!(
550 Err("Cyclic dependency detected in the chain [ in -> in ]".into()),
551 graph.check_for_cycles()
552 );
553 }
554
555 #[test]
556 fn paths_doesnt_detect_noncycles() {
557 let mut graph = Graph::default();
558 graph.add_source("in", DataType::Log);
559 graph.add_transform("one", DataType::Log, DataType::Log, vec!["in"]);
560 graph.add_transform("two", DataType::Log, DataType::Log, vec!["in"]);
561 graph.add_transform("three", DataType::Log, DataType::Log, vec!["one", "two"]);
562 graph.add_sink("out", DataType::Log, vec!["three"]);
563
564 graph.check_for_cycles().unwrap();
565 }
566
567 #[test]
568 fn detects_type_mismatches() {
569 let mut graph = Graph::default();
570 graph.add_source("in", DataType::Log);
571 graph.add_sink("out", DataType::Metric, vec!["in"]);
572
573 assert_eq!(
574 Err(vec![
575 "Data type mismatch between in ([\"Log\"]) and out ([\"Metric\"])".into()
576 ]),
577 graph.typecheck()
578 );
579 }
580
581 #[test]
582 fn allows_log_or_metric_into_any() {
583 let mut graph = Graph::default();
584 graph.add_source("log_source", DataType::Log);
585 graph.add_source("metric_source", DataType::Metric);
586 graph.add_sink(
587 "any_sink",
588 DataType::all_bits(),
589 vec!["log_source", "metric_source"],
590 );
591
592 assert_eq!(Ok(()), graph.typecheck());
593 }
594
595 #[test]
596 fn allows_any_into_log_or_metric() {
597 let mut graph = Graph::default();
598 graph.add_source("any_source", DataType::all_bits());
599 graph.add_transform(
600 "log_to_any",
601 DataType::Log,
602 DataType::all_bits(),
603 vec!["any_source"],
604 );
605 graph.add_transform(
606 "any_to_log",
607 DataType::all_bits(),
608 DataType::Log,
609 vec!["any_source"],
610 );
611 graph.add_sink(
612 "log_sink",
613 DataType::Log,
614 vec!["any_source", "log_to_any", "any_to_log"],
615 );
616 graph.add_sink(
617 "metric_sink",
618 DataType::Metric,
619 vec!["any_source", "log_to_any"],
620 );
621
622 assert_eq!(graph.typecheck(), Ok(()));
623 }
624
625 #[test]
626 fn allows_both_directions_for_metrics() {
627 let mut graph = Graph::default();
628 graph.add_source("log_source", DataType::Log);
629 graph.add_source("metric_source", DataType::Metric);
630 graph.add_transform(
631 "log_to_log",
632 DataType::Log,
633 DataType::Log,
634 vec!["log_source"],
635 );
636 graph.add_transform(
637 "metric_to_metric",
638 DataType::Metric,
639 DataType::Metric,
640 vec!["metric_source"],
641 );
642 graph.add_transform(
643 "any_to_any",
644 DataType::all_bits(),
645 DataType::all_bits(),
646 vec!["log_to_log", "metric_to_metric"],
647 );
648 graph.add_transform(
649 "any_to_log",
650 DataType::all_bits(),
651 DataType::Log,
652 vec!["any_to_any"],
653 );
654 graph.add_transform(
655 "any_to_metric",
656 DataType::all_bits(),
657 DataType::Metric,
658 vec!["any_to_any"],
659 );
660 graph.add_sink("log_sink", DataType::Log, vec!["any_to_log"]);
661 graph.add_sink("metric_sink", DataType::Metric, vec!["any_to_metric"]);
662
663 assert_eq!(Ok(()), graph.typecheck());
664 }
665
666 #[test]
667 fn allows_multiple_transform_outputs() {
668 let mut graph = Graph::default();
669 graph.add_source("log_source", DataType::Log);
670 graph.add_transform(
671 "log_to_log",
672 DataType::Log,
673 DataType::Log,
674 vec!["log_source"],
675 );
676 graph.add_transform_output("log_to_log", "errors", DataType::Log);
677 graph.add_sink("good_log_sink", DataType::Log, vec!["log_to_log"]);
678
679 graph.add_sink("errored_log_sink", DataType::Log, vec![]);
681 graph.add_sink("bad_log_sink", DataType::Log, vec![]);
682
683 assert_eq!(
685 Ok(()),
686 graph.test_add_input(
687 "errored_log_sink",
688 "log_to_log.errors",
689 WildcardMatching::Strict
690 )
691 );
692
693 let expected = "Input \"log_to_log.not_errors\" for sink \"bad_log_sink\" doesn't match any components.".to_string();
695 assert_eq!(
696 Err(expected),
697 graph.test_add_input(
698 "bad_log_sink",
699 "log_to_log.not_errors",
700 WildcardMatching::Strict
701 )
702 );
703 }
704
705 #[test]
706 fn disallows_ambiguous_inputs() {
707 let mut graph = Graph::default();
708 graph.nodes.insert(
710 ComponentKey::from("foo.bar"),
711 Node::Source {
712 outputs: vec![SourceOutput::new_maybe_logs(
713 DataType::all_bits(),
714 Definition::any(),
715 )],
716 },
717 );
718 graph.nodes.insert(
719 ComponentKey::from("foo.bar"),
720 Node::Source {
721 outputs: vec![SourceOutput::new_maybe_logs(
722 DataType::all_bits(),
723 Definition::any(),
724 )],
725 },
726 );
727 graph.nodes.insert(
728 ComponentKey::from("foo"),
729 Node::Transform {
730 in_ty: DataType::all_bits(),
731 outputs: vec![
732 TransformOutput::new(
733 DataType::all_bits(),
734 [("test".into(), Definition::default_legacy_namespace())].into(),
735 ),
736 TransformOutput::new(
737 DataType::all_bits(),
738 [("test".into(), Definition::default_legacy_namespace())].into(),
739 )
740 .with_port("bar"),
741 ],
742 },
743 );
744
745 graph.nodes.insert(
747 ComponentKey::from("baz.errors"),
748 Node::Source {
749 outputs: vec![SourceOutput::new_maybe_logs(
750 DataType::all_bits(),
751 Definition::any(),
752 )],
753 },
754 );
755 graph.nodes.insert(
756 ComponentKey::from("baz"),
757 Node::Transform {
758 in_ty: DataType::all_bits(),
759 outputs: vec![
760 TransformOutput::new(
761 DataType::all_bits(),
762 [("test".into(), Definition::default_legacy_namespace())].into(),
763 ),
764 TransformOutput::new(
765 DataType::all_bits(),
766 [("test".into(), Definition::default_legacy_namespace())].into(),
767 )
768 .with_port("errors"),
769 ],
770 },
771 );
772
773 let mut errors = graph.input_map().unwrap_err();
774 errors.sort();
775 assert_eq!(
776 errors,
777 vec![
778 String::from("Input specifier baz.errors is ambiguous"),
779 String::from("Input specifier foo.bar is ambiguous"),
780 ]
781 );
782 }
783
784 #[test]
785 fn wildcard_matching() {
786 let mut graph = Graph::default();
787 graph.add_source("log_source", DataType::Log);
788
789 graph.add_sink("sink", DataType::Log, vec![]);
791
792 let wildcard_matching = WildcardMatching::Strict;
794 let expected =
795 "Input \"bad_source-*\" for sink \"sink\" doesn't match any components.".to_string();
796 assert_eq!(
797 Err(expected),
798 graph.test_add_input("sink", "bad_source-*", wildcard_matching)
799 );
800
801 let wildcard_matching = WildcardMatching::Relaxed;
803 assert_eq!(
804 Ok(()),
805 graph.test_add_input("sink", "bad_source-*", wildcard_matching)
806 );
807
808 let wildcard_matching = WildcardMatching::Relaxed;
810 let expected =
811 "Input \"bad_source-1\" for sink \"sink\" doesn't match any components.".to_string();
812 assert_eq!(
813 Err(expected),
814 graph.test_add_input("sink", "bad_source-1", wildcard_matching)
815 );
816 }
817
818 #[test]
819 fn paths_to_sink_simple() {
820 let mut graph = Graph::default();
821 graph.add_source("in", DataType::Log);
822 graph.add_transform("one", DataType::Log, DataType::Log, vec!["in"]);
823 graph.add_transform("two", DataType::Log, DataType::Log, vec!["one"]);
824 graph.add_transform("three", DataType::Log, DataType::Log, vec!["two"]);
825 graph.add_sink("out", DataType::Log, vec!["three"]);
826
827 let paths: Vec<Vec<_>> = graph
828 .paths_to_sink_from(&ComponentKey::from("in"))
829 .into_iter()
830 .map(|keys| keys.into_iter().map(|key| key.to_string()).collect())
831 .collect();
832
833 assert_eq!(paths.len(), 1);
834 assert_eq!(paths[0], vec!["in", "one", "two", "three", "out"])
835 }
836
837 #[test]
838 fn paths_to_sink_non_existent_root() {
839 let graph = Graph::default();
840 let paths = graph.paths_to_sink_from(&ComponentKey::from("in"));
841
842 assert_eq!(paths.len(), 0);
843 }
844
845 #[test]
846 fn paths_to_sink_irrelevant_transforms() {
847 let mut graph = Graph::default();
848 graph.add_source("source", DataType::Log);
849 graph.add_transform("t1", DataType::Log, DataType::Log, vec!["source"]);
851 graph.add_transform("t2", DataType::Log, DataType::Log, vec!["t1"]);
852 graph.add_transform("t3", DataType::Log, DataType::Log, vec!["t1"]);
853 graph.add_transform("t4", DataType::Log, DataType::Log, vec!["source"]);
855 graph.add_transform("t5", DataType::Log, DataType::Log, vec!["source"]);
856 graph.add_sink("sink1", DataType::Log, vec!["t4"]);
857 graph.add_sink("sink2", DataType::Log, vec!["t5"]);
858
859 let paths: Vec<Vec<_>> = graph
860 .paths_to_sink_from(&ComponentKey::from("source"))
861 .into_iter()
862 .map(|keys| keys.into_iter().map(|key| key.to_string()).collect())
863 .collect();
864
865 assert_eq!(paths.len(), 2);
866 assert_eq!(paths[0], vec!["source", "t5", "sink2"]);
867 assert_eq!(paths[1], vec!["source", "t4", "sink1"]);
868 }
869
870 #[test]
871 fn paths_to_sink_multiple_inputs_into_sink() {
872 let mut graph = Graph::default();
873 graph.add_source("source", DataType::Log);
874 graph.add_transform("t1", DataType::Log, DataType::Log, vec!["source"]);
875 graph.add_transform("t2", DataType::Log, DataType::Log, vec!["t1"]);
876 graph.add_transform("t3", DataType::Log, DataType::Log, vec!["t1"]);
877 graph.add_sink("sink1", DataType::Log, vec!["t2", "t3"]);
878
879 let paths: Vec<Vec<_>> = graph
880 .paths_to_sink_from(&ComponentKey::from("source"))
881 .into_iter()
882 .map(|keys| keys.into_iter().map(|key| key.to_string()).collect())
883 .collect();
884
885 assert_eq!(paths.len(), 2);
886 assert_eq!(paths[0], vec!["source", "t1", "t3", "sink1"]);
887 assert_eq!(paths[1], vec!["source", "t1", "t2", "sink1"]);
888 }
889}