1use std::{collections::HashMap, path::PathBuf};
2
3use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, stream};
4use heim::{disk::Partition, units::information::byte};
5use indexmap::IndexMap;
6use vector_lib::{buffers::config::DiskUsage, internal_event::DEFAULT_OUTPUT};
7
8use super::{
9 ComponentKey, Config, OutputId, Resource, builder::ConfigBuilder,
10 transform::get_transform_output_ids,
11};
12use crate::config::schema;
13
14pub fn check_provider(config: &ConfigBuilder) -> Result<(), Vec<String>> {
16 if config.provider.is_some()
17 && (!config.sources.is_empty() || !config.transforms.is_empty() || !config.sinks.is_empty())
18 {
19 Err(vec![
20 "No sources/transforms/sinks are allowed if provider config is present.".to_owned(),
21 ])
22 } else {
23 Ok(())
24 }
25}
26
27pub fn check_names<'a, I: Iterator<Item = &'a ComponentKey>>(names: I) -> Result<(), Vec<String>> {
28 let errors: Vec<_> = names
29 .filter(|component_key| component_key.id().contains('.'))
30 .map(|component_key| {
31 format!(
32 "Component name \"{}\" should not contain a \".\"",
33 component_key.id()
34 )
35 })
36 .collect();
37
38 if errors.is_empty() {
39 Ok(())
40 } else {
41 Err(errors)
42 }
43}
44
45pub fn check_shape(config: &ConfigBuilder) -> Result<(), Vec<String>> {
46 let mut errors = vec![];
47
48 if !config.allow_empty {
49 if config.sources.is_empty() {
50 errors.push("No sources defined in the config.".to_owned());
51 }
52
53 if config.sinks.is_empty() {
54 errors.push("No sinks defined in the config.".to_owned());
55 }
56 }
57
58 fn tagged<'a>(
60 tag: &'static str,
61 iter: impl Iterator<Item = &'a ComponentKey>,
62 ) -> impl Iterator<Item = (&'static str, &'a ComponentKey)> {
63 iter.map(move |x| (tag, x))
64 }
65
66 let mut used_keys = HashMap::<&ComponentKey, Vec<&'static str>>::new();
68 for (ctype, id) in tagged("source", config.sources.keys())
69 .chain(tagged("transform", config.transforms.keys()))
70 .chain(tagged("sink", config.sinks.keys()))
71 {
72 let uses = used_keys.entry(id).or_default();
73 uses.push(ctype);
74 }
75
76 for (id, uses) in used_keys.into_iter().filter(|(_id, uses)| uses.len() > 1) {
77 errors.push(format!(
78 "More than one component with name \"{}\" ({}).",
79 id,
80 uses.join(", ")
81 ));
82 }
83
84 let sink_inputs = config
86 .sinks
87 .iter()
88 .map(|(key, sink)| ("sink", key.clone(), sink.inputs.clone()));
89 let transform_inputs = config
90 .transforms
91 .iter()
92 .map(|(key, transform)| ("transform", key.clone(), transform.inputs.clone()));
93 for (output_type, key, inputs) in sink_inputs.chain(transform_inputs) {
94 if inputs.is_empty() {
95 errors.push(format!(
96 "{} \"{}\" has no inputs",
97 capitalize(output_type),
98 key
99 ));
100 }
101
102 let mut frequencies = HashMap::new();
103 for input in inputs {
104 let entry = frequencies.entry(input).or_insert(0usize);
105 *entry += 1;
106 }
107
108 for (dup, count) in frequencies.into_iter().filter(|(_name, count)| *count > 1) {
109 errors.push(format!(
110 "{} \"{}\" has input \"{}\" duplicated {} times",
111 capitalize(output_type),
112 key,
113 dup,
114 count,
115 ));
116 }
117 }
118
119 if errors.is_empty() {
120 Ok(())
121 } else {
122 Err(errors)
123 }
124}
125
126pub fn check_resources(config: &ConfigBuilder) -> Result<(), Vec<String>> {
127 let source_resources = config
128 .sources
129 .iter()
130 .map(|(id, config)| (id, config.inner.resources()));
131 let sink_resources = config
132 .sinks
133 .iter()
134 .map(|(id, config)| (id, config.resources(id)));
135
136 let conflicting_components = Resource::conflicts(source_resources.chain(sink_resources));
137
138 if conflicting_components.is_empty() {
139 Ok(())
140 } else {
141 Err(conflicting_components
142 .into_iter()
143 .map(|(resource, components)| {
144 format!("Resource `{resource}` is claimed by multiple components: {components:?}")
145 })
146 .collect())
147 }
148}
149
150pub fn check_outputs(config: &ConfigBuilder) -> Result<(), Vec<String>> {
153 let mut errors = Vec::new();
154 for (key, source) in config.sources.iter() {
155 let outputs = source.inner.outputs(config.schema.log_namespace());
156 if outputs
157 .iter()
158 .map(|output| output.port.as_deref().unwrap_or(""))
159 .any(|name| name == DEFAULT_OUTPUT)
160 {
161 errors.push(format!(
162 "Source {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`"
163 ));
164 }
165 }
166
167 for (key, transform) in config.transforms.iter() {
168 let definition = schema::Definition::any();
170
171 if let Err(errs) = transform.inner.validate(&definition) {
172 errors.extend(errs.into_iter().map(|msg| format!("Transform {key} {msg}")));
173 }
174
175 if get_transform_output_ids(
176 transform.inner.as_ref(),
177 key.clone(),
178 config.schema.log_namespace(),
179 )
180 .any(|output| matches!(output.port, Some(output) if output == DEFAULT_OUTPUT))
181 {
182 errors.push(format!(
183 "Transform {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`"
184 ));
185 }
186 }
187
188 if errors.is_empty() {
189 Ok(())
190 } else {
191 Err(errors)
192 }
193}
194
195pub async fn check_buffer_preconditions(config: &Config) -> Result<(), Vec<String>> {
196 let global_data_dir = config.global.data_dir.clone();
209 let configured_disk_buffers = config
210 .sinks()
211 .flat_map(|(id, sink)| {
212 sink.buffer
213 .stages()
214 .iter()
215 .filter_map(|stage| stage.disk_usage(global_data_dir.clone(), id))
216 })
217 .collect::<Vec<_>>();
218
219 if configured_disk_buffers.is_empty() {
220 return Ok(());
221 }
222
223 let mountpoints = heim::disk::partitions()
227 .and_then(|stream| stream.try_collect::<Vec<_>>().and_then(process_partitions))
228 .or_else(|_| {
229 heim::disk::partitions_physical()
230 .and_then(|stream| stream.try_collect::<Vec<_>>().and_then(process_partitions))
231 })
232 .await;
233
234 let mountpoints = match mountpoints {
235 Ok(mut mountpoints) => {
236 mountpoints.sort_by(|m1, _, m2, _| m2.cmp(m1));
237 mountpoints
238 }
239 Err(e) => {
240 warn!(
241 cause = %e,
242 message = "Failed to query disk partitions. Cannot ensure that buffer size limits are within physical storage capacity limits.",
243 );
244 return Ok(());
245 }
246 };
247
248 let mountpoint_buffer_mapping = configured_disk_buffers.into_iter().fold(
250 HashMap::new(),
251 |mut mappings: HashMap<PathBuf, Vec<DiskUsage>>, usage| {
252 let canonicalized_data_dir = usage
253 .data_dir()
254 .canonicalize()
255 .unwrap_or_else(|_| usage.data_dir().to_path_buf());
256 let mountpoint = mountpoints
257 .keys()
258 .find(|mountpoint| canonicalized_data_dir.starts_with(mountpoint));
259
260 match mountpoint {
261 None => warn!(
262 buffer_id = usage.id().id(),
263 data_dir = usage.data_dir().to_string_lossy().as_ref(),
264 canonicalized_data_dir = canonicalized_data_dir.to_string_lossy().as_ref(),
265 message = "Found no matching mountpoint for buffer data directory.",
266 ),
267 Some(mountpoint) => {
268 mappings.entry(mountpoint.clone()).or_default().push(usage);
269 }
270 }
271
272 mappings
273 },
274 );
275
276 let mut errors = Vec::new();
283
284 for (mountpoint, buffers) in mountpoint_buffer_mapping {
285 let buffer_max_size_total: u64 = buffers.iter().map(|usage| usage.max_size()).sum();
286 let mountpoint_total_capacity = mountpoints
287 .get(&mountpoint)
288 .copied()
289 .expect("mountpoint must exist");
290
291 if buffer_max_size_total > mountpoint_total_capacity {
292 let component_ids = buffers
293 .iter()
294 .map(|usage| usage.id().id())
295 .collect::<Vec<_>>();
296 errors.push(format!(
297 "Mountpoint '{}' has total capacity of {} bytes, but configured buffers using mountpoint have total maximum size of {} bytes. \
298Reduce the `max_size` of the buffers to fit within the total capacity of the mountpoint. (components associated with mountpoint: {})",
299 mountpoint.to_string_lossy(), mountpoint_total_capacity, buffer_max_size_total, component_ids.join(", "),
300 ));
301 }
302 }
303
304 if errors.is_empty() {
305 Ok(())
306 } else {
307 Err(errors)
308 }
309}
310
311async fn process_partitions(partitions: Vec<Partition>) -> heim::Result<IndexMap<PathBuf, u64>> {
312 stream::iter(partitions)
313 .map(Ok)
314 .and_then(|partition| {
315 let mountpoint_path = partition.mount_point().to_path_buf();
316 heim::disk::usage(mountpoint_path.clone())
317 .map(|usage| usage.map(|usage| (mountpoint_path, usage.total().get::<byte>())))
318 })
319 .try_collect::<IndexMap<_, _>>()
320 .await
321}
322
323pub fn warnings(config: &Config) -> Vec<String> {
324 let mut warnings = vec![];
325
326 let table_sources = config
327 .enrichment_tables
328 .iter()
329 .filter_map(|(key, table)| table.as_source(key))
330 .collect::<Vec<_>>();
331 let source_ids = config
332 .sources
333 .iter()
334 .chain(table_sources.iter().map(|(k, s)| (k, s)))
335 .flat_map(|(key, source)| {
336 source
337 .inner
338 .outputs(config.schema.log_namespace())
339 .iter()
340 .map(|output| {
341 if let Some(port) = &output.port {
342 ("source", OutputId::from((key, port.clone())))
343 } else {
344 ("source", OutputId::from(key))
345 }
346 })
347 .collect::<Vec<_>>()
348 });
349 let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
350 get_transform_output_ids(
351 transform.inner.as_ref(),
352 key.clone(),
353 config.schema.log_namespace(),
354 )
355 .map(|output| ("transform", output))
356 .collect::<Vec<_>>()
357 });
358
359 let table_sinks = config
360 .enrichment_tables
361 .iter()
362 .filter_map(|(key, table)| table.as_sink(key))
363 .collect::<Vec<_>>();
364 for (input_type, id) in transform_ids.chain(source_ids) {
365 if !config
366 .transforms
367 .iter()
368 .any(|(_, transform)| transform.inputs.contains(&id))
369 && !config
370 .sinks
371 .iter()
372 .any(|(_, sink)| sink.inputs.contains(&id))
373 && !table_sinks
374 .iter()
375 .any(|(_, sink)| sink.inputs.contains(&id))
376 {
377 warnings.push(format!(
378 "{} \"{}\" has no consumers",
379 capitalize(input_type),
380 id
381 ));
382 }
383 }
384
385 warnings
386}
387
/// Returns a copy of `s` with its first character upper-cased if it is ASCII.
///
/// Non-ASCII leading characters, and everything after the first character, are
/// left untouched.
fn capitalize(s: &str) -> String {
    let mut chars = s.chars();
    match chars.next() {
        None => String::new(),
        Some(first) => {
            let mut capitalized = String::with_capacity(s.len());
            capitalized.push(first.to_ascii_uppercase());
            capitalized.push_str(chars.as_str());
            capitalized
        }
    }
}