1use std::{collections::HashMap, path::PathBuf};
2
3use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, stream};
4use heim::{disk::Partition, units::information::byte};
5use indexmap::IndexMap;
6use vector_lib::{buffers::config::DiskUsage, internal_event::DEFAULT_OUTPUT};
7
8use super::{
9 ComponentKey, Config, OutputId, Resource, builder::ConfigBuilder,
10 transform::get_transform_output_ids,
11};
12use crate::config::schema;
13
/// Exclusive lower bound for EWMA smoothing factors: alpha must be > 0.
const EWMA_ALPHA_MIN: f64 = 0.0;

/// Exclusive upper bound for EWMA smoothing factors: alpha must be < 1.
const EWMA_ALPHA_MAX: f64 = 1.0;

/// Exclusive lower bound for EWMA half-life values, in seconds: must be > 0.
const EWMA_HALF_LIFE_SECONDS_MIN: f64 = 0.0;
25
26fn validate_ewma_alpha(alpha: Option<f64>, field_name: &str) -> Option<String> {
29 if let Some(alpha) = alpha
30 && !(alpha > EWMA_ALPHA_MIN && alpha < EWMA_ALPHA_MAX)
31 {
32 Some(format!(
33 "Global `{field_name}` must be between 0 and 1 exclusive (0 < alpha < 1), got {alpha}"
34 ))
35 } else {
36 None
37 }
38}
39
40#[expect(
43 clippy::neg_cmp_op_on_partial_ord,
44 reason = "!(x > 0) rejects NaN and non-positive values; (x <= 0) would incorrectly accept NaN"
45)]
46fn validate_ewma_half_life_seconds(
47 half_life_seconds: Option<f64>,
48 field_name: &str,
49) -> Option<String> {
50 if let Some(half_life_seconds) = half_life_seconds
51 && !(half_life_seconds > EWMA_HALF_LIFE_SECONDS_MIN)
52 {
53 Some(format!(
54 "Global `{field_name}` must be greater than 0, got {half_life_seconds}"
55 ))
56 } else {
57 None
58 }
59}
60
61pub fn check_provider(config: &ConfigBuilder) -> Result<(), Vec<String>> {
63 if config.provider.is_some()
64 && (!config.sources.is_empty() || !config.transforms.is_empty() || !config.sinks.is_empty())
65 {
66 Err(vec![
67 "No sources/transforms/sinks are allowed if provider config is present.".to_owned(),
68 ])
69 } else {
70 Ok(())
71 }
72}
73
74pub fn check_names<'a, I: Iterator<Item = &'a ComponentKey>>(names: I) -> Result<(), Vec<String>> {
75 let errors: Vec<_> = names
76 .filter(|component_key| component_key.id().contains('.'))
77 .map(|component_key| {
78 format!(
79 "Component name \"{}\" should not contain a \".\"",
80 component_key.id()
81 )
82 })
83 .collect();
84
85 if errors.is_empty() {
86 Ok(())
87 } else {
88 Err(errors)
89 }
90}
91
92pub fn check_shape(config: &ConfigBuilder) -> Result<(), Vec<String>> {
93 let mut errors = vec![];
94
95 if !config.allow_empty {
96 if config.sources.is_empty() {
97 errors.push("No sources defined in the config.".to_owned());
98 }
99
100 if config.sinks.is_empty() {
101 errors.push("No sinks defined in the config.".to_owned());
102 }
103 }
104
105 fn tagged<'a>(
107 tag: &'static str,
108 iter: impl Iterator<Item = &'a ComponentKey>,
109 ) -> impl Iterator<Item = (&'static str, &'a ComponentKey)> {
110 iter.map(move |x| (tag, x))
111 }
112
113 let mut used_keys = HashMap::<&ComponentKey, Vec<&'static str>>::new();
115 for (ctype, id) in tagged("source", config.sources.keys())
116 .chain(tagged("transform", config.transforms.keys()))
117 .chain(tagged("sink", config.sinks.keys()))
118 {
119 let uses = used_keys.entry(id).or_default();
120 uses.push(ctype);
121 }
122
123 for (id, uses) in used_keys.into_iter().filter(|(_id, uses)| uses.len() > 1) {
124 errors.push(format!(
125 "More than one component with name \"{}\" ({}).",
126 id,
127 uses.join(", ")
128 ));
129 }
130
131 let sink_inputs = config
133 .sinks
134 .iter()
135 .map(|(key, sink)| ("sink", key.clone(), sink.inputs.clone()));
136 let transform_inputs = config
137 .transforms
138 .iter()
139 .map(|(key, transform)| ("transform", key.clone(), transform.inputs.clone()));
140 for (output_type, key, inputs) in sink_inputs.chain(transform_inputs) {
141 if inputs.is_empty() {
142 errors.push(format!(
143 "{} \"{}\" has no inputs",
144 capitalize(output_type),
145 key
146 ));
147 }
148
149 let mut frequencies = HashMap::new();
150 for input in inputs {
151 let entry = frequencies.entry(input).or_insert(0usize);
152 *entry += 1;
153 }
154
155 for (dup, count) in frequencies.into_iter().filter(|(_name, count)| *count > 1) {
156 errors.push(format!(
157 "{} \"{}\" has input \"{}\" duplicated {} times",
158 capitalize(output_type),
159 key,
160 dup,
161 count,
162 ));
163 }
164 }
165
166 if errors.is_empty() {
167 Ok(())
168 } else {
169 Err(errors)
170 }
171}
172
173pub fn check_resources(config: &ConfigBuilder) -> Result<(), Vec<String>> {
174 let source_resources = config
175 .sources
176 .iter()
177 .map(|(id, config)| (id, config.inner.resources()));
178 let sink_resources = config
179 .sinks
180 .iter()
181 .map(|(id, config)| (id, config.resources(id)));
182
183 let conflicting_components = Resource::conflicts(source_resources.chain(sink_resources));
184
185 if conflicting_components.is_empty() {
186 Ok(())
187 } else {
188 Err(conflicting_components
189 .into_iter()
190 .map(|(resource, components)| {
191 format!("Resource `{resource}` is claimed by multiple components: {components:?}")
192 })
193 .collect())
194 }
195}
196
197pub fn check_values(config: &ConfigBuilder) -> Result<(), Vec<String>> {
199 let mut errors = Vec::new();
200
201 if let Some(error) = validate_ewma_half_life_seconds(
202 config.global.buffer_utilization_ewma_half_life_seconds,
203 "buffer_utilization_ewma_half_life_seconds",
204 ) {
205 errors.push(error);
206 }
207 if let Some(error) = validate_ewma_alpha(config.global.latency_ewma_alpha, "latency_ewma_alpha")
208 {
209 errors.push(error);
210 }
211
212 if errors.is_empty() {
213 Ok(())
214 } else {
215 Err(errors)
216 }
217}
218
219pub fn check_outputs(config: &ConfigBuilder) -> Result<(), Vec<String>> {
222 let mut errors = Vec::new();
223 for (key, source) in config.sources.iter() {
224 let outputs = source.inner.outputs(config.schema.log_namespace());
225 if outputs
226 .iter()
227 .map(|output| output.port.as_deref().unwrap_or(""))
228 .any(|name| name == DEFAULT_OUTPUT)
229 {
230 errors.push(format!(
231 "Source {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`"
232 ));
233 }
234 }
235
236 for (key, transform) in config.transforms.iter() {
237 let definition = schema::Definition::any();
239
240 if let Err(errs) = transform.inner.validate(&definition) {
241 errors.extend(errs.into_iter().map(|msg| format!("Transform {key} {msg}")));
242 }
243
244 if get_transform_output_ids(
245 transform.inner.as_ref(),
246 key.clone(),
247 config.schema.log_namespace(),
248 )
249 .any(|output| matches!(output.port, Some(output) if output == DEFAULT_OUTPUT))
250 {
251 errors.push(format!(
252 "Transform {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`"
253 ));
254 }
255 }
256
257 if errors.is_empty() {
258 Ok(())
259 } else {
260 Err(errors)
261 }
262}
263
/// Verifies that the combined `max_size` of all configured disk buffers on
/// each mountpoint does not exceed that mountpoint's total capacity.
///
/// Best-effort: if disk partitions cannot be queried, or a buffer's data
/// directory cannot be matched to any mountpoint, a warning is logged and the
/// check passes rather than failing the config.
pub async fn check_buffer_preconditions(config: &Config) -> Result<(), Vec<String>> {
    // Gather the disk usage (data directory + maximum size) of every sink
    // buffer stage that is configured to use a disk buffer.
    let global_data_dir = config.global.data_dir.clone();
    let configured_disk_buffers = config
        .sinks()
        .flat_map(|(id, sink)| {
            sink.buffer
                .stages()
                .iter()
                .filter_map(|stage| stage.disk_usage(global_data_dir.clone(), id))
        })
        .collect::<Vec<_>>();

    // Nothing to verify if no sink uses a disk buffer.
    if configured_disk_buffers.is_empty() {
        return Ok(());
    }

    // Query all partitions, falling back to physical-only partitions if the
    // full listing fails, and map each mountpoint to its total capacity.
    let mountpoints = heim::disk::partitions()
        .and_then(|stream| stream.try_collect::<Vec<_>>().and_then(process_partitions))
        .or_else(|_| {
            heim::disk::partitions_physical()
                .and_then(|stream| stream.try_collect::<Vec<_>>().and_then(process_partitions))
        })
        .await;

    let mountpoints = match mountpoints {
        Ok(mut mountpoints) => {
            // Sort mountpoint paths in descending order so that a nested
            // mountpoint (e.g. `/data/inner`) sorts before its parent
            // (`/data`) and is matched first by the prefix search below.
            mountpoints.sort_by(|m1, _, m2, _| m2.cmp(m1));
            mountpoints
        }
        Err(e) => {
            // Partition information is unavailable; skip the check entirely.
            warn!(
                cause = %e,
                message = "Failed to query disk partitions. Cannot ensure that buffer size limits are within physical storage capacity limits.",
            );
            return Ok(());
        }
    };

    // Group each configured disk buffer under the most specific mountpoint
    // that contains its (canonicalized) data directory.
    let mountpoint_buffer_mapping = configured_disk_buffers.into_iter().fold(
        HashMap::new(),
        |mut mappings: HashMap<PathBuf, Vec<DiskUsage>>, usage| {
            // Canonicalize to resolve symlinks; fall back to the raw path if
            // canonicalization fails (e.g. the directory does not exist yet).
            let canonicalized_data_dir = usage
                .data_dir()
                .canonicalize()
                .unwrap_or_else(|_| usage.data_dir().to_path_buf());
            let mountpoint = mountpoints
                .keys()
                .find(|mountpoint| canonicalized_data_dir.starts_with(mountpoint));

            match mountpoint {
                // Unmatched buffers are warned about and excluded from the check.
                None => warn!(
                    buffer_id = usage.id().id(),
                    data_dir = usage.data_dir().to_string_lossy().as_ref(),
                    canonicalized_data_dir = canonicalized_data_dir.to_string_lossy().as_ref(),
                    message = "Found no matching mountpoint for buffer data directory.",
                ),
                Some(mountpoint) => {
                    mappings.entry(mountpoint.clone()).or_default().push(usage);
                }
            }

            mappings
        },
    );

    // For each mountpoint, error if the buffers' combined maximum size
    // exceeds the mountpoint's total capacity.
    let mut errors = Vec::new();

    for (mountpoint, buffers) in mountpoint_buffer_mapping {
        let buffer_max_size_total: u64 = buffers.iter().map(|usage| usage.max_size()).sum();
        let mountpoint_total_capacity = mountpoints
            .get(&mountpoint)
            .copied()
            .expect("mountpoint must exist");

        if buffer_max_size_total > mountpoint_total_capacity {
            let component_ids = buffers
                .iter()
                .map(|usage| usage.id().id())
                .collect::<Vec<_>>();
            errors.push(format!(
                "Mountpoint '{}' has total capacity of {} bytes, but configured buffers using mountpoint have total maximum size of {} bytes. \
Reduce the `max_size` of the buffers to fit within the total capacity of the mountpoint. (components associated with mountpoint: {})",
                mountpoint.to_string_lossy(), mountpoint_total_capacity, buffer_max_size_total, component_ids.join(", "),
            ));
        }
    }

    if errors.is_empty() {
        Ok(())
    } else {
        Err(errors)
    }
}
379
380async fn process_partitions(partitions: Vec<Partition>) -> heim::Result<IndexMap<PathBuf, u64>> {
381 stream::iter(partitions)
382 .map(Ok)
383 .and_then(|partition| {
384 let mountpoint_path = partition.mount_point().to_path_buf();
385 heim::disk::usage(mountpoint_path.clone())
386 .map(|usage| usage.map(|usage| (mountpoint_path, usage.total().get::<byte>())))
387 })
388 .try_collect::<IndexMap<_, _>>()
389 .await
390}
391
392pub fn warnings(config: &Config) -> Vec<String> {
393 let mut warnings = vec![];
394
395 let table_sources = config
396 .enrichment_tables
397 .iter()
398 .filter_map(|(key, table)| table.as_source(key))
399 .collect::<Vec<_>>();
400 let source_ids = config
401 .sources
402 .iter()
403 .chain(table_sources.iter().map(|(k, s)| (k, s)))
404 .flat_map(|(key, source)| {
405 source
406 .inner
407 .outputs(config.schema.log_namespace())
408 .iter()
409 .map(|output| {
410 if let Some(port) = &output.port {
411 ("source", OutputId::from((key, port.clone())))
412 } else {
413 ("source", OutputId::from(key))
414 }
415 })
416 .collect::<Vec<_>>()
417 });
418 let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
419 get_transform_output_ids(
420 transform.inner.as_ref(),
421 key.clone(),
422 config.schema.log_namespace(),
423 )
424 .map(|output| ("transform", output))
425 .collect::<Vec<_>>()
426 });
427
428 let table_sinks = config
429 .enrichment_tables
430 .iter()
431 .filter_map(|(key, table)| table.as_sink(key))
432 .collect::<Vec<_>>();
433 for (input_type, id) in transform_ids.chain(source_ids) {
434 if !config
435 .transforms
436 .iter()
437 .any(|(_, transform)| transform.inputs.contains(&id))
438 && !config
439 .sinks
440 .iter()
441 .any(|(_, sink)| sink.inputs.contains(&id))
442 && !table_sinks
443 .iter()
444 .any(|(_, sink)| sink.inputs.contains(&id))
445 {
446 warnings.push(format!(
447 "{} \"{}\" has no consumers",
448 capitalize(input_type),
449 id
450 ));
451 }
452 }
453
454 warnings
455}
456
/// Upper-cases the first character of `s` when it is ASCII (single-byte);
/// strings whose first character is multi-byte are returned unchanged, as are
/// empty strings.
fn capitalize(s: &str) -> String {
    let mut chars = s.chars();
    match chars.next() {
        Some(first) if first.is_ascii() => {
            let mut out = String::with_capacity(s.len());
            out.push(first.to_ascii_uppercase());
            out.push_str(chars.as_str());
            out
        }
        // Empty input, or a multi-byte leading character (which the original
        // single-byte slice `get_mut(0..1)` would also leave untouched).
        _ => s.to_owned(),
    }
}