vector/config/
validation.rs

1use std::{collections::HashMap, path::PathBuf};
2
3use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, stream};
4use heim::{disk::Partition, units::information::byte};
5use indexmap::IndexMap;
6use vector_lib::{buffers::config::DiskUsage, internal_event::DEFAULT_OUTPUT};
7
8use super::{
9    ComponentKey, Config, OutputId, Resource, builder::ConfigBuilder,
10    transform::get_transform_output_ids,
11};
12use crate::config::schema;
13
/// Minimum value (exclusive) for `utilization_ewma_alpha`.
/// The alpha value must be strictly greater than this value.
/// Used by [`check_buffer_utilization_ewma_alpha`] to reject out-of-range values.
const EWMA_ALPHA_MIN: f64 = 0.0;

/// Maximum value (exclusive) for `utilization_ewma_alpha`.
/// The alpha value must be strictly less than this value.
/// Used by [`check_buffer_utilization_ewma_alpha`] to reject out-of-range values.
const EWMA_ALPHA_MAX: f64 = 1.0;
21
22/// Check that provide + topology config aren't present in the same builder, which is an error.
23pub fn check_provider(config: &ConfigBuilder) -> Result<(), Vec<String>> {
24    if config.provider.is_some()
25        && (!config.sources.is_empty() || !config.transforms.is_empty() || !config.sinks.is_empty())
26    {
27        Err(vec![
28            "No sources/transforms/sinks are allowed if provider config is present.".to_owned(),
29        ])
30    } else {
31        Ok(())
32    }
33}
34
35pub fn check_names<'a, I: Iterator<Item = &'a ComponentKey>>(names: I) -> Result<(), Vec<String>> {
36    let errors: Vec<_> = names
37        .filter(|component_key| component_key.id().contains('.'))
38        .map(|component_key| {
39            format!(
40                "Component name \"{}\" should not contain a \".\"",
41                component_key.id()
42            )
43        })
44        .collect();
45
46    if errors.is_empty() {
47        Ok(())
48    } else {
49        Err(errors)
50    }
51}
52
53pub fn check_shape(config: &ConfigBuilder) -> Result<(), Vec<String>> {
54    let mut errors = vec![];
55
56    if !config.allow_empty {
57        if config.sources.is_empty() {
58            errors.push("No sources defined in the config.".to_owned());
59        }
60
61        if config.sinks.is_empty() {
62            errors.push("No sinks defined in the config.".to_owned());
63        }
64    }
65
66    // Helper for below
67    fn tagged<'a>(
68        tag: &'static str,
69        iter: impl Iterator<Item = &'a ComponentKey>,
70    ) -> impl Iterator<Item = (&'static str, &'a ComponentKey)> {
71        iter.map(move |x| (tag, x))
72    }
73
74    // Check for non-unique names across sources, sinks, and transforms
75    let mut used_keys = HashMap::<&ComponentKey, Vec<&'static str>>::new();
76    for (ctype, id) in tagged("source", config.sources.keys())
77        .chain(tagged("transform", config.transforms.keys()))
78        .chain(tagged("sink", config.sinks.keys()))
79    {
80        let uses = used_keys.entry(id).or_default();
81        uses.push(ctype);
82    }
83
84    for (id, uses) in used_keys.into_iter().filter(|(_id, uses)| uses.len() > 1) {
85        errors.push(format!(
86            "More than one component with name \"{}\" ({}).",
87            id,
88            uses.join(", ")
89        ));
90    }
91
92    // Warnings and errors
93    let sink_inputs = config
94        .sinks
95        .iter()
96        .map(|(key, sink)| ("sink", key.clone(), sink.inputs.clone()));
97    let transform_inputs = config
98        .transforms
99        .iter()
100        .map(|(key, transform)| ("transform", key.clone(), transform.inputs.clone()));
101    for (output_type, key, inputs) in sink_inputs.chain(transform_inputs) {
102        if inputs.is_empty() {
103            errors.push(format!(
104                "{} \"{}\" has no inputs",
105                capitalize(output_type),
106                key
107            ));
108        }
109
110        let mut frequencies = HashMap::new();
111        for input in inputs {
112            let entry = frequencies.entry(input).or_insert(0usize);
113            *entry += 1;
114        }
115
116        for (dup, count) in frequencies.into_iter().filter(|(_name, count)| *count > 1) {
117            errors.push(format!(
118                "{} \"{}\" has input \"{}\" duplicated {} times",
119                capitalize(output_type),
120                key,
121                dup,
122                count,
123            ));
124        }
125    }
126
127    if errors.is_empty() {
128        Ok(())
129    } else {
130        Err(errors)
131    }
132}
133
134pub fn check_resources(config: &ConfigBuilder) -> Result<(), Vec<String>> {
135    let source_resources = config
136        .sources
137        .iter()
138        .map(|(id, config)| (id, config.inner.resources()));
139    let sink_resources = config
140        .sinks
141        .iter()
142        .map(|(id, config)| (id, config.resources(id)));
143
144    let conflicting_components = Resource::conflicts(source_resources.chain(sink_resources));
145
146    if conflicting_components.is_empty() {
147        Ok(())
148    } else {
149        Err(conflicting_components
150            .into_iter()
151            .map(|(resource, components)| {
152                format!("Resource `{resource}` is claimed by multiple components: {components:?}")
153            })
154            .collect())
155    }
156}
157
158/// Validates that `buffer_utilization_ewma_alpha` value is within the valid range (0 < alpha < 1)
159/// for the global configuration.
160pub fn check_buffer_utilization_ewma_alpha(config: &ConfigBuilder) -> Result<(), Vec<String>> {
161    if let Some(alpha) = config.global.buffer_utilization_ewma_alpha
162        && (alpha <= EWMA_ALPHA_MIN || alpha >= EWMA_ALPHA_MAX)
163    {
164        Err(vec![format!(
165            "Global `buffer_utilization_ewma_alpha` must be between 0 and 1 exclusive (0 < alpha < 1), got {alpha}"
166        )])
167    } else {
168        Ok(())
169    }
170}
171
172/// To avoid collisions between `output` metric tags, check that a component
173/// does not have a named output with the name [`DEFAULT_OUTPUT`]
174pub fn check_outputs(config: &ConfigBuilder) -> Result<(), Vec<String>> {
175    let mut errors = Vec::new();
176    for (key, source) in config.sources.iter() {
177        let outputs = source.inner.outputs(config.schema.log_namespace());
178        if outputs
179            .iter()
180            .map(|output| output.port.as_deref().unwrap_or(""))
181            .any(|name| name == DEFAULT_OUTPUT)
182        {
183            errors.push(format!(
184                "Source {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`"
185            ));
186        }
187    }
188
189    for (key, transform) in config.transforms.iter() {
190        // use the most general definition possible, since the real value isn't known yet.
191        let definition = schema::Definition::any();
192
193        if let Err(errs) = transform.inner.validate(&definition) {
194            errors.extend(errs.into_iter().map(|msg| format!("Transform {key} {msg}")));
195        }
196
197        if get_transform_output_ids(
198            transform.inner.as_ref(),
199            key.clone(),
200            config.schema.log_namespace(),
201        )
202        .any(|output| matches!(output.port, Some(output) if output == DEFAULT_OUTPUT))
203        {
204            errors.push(format!(
205                "Transform {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`"
206            ));
207        }
208    }
209
210    if errors.is_empty() {
211        Ok(())
212    } else {
213        Err(errors)
214    }
215}
216
/// Validates that the disk buffers configured for each sink can fit within the total
/// capacity of the mountpoint their data directory resides on.
///
/// Best-effort: if the system's partitions cannot be queried, or a buffer's data
/// directory cannot be matched to a mountpoint, a warning is logged and the check
/// passes rather than failing configuration load.
///
/// Returns one error per over-committed mountpoint.
pub async fn check_buffer_preconditions(config: &Config) -> Result<(), Vec<String>> {
    // We need to assert that Vector's data directory is located on a mountpoint that has enough
    // capacity to allow all sinks with disk buffers configured to be able to use up to their
    // maximum configured size without overrunning the total capacity.
    //
    // More subtly, we need to make sure we properly map a given buffer's data directory to the
    // appropriate mountpoint, as it is technically possible that individual buffers could be on
    // separate mountpoints.
    //
    // Notably, this does *not* cover other data usage by Vector on the same mountpoint because we
    // don't always know the upper bound of that usage i.e. file checkpoint state.

    // Grab all configured disk buffers, and if none are present, simply return early.
    let global_data_dir = config.global.data_dir.clone();
    let configured_disk_buffers = config
        .sinks()
        .flat_map(|(id, sink)| {
            sink.buffer
                .stages()
                .iter()
                .filter_map(|stage| stage.disk_usage(global_data_dir.clone(), id))
        })
        .collect::<Vec<_>>();

    if configured_disk_buffers.is_empty() {
        return Ok(());
    }

    // Now query all the mountpoints on the system, and get their total capacity. We also have to
    // sort the mountpoints from longest to shortest so we can find the longest prefix match for
    // each buffer data directory by simply iterating from beginning to end.
    //
    // If the full partition query fails, fall back to physical partitions only.
    let mountpoints = heim::disk::partitions()
        .and_then(|stream| stream.try_collect::<Vec<_>>().and_then(process_partitions))
        .or_else(|_| {
            heim::disk::partitions_physical()
                .and_then(|stream| stream.try_collect::<Vec<_>>().and_then(process_partitions))
        })
        .await;

    let mountpoints = match mountpoints {
        Ok(mut mountpoints) => {
            // Sort keys in descending order: when one mountpoint path is a prefix of
            // another, the longer (more specific) one sorts first, so the first
            // `starts_with` match below is the most specific mountpoint.
            mountpoints.sort_by(|m1, _, m2, _| m2.cmp(m1));
            mountpoints
        }
        Err(e) => {
            // Cannot validate anything without partition info; warn and pass.
            warn!(
                cause = %e,
                message = "Failed to query disk partitions. Cannot ensure that buffer size limits are within physical storage capacity limits.",
            );
            return Ok(());
        }
    };

    // Now build a mapping of buffer IDs/usage configuration to the mountpoint they reside on.
    let mountpoint_buffer_mapping = configured_disk_buffers.into_iter().fold(
        HashMap::new(),
        |mut mappings: HashMap<PathBuf, Vec<DiskUsage>>, usage| {
            // Canonicalize so symlinked data directories resolve to their real
            // mountpoint; fall back to the raw path if canonicalization fails
            // (e.g. the directory doesn't exist yet).
            let canonicalized_data_dir = usage
                .data_dir()
                .canonicalize()
                .unwrap_or_else(|_| usage.data_dir().to_path_buf());
            let mountpoint = mountpoints
                .keys()
                .find(|mountpoint| canonicalized_data_dir.starts_with(mountpoint));

            match mountpoint {
                None => warn!(
                    buffer_id = usage.id().id(),
                    data_dir = usage.data_dir().to_string_lossy().as_ref(),
                    canonicalized_data_dir = canonicalized_data_dir.to_string_lossy().as_ref(),
                    message = "Found no matching mountpoint for buffer data directory.",
                ),
                Some(mountpoint) => {
                    mappings.entry(mountpoint.clone()).or_default().push(usage);
                }
            }

            mappings
        },
    );

    // Finally, we have a mapping of disk buffers, based on their underlying mountpoint. Go through
    // and check to make sure the sum total of `max_size` for all buffers associated with each
    // mountpoint does not exceed that mountpoint's total capacity.
    //
    // We specifically do not do any sort of warning on free space because that has to be the
    // responsibility of the operator to ensure there's enough total space for all buffers present.
    let mut errors = Vec::new();

    for (mountpoint, buffers) in mountpoint_buffer_mapping {
        let buffer_max_size_total: u64 = buffers.iter().map(|usage| usage.max_size()).sum();
        let mountpoint_total_capacity = mountpoints
            .get(&mountpoint)
            .copied()
            .expect("mountpoint must exist");

        if buffer_max_size_total > mountpoint_total_capacity {
            let component_ids = buffers
                .iter()
                .map(|usage| usage.id().id())
                .collect::<Vec<_>>();
            errors.push(format!(
                "Mountpoint '{}' has total capacity of {} bytes, but configured buffers using mountpoint have total maximum size of {} bytes. \
Reduce the `max_size` of the buffers to fit within the total capacity of the mountpoint. (components associated with mountpoint: {})",
                mountpoint.to_string_lossy(), mountpoint_total_capacity, buffer_max_size_total, component_ids.join(", "),
            ));
        }
    }

    if errors.is_empty() {
        Ok(())
    } else {
        Err(errors)
    }
}
332
/// Queries the total capacity, in bytes, of each of the given partitions.
///
/// Returns a map from mountpoint path to total capacity. The first partition
/// whose usage cannot be queried aborts the whole operation with that error.
async fn process_partitions(partitions: Vec<Partition>) -> heim::Result<IndexMap<PathBuf, u64>> {
    stream::iter(partitions)
        .map(Ok)
        .and_then(|partition| {
            // Pair each partition's mountpoint path with its total size in bytes.
            let mountpoint_path = partition.mount_point().to_path_buf();
            heim::disk::usage(mountpoint_path.clone())
                .map(|usage| usage.map(|usage| (mountpoint_path, usage.total().get::<byte>())))
        })
        .try_collect::<IndexMap<_, _>>()
        .await
}
344
345pub fn warnings(config: &Config) -> Vec<String> {
346    let mut warnings = vec![];
347
348    let table_sources = config
349        .enrichment_tables
350        .iter()
351        .filter_map(|(key, table)| table.as_source(key))
352        .collect::<Vec<_>>();
353    let source_ids = config
354        .sources
355        .iter()
356        .chain(table_sources.iter().map(|(k, s)| (k, s)))
357        .flat_map(|(key, source)| {
358            source
359                .inner
360                .outputs(config.schema.log_namespace())
361                .iter()
362                .map(|output| {
363                    if let Some(port) = &output.port {
364                        ("source", OutputId::from((key, port.clone())))
365                    } else {
366                        ("source", OutputId::from(key))
367                    }
368                })
369                .collect::<Vec<_>>()
370        });
371    let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
372        get_transform_output_ids(
373            transform.inner.as_ref(),
374            key.clone(),
375            config.schema.log_namespace(),
376        )
377        .map(|output| ("transform", output))
378        .collect::<Vec<_>>()
379    });
380
381    let table_sinks = config
382        .enrichment_tables
383        .iter()
384        .filter_map(|(key, table)| table.as_sink(key))
385        .collect::<Vec<_>>();
386    for (input_type, id) in transform_ids.chain(source_ids) {
387        if !config
388            .transforms
389            .iter()
390            .any(|(_, transform)| transform.inputs.contains(&id))
391            && !config
392                .sinks
393                .iter()
394                .any(|(_, sink)| sink.inputs.contains(&id))
395            && !table_sinks
396                .iter()
397                .any(|(_, sink)| sink.inputs.contains(&id))
398        {
399            warnings.push(format!(
400                "{} \"{}\" has no consumers",
401                capitalize(input_type),
402                id
403            ));
404        }
405    }
406
407    warnings
408}
409
/// Uppercases the first character of `s` when it is ASCII; non-ASCII leading
/// characters (and empty input) are returned unchanged.
fn capitalize(s: &str) -> String {
    let mut chars = s.chars();
    match chars.next() {
        None => String::new(),
        // `to_ascii_uppercase` is the identity for non-ASCII chars, matching the
        // previous in-place ASCII uppercasing behavior.
        Some(first) => first.to_ascii_uppercase().to_string() + chars.as_str(),
    }
}