vector/config/validation.rs

1use crate::config::schema;
2use futures_util::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
3use heim::{disk::Partition, units::information::byte};
4use indexmap::IndexMap;
5use std::{collections::HashMap, path::PathBuf};
6use vector_lib::{buffers::config::DiskUsage, internal_event::DEFAULT_OUTPUT};
7
8use super::{
9    builder::ConfigBuilder, transform::get_transform_output_ids, ComponentKey, Config, OutputId,
10    Resource,
11};
12
13/// Check that provide + topology config aren't present in the same builder, which is an error.
14pub fn check_provider(config: &ConfigBuilder) -> Result<(), Vec<String>> {
15    if config.provider.is_some()
16        && (!config.sources.is_empty() || !config.transforms.is_empty() || !config.sinks.is_empty())
17    {
18        Err(vec![
19            "No sources/transforms/sinks are allowed if provider config is present.".to_owned(),
20        ])
21    } else {
22        Ok(())
23    }
24}
25
26pub fn check_names<'a, I: Iterator<Item = &'a ComponentKey>>(names: I) -> Result<(), Vec<String>> {
27    let errors: Vec<_> = names
28        .filter(|component_key| component_key.id().contains('.'))
29        .map(|component_key| {
30            format!(
31                "Component name \"{}\" should not contain a \".\"",
32                component_key.id()
33            )
34        })
35        .collect();
36
37    if errors.is_empty() {
38        Ok(())
39    } else {
40        Err(errors)
41    }
42}
43
44pub fn check_shape(config: &ConfigBuilder) -> Result<(), Vec<String>> {
45    let mut errors = vec![];
46
47    if !config.allow_empty {
48        if config.sources.is_empty() {
49            errors.push("No sources defined in the config.".to_owned());
50        }
51
52        if config.sinks.is_empty() {
53            errors.push("No sinks defined in the config.".to_owned());
54        }
55    }
56
57    // Helper for below
58    fn tagged<'a>(
59        tag: &'static str,
60        iter: impl Iterator<Item = &'a ComponentKey>,
61    ) -> impl Iterator<Item = (&'static str, &'a ComponentKey)> {
62        iter.map(move |x| (tag, x))
63    }
64
65    // Check for non-unique names across sources, sinks, and transforms
66    let mut used_keys = HashMap::<&ComponentKey, Vec<&'static str>>::new();
67    for (ctype, id) in tagged("source", config.sources.keys())
68        .chain(tagged("transform", config.transforms.keys()))
69        .chain(tagged("sink", config.sinks.keys()))
70    {
71        let uses = used_keys.entry(id).or_default();
72        uses.push(ctype);
73    }
74
75    for (id, uses) in used_keys.into_iter().filter(|(_id, uses)| uses.len() > 1) {
76        errors.push(format!(
77            "More than one component with name \"{}\" ({}).",
78            id,
79            uses.join(", ")
80        ));
81    }
82
83    // Warnings and errors
84    let sink_inputs = config
85        .sinks
86        .iter()
87        .map(|(key, sink)| ("sink", key.clone(), sink.inputs.clone()));
88    let transform_inputs = config
89        .transforms
90        .iter()
91        .map(|(key, transform)| ("transform", key.clone(), transform.inputs.clone()));
92    for (output_type, key, inputs) in sink_inputs.chain(transform_inputs) {
93        if inputs.is_empty() {
94            errors.push(format!(
95                "{} \"{}\" has no inputs",
96                capitalize(output_type),
97                key
98            ));
99        }
100
101        let mut frequencies = HashMap::new();
102        for input in inputs {
103            let entry = frequencies.entry(input).or_insert(0usize);
104            *entry += 1;
105        }
106
107        for (dup, count) in frequencies.into_iter().filter(|(_name, count)| *count > 1) {
108            errors.push(format!(
109                "{} \"{}\" has input \"{}\" duplicated {} times",
110                capitalize(output_type),
111                key,
112                dup,
113                count,
114            ));
115        }
116    }
117
118    if errors.is_empty() {
119        Ok(())
120    } else {
121        Err(errors)
122    }
123}
124
125pub fn check_resources(config: &ConfigBuilder) -> Result<(), Vec<String>> {
126    let source_resources = config
127        .sources
128        .iter()
129        .map(|(id, config)| (id, config.inner.resources()));
130    let sink_resources = config
131        .sinks
132        .iter()
133        .map(|(id, config)| (id, config.resources(id)));
134
135    let conflicting_components = Resource::conflicts(source_resources.chain(sink_resources));
136
137    if conflicting_components.is_empty() {
138        Ok(())
139    } else {
140        Err(conflicting_components
141            .into_iter()
142            .map(|(resource, components)| {
143                format!("Resource `{resource}` is claimed by multiple components: {components:?}")
144            })
145            .collect())
146    }
147}
148
149/// To avoid collisions between `output` metric tags, check that a component
150/// does not have a named output with the name [`DEFAULT_OUTPUT`]
151pub fn check_outputs(config: &ConfigBuilder) -> Result<(), Vec<String>> {
152    let mut errors = Vec::new();
153    for (key, source) in config.sources.iter() {
154        let outputs = source.inner.outputs(config.schema.log_namespace());
155        if outputs
156            .iter()
157            .map(|output| output.port.as_deref().unwrap_or(""))
158            .any(|name| name == DEFAULT_OUTPUT)
159        {
160            errors.push(format!(
161                "Source {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`"
162            ));
163        }
164    }
165
166    for (key, transform) in config.transforms.iter() {
167        // use the most general definition possible, since the real value isn't known yet.
168        let definition = schema::Definition::any();
169
170        if let Err(errs) = transform.inner.validate(&definition) {
171            errors.extend(errs.into_iter().map(|msg| format!("Transform {key} {msg}")));
172        }
173
174        if get_transform_output_ids(
175            transform.inner.as_ref(),
176            key.clone(),
177            config.schema.log_namespace(),
178        )
179        .any(|output| matches!(output.port, Some(output) if output == DEFAULT_OUTPUT))
180        {
181            errors.push(format!(
182                "Transform {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`"
183            ));
184        }
185    }
186
187    if errors.is_empty() {
188        Ok(())
189    } else {
190        Err(errors)
191    }
192}
193
/// Checks that disk buffer sizes fit within the capacity of the mountpoints they live on.
///
/// For every sink with a disk buffer configured, the buffer's data directory is mapped to the
/// mountpoint containing it, and the sum of `max_size` across all buffers on a mountpoint is
/// compared against that mountpoint's total capacity.
///
/// Returns `Ok(())` when no disk buffers are configured, or when partition information cannot be
/// queried (a warning is logged instead of failing); otherwise returns one error message per
/// over-committed mountpoint.
pub async fn check_buffer_preconditions(config: &Config) -> Result<(), Vec<String>> {
    // We need to assert that Vector's data directory is located on a mountpoint that has enough
    // capacity to allow all sinks with disk buffers configured to be able to use up to their
    // maximum configured size without overrunning the total capacity.
    //
    // More subtly, we need to make sure we properly map a given buffer's data directory to the
    // appropriate mountpoint, as it is technically possible that individual buffers could be on
    // separate mountpoints.
    //
    // Notably, this does *not* cover other data usage by Vector on the same mountpoint because we
    // don't always know the upper bound of that usage i.e. file checkpoint state.

    // Grab all configured disk buffers, and if none are present, simply return early.
    let global_data_dir = config.global.data_dir.clone();
    let configured_disk_buffers = config
        .sinks()
        .flat_map(|(id, sink)| {
            sink.buffer
                .stages()
                .iter()
                .filter_map(|stage| stage.disk_usage(global_data_dir.clone(), id))
        })
        .collect::<Vec<_>>();

    if configured_disk_buffers.is_empty() {
        return Ok(());
    }

    // Now query all the mountpoints on the system, and get their total capacity. We also have to
    // sort the mountpoints from longest to shortest so we can find the longest prefix match for
    // each buffer data directory by simply iterating from beginning to end.
    let mountpoints = heim::disk::partitions()
        .and_then(|stream| stream.try_collect::<Vec<_>>().and_then(process_partitions))
        // Fall back to physical partitions only when the full partition listing fails.
        .or_else(|_| {
            heim::disk::partitions_physical()
                .and_then(|stream| stream.try_collect::<Vec<_>>().and_then(process_partitions))
        })
        .await;

    let mountpoints = match mountpoints {
        Ok(mut mountpoints) => {
            // Longest mountpoint path first, so the prefix search below finds the most
            // specific mountpoint for a data directory.
            mountpoints.sort_by(|m1, _, m2, _| m2.cmp(m1));
            mountpoints
        }
        Err(e) => {
            // Best-effort check: without partition data we cannot validate anything, so warn
            // rather than fail startup.
            warn!(
                cause = %e,
                message = "Failed to query disk partitions. Cannot ensure that buffer size limits are within physical storage capacity limits.",
            );
            return Ok(());
        }
    };

    // Now build a mapping of buffer IDs/usage configuration to the mountpoint they reside on.
    let mountpoint_buffer_mapping = configured_disk_buffers.into_iter().fold(
        HashMap::new(),
        |mut mappings: HashMap<PathBuf, Vec<DiskUsage>>, usage| {
            // Canonicalize so symlinked data directories match the real mountpoint; fall back
            // to the configured path if canonicalization fails (e.g. directory doesn't exist yet).
            let canonicalized_data_dir = usage
                .data_dir()
                .canonicalize()
                .unwrap_or_else(|_| usage.data_dir().to_path_buf());
            let mountpoint = mountpoints
                .keys()
                .find(|mountpoint| canonicalized_data_dir.starts_with(mountpoint));

            match mountpoint {
                None => warn!(
                    buffer_id = usage.id().id(),
                    data_dir = usage.data_dir().to_string_lossy().as_ref(),
                    canonicalized_data_dir = canonicalized_data_dir.to_string_lossy().as_ref(),
                    message = "Found no matching mountpoint for buffer data directory.",
                ),
                Some(mountpoint) => {
                    mappings.entry(mountpoint.clone()).or_default().push(usage);
                }
            }

            mappings
        },
    );

    // Finally, we have a mapping of disk buffers, based on their underlying mountpoint. Go through
    // and check to make sure the sum total of `max_size` for all buffers associated with each
    // mountpoint does not exceed that mountpoint's total capacity.
    //
    // We specifically do not do any sort of warning on free space because that has to be the
    // responsibility of the operator to ensure there's enough total space for all buffers present.
    let mut errors = Vec::new();

    for (mountpoint, buffers) in mountpoint_buffer_mapping {
        let buffer_max_size_total: u64 = buffers.iter().map(|usage| usage.max_size()).sum();
        let mountpoint_total_capacity = mountpoints
            .get(&mountpoint)
            .copied()
            .expect("mountpoint must exist");

        if buffer_max_size_total > mountpoint_total_capacity {
            let component_ids = buffers
                .iter()
                .map(|usage| usage.id().id())
                .collect::<Vec<_>>();
            errors.push(format!(
                "Mountpoint '{}' has total capacity of {} bytes, but configured buffers using mountpoint have total maximum size of {} bytes. \
Reduce the `max_size` of the buffers to fit within the total capacity of the mountpoint. (components associated with mountpoint: {})",
                mountpoint.to_string_lossy(), mountpoint_total_capacity, buffer_max_size_total, component_ids.join(", "),
            ));
        }
    }

    if errors.is_empty() {
        Ok(())
    } else {
        Err(errors)
    }
}
309
/// Resolves each partition's mountpoint to its total capacity in bytes.
///
/// Mountpoints are queried sequentially and collected into an `IndexMap`, which preserves the
/// order of the input partitions; the caller sorts the map as needed.
async fn process_partitions(partitions: Vec<Partition>) -> heim::Result<IndexMap<PathBuf, u64>> {
    stream::iter(partitions)
        .map(Ok)
        .and_then(|partition| {
            // Query usage per mountpoint; `usage.total()` is converted to raw bytes.
            let mountpoint_path = partition.mount_point().to_path_buf();
            heim::disk::usage(mountpoint_path.clone())
                .map(|usage| usage.map(|usage| (mountpoint_path, usage.total().get::<byte>())))
        })
        .try_collect::<IndexMap<_, _>>()
        .await
}
321
/// Returns a warning for every source or transform output that no transform, sink, or
/// enrichment-table sink consumes.
pub fn warnings(config: &Config) -> Vec<String> {
    let mut warnings = vec![];

    // Enrichment tables can act as sources; include their outputs alongside regular sources.
    let table_sources = config
        .enrichment_tables
        .iter()
        .filter_map(|(key, table)| table.as_source(key))
        .collect::<Vec<_>>();
    // All ("source", output id) pairs, one per output port of every source (including
    // enrichment-table-backed sources).
    let source_ids = config
        .sources
        .iter()
        .chain(table_sources.iter().map(|(k, s)| (k, s)))
        .flat_map(|(key, source)| {
            source
                .inner
                .outputs(config.schema.log_namespace())
                .iter()
                .map(|output| {
                    if let Some(port) = &output.port {
                        ("source", OutputId::from((key, port.clone())))
                    } else {
                        ("source", OutputId::from(key))
                    }
                })
                .collect::<Vec<_>>()
        });
    // All ("transform", output id) pairs, one per output of every transform.
    let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
        get_transform_output_ids(
            transform.inner.as_ref(),
            key.clone(),
            config.schema.log_namespace(),
        )
        .map(|output| ("transform", output))
        .collect::<Vec<_>>()
    });

    // Enrichment tables can also act as sinks, so count them as potential consumers too.
    let table_sinks = config
        .enrichment_tables
        .iter()
        .filter_map(|(key, table)| table.as_sink(key))
        .collect::<Vec<_>>();
    for (input_type, id) in transform_ids.chain(source_ids) {
        // An output with no consumer anywhere in the topology is dead weight — warn, don't error.
        if !config
            .transforms
            .iter()
            .any(|(_, transform)| transform.inputs.contains(&id))
            && !config
                .sinks
                .iter()
                .any(|(_, sink)| sink.inputs.contains(&id))
            && !table_sinks
                .iter()
                .any(|(_, sink)| sink.inputs.contains(&id))
        {
            warnings.push(format!(
                "{} \"{}\" has no consumers",
                capitalize(input_type),
                id
            ));
        }
    }

    warnings
}
386
/// Uppercases the first character of `s` when it is ASCII; non-ASCII leading characters (and the
/// rest of the string) are left unchanged.
fn capitalize(s: &str) -> String {
    let mut chars = s.chars();
    match chars.next() {
        None => String::new(),
        Some(first) => {
            // ASCII uppercasing never changes byte length, so the capacity is exact.
            let mut out = String::with_capacity(s.len());
            out.push(first.to_ascii_uppercase());
            out.push_str(chars.as_str());
            out
        }
    }
}