1use crate::config::schema;
2use futures_util::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
3use heim::{disk::Partition, units::information::byte};
4use indexmap::IndexMap;
5use std::{collections::HashMap, path::PathBuf};
6use vector_lib::{buffers::config::DiskUsage, internal_event::DEFAULT_OUTPUT};
7
8use super::{
9 builder::ConfigBuilder, transform::get_transform_output_ids, ComponentKey, Config, OutputId,
10 Resource,
11};
12
13pub fn check_provider(config: &ConfigBuilder) -> Result<(), Vec<String>> {
15 if config.provider.is_some()
16 && (!config.sources.is_empty() || !config.transforms.is_empty() || !config.sinks.is_empty())
17 {
18 Err(vec![
19 "No sources/transforms/sinks are allowed if provider config is present.".to_owned(),
20 ])
21 } else {
22 Ok(())
23 }
24}
25
26pub fn check_names<'a, I: Iterator<Item = &'a ComponentKey>>(names: I) -> Result<(), Vec<String>> {
27 let errors: Vec<_> = names
28 .filter(|component_key| component_key.id().contains('.'))
29 .map(|component_key| {
30 format!(
31 "Component name \"{}\" should not contain a \".\"",
32 component_key.id()
33 )
34 })
35 .collect();
36
37 if errors.is_empty() {
38 Ok(())
39 } else {
40 Err(errors)
41 }
42}
43
44pub fn check_shape(config: &ConfigBuilder) -> Result<(), Vec<String>> {
45 let mut errors = vec![];
46
47 if !config.allow_empty {
48 if config.sources.is_empty() {
49 errors.push("No sources defined in the config.".to_owned());
50 }
51
52 if config.sinks.is_empty() {
53 errors.push("No sinks defined in the config.".to_owned());
54 }
55 }
56
57 fn tagged<'a>(
59 tag: &'static str,
60 iter: impl Iterator<Item = &'a ComponentKey>,
61 ) -> impl Iterator<Item = (&'static str, &'a ComponentKey)> {
62 iter.map(move |x| (tag, x))
63 }
64
65 let mut used_keys = HashMap::<&ComponentKey, Vec<&'static str>>::new();
67 for (ctype, id) in tagged("source", config.sources.keys())
68 .chain(tagged("transform", config.transforms.keys()))
69 .chain(tagged("sink", config.sinks.keys()))
70 {
71 let uses = used_keys.entry(id).or_default();
72 uses.push(ctype);
73 }
74
75 for (id, uses) in used_keys.into_iter().filter(|(_id, uses)| uses.len() > 1) {
76 errors.push(format!(
77 "More than one component with name \"{}\" ({}).",
78 id,
79 uses.join(", ")
80 ));
81 }
82
83 let sink_inputs = config
85 .sinks
86 .iter()
87 .map(|(key, sink)| ("sink", key.clone(), sink.inputs.clone()));
88 let transform_inputs = config
89 .transforms
90 .iter()
91 .map(|(key, transform)| ("transform", key.clone(), transform.inputs.clone()));
92 for (output_type, key, inputs) in sink_inputs.chain(transform_inputs) {
93 if inputs.is_empty() {
94 errors.push(format!(
95 "{} \"{}\" has no inputs",
96 capitalize(output_type),
97 key
98 ));
99 }
100
101 let mut frequencies = HashMap::new();
102 for input in inputs {
103 let entry = frequencies.entry(input).or_insert(0usize);
104 *entry += 1;
105 }
106
107 for (dup, count) in frequencies.into_iter().filter(|(_name, count)| *count > 1) {
108 errors.push(format!(
109 "{} \"{}\" has input \"{}\" duplicated {} times",
110 capitalize(output_type),
111 key,
112 dup,
113 count,
114 ));
115 }
116 }
117
118 if errors.is_empty() {
119 Ok(())
120 } else {
121 Err(errors)
122 }
123}
124
125pub fn check_resources(config: &ConfigBuilder) -> Result<(), Vec<String>> {
126 let source_resources = config
127 .sources
128 .iter()
129 .map(|(id, config)| (id, config.inner.resources()));
130 let sink_resources = config
131 .sinks
132 .iter()
133 .map(|(id, config)| (id, config.resources(id)));
134
135 let conflicting_components = Resource::conflicts(source_resources.chain(sink_resources));
136
137 if conflicting_components.is_empty() {
138 Ok(())
139 } else {
140 Err(conflicting_components
141 .into_iter()
142 .map(|(resource, components)| {
143 format!("Resource `{resource}` is claimed by multiple components: {components:?}")
144 })
145 .collect())
146 }
147}
148
149pub fn check_outputs(config: &ConfigBuilder) -> Result<(), Vec<String>> {
152 let mut errors = Vec::new();
153 for (key, source) in config.sources.iter() {
154 let outputs = source.inner.outputs(config.schema.log_namespace());
155 if outputs
156 .iter()
157 .map(|output| output.port.as_deref().unwrap_or(""))
158 .any(|name| name == DEFAULT_OUTPUT)
159 {
160 errors.push(format!(
161 "Source {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`"
162 ));
163 }
164 }
165
166 for (key, transform) in config.transforms.iter() {
167 let definition = schema::Definition::any();
169
170 if let Err(errs) = transform.inner.validate(&definition) {
171 errors.extend(errs.into_iter().map(|msg| format!("Transform {key} {msg}")));
172 }
173
174 if get_transform_output_ids(
175 transform.inner.as_ref(),
176 key.clone(),
177 config.schema.log_namespace(),
178 )
179 .any(|output| matches!(output.port, Some(output) if output == DEFAULT_OUTPUT))
180 {
181 errors.push(format!(
182 "Transform {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`"
183 ));
184 }
185 }
186
187 if errors.is_empty() {
188 Ok(())
189 } else {
190 Err(errors)
191 }
192}
193
194pub async fn check_buffer_preconditions(config: &Config) -> Result<(), Vec<String>> {
195 let global_data_dir = config.global.data_dir.clone();
208 let configured_disk_buffers = config
209 .sinks()
210 .flat_map(|(id, sink)| {
211 sink.buffer
212 .stages()
213 .iter()
214 .filter_map(|stage| stage.disk_usage(global_data_dir.clone(), id))
215 })
216 .collect::<Vec<_>>();
217
218 if configured_disk_buffers.is_empty() {
219 return Ok(());
220 }
221
222 let mountpoints = heim::disk::partitions()
226 .and_then(|stream| stream.try_collect::<Vec<_>>().and_then(process_partitions))
227 .or_else(|_| {
228 heim::disk::partitions_physical()
229 .and_then(|stream| stream.try_collect::<Vec<_>>().and_then(process_partitions))
230 })
231 .await;
232
233 let mountpoints = match mountpoints {
234 Ok(mut mountpoints) => {
235 mountpoints.sort_by(|m1, _, m2, _| m2.cmp(m1));
236 mountpoints
237 }
238 Err(e) => {
239 warn!(
240 cause = %e,
241 message = "Failed to query disk partitions. Cannot ensure that buffer size limits are within physical storage capacity limits.",
242 );
243 return Ok(());
244 }
245 };
246
247 let mountpoint_buffer_mapping = configured_disk_buffers.into_iter().fold(
249 HashMap::new(),
250 |mut mappings: HashMap<PathBuf, Vec<DiskUsage>>, usage| {
251 let canonicalized_data_dir = usage
252 .data_dir()
253 .canonicalize()
254 .unwrap_or_else(|_| usage.data_dir().to_path_buf());
255 let mountpoint = mountpoints
256 .keys()
257 .find(|mountpoint| canonicalized_data_dir.starts_with(mountpoint));
258
259 match mountpoint {
260 None => warn!(
261 buffer_id = usage.id().id(),
262 data_dir = usage.data_dir().to_string_lossy().as_ref(),
263 canonicalized_data_dir = canonicalized_data_dir.to_string_lossy().as_ref(),
264 message = "Found no matching mountpoint for buffer data directory.",
265 ),
266 Some(mountpoint) => {
267 mappings.entry(mountpoint.clone()).or_default().push(usage);
268 }
269 }
270
271 mappings
272 },
273 );
274
275 let mut errors = Vec::new();
282
283 for (mountpoint, buffers) in mountpoint_buffer_mapping {
284 let buffer_max_size_total: u64 = buffers.iter().map(|usage| usage.max_size()).sum();
285 let mountpoint_total_capacity = mountpoints
286 .get(&mountpoint)
287 .copied()
288 .expect("mountpoint must exist");
289
290 if buffer_max_size_total > mountpoint_total_capacity {
291 let component_ids = buffers
292 .iter()
293 .map(|usage| usage.id().id())
294 .collect::<Vec<_>>();
295 errors.push(format!(
296 "Mountpoint '{}' has total capacity of {} bytes, but configured buffers using mountpoint have total maximum size of {} bytes. \
297Reduce the `max_size` of the buffers to fit within the total capacity of the mountpoint. (components associated with mountpoint: {})",
298 mountpoint.to_string_lossy(), mountpoint_total_capacity, buffer_max_size_total, component_ids.join(", "),
299 ));
300 }
301 }
302
303 if errors.is_empty() {
304 Ok(())
305 } else {
306 Err(errors)
307 }
308}
309
310async fn process_partitions(partitions: Vec<Partition>) -> heim::Result<IndexMap<PathBuf, u64>> {
311 stream::iter(partitions)
312 .map(Ok)
313 .and_then(|partition| {
314 let mountpoint_path = partition.mount_point().to_path_buf();
315 heim::disk::usage(mountpoint_path.clone())
316 .map(|usage| usage.map(|usage| (mountpoint_path, usage.total().get::<byte>())))
317 })
318 .try_collect::<IndexMap<_, _>>()
319 .await
320}
321
322pub fn warnings(config: &Config) -> Vec<String> {
323 let mut warnings = vec![];
324
325 let table_sources = config
326 .enrichment_tables
327 .iter()
328 .filter_map(|(key, table)| table.as_source(key))
329 .collect::<Vec<_>>();
330 let source_ids = config
331 .sources
332 .iter()
333 .chain(table_sources.iter().map(|(k, s)| (k, s)))
334 .flat_map(|(key, source)| {
335 source
336 .inner
337 .outputs(config.schema.log_namespace())
338 .iter()
339 .map(|output| {
340 if let Some(port) = &output.port {
341 ("source", OutputId::from((key, port.clone())))
342 } else {
343 ("source", OutputId::from(key))
344 }
345 })
346 .collect::<Vec<_>>()
347 });
348 let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
349 get_transform_output_ids(
350 transform.inner.as_ref(),
351 key.clone(),
352 config.schema.log_namespace(),
353 )
354 .map(|output| ("transform", output))
355 .collect::<Vec<_>>()
356 });
357
358 let table_sinks = config
359 .enrichment_tables
360 .iter()
361 .filter_map(|(key, table)| table.as_sink(key))
362 .collect::<Vec<_>>();
363 for (input_type, id) in transform_ids.chain(source_ids) {
364 if !config
365 .transforms
366 .iter()
367 .any(|(_, transform)| transform.inputs.contains(&id))
368 && !config
369 .sinks
370 .iter()
371 .any(|(_, sink)| sink.inputs.contains(&id))
372 && !table_sinks
373 .iter()
374 .any(|(_, sink)| sink.inputs.contains(&id))
375 {
376 warnings.push(format!(
377 "{} \"{}\" has no consumers",
378 capitalize(input_type),
379 id
380 ));
381 }
382 }
383
384 warnings
385}
386
/// Returns a copy of `s` with its first character converted to ASCII uppercase.
///
/// Only an ASCII first character is affected; an empty string or a string
/// starting with a non-ASCII character is returned unchanged.
fn capitalize(s: &str) -> String {
    let mut rest = s.chars();
    match rest.next() {
        Some(first) => {
            let mut capitalized = String::with_capacity(s.len());
            capitalized.push(first.to_ascii_uppercase());
            capitalized.push_str(rest.as_str());
            capitalized
        }
        None => String::new(),
    }
}