vector_core/schema/
requirement.rs

1use std::collections::{BTreeMap, BTreeSet};
2
3use lookup::OwnedTargetPath;
4use vrl::value::Kind;
5
6use crate::config::LogNamespace;
7
8use super::Definition;
9
10/// The input schema for a given component.
11///
12/// This schema defines the (semantic) fields a component expects to receive from its input
13/// components.
14#[derive(Debug, Clone, PartialEq)]
15pub struct Requirement {
16    /// Semantic meanings configured for this requirement.
17    meaning: BTreeMap<String, SemanticMeaning>,
18}
19
20/// The semantic meaning of an event.
21#[derive(Debug, Clone, PartialEq)]
22struct SemanticMeaning {
23    /// The type required by this semantic meaning.
24    kind: Kind,
25
26    /// Whether the meaning is optional.
27    ///
28    /// If a meaning is optional, the sink must not error when the meaning is not defined in the
29    /// provided `Definition`, but it *must* error if it is defined, but its type does not meet the
30    /// requirement.
31    optional: bool,
32}
33
34impl Requirement {
35    /// Create a new empty schema.
36    ///
37    /// An empty schema is the most "open" schema, in that there are no restrictions.
38    pub fn empty() -> Self {
39        Self {
40            meaning: BTreeMap::default(),
41        }
42    }
43
44    /// Check if the requirement is "empty", meaning:
45    ///
46    /// 1. There are no required fields defined.
47    /// 2. The unknown fields are set to "any".
48    /// 3. There are no required meanings defined.
49    pub fn is_empty(&self) -> bool {
50        self.meaning.is_empty()
51    }
52
53    /// Add a restriction to the schema.
54    #[must_use]
55    pub fn required_meaning(mut self, meaning: impl Into<String>, kind: Kind) -> Self {
56        self.insert_meaning(meaning, kind, false);
57        self
58    }
59
60    /// Add an optional restriction to the schema.
61    ///
62    /// This differs from `required_meaning` in that it is valid for the event to not have the
63    /// specified meaning defined, but invalid for that meaning to be defined, but its [`Kind`] not
64    /// matching the configured expectation.
65    #[must_use]
66    pub fn optional_meaning(mut self, meaning: impl Into<String>, kind: Kind) -> Self {
67        self.insert_meaning(meaning, kind, true);
68        self
69    }
70
71    fn insert_meaning(&mut self, identifier: impl Into<String>, kind: Kind, optional: bool) {
72        let meaning = SemanticMeaning { kind, optional };
73        self.meaning.insert(identifier.into(), meaning);
74    }
75
76    /// Validate the provided [`Definition`] against the current requirement.
77    /// If `validate_schema_type` is true, validation ensure the types match,
78    /// otherwise it only ensures the required fields exist.
79    ///
80    /// # Errors
81    ///
82    /// Returns a list of errors if validation fails.
83    pub fn validate(
84        &self,
85        definition: &Definition,
86        validate_schema_type: bool,
87    ) -> Result<(), ValidationErrors> {
88        let mut errors = vec![];
89
90        // We only validate definitions if there is at least one connected component
91        // that uses the Vector namespace.
92        if !definition.log_namespaces().contains(&LogNamespace::Vector) {
93            return Ok(());
94        }
95
96        for (identifier, req_meaning) in &self.meaning {
97            // Check if we're dealing with an invalid meaning, meaning the definition has a single
98            // meaning identifier pointing to multiple paths.
99            if let Some(paths) = definition.invalid_meaning(identifier).cloned() {
100                errors.push(ValidationError::MeaningDuplicate {
101                    identifier: identifier.clone(),
102                    paths,
103                });
104                continue;
105            }
106
107            let maybe_meaning_path = definition.meanings().find_map(|(def_id, path)| {
108                if def_id == identifier {
109                    Some(path)
110                } else {
111                    None
112                }
113            });
114
115            match maybe_meaning_path {
116                Some(target_path) if validate_schema_type => {
117                    // Get the kind at the path for the given semantic meaning.
118                    let definition_kind = definition.kind_at(target_path);
119
120                    if req_meaning.kind.is_superset(&definition_kind).is_err() {
121                        // The semantic meaning kind does not match the expected
122                        // kind, so we can't use it in the sink.
123                        errors.push(ValidationError::MeaningKind {
124                            identifier: identifier.clone(),
125                            want: req_meaning.kind.clone(),
126                            got: definition_kind,
127                        });
128                    }
129                }
130                None if !req_meaning.optional => {
131                    errors.push(ValidationError::MeaningMissing {
132                        identifier: identifier.clone(),
133                    });
134                }
135                _ => {}
136            }
137        }
138
139        if errors.is_empty() {
140            Ok(())
141        } else {
142            Err(ValidationErrors(errors))
143        }
144    }
145}
146
147#[derive(Debug, Clone, PartialEq, Eq)]
148pub struct ValidationErrors(Vec<ValidationError>);
149
150impl ValidationErrors {
151    pub fn is_meaning_missing(&self) -> bool {
152        self.0.iter().any(ValidationError::is_meaning_missing)
153    }
154
155    pub fn is_meaning_kind(&self) -> bool {
156        self.0.iter().any(ValidationError::is_meaning_kind)
157    }
158
159    pub fn errors(&self) -> &[ValidationError] {
160        &self.0
161    }
162}
163
164impl std::error::Error for ValidationErrors {
165    fn source(&self) -> Option<&(dyn snafu::Error + 'static)> {
166        Some(&self.0[0])
167    }
168}
169
170impl std::fmt::Display for ValidationErrors {
171    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172        for error in &self.0 {
173            error.fmt(f)?;
174        }
175
176        Ok(())
177    }
178}
179
180#[derive(Debug, Clone, PartialEq, Eq)]
181#[allow(clippy::enum_variant_names)]
182pub enum ValidationError {
183    /// A required semantic meaning is missing.
184    MeaningMissing { identifier: String },
185
186    /// A semantic meaning has an invalid `[Kind]`.
187    MeaningKind {
188        identifier: String,
189        want: Kind,
190        got: Kind,
191    },
192
193    /// A semantic meaning is pointing to multiple paths.
194    MeaningDuplicate {
195        identifier: String,
196        paths: BTreeSet<OwnedTargetPath>,
197    },
198}
199
200impl ValidationError {
201    pub fn is_meaning_missing(&self) -> bool {
202        matches!(self, Self::MeaningMissing { .. })
203    }
204
205    pub fn is_meaning_kind(&self) -> bool {
206        matches!(self, Self::MeaningKind { .. })
207    }
208
209    pub fn is_meaning_duplicate(&self) -> bool {
210        matches!(self, Self::MeaningDuplicate { .. })
211    }
212}
213
214impl std::fmt::Display for ValidationError {
215    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
216        match self {
217            Self::MeaningMissing { identifier } => {
218                write!(f, "missing semantic meaning: {identifier}")
219            }
220            Self::MeaningKind {
221                identifier,
222                want,
223                got,
224            } => write!(
225                f,
226                "invalid semantic meaning: {identifier} (expected {want}, got {got})"
227            ),
228            Self::MeaningDuplicate { identifier, paths } => write!(
229                f,
230                "semantic meaning {} pointing to multiple fields: {}",
231                identifier,
232                paths
233                    .iter()
234                    .map(ToString::to_string)
235                    .collect::<Vec<_>>()
236                    .join(", ")
237            ),
238        }
239    }
240}
241
// A `ValidationError` has no underlying cause, so the trait's default methods (including
// `source`, which returns `None`) are used as-is.
impl std::error::Error for ValidationError {}
243
#[cfg(test)]
mod tests {
    use lookup::lookup_v2::parse_target_path;
    use lookup::owned_value_path;

    use super::*;

    /// With type validation disabled, a kind mismatch on an existing meaning is not an error.
    #[test]
    fn test_doesnt_validate_types() {
        let requirement = Requirement::empty().required_meaning("foo", Kind::boolean());
        let definition = Definition::default_for_namespace(&[LogNamespace::Vector].into())
            .with_event_field(&owned_value_path!("foo"), Kind::integer(), Some("foo"));

        assert_eq!(Ok(()), requirement.validate(&definition, false));
    }

    /// Validation is skipped entirely unless the Vector namespace is among the definition's
    /// log namespaces.
    #[test]
    fn test_doesnt_validate_legacy_namespace() {
        let requirement = Requirement::empty().required_meaning("foo", Kind::boolean());

        // We get an error if we have a connected component with the Vector namespace.
        let definition =
            Definition::default_for_namespace(&[LogNamespace::Vector, LogNamespace::Legacy].into())
                .with_event_field(&owned_value_path!("foo"), Kind::integer(), Some("foo"));

        assert_ne!(Ok(()), requirement.validate(&definition, true));

        // We don't get an error if we have a connected component with just the Legacy namespace.
        let definition = Definition::default_for_namespace(&[LogNamespace::Legacy].into())
            .with_event_field(&owned_value_path!("foo"), Kind::integer(), Some("foo"));

        assert_eq!(Ok(()), requirement.validate(&definition, true));
    }

    /// Table-driven check of `Requirement::validate` with type validation enabled.
    #[test]
    #[allow(clippy::too_many_lines)]
    fn test_validate() {
        struct TestCase {
            requirement: Requirement,
            definition: Definition,
            errors: Vec<ValidationError>,
        }

        // The cases live in a plain array rather than a map: every case is guaranteed to run
        // (a duplicated title cannot silently replace another case) and they run in a
        // deterministic order.
        for (
            title,
            TestCase {
                requirement,
                definition,
                errors,
            },
        ) in [
            (
                "empty",
                TestCase {
                    requirement: Requirement::empty(),
                    definition: Definition::default_for_namespace(&[LogNamespace::Vector].into()),
                    errors: vec![],
                },
            ),
            (
                "missing required meaning",
                TestCase {
                    requirement: Requirement::empty().required_meaning("foo", Kind::any()),
                    definition: Definition::default_for_namespace(&[LogNamespace::Vector].into()),
                    errors: vec![ValidationError::MeaningMissing {
                        identifier: "foo".into(),
                    }],
                },
            ),
            (
                "missing required meanings",
                TestCase {
                    requirement: Requirement::empty()
                        .required_meaning("foo", Kind::any())
                        .required_meaning("bar", Kind::any()),
                    definition: Definition::default_for_namespace(&[LogNamespace::Vector].into()),
                    // Errors are reported in the sorted order of the meaning identifiers.
                    errors: vec![
                        ValidationError::MeaningMissing {
                            identifier: "bar".into(),
                        },
                        ValidationError::MeaningMissing {
                            identifier: "foo".into(),
                        },
                    ],
                },
            ),
            (
                "missing optional meaning",
                TestCase {
                    requirement: Requirement::empty().optional_meaning("foo", Kind::any()),
                    definition: Definition::default_for_namespace(&[LogNamespace::Vector].into()),
                    errors: vec![],
                },
            ),
            (
                "missing mixed meanings",
                TestCase {
                    requirement: Requirement::empty()
                        .optional_meaning("foo", Kind::any())
                        .required_meaning("bar", Kind::any()),
                    definition: Definition::default_for_namespace(&[LogNamespace::Vector].into()),
                    errors: vec![ValidationError::MeaningMissing {
                        identifier: "bar".into(),
                    }],
                },
            ),
            (
                "valid required meaning kind",
                TestCase {
                    requirement: Requirement::empty().required_meaning("foo", Kind::boolean()),
                    definition: Definition::default_for_namespace(&[LogNamespace::Vector].into())
                        .with_event_field(&owned_value_path!("foo"), Kind::boolean(), Some("foo")),
                    errors: vec![],
                },
            ),
            (
                "invalid required meaning kind",
                TestCase {
                    requirement: Requirement::empty().required_meaning("foo", Kind::boolean()),
                    definition: Definition::default_for_namespace(&[LogNamespace::Vector].into())
                        .with_event_field(&owned_value_path!("foo"), Kind::integer(), Some("foo")),
                    errors: vec![ValidationError::MeaningKind {
                        identifier: "foo".into(),
                        want: Kind::boolean(),
                        got: Kind::integer(),
                    }],
                },
            ),
            (
                "invalid optional meaning kind",
                TestCase {
                    requirement: Requirement::empty().optional_meaning("foo", Kind::boolean()),
                    definition: Definition::default_for_namespace(&[LogNamespace::Vector].into())
                        .with_event_field(&owned_value_path!("foo"), Kind::integer(), Some("foo")),
                    errors: vec![ValidationError::MeaningKind {
                        identifier: "foo".into(),
                        want: Kind::boolean(),
                        got: Kind::integer(),
                    }],
                },
            ),
            (
                "duplicate meaning pointers",
                TestCase {
                    requirement: Requirement::empty().optional_meaning("foo", Kind::boolean()),
                    definition: Definition::default_for_namespace(&[LogNamespace::Vector].into())
                        .with_event_field(&owned_value_path!("foo"), Kind::integer(), Some("foo"))
                        .merge(
                            Definition::default_for_namespace(&[LogNamespace::Vector].into())
                                .with_event_field(
                                    &owned_value_path!("bar"),
                                    Kind::boolean(),
                                    Some("foo"),
                                ),
                        ),
                    errors: vec![ValidationError::MeaningDuplicate {
                        identifier: "foo".into(),
                        paths: BTreeSet::from([
                            parse_target_path("foo").unwrap(),
                            parse_target_path("bar").unwrap(),
                        ]),
                    }],
                },
            ),
        ] {
            let got = requirement.validate(&definition, true);
            let want = if errors.is_empty() {
                Ok(())
            } else {
                Err(ValidationErrors(errors))
            };

            assert_eq!(got, want, "{title}");
        }
    }
}