1use indexmap::{IndexMap, IndexSet};
2use vector_lib::id::Inputs;
3
4use super::{
5 Config, OutputId, builder::ConfigBuilder, graph::Graph, transform::get_transform_output_ids,
6 validation,
7};
8
9pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<String>> {
10 let mut errors = Vec::new();
11
12 if let Err(name_errors) = validation::check_names(
16 builder
17 .transforms
18 .keys()
19 .chain(builder.sources.keys())
20 .chain(builder.sinks.keys()),
21 ) {
22 errors.extend(name_errors);
23 }
24
25 expand_globs(&mut builder);
26
27 if let Err(type_errors) = validation::check_shape(&builder) {
28 errors.extend(type_errors);
29 }
30
31 if let Err(type_errors) = validation::check_resources(&builder) {
32 errors.extend(type_errors);
33 }
34
35 if let Err(output_errors) = validation::check_outputs(&builder) {
36 errors.extend(output_errors);
37 }
38
39 if let Err(alpha_errors) = validation::check_buffer_utilization_ewma_alpha(&builder) {
40 errors.extend(alpha_errors);
41 }
42
43 let ConfigBuilder {
44 global,
45 #[cfg(feature = "api")]
46 api,
47 schema,
48 healthchecks,
49 enrichment_tables,
50 sources,
51 sinks,
52 transforms,
53 tests,
54 provider: _,
55 secret,
56 graceful_shutdown_duration,
57 allow_empty: _,
58 } = builder;
59 let all_sinks = sinks
60 .clone()
61 .into_iter()
62 .chain(
63 enrichment_tables
64 .iter()
65 .filter_map(|(key, table)| table.as_sink(key)),
66 )
67 .collect::<IndexMap<_, _>>();
68 let sources_and_table_sources = sources
69 .clone()
70 .into_iter()
71 .chain(
72 enrichment_tables
73 .iter()
74 .filter_map(|(key, table)| table.as_source(key)),
75 )
76 .collect::<IndexMap<_, _>>();
77
78 let graph = match Graph::new(
79 &sources_and_table_sources,
80 &transforms,
81 &all_sinks,
82 schema,
83 global.wildcard_matching.unwrap_or_default(),
84 ) {
85 Ok(graph) => graph,
86 Err(graph_errors) => {
87 errors.extend(graph_errors);
88 return Err(errors);
89 }
90 };
91
92 if let Err(type_errors) = graph.typecheck() {
93 errors.extend(type_errors);
94 }
95
96 if let Err(e) = graph.check_for_cycles() {
97 errors.push(e);
98 }
99
100 let sinks = sinks
103 .into_iter()
104 .map(|(key, sink)| {
105 let inputs = graph.inputs_for(&key);
106 (key, sink.with_inputs(inputs))
107 })
108 .collect();
109 let transforms = transforms
110 .into_iter()
111 .map(|(key, transform)| {
112 let inputs = graph.inputs_for(&key);
113 (key, transform.with_inputs(inputs))
114 })
115 .collect();
116 let enrichment_tables = enrichment_tables
117 .into_iter()
118 .map(|(key, table)| {
119 let inputs = graph.inputs_for(&key);
120 (key, table.with_inputs(inputs))
121 })
122 .collect();
123 let tests = tests
124 .into_iter()
125 .map(|test| test.resolve_outputs(&graph))
126 .collect::<Result<Vec<_>, Vec<_>>>()?;
127
128 if errors.is_empty() {
129 let mut config = Config {
130 global,
131 #[cfg(feature = "api")]
132 api,
133 schema,
134 healthchecks,
135 enrichment_tables,
136 sources,
137 sinks,
138 transforms,
139 tests,
140 secret,
141 graceful_shutdown_duration,
142 };
143
144 config.propagate_acknowledgements()?;
145
146 let warnings = validation::warnings(&config);
147
148 Ok((config, warnings))
149 } else {
150 Err(errors)
151 }
152}
153
154pub(crate) fn expand_globs(config: &mut ConfigBuilder) {
156 let candidates = config
157 .sources
158 .iter()
159 .flat_map(|(key, s)| {
160 s.inner
161 .outputs(config.schema.log_namespace())
162 .into_iter()
163 .map(|output| OutputId {
164 component: key.clone(),
165 port: output.port,
166 })
167 })
168 .chain(config.transforms.iter().flat_map(|(key, t)| {
169 get_transform_output_ids(t.inner.as_ref(), key.clone(), config.schema.log_namespace())
170 }))
171 .map(|output_id| output_id.to_string())
172 .collect::<IndexSet<String>>();
173
174 for (id, transform) in config.transforms.iter_mut() {
175 expand_globs_inner(&mut transform.inputs, &id.to_string(), &candidates);
176 }
177
178 for (id, sink) in config.sinks.iter_mut() {
179 expand_globs_inner(&mut sink.inputs, &id.to_string(), &candidates);
180 }
181}
182
183enum InputMatcher {
184 Pattern(glob::Pattern),
185 String(String),
186}
187
188impl InputMatcher {
189 fn matches(&self, candidate: &str) -> bool {
190 use InputMatcher::*;
191
192 match self {
193 Pattern(pattern) => pattern.matches(candidate),
194 String(s) => s == candidate,
195 }
196 }
197}
198
199fn expand_globs_inner(inputs: &mut Inputs<String>, id: &str, candidates: &IndexSet<String>) {
200 let raw_inputs = std::mem::take(inputs);
201 for raw_input in raw_inputs {
202 let matcher = glob::Pattern::new(&raw_input)
203 .map(InputMatcher::Pattern)
204 .unwrap_or_else(|error| {
205 warn!(message = "Invalid glob pattern for input.", component_id = %id, %error);
206 InputMatcher::String(raw_input.to_string())
207 });
208 let mut matched = false;
209 for input in candidates {
210 if matcher.matches(input) && input != id {
211 matched = true;
212 inputs.extend(Some(input.to_string()))
213 }
214 }
215 if !matched {
218 inputs.extend(Some(raw_input))
219 }
220 }
221}
222
#[cfg(test)]
mod test {
    use vector_lib::config::ComponentKey;

    use super::*;
    use crate::test_util::mock::{basic_sink, basic_source, basic_transform};

    /// Reduces resolved inputs to their component keys, asserting that none
    /// of them names a port.
    fn without_ports(outputs: Inputs<OutputId>) -> Vec<ComponentKey> {
        let mut components = Vec::new();
        for output in outputs {
            assert!(output.port.is_none());
            components.push(output.component);
        }
        components
    }

    /// Builds the expected list of component keys from plain names.
    fn keys(names: &[&str]) -> Vec<ComponentKey> {
        names.iter().map(|name| ComponentKey::from(*name)).collect()
    }

    #[test]
    fn glob_expansion() {
        let mut builder = ConfigBuilder::default();
        builder.add_source("foo1", basic_source().1);
        builder.add_source("foo2", basic_source().1);
        builder.add_source("bar", basic_source().1);
        builder.add_transform("foos", &["foo*"], basic_transform("", 1.0));
        builder.add_sink("baz", &["foos*", "b*"], basic_sink(1).1);
        builder.add_sink("quix", &["*oo*"], basic_sink(1).1);
        builder.add_sink("quux", &["*"], basic_sink(1).1);

        let config = builder.build().expect("build should succeed");

        // Resolved inputs of a transform / sink, looked up by name.
        let transform_inputs = |name: &str| {
            let transform = config.transforms.get(&ComponentKey::from(name)).unwrap();
            without_ports(transform.inputs.clone())
        };
        let sink_inputs = |name: &str| {
            let sink = config.sinks.get(&ComponentKey::from(name)).unwrap();
            without_ports(sink.inputs.clone())
        };

        assert_eq!(transform_inputs("foos"), keys(&["foo1", "foo2"]));
        assert_eq!(sink_inputs("baz"), keys(&["foos", "bar"]));
        assert_eq!(sink_inputs("quux"), keys(&["foo1", "foo2", "bar", "foos"]));
        assert_eq!(sink_inputs("quix"), keys(&["foo1", "foo2", "foos"]));
    }
}