1use super::{
2 builder::ConfigBuilder, graph::Graph, transform::get_transform_output_ids, validation, Config,
3 OutputId,
4};
5
6use indexmap::{IndexMap, IndexSet};
7use vector_lib::id::Inputs;
8
9pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<String>> {
10 let mut errors = Vec::new();
11
12 if let Err(name_errors) = validation::check_names(
16 builder
17 .transforms
18 .keys()
19 .chain(builder.sources.keys())
20 .chain(builder.sinks.keys()),
21 ) {
22 errors.extend(name_errors);
23 }
24
25 expand_globs(&mut builder);
26
27 if let Err(type_errors) = validation::check_shape(&builder) {
28 errors.extend(type_errors);
29 }
30
31 if let Err(type_errors) = validation::check_resources(&builder) {
32 errors.extend(type_errors);
33 }
34
35 if let Err(output_errors) = validation::check_outputs(&builder) {
36 errors.extend(output_errors);
37 }
38
39 let ConfigBuilder {
40 global,
41 #[cfg(feature = "api")]
42 api,
43 schema,
44 healthchecks,
45 enrichment_tables,
46 sources,
47 sinks,
48 transforms,
49 tests,
50 provider: _,
51 secret,
52 graceful_shutdown_duration,
53 allow_empty: _,
54 } = builder;
55 let all_sinks = sinks
56 .clone()
57 .into_iter()
58 .chain(
59 enrichment_tables
60 .iter()
61 .filter_map(|(key, table)| table.as_sink(key)),
62 )
63 .collect::<IndexMap<_, _>>();
64 let sources_and_table_sources = sources
65 .clone()
66 .into_iter()
67 .chain(
68 enrichment_tables
69 .iter()
70 .filter_map(|(key, table)| table.as_source(key)),
71 )
72 .collect::<IndexMap<_, _>>();
73
74 let graph = match Graph::new(
75 &sources_and_table_sources,
76 &transforms,
77 &all_sinks,
78 schema,
79 global.wildcard_matching.unwrap_or_default(),
80 ) {
81 Ok(graph) => graph,
82 Err(graph_errors) => {
83 errors.extend(graph_errors);
84 return Err(errors);
85 }
86 };
87
88 if let Err(type_errors) = graph.typecheck() {
89 errors.extend(type_errors);
90 }
91
92 if let Err(e) = graph.check_for_cycles() {
93 errors.push(e);
94 }
95
96 let sinks = sinks
99 .into_iter()
100 .map(|(key, sink)| {
101 let inputs = graph.inputs_for(&key);
102 (key, sink.with_inputs(inputs))
103 })
104 .collect();
105 let transforms = transforms
106 .into_iter()
107 .map(|(key, transform)| {
108 let inputs = graph.inputs_for(&key);
109 (key, transform.with_inputs(inputs))
110 })
111 .collect();
112 let enrichment_tables = enrichment_tables
113 .into_iter()
114 .map(|(key, table)| {
115 let inputs = graph.inputs_for(&key);
116 (key, table.with_inputs(inputs))
117 })
118 .collect();
119 let tests = tests
120 .into_iter()
121 .map(|test| test.resolve_outputs(&graph))
122 .collect::<Result<Vec<_>, Vec<_>>>()?;
123
124 if errors.is_empty() {
125 let mut config = Config {
126 global,
127 #[cfg(feature = "api")]
128 api,
129 schema,
130 healthchecks,
131 enrichment_tables,
132 sources,
133 sinks,
134 transforms,
135 tests,
136 secret,
137 graceful_shutdown_duration,
138 };
139
140 config.propagate_acknowledgements()?;
141
142 let warnings = validation::warnings(&config);
143
144 Ok((config, warnings))
145 } else {
146 Err(errors)
147 }
148}
149
150pub(crate) fn expand_globs(config: &mut ConfigBuilder) {
152 let candidates = config
153 .sources
154 .iter()
155 .flat_map(|(key, s)| {
156 s.inner
157 .outputs(config.schema.log_namespace())
158 .into_iter()
159 .map(|output| OutputId {
160 component: key.clone(),
161 port: output.port,
162 })
163 })
164 .chain(config.transforms.iter().flat_map(|(key, t)| {
165 get_transform_output_ids(t.inner.as_ref(), key.clone(), config.schema.log_namespace())
166 }))
167 .map(|output_id| output_id.to_string())
168 .collect::<IndexSet<String>>();
169
170 for (id, transform) in config.transforms.iter_mut() {
171 expand_globs_inner(&mut transform.inputs, &id.to_string(), &candidates);
172 }
173
174 for (id, sink) in config.sinks.iter_mut() {
175 expand_globs_inner(&mut sink.inputs, &id.to_string(), &candidates);
176 }
177}
178
179enum InputMatcher {
180 Pattern(glob::Pattern),
181 String(String),
182}
183
184impl InputMatcher {
185 fn matches(&self, candidate: &str) -> bool {
186 use InputMatcher::*;
187
188 match self {
189 Pattern(pattern) => pattern.matches(candidate),
190 String(s) => s == candidate,
191 }
192 }
193}
194
195fn expand_globs_inner(inputs: &mut Inputs<String>, id: &str, candidates: &IndexSet<String>) {
196 let raw_inputs = std::mem::take(inputs);
197 for raw_input in raw_inputs {
198 let matcher = glob::Pattern::new(&raw_input)
199 .map(InputMatcher::Pattern)
200 .unwrap_or_else(|error| {
201 warn!(message = "Invalid glob pattern for input.", component_id = %id, %error);
202 InputMatcher::String(raw_input.to_string())
203 });
204 let mut matched = false;
205 for input in candidates {
206 if matcher.matches(input) && input != id {
207 matched = true;
208 inputs.extend(Some(input.to_string()))
209 }
210 }
211 if !matched {
214 inputs.extend(Some(raw_input))
215 }
216 }
217}
218
219#[cfg(test)]
220mod test {
221 use super::*;
222 use crate::test_util::mock::{basic_sink, basic_source, basic_transform};
223 use vector_lib::config::ComponentKey;
224
225 #[test]
226 fn glob_expansion() {
227 let mut builder = ConfigBuilder::default();
228 builder.add_source("foo1", basic_source().1);
229 builder.add_source("foo2", basic_source().1);
230 builder.add_source("bar", basic_source().1);
231 builder.add_transform("foos", &["foo*"], basic_transform("", 1.0));
232 builder.add_sink("baz", &["foos*", "b*"], basic_sink(1).1);
233 builder.add_sink("quix", &["*oo*"], basic_sink(1).1);
234 builder.add_sink("quux", &["*"], basic_sink(1).1);
235
236 let config = builder.build().expect("build should succeed");
237
238 assert_eq!(
239 config
240 .transforms
241 .get(&ComponentKey::from("foos"))
242 .map(|item| without_ports(item.inputs.clone()))
243 .unwrap(),
244 vec![ComponentKey::from("foo1"), ComponentKey::from("foo2")]
245 );
246 assert_eq!(
247 config
248 .sinks
249 .get(&ComponentKey::from("baz"))
250 .map(|item| without_ports(item.inputs.clone()))
251 .unwrap(),
252 vec![ComponentKey::from("foos"), ComponentKey::from("bar")]
253 );
254 assert_eq!(
255 config
256 .sinks
257 .get(&ComponentKey::from("quux"))
258 .map(|item| without_ports(item.inputs.clone()))
259 .unwrap(),
260 vec![
261 ComponentKey::from("foo1"),
262 ComponentKey::from("foo2"),
263 ComponentKey::from("bar"),
264 ComponentKey::from("foos")
265 ]
266 );
267 assert_eq!(
268 config
269 .sinks
270 .get(&ComponentKey::from("quix"))
271 .map(|item| without_ports(item.inputs.clone()))
272 .unwrap(),
273 vec![
274 ComponentKey::from("foo1"),
275 ComponentKey::from("foo2"),
276 ComponentKey::from("foos")
277 ]
278 );
279 }
280
281 fn without_ports(outputs: Inputs<OutputId>) -> Vec<ComponentKey> {
282 outputs
283 .into_iter()
284 .map(|output| {
285 assert!(output.port.is_none());
286 output.component
287 })
288 .collect()
289 }
290}