vector/config/
compiler.rs

1use super::{
2    builder::ConfigBuilder, graph::Graph, transform::get_transform_output_ids, validation, Config,
3    OutputId,
4};
5
6use indexmap::{IndexMap, IndexSet};
7use vector_lib::id::Inputs;
8
9pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<String>> {
10    let mut errors = Vec::new();
11
12    // component names should not have dots in the configuration file
13    // but components can expand (like route) to have components with a dot
14    // so this check should be done before expanding components
15    if let Err(name_errors) = validation::check_names(
16        builder
17            .transforms
18            .keys()
19            .chain(builder.sources.keys())
20            .chain(builder.sinks.keys()),
21    ) {
22        errors.extend(name_errors);
23    }
24
25    expand_globs(&mut builder);
26
27    if let Err(type_errors) = validation::check_shape(&builder) {
28        errors.extend(type_errors);
29    }
30
31    if let Err(type_errors) = validation::check_resources(&builder) {
32        errors.extend(type_errors);
33    }
34
35    if let Err(output_errors) = validation::check_outputs(&builder) {
36        errors.extend(output_errors);
37    }
38
39    let ConfigBuilder {
40        global,
41        #[cfg(feature = "api")]
42        api,
43        schema,
44        healthchecks,
45        enrichment_tables,
46        sources,
47        sinks,
48        transforms,
49        tests,
50        provider: _,
51        secret,
52        graceful_shutdown_duration,
53        allow_empty: _,
54    } = builder;
55    let all_sinks = sinks
56        .clone()
57        .into_iter()
58        .chain(
59            enrichment_tables
60                .iter()
61                .filter_map(|(key, table)| table.as_sink(key)),
62        )
63        .collect::<IndexMap<_, _>>();
64    let sources_and_table_sources = sources
65        .clone()
66        .into_iter()
67        .chain(
68            enrichment_tables
69                .iter()
70                .filter_map(|(key, table)| table.as_source(key)),
71        )
72        .collect::<IndexMap<_, _>>();
73
74    let graph = match Graph::new(
75        &sources_and_table_sources,
76        &transforms,
77        &all_sinks,
78        schema,
79        global.wildcard_matching.unwrap_or_default(),
80    ) {
81        Ok(graph) => graph,
82        Err(graph_errors) => {
83            errors.extend(graph_errors);
84            return Err(errors);
85        }
86    };
87
88    if let Err(type_errors) = graph.typecheck() {
89        errors.extend(type_errors);
90    }
91
92    if let Err(e) = graph.check_for_cycles() {
93        errors.push(e);
94    }
95
96    // Inputs are resolved from string into OutputIds as part of graph construction, so update them
97    // here before adding to the final config (the types require this).
98    let sinks = sinks
99        .into_iter()
100        .map(|(key, sink)| {
101            let inputs = graph.inputs_for(&key);
102            (key, sink.with_inputs(inputs))
103        })
104        .collect();
105    let transforms = transforms
106        .into_iter()
107        .map(|(key, transform)| {
108            let inputs = graph.inputs_for(&key);
109            (key, transform.with_inputs(inputs))
110        })
111        .collect();
112    let enrichment_tables = enrichment_tables
113        .into_iter()
114        .map(|(key, table)| {
115            let inputs = graph.inputs_for(&key);
116            (key, table.with_inputs(inputs))
117        })
118        .collect();
119    let tests = tests
120        .into_iter()
121        .map(|test| test.resolve_outputs(&graph))
122        .collect::<Result<Vec<_>, Vec<_>>>()?;
123
124    if errors.is_empty() {
125        let mut config = Config {
126            global,
127            #[cfg(feature = "api")]
128            api,
129            schema,
130            healthchecks,
131            enrichment_tables,
132            sources,
133            sinks,
134            transforms,
135            tests,
136            secret,
137            graceful_shutdown_duration,
138        };
139
140        config.propagate_acknowledgements()?;
141
142        let warnings = validation::warnings(&config);
143
144        Ok((config, warnings))
145    } else {
146        Err(errors)
147    }
148}
149
150/// Expand globs in input lists
151pub(crate) fn expand_globs(config: &mut ConfigBuilder) {
152    let candidates = config
153        .sources
154        .iter()
155        .flat_map(|(key, s)| {
156            s.inner
157                .outputs(config.schema.log_namespace())
158                .into_iter()
159                .map(|output| OutputId {
160                    component: key.clone(),
161                    port: output.port,
162                })
163        })
164        .chain(config.transforms.iter().flat_map(|(key, t)| {
165            get_transform_output_ids(t.inner.as_ref(), key.clone(), config.schema.log_namespace())
166        }))
167        .map(|output_id| output_id.to_string())
168        .collect::<IndexSet<String>>();
169
170    for (id, transform) in config.transforms.iter_mut() {
171        expand_globs_inner(&mut transform.inputs, &id.to_string(), &candidates);
172    }
173
174    for (id, sink) in config.sinks.iter_mut() {
175        expand_globs_inner(&mut sink.inputs, &id.to_string(), &candidates);
176    }
177}
178
179enum InputMatcher {
180    Pattern(glob::Pattern),
181    String(String),
182}
183
184impl InputMatcher {
185    fn matches(&self, candidate: &str) -> bool {
186        use InputMatcher::*;
187
188        match self {
189            Pattern(pattern) => pattern.matches(candidate),
190            String(s) => s == candidate,
191        }
192    }
193}
194
195fn expand_globs_inner(inputs: &mut Inputs<String>, id: &str, candidates: &IndexSet<String>) {
196    let raw_inputs = std::mem::take(inputs);
197    for raw_input in raw_inputs {
198        let matcher = glob::Pattern::new(&raw_input)
199            .map(InputMatcher::Pattern)
200            .unwrap_or_else(|error| {
201                warn!(message = "Invalid glob pattern for input.", component_id = %id, %error);
202                InputMatcher::String(raw_input.to_string())
203            });
204        let mut matched = false;
205        for input in candidates {
206            if matcher.matches(input) && input != id {
207                matched = true;
208                inputs.extend(Some(input.to_string()))
209            }
210        }
211        // If it didn't work as a glob pattern, leave it in the inputs as-is. This lets us give
212        // more accurate error messages about nonexistent inputs.
213        if !matched {
214            inputs.extend(Some(raw_input))
215        }
216    }
217}
218
#[cfg(test)]
mod test {
    use super::*;
    use crate::test_util::mock::{basic_sink, basic_source, basic_transform};
    use vector_lib::config::ComponentKey;

    /// Builds a small topology whose inputs are all globs and checks the expanded inputs of
    /// the transform and of each sink.
    #[test]
    fn glob_expansion() {
        let mut builder = ConfigBuilder::default();
        for source in ["foo1", "foo2", "bar"] {
            builder.add_source(source, basic_source().1);
        }
        builder.add_transform("foos", &["foo*"], basic_transform("", 1.0));
        builder.add_sink("baz", &["foos*", "b*"], basic_sink(1).1);
        builder.add_sink("quix", &["*oo*"], basic_sink(1).1);
        builder.add_sink("quux", &["*"], basic_sink(1).1);

        let config = builder.build().expect("build should succeed");

        let transform_inputs = |name: &str| {
            let transform = config.transforms.get(&ComponentKey::from(name)).unwrap();
            without_ports(transform.inputs.clone())
        };
        let sink_inputs = |name: &str| {
            let sink = config.sinks.get(&ComponentKey::from(name)).unwrap();
            without_ports(sink.inputs.clone())
        };

        assert_eq!(transform_inputs("foos"), keys(&["foo1", "foo2"]));
        assert_eq!(sink_inputs("baz"), keys(&["foos", "bar"]));
        assert_eq!(sink_inputs("quux"), keys(&["foo1", "foo2", "bar", "foos"]));
        assert_eq!(sink_inputs("quix"), keys(&["foo1", "foo2", "foos"]));
    }

    /// Converts a list of component names into the matching component keys.
    fn keys(names: &[&str]) -> Vec<ComponentKey> {
        names.iter().copied().map(ComponentKey::from).collect()
    }

    /// Strips the port from each output id, asserting that no port was set.
    fn without_ports(outputs: Inputs<OutputId>) -> Vec<ComponentKey> {
        let mut components = Vec::new();
        for output in outputs {
            assert!(output.port.is_none());
            components.push(output.component);
        }
        components
    }
}