vector/config/
graph.rs

1use std::{
2    collections::{HashMap, HashSet, VecDeque},
3    fmt,
4};
5
6use indexmap::{IndexMap, set::IndexSet};
7
8use super::{
9    ComponentKey, DataType, OutputId, SinkOuter, SourceOuter, SourceOutput, TransformContext,
10    TransformOuter, TransformOutput, WildcardMatching, schema,
11};
12
/// A single component in the topology graph, reduced to the data types it accepts and the
/// outputs it exposes. This is all the information needed to resolve inputs and typecheck edges.
#[derive(Debug, Clone)]
pub enum Node {
    /// A source component. Sources have no inputs, only outputs.
    Source {
        /// The outputs (ports) exposed by this source.
        outputs: Vec<SourceOutput>,
    },
    /// A transform component, which both consumes and produces events.
    Transform {
        /// The data types this transform accepts on its input.
        in_ty: DataType,
        /// The outputs (ports) exposed by this transform.
        outputs: Vec<TransformOutput>,
    },
    /// A sink component. Sinks have inputs but no outputs.
    Sink {
        /// The data types this sink accepts on its input.
        ty: DataType,
    },
}
26
27impl fmt::Display for Node {
28    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29        match self {
30            Node::Source { outputs } => {
31                write!(f, "component_kind: source\n  outputs:")?;
32                for output in outputs {
33                    write!(f, "\n    {output}")?;
34                }
35                Ok(())
36            }
37            Node::Transform { in_ty, outputs } => {
38                write!(
39                    f,
40                    "component_kind: source\n  input_types: {in_ty}\n  outputs:"
41                )?;
42                for output in outputs {
43                    write!(f, "\n    {output}")?;
44                }
45                Ok(())
46            }
47            Node::Sink { ty } => {
48                write!(f, "component_kind: sink\n  types: {ty}")
49            }
50        }
51    }
52}
53
/// A directed connection from one component output to the component that consumes it.
#[derive(Debug, Clone)]
struct Edge {
    /// The output (component plus port) that data flows out of.
    from: OutputId,
    /// The component whose input receives data from `from`.
    to: ComponentKey,
}
59
/// The topology of a configuration: every component as a [`Node`] plus the edges connecting
/// component outputs to consuming components. It is used to resolve inputs, typecheck
/// connections and detect cycles.
#[derive(Default)]
pub struct Graph {
    /// All components, keyed by their component key.
    nodes: HashMap<ComponentKey, Node>,
    /// Connections from component outputs to the components that consume them.
    edges: Vec<Edge>,
}
65
66impl Graph {
67    pub fn new(
68        sources: &IndexMap<ComponentKey, SourceOuter>,
69        transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
70        sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
71        schema: schema::Options,
72        wildcard_matching: WildcardMatching,
73    ) -> Result<Self, Vec<String>> {
74        Self::new_inner(sources, transforms, sinks, false, schema, wildcard_matching)
75    }
76
77    pub fn new_unchecked(
78        sources: &IndexMap<ComponentKey, SourceOuter>,
79        transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
80        sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
81        schema: schema::Options,
82        wildcard_matching: WildcardMatching,
83    ) -> Self {
84        Self::new_inner(sources, transforms, sinks, true, schema, wildcard_matching)
85            .expect("errors ignored")
86    }
87
88    fn new_inner(
89        sources: &IndexMap<ComponentKey, SourceOuter>,
90        transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
91        sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
92        ignore_errors: bool,
93        schema: schema::Options,
94        wildcard_matching: WildcardMatching,
95    ) -> Result<Self, Vec<String>> {
96        let mut graph = Graph::default();
97        let mut errors = Vec::new();
98
99        // First, insert all of the different node types
100        for (id, config) in sources.iter() {
101            graph.nodes.insert(
102                id.clone(),
103                Node::Source {
104                    outputs: config.inner.outputs(schema.log_namespace()),
105                },
106            );
107        }
108
109        for (id, transform) in transforms.iter() {
110            graph.nodes.insert(
111                id.clone(),
112                Node::Transform {
113                    in_ty: transform.inner.input().data_type(),
114                    outputs: transform.inner.outputs(
115                        &TransformContext {
116                            schema,
117                            ..Default::default()
118                        },
119                        &[(id.into(), schema::Definition::any())],
120                    ),
121                },
122            );
123        }
124
125        for (id, config) in sinks {
126            graph.nodes.insert(
127                id.clone(),
128                Node::Sink {
129                    ty: config.inner.input().data_type(),
130                },
131            );
132        }
133
134        // With all of the nodes added, go through inputs and add edges, resolving strings into
135        // actual `OutputId`s along the way.
136        let available_inputs = graph.input_map()?;
137
138        for (id, config) in transforms.iter() {
139            for input in config.inputs.iter() {
140                if let Err(e) = graph.add_input(input, id, &available_inputs, wildcard_matching) {
141                    errors.push(e);
142                }
143            }
144        }
145
146        for (id, config) in sinks {
147            for input in config.inputs.iter() {
148                if let Err(e) = graph.add_input(input, id, &available_inputs, wildcard_matching) {
149                    errors.push(e);
150                }
151            }
152        }
153
154        if ignore_errors || errors.is_empty() {
155            Ok(graph)
156        } else {
157            Err(errors)
158        }
159    }
160
161    fn add_input(
162        &mut self,
163        from: &str,
164        to: &ComponentKey,
165        available_inputs: &HashMap<String, OutputId>,
166        wildcard_matching: WildcardMatching,
167    ) -> Result<(), String> {
168        if let Some(output_id) = available_inputs.get(from) {
169            self.edges.push(Edge {
170                from: output_id.clone(),
171                to: to.clone(),
172            });
173            Ok(())
174        } else {
175            let output_type = match self.nodes.get(to) {
176                Some(Node::Transform { .. }) => "transform",
177                Some(Node::Sink { .. }) => "sink",
178                _ => panic!("only transforms and sinks have inputs"),
179            };
180            // allow empty result if relaxed wildcard matching is enabled
181            match wildcard_matching {
182                WildcardMatching::Relaxed => {
183                    // using value != glob::Pattern::escape(value) to check if value is a glob
184                    // TODO: replace with proper check when https://github.com/rust-lang/glob/issues/72 is resolved
185                    if from != glob::Pattern::escape(from) {
186                        info!(
187                            "Input \"{from}\" for {output_type} \"{to}\" didn’t match any components, but this was ignored because `relaxed_wildcard_matching` is enabled."
188                        );
189                        return Ok(());
190                    }
191                }
192                WildcardMatching::Strict => {}
193            }
194            info!(
195                "Available components:\n{}",
196                self.nodes
197                    .iter()
198                    .map(|(key, node)| format!("\"{key}\":\n  {node}"))
199                    .collect::<Vec<_>>()
200                    .join("\n")
201            );
202            Err(format!(
203                "Input \"{from}\" for {output_type} \"{to}\" doesn't match any components.",
204            ))
205        }
206    }
207
208    /// Return the input type of a given component.
209    ///
210    /// # Panics
211    ///
212    /// Will panic if the given key is not present in the graph or identifies a source, which can't
213    /// have inputs.
214    fn get_input_type(&self, key: &ComponentKey) -> DataType {
215        match self.nodes[key] {
216            Node::Source { .. } => panic!("no inputs on sources"),
217            Node::Transform { in_ty, .. } => in_ty,
218            Node::Sink { ty } => ty,
219        }
220    }
221
222    /// Return the output type associated with a given `OutputId`.
223    ///
224    /// # Panics
225    ///
226    /// Will panic if the given id is not present in the graph or identifies a sink, which can't
227    /// have inputs.
228    fn get_output_type(&self, id: &OutputId) -> DataType {
229        match &self.nodes[&id.component] {
230            Node::Source { outputs } => outputs
231                .iter()
232                .find(|output| output.port == id.port)
233                .map(|output| output.ty)
234                .expect("output didn't exist"),
235            Node::Transform { outputs, .. } => outputs
236                .iter()
237                .find(|output| output.port == id.port)
238                .map(|output| output.ty)
239                .expect("output didn't exist"),
240            Node::Sink { .. } => panic!("no outputs on sinks"),
241        }
242    }
243
244    pub fn typecheck(&self) -> Result<(), Vec<String>> {
245        let mut errors = Vec::new();
246
247        // check that all edges connect components with compatible data types
248        for edge in &self.edges {
249            let from_ty = self.get_output_type(&edge.from);
250            let to_ty = self.get_input_type(&edge.to);
251
252            if !from_ty.intersects(to_ty) {
253                errors.push(format!(
254                    "Data type mismatch between {} ({}) and {} ({})",
255                    edge.from, from_ty, edge.to, to_ty
256                ));
257            }
258        }
259
260        if errors.is_empty() {
261            Ok(())
262        } else {
263            errors.sort();
264            errors.dedup();
265            Err(errors)
266        }
267    }
268
269    pub fn check_for_cycles(&self) -> Result<(), String> {
270        // find all sinks
271        let sinks = self.nodes.iter().filter_map(|(name, node)| match node {
272            Node::Sink { .. } => Some(name),
273            _ => None,
274        });
275
276        // run DFS from each sink while keep tracking the current stack to detect cycles
277        for s in sinks {
278            let mut traversal: VecDeque<ComponentKey> = VecDeque::new();
279            let mut visited: HashSet<ComponentKey> = HashSet::new();
280            let mut stack: IndexSet<ComponentKey> = IndexSet::new();
281
282            traversal.push_back(s.to_owned());
283            while !traversal.is_empty() {
284                let n = traversal.back().expect("can't be empty").clone();
285                if !visited.contains(&n) {
286                    visited.insert(n.clone());
287                    stack.insert(n.clone());
288                } else {
289                    // we came back to the node after exploring all its children - remove it from the stack and traversal
290                    stack.shift_remove(&n);
291                    traversal.pop_back();
292                }
293                let inputs = self
294                    .edges
295                    .iter()
296                    .filter(|e| e.to == n)
297                    .map(|e| e.from.clone());
298                for input in inputs {
299                    if !visited.contains(&input.component) {
300                        traversal.push_back(input.component);
301                    } else if stack.contains(&input.component) {
302                        // we reached the node while it is on the current stack - it's a cycle
303                        let path = stack
304                            .iter()
305                            .skip(1) // skip the sink
306                            .rev()
307                            .map(|item| item.to_string())
308                            .collect::<Vec<_>>();
309                        return Err(format!(
310                            "Cyclic dependency detected in the chain [ {} -> {} ]",
311                            input.component.id(),
312                            path.join(" -> ")
313                        ));
314                    }
315                }
316            }
317        }
318        Ok(())
319    }
320
321    pub fn valid_inputs(&self) -> HashSet<OutputId> {
322        self.nodes
323            .iter()
324            .flat_map(|(key, node)| match node {
325                Node::Sink { .. } => vec![],
326                Node::Source { outputs } => outputs
327                    .iter()
328                    .map(|output| OutputId {
329                        component: key.clone(),
330                        port: output.port.clone(),
331                    })
332                    .collect(),
333                Node::Transform { outputs, .. } => outputs
334                    .iter()
335                    .map(|output| OutputId {
336                        component: key.clone(),
337                        port: output.port.clone(),
338                    })
339                    .collect(),
340            })
341            .collect()
342    }
343
344    /// Produce a map of output IDs for the current set of nodes in the graph, keyed by their string
345    /// representation. Returns errors for any nodes that have the same string representation,
346    /// making input specifications ambiguous.
347    ///
348    /// When we get a dotted path in the `inputs` section of a user's config, we need to determine
349    /// which of a few things that represents:
350    ///
351    ///   1. A component that's part of an expanded macro (e.g. `route.branch`)
352    ///   2. A named output of a branching transform (e.g. `name.errors`)
353    ///
354    /// A naive way to do that is to compare the string representation of all valid inputs to the
355    /// provided string and pick the one that matches. This works better if you can assume that there
356    /// are no conflicting string representations, so this function reports any ambiguity as an
357    /// error when creating the lookup map.
358    pub fn input_map(&self) -> Result<HashMap<String, OutputId>, Vec<String>> {
359        let mut mapped: HashMap<String, OutputId> = HashMap::new();
360        let mut errors = HashSet::new();
361
362        for id in self.valid_inputs() {
363            if let Some(_other) = mapped.insert(id.to_string(), id.clone()) {
364                errors.insert(format!("Input specifier {id} is ambiguous"));
365            }
366        }
367
368        if errors.is_empty() {
369            Ok(mapped)
370        } else {
371            Err(errors.into_iter().collect())
372        }
373    }
374
375    pub fn inputs_for(&self, node: &ComponentKey) -> Vec<OutputId> {
376        self.edges
377            .iter()
378            .filter(|edge| &edge.to == node)
379            .map(|edge| edge.from.clone())
380            .collect()
381    }
382
383    /// From a given root node, get all paths from the root node to leaf nodes
384    /// where the leaf node must be a sink. This is useful for determining which
385    /// components are relevant in a Vector unit test.
386    ///
387    /// Caller must check for cycles before calling this function.
388    pub fn paths_to_sink_from(&self, root: &ComponentKey) -> Vec<Vec<ComponentKey>> {
389        let mut traversal: VecDeque<(ComponentKey, Vec<_>)> = VecDeque::new();
390        let mut paths = Vec::new();
391
392        traversal.push_back((root.to_owned(), Vec::new()));
393        while !traversal.is_empty() {
394            let (n, mut path) = traversal.pop_back().expect("can't be empty");
395            path.push(n.clone());
396            let neighbors = self
397                .edges
398                .iter()
399                .filter(|e| e.from.component == n)
400                .map(|e| e.to.clone())
401                .collect::<Vec<_>>();
402
403            if neighbors.is_empty() {
404                paths.push(path.clone());
405            } else {
406                for neighbor in neighbors {
407                    traversal.push_back((neighbor, path.clone()));
408                }
409            }
410        }
411
412        // Keep only components from paths that end at a sink
413        paths
414            .into_iter()
415            .filter(|path| {
416                if let Some(key) = path.last() {
417                    matches!(self.nodes.get(key), Some(Node::Sink { ty: _ }))
418                } else {
419                    false
420                }
421            })
422            .collect()
423    }
424}
425
// Unit tests for `Graph`: cycle detection, typechecking, input resolution (including dotted
// paths, ambiguity and wildcard matching) and sink path discovery.
#[cfg(test)]
mod test {
    use similar_asserts::assert_eq;
    use vector_lib::schema::Definition;

    use super::*;

    // Test-only helpers that build graphs directly, bypassing component configuration.
    impl Graph {
        // Inserts a source with a single default output of type `ty`. Metrics and traces use
        // their dedicated constructors; every other type goes through `new_maybe_logs`.
        fn add_source(&mut self, id: &str, ty: DataType) {
            self.nodes.insert(
                id.into(),
                Node::Source {
                    outputs: vec![match ty {
                        DataType::Metric => SourceOutput::new_metrics(),
                        DataType::Trace => SourceOutput::new_traces(),
                        _ => SourceOutput::new_maybe_logs(ty, Definition::any()),
                    }],
                },
            );
        }

        // Inserts a transform with one default output of `out_ty` and adds an edge from each of
        // `inputs`. The inputs are converted straight into `OutputId`s, not resolved through
        // `input_map`.
        fn add_transform(
            &mut self,
            id: &str,
            in_ty: DataType,
            out_ty: DataType,
            inputs: Vec<&str>,
        ) {
            let id = ComponentKey::from(id);
            let inputs = clean_inputs(inputs);
            self.nodes.insert(
                id.clone(),
                Node::Transform {
                    in_ty,
                    outputs: vec![TransformOutput::new(
                        out_ty,
                        [("test".into(), Definition::default_legacy_namespace())].into(),
                    )],
                },
            );
            for from in inputs {
                self.edges.push(Edge {
                    from,
                    to: id.clone(),
                });
            }
        }

        // Adds a named output port `name` of type `ty` to an existing transform. Panics if `id`
        // is not a transform.
        fn add_transform_output(&mut self, id: &str, name: &str, ty: DataType) {
            let id = id.into();
            match self.nodes.get_mut(&id) {
                Some(Node::Transform { outputs, .. }) => outputs.push(
                    TransformOutput::new(
                        ty,
                        [("test".into(), Definition::default_legacy_namespace())].into(),
                    )
                    .with_port(name),
                ),
                _ => panic!("invalid transform"),
            }
        }

        // Inserts a sink accepting `ty` and adds an edge from each of `inputs` (converted
        // directly into `OutputId`s).
        fn add_sink(&mut self, id: &str, ty: DataType, inputs: Vec<&str>) {
            let id = ComponentKey::from(id);
            let inputs = clean_inputs(inputs);
            self.nodes.insert(id.clone(), Node::Sink { ty });
            for from in inputs {
                self.edges.push(Edge {
                    from,
                    to: id.clone(),
                });
            }
        }

        // Resolves `input` for `node` through `input_map` + `add_input`, the same two steps
        // `Graph::new_inner` uses.
        fn test_add_input(
            &mut self,
            node: &str,
            input: &str,
            wildcard_matching: WildcardMatching,
        ) -> Result<(), String> {
            let available_inputs = self.input_map().unwrap();
            self.add_input(input, &node.into(), &available_inputs, wildcard_matching)
        }
    }

    // Converts string specifiers into `OutputId`s via their `From<&str>` conversion.
    fn clean_inputs(inputs: Vec<&str>) -> Vec<OutputId> {
        inputs.into_iter().map(Into::into).collect()
    }

    // Cycles reachable from a sink are reported along with the chain of components involved.
    #[test]
    fn paths_detects_cycles() {
        let mut graph = Graph::default();
        graph.add_source("in", DataType::Log);
        graph.add_transform("one", DataType::Log, DataType::Log, vec!["in", "three"]);
        graph.add_transform("two", DataType::Log, DataType::Log, vec!["one"]);
        graph.add_transform("three", DataType::Log, DataType::Log, vec!["two"]);
        graph.add_sink("out", DataType::Log, vec!["three"]);

        assert_eq!(
            Err("Cyclic dependency detected in the chain [ three -> one -> two -> three ]".into()),
            graph.check_for_cycles()
        );

        let mut graph = Graph::default();
        graph.add_source("in", DataType::Log);
        graph.add_transform("one", DataType::Log, DataType::Log, vec!["in", "three"]);
        graph.add_transform("two", DataType::Log, DataType::Log, vec!["one"]);
        graph.add_transform("three", DataType::Log, DataType::Log, vec!["two"]);
        graph.add_sink("out", DataType::Log, vec!["two"]);

        assert_eq!(
            Err("Cyclic dependency detected in the chain [ two -> three -> one -> two ]".into()),
            graph.check_for_cycles()
        );
        // NOTE(review): this repeats the previous assertion verbatim; presumably it checks that a
        // second call yields the same result. Confirm intent or drop it.
        assert_eq!(
            Err("Cyclic dependency detected in the chain [ two -> three -> one -> two ]".into()),
            graph.check_for_cycles()
        );

        // Inserting the transform "in" replaces the source "in" in `nodes` (same key), leaving a
        // self-loop on "in".
        let mut graph = Graph::default();
        graph.add_source("in", DataType::Log);
        graph.add_transform("in", DataType::Log, DataType::Log, vec!["in"]);
        graph.add_sink("out", DataType::Log, vec!["in"]);

        // This isn't really a cyclic dependency but let me have this one.
        assert_eq!(
            Err("Cyclic dependency detected in the chain [ in -> in ]".into()),
            graph.check_for_cycles()
        );
    }

    // A diamond (two branches rejoining) is not a cycle.
    #[test]
    fn paths_doesnt_detect_noncycles() {
        let mut graph = Graph::default();
        graph.add_source("in", DataType::Log);
        graph.add_transform("one", DataType::Log, DataType::Log, vec!["in"]);
        graph.add_transform("two", DataType::Log, DataType::Log, vec!["in"]);
        graph.add_transform("three", DataType::Log, DataType::Log, vec!["one", "two"]);
        graph.add_sink("out", DataType::Log, vec!["three"]);

        graph.check_for_cycles().unwrap();
    }

    // A log output connected to a metric-only input is a type error.
    #[test]
    fn detects_type_mismatches() {
        let mut graph = Graph::default();
        graph.add_source("in", DataType::Log);
        graph.add_sink("out", DataType::Metric, vec!["in"]);

        assert_eq!(
            Err(vec![
                "Data type mismatch between in ([\"Log\"]) and out ([\"Metric\"])".into()
            ]),
            graph.typecheck()
        );
    }

    // An input accepting all types takes both logs and metrics.
    #[test]
    fn allows_log_or_metric_into_any() {
        let mut graph = Graph::default();
        graph.add_source("log_source", DataType::Log);
        graph.add_source("metric_source", DataType::Metric);
        graph.add_sink(
            "any_sink",
            DataType::all_bits(),
            vec!["log_source", "metric_source"],
        );

        assert_eq!(Ok(()), graph.typecheck());
    }

    // An output producing all types may feed a log-only or metric-only input.
    #[test]
    fn allows_any_into_log_or_metric() {
        let mut graph = Graph::default();
        graph.add_source("any_source", DataType::all_bits());
        graph.add_transform(
            "log_to_any",
            DataType::Log,
            DataType::all_bits(),
            vec!["any_source"],
        );
        graph.add_transform(
            "any_to_log",
            DataType::all_bits(),
            DataType::Log,
            vec!["any_source"],
        );
        graph.add_sink(
            "log_sink",
            DataType::Log,
            vec!["any_source", "log_to_any", "any_to_log"],
        );
        graph.add_sink(
            "metric_sink",
            DataType::Metric,
            vec!["any_source", "log_to_any"],
        );

        assert_eq!(graph.typecheck(), Ok(()));
    }

    // Typed -> any -> typed chains typecheck in both the log and metric directions.
    #[test]
    fn allows_both_directions_for_metrics() {
        let mut graph = Graph::default();
        graph.add_source("log_source", DataType::Log);
        graph.add_source("metric_source", DataType::Metric);
        graph.add_transform(
            "log_to_log",
            DataType::Log,
            DataType::Log,
            vec!["log_source"],
        );
        graph.add_transform(
            "metric_to_metric",
            DataType::Metric,
            DataType::Metric,
            vec!["metric_source"],
        );
        graph.add_transform(
            "any_to_any",
            DataType::all_bits(),
            DataType::all_bits(),
            vec!["log_to_log", "metric_to_metric"],
        );
        graph.add_transform(
            "any_to_log",
            DataType::all_bits(),
            DataType::Log,
            vec!["any_to_any"],
        );
        graph.add_transform(
            "any_to_metric",
            DataType::all_bits(),
            DataType::Metric,
            vec!["any_to_any"],
        );
        graph.add_sink("log_sink", DataType::Log, vec!["any_to_log"]);
        graph.add_sink("metric_sink", DataType::Metric, vec!["any_to_metric"]);

        assert_eq!(Ok(()), graph.typecheck());
    }

    // Named transform outputs resolve via dotted paths; unknown ports are rejected.
    #[test]
    fn allows_multiple_transform_outputs() {
        let mut graph = Graph::default();
        graph.add_source("log_source", DataType::Log);
        graph.add_transform(
            "log_to_log",
            DataType::Log,
            DataType::Log,
            vec!["log_source"],
        );
        graph.add_transform_output("log_to_log", "errors", DataType::Log);
        graph.add_sink("good_log_sink", DataType::Log, vec!["log_to_log"]);

        // don't add inputs to these yet since they're not validated via these helpers
        graph.add_sink("errored_log_sink", DataType::Log, vec![]);
        graph.add_sink("bad_log_sink", DataType::Log, vec![]);

        // make sure we're good with dotted paths
        assert_eq!(
            Ok(()),
            graph.test_add_input(
                "errored_log_sink",
                "log_to_log.errors",
                WildcardMatching::Strict
            )
        );

        // make sure that we're not cool with an unknown dotted path
        let expected = "Input \"log_to_log.not_errors\" for sink \"bad_log_sink\" doesn't match any components.".to_string();
        assert_eq!(
            Err(expected),
            graph.test_add_input(
                "bad_log_sink",
                "log_to_log.not_errors",
                WildcardMatching::Strict
            )
        );
    }

    // Outputs whose string representations collide are reported once per specifier.
    #[test]
    fn disallows_ambiguous_inputs() {
        let mut graph = Graph::default();
        // these all look like "foo.bar", but should only yield one error
        graph.nodes.insert(
            ComponentKey::from("foo.bar"),
            Node::Source {
                outputs: vec![SourceOutput::new_maybe_logs(
                    DataType::all_bits(),
                    Definition::any(),
                )],
            },
        );
        // NOTE(review): same key as the insert above, so this replaces that entry rather than
        // adding a second node.
        graph.nodes.insert(
            ComponentKey::from("foo.bar"),
            Node::Source {
                outputs: vec![SourceOutput::new_maybe_logs(
                    DataType::all_bits(),
                    Definition::any(),
                )],
            },
        );
        // Transform "foo" with a "bar" port: its `foo.bar` output collides with source "foo.bar".
        graph.nodes.insert(
            ComponentKey::from("foo"),
            Node::Transform {
                in_ty: DataType::all_bits(),
                outputs: vec![
                    TransformOutput::new(
                        DataType::all_bits(),
                        [("test".into(), Definition::default_legacy_namespace())].into(),
                    ),
                    TransformOutput::new(
                        DataType::all_bits(),
                        [("test".into(), Definition::default_legacy_namespace())].into(),
                    )
                    .with_port("bar"),
                ],
            },
        );

        // make sure we return more than one
        graph.nodes.insert(
            ComponentKey::from("baz.errors"),
            Node::Source {
                outputs: vec![SourceOutput::new_maybe_logs(
                    DataType::all_bits(),
                    Definition::any(),
                )],
            },
        );
        graph.nodes.insert(
            ComponentKey::from("baz"),
            Node::Transform {
                in_ty: DataType::all_bits(),
                outputs: vec![
                    TransformOutput::new(
                        DataType::all_bits(),
                        [("test".into(), Definition::default_legacy_namespace())].into(),
                    ),
                    TransformOutput::new(
                        DataType::all_bits(),
                        [("test".into(), Definition::default_legacy_namespace())].into(),
                    )
                    .with_port("errors"),
                ],
            },
        );

        // Sorted before comparing so the assertion does not depend on error ordering.
        let mut errors = graph.input_map().unwrap_err();
        errors.sort();
        assert_eq!(
            errors,
            vec![
                String::from("Input specifier baz.errors is ambiguous"),
                String::from("Input specifier foo.bar is ambiguous"),
            ]
        );
    }

    // Relaxed wildcard matching only tolerates unmatched *glob* inputs.
    #[test]
    fn wildcard_matching() {
        let mut graph = Graph::default();
        graph.add_source("log_source", DataType::Log);

        // don't add inputs to these yet since they're not validated via these helpers
        graph.add_sink("sink", DataType::Log, vec![]);

        // make sure we're not good with non existing inputs with relaxed wildcard matching disabled
        let wildcard_matching = WildcardMatching::Strict;
        let expected =
            "Input \"bad_source-*\" for sink \"sink\" doesn't match any components.".to_string();
        assert_eq!(
            Err(expected),
            graph.test_add_input("sink", "bad_source-*", wildcard_matching)
        );

        // make sure we're good with non existing inputs with relaxed wildcard matching enabled
        let wildcard_matching = WildcardMatching::Relaxed;
        assert_eq!(
            Ok(()),
            graph.test_add_input("sink", "bad_source-*", wildcard_matching)
        );

        // make sure we're not good with non existing inputs that are not wildcards even when relaxed wildcard matching is enabled
        let wildcard_matching = WildcardMatching::Relaxed;
        let expected =
            "Input \"bad_source-1\" for sink \"sink\" doesn't match any components.".to_string();
        assert_eq!(
            Err(expected),
            graph.test_add_input("sink", "bad_source-1", wildcard_matching)
        );
    }

    // A linear pipeline yields exactly one root-to-sink path.
    #[test]
    fn paths_to_sink_simple() {
        let mut graph = Graph::default();
        graph.add_source("in", DataType::Log);
        graph.add_transform("one", DataType::Log, DataType::Log, vec!["in"]);
        graph.add_transform("two", DataType::Log, DataType::Log, vec!["one"]);
        graph.add_transform("three", DataType::Log, DataType::Log, vec!["two"]);
        graph.add_sink("out", DataType::Log, vec!["three"]);

        let paths: Vec<Vec<_>> = graph
            .paths_to_sink_from(&ComponentKey::from("in"))
            .into_iter()
            .map(|keys| keys.into_iter().map(|key| key.to_string()).collect())
            .collect();

        assert_eq!(paths.len(), 1);
        assert_eq!(paths[0], vec!["in", "one", "two", "three", "out"])
    }

    // A root that is not in the graph yields no paths.
    #[test]
    fn paths_to_sink_non_existent_root() {
        let graph = Graph::default();
        let paths = graph.paths_to_sink_from(&ComponentKey::from("in"));

        assert_eq!(paths.len(), 0);
    }

    // Branches that dead-end without a sink are dropped. The expected order reflects the
    // depth-first traversal, which explores the most recently added edge first.
    #[test]
    fn paths_to_sink_irrelevant_transforms() {
        let mut graph = Graph::default();
        graph.add_source("source", DataType::Log);
        // These transforms do not link to a sink
        graph.add_transform("t1", DataType::Log, DataType::Log, vec!["source"]);
        graph.add_transform("t2", DataType::Log, DataType::Log, vec!["t1"]);
        graph.add_transform("t3", DataType::Log, DataType::Log, vec!["t1"]);
        // These transforms do link to a sink
        graph.add_transform("t4", DataType::Log, DataType::Log, vec!["source"]);
        graph.add_transform("t5", DataType::Log, DataType::Log, vec!["source"]);
        graph.add_sink("sink1", DataType::Log, vec!["t4"]);
        graph.add_sink("sink2", DataType::Log, vec!["t5"]);

        let paths: Vec<Vec<_>> = graph
            .paths_to_sink_from(&ComponentKey::from("source"))
            .into_iter()
            .map(|keys| keys.into_iter().map(|key| key.to_string()).collect())
            .collect();

        assert_eq!(paths.len(), 2);
        assert_eq!(paths[0], vec!["source", "t5", "sink2"]);
        assert_eq!(paths[1], vec!["source", "t4", "sink1"]);
    }

    // Two branches converging on one sink produce two distinct paths.
    #[test]
    fn paths_to_sink_multiple_inputs_into_sink() {
        let mut graph = Graph::default();
        graph.add_source("source", DataType::Log);
        graph.add_transform("t1", DataType::Log, DataType::Log, vec!["source"]);
        graph.add_transform("t2", DataType::Log, DataType::Log, vec!["t1"]);
        graph.add_transform("t3", DataType::Log, DataType::Log, vec!["t1"]);
        graph.add_sink("sink1", DataType::Log, vec!["t2", "t3"]);

        let paths: Vec<Vec<_>> = graph
            .paths_to_sink_from(&ComponentKey::from("source"))
            .into_iter()
            .map(|keys| keys.into_iter().map(|key| key.to_string()).collect())
            .collect();

        assert_eq!(paths.len(), 2);
        assert_eq!(paths[0], vec!["source", "t1", "t3", "sink1"]);
        assert_eq!(paths[1], vec!["source", "t1", "t2", "sink1"]);
    }
}