vector/config/
graph.rs

1use std::{
2    collections::{HashMap, HashSet, VecDeque},
3    fmt,
4};
5
6use indexmap::{IndexMap, set::IndexSet};
7
8use super::{
9    ComponentKey, DataType, OutputId, SinkOuter, SourceOuter, SourceOutput, TransformOuter,
10    TransformOutput, WildcardMatching, schema,
11};
12
/// A single component in the config [`Graph`], together with the data types it accepts and/or
/// produces.
#[derive(Debug, Clone)]
pub enum Node {
    /// A source component: it produces outputs but has no inputs.
    Source {
        /// Every output (port and data type) this source exposes.
        outputs: Vec<SourceOutput>,
    },
    /// A transform component: it consumes inputs and produces outputs.
    Transform {
        /// The data type(s) this transform accepts on its input.
        in_ty: DataType,
        /// Every output (port and data type) this transform exposes.
        outputs: Vec<TransformOutput>,
    },
    /// A sink component: it consumes inputs but has no outputs.
    Sink {
        /// The data type(s) this sink accepts on its input.
        ty: DataType,
    },
}
26
27impl fmt::Display for Node {
28    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29        match self {
30            Node::Source { outputs } => {
31                write!(f, "component_kind: source\n  outputs:")?;
32                for output in outputs {
33                    write!(f, "\n    {output}")?;
34                }
35                Ok(())
36            }
37            Node::Transform { in_ty, outputs } => {
38                write!(
39                    f,
40                    "component_kind: source\n  input_types: {in_ty}\n  outputs:"
41                )?;
42                for output in outputs {
43                    write!(f, "\n    {output}")?;
44                }
45                Ok(())
46            }
47            Node::Sink { ty } => {
48                write!(f, "component_kind: sink\n  types: {ty}")
49            }
50        }
51    }
52}
53
/// A directed connection from one component output to the component that consumes it.
#[derive(Debug, Clone)]
struct Edge {
    /// The upstream output (component plus port) that produces the data.
    from: OutputId,
    /// The downstream component (a transform or sink) that consumes the data.
    to: ComponentKey,
}
59
60#[derive(Default)]
61pub struct Graph {
62    nodes: HashMap<ComponentKey, Node>,
63    edges: Vec<Edge>,
64}
65
66impl Graph {
67    pub fn new(
68        sources: &IndexMap<ComponentKey, SourceOuter>,
69        transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
70        sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
71        schema: schema::Options,
72        wildcard_matching: WildcardMatching,
73    ) -> Result<Self, Vec<String>> {
74        Self::new_inner(sources, transforms, sinks, false, schema, wildcard_matching)
75    }
76
77    pub fn new_unchecked(
78        sources: &IndexMap<ComponentKey, SourceOuter>,
79        transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
80        sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
81        schema: schema::Options,
82        wildcard_matching: WildcardMatching,
83    ) -> Self {
84        Self::new_inner(sources, transforms, sinks, true, schema, wildcard_matching)
85            .expect("errors ignored")
86    }
87
88    fn new_inner(
89        sources: &IndexMap<ComponentKey, SourceOuter>,
90        transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
91        sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
92        ignore_errors: bool,
93        schema: schema::Options,
94        wildcard_matching: WildcardMatching,
95    ) -> Result<Self, Vec<String>> {
96        let mut graph = Graph::default();
97        let mut errors = Vec::new();
98
99        // First, insert all of the different node types
100        for (id, config) in sources.iter() {
101            graph.nodes.insert(
102                id.clone(),
103                Node::Source {
104                    outputs: config.inner.outputs(schema.log_namespace()),
105                },
106            );
107        }
108
109        for (id, transform) in transforms.iter() {
110            graph.nodes.insert(
111                id.clone(),
112                Node::Transform {
113                    in_ty: transform.inner.input().data_type(),
114                    outputs: transform.inner.outputs(
115                        vector_lib::enrichment::TableRegistry::default(),
116                        &[(id.into(), schema::Definition::any())],
117                        schema.log_namespace(),
118                    ),
119                },
120            );
121        }
122
123        for (id, config) in sinks {
124            graph.nodes.insert(
125                id.clone(),
126                Node::Sink {
127                    ty: config.inner.input().data_type(),
128                },
129            );
130        }
131
132        // With all of the nodes added, go through inputs and add edges, resolving strings into
133        // actual `OutputId`s along the way.
134        let available_inputs = graph.input_map()?;
135
136        for (id, config) in transforms.iter() {
137            for input in config.inputs.iter() {
138                if let Err(e) = graph.add_input(input, id, &available_inputs, wildcard_matching) {
139                    errors.push(e);
140                }
141            }
142        }
143
144        for (id, config) in sinks {
145            for input in config.inputs.iter() {
146                if let Err(e) = graph.add_input(input, id, &available_inputs, wildcard_matching) {
147                    errors.push(e);
148                }
149            }
150        }
151
152        if ignore_errors || errors.is_empty() {
153            Ok(graph)
154        } else {
155            Err(errors)
156        }
157    }
158
159    fn add_input(
160        &mut self,
161        from: &str,
162        to: &ComponentKey,
163        available_inputs: &HashMap<String, OutputId>,
164        wildcard_matching: WildcardMatching,
165    ) -> Result<(), String> {
166        if let Some(output_id) = available_inputs.get(from) {
167            self.edges.push(Edge {
168                from: output_id.clone(),
169                to: to.clone(),
170            });
171            Ok(())
172        } else {
173            let output_type = match self.nodes.get(to) {
174                Some(Node::Transform { .. }) => "transform",
175                Some(Node::Sink { .. }) => "sink",
176                _ => panic!("only transforms and sinks have inputs"),
177            };
178            // allow empty result if relaxed wildcard matching is enabled
179            match wildcard_matching {
180                WildcardMatching::Relaxed => {
181                    // using value != glob::Pattern::escape(value) to check if value is a glob
182                    // TODO: replace with proper check when https://github.com/rust-lang/glob/issues/72 is resolved
183                    if from != glob::Pattern::escape(from) {
184                        info!(
185                            "Input \"{from}\" for {output_type} \"{to}\" didn’t match any components, but this was ignored because `relaxed_wildcard_matching` is enabled."
186                        );
187                        return Ok(());
188                    }
189                }
190                WildcardMatching::Strict => {}
191            }
192            info!(
193                "Available components:\n{}",
194                self.nodes
195                    .iter()
196                    .map(|(key, node)| format!("\"{key}\":\n  {node}"))
197                    .collect::<Vec<_>>()
198                    .join("\n")
199            );
200            Err(format!(
201                "Input \"{from}\" for {output_type} \"{to}\" doesn't match any components.",
202            ))
203        }
204    }
205
206    /// Return the input type of a given component.
207    ///
208    /// # Panics
209    ///
210    /// Will panic if the given key is not present in the graph or identifies a source, which can't
211    /// have inputs.
212    fn get_input_type(&self, key: &ComponentKey) -> DataType {
213        match self.nodes[key] {
214            Node::Source { .. } => panic!("no inputs on sources"),
215            Node::Transform { in_ty, .. } => in_ty,
216            Node::Sink { ty } => ty,
217        }
218    }
219
220    /// Return the output type associated with a given `OutputId`.
221    ///
222    /// # Panics
223    ///
224    /// Will panic if the given id is not present in the graph or identifies a sink, which can't
225    /// have inputs.
226    fn get_output_type(&self, id: &OutputId) -> DataType {
227        match &self.nodes[&id.component] {
228            Node::Source { outputs } => outputs
229                .iter()
230                .find(|output| output.port == id.port)
231                .map(|output| output.ty)
232                .expect("output didn't exist"),
233            Node::Transform { outputs, .. } => outputs
234                .iter()
235                .find(|output| output.port == id.port)
236                .map(|output| output.ty)
237                .expect("output didn't exist"),
238            Node::Sink { .. } => panic!("no outputs on sinks"),
239        }
240    }
241
242    pub fn typecheck(&self) -> Result<(), Vec<String>> {
243        let mut errors = Vec::new();
244
245        // check that all edges connect components with compatible data types
246        for edge in &self.edges {
247            let from_ty = self.get_output_type(&edge.from);
248            let to_ty = self.get_input_type(&edge.to);
249
250            if !from_ty.intersects(to_ty) {
251                errors.push(format!(
252                    "Data type mismatch between {} ({}) and {} ({})",
253                    edge.from, from_ty, edge.to, to_ty
254                ));
255            }
256        }
257
258        if errors.is_empty() {
259            Ok(())
260        } else {
261            errors.sort();
262            errors.dedup();
263            Err(errors)
264        }
265    }
266
267    pub fn check_for_cycles(&self) -> Result<(), String> {
268        // find all sinks
269        let sinks = self.nodes.iter().filter_map(|(name, node)| match node {
270            Node::Sink { .. } => Some(name),
271            _ => None,
272        });
273
274        // run DFS from each sink while keep tracking the current stack to detect cycles
275        for s in sinks {
276            let mut traversal: VecDeque<ComponentKey> = VecDeque::new();
277            let mut visited: HashSet<ComponentKey> = HashSet::new();
278            let mut stack: IndexSet<ComponentKey> = IndexSet::new();
279
280            traversal.push_back(s.to_owned());
281            while !traversal.is_empty() {
282                let n = traversal.back().expect("can't be empty").clone();
283                if !visited.contains(&n) {
284                    visited.insert(n.clone());
285                    stack.insert(n.clone());
286                } else {
287                    // we came back to the node after exploring all its children - remove it from the stack and traversal
288                    stack.shift_remove(&n);
289                    traversal.pop_back();
290                }
291                let inputs = self
292                    .edges
293                    .iter()
294                    .filter(|e| e.to == n)
295                    .map(|e| e.from.clone());
296                for input in inputs {
297                    if !visited.contains(&input.component) {
298                        traversal.push_back(input.component);
299                    } else if stack.contains(&input.component) {
300                        // we reached the node while it is on the current stack - it's a cycle
301                        let path = stack
302                            .iter()
303                            .skip(1) // skip the sink
304                            .rev()
305                            .map(|item| item.to_string())
306                            .collect::<Vec<_>>();
307                        return Err(format!(
308                            "Cyclic dependency detected in the chain [ {} -> {} ]",
309                            input.component.id(),
310                            path.join(" -> ")
311                        ));
312                    }
313                }
314            }
315        }
316        Ok(())
317    }
318
319    pub fn valid_inputs(&self) -> HashSet<OutputId> {
320        self.nodes
321            .iter()
322            .flat_map(|(key, node)| match node {
323                Node::Sink { .. } => vec![],
324                Node::Source { outputs } => outputs
325                    .iter()
326                    .map(|output| OutputId {
327                        component: key.clone(),
328                        port: output.port.clone(),
329                    })
330                    .collect(),
331                Node::Transform { outputs, .. } => outputs
332                    .iter()
333                    .map(|output| OutputId {
334                        component: key.clone(),
335                        port: output.port.clone(),
336                    })
337                    .collect(),
338            })
339            .collect()
340    }
341
342    /// Produce a map of output IDs for the current set of nodes in the graph, keyed by their string
343    /// representation. Returns errors for any nodes that have the same string representation,
344    /// making input specifications ambiguous.
345    ///
346    /// When we get a dotted path in the `inputs` section of a user's config, we need to determine
347    /// which of a few things that represents:
348    ///
349    ///   1. A component that's part of an expanded macro (e.g. `route.branch`)
350    ///   2. A named output of a branching transform (e.g. `name.errors`)
351    ///
352    /// A naive way to do that is to compare the string representation of all valid inputs to the
353    /// provided string and pick the one that matches. This works better if you can assume that there
354    /// are no conflicting string representations, so this function reports any ambiguity as an
355    /// error when creating the lookup map.
356    pub fn input_map(&self) -> Result<HashMap<String, OutputId>, Vec<String>> {
357        let mut mapped: HashMap<String, OutputId> = HashMap::new();
358        let mut errors = HashSet::new();
359
360        for id in self.valid_inputs() {
361            if let Some(_other) = mapped.insert(id.to_string(), id.clone()) {
362                errors.insert(format!("Input specifier {id} is ambiguous"));
363            }
364        }
365
366        if errors.is_empty() {
367            Ok(mapped)
368        } else {
369            Err(errors.into_iter().collect())
370        }
371    }
372
373    pub fn inputs_for(&self, node: &ComponentKey) -> Vec<OutputId> {
374        self.edges
375            .iter()
376            .filter(|edge| &edge.to == node)
377            .map(|edge| edge.from.clone())
378            .collect()
379    }
380
381    /// From a given root node, get all paths from the root node to leaf nodes
382    /// where the leaf node must be a sink. This is useful for determining which
383    /// components are relevant in a Vector unit test.
384    ///
385    /// Caller must check for cycles before calling this function.
386    pub fn paths_to_sink_from(&self, root: &ComponentKey) -> Vec<Vec<ComponentKey>> {
387        let mut traversal: VecDeque<(ComponentKey, Vec<_>)> = VecDeque::new();
388        let mut paths = Vec::new();
389
390        traversal.push_back((root.to_owned(), Vec::new()));
391        while !traversal.is_empty() {
392            let (n, mut path) = traversal.pop_back().expect("can't be empty");
393            path.push(n.clone());
394            let neighbors = self
395                .edges
396                .iter()
397                .filter(|e| e.from.component == n)
398                .map(|e| e.to.clone())
399                .collect::<Vec<_>>();
400
401            if neighbors.is_empty() {
402                paths.push(path.clone());
403            } else {
404                for neighbor in neighbors {
405                    traversal.push_back((neighbor, path.clone()));
406                }
407            }
408        }
409
410        // Keep only components from paths that end at a sink
411        paths
412            .into_iter()
413            .filter(|path| {
414                if let Some(key) = path.last() {
415                    matches!(self.nodes.get(key), Some(Node::Sink { ty: _ }))
416                } else {
417                    false
418                }
419            })
420            .collect()
421    }
422}
423
#[cfg(test)]
mod test {
    use similar_asserts::assert_eq;
    use vector_lib::schema::Definition;

    use super::*;

    // Test-only helpers for building a `Graph` by hand. They write `nodes`/`edges` directly and
    // bypass config parsing and input resolution (except `test_add_input`).
    impl Graph {
        /// Insert a source named `id` with a single output whose type is derived from `ty`.
        fn add_source(&mut self, id: &str, ty: DataType) {
            self.nodes.insert(
                id.into(),
                Node::Source {
                    outputs: vec![match ty {
                        DataType::Metric => SourceOutput::new_metrics(),
                        DataType::Trace => SourceOutput::new_traces(),
                        _ => SourceOutput::new_maybe_logs(ty, Definition::any()),
                    }],
                },
            );
        }

        /// Insert a transform accepting `in_ty` with one output of type `out_ty`. Record an edge
        /// from each entry of `inputs`, which are not checked against existing nodes.
        fn add_transform(
            &mut self,
            id: &str,
            in_ty: DataType,
            out_ty: DataType,
            inputs: Vec<&str>,
        ) {
            let id = ComponentKey::from(id);
            let inputs = clean_inputs(inputs);
            self.nodes.insert(
                id.clone(),
                Node::Transform {
                    in_ty,
                    outputs: vec![TransformOutput::new(
                        out_ty,
                        [("test".into(), Definition::default_legacy_namespace())].into(),
                    )],
                },
            );
            for from in inputs {
                self.edges.push(Edge {
                    from,
                    to: id.clone(),
                });
            }
        }

        /// Append an extra output on port `name` to the existing transform `id`.
        /// Panics if `id` is not a transform.
        fn add_transform_output(&mut self, id: &str, name: &str, ty: DataType) {
            let id = id.into();
            match self.nodes.get_mut(&id) {
                Some(Node::Transform { outputs, .. }) => outputs.push(
                    TransformOutput::new(
                        ty,
                        [("test".into(), Definition::default_legacy_namespace())].into(),
                    )
                    .with_port(name),
                ),
                _ => panic!("invalid transform"),
            }
        }

        /// Insert a sink accepting `ty` and record an edge from each entry of `inputs`, without
        /// validation.
        fn add_sink(&mut self, id: &str, ty: DataType, inputs: Vec<&str>) {
            let id = ComponentKey::from(id);
            let inputs = clean_inputs(inputs);
            self.nodes.insert(id.clone(), Node::Sink { ty });
            for from in inputs {
                self.edges.push(Edge {
                    from,
                    to: id.clone(),
                });
            }
        }

        /// Resolve `input` for `node` through the real `add_input` path, using an input map built
        /// from the graph's current nodes.
        fn test_add_input(
            &mut self,
            node: &str,
            input: &str,
            wildcard_matching: WildcardMatching,
        ) -> Result<(), String> {
            let available_inputs = self.input_map().unwrap();
            self.add_input(input, &node.into(), &available_inputs, wildcard_matching)
        }
    }

    /// Convert string input specifiers into `OutputId`s via their `From<&str>` conversion.
    fn clean_inputs(inputs: Vec<&str>) -> Vec<OutputId> {
        inputs.into_iter().map(Into::into).collect()
    }

    #[test]
    fn paths_detects_cycles() {
        let mut graph = Graph::default();
        graph.add_source("in", DataType::Log);
        graph.add_transform("one", DataType::Log, DataType::Log, vec!["in", "three"]);
        graph.add_transform("two", DataType::Log, DataType::Log, vec!["one"]);
        graph.add_transform("three", DataType::Log, DataType::Log, vec!["two"]);
        graph.add_sink("out", DataType::Log, vec!["three"]);

        assert_eq!(
            Err("Cyclic dependency detected in the chain [ three -> one -> two -> three ]".into()),
            graph.check_for_cycles()
        );

        let mut graph = Graph::default();
        graph.add_source("in", DataType::Log);
        graph.add_transform("one", DataType::Log, DataType::Log, vec!["in", "three"]);
        graph.add_transform("two", DataType::Log, DataType::Log, vec!["one"]);
        graph.add_transform("three", DataType::Log, DataType::Log, vec!["two"]);
        graph.add_sink("out", DataType::Log, vec!["two"]);

        // Checked twice, presumably to confirm that repeated calls report the same cycle.
        assert_eq!(
            Err("Cyclic dependency detected in the chain [ two -> three -> one -> two ]".into()),
            graph.check_for_cycles()
        );
        assert_eq!(
            Err("Cyclic dependency detected in the chain [ two -> three -> one -> two ]".into()),
            graph.check_for_cycles()
        );

        let mut graph = Graph::default();
        graph.add_source("in", DataType::Log);
        // Same key as the source above: this replaces the source node in `nodes` with a transform
        // that reads from itself.
        graph.add_transform("in", DataType::Log, DataType::Log, vec!["in"]);
        graph.add_sink("out", DataType::Log, vec!["in"]);

        // This isn't really a cyclic dependency but let me have this one.
        assert_eq!(
            Err("Cyclic dependency detected in the chain [ in -> in ]".into()),
            graph.check_for_cycles()
        );
    }

    #[test]
    fn paths_doesnt_detect_noncycles() {
        // A diamond: two paths reconverge at `three`, which must not be mistaken for a cycle.
        let mut graph = Graph::default();
        graph.add_source("in", DataType::Log);
        graph.add_transform("one", DataType::Log, DataType::Log, vec!["in"]);
        graph.add_transform("two", DataType::Log, DataType::Log, vec!["in"]);
        graph.add_transform("three", DataType::Log, DataType::Log, vec!["one", "two"]);
        graph.add_sink("out", DataType::Log, vec!["three"]);

        graph.check_for_cycles().unwrap();
    }

    #[test]
    fn detects_type_mismatches() {
        let mut graph = Graph::default();
        graph.add_source("in", DataType::Log);
        graph.add_sink("out", DataType::Metric, vec!["in"]);

        assert_eq!(
            Err(vec![
                "Data type mismatch between in ([\"Log\"]) and out ([\"Metric\"])".into()
            ]),
            graph.typecheck()
        );
    }

    #[test]
    fn allows_log_or_metric_into_any() {
        let mut graph = Graph::default();
        graph.add_source("log_source", DataType::Log);
        graph.add_source("metric_source", DataType::Metric);
        graph.add_sink(
            "any_sink",
            DataType::all_bits(),
            vec!["log_source", "metric_source"],
        );

        assert_eq!(Ok(()), graph.typecheck());
    }

    #[test]
    fn allows_any_into_log_or_metric() {
        let mut graph = Graph::default();
        graph.add_source("any_source", DataType::all_bits());
        graph.add_transform(
            "log_to_any",
            DataType::Log,
            DataType::all_bits(),
            vec!["any_source"],
        );
        graph.add_transform(
            "any_to_log",
            DataType::all_bits(),
            DataType::Log,
            vec!["any_source"],
        );
        graph.add_sink(
            "log_sink",
            DataType::Log,
            vec!["any_source", "log_to_any", "any_to_log"],
        );
        graph.add_sink(
            "metric_sink",
            DataType::Metric,
            vec!["any_source", "log_to_any"],
        );

        assert_eq!(graph.typecheck(), Ok(()));
    }

    #[test]
    fn allows_both_directions_for_metrics() {
        let mut graph = Graph::default();
        graph.add_source("log_source", DataType::Log);
        graph.add_source("metric_source", DataType::Metric);
        graph.add_transform(
            "log_to_log",
            DataType::Log,
            DataType::Log,
            vec!["log_source"],
        );
        graph.add_transform(
            "metric_to_metric",
            DataType::Metric,
            DataType::Metric,
            vec!["metric_source"],
        );
        graph.add_transform(
            "any_to_any",
            DataType::all_bits(),
            DataType::all_bits(),
            vec!["log_to_log", "metric_to_metric"],
        );
        graph.add_transform(
            "any_to_log",
            DataType::all_bits(),
            DataType::Log,
            vec!["any_to_any"],
        );
        graph.add_transform(
            "any_to_metric",
            DataType::all_bits(),
            DataType::Metric,
            vec!["any_to_any"],
        );
        graph.add_sink("log_sink", DataType::Log, vec!["any_to_log"]);
        graph.add_sink("metric_sink", DataType::Metric, vec!["any_to_metric"]);

        assert_eq!(Ok(()), graph.typecheck());
    }

    #[test]
    fn allows_multiple_transform_outputs() {
        let mut graph = Graph::default();
        graph.add_source("log_source", DataType::Log);
        graph.add_transform(
            "log_to_log",
            DataType::Log,
            DataType::Log,
            vec!["log_source"],
        );
        graph.add_transform_output("log_to_log", "errors", DataType::Log);
        graph.add_sink("good_log_sink", DataType::Log, vec!["log_to_log"]);

        // don't add inputs to these yet since they're not validated via these helpers
        graph.add_sink("errored_log_sink", DataType::Log, vec![]);
        graph.add_sink("bad_log_sink", DataType::Log, vec![]);

        // make sure we're good with dotted paths
        assert_eq!(
            Ok(()),
            graph.test_add_input(
                "errored_log_sink",
                "log_to_log.errors",
                WildcardMatching::Strict
            )
        );

        // make sure that we're not cool with an unknown dotted path
        let expected = "Input \"log_to_log.not_errors\" for sink \"bad_log_sink\" doesn't match any components.".to_string();
        assert_eq!(
            Err(expected),
            graph.test_add_input(
                "bad_log_sink",
                "log_to_log.not_errors",
                WildcardMatching::Strict
            )
        );
    }

    #[test]
    fn disallows_ambiguous_inputs() {
        let mut graph = Graph::default();
        // these all look like "foo.bar", but should only yield one error
        // (the second insert under the same key replaces the first in `nodes`, so the clash is
        // between source `foo.bar` and port `bar` of transform `foo`)
        graph.nodes.insert(
            ComponentKey::from("foo.bar"),
            Node::Source {
                outputs: vec![SourceOutput::new_maybe_logs(
                    DataType::all_bits(),
                    Definition::any(),
                )],
            },
        );
        graph.nodes.insert(
            ComponentKey::from("foo.bar"),
            Node::Source {
                outputs: vec![SourceOutput::new_maybe_logs(
                    DataType::all_bits(),
                    Definition::any(),
                )],
            },
        );
        graph.nodes.insert(
            ComponentKey::from("foo"),
            Node::Transform {
                in_ty: DataType::all_bits(),
                outputs: vec![
                    TransformOutput::new(
                        DataType::all_bits(),
                        [("test".into(), Definition::default_legacy_namespace())].into(),
                    ),
                    TransformOutput::new(
                        DataType::all_bits(),
                        [("test".into(), Definition::default_legacy_namespace())].into(),
                    )
                    .with_port("bar"),
                ],
            },
        );

        // make sure we return more than one
        graph.nodes.insert(
            ComponentKey::from("baz.errors"),
            Node::Source {
                outputs: vec![SourceOutput::new_maybe_logs(
                    DataType::all_bits(),
                    Definition::any(),
                )],
            },
        );
        graph.nodes.insert(
            ComponentKey::from("baz"),
            Node::Transform {
                in_ty: DataType::all_bits(),
                outputs: vec![
                    TransformOutput::new(
                        DataType::all_bits(),
                        [("test".into(), Definition::default_legacy_namespace())].into(),
                    ),
                    TransformOutput::new(
                        DataType::all_bits(),
                        [("test".into(), Definition::default_legacy_namespace())].into(),
                    )
                    .with_port("errors"),
                ],
            },
        );

        // Sorted here so the assertion does not depend on the order `input_map` reports errors in.
        let mut errors = graph.input_map().unwrap_err();
        errors.sort();
        assert_eq!(
            errors,
            vec![
                String::from("Input specifier baz.errors is ambiguous"),
                String::from("Input specifier foo.bar is ambiguous"),
            ]
        );
    }

    #[test]
    fn wildcard_matching() {
        let mut graph = Graph::default();
        graph.add_source("log_source", DataType::Log);

        // don't add inputs to these yet since they're not validated via these helpers
        graph.add_sink("sink", DataType::Log, vec![]);

        // make sure we're not good with non existing inputs with relaxed wildcard matching disabled
        let wildcard_matching = WildcardMatching::Strict;
        let expected =
            "Input \"bad_source-*\" for sink \"sink\" doesn't match any components.".to_string();
        assert_eq!(
            Err(expected),
            graph.test_add_input("sink", "bad_source-*", wildcard_matching)
        );

        // make sure we're good with non existing inputs with relaxed wildcard matching enabled
        let wildcard_matching = WildcardMatching::Relaxed;
        assert_eq!(
            Ok(()),
            graph.test_add_input("sink", "bad_source-*", wildcard_matching)
        );

        // make sure we're not good with non existing inputs that are not wildcards even when relaxed wildcard matching is enabled
        let wildcard_matching = WildcardMatching::Relaxed;
        let expected =
            "Input \"bad_source-1\" for sink \"sink\" doesn't match any components.".to_string();
        assert_eq!(
            Err(expected),
            graph.test_add_input("sink", "bad_source-1", wildcard_matching)
        );
    }

    #[test]
    fn paths_to_sink_simple() {
        let mut graph = Graph::default();
        graph.add_source("in", DataType::Log);
        graph.add_transform("one", DataType::Log, DataType::Log, vec!["in"]);
        graph.add_transform("two", DataType::Log, DataType::Log, vec!["one"]);
        graph.add_transform("three", DataType::Log, DataType::Log, vec!["two"]);
        graph.add_sink("out", DataType::Log, vec!["three"]);

        let paths: Vec<Vec<_>> = graph
            .paths_to_sink_from(&ComponentKey::from("in"))
            .into_iter()
            .map(|keys| keys.into_iter().map(|key| key.to_string()).collect())
            .collect();

        assert_eq!(paths.len(), 1);
        assert_eq!(paths[0], vec!["in", "one", "two", "three", "out"])
    }

    #[test]
    fn paths_to_sink_non_existent_root() {
        let graph = Graph::default();
        let paths = graph.paths_to_sink_from(&ComponentKey::from("in"));

        assert_eq!(paths.len(), 0);
    }

    #[test]
    fn paths_to_sink_irrelevant_transforms() {
        let mut graph = Graph::default();
        graph.add_source("source", DataType::Log);
        // These transforms do not link to a sink
        graph.add_transform("t1", DataType::Log, DataType::Log, vec!["source"]);
        graph.add_transform("t2", DataType::Log, DataType::Log, vec!["t1"]);
        graph.add_transform("t3", DataType::Log, DataType::Log, vec!["t1"]);
        // These transforms do link to a sink
        graph.add_transform("t4", DataType::Log, DataType::Log, vec!["source"]);
        graph.add_transform("t5", DataType::Log, DataType::Log, vec!["source"]);
        graph.add_sink("sink1", DataType::Log, vec!["t4"]);
        graph.add_sink("sink2", DataType::Log, vec!["t5"]);

        let paths: Vec<Vec<_>> = graph
            .paths_to_sink_from(&ComponentKey::from("source"))
            .into_iter()
            .map(|keys| keys.into_iter().map(|key| key.to_string()).collect())
            .collect();

        // `paths_to_sink_from` pops its work list LIFO, so the branch whose edge was added last
        // (`t5`) comes out first.
        assert_eq!(paths.len(), 2);
        assert_eq!(paths[0], vec!["source", "t5", "sink2"]);
        assert_eq!(paths[1], vec!["source", "t4", "sink1"]);
    }

    #[test]
    fn paths_to_sink_multiple_inputs_into_sink() {
        let mut graph = Graph::default();
        graph.add_source("source", DataType::Log);
        graph.add_transform("t1", DataType::Log, DataType::Log, vec!["source"]);
        graph.add_transform("t2", DataType::Log, DataType::Log, vec!["t1"]);
        graph.add_transform("t3", DataType::Log, DataType::Log, vec!["t1"]);
        graph.add_sink("sink1", DataType::Log, vec!["t2", "t3"]);

        let paths: Vec<Vec<_>> = graph
            .paths_to_sink_from(&ComponentKey::from("source"))
            .into_iter()
            .map(|keys| keys.into_iter().map(|key| key.to_string()).collect())
            .collect();

        // One path per route into the sink; LIFO order yields the `t3` branch first.
        assert_eq!(paths.len(), 2);
        assert_eq!(paths[0], vec!["source", "t1", "t3", "sink1"]);
        assert_eq!(paths[1], vec!["source", "t1", "t2", "sink1"]);
    }
}