1use std::{collections::HashMap, path::PathBuf};
2
3use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, stream};
4use heim::{disk::Partition, units::information::byte};
5use indexmap::IndexMap;
6use vector_lib::{buffers::config::DiskUsage, internal_event::DEFAULT_OUTPUT};
7
8use super::{
9 ComponentKey, Config, OutputId, Resource, builder::ConfigBuilder,
10 transform::get_transform_output_ids,
11};
12use crate::config::schema;
13
/// Exclusive lower bound for the global `buffer_utilization_ewma_alpha` setting.
/// Values less than or equal to this are rejected by the alpha validation check.
const EWMA_ALPHA_MIN: f64 = 0.0;

/// Exclusive upper bound for the global `buffer_utilization_ewma_alpha` setting.
/// Values greater than or equal to this are rejected by the alpha validation check.
const EWMA_ALPHA_MAX: f64 = 1.0;
21
22pub fn check_provider(config: &ConfigBuilder) -> Result<(), Vec<String>> {
24 if config.provider.is_some()
25 && (!config.sources.is_empty() || !config.transforms.is_empty() || !config.sinks.is_empty())
26 {
27 Err(vec![
28 "No sources/transforms/sinks are allowed if provider config is present.".to_owned(),
29 ])
30 } else {
31 Ok(())
32 }
33}
34
35pub fn check_names<'a, I: Iterator<Item = &'a ComponentKey>>(names: I) -> Result<(), Vec<String>> {
36 let errors: Vec<_> = names
37 .filter(|component_key| component_key.id().contains('.'))
38 .map(|component_key| {
39 format!(
40 "Component name \"{}\" should not contain a \".\"",
41 component_key.id()
42 )
43 })
44 .collect();
45
46 if errors.is_empty() {
47 Ok(())
48 } else {
49 Err(errors)
50 }
51}
52
53pub fn check_shape(config: &ConfigBuilder) -> Result<(), Vec<String>> {
54 let mut errors = vec![];
55
56 if !config.allow_empty {
57 if config.sources.is_empty() {
58 errors.push("No sources defined in the config.".to_owned());
59 }
60
61 if config.sinks.is_empty() {
62 errors.push("No sinks defined in the config.".to_owned());
63 }
64 }
65
66 fn tagged<'a>(
68 tag: &'static str,
69 iter: impl Iterator<Item = &'a ComponentKey>,
70 ) -> impl Iterator<Item = (&'static str, &'a ComponentKey)> {
71 iter.map(move |x| (tag, x))
72 }
73
74 let mut used_keys = HashMap::<&ComponentKey, Vec<&'static str>>::new();
76 for (ctype, id) in tagged("source", config.sources.keys())
77 .chain(tagged("transform", config.transforms.keys()))
78 .chain(tagged("sink", config.sinks.keys()))
79 {
80 let uses = used_keys.entry(id).or_default();
81 uses.push(ctype);
82 }
83
84 for (id, uses) in used_keys.into_iter().filter(|(_id, uses)| uses.len() > 1) {
85 errors.push(format!(
86 "More than one component with name \"{}\" ({}).",
87 id,
88 uses.join(", ")
89 ));
90 }
91
92 let sink_inputs = config
94 .sinks
95 .iter()
96 .map(|(key, sink)| ("sink", key.clone(), sink.inputs.clone()));
97 let transform_inputs = config
98 .transforms
99 .iter()
100 .map(|(key, transform)| ("transform", key.clone(), transform.inputs.clone()));
101 for (output_type, key, inputs) in sink_inputs.chain(transform_inputs) {
102 if inputs.is_empty() {
103 errors.push(format!(
104 "{} \"{}\" has no inputs",
105 capitalize(output_type),
106 key
107 ));
108 }
109
110 let mut frequencies = HashMap::new();
111 for input in inputs {
112 let entry = frequencies.entry(input).or_insert(0usize);
113 *entry += 1;
114 }
115
116 for (dup, count) in frequencies.into_iter().filter(|(_name, count)| *count > 1) {
117 errors.push(format!(
118 "{} \"{}\" has input \"{}\" duplicated {} times",
119 capitalize(output_type),
120 key,
121 dup,
122 count,
123 ));
124 }
125 }
126
127 if errors.is_empty() {
128 Ok(())
129 } else {
130 Err(errors)
131 }
132}
133
134pub fn check_resources(config: &ConfigBuilder) -> Result<(), Vec<String>> {
135 let source_resources = config
136 .sources
137 .iter()
138 .map(|(id, config)| (id, config.inner.resources()));
139 let sink_resources = config
140 .sinks
141 .iter()
142 .map(|(id, config)| (id, config.resources(id)));
143
144 let conflicting_components = Resource::conflicts(source_resources.chain(sink_resources));
145
146 if conflicting_components.is_empty() {
147 Ok(())
148 } else {
149 Err(conflicting_components
150 .into_iter()
151 .map(|(resource, components)| {
152 format!("Resource `{resource}` is claimed by multiple components: {components:?}")
153 })
154 .collect())
155 }
156}
157
158pub fn check_buffer_utilization_ewma_alpha(config: &ConfigBuilder) -> Result<(), Vec<String>> {
161 if let Some(alpha) = config.global.buffer_utilization_ewma_alpha
162 && (alpha <= EWMA_ALPHA_MIN || alpha >= EWMA_ALPHA_MAX)
163 {
164 Err(vec![format!(
165 "Global `buffer_utilization_ewma_alpha` must be between 0 and 1 exclusive (0 < alpha < 1), got {alpha}"
166 )])
167 } else {
168 Ok(())
169 }
170}
171
172pub fn check_outputs(config: &ConfigBuilder) -> Result<(), Vec<String>> {
175 let mut errors = Vec::new();
176 for (key, source) in config.sources.iter() {
177 let outputs = source.inner.outputs(config.schema.log_namespace());
178 if outputs
179 .iter()
180 .map(|output| output.port.as_deref().unwrap_or(""))
181 .any(|name| name == DEFAULT_OUTPUT)
182 {
183 errors.push(format!(
184 "Source {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`"
185 ));
186 }
187 }
188
189 for (key, transform) in config.transforms.iter() {
190 let definition = schema::Definition::any();
192
193 if let Err(errs) = transform.inner.validate(&definition) {
194 errors.extend(errs.into_iter().map(|msg| format!("Transform {key} {msg}")));
195 }
196
197 if get_transform_output_ids(
198 transform.inner.as_ref(),
199 key.clone(),
200 config.schema.log_namespace(),
201 )
202 .any(|output| matches!(output.port, Some(output) if output == DEFAULT_OUTPUT))
203 {
204 errors.push(format!(
205 "Transform {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`"
206 ));
207 }
208 }
209
210 if errors.is_empty() {
211 Ok(())
212 } else {
213 Err(errors)
214 }
215}
216
217pub async fn check_buffer_preconditions(config: &Config) -> Result<(), Vec<String>> {
218 let global_data_dir = config.global.data_dir.clone();
231 let configured_disk_buffers = config
232 .sinks()
233 .flat_map(|(id, sink)| {
234 sink.buffer
235 .stages()
236 .iter()
237 .filter_map(|stage| stage.disk_usage(global_data_dir.clone(), id))
238 })
239 .collect::<Vec<_>>();
240
241 if configured_disk_buffers.is_empty() {
242 return Ok(());
243 }
244
245 let mountpoints = heim::disk::partitions()
249 .and_then(|stream| stream.try_collect::<Vec<_>>().and_then(process_partitions))
250 .or_else(|_| {
251 heim::disk::partitions_physical()
252 .and_then(|stream| stream.try_collect::<Vec<_>>().and_then(process_partitions))
253 })
254 .await;
255
256 let mountpoints = match mountpoints {
257 Ok(mut mountpoints) => {
258 mountpoints.sort_by(|m1, _, m2, _| m2.cmp(m1));
259 mountpoints
260 }
261 Err(e) => {
262 warn!(
263 cause = %e,
264 message = "Failed to query disk partitions. Cannot ensure that buffer size limits are within physical storage capacity limits.",
265 );
266 return Ok(());
267 }
268 };
269
270 let mountpoint_buffer_mapping = configured_disk_buffers.into_iter().fold(
272 HashMap::new(),
273 |mut mappings: HashMap<PathBuf, Vec<DiskUsage>>, usage| {
274 let canonicalized_data_dir = usage
275 .data_dir()
276 .canonicalize()
277 .unwrap_or_else(|_| usage.data_dir().to_path_buf());
278 let mountpoint = mountpoints
279 .keys()
280 .find(|mountpoint| canonicalized_data_dir.starts_with(mountpoint));
281
282 match mountpoint {
283 None => warn!(
284 buffer_id = usage.id().id(),
285 data_dir = usage.data_dir().to_string_lossy().as_ref(),
286 canonicalized_data_dir = canonicalized_data_dir.to_string_lossy().as_ref(),
287 message = "Found no matching mountpoint for buffer data directory.",
288 ),
289 Some(mountpoint) => {
290 mappings.entry(mountpoint.clone()).or_default().push(usage);
291 }
292 }
293
294 mappings
295 },
296 );
297
298 let mut errors = Vec::new();
305
306 for (mountpoint, buffers) in mountpoint_buffer_mapping {
307 let buffer_max_size_total: u64 = buffers.iter().map(|usage| usage.max_size()).sum();
308 let mountpoint_total_capacity = mountpoints
309 .get(&mountpoint)
310 .copied()
311 .expect("mountpoint must exist");
312
313 if buffer_max_size_total > mountpoint_total_capacity {
314 let component_ids = buffers
315 .iter()
316 .map(|usage| usage.id().id())
317 .collect::<Vec<_>>();
318 errors.push(format!(
319 "Mountpoint '{}' has total capacity of {} bytes, but configured buffers using mountpoint have total maximum size of {} bytes. \
320Reduce the `max_size` of the buffers to fit within the total capacity of the mountpoint. (components associated with mountpoint: {})",
321 mountpoint.to_string_lossy(), mountpoint_total_capacity, buffer_max_size_total, component_ids.join(", "),
322 ));
323 }
324 }
325
326 if errors.is_empty() {
327 Ok(())
328 } else {
329 Err(errors)
330 }
331}
332
333async fn process_partitions(partitions: Vec<Partition>) -> heim::Result<IndexMap<PathBuf, u64>> {
334 stream::iter(partitions)
335 .map(Ok)
336 .and_then(|partition| {
337 let mountpoint_path = partition.mount_point().to_path_buf();
338 heim::disk::usage(mountpoint_path.clone())
339 .map(|usage| usage.map(|usage| (mountpoint_path, usage.total().get::<byte>())))
340 })
341 .try_collect::<IndexMap<_, _>>()
342 .await
343}
344
345pub fn warnings(config: &Config) -> Vec<String> {
346 let mut warnings = vec![];
347
348 let table_sources = config
349 .enrichment_tables
350 .iter()
351 .filter_map(|(key, table)| table.as_source(key))
352 .collect::<Vec<_>>();
353 let source_ids = config
354 .sources
355 .iter()
356 .chain(table_sources.iter().map(|(k, s)| (k, s)))
357 .flat_map(|(key, source)| {
358 source
359 .inner
360 .outputs(config.schema.log_namespace())
361 .iter()
362 .map(|output| {
363 if let Some(port) = &output.port {
364 ("source", OutputId::from((key, port.clone())))
365 } else {
366 ("source", OutputId::from(key))
367 }
368 })
369 .collect::<Vec<_>>()
370 });
371 let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
372 get_transform_output_ids(
373 transform.inner.as_ref(),
374 key.clone(),
375 config.schema.log_namespace(),
376 )
377 .map(|output| ("transform", output))
378 .collect::<Vec<_>>()
379 });
380
381 let table_sinks = config
382 .enrichment_tables
383 .iter()
384 .filter_map(|(key, table)| table.as_sink(key))
385 .collect::<Vec<_>>();
386 for (input_type, id) in transform_ids.chain(source_ids) {
387 if !config
388 .transforms
389 .iter()
390 .any(|(_, transform)| transform.inputs.contains(&id))
391 && !config
392 .sinks
393 .iter()
394 .any(|(_, sink)| sink.inputs.contains(&id))
395 && !table_sinks
396 .iter()
397 .any(|(_, sink)| sink.inputs.contains(&id))
398 {
399 warnings.push(format!(
400 "{} \"{}\" has no consumers",
401 capitalize(input_type),
402 id
403 ));
404 }
405 }
406
407 warnings
408}
409
/// Returns a copy of `s` with its first character converted to ASCII upper case.
///
/// Strings that are empty, or whose first character is not an ASCII lowercase letter,
/// are returned unchanged.
fn capitalize(s: &str) -> String {
    let mut chars = s.chars();
    let Some(first) = chars.next() else {
        return String::new();
    };

    let mut capitalized = String::with_capacity(s.len());
    // `to_ascii_uppercase` leaves non-ASCII characters untouched.
    capitalized.push(first.to_ascii_uppercase());
    capitalized.push_str(chars.as_str());
    capitalized
}