1use std::{
2 collections::{HashMap, HashSet, VecDeque},
3 fmt,
4};
5
6use indexmap::{IndexMap, set::IndexSet};
7
8use super::{
9 ComponentKey, DataType, OutputId, SinkOuter, SourceOuter, SourceOutput, TransformContext,
10 TransformOuter, TransformOutput, WildcardMatching, schema,
11};
12
13#[derive(Debug, Clone)]
14pub enum Node {
15 Source {
16 outputs: Vec<SourceOutput>,
17 },
18 Transform {
19 in_ty: DataType,
20 outputs: Vec<TransformOutput>,
21 },
22 Sink {
23 ty: DataType,
24 },
25}
26
27impl fmt::Display for Node {
28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29 match self {
30 Node::Source { outputs } => {
31 write!(f, "component_kind: source\n outputs:")?;
32 for output in outputs {
33 write!(f, "\n {output}")?;
34 }
35 Ok(())
36 }
37 Node::Transform { in_ty, outputs } => {
38 write!(
39 f,
40 "component_kind: source\n input_types: {in_ty}\n outputs:"
41 )?;
42 for output in outputs {
43 write!(f, "\n {output}")?;
44 }
45 Ok(())
46 }
47 Node::Sink { ty } => {
48 write!(f, "component_kind: sink\n types: {ty}")
49 }
50 }
51 }
52}
53
54#[derive(Debug, Clone)]
55struct Edge {
56 from: OutputId,
57 to: ComponentKey,
58}
59
60#[derive(Default)]
61pub struct Graph {
62 nodes: HashMap<ComponentKey, Node>,
63 edges: Vec<Edge>,
64}
65
66impl Graph {
67 pub fn new(
68 sources: &IndexMap<ComponentKey, SourceOuter>,
69 transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
70 sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
71 schema: schema::Options,
72 wildcard_matching: WildcardMatching,
73 ) -> Result<Self, Vec<String>> {
74 Self::new_inner(sources, transforms, sinks, false, schema, wildcard_matching)
75 }
76
77 pub fn new_unchecked(
78 sources: &IndexMap<ComponentKey, SourceOuter>,
79 transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
80 sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
81 schema: schema::Options,
82 wildcard_matching: WildcardMatching,
83 ) -> Self {
84 Self::new_inner(sources, transforms, sinks, true, schema, wildcard_matching)
85 .expect("errors ignored")
86 }
87
88 fn new_inner(
89 sources: &IndexMap<ComponentKey, SourceOuter>,
90 transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
91 sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
92 ignore_errors: bool,
93 schema: schema::Options,
94 wildcard_matching: WildcardMatching,
95 ) -> Result<Self, Vec<String>> {
96 let mut graph = Graph::default();
97 let mut errors = Vec::new();
98
99 for (id, config) in sources.iter() {
101 graph.nodes.insert(
102 id.clone(),
103 Node::Source {
104 outputs: config.inner.outputs(schema.log_namespace()),
105 },
106 );
107 }
108
109 for (id, transform) in transforms.iter() {
110 graph.nodes.insert(
111 id.clone(),
112 Node::Transform {
113 in_ty: transform.inner.input().data_type(),
114 outputs: transform.inner.outputs(
115 &TransformContext {
116 schema,
117 ..Default::default()
118 },
119 &[(id.into(), schema::Definition::any())],
120 ),
121 },
122 );
123 }
124
125 for (id, config) in sinks {
126 graph.nodes.insert(
127 id.clone(),
128 Node::Sink {
129 ty: config.inner.input().data_type(),
130 },
131 );
132 }
133
134 let available_inputs = graph.input_map()?;
137
138 for (id, config) in transforms.iter() {
139 for input in config.inputs.iter() {
140 if let Err(e) = graph.add_input(input, id, &available_inputs, wildcard_matching) {
141 errors.push(e);
142 }
143 }
144 }
145
146 for (id, config) in sinks {
147 for input in config.inputs.iter() {
148 if let Err(e) = graph.add_input(input, id, &available_inputs, wildcard_matching) {
149 errors.push(e);
150 }
151 }
152 }
153
154 if ignore_errors || errors.is_empty() {
155 Ok(graph)
156 } else {
157 Err(errors)
158 }
159 }
160
161 fn add_input(
162 &mut self,
163 from: &str,
164 to: &ComponentKey,
165 available_inputs: &HashMap<String, OutputId>,
166 wildcard_matching: WildcardMatching,
167 ) -> Result<(), String> {
168 if let Some(output_id) = available_inputs.get(from) {
169 self.edges.push(Edge {
170 from: output_id.clone(),
171 to: to.clone(),
172 });
173 Ok(())
174 } else {
175 let output_type = match self.nodes.get(to) {
176 Some(Node::Transform { .. }) => "transform",
177 Some(Node::Sink { .. }) => "sink",
178 _ => panic!("only transforms and sinks have inputs"),
179 };
180 match wildcard_matching {
182 WildcardMatching::Relaxed => {
183 if from != glob::Pattern::escape(from) {
186 info!(
187 "Input \"{from}\" for {output_type} \"{to}\" didn’t match any components, but this was ignored because `relaxed_wildcard_matching` is enabled."
188 );
189 return Ok(());
190 }
191 }
192 WildcardMatching::Strict => {}
193 }
194 info!(
195 "Available components:\n{}",
196 self.nodes
197 .iter()
198 .map(|(key, node)| format!("\"{key}\":\n {node}"))
199 .collect::<Vec<_>>()
200 .join("\n")
201 );
202 Err(format!(
203 "Input \"{from}\" for {output_type} \"{to}\" doesn't match any components.",
204 ))
205 }
206 }
207
208 fn get_input_type(&self, key: &ComponentKey) -> DataType {
215 match self.nodes[key] {
216 Node::Source { .. } => panic!("no inputs on sources"),
217 Node::Transform { in_ty, .. } => in_ty,
218 Node::Sink { ty } => ty,
219 }
220 }
221
222 fn get_output_type(&self, id: &OutputId) -> DataType {
229 match &self.nodes[&id.component] {
230 Node::Source { outputs } => outputs
231 .iter()
232 .find(|output| output.port == id.port)
233 .map(|output| output.ty)
234 .expect("output didn't exist"),
235 Node::Transform { outputs, .. } => outputs
236 .iter()
237 .find(|output| output.port == id.port)
238 .map(|output| output.ty)
239 .expect("output didn't exist"),
240 Node::Sink { .. } => panic!("no outputs on sinks"),
241 }
242 }
243
244 pub fn typecheck(&self) -> Result<(), Vec<String>> {
245 let mut errors = Vec::new();
246
247 for edge in &self.edges {
249 let from_ty = self.get_output_type(&edge.from);
250 let to_ty = self.get_input_type(&edge.to);
251
252 if !from_ty.intersects(to_ty) {
253 errors.push(format!(
254 "Data type mismatch between {} ({}) and {} ({})",
255 edge.from, from_ty, edge.to, to_ty
256 ));
257 }
258 }
259
260 if errors.is_empty() {
261 Ok(())
262 } else {
263 errors.sort();
264 errors.dedup();
265 Err(errors)
266 }
267 }
268
269 pub fn check_for_cycles(&self) -> Result<(), String> {
270 let sinks = self.nodes.iter().filter_map(|(name, node)| match node {
272 Node::Sink { .. } => Some(name),
273 _ => None,
274 });
275
276 for s in sinks {
278 let mut traversal: VecDeque<ComponentKey> = VecDeque::new();
279 let mut visited: HashSet<ComponentKey> = HashSet::new();
280 let mut stack: IndexSet<ComponentKey> = IndexSet::new();
281
282 traversal.push_back(s.to_owned());
283 while !traversal.is_empty() {
284 let n = traversal.back().expect("can't be empty").clone();
285 if !visited.contains(&n) {
286 visited.insert(n.clone());
287 stack.insert(n.clone());
288 } else {
289 stack.shift_remove(&n);
291 traversal.pop_back();
292 }
293 let inputs = self
294 .edges
295 .iter()
296 .filter(|e| e.to == n)
297 .map(|e| e.from.clone());
298 for input in inputs {
299 if !visited.contains(&input.component) {
300 traversal.push_back(input.component);
301 } else if stack.contains(&input.component) {
302 let path = stack
304 .iter()
305 .skip(1) .rev()
307 .map(|item| item.to_string())
308 .collect::<Vec<_>>();
309 return Err(format!(
310 "Cyclic dependency detected in the chain [ {} -> {} ]",
311 input.component.id(),
312 path.join(" -> ")
313 ));
314 }
315 }
316 }
317 }
318 Ok(())
319 }
320
321 pub fn valid_inputs(&self) -> HashSet<OutputId> {
322 self.nodes
323 .iter()
324 .flat_map(|(key, node)| match node {
325 Node::Sink { .. } => vec![],
326 Node::Source { outputs } => outputs
327 .iter()
328 .map(|output| OutputId {
329 component: key.clone(),
330 port: output.port.clone(),
331 })
332 .collect(),
333 Node::Transform { outputs, .. } => outputs
334 .iter()
335 .map(|output| OutputId {
336 component: key.clone(),
337 port: output.port.clone(),
338 })
339 .collect(),
340 })
341 .collect()
342 }
343
344 pub fn input_map(&self) -> Result<HashMap<String, OutputId>, Vec<String>> {
359 let mut mapped: HashMap<String, OutputId> = HashMap::new();
360 let mut errors = HashSet::new();
361
362 for id in self.valid_inputs() {
363 if let Some(_other) = mapped.insert(id.to_string(), id.clone()) {
364 errors.insert(format!("Input specifier {id} is ambiguous"));
365 }
366 }
367
368 if errors.is_empty() {
369 Ok(mapped)
370 } else {
371 Err(errors.into_iter().collect())
372 }
373 }
374
375 pub fn inputs_for(&self, node: &ComponentKey) -> Vec<OutputId> {
376 self.edges
377 .iter()
378 .filter(|edge| &edge.to == node)
379 .map(|edge| edge.from.clone())
380 .collect()
381 }
382
383 pub fn paths_to_sink_from(&self, root: &ComponentKey) -> Vec<Vec<ComponentKey>> {
389 let mut traversal: VecDeque<(ComponentKey, Vec<_>)> = VecDeque::new();
390 let mut paths = Vec::new();
391
392 traversal.push_back((root.to_owned(), Vec::new()));
393 while !traversal.is_empty() {
394 let (n, mut path) = traversal.pop_back().expect("can't be empty");
395 path.push(n.clone());
396 let neighbors = self
397 .edges
398 .iter()
399 .filter(|e| e.from.component == n)
400 .map(|e| e.to.clone())
401 .collect::<Vec<_>>();
402
403 if neighbors.is_empty() {
404 paths.push(path.clone());
405 } else {
406 for neighbor in neighbors {
407 traversal.push_back((neighbor, path.clone()));
408 }
409 }
410 }
411
412 paths
414 .into_iter()
415 .filter(|path| {
416 if let Some(key) = path.last() {
417 matches!(self.nodes.get(key), Some(Node::Sink { ty: _ }))
418 } else {
419 false
420 }
421 })
422 .collect()
423 }
424}
425
426#[cfg(test)]
427mod test {
428 use similar_asserts::assert_eq;
429 use vector_lib::schema::Definition;
430
431 use super::*;
432
433 impl Graph {
434 fn add_source(&mut self, id: &str, ty: DataType) {
435 self.nodes.insert(
436 id.into(),
437 Node::Source {
438 outputs: vec![match ty {
439 DataType::Metric => SourceOutput::new_metrics(),
440 DataType::Trace => SourceOutput::new_traces(),
441 _ => SourceOutput::new_maybe_logs(ty, Definition::any()),
442 }],
443 },
444 );
445 }
446
447 fn add_transform(
448 &mut self,
449 id: &str,
450 in_ty: DataType,
451 out_ty: DataType,
452 inputs: Vec<&str>,
453 ) {
454 let id = ComponentKey::from(id);
455 let inputs = clean_inputs(inputs);
456 self.nodes.insert(
457 id.clone(),
458 Node::Transform {
459 in_ty,
460 outputs: vec![TransformOutput::new(
461 out_ty,
462 [("test".into(), Definition::default_legacy_namespace())].into(),
463 )],
464 },
465 );
466 for from in inputs {
467 self.edges.push(Edge {
468 from,
469 to: id.clone(),
470 });
471 }
472 }
473
474 fn add_transform_output(&mut self, id: &str, name: &str, ty: DataType) {
475 let id = id.into();
476 match self.nodes.get_mut(&id) {
477 Some(Node::Transform { outputs, .. }) => outputs.push(
478 TransformOutput::new(
479 ty,
480 [("test".into(), Definition::default_legacy_namespace())].into(),
481 )
482 .with_port(name),
483 ),
484 _ => panic!("invalid transform"),
485 }
486 }
487
488 fn add_sink(&mut self, id: &str, ty: DataType, inputs: Vec<&str>) {
489 let id = ComponentKey::from(id);
490 let inputs = clean_inputs(inputs);
491 self.nodes.insert(id.clone(), Node::Sink { ty });
492 for from in inputs {
493 self.edges.push(Edge {
494 from,
495 to: id.clone(),
496 });
497 }
498 }
499
500 fn test_add_input(
501 &mut self,
502 node: &str,
503 input: &str,
504 wildcard_matching: WildcardMatching,
505 ) -> Result<(), String> {
506 let available_inputs = self.input_map().unwrap();
507 self.add_input(input, &node.into(), &available_inputs, wildcard_matching)
508 }
509 }
510
511 fn clean_inputs(inputs: Vec<&str>) -> Vec<OutputId> {
512 inputs.into_iter().map(Into::into).collect()
513 }
514
515 #[test]
516 fn paths_detects_cycles() {
517 let mut graph = Graph::default();
518 graph.add_source("in", DataType::Log);
519 graph.add_transform("one", DataType::Log, DataType::Log, vec!["in", "three"]);
520 graph.add_transform("two", DataType::Log, DataType::Log, vec!["one"]);
521 graph.add_transform("three", DataType::Log, DataType::Log, vec!["two"]);
522 graph.add_sink("out", DataType::Log, vec!["three"]);
523
524 assert_eq!(
525 Err("Cyclic dependency detected in the chain [ three -> one -> two -> three ]".into()),
526 graph.check_for_cycles()
527 );
528
529 let mut graph = Graph::default();
530 graph.add_source("in", DataType::Log);
531 graph.add_transform("one", DataType::Log, DataType::Log, vec!["in", "three"]);
532 graph.add_transform("two", DataType::Log, DataType::Log, vec!["one"]);
533 graph.add_transform("three", DataType::Log, DataType::Log, vec!["two"]);
534 graph.add_sink("out", DataType::Log, vec!["two"]);
535
536 assert_eq!(
537 Err("Cyclic dependency detected in the chain [ two -> three -> one -> two ]".into()),
538 graph.check_for_cycles()
539 );
540 assert_eq!(
541 Err("Cyclic dependency detected in the chain [ two -> three -> one -> two ]".into()),
542 graph.check_for_cycles()
543 );
544
545 let mut graph = Graph::default();
546 graph.add_source("in", DataType::Log);
547 graph.add_transform("in", DataType::Log, DataType::Log, vec!["in"]);
548 graph.add_sink("out", DataType::Log, vec!["in"]);
549
550 assert_eq!(
552 Err("Cyclic dependency detected in the chain [ in -> in ]".into()),
553 graph.check_for_cycles()
554 );
555 }
556
557 #[test]
558 fn paths_doesnt_detect_noncycles() {
559 let mut graph = Graph::default();
560 graph.add_source("in", DataType::Log);
561 graph.add_transform("one", DataType::Log, DataType::Log, vec!["in"]);
562 graph.add_transform("two", DataType::Log, DataType::Log, vec!["in"]);
563 graph.add_transform("three", DataType::Log, DataType::Log, vec!["one", "two"]);
564 graph.add_sink("out", DataType::Log, vec!["three"]);
565
566 graph.check_for_cycles().unwrap();
567 }
568
569 #[test]
570 fn detects_type_mismatches() {
571 let mut graph = Graph::default();
572 graph.add_source("in", DataType::Log);
573 graph.add_sink("out", DataType::Metric, vec!["in"]);
574
575 assert_eq!(
576 Err(vec![
577 "Data type mismatch between in ([\"Log\"]) and out ([\"Metric\"])".into()
578 ]),
579 graph.typecheck()
580 );
581 }
582
583 #[test]
584 fn allows_log_or_metric_into_any() {
585 let mut graph = Graph::default();
586 graph.add_source("log_source", DataType::Log);
587 graph.add_source("metric_source", DataType::Metric);
588 graph.add_sink(
589 "any_sink",
590 DataType::all_bits(),
591 vec!["log_source", "metric_source"],
592 );
593
594 assert_eq!(Ok(()), graph.typecheck());
595 }
596
597 #[test]
598 fn allows_any_into_log_or_metric() {
599 let mut graph = Graph::default();
600 graph.add_source("any_source", DataType::all_bits());
601 graph.add_transform(
602 "log_to_any",
603 DataType::Log,
604 DataType::all_bits(),
605 vec!["any_source"],
606 );
607 graph.add_transform(
608 "any_to_log",
609 DataType::all_bits(),
610 DataType::Log,
611 vec!["any_source"],
612 );
613 graph.add_sink(
614 "log_sink",
615 DataType::Log,
616 vec!["any_source", "log_to_any", "any_to_log"],
617 );
618 graph.add_sink(
619 "metric_sink",
620 DataType::Metric,
621 vec!["any_source", "log_to_any"],
622 );
623
624 assert_eq!(graph.typecheck(), Ok(()));
625 }
626
627 #[test]
628 fn allows_both_directions_for_metrics() {
629 let mut graph = Graph::default();
630 graph.add_source("log_source", DataType::Log);
631 graph.add_source("metric_source", DataType::Metric);
632 graph.add_transform(
633 "log_to_log",
634 DataType::Log,
635 DataType::Log,
636 vec!["log_source"],
637 );
638 graph.add_transform(
639 "metric_to_metric",
640 DataType::Metric,
641 DataType::Metric,
642 vec!["metric_source"],
643 );
644 graph.add_transform(
645 "any_to_any",
646 DataType::all_bits(),
647 DataType::all_bits(),
648 vec!["log_to_log", "metric_to_metric"],
649 );
650 graph.add_transform(
651 "any_to_log",
652 DataType::all_bits(),
653 DataType::Log,
654 vec!["any_to_any"],
655 );
656 graph.add_transform(
657 "any_to_metric",
658 DataType::all_bits(),
659 DataType::Metric,
660 vec!["any_to_any"],
661 );
662 graph.add_sink("log_sink", DataType::Log, vec!["any_to_log"]);
663 graph.add_sink("metric_sink", DataType::Metric, vec!["any_to_metric"]);
664
665 assert_eq!(Ok(()), graph.typecheck());
666 }
667
668 #[test]
669 fn allows_multiple_transform_outputs() {
670 let mut graph = Graph::default();
671 graph.add_source("log_source", DataType::Log);
672 graph.add_transform(
673 "log_to_log",
674 DataType::Log,
675 DataType::Log,
676 vec!["log_source"],
677 );
678 graph.add_transform_output("log_to_log", "errors", DataType::Log);
679 graph.add_sink("good_log_sink", DataType::Log, vec!["log_to_log"]);
680
681 graph.add_sink("errored_log_sink", DataType::Log, vec![]);
683 graph.add_sink("bad_log_sink", DataType::Log, vec![]);
684
685 assert_eq!(
687 Ok(()),
688 graph.test_add_input(
689 "errored_log_sink",
690 "log_to_log.errors",
691 WildcardMatching::Strict
692 )
693 );
694
695 let expected = "Input \"log_to_log.not_errors\" for sink \"bad_log_sink\" doesn't match any components.".to_string();
697 assert_eq!(
698 Err(expected),
699 graph.test_add_input(
700 "bad_log_sink",
701 "log_to_log.not_errors",
702 WildcardMatching::Strict
703 )
704 );
705 }
706
707 #[test]
708 fn disallows_ambiguous_inputs() {
709 let mut graph = Graph::default();
710 graph.nodes.insert(
712 ComponentKey::from("foo.bar"),
713 Node::Source {
714 outputs: vec![SourceOutput::new_maybe_logs(
715 DataType::all_bits(),
716 Definition::any(),
717 )],
718 },
719 );
720 graph.nodes.insert(
721 ComponentKey::from("foo.bar"),
722 Node::Source {
723 outputs: vec![SourceOutput::new_maybe_logs(
724 DataType::all_bits(),
725 Definition::any(),
726 )],
727 },
728 );
729 graph.nodes.insert(
730 ComponentKey::from("foo"),
731 Node::Transform {
732 in_ty: DataType::all_bits(),
733 outputs: vec![
734 TransformOutput::new(
735 DataType::all_bits(),
736 [("test".into(), Definition::default_legacy_namespace())].into(),
737 ),
738 TransformOutput::new(
739 DataType::all_bits(),
740 [("test".into(), Definition::default_legacy_namespace())].into(),
741 )
742 .with_port("bar"),
743 ],
744 },
745 );
746
747 graph.nodes.insert(
749 ComponentKey::from("baz.errors"),
750 Node::Source {
751 outputs: vec![SourceOutput::new_maybe_logs(
752 DataType::all_bits(),
753 Definition::any(),
754 )],
755 },
756 );
757 graph.nodes.insert(
758 ComponentKey::from("baz"),
759 Node::Transform {
760 in_ty: DataType::all_bits(),
761 outputs: vec![
762 TransformOutput::new(
763 DataType::all_bits(),
764 [("test".into(), Definition::default_legacy_namespace())].into(),
765 ),
766 TransformOutput::new(
767 DataType::all_bits(),
768 [("test".into(), Definition::default_legacy_namespace())].into(),
769 )
770 .with_port("errors"),
771 ],
772 },
773 );
774
775 let mut errors = graph.input_map().unwrap_err();
776 errors.sort();
777 assert_eq!(
778 errors,
779 vec![
780 String::from("Input specifier baz.errors is ambiguous"),
781 String::from("Input specifier foo.bar is ambiguous"),
782 ]
783 );
784 }
785
786 #[test]
787 fn wildcard_matching() {
788 let mut graph = Graph::default();
789 graph.add_source("log_source", DataType::Log);
790
791 graph.add_sink("sink", DataType::Log, vec![]);
793
794 let wildcard_matching = WildcardMatching::Strict;
796 let expected =
797 "Input \"bad_source-*\" for sink \"sink\" doesn't match any components.".to_string();
798 assert_eq!(
799 Err(expected),
800 graph.test_add_input("sink", "bad_source-*", wildcard_matching)
801 );
802
803 let wildcard_matching = WildcardMatching::Relaxed;
805 assert_eq!(
806 Ok(()),
807 graph.test_add_input("sink", "bad_source-*", wildcard_matching)
808 );
809
810 let wildcard_matching = WildcardMatching::Relaxed;
812 let expected =
813 "Input \"bad_source-1\" for sink \"sink\" doesn't match any components.".to_string();
814 assert_eq!(
815 Err(expected),
816 graph.test_add_input("sink", "bad_source-1", wildcard_matching)
817 );
818 }
819
820 #[test]
821 fn paths_to_sink_simple() {
822 let mut graph = Graph::default();
823 graph.add_source("in", DataType::Log);
824 graph.add_transform("one", DataType::Log, DataType::Log, vec!["in"]);
825 graph.add_transform("two", DataType::Log, DataType::Log, vec!["one"]);
826 graph.add_transform("three", DataType::Log, DataType::Log, vec!["two"]);
827 graph.add_sink("out", DataType::Log, vec!["three"]);
828
829 let paths: Vec<Vec<_>> = graph
830 .paths_to_sink_from(&ComponentKey::from("in"))
831 .into_iter()
832 .map(|keys| keys.into_iter().map(|key| key.to_string()).collect())
833 .collect();
834
835 assert_eq!(paths.len(), 1);
836 assert_eq!(paths[0], vec!["in", "one", "two", "three", "out"])
837 }
838
839 #[test]
840 fn paths_to_sink_non_existent_root() {
841 let graph = Graph::default();
842 let paths = graph.paths_to_sink_from(&ComponentKey::from("in"));
843
844 assert_eq!(paths.len(), 0);
845 }
846
847 #[test]
848 fn paths_to_sink_irrelevant_transforms() {
849 let mut graph = Graph::default();
850 graph.add_source("source", DataType::Log);
851 graph.add_transform("t1", DataType::Log, DataType::Log, vec!["source"]);
853 graph.add_transform("t2", DataType::Log, DataType::Log, vec!["t1"]);
854 graph.add_transform("t3", DataType::Log, DataType::Log, vec!["t1"]);
855 graph.add_transform("t4", DataType::Log, DataType::Log, vec!["source"]);
857 graph.add_transform("t5", DataType::Log, DataType::Log, vec!["source"]);
858 graph.add_sink("sink1", DataType::Log, vec!["t4"]);
859 graph.add_sink("sink2", DataType::Log, vec!["t5"]);
860
861 let paths: Vec<Vec<_>> = graph
862 .paths_to_sink_from(&ComponentKey::from("source"))
863 .into_iter()
864 .map(|keys| keys.into_iter().map(|key| key.to_string()).collect())
865 .collect();
866
867 assert_eq!(paths.len(), 2);
868 assert_eq!(paths[0], vec!["source", "t5", "sink2"]);
869 assert_eq!(paths[1], vec!["source", "t4", "sink1"]);
870 }
871
872 #[test]
873 fn paths_to_sink_multiple_inputs_into_sink() {
874 let mut graph = Graph::default();
875 graph.add_source("source", DataType::Log);
876 graph.add_transform("t1", DataType::Log, DataType::Log, vec!["source"]);
877 graph.add_transform("t2", DataType::Log, DataType::Log, vec!["t1"]);
878 graph.add_transform("t3", DataType::Log, DataType::Log, vec!["t1"]);
879 graph.add_sink("sink1", DataType::Log, vec!["t2", "t3"]);
880
881 let paths: Vec<Vec<_>> = graph
882 .paths_to_sink_from(&ComponentKey::from("source"))
883 .into_iter()
884 .map(|keys| keys.into_iter().map(|key| key.to_string()).collect())
885 .collect();
886
887 assert_eq!(paths.len(), 2);
888 assert_eq!(paths[0], vec!["source", "t1", "t3", "sink1"]);
889 assert_eq!(paths[1], vec!["source", "t1", "t2", "sink1"]);
890 }
891}