1use super::{
2 schema, ComponentKey, DataType, OutputId, SinkOuter, SourceOuter, SourceOutput, TransformOuter,
3 TransformOutput, WildcardMatching,
4};
5use indexmap::{set::IndexSet, IndexMap};
6use std::collections::{HashMap, HashSet, VecDeque};
7use std::fmt;
8
9#[derive(Debug, Clone)]
10pub enum Node {
11 Source {
12 outputs: Vec<SourceOutput>,
13 },
14 Transform {
15 in_ty: DataType,
16 outputs: Vec<TransformOutput>,
17 },
18 Sink {
19 ty: DataType,
20 },
21}
22
23impl fmt::Display for Node {
24 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25 match self {
26 Node::Source { outputs } => {
27 write!(f, "component_kind: source\n outputs:")?;
28 for output in outputs {
29 write!(f, "\n {output}")?;
30 }
31 Ok(())
32 }
33 Node::Transform { in_ty, outputs } => {
34 write!(
35 f,
36 "component_kind: source\n input_types: {in_ty}\n outputs:"
37 )?;
38 for output in outputs {
39 write!(f, "\n {output}")?;
40 }
41 Ok(())
42 }
43 Node::Sink { ty } => {
44 write!(f, "component_kind: sink\n types: {ty}")
45 }
46 }
47 }
48}
49
50#[derive(Debug, Clone)]
51struct Edge {
52 from: OutputId,
53 to: ComponentKey,
54}
55
56#[derive(Default)]
57pub struct Graph {
58 nodes: HashMap<ComponentKey, Node>,
59 edges: Vec<Edge>,
60}
61
62impl Graph {
63 pub fn new(
64 sources: &IndexMap<ComponentKey, SourceOuter>,
65 transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
66 sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
67 schema: schema::Options,
68 wildcard_matching: WildcardMatching,
69 ) -> Result<Self, Vec<String>> {
70 Self::new_inner(sources, transforms, sinks, false, schema, wildcard_matching)
71 }
72
73 pub fn new_unchecked(
74 sources: &IndexMap<ComponentKey, SourceOuter>,
75 transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
76 sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
77 schema: schema::Options,
78 wildcard_matching: WildcardMatching,
79 ) -> Self {
80 Self::new_inner(sources, transforms, sinks, true, schema, wildcard_matching)
81 .expect("errors ignored")
82 }
83
84 fn new_inner(
85 sources: &IndexMap<ComponentKey, SourceOuter>,
86 transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
87 sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
88 ignore_errors: bool,
89 schema: schema::Options,
90 wildcard_matching: WildcardMatching,
91 ) -> Result<Self, Vec<String>> {
92 let mut graph = Graph::default();
93 let mut errors = Vec::new();
94
95 for (id, config) in sources.iter() {
97 graph.nodes.insert(
98 id.clone(),
99 Node::Source {
100 outputs: config.inner.outputs(schema.log_namespace()),
101 },
102 );
103 }
104
105 for (id, transform) in transforms.iter() {
106 graph.nodes.insert(
107 id.clone(),
108 Node::Transform {
109 in_ty: transform.inner.input().data_type(),
110 outputs: transform.inner.outputs(
111 vector_lib::enrichment::TableRegistry::default(),
112 &[(id.into(), schema::Definition::any())],
113 schema.log_namespace(),
114 ),
115 },
116 );
117 }
118
119 for (id, config) in sinks {
120 graph.nodes.insert(
121 id.clone(),
122 Node::Sink {
123 ty: config.inner.input().data_type(),
124 },
125 );
126 }
127
128 let available_inputs = graph.input_map()?;
131
132 for (id, config) in transforms.iter() {
133 for input in config.inputs.iter() {
134 if let Err(e) = graph.add_input(input, id, &available_inputs, wildcard_matching) {
135 errors.push(e);
136 }
137 }
138 }
139
140 for (id, config) in sinks {
141 for input in config.inputs.iter() {
142 if let Err(e) = graph.add_input(input, id, &available_inputs, wildcard_matching) {
143 errors.push(e);
144 }
145 }
146 }
147
148 if ignore_errors || errors.is_empty() {
149 Ok(graph)
150 } else {
151 Err(errors)
152 }
153 }
154
155 fn add_input(
156 &mut self,
157 from: &str,
158 to: &ComponentKey,
159 available_inputs: &HashMap<String, OutputId>,
160 wildcard_matching: WildcardMatching,
161 ) -> Result<(), String> {
162 if let Some(output_id) = available_inputs.get(from) {
163 self.edges.push(Edge {
164 from: output_id.clone(),
165 to: to.clone(),
166 });
167 Ok(())
168 } else {
169 let output_type = match self.nodes.get(to) {
170 Some(Node::Transform { .. }) => "transform",
171 Some(Node::Sink { .. }) => "sink",
172 _ => panic!("only transforms and sinks have inputs"),
173 };
174 match wildcard_matching {
176 WildcardMatching::Relaxed => {
177 if from != glob::Pattern::escape(from) {
180 info!("Input \"{from}\" for {output_type} \"{to}\" didn’t match any components, but this was ignored because `relaxed_wildcard_matching` is enabled.");
181 return Ok(());
182 }
183 }
184 WildcardMatching::Strict => {}
185 }
186 info!(
187 "Available components:\n{}",
188 self.nodes
189 .iter()
190 .map(|(key, node)| format!("\"{key}\":\n {node}"))
191 .collect::<Vec<_>>()
192 .join("\n")
193 );
194 Err(format!(
195 "Input \"{from}\" for {output_type} \"{to}\" doesn't match any components.",
196 ))
197 }
198 }
199
200 fn get_input_type(&self, key: &ComponentKey) -> DataType {
207 match self.nodes[key] {
208 Node::Source { .. } => panic!("no inputs on sources"),
209 Node::Transform { in_ty, .. } => in_ty,
210 Node::Sink { ty } => ty,
211 }
212 }
213
214 fn get_output_type(&self, id: &OutputId) -> DataType {
221 match &self.nodes[&id.component] {
222 Node::Source { outputs } => outputs
223 .iter()
224 .find(|output| output.port == id.port)
225 .map(|output| output.ty)
226 .expect("output didn't exist"),
227 Node::Transform { outputs, .. } => outputs
228 .iter()
229 .find(|output| output.port == id.port)
230 .map(|output| output.ty)
231 .expect("output didn't exist"),
232 Node::Sink { .. } => panic!("no outputs on sinks"),
233 }
234 }
235
236 pub fn typecheck(&self) -> Result<(), Vec<String>> {
237 let mut errors = Vec::new();
238
239 for edge in &self.edges {
241 let from_ty = self.get_output_type(&edge.from);
242 let to_ty = self.get_input_type(&edge.to);
243
244 if !from_ty.intersects(to_ty) {
245 errors.push(format!(
246 "Data type mismatch between {} ({}) and {} ({})",
247 edge.from, from_ty, edge.to, to_ty
248 ));
249 }
250 }
251
252 if errors.is_empty() {
253 Ok(())
254 } else {
255 errors.sort();
256 errors.dedup();
257 Err(errors)
258 }
259 }
260
261 pub fn check_for_cycles(&self) -> Result<(), String> {
262 let sinks = self.nodes.iter().filter_map(|(name, node)| match node {
264 Node::Sink { .. } => Some(name),
265 _ => None,
266 });
267
268 for s in sinks {
270 let mut traversal: VecDeque<ComponentKey> = VecDeque::new();
271 let mut visited: HashSet<ComponentKey> = HashSet::new();
272 let mut stack: IndexSet<ComponentKey> = IndexSet::new();
273
274 traversal.push_back(s.to_owned());
275 while !traversal.is_empty() {
276 let n = traversal.back().expect("can't be empty").clone();
277 if !visited.contains(&n) {
278 visited.insert(n.clone());
279 stack.insert(n.clone());
280 } else {
281 stack.shift_remove(&n);
283 traversal.pop_back();
284 }
285 let inputs = self
286 .edges
287 .iter()
288 .filter(|e| e.to == n)
289 .map(|e| e.from.clone());
290 for input in inputs {
291 if !visited.contains(&input.component) {
292 traversal.push_back(input.component);
293 } else if stack.contains(&input.component) {
294 let path = stack
296 .iter()
297 .skip(1) .rev()
299 .map(|item| item.to_string())
300 .collect::<Vec<_>>();
301 return Err(format!(
302 "Cyclic dependency detected in the chain [ {} -> {} ]",
303 input.component.id(),
304 path.join(" -> ")
305 ));
306 }
307 }
308 }
309 }
310 Ok(())
311 }
312
313 pub fn valid_inputs(&self) -> HashSet<OutputId> {
314 self.nodes
315 .iter()
316 .flat_map(|(key, node)| match node {
317 Node::Sink { .. } => vec![],
318 Node::Source { outputs } => outputs
319 .iter()
320 .map(|output| OutputId {
321 component: key.clone(),
322 port: output.port.clone(),
323 })
324 .collect(),
325 Node::Transform { outputs, .. } => outputs
326 .iter()
327 .map(|output| OutputId {
328 component: key.clone(),
329 port: output.port.clone(),
330 })
331 .collect(),
332 })
333 .collect()
334 }
335
336 pub fn input_map(&self) -> Result<HashMap<String, OutputId>, Vec<String>> {
351 let mut mapped: HashMap<String, OutputId> = HashMap::new();
352 let mut errors = HashSet::new();
353
354 for id in self.valid_inputs() {
355 if let Some(_other) = mapped.insert(id.to_string(), id.clone()) {
356 errors.insert(format!("Input specifier {id} is ambiguous"));
357 }
358 }
359
360 if errors.is_empty() {
361 Ok(mapped)
362 } else {
363 Err(errors.into_iter().collect())
364 }
365 }
366
367 pub fn inputs_for(&self, node: &ComponentKey) -> Vec<OutputId> {
368 self.edges
369 .iter()
370 .filter(|edge| &edge.to == node)
371 .map(|edge| edge.from.clone())
372 .collect()
373 }
374
375 pub fn paths_to_sink_from(&self, root: &ComponentKey) -> Vec<Vec<ComponentKey>> {
381 let mut traversal: VecDeque<(ComponentKey, Vec<_>)> = VecDeque::new();
382 let mut paths = Vec::new();
383
384 traversal.push_back((root.to_owned(), Vec::new()));
385 while !traversal.is_empty() {
386 let (n, mut path) = traversal.pop_back().expect("can't be empty");
387 path.push(n.clone());
388 let neighbors = self
389 .edges
390 .iter()
391 .filter(|e| e.from.component == n)
392 .map(|e| e.to.clone())
393 .collect::<Vec<_>>();
394
395 if neighbors.is_empty() {
396 paths.push(path.clone());
397 } else {
398 for neighbor in neighbors {
399 traversal.push_back((neighbor, path.clone()));
400 }
401 }
402 }
403
404 paths
406 .into_iter()
407 .filter(|path| {
408 if let Some(key) = path.last() {
409 matches!(self.nodes.get(key), Some(Node::Sink { ty: _ }))
410 } else {
411 false
412 }
413 })
414 .collect()
415 }
416}
417
418#[cfg(test)]
419mod test {
420 use similar_asserts::assert_eq;
421 use vector_lib::schema::Definition;
422
423 use super::*;
424
425 impl Graph {
426 fn add_source(&mut self, id: &str, ty: DataType) {
427 self.nodes.insert(
428 id.into(),
429 Node::Source {
430 outputs: vec![match ty {
431 DataType::Metric => SourceOutput::new_metrics(),
432 DataType::Trace => SourceOutput::new_traces(),
433 _ => SourceOutput::new_maybe_logs(ty, Definition::any()),
434 }],
435 },
436 );
437 }
438
439 fn add_transform(
440 &mut self,
441 id: &str,
442 in_ty: DataType,
443 out_ty: DataType,
444 inputs: Vec<&str>,
445 ) {
446 let id = ComponentKey::from(id);
447 let inputs = clean_inputs(inputs);
448 self.nodes.insert(
449 id.clone(),
450 Node::Transform {
451 in_ty,
452 outputs: vec![TransformOutput::new(
453 out_ty,
454 [("test".into(), Definition::default_legacy_namespace())].into(),
455 )],
456 },
457 );
458 for from in inputs {
459 self.edges.push(Edge {
460 from,
461 to: id.clone(),
462 });
463 }
464 }
465
466 fn add_transform_output(&mut self, id: &str, name: &str, ty: DataType) {
467 let id = id.into();
468 match self.nodes.get_mut(&id) {
469 Some(Node::Transform { outputs, .. }) => outputs.push(
470 TransformOutput::new(
471 ty,
472 [("test".into(), Definition::default_legacy_namespace())].into(),
473 )
474 .with_port(name),
475 ),
476 _ => panic!("invalid transform"),
477 }
478 }
479
480 fn add_sink(&mut self, id: &str, ty: DataType, inputs: Vec<&str>) {
481 let id = ComponentKey::from(id);
482 let inputs = clean_inputs(inputs);
483 self.nodes.insert(id.clone(), Node::Sink { ty });
484 for from in inputs {
485 self.edges.push(Edge {
486 from,
487 to: id.clone(),
488 });
489 }
490 }
491
492 fn test_add_input(
493 &mut self,
494 node: &str,
495 input: &str,
496 wildcard_matching: WildcardMatching,
497 ) -> Result<(), String> {
498 let available_inputs = self.input_map().unwrap();
499 self.add_input(input, &node.into(), &available_inputs, wildcard_matching)
500 }
501 }
502
503 fn clean_inputs(inputs: Vec<&str>) -> Vec<OutputId> {
504 inputs.into_iter().map(Into::into).collect()
505 }
506
#[test]
fn paths_detects_cycles() {
    // Builds the loop `one -> two -> three -> one`, fed by source `in`, with
    // sink `out` attached to `sink_input`.
    let looped = |sink_input: &str| {
        let mut graph = Graph::default();
        graph.add_source("in", DataType::Log);
        graph.add_transform("one", DataType::Log, DataType::Log, vec!["in", "three"]);
        graph.add_transform("two", DataType::Log, DataType::Log, vec!["one"]);
        graph.add_transform("three", DataType::Log, DataType::Log, vec!["two"]);
        graph.add_sink("out", DataType::Log, vec![sink_input]);
        graph
    };

    let sink_on_three = looped("three");
    assert_eq!(
        Err("Cyclic dependency detected in the chain [ three -> one -> two -> three ]".into()),
        sink_on_three.check_for_cycles()
    );

    let sink_on_two = looped("two");
    // Checked twice: a second call on the same graph reports the same cycle.
    for _ in 0..2 {
        assert_eq!(
            Err("Cyclic dependency detected in the chain [ two -> three -> one -> two ]".into()),
            sink_on_two.check_for_cycles()
        );
    }

    // A transform that shares its name with the source and lists itself as
    // its own input.
    let mut self_loop = Graph::default();
    self_loop.add_source("in", DataType::Log);
    self_loop.add_transform("in", DataType::Log, DataType::Log, vec!["in"]);
    self_loop.add_sink("out", DataType::Log, vec!["in"]);

    assert_eq!(
        Err("Cyclic dependency detected in the chain [ in -> in ]".into()),
        self_loop.check_for_cycles()
    );
}
548
#[test]
fn paths_doesnt_detect_noncycles() {
    // Diamond: `in` fans out to `one` and `two`, which rejoin at `three`.
    let mut diamond = Graph::default();
    diamond.add_source("in", DataType::Log);
    for branch in ["one", "two"] {
        diamond.add_transform(branch, DataType::Log, DataType::Log, vec!["in"]);
    }
    diamond.add_transform("three", DataType::Log, DataType::Log, vec!["one", "two"]);
    diamond.add_sink("out", DataType::Log, vec!["three"]);

    diamond.check_for_cycles().unwrap();
}
560
#[test]
fn detects_type_mismatches() {
    // A log source wired straight into a metric-only sink.
    let mut mismatched = Graph::default();
    mismatched.add_source("in", DataType::Log);
    mismatched.add_sink("out", DataType::Metric, vec!["in"]);

    let outcome = mismatched.typecheck();

    assert_eq!(
        Err(vec![
            "Data type mismatch between in ([\"Log\"]) and out ([\"Metric\"])".into()
        ]),
        outcome
    );
}
574
#[test]
fn allows_log_or_metric_into_any() {
    // A sink accepting every type can consume both a log and a metric source.
    let mut graph = Graph::default();
    graph.add_source("log_source", DataType::Log);
    graph.add_source("metric_source", DataType::Metric);

    let sink_inputs = vec!["log_source", "metric_source"];
    graph.add_sink("any_sink", DataType::all_bits(), sink_inputs);

    let outcome = graph.typecheck();
    assert_eq!(Ok(()), outcome);
}
588
#[test]
fn allows_any_into_log_or_metric() {
    let any = DataType::all_bits();

    // An any-typed source feeds transforms and sinks that accept narrower
    // types.
    let mut graph = Graph::default();
    graph.add_source("any_source", any);
    graph.add_transform("log_to_any", DataType::Log, any, vec!["any_source"]);
    graph.add_transform("any_to_log", any, DataType::Log, vec!["any_source"]);
    graph.add_sink(
        "log_sink",
        DataType::Log,
        vec!["any_source", "log_to_any", "any_to_log"],
    );
    graph.add_sink(
        "metric_sink",
        DataType::Metric,
        vec!["any_source", "log_to_any"],
    );

    let outcome = graph.typecheck();
    assert_eq!(outcome, Ok(()));
}
618
#[test]
fn allows_both_directions_for_metrics() {
    let any = DataType::all_bits();
    let mut graph = Graph::default();

    // Typed sources, each followed by a same-typed transform.
    graph.add_source("log_source", DataType::Log);
    graph.add_source("metric_source", DataType::Metric);
    graph.add_transform("log_to_log", DataType::Log, DataType::Log, vec!["log_source"]);
    graph.add_transform(
        "metric_to_metric",
        DataType::Metric,
        DataType::Metric,
        vec!["metric_source"],
    );

    // Both streams merge into an any-typed transform, then split again.
    graph.add_transform("any_to_any", any, any, vec!["log_to_log", "metric_to_metric"]);
    graph.add_transform("any_to_log", any, DataType::Log, vec!["any_to_any"]);
    graph.add_transform("any_to_metric", any, DataType::Metric, vec!["any_to_any"]);

    graph.add_sink("log_sink", DataType::Log, vec!["any_to_log"]);
    graph.add_sink("metric_sink", DataType::Metric, vec!["any_to_metric"]);

    let outcome = graph.typecheck();
    assert_eq!(Ok(()), outcome);
}
659
#[test]
fn allows_multiple_transform_outputs() {
    let mut graph = Graph::default();
    graph.add_source("log_source", DataType::Log);
    graph.add_transform(
        "log_to_log",
        DataType::Log,
        DataType::Log,
        vec!["log_source"],
    );
    graph.add_transform_output("log_to_log", "errors", DataType::Log);
    graph.add_sink("good_log_sink", DataType::Log, vec!["log_to_log"]);

    // These two sinks start unconnected; their inputs are added below.
    graph.add_sink("errored_log_sink", DataType::Log, vec![]);
    graph.add_sink("bad_log_sink", DataType::Log, vec![]);

    // The named `errors` port exists, so it resolves.
    let existing_port = graph.test_add_input(
        "errored_log_sink",
        "log_to_log.errors",
        WildcardMatching::Strict,
    );
    assert_eq!(Ok(()), existing_port);

    // A port the transform does not have is rejected.
    let missing_port = graph.test_add_input(
        "bad_log_sink",
        "log_to_log.not_errors",
        WildcardMatching::Strict,
    );
    let expected = "Input \"log_to_log.not_errors\" for sink \"bad_log_sink\" doesn't match any components.".to_string();
    assert_eq!(Err(expected), missing_port);
}
698
#[test]
fn disallows_ambiguous_inputs() {
    // A source with a single output accepting every data type.
    let any_source = || Node::Source {
        outputs: vec![SourceOutput::new_maybe_logs(
            DataType::all_bits(),
            Definition::any(),
        )],
    };
    // A transform with a default output plus one named `port`.
    let transform_with_port = |port: &str| Node::Transform {
        in_ty: DataType::all_bits(),
        outputs: vec![
            TransformOutput::new(
                DataType::all_bits(),
                [("test".into(), Definition::default_legacy_namespace())].into(),
            ),
            TransformOutput::new(
                DataType::all_bits(),
                [("test".into(), Definition::default_legacy_namespace())].into(),
            )
            .with_port(port),
        ],
    };

    let mut graph = Graph::default();

    // Inserted twice under the same key; the second insert replaces the
    // first in the node map.
    for _ in 0..2 {
        graph
            .nodes
            .insert(ComponentKey::from("foo.bar"), any_source());
    }
    // Port `bar` on transform `foo` is the second thing named "foo.bar".
    graph
        .nodes
        .insert(ComponentKey::from("foo"), transform_with_port("bar"));

    // The same clash again: source `baz.errors` versus port `errors` on
    // transform `baz`.
    graph
        .nodes
        .insert(ComponentKey::from("baz.errors"), any_source());
    graph
        .nodes
        .insert(ComponentKey::from("baz"), transform_with_port("errors"));

    let mut errors = graph.input_map().unwrap_err();
    errors.sort();
    assert_eq!(
        errors,
        vec![
            String::from("Input specifier baz.errors is ambiguous"),
            String::from("Input specifier foo.bar is ambiguous"),
        ]
    );
}
777
#[test]
fn wildcard_matching() {
    let mut graph = Graph::default();
    graph.add_source("log_source", DataType::Log);
    // The sink starts unconnected; inputs are added one at a time below.
    graph.add_sink("sink", DataType::Log, vec![]);

    // Strict: an unmatched wildcard is an error.
    let strict_wildcard = graph.test_add_input("sink", "bad_source-*", WildcardMatching::Strict);
    assert_eq!(
        Err("Input \"bad_source-*\" for sink \"sink\" doesn't match any components.".to_string()),
        strict_wildcard
    );

    // Relaxed: an unmatched wildcard is ignored.
    let relaxed_wildcard =
        graph.test_add_input("sink", "bad_source-*", WildcardMatching::Relaxed);
    assert_eq!(Ok(()), relaxed_wildcard);

    // Relaxed: an unmatched plain name (no wildcard) is still an error.
    let relaxed_literal =
        graph.test_add_input("sink", "bad_source-1", WildcardMatching::Relaxed);
    assert_eq!(
        Err("Input \"bad_source-1\" for sink \"sink\" doesn't match any components.".to_string()),
        relaxed_literal
    );
}
811
#[test]
fn paths_to_sink_simple() {
    // A single straight chain: in -> one -> two -> three -> out.
    let mut chain = Graph::default();
    chain.add_source("in", DataType::Log);
    chain.add_transform("one", DataType::Log, DataType::Log, vec!["in"]);
    chain.add_transform("two", DataType::Log, DataType::Log, vec!["one"]);
    chain.add_transform("three", DataType::Log, DataType::Log, vec!["two"]);
    chain.add_sink("out", DataType::Log, vec!["three"]);

    let mut paths: Vec<Vec<String>> = Vec::new();
    for keys in chain.paths_to_sink_from(&ComponentKey::from("in")) {
        paths.push(keys.into_iter().map(|key| key.to_string()).collect());
    }

    assert_eq!(paths.len(), 1);
    assert_eq!(paths[0], vec!["in", "one", "two", "three", "out"]);
}
830
#[test]
fn paths_to_sink_non_existent_root() {
    // An empty graph has no path from any root.
    let empty = Graph::default();
    let missing_root = ComponentKey::from("in");

    let paths = empty.paths_to_sink_from(&missing_root);

    assert_eq!(paths.len(), 0);
}
838
#[test]
fn paths_to_sink_irrelevant_transforms() {
    let mut graph = Graph::default();
    graph.add_source("source", DataType::Log);
    // t1 -> {t2, t3} is a branch that never reaches a sink.
    graph.add_transform("t1", DataType::Log, DataType::Log, vec!["source"]);
    graph.add_transform("t2", DataType::Log, DataType::Log, vec!["t1"]);
    graph.add_transform("t3", DataType::Log, DataType::Log, vec!["t1"]);
    // t4 and t5 each lead to their own sink.
    graph.add_transform("t4", DataType::Log, DataType::Log, vec!["source"]);
    graph.add_transform("t5", DataType::Log, DataType::Log, vec!["source"]);
    graph.add_sink("sink1", DataType::Log, vec!["t4"]);
    graph.add_sink("sink2", DataType::Log, vec!["t5"]);

    let mut paths: Vec<Vec<String>> = Vec::new();
    for keys in graph.paths_to_sink_from(&ComponentKey::from("source")) {
        paths.push(keys.into_iter().map(|key| key.to_string()).collect());
    }

    assert_eq!(paths.len(), 2);
    assert_eq!(paths[0], vec!["source", "t5", "sink2"]);
    assert_eq!(paths[1], vec!["source", "t4", "sink1"]);
}
863
#[test]
fn paths_to_sink_multiple_inputs_into_sink() {
    // `t1` fans out to `t2` and `t3`, which both feed the same sink.
    let mut graph = Graph::default();
    graph.add_source("source", DataType::Log);
    graph.add_transform("t1", DataType::Log, DataType::Log, vec!["source"]);
    graph.add_transform("t2", DataType::Log, DataType::Log, vec!["t1"]);
    graph.add_transform("t3", DataType::Log, DataType::Log, vec!["t1"]);
    graph.add_sink("sink1", DataType::Log, vec!["t2", "t3"]);

    let mut paths: Vec<Vec<String>> = Vec::new();
    for keys in graph.paths_to_sink_from(&ComponentKey::from("source")) {
        paths.push(keys.into_iter().map(|key| key.to_string()).collect());
    }

    assert_eq!(paths.len(), 2);
    assert_eq!(paths[0], vec!["source", "t1", "t3", "sink1"]);
    assert_eq!(paths[1], vec!["source", "t1", "t2", "sink1"]);
}
883}