// vector/config/validation.rs

use std::{collections::HashMap, path::PathBuf};

use futures_util::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, stream};
use heim::{disk::Partition, units::information::byte};
use indexmap::IndexMap;
use vector_lib::{buffers::config::DiskUsage, internal_event::DEFAULT_OUTPUT};

use super::{
    ComponentKey, Config, OutputId, Resource, builder::ConfigBuilder,
    transform::get_transform_output_ids,
};
use crate::config::schema;
14/// Check that provide + topology config aren't present in the same builder, which is an error.
15pub fn check_provider(config: &ConfigBuilder) -> Result<(), Vec<String>> {
16    if config.provider.is_some()
17        && (!config.sources.is_empty() || !config.transforms.is_empty() || !config.sinks.is_empty())
18    {
19        Err(vec![
20            "No sources/transforms/sinks are allowed if provider config is present.".to_owned(),
21        ])
22    } else {
23        Ok(())
24    }
25}
26
27pub fn check_names<'a, I: Iterator<Item = &'a ComponentKey>>(names: I) -> Result<(), Vec<String>> {
28    let errors: Vec<_> = names
29        .filter(|component_key| component_key.id().contains('.'))
30        .map(|component_key| {
31            format!(
32                "Component name \"{}\" should not contain a \".\"",
33                component_key.id()
34            )
35        })
36        .collect();
37
38    if errors.is_empty() {
39        Ok(())
40    } else {
41        Err(errors)
42    }
43}
44
45pub fn check_shape(config: &ConfigBuilder) -> Result<(), Vec<String>> {
46    let mut errors = vec![];
47
48    if !config.allow_empty {
49        if config.sources.is_empty() {
50            errors.push("No sources defined in the config.".to_owned());
51        }
52
53        if config.sinks.is_empty() {
54            errors.push("No sinks defined in the config.".to_owned());
55        }
56    }
57
58    // Helper for below
59    fn tagged<'a>(
60        tag: &'static str,
61        iter: impl Iterator<Item = &'a ComponentKey>,
62    ) -> impl Iterator<Item = (&'static str, &'a ComponentKey)> {
63        iter.map(move |x| (tag, x))
64    }
65
66    // Check for non-unique names across sources, sinks, and transforms
67    let mut used_keys = HashMap::<&ComponentKey, Vec<&'static str>>::new();
68    for (ctype, id) in tagged("source", config.sources.keys())
69        .chain(tagged("transform", config.transforms.keys()))
70        .chain(tagged("sink", config.sinks.keys()))
71    {
72        let uses = used_keys.entry(id).or_default();
73        uses.push(ctype);
74    }
75
76    for (id, uses) in used_keys.into_iter().filter(|(_id, uses)| uses.len() > 1) {
77        errors.push(format!(
78            "More than one component with name \"{}\" ({}).",
79            id,
80            uses.join(", ")
81        ));
82    }
83
84    // Warnings and errors
85    let sink_inputs = config
86        .sinks
87        .iter()
88        .map(|(key, sink)| ("sink", key.clone(), sink.inputs.clone()));
89    let transform_inputs = config
90        .transforms
91        .iter()
92        .map(|(key, transform)| ("transform", key.clone(), transform.inputs.clone()));
93    for (output_type, key, inputs) in sink_inputs.chain(transform_inputs) {
94        if inputs.is_empty() {
95            errors.push(format!(
96                "{} \"{}\" has no inputs",
97                capitalize(output_type),
98                key
99            ));
100        }
101
102        let mut frequencies = HashMap::new();
103        for input in inputs {
104            let entry = frequencies.entry(input).or_insert(0usize);
105            *entry += 1;
106        }
107
108        for (dup, count) in frequencies.into_iter().filter(|(_name, count)| *count > 1) {
109            errors.push(format!(
110                "{} \"{}\" has input \"{}\" duplicated {} times",
111                capitalize(output_type),
112                key,
113                dup,
114                count,
115            ));
116        }
117    }
118
119    if errors.is_empty() {
120        Ok(())
121    } else {
122        Err(errors)
123    }
124}
125
126pub fn check_resources(config: &ConfigBuilder) -> Result<(), Vec<String>> {
127    let source_resources = config
128        .sources
129        .iter()
130        .map(|(id, config)| (id, config.inner.resources()));
131    let sink_resources = config
132        .sinks
133        .iter()
134        .map(|(id, config)| (id, config.resources(id)));
135
136    let conflicting_components = Resource::conflicts(source_resources.chain(sink_resources));
137
138    if conflicting_components.is_empty() {
139        Ok(())
140    } else {
141        Err(conflicting_components
142            .into_iter()
143            .map(|(resource, components)| {
144                format!("Resource `{resource}` is claimed by multiple components: {components:?}")
145            })
146            .collect())
147    }
148}
149
150/// To avoid collisions between `output` metric tags, check that a component
151/// does not have a named output with the name [`DEFAULT_OUTPUT`]
152pub fn check_outputs(config: &ConfigBuilder) -> Result<(), Vec<String>> {
153    let mut errors = Vec::new();
154    for (key, source) in config.sources.iter() {
155        let outputs = source.inner.outputs(config.schema.log_namespace());
156        if outputs
157            .iter()
158            .map(|output| output.port.as_deref().unwrap_or(""))
159            .any(|name| name == DEFAULT_OUTPUT)
160        {
161            errors.push(format!(
162                "Source {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`"
163            ));
164        }
165    }
166
167    for (key, transform) in config.transforms.iter() {
168        // use the most general definition possible, since the real value isn't known yet.
169        let definition = schema::Definition::any();
170
171        if let Err(errs) = transform.inner.validate(&definition) {
172            errors.extend(errs.into_iter().map(|msg| format!("Transform {key} {msg}")));
173        }
174
175        if get_transform_output_ids(
176            transform.inner.as_ref(),
177            key.clone(),
178            config.schema.log_namespace(),
179        )
180        .any(|output| matches!(output.port, Some(output) if output == DEFAULT_OUTPUT))
181        {
182            errors.push(format!(
183                "Transform {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`"
184            ));
185        }
186    }
187
188    if errors.is_empty() {
189        Ok(())
190    } else {
191        Err(errors)
192    }
193}
194
/// Checks that, per mountpoint, the sum of the configured `max_size` of all
/// disk buffers residing on that mountpoint does not exceed the mountpoint's
/// total capacity.
///
/// Returns `Ok(())` when no disk buffers are configured, or when partition
/// information cannot be queried (a warning is logged in that case, since the
/// check is best-effort).
pub async fn check_buffer_preconditions(config: &Config) -> Result<(), Vec<String>> {
    // We need to assert that Vector's data directory is located on a mountpoint that has enough
    // capacity to allow all sinks with disk buffers configured to be able to use up to their
    // maximum configured size without overrunning the total capacity.
    //
    // More subtly, we need to make sure we properly map a given buffer's data directory to the
    // appropriate mountpoint, as it is technically possible that individual buffers could be on
    // separate mountpoints.
    //
    // Notably, this does *not* cover other data usage by Vector on the same mountpoint because we
    // don't always know the upper bound of that usage i.e. file checkpoint state.

    // Grab all configured disk buffers, and if none are present, simply return early.
    let global_data_dir = config.global.data_dir.clone();
    let configured_disk_buffers = config
        .sinks()
        .flat_map(|(id, sink)| {
            sink.buffer
                .stages()
                .iter()
                .filter_map(|stage| stage.disk_usage(global_data_dir.clone(), id))
        })
        .collect::<Vec<_>>();

    if configured_disk_buffers.is_empty() {
        return Ok(());
    }

    // Now query all the mountpoints on the system, and get their total capacity. We also have to
    // sort the mountpoints from longest to shortest so we can find the longest prefix match for
    // each buffer data directory by simply iterating from beginning to end.
    //
    // If listing all partitions fails, fall back to physical partitions only.
    let mountpoints = heim::disk::partitions()
        .and_then(|stream| stream.try_collect::<Vec<_>>().and_then(process_partitions))
        .or_else(|_| {
            heim::disk::partitions_physical()
                .and_then(|stream| stream.try_collect::<Vec<_>>().and_then(process_partitions))
        })
        .await;

    let mountpoints = match mountpoints {
        Ok(mut mountpoints) => {
            // Sort mountpoint paths in reverse order (longest/most-specific
            // first) so the first `starts_with` match below is the most
            // specific mountpoint containing the buffer's data directory.
            mountpoints.sort_by(|m1, _, m2, _| m2.cmp(m1));
            mountpoints
        }
        Err(e) => {
            // Best-effort: if partitions can't be queried, skip the check
            // rather than failing config validation outright.
            warn!(
                cause = %e,
                message = "Failed to query disk partitions. Cannot ensure that buffer size limits are within physical storage capacity limits.",
            );
            return Ok(());
        }
    };

    // Now build a mapping of buffer IDs/usage configuration to the mountpoint they reside on.
    let mountpoint_buffer_mapping = configured_disk_buffers.into_iter().fold(
        HashMap::new(),
        |mut mappings: HashMap<PathBuf, Vec<DiskUsage>>, usage| {
            // Canonicalize so the prefix match against mountpoint paths is
            // meaningful; fall back to the raw path if canonicalization fails
            // (e.g. presumably when the directory does not exist yet).
            let canonicalized_data_dir = usage
                .data_dir()
                .canonicalize()
                .unwrap_or_else(|_| usage.data_dir().to_path_buf());
            let mountpoint = mountpoints
                .keys()
                .find(|mountpoint| canonicalized_data_dir.starts_with(mountpoint));

            match mountpoint {
                // No mountpoint matched: warn but don't fail, since we can't
                // validate a buffer whose location we can't map.
                None => warn!(
                    buffer_id = usage.id().id(),
                    data_dir = usage.data_dir().to_string_lossy().as_ref(),
                    canonicalized_data_dir = canonicalized_data_dir.to_string_lossy().as_ref(),
                    message = "Found no matching mountpoint for buffer data directory.",
                ),
                Some(mountpoint) => {
                    mappings.entry(mountpoint.clone()).or_default().push(usage);
                }
            }

            mappings
        },
    );

    // Finally, we have a mapping of disk buffers, based on their underlying mountpoint. Go through
    // and check to make sure the sum total of `max_size` for all buffers associated with each
    // mountpoint does not exceed that mountpoint's total capacity.
    //
    // We specifically do not do any sort of warning on free space because that has to be the
    // responsibility of the operator to ensure there's enough total space for all buffers present.
    let mut errors = Vec::new();

    for (mountpoint, buffers) in mountpoint_buffer_mapping {
        let buffer_max_size_total: u64 = buffers.iter().map(|usage| usage.max_size()).sum();
        let mountpoint_total_capacity = mountpoints
            .get(&mountpoint)
            .copied()
            // Every key in the mapping was taken from `mountpoints` above.
            .expect("mountpoint must exist");

        if buffer_max_size_total > mountpoint_total_capacity {
            let component_ids = buffers
                .iter()
                .map(|usage| usage.id().id())
                .collect::<Vec<_>>();
            errors.push(format!(
                "Mountpoint '{}' has total capacity of {} bytes, but configured buffers using mountpoint have total maximum size of {} bytes. \
Reduce the `max_size` of the buffers to fit within the total capacity of the mountpoint. (components associated with mountpoint: {})",
                mountpoint.to_string_lossy(), mountpoint_total_capacity, buffer_max_size_total, component_ids.join(", "),
            ));
        }
    }

    if errors.is_empty() {
        Ok(())
    } else {
        Err(errors)
    }
}
310
311async fn process_partitions(partitions: Vec<Partition>) -> heim::Result<IndexMap<PathBuf, u64>> {
312    stream::iter(partitions)
313        .map(Ok)
314        .and_then(|partition| {
315            let mountpoint_path = partition.mount_point().to_path_buf();
316            heim::disk::usage(mountpoint_path.clone())
317                .map(|usage| usage.map(|usage| (mountpoint_path, usage.total().get::<byte>())))
318        })
319        .try_collect::<IndexMap<_, _>>()
320        .await
321}
322
323pub fn warnings(config: &Config) -> Vec<String> {
324    let mut warnings = vec![];
325
326    let table_sources = config
327        .enrichment_tables
328        .iter()
329        .filter_map(|(key, table)| table.as_source(key))
330        .collect::<Vec<_>>();
331    let source_ids = config
332        .sources
333        .iter()
334        .chain(table_sources.iter().map(|(k, s)| (k, s)))
335        .flat_map(|(key, source)| {
336            source
337                .inner
338                .outputs(config.schema.log_namespace())
339                .iter()
340                .map(|output| {
341                    if let Some(port) = &output.port {
342                        ("source", OutputId::from((key, port.clone())))
343                    } else {
344                        ("source", OutputId::from(key))
345                    }
346                })
347                .collect::<Vec<_>>()
348        });
349    let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
350        get_transform_output_ids(
351            transform.inner.as_ref(),
352            key.clone(),
353            config.schema.log_namespace(),
354        )
355        .map(|output| ("transform", output))
356        .collect::<Vec<_>>()
357    });
358
359    let table_sinks = config
360        .enrichment_tables
361        .iter()
362        .filter_map(|(key, table)| table.as_sink(key))
363        .collect::<Vec<_>>();
364    for (input_type, id) in transform_ids.chain(source_ids) {
365        if !config
366            .transforms
367            .iter()
368            .any(|(_, transform)| transform.inputs.contains(&id))
369            && !config
370                .sinks
371                .iter()
372                .any(|(_, sink)| sink.inputs.contains(&id))
373            && !table_sinks
374                .iter()
375                .any(|(_, sink)| sink.inputs.contains(&id))
376        {
377            warnings.push(format!(
378                "{} \"{}\" has no consumers",
379                capitalize(input_type),
380                id
381            ));
382        }
383    }
384
385    warnings
386}
387
/// Returns `s` with its first character upper-cased, ASCII-only: when the
/// first character is multi-byte (so `get(..1)` is not a char boundary) or
/// the string is empty, the string is returned unchanged.
fn capitalize(s: &str) -> String {
    match s.get(..1) {
        Some(first) => format!("{}{}", first.to_ascii_uppercase(), &s[1..]),
        None => s.to_owned(),
    }
}