vector/config/compiler.rs

1use indexmap::{IndexMap, IndexSet};
2use vector_lib::id::Inputs;
3
4use super::{
5    Config, OutputId, builder::ConfigBuilder, graph::Graph, transform::get_transform_output_ids,
6    validation,
7};
8
9pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<String>> {
10    let mut errors = Vec::new();
11
12    // component names should not have dots in the configuration file
13    // but components can expand (like route) to have components with a dot
14    // so this check should be done before expanding components
15    if let Err(name_errors) = validation::check_names(
16        builder
17            .transforms
18            .keys()
19            .chain(builder.sources.keys())
20            .chain(builder.sinks.keys()),
21    ) {
22        errors.extend(name_errors);
23    }
24
25    expand_globs(&mut builder);
26
27    if let Err(type_errors) = validation::check_shape(&builder) {
28        errors.extend(type_errors);
29    }
30
31    if let Err(type_errors) = validation::check_resources(&builder) {
32        errors.extend(type_errors);
33    }
34
35    if let Err(output_errors) = validation::check_outputs(&builder) {
36        errors.extend(output_errors);
37    }
38
39    if let Err(alpha_errors) = validation::check_buffer_utilization_ewma_alpha(&builder) {
40        errors.extend(alpha_errors);
41    }
42
43    let ConfigBuilder {
44        global,
45        #[cfg(feature = "api")]
46        api,
47        schema,
48        healthchecks,
49        enrichment_tables,
50        sources,
51        sinks,
52        transforms,
53        tests,
54        provider: _,
55        secret,
56        graceful_shutdown_duration,
57        allow_empty: _,
58    } = builder;
59    let all_sinks = sinks
60        .clone()
61        .into_iter()
62        .chain(
63            enrichment_tables
64                .iter()
65                .filter_map(|(key, table)| table.as_sink(key)),
66        )
67        .collect::<IndexMap<_, _>>();
68    let sources_and_table_sources = sources
69        .clone()
70        .into_iter()
71        .chain(
72            enrichment_tables
73                .iter()
74                .filter_map(|(key, table)| table.as_source(key)),
75        )
76        .collect::<IndexMap<_, _>>();
77
78    let graph = match Graph::new(
79        &sources_and_table_sources,
80        &transforms,
81        &all_sinks,
82        schema,
83        global.wildcard_matching.unwrap_or_default(),
84    ) {
85        Ok(graph) => graph,
86        Err(graph_errors) => {
87            errors.extend(graph_errors);
88            return Err(errors);
89        }
90    };
91
92    if let Err(type_errors) = graph.typecheck() {
93        errors.extend(type_errors);
94    }
95
96    if let Err(e) = graph.check_for_cycles() {
97        errors.push(e);
98    }
99
100    // Inputs are resolved from string into OutputIds as part of graph construction, so update them
101    // here before adding to the final config (the types require this).
102    let sinks = sinks
103        .into_iter()
104        .map(|(key, sink)| {
105            let inputs = graph.inputs_for(&key);
106            (key, sink.with_inputs(inputs))
107        })
108        .collect();
109    let transforms = transforms
110        .into_iter()
111        .map(|(key, transform)| {
112            let inputs = graph.inputs_for(&key);
113            (key, transform.with_inputs(inputs))
114        })
115        .collect();
116    let enrichment_tables = enrichment_tables
117        .into_iter()
118        .map(|(key, table)| {
119            let inputs = graph.inputs_for(&key);
120            (key, table.with_inputs(inputs))
121        })
122        .collect();
123    let tests = tests
124        .into_iter()
125        .map(|test| test.resolve_outputs(&graph))
126        .collect::<Result<Vec<_>, Vec<_>>>()?;
127
128    if errors.is_empty() {
129        let mut config = Config {
130            global,
131            #[cfg(feature = "api")]
132            api,
133            schema,
134            healthchecks,
135            enrichment_tables,
136            sources,
137            sinks,
138            transforms,
139            tests,
140            secret,
141            graceful_shutdown_duration,
142        };
143
144        config.propagate_acknowledgements()?;
145
146        let warnings = validation::warnings(&config);
147
148        Ok((config, warnings))
149    } else {
150        Err(errors)
151    }
152}
153
154/// Expand globs in input lists
155pub(crate) fn expand_globs(config: &mut ConfigBuilder) {
156    let candidates = config
157        .sources
158        .iter()
159        .flat_map(|(key, s)| {
160            s.inner
161                .outputs(config.schema.log_namespace())
162                .into_iter()
163                .map(|output| OutputId {
164                    component: key.clone(),
165                    port: output.port,
166                })
167        })
168        .chain(config.transforms.iter().flat_map(|(key, t)| {
169            get_transform_output_ids(t.inner.as_ref(), key.clone(), config.schema.log_namespace())
170        }))
171        .map(|output_id| output_id.to_string())
172        .collect::<IndexSet<String>>();
173
174    for (id, transform) in config.transforms.iter_mut() {
175        expand_globs_inner(&mut transform.inputs, &id.to_string(), &candidates);
176    }
177
178    for (id, sink) in config.sinks.iter_mut() {
179        expand_globs_inner(&mut sink.inputs, &id.to_string(), &candidates);
180    }
181}
182
183enum InputMatcher {
184    Pattern(glob::Pattern),
185    String(String),
186}
187
188impl InputMatcher {
189    fn matches(&self, candidate: &str) -> bool {
190        use InputMatcher::*;
191
192        match self {
193            Pattern(pattern) => pattern.matches(candidate),
194            String(s) => s == candidate,
195        }
196    }
197}
198
199fn expand_globs_inner(inputs: &mut Inputs<String>, id: &str, candidates: &IndexSet<String>) {
200    let raw_inputs = std::mem::take(inputs);
201    for raw_input in raw_inputs {
202        let matcher = glob::Pattern::new(&raw_input)
203            .map(InputMatcher::Pattern)
204            .unwrap_or_else(|error| {
205                warn!(message = "Invalid glob pattern for input.", component_id = %id, %error);
206                InputMatcher::String(raw_input.to_string())
207            });
208        let mut matched = false;
209        for input in candidates {
210            if matcher.matches(input) && input != id {
211                matched = true;
212                inputs.extend(Some(input.to_string()))
213            }
214        }
215        // If it didn't work as a glob pattern, leave it in the inputs as-is. This lets us give
216        // more accurate error messages about nonexistent inputs.
217        if !matched {
218            inputs.extend(Some(raw_input))
219        }
220    }
221}
222
#[cfg(test)]
mod test {
    use vector_lib::config::ComponentKey;

    use super::*;
    use crate::test_util::mock::{basic_sink, basic_source, basic_transform};

    /// Converts string IDs into the `ComponentKey`s expected after expansion, keeping order.
    fn keys(ids: &[&'static str]) -> Vec<ComponentKey> {
        ids.iter().map(|id| ComponentKey::from(*id)).collect()
    }

    /// Glob patterns in transform and sink inputs expand to the matching source/transform
    /// outputs, in candidate order, without the component itself.
    #[test]
    fn glob_expansion() {
        let mut builder = ConfigBuilder::default();
        for source_id in ["foo1", "foo2", "bar"] {
            builder.add_source(source_id, basic_source().1);
        }
        builder.add_transform("foos", &["foo*"], basic_transform("", 1.0));
        builder.add_sink("baz", &["foos*", "b*"], basic_sink(1).1);
        builder.add_sink("quix", &["*oo*"], basic_sink(1).1);
        builder.add_sink("quux", &["*"], basic_sink(1).1);

        let config = builder.build().expect("build should succeed");

        let transform_inputs = |id: &'static str| {
            config
                .transforms
                .get(&ComponentKey::from(id))
                .map(|transform| without_ports(transform.inputs.clone()))
                .unwrap()
        };
        let sink_inputs = |id: &'static str| {
            config
                .sinks
                .get(&ComponentKey::from(id))
                .map(|sink| without_ports(sink.inputs.clone()))
                .unwrap()
        };

        assert_eq!(transform_inputs("foos"), keys(&["foo1", "foo2"]));
        assert_eq!(sink_inputs("baz"), keys(&["foos", "bar"]));
        assert_eq!(
            sink_inputs("quux"),
            keys(&["foo1", "foo2", "bar", "foos"])
        );
        assert_eq!(sink_inputs("quix"), keys(&["foo1", "foo2", "foos"]));
    }

    /// Drops the port from each resolved input, asserting that none of them has one.
    fn without_ports(outputs: Inputs<OutputId>) -> Vec<ComponentKey> {
        let mut components = Vec::new();
        for output in outputs {
            assert!(output.port.is_none());
            components.push(output.component);
        }
        components
    }
}