vector/config/
graph.rs

1use super::{
2    schema, ComponentKey, DataType, OutputId, SinkOuter, SourceOuter, SourceOutput, TransformOuter,
3    TransformOutput, WildcardMatching,
4};
5use indexmap::{set::IndexSet, IndexMap};
6use std::collections::{HashMap, HashSet, VecDeque};
7use std::fmt;
8
/// A component as it is represented inside the topology [`Graph`].
///
/// Each variant carries only the typing information needed to wire and typecheck the
/// component: what it emits, what it accepts, or both.
#[derive(Debug, Clone)]
pub enum Node {
    /// A source: has outputs but no inputs.
    Source {
        /// The outputs this source exposes.
        outputs: Vec<SourceOutput>,
    },
    /// A transform: accepts one input type and exposes one or more outputs.
    Transform {
        /// The data type(s) accepted on the transform's input.
        in_ty: DataType,
        /// The outputs this transform exposes.
        outputs: Vec<TransformOutput>,
    },
    /// A sink: has an input but no outputs.
    Sink {
        /// The data type(s) accepted on the sink's input.
        ty: DataType,
    },
}
22
23impl fmt::Display for Node {
24    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25        match self {
26            Node::Source { outputs } => {
27                write!(f, "component_kind: source\n  outputs:")?;
28                for output in outputs {
29                    write!(f, "\n    {output}")?;
30                }
31                Ok(())
32            }
33            Node::Transform { in_ty, outputs } => {
34                write!(
35                    f,
36                    "component_kind: source\n  input_types: {in_ty}\n  outputs:"
37                )?;
38                for output in outputs {
39                    write!(f, "\n    {output}")?;
40                }
41                Ok(())
42            }
43            Node::Sink { ty } => {
44                write!(f, "component_kind: sink\n  types: {ty}")
45            }
46        }
47    }
48}
49
/// A directed connection from one component's output to another component's input.
#[derive(Debug, Clone)]
struct Edge {
    /// The specific output (component plus port) the data comes from.
    from: OutputId,
    /// The component that consumes the data.
    to: ComponentKey,
}
55
/// The topology of a configuration: its components and the connections between them.
#[derive(Default)]
pub struct Graph {
    /// Every known component, keyed by its component key.
    nodes: HashMap<ComponentKey, Node>,
    /// Every resolved input connection, in the order it was added.
    edges: Vec<Edge>,
}
61
62impl Graph {
63    pub fn new(
64        sources: &IndexMap<ComponentKey, SourceOuter>,
65        transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
66        sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
67        schema: schema::Options,
68        wildcard_matching: WildcardMatching,
69    ) -> Result<Self, Vec<String>> {
70        Self::new_inner(sources, transforms, sinks, false, schema, wildcard_matching)
71    }
72
73    pub fn new_unchecked(
74        sources: &IndexMap<ComponentKey, SourceOuter>,
75        transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
76        sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
77        schema: schema::Options,
78        wildcard_matching: WildcardMatching,
79    ) -> Self {
80        Self::new_inner(sources, transforms, sinks, true, schema, wildcard_matching)
81            .expect("errors ignored")
82    }
83
84    fn new_inner(
85        sources: &IndexMap<ComponentKey, SourceOuter>,
86        transforms: &IndexMap<ComponentKey, TransformOuter<String>>,
87        sinks: &IndexMap<ComponentKey, SinkOuter<String>>,
88        ignore_errors: bool,
89        schema: schema::Options,
90        wildcard_matching: WildcardMatching,
91    ) -> Result<Self, Vec<String>> {
92        let mut graph = Graph::default();
93        let mut errors = Vec::new();
94
95        // First, insert all of the different node types
96        for (id, config) in sources.iter() {
97            graph.nodes.insert(
98                id.clone(),
99                Node::Source {
100                    outputs: config.inner.outputs(schema.log_namespace()),
101                },
102            );
103        }
104
105        for (id, transform) in transforms.iter() {
106            graph.nodes.insert(
107                id.clone(),
108                Node::Transform {
109                    in_ty: transform.inner.input().data_type(),
110                    outputs: transform.inner.outputs(
111                        vector_lib::enrichment::TableRegistry::default(),
112                        &[(id.into(), schema::Definition::any())],
113                        schema.log_namespace(),
114                    ),
115                },
116            );
117        }
118
119        for (id, config) in sinks {
120            graph.nodes.insert(
121                id.clone(),
122                Node::Sink {
123                    ty: config.inner.input().data_type(),
124                },
125            );
126        }
127
128        // With all of the nodes added, go through inputs and add edges, resolving strings into
129        // actual `OutputId`s along the way.
130        let available_inputs = graph.input_map()?;
131
132        for (id, config) in transforms.iter() {
133            for input in config.inputs.iter() {
134                if let Err(e) = graph.add_input(input, id, &available_inputs, wildcard_matching) {
135                    errors.push(e);
136                }
137            }
138        }
139
140        for (id, config) in sinks {
141            for input in config.inputs.iter() {
142                if let Err(e) = graph.add_input(input, id, &available_inputs, wildcard_matching) {
143                    errors.push(e);
144                }
145            }
146        }
147
148        if ignore_errors || errors.is_empty() {
149            Ok(graph)
150        } else {
151            Err(errors)
152        }
153    }
154
155    fn add_input(
156        &mut self,
157        from: &str,
158        to: &ComponentKey,
159        available_inputs: &HashMap<String, OutputId>,
160        wildcard_matching: WildcardMatching,
161    ) -> Result<(), String> {
162        if let Some(output_id) = available_inputs.get(from) {
163            self.edges.push(Edge {
164                from: output_id.clone(),
165                to: to.clone(),
166            });
167            Ok(())
168        } else {
169            let output_type = match self.nodes.get(to) {
170                Some(Node::Transform { .. }) => "transform",
171                Some(Node::Sink { .. }) => "sink",
172                _ => panic!("only transforms and sinks have inputs"),
173            };
174            // allow empty result if relaxed wildcard matching is enabled
175            match wildcard_matching {
176                WildcardMatching::Relaxed => {
177                    // using value != glob::Pattern::escape(value) to check if value is a glob
178                    // TODO: replace with proper check when https://github.com/rust-lang/glob/issues/72 is resolved
179                    if from != glob::Pattern::escape(from) {
180                        info!("Input \"{from}\" for {output_type} \"{to}\" didn’t match any components, but this was ignored because `relaxed_wildcard_matching` is enabled.");
181                        return Ok(());
182                    }
183                }
184                WildcardMatching::Strict => {}
185            }
186            info!(
187                "Available components:\n{}",
188                self.nodes
189                    .iter()
190                    .map(|(key, node)| format!("\"{key}\":\n  {node}"))
191                    .collect::<Vec<_>>()
192                    .join("\n")
193            );
194            Err(format!(
195                "Input \"{from}\" for {output_type} \"{to}\" doesn't match any components.",
196            ))
197        }
198    }
199
200    /// Return the input type of a given component.
201    ///
202    /// # Panics
203    ///
204    /// Will panic if the given key is not present in the graph or identifies a source, which can't
205    /// have inputs.
206    fn get_input_type(&self, key: &ComponentKey) -> DataType {
207        match self.nodes[key] {
208            Node::Source { .. } => panic!("no inputs on sources"),
209            Node::Transform { in_ty, .. } => in_ty,
210            Node::Sink { ty } => ty,
211        }
212    }
213
214    /// Return the output type associated with a given `OutputId`.
215    ///
216    /// # Panics
217    ///
218    /// Will panic if the given id is not present in the graph or identifies a sink, which can't
219    /// have inputs.
220    fn get_output_type(&self, id: &OutputId) -> DataType {
221        match &self.nodes[&id.component] {
222            Node::Source { outputs } => outputs
223                .iter()
224                .find(|output| output.port == id.port)
225                .map(|output| output.ty)
226                .expect("output didn't exist"),
227            Node::Transform { outputs, .. } => outputs
228                .iter()
229                .find(|output| output.port == id.port)
230                .map(|output| output.ty)
231                .expect("output didn't exist"),
232            Node::Sink { .. } => panic!("no outputs on sinks"),
233        }
234    }
235
236    pub fn typecheck(&self) -> Result<(), Vec<String>> {
237        let mut errors = Vec::new();
238
239        // check that all edges connect components with compatible data types
240        for edge in &self.edges {
241            let from_ty = self.get_output_type(&edge.from);
242            let to_ty = self.get_input_type(&edge.to);
243
244            if !from_ty.intersects(to_ty) {
245                errors.push(format!(
246                    "Data type mismatch between {} ({}) and {} ({})",
247                    edge.from, from_ty, edge.to, to_ty
248                ));
249            }
250        }
251
252        if errors.is_empty() {
253            Ok(())
254        } else {
255            errors.sort();
256            errors.dedup();
257            Err(errors)
258        }
259    }
260
261    pub fn check_for_cycles(&self) -> Result<(), String> {
262        // find all sinks
263        let sinks = self.nodes.iter().filter_map(|(name, node)| match node {
264            Node::Sink { .. } => Some(name),
265            _ => None,
266        });
267
268        // run DFS from each sink while keep tracking the current stack to detect cycles
269        for s in sinks {
270            let mut traversal: VecDeque<ComponentKey> = VecDeque::new();
271            let mut visited: HashSet<ComponentKey> = HashSet::new();
272            let mut stack: IndexSet<ComponentKey> = IndexSet::new();
273
274            traversal.push_back(s.to_owned());
275            while !traversal.is_empty() {
276                let n = traversal.back().expect("can't be empty").clone();
277                if !visited.contains(&n) {
278                    visited.insert(n.clone());
279                    stack.insert(n.clone());
280                } else {
281                    // we came back to the node after exploring all its children - remove it from the stack and traversal
282                    stack.shift_remove(&n);
283                    traversal.pop_back();
284                }
285                let inputs = self
286                    .edges
287                    .iter()
288                    .filter(|e| e.to == n)
289                    .map(|e| e.from.clone());
290                for input in inputs {
291                    if !visited.contains(&input.component) {
292                        traversal.push_back(input.component);
293                    } else if stack.contains(&input.component) {
294                        // we reached the node while it is on the current stack - it's a cycle
295                        let path = stack
296                            .iter()
297                            .skip(1) // skip the sink
298                            .rev()
299                            .map(|item| item.to_string())
300                            .collect::<Vec<_>>();
301                        return Err(format!(
302                            "Cyclic dependency detected in the chain [ {} -> {} ]",
303                            input.component.id(),
304                            path.join(" -> ")
305                        ));
306                    }
307                }
308            }
309        }
310        Ok(())
311    }
312
313    pub fn valid_inputs(&self) -> HashSet<OutputId> {
314        self.nodes
315            .iter()
316            .flat_map(|(key, node)| match node {
317                Node::Sink { .. } => vec![],
318                Node::Source { outputs } => outputs
319                    .iter()
320                    .map(|output| OutputId {
321                        component: key.clone(),
322                        port: output.port.clone(),
323                    })
324                    .collect(),
325                Node::Transform { outputs, .. } => outputs
326                    .iter()
327                    .map(|output| OutputId {
328                        component: key.clone(),
329                        port: output.port.clone(),
330                    })
331                    .collect(),
332            })
333            .collect()
334    }
335
336    /// Produce a map of output IDs for the current set of nodes in the graph, keyed by their string
337    /// representation. Returns errors for any nodes that have the same string representation,
338    /// making input specifications ambiguous.
339    ///
340    /// When we get a dotted path in the `inputs` section of a user's config, we need to determine
341    /// which of a few things that represents:
342    ///
343    ///   1. A component that's part of an expanded macro (e.g. `route.branch`)
344    ///   2. A named output of a branching transform (e.g. `name.errors`)
345    ///
346    /// A naive way to do that is to compare the string representation of all valid inputs to the
347    /// provided string and pick the one that matches. This works better if you can assume that there
348    /// are no conflicting string representations, so this function reports any ambiguity as an
349    /// error when creating the lookup map.
350    pub fn input_map(&self) -> Result<HashMap<String, OutputId>, Vec<String>> {
351        let mut mapped: HashMap<String, OutputId> = HashMap::new();
352        let mut errors = HashSet::new();
353
354        for id in self.valid_inputs() {
355            if let Some(_other) = mapped.insert(id.to_string(), id.clone()) {
356                errors.insert(format!("Input specifier {id} is ambiguous"));
357            }
358        }
359
360        if errors.is_empty() {
361            Ok(mapped)
362        } else {
363            Err(errors.into_iter().collect())
364        }
365    }
366
367    pub fn inputs_for(&self, node: &ComponentKey) -> Vec<OutputId> {
368        self.edges
369            .iter()
370            .filter(|edge| &edge.to == node)
371            .map(|edge| edge.from.clone())
372            .collect()
373    }
374
375    /// From a given root node, get all paths from the root node to leaf nodes
376    /// where the leaf node must be a sink. This is useful for determining which
377    /// components are relevant in a Vector unit test.
378    ///
379    /// Caller must check for cycles before calling this function.
380    pub fn paths_to_sink_from(&self, root: &ComponentKey) -> Vec<Vec<ComponentKey>> {
381        let mut traversal: VecDeque<(ComponentKey, Vec<_>)> = VecDeque::new();
382        let mut paths = Vec::new();
383
384        traversal.push_back((root.to_owned(), Vec::new()));
385        while !traversal.is_empty() {
386            let (n, mut path) = traversal.pop_back().expect("can't be empty");
387            path.push(n.clone());
388            let neighbors = self
389                .edges
390                .iter()
391                .filter(|e| e.from.component == n)
392                .map(|e| e.to.clone())
393                .collect::<Vec<_>>();
394
395            if neighbors.is_empty() {
396                paths.push(path.clone());
397            } else {
398                for neighbor in neighbors {
399                    traversal.push_back((neighbor, path.clone()));
400                }
401            }
402        }
403
404        // Keep only components from paths that end at a sink
405        paths
406            .into_iter()
407            .filter(|path| {
408                if let Some(key) = path.last() {
409                    matches!(self.nodes.get(key), Some(Node::Sink { ty: _ }))
410                } else {
411                    false
412                }
413            })
414            .collect()
415    }
416}
417
418#[cfg(test)]
419mod test {
420    use similar_asserts::assert_eq;
421    use vector_lib::schema::Definition;
422
423    use super::*;
424
425    impl Graph {
426        fn add_source(&mut self, id: &str, ty: DataType) {
427            self.nodes.insert(
428                id.into(),
429                Node::Source {
430                    outputs: vec![match ty {
431                        DataType::Metric => SourceOutput::new_metrics(),
432                        DataType::Trace => SourceOutput::new_traces(),
433                        _ => SourceOutput::new_maybe_logs(ty, Definition::any()),
434                    }],
435                },
436            );
437        }
438
439        fn add_transform(
440            &mut self,
441            id: &str,
442            in_ty: DataType,
443            out_ty: DataType,
444            inputs: Vec<&str>,
445        ) {
446            let id = ComponentKey::from(id);
447            let inputs = clean_inputs(inputs);
448            self.nodes.insert(
449                id.clone(),
450                Node::Transform {
451                    in_ty,
452                    outputs: vec![TransformOutput::new(
453                        out_ty,
454                        [("test".into(), Definition::default_legacy_namespace())].into(),
455                    )],
456                },
457            );
458            for from in inputs {
459                self.edges.push(Edge {
460                    from,
461                    to: id.clone(),
462                });
463            }
464        }
465
466        fn add_transform_output(&mut self, id: &str, name: &str, ty: DataType) {
467            let id = id.into();
468            match self.nodes.get_mut(&id) {
469                Some(Node::Transform { outputs, .. }) => outputs.push(
470                    TransformOutput::new(
471                        ty,
472                        [("test".into(), Definition::default_legacy_namespace())].into(),
473                    )
474                    .with_port(name),
475                ),
476                _ => panic!("invalid transform"),
477            }
478        }
479
480        fn add_sink(&mut self, id: &str, ty: DataType, inputs: Vec<&str>) {
481            let id = ComponentKey::from(id);
482            let inputs = clean_inputs(inputs);
483            self.nodes.insert(id.clone(), Node::Sink { ty });
484            for from in inputs {
485                self.edges.push(Edge {
486                    from,
487                    to: id.clone(),
488                });
489            }
490        }
491
492        fn test_add_input(
493            &mut self,
494            node: &str,
495            input: &str,
496            wildcard_matching: WildcardMatching,
497        ) -> Result<(), String> {
498            let available_inputs = self.input_map().unwrap();
499            self.add_input(input, &node.into(), &available_inputs, wildcard_matching)
500        }
501    }
502
503    fn clean_inputs(inputs: Vec<&str>) -> Vec<OutputId> {
504        inputs.into_iter().map(Into::into).collect()
505    }
506
507    #[test]
508    fn paths_detects_cycles() {
509        let mut graph = Graph::default();
510        graph.add_source("in", DataType::Log);
511        graph.add_transform("one", DataType::Log, DataType::Log, vec!["in", "three"]);
512        graph.add_transform("two", DataType::Log, DataType::Log, vec!["one"]);
513        graph.add_transform("three", DataType::Log, DataType::Log, vec!["two"]);
514        graph.add_sink("out", DataType::Log, vec!["three"]);
515
516        assert_eq!(
517            Err("Cyclic dependency detected in the chain [ three -> one -> two -> three ]".into()),
518            graph.check_for_cycles()
519        );
520
521        let mut graph = Graph::default();
522        graph.add_source("in", DataType::Log);
523        graph.add_transform("one", DataType::Log, DataType::Log, vec!["in", "three"]);
524        graph.add_transform("two", DataType::Log, DataType::Log, vec!["one"]);
525        graph.add_transform("three", DataType::Log, DataType::Log, vec!["two"]);
526        graph.add_sink("out", DataType::Log, vec!["two"]);
527
528        assert_eq!(
529            Err("Cyclic dependency detected in the chain [ two -> three -> one -> two ]".into()),
530            graph.check_for_cycles()
531        );
532        assert_eq!(
533            Err("Cyclic dependency detected in the chain [ two -> three -> one -> two ]".into()),
534            graph.check_for_cycles()
535        );
536
537        let mut graph = Graph::default();
538        graph.add_source("in", DataType::Log);
539        graph.add_transform("in", DataType::Log, DataType::Log, vec!["in"]);
540        graph.add_sink("out", DataType::Log, vec!["in"]);
541
542        // This isn't really a cyclic dependency but let me have this one.
543        assert_eq!(
544            Err("Cyclic dependency detected in the chain [ in -> in ]".into()),
545            graph.check_for_cycles()
546        );
547    }
548
549    #[test]
550    fn paths_doesnt_detect_noncycles() {
551        let mut graph = Graph::default();
552        graph.add_source("in", DataType::Log);
553        graph.add_transform("one", DataType::Log, DataType::Log, vec!["in"]);
554        graph.add_transform("two", DataType::Log, DataType::Log, vec!["in"]);
555        graph.add_transform("three", DataType::Log, DataType::Log, vec!["one", "two"]);
556        graph.add_sink("out", DataType::Log, vec!["three"]);
557
558        graph.check_for_cycles().unwrap();
559    }
560
561    #[test]
562    fn detects_type_mismatches() {
563        let mut graph = Graph::default();
564        graph.add_source("in", DataType::Log);
565        graph.add_sink("out", DataType::Metric, vec!["in"]);
566
567        assert_eq!(
568            Err(vec![
569                "Data type mismatch between in ([\"Log\"]) and out ([\"Metric\"])".into()
570            ]),
571            graph.typecheck()
572        );
573    }
574
575    #[test]
576    fn allows_log_or_metric_into_any() {
577        let mut graph = Graph::default();
578        graph.add_source("log_source", DataType::Log);
579        graph.add_source("metric_source", DataType::Metric);
580        graph.add_sink(
581            "any_sink",
582            DataType::all_bits(),
583            vec!["log_source", "metric_source"],
584        );
585
586        assert_eq!(Ok(()), graph.typecheck());
587    }
588
589    #[test]
590    fn allows_any_into_log_or_metric() {
591        let mut graph = Graph::default();
592        graph.add_source("any_source", DataType::all_bits());
593        graph.add_transform(
594            "log_to_any",
595            DataType::Log,
596            DataType::all_bits(),
597            vec!["any_source"],
598        );
599        graph.add_transform(
600            "any_to_log",
601            DataType::all_bits(),
602            DataType::Log,
603            vec!["any_source"],
604        );
605        graph.add_sink(
606            "log_sink",
607            DataType::Log,
608            vec!["any_source", "log_to_any", "any_to_log"],
609        );
610        graph.add_sink(
611            "metric_sink",
612            DataType::Metric,
613            vec!["any_source", "log_to_any"],
614        );
615
616        assert_eq!(graph.typecheck(), Ok(()));
617    }
618
619    #[test]
620    fn allows_both_directions_for_metrics() {
621        let mut graph = Graph::default();
622        graph.add_source("log_source", DataType::Log);
623        graph.add_source("metric_source", DataType::Metric);
624        graph.add_transform(
625            "log_to_log",
626            DataType::Log,
627            DataType::Log,
628            vec!["log_source"],
629        );
630        graph.add_transform(
631            "metric_to_metric",
632            DataType::Metric,
633            DataType::Metric,
634            vec!["metric_source"],
635        );
636        graph.add_transform(
637            "any_to_any",
638            DataType::all_bits(),
639            DataType::all_bits(),
640            vec!["log_to_log", "metric_to_metric"],
641        );
642        graph.add_transform(
643            "any_to_log",
644            DataType::all_bits(),
645            DataType::Log,
646            vec!["any_to_any"],
647        );
648        graph.add_transform(
649            "any_to_metric",
650            DataType::all_bits(),
651            DataType::Metric,
652            vec!["any_to_any"],
653        );
654        graph.add_sink("log_sink", DataType::Log, vec!["any_to_log"]);
655        graph.add_sink("metric_sink", DataType::Metric, vec!["any_to_metric"]);
656
657        assert_eq!(Ok(()), graph.typecheck());
658    }
659
660    #[test]
661    fn allows_multiple_transform_outputs() {
662        let mut graph = Graph::default();
663        graph.add_source("log_source", DataType::Log);
664        graph.add_transform(
665            "log_to_log",
666            DataType::Log,
667            DataType::Log,
668            vec!["log_source"],
669        );
670        graph.add_transform_output("log_to_log", "errors", DataType::Log);
671        graph.add_sink("good_log_sink", DataType::Log, vec!["log_to_log"]);
672
673        // don't add inputs to these yet since they're not validated via these helpers
674        graph.add_sink("errored_log_sink", DataType::Log, vec![]);
675        graph.add_sink("bad_log_sink", DataType::Log, vec![]);
676
677        // make sure we're good with dotted paths
678        assert_eq!(
679            Ok(()),
680            graph.test_add_input(
681                "errored_log_sink",
682                "log_to_log.errors",
683                WildcardMatching::Strict
684            )
685        );
686
687        // make sure that we're not cool with an unknown dotted path
688        let expected = "Input \"log_to_log.not_errors\" for sink \"bad_log_sink\" doesn't match any components.".to_string();
689        assert_eq!(
690            Err(expected),
691            graph.test_add_input(
692                "bad_log_sink",
693                "log_to_log.not_errors",
694                WildcardMatching::Strict
695            )
696        );
697    }
698
699    #[test]
700    fn disallows_ambiguous_inputs() {
701        let mut graph = Graph::default();
702        // these all look like "foo.bar", but should only yield one error
703        graph.nodes.insert(
704            ComponentKey::from("foo.bar"),
705            Node::Source {
706                outputs: vec![SourceOutput::new_maybe_logs(
707                    DataType::all_bits(),
708                    Definition::any(),
709                )],
710            },
711        );
712        graph.nodes.insert(
713            ComponentKey::from("foo.bar"),
714            Node::Source {
715                outputs: vec![SourceOutput::new_maybe_logs(
716                    DataType::all_bits(),
717                    Definition::any(),
718                )],
719            },
720        );
721        graph.nodes.insert(
722            ComponentKey::from("foo"),
723            Node::Transform {
724                in_ty: DataType::all_bits(),
725                outputs: vec![
726                    TransformOutput::new(
727                        DataType::all_bits(),
728                        [("test".into(), Definition::default_legacy_namespace())].into(),
729                    ),
730                    TransformOutput::new(
731                        DataType::all_bits(),
732                        [("test".into(), Definition::default_legacy_namespace())].into(),
733                    )
734                    .with_port("bar"),
735                ],
736            },
737        );
738
739        // make sure we return more than one
740        graph.nodes.insert(
741            ComponentKey::from("baz.errors"),
742            Node::Source {
743                outputs: vec![SourceOutput::new_maybe_logs(
744                    DataType::all_bits(),
745                    Definition::any(),
746                )],
747            },
748        );
749        graph.nodes.insert(
750            ComponentKey::from("baz"),
751            Node::Transform {
752                in_ty: DataType::all_bits(),
753                outputs: vec![
754                    TransformOutput::new(
755                        DataType::all_bits(),
756                        [("test".into(), Definition::default_legacy_namespace())].into(),
757                    ),
758                    TransformOutput::new(
759                        DataType::all_bits(),
760                        [("test".into(), Definition::default_legacy_namespace())].into(),
761                    )
762                    .with_port("errors"),
763                ],
764            },
765        );
766
767        let mut errors = graph.input_map().unwrap_err();
768        errors.sort();
769        assert_eq!(
770            errors,
771            vec![
772                String::from("Input specifier baz.errors is ambiguous"),
773                String::from("Input specifier foo.bar is ambiguous"),
774            ]
775        );
776    }
777
778    #[test]
779    fn wildcard_matching() {
780        let mut graph = Graph::default();
781        graph.add_source("log_source", DataType::Log);
782
783        // don't add inputs to these yet since they're not validated via these helpers
784        graph.add_sink("sink", DataType::Log, vec![]);
785
786        // make sure we're not good with non existing inputs with relaxed wildcard matching disabled
787        let wildcard_matching = WildcardMatching::Strict;
788        let expected =
789            "Input \"bad_source-*\" for sink \"sink\" doesn't match any components.".to_string();
790        assert_eq!(
791            Err(expected),
792            graph.test_add_input("sink", "bad_source-*", wildcard_matching)
793        );
794
795        // make sure we're good with non existing inputs with relaxed wildcard matching enabled
796        let wildcard_matching = WildcardMatching::Relaxed;
797        assert_eq!(
798            Ok(()),
799            graph.test_add_input("sink", "bad_source-*", wildcard_matching)
800        );
801
802        // make sure we're not good with non existing inputs that are not wildcards even when relaxed wildcard matching is enabled
803        let wildcard_matching = WildcardMatching::Relaxed;
804        let expected =
805            "Input \"bad_source-1\" for sink \"sink\" doesn't match any components.".to_string();
806        assert_eq!(
807            Err(expected),
808            graph.test_add_input("sink", "bad_source-1", wildcard_matching)
809        );
810    }
811
812    #[test]
813    fn paths_to_sink_simple() {
814        let mut graph = Graph::default();
815        graph.add_source("in", DataType::Log);
816        graph.add_transform("one", DataType::Log, DataType::Log, vec!["in"]);
817        graph.add_transform("two", DataType::Log, DataType::Log, vec!["one"]);
818        graph.add_transform("three", DataType::Log, DataType::Log, vec!["two"]);
819        graph.add_sink("out", DataType::Log, vec!["three"]);
820
821        let paths: Vec<Vec<_>> = graph
822            .paths_to_sink_from(&ComponentKey::from("in"))
823            .into_iter()
824            .map(|keys| keys.into_iter().map(|key| key.to_string()).collect())
825            .collect();
826
827        assert_eq!(paths.len(), 1);
828        assert_eq!(paths[0], vec!["in", "one", "two", "three", "out"])
829    }
830
831    #[test]
832    fn paths_to_sink_non_existent_root() {
833        let graph = Graph::default();
834        let paths = graph.paths_to_sink_from(&ComponentKey::from("in"));
835
836        assert_eq!(paths.len(), 0);
837    }
838
839    #[test]
840    fn paths_to_sink_irrelevant_transforms() {
841        let mut graph = Graph::default();
842        graph.add_source("source", DataType::Log);
843        // These transforms do not link to a sink
844        graph.add_transform("t1", DataType::Log, DataType::Log, vec!["source"]);
845        graph.add_transform("t2", DataType::Log, DataType::Log, vec!["t1"]);
846        graph.add_transform("t3", DataType::Log, DataType::Log, vec!["t1"]);
847        // These transforms do link to a sink
848        graph.add_transform("t4", DataType::Log, DataType::Log, vec!["source"]);
849        graph.add_transform("t5", DataType::Log, DataType::Log, vec!["source"]);
850        graph.add_sink("sink1", DataType::Log, vec!["t4"]);
851        graph.add_sink("sink2", DataType::Log, vec!["t5"]);
852
853        let paths: Vec<Vec<_>> = graph
854            .paths_to_sink_from(&ComponentKey::from("source"))
855            .into_iter()
856            .map(|keys| keys.into_iter().map(|key| key.to_string()).collect())
857            .collect();
858
859        assert_eq!(paths.len(), 2);
860        assert_eq!(paths[0], vec!["source", "t5", "sink2"]);
861        assert_eq!(paths[1], vec!["source", "t4", "sink1"]);
862    }
863
864    #[test]
865    fn paths_to_sink_multiple_inputs_into_sink() {
866        let mut graph = Graph::default();
867        graph.add_source("source", DataType::Log);
868        graph.add_transform("t1", DataType::Log, DataType::Log, vec!["source"]);
869        graph.add_transform("t2", DataType::Log, DataType::Log, vec!["t1"]);
870        graph.add_transform("t3", DataType::Log, DataType::Log, vec!["t1"]);
871        graph.add_sink("sink1", DataType::Log, vec!["t2", "t3"]);
872
873        let paths: Vec<Vec<_>> = graph
874            .paths_to_sink_from(&ComponentKey::from("source"))
875            .into_iter()
876            .map(|keys| keys.into_iter().map(|key| key.to_string()).collect())
877            .collect();
878
879        assert_eq!(paths.len(), 2);
880        assert_eq!(paths[0], vec!["source", "t1", "t3", "sink1"]);
881        assert_eq!(paths[1], vec!["source", "t1", "t2", "sink1"]);
882    }
883}