vector/config/validation.rs

1use std::{collections::HashMap, path::PathBuf};
2
3use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, stream};
4use heim::{disk::Partition, units::information::byte};
5use indexmap::IndexMap;
6use vector_lib::{buffers::config::DiskUsage, internal_event::DEFAULT_OUTPUT};
7
8use super::{
9    ComponentKey, Config, OutputId, Resource, builder::ConfigBuilder,
10    transform::get_transform_output_ids,
11};
12use crate::config::schema;
13
/// Minimum value (exclusive) for EWMA alpha options.
/// The alpha value must be strictly greater than this value.
const EWMA_ALPHA_MIN: f64 = 0.0;

/// Maximum value (exclusive) for EWMA alpha options.
/// The alpha value must be strictly less than this value.
const EWMA_ALPHA_MAX: f64 = 1.0;

/// Minimum value (exclusive) for EWMA half-life options.
/// The half-life value must be strictly greater than this value.
const EWMA_HALF_LIFE_SECONDS_MIN: f64 = 0.0;
25
26/// Validates an optional EWMA alpha value and returns an error message if invalid.
27/// Returns `None` if the value is `None` or valid, otherwise returns an error message.
28fn validate_ewma_alpha(alpha: Option<f64>, field_name: &str) -> Option<String> {
29    if let Some(alpha) = alpha
30        && !(alpha > EWMA_ALPHA_MIN && alpha < EWMA_ALPHA_MAX)
31    {
32        Some(format!(
33            "Global `{field_name}` must be between 0 and 1 exclusive (0 < alpha < 1), got {alpha}"
34        ))
35    } else {
36        None
37    }
38}
39
40/// Validates an optional EWMA half-life value and returns an error message if invalid.
41/// Returns `None` if the value is `None` or valid, otherwise returns an error message.
42#[expect(
43    clippy::neg_cmp_op_on_partial_ord,
44    reason = "!(x > 0) rejects NaN and non-positive values; (x <= 0) would incorrectly accept NaN"
45)]
46fn validate_ewma_half_life_seconds(
47    half_life_seconds: Option<f64>,
48    field_name: &str,
49) -> Option<String> {
50    if let Some(half_life_seconds) = half_life_seconds
51        && !(half_life_seconds > EWMA_HALF_LIFE_SECONDS_MIN)
52    {
53        Some(format!(
54            "Global `{field_name}` must be greater than 0, got {half_life_seconds}"
55        ))
56    } else {
57        None
58    }
59}
60
61/// Check that provide + topology config aren't present in the same builder, which is an error.
62pub fn check_provider(config: &ConfigBuilder) -> Result<(), Vec<String>> {
63    if config.provider.is_some()
64        && (!config.sources.is_empty() || !config.transforms.is_empty() || !config.sinks.is_empty())
65    {
66        Err(vec![
67            "No sources/transforms/sinks are allowed if provider config is present.".to_owned(),
68        ])
69    } else {
70        Ok(())
71    }
72}
73
74pub fn check_names<'a, I: Iterator<Item = &'a ComponentKey>>(names: I) -> Result<(), Vec<String>> {
75    let errors: Vec<_> = names
76        .filter(|component_key| component_key.id().contains('.'))
77        .map(|component_key| {
78            format!(
79                "Component name \"{}\" should not contain a \".\"",
80                component_key.id()
81            )
82        })
83        .collect();
84
85    if errors.is_empty() {
86        Ok(())
87    } else {
88        Err(errors)
89    }
90}
91
92pub fn check_shape(config: &ConfigBuilder) -> Result<(), Vec<String>> {
93    let mut errors = vec![];
94
95    if !config.allow_empty {
96        if config.sources.is_empty() {
97            errors.push("No sources defined in the config.".to_owned());
98        }
99
100        if config.sinks.is_empty() {
101            errors.push("No sinks defined in the config.".to_owned());
102        }
103    }
104
105    // Helper for below
106    fn tagged<'a>(
107        tag: &'static str,
108        iter: impl Iterator<Item = &'a ComponentKey>,
109    ) -> impl Iterator<Item = (&'static str, &'a ComponentKey)> {
110        iter.map(move |x| (tag, x))
111    }
112
113    // Check for non-unique names across sources, sinks, and transforms
114    let mut used_keys = HashMap::<&ComponentKey, Vec<&'static str>>::new();
115    for (ctype, id) in tagged("source", config.sources.keys())
116        .chain(tagged("transform", config.transforms.keys()))
117        .chain(tagged("sink", config.sinks.keys()))
118    {
119        let uses = used_keys.entry(id).or_default();
120        uses.push(ctype);
121    }
122
123    for (id, uses) in used_keys.into_iter().filter(|(_id, uses)| uses.len() > 1) {
124        errors.push(format!(
125            "More than one component with name \"{}\" ({}).",
126            id,
127            uses.join(", ")
128        ));
129    }
130
131    // Warnings and errors
132    let sink_inputs = config
133        .sinks
134        .iter()
135        .map(|(key, sink)| ("sink", key.clone(), sink.inputs.clone()));
136    let transform_inputs = config
137        .transforms
138        .iter()
139        .map(|(key, transform)| ("transform", key.clone(), transform.inputs.clone()));
140    for (output_type, key, inputs) in sink_inputs.chain(transform_inputs) {
141        if inputs.is_empty() {
142            errors.push(format!(
143                "{} \"{}\" has no inputs",
144                capitalize(output_type),
145                key
146            ));
147        }
148
149        let mut frequencies = HashMap::new();
150        for input in inputs {
151            let entry = frequencies.entry(input).or_insert(0usize);
152            *entry += 1;
153        }
154
155        for (dup, count) in frequencies.into_iter().filter(|(_name, count)| *count > 1) {
156            errors.push(format!(
157                "{} \"{}\" has input \"{}\" duplicated {} times",
158                capitalize(output_type),
159                key,
160                dup,
161                count,
162            ));
163        }
164    }
165
166    if errors.is_empty() {
167        Ok(())
168    } else {
169        Err(errors)
170    }
171}
172
173pub fn check_resources(config: &ConfigBuilder) -> Result<(), Vec<String>> {
174    let source_resources = config
175        .sources
176        .iter()
177        .map(|(id, config)| (id, config.inner.resources()));
178    let sink_resources = config
179        .sinks
180        .iter()
181        .map(|(id, config)| (id, config.resources(id)));
182
183    let conflicting_components = Resource::conflicts(source_resources.chain(sink_resources));
184
185    if conflicting_components.is_empty() {
186        Ok(())
187    } else {
188        Err(conflicting_components
189            .into_iter()
190            .map(|(resource, components)| {
191                format!("Resource `{resource}` is claimed by multiple components: {components:?}")
192            })
193            .collect())
194    }
195}
196
197/// Validates that `*_ewma_alpha` values are within the valid range (0 < alpha < 1).
198pub fn check_values(config: &ConfigBuilder) -> Result<(), Vec<String>> {
199    let mut errors = Vec::new();
200
201    if let Some(error) = validate_ewma_half_life_seconds(
202        config.global.buffer_utilization_ewma_half_life_seconds,
203        "buffer_utilization_ewma_half_life_seconds",
204    ) {
205        errors.push(error);
206    }
207    if let Some(error) = validate_ewma_alpha(config.global.latency_ewma_alpha, "latency_ewma_alpha")
208    {
209        errors.push(error);
210    }
211
212    if errors.is_empty() {
213        Ok(())
214    } else {
215        Err(errors)
216    }
217}
218
219/// To avoid collisions between `output` metric tags, check that a component
220/// does not have a named output with the name [`DEFAULT_OUTPUT`]
221pub fn check_outputs(config: &ConfigBuilder) -> Result<(), Vec<String>> {
222    let mut errors = Vec::new();
223    for (key, source) in config.sources.iter() {
224        let outputs = source.inner.outputs(config.schema.log_namespace());
225        if outputs
226            .iter()
227            .map(|output| output.port.as_deref().unwrap_or(""))
228            .any(|name| name == DEFAULT_OUTPUT)
229        {
230            errors.push(format!(
231                "Source {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`"
232            ));
233        }
234    }
235
236    for (key, transform) in config.transforms.iter() {
237        // use the most general definition possible, since the real value isn't known yet.
238        let definition = schema::Definition::any();
239
240        if let Err(errs) = transform.inner.validate(&definition) {
241            errors.extend(errs.into_iter().map(|msg| format!("Transform {key} {msg}")));
242        }
243
244        if get_transform_output_ids(
245            transform.inner.as_ref(),
246            key.clone(),
247            config.schema.log_namespace(),
248        )
249        .any(|output| matches!(output.port, Some(output) if output == DEFAULT_OUTPUT))
250        {
251            errors.push(format!(
252                "Transform {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`"
253            ));
254        }
255    }
256
257    if errors.is_empty() {
258        Ok(())
259    } else {
260        Err(errors)
261    }
262}
263
/// Checks that the sum of configured disk-buffer `max_size`s on each mountpoint
/// does not exceed that mountpoint's total capacity. Failures to query the
/// system's partitions are logged and treated as a pass, not an error.
pub async fn check_buffer_preconditions(config: &Config) -> Result<(), Vec<String>> {
    // We need to assert that Vector's data directory is located on a mountpoint that has enough
    // capacity to allow all sinks with disk buffers configured to be able to use up to their
    // maximum configured size without overrunning the total capacity.
    //
    // More subtly, we need to make sure we properly map a given buffer's data directory to the
    // appropriate mountpoint, as it is technically possible that individual buffers could be on
    // separate mountpoints.
    //
    // Notably, this does *not* cover other data usage by Vector on the same mountpoint because we
    // don't always know the upper bound of that usage i.e. file checkpoint state.

    // Grab all configured disk buffers, and if none are present, simply return early.
    let global_data_dir = config.global.data_dir.clone();
    let configured_disk_buffers = config
        .sinks()
        .flat_map(|(id, sink)| {
            sink.buffer
                .stages()
                .iter()
                .filter_map(|stage| stage.disk_usage(global_data_dir.clone(), id))
        })
        .collect::<Vec<_>>();

    if configured_disk_buffers.is_empty() {
        return Ok(());
    }

    // Now query all the mountpoints on the system, and get their total capacity. We also have to
    // sort the mountpoints from longest to shortest so we can find the longest prefix match for
    // each buffer data directory by simply iterating from beginning to end.
    //
    // If enumerating all partitions fails, retry with physical partitions only
    // before giving up.
    let mountpoints = heim::disk::partitions()
        .and_then(|stream| stream.try_collect::<Vec<_>>().and_then(process_partitions))
        .or_else(|_| {
            heim::disk::partitions_physical()
                .and_then(|stream| stream.try_collect::<Vec<_>>().and_then(process_partitions))
        })
        .await;

    let mountpoints = match mountpoints {
        Ok(mut mountpoints) => {
            // The comparator receives (key1, value1, key2, value2); comparing `m2`
            // against `m1` sorts mountpoint paths in descending order, so a longer
            // path sorts before any of its prefixes and the first `starts_with` hit
            // below is the longest match.
            mountpoints.sort_by(|m1, _, m2, _| m2.cmp(m1));
            mountpoints
        }
        Err(e) => {
            // Being unable to query partitions is non-fatal: skip the check rather
            // than rejecting a possibly-valid configuration.
            warn!(
                cause = %e,
                message = "Failed to query disk partitions. Cannot ensure that buffer size limits are within physical storage capacity limits.",
            );
            return Ok(());
        }
    };

    // Now build a mapping of buffer IDs/usage configuration to the mountpoint they reside on.
    let mountpoint_buffer_mapping = configured_disk_buffers.into_iter().fold(
        HashMap::new(),
        |mut mappings: HashMap<PathBuf, Vec<DiskUsage>>, usage| {
            // Canonicalize so symlinked data directories resolve to their real
            // location; fall back to the configured path if canonicalization fails
            // (e.g. the directory does not exist yet).
            let canonicalized_data_dir = usage
                .data_dir()
                .canonicalize()
                .unwrap_or_else(|_| usage.data_dir().to_path_buf());
            let mountpoint = mountpoints
                .keys()
                .find(|mountpoint| canonicalized_data_dir.starts_with(mountpoint));

            match mountpoint {
                None => warn!(
                    buffer_id = usage.id().id(),
                    data_dir = usage.data_dir().to_string_lossy().as_ref(),
                    canonicalized_data_dir = canonicalized_data_dir.to_string_lossy().as_ref(),
                    message = "Found no matching mountpoint for buffer data directory.",
                ),
                Some(mountpoint) => {
                    mappings.entry(mountpoint.clone()).or_default().push(usage);
                }
            }

            mappings
        },
    );

    // Finally, we have a mapping of disk buffers, based on their underlying mountpoint. Go through
    // and check to make sure the sum total of `max_size` for all buffers associated with each
    // mountpoint does not exceed that mountpoint's total capacity.
    //
    // We specifically do not do any sort of warning on free space because that has to be the
    // responsibility of the operator to ensure there's enough total space for all buffers present.
    let mut errors = Vec::new();

    for (mountpoint, buffers) in mountpoint_buffer_mapping {
        let buffer_max_size_total: u64 = buffers.iter().map(|usage| usage.max_size()).sum();
        // The mapping keys were taken from `mountpoints` above, so the lookup
        // cannot fail.
        let mountpoint_total_capacity = mountpoints
            .get(&mountpoint)
            .copied()
            .expect("mountpoint must exist");

        if buffer_max_size_total > mountpoint_total_capacity {
            let component_ids = buffers
                .iter()
                .map(|usage| usage.id().id())
                .collect::<Vec<_>>();
            errors.push(format!(
                "Mountpoint '{}' has total capacity of {} bytes, but configured buffers using mountpoint have total maximum size of {} bytes. \
Reduce the `max_size` of the buffers to fit within the total capacity of the mountpoint. (components associated with mountpoint: {})",
                mountpoint.to_string_lossy(), mountpoint_total_capacity, buffer_max_size_total, component_ids.join(", "),
            ));
        }
    }

    if errors.is_empty() {
        Ok(())
    } else {
        Err(errors)
    }
}
379
380async fn process_partitions(partitions: Vec<Partition>) -> heim::Result<IndexMap<PathBuf, u64>> {
381    stream::iter(partitions)
382        .map(Ok)
383        .and_then(|partition| {
384            let mountpoint_path = partition.mount_point().to_path_buf();
385            heim::disk::usage(mountpoint_path.clone())
386                .map(|usage| usage.map(|usage| (mountpoint_path, usage.total().get::<byte>())))
387        })
388        .try_collect::<IndexMap<_, _>>()
389        .await
390}
391
392pub fn warnings(config: &Config) -> Vec<String> {
393    let mut warnings = vec![];
394
395    let table_sources = config
396        .enrichment_tables
397        .iter()
398        .filter_map(|(key, table)| table.as_source(key))
399        .collect::<Vec<_>>();
400    let source_ids = config
401        .sources
402        .iter()
403        .chain(table_sources.iter().map(|(k, s)| (k, s)))
404        .flat_map(|(key, source)| {
405            source
406                .inner
407                .outputs(config.schema.log_namespace())
408                .iter()
409                .map(|output| {
410                    if let Some(port) = &output.port {
411                        ("source", OutputId::from((key, port.clone())))
412                    } else {
413                        ("source", OutputId::from(key))
414                    }
415                })
416                .collect::<Vec<_>>()
417        });
418    let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
419        get_transform_output_ids(
420            transform.inner.as_ref(),
421            key.clone(),
422            config.schema.log_namespace(),
423        )
424        .map(|output| ("transform", output))
425        .collect::<Vec<_>>()
426    });
427
428    let table_sinks = config
429        .enrichment_tables
430        .iter()
431        .filter_map(|(key, table)| table.as_sink(key))
432        .collect::<Vec<_>>();
433    for (input_type, id) in transform_ids.chain(source_ids) {
434        if !config
435            .transforms
436            .iter()
437            .any(|(_, transform)| transform.inputs.contains(&id))
438            && !config
439                .sinks
440                .iter()
441                .any(|(_, sink)| sink.inputs.contains(&id))
442            && !table_sinks
443                .iter()
444                .any(|(_, sink)| sink.inputs.contains(&id))
445        {
446            warnings.push(format!(
447                "{} \"{}\" has no consumers",
448                capitalize(input_type),
449                id
450            ));
451        }
452    }
453
454    warnings
455}
456
/// Uppercases the first character of `s` when it is a single-byte (ASCII)
/// character; multi-byte leading characters are left untouched, matching the
/// original byte-range-based behavior.
fn capitalize(s: &str) -> String {
    let mut chars = s.chars();
    match chars.next() {
        Some(first) if first.is_ascii() => first.to_ascii_uppercase().to_string() + chars.as_str(),
        _ => s.to_owned(),
    }
}