vector_core/schema/
requirement.rs

1use std::collections::{BTreeMap, BTreeSet};
2
3use lookup::OwnedTargetPath;
4use vrl::value::Kind;
5
6use super::Definition;
7use crate::config::LogNamespace;
8
9/// The input schema for a given component.
10///
11/// This schema defines the (semantic) fields a component expects to receive from its input
12/// components.
13#[derive(Debug, Clone, PartialEq)]
14pub struct Requirement {
15    /// Semantic meanings configured for this requirement.
16    meaning: BTreeMap<String, SemanticMeaning>,
17}
18
19/// The semantic meaning of an event.
20#[derive(Debug, Clone, PartialEq)]
21struct SemanticMeaning {
22    /// The type required by this semantic meaning.
23    kind: Kind,
24
25    /// Whether the meaning is optional.
26    ///
27    /// If a meaning is optional, the sink must not error when the meaning is not defined in the
28    /// provided `Definition`, but it *must* error if it is defined, but its type does not meet the
29    /// requirement.
30    optional: bool,
31}
32
33impl Requirement {
34    /// Create a new empty schema.
35    ///
36    /// An empty schema is the most "open" schema, in that there are no restrictions.
37    pub fn empty() -> Self {
38        Self {
39            meaning: BTreeMap::default(),
40        }
41    }
42
43    /// Check if the requirement is "empty", meaning:
44    ///
45    /// 1. There are no required fields defined.
46    /// 2. The unknown fields are set to "any".
47    /// 3. There are no required meanings defined.
48    pub fn is_empty(&self) -> bool {
49        self.meaning.is_empty()
50    }
51
52    /// Add a restriction to the schema.
53    #[must_use]
54    pub fn required_meaning(mut self, meaning: impl Into<String>, kind: Kind) -> Self {
55        self.insert_meaning(meaning, kind, false);
56        self
57    }
58
59    /// Add an optional restriction to the schema.
60    ///
61    /// This differs from `required_meaning` in that it is valid for the event to not have the
62    /// specified meaning defined, but invalid for that meaning to be defined, but its [`Kind`] not
63    /// matching the configured expectation.
64    #[must_use]
65    pub fn optional_meaning(mut self, meaning: impl Into<String>, kind: Kind) -> Self {
66        self.insert_meaning(meaning, kind, true);
67        self
68    }
69
70    fn insert_meaning(&mut self, identifier: impl Into<String>, kind: Kind, optional: bool) {
71        let meaning = SemanticMeaning { kind, optional };
72        self.meaning.insert(identifier.into(), meaning);
73    }
74
75    /// Validate the provided [`Definition`] against the current requirement.
76    /// If `validate_schema_type` is true, validation ensure the types match,
77    /// otherwise it only ensures the required fields exist.
78    ///
79    /// # Errors
80    ///
81    /// Returns a list of errors if validation fails.
82    pub fn validate(
83        &self,
84        definition: &Definition,
85        validate_schema_type: bool,
86    ) -> Result<(), ValidationErrors> {
87        let mut errors = vec![];
88
89        // We only validate definitions if there is at least one connected component
90        // that uses the Vector namespace.
91        if !definition.log_namespaces().contains(&LogNamespace::Vector) {
92            return Ok(());
93        }
94
95        for (identifier, req_meaning) in &self.meaning {
96            // Check if we're dealing with an invalid meaning, meaning the definition has a single
97            // meaning identifier pointing to multiple paths.
98            if let Some(paths) = definition.invalid_meaning(identifier).cloned() {
99                errors.push(ValidationError::MeaningDuplicate {
100                    identifier: identifier.clone(),
101                    paths,
102                });
103                continue;
104            }
105
106            let maybe_meaning_path = definition.meanings().find_map(|(def_id, path)| {
107                if def_id == identifier {
108                    Some(path)
109                } else {
110                    None
111                }
112            });
113
114            match maybe_meaning_path {
115                Some(target_path) if validate_schema_type => {
116                    // Get the kind at the path for the given semantic meaning.
117                    let definition_kind = definition.kind_at(target_path);
118
119                    if req_meaning.kind.is_superset(&definition_kind).is_err() {
120                        // The semantic meaning kind does not match the expected
121                        // kind, so we can't use it in the sink.
122                        errors.push(ValidationError::MeaningKind {
123                            identifier: identifier.clone(),
124                            want: req_meaning.kind.clone(),
125                            got: definition_kind,
126                        });
127                    }
128                }
129                None if !req_meaning.optional => {
130                    errors.push(ValidationError::MeaningMissing {
131                        identifier: identifier.clone(),
132                    });
133                }
134                _ => {}
135            }
136        }
137
138        if errors.is_empty() {
139            Ok(())
140        } else {
141            Err(ValidationErrors(errors))
142        }
143    }
144}
145
146#[derive(Debug, Clone, PartialEq, Eq)]
147pub struct ValidationErrors(Vec<ValidationError>);
148
149impl ValidationErrors {
150    pub fn is_meaning_missing(&self) -> bool {
151        self.0.iter().any(ValidationError::is_meaning_missing)
152    }
153
154    pub fn is_meaning_kind(&self) -> bool {
155        self.0.iter().any(ValidationError::is_meaning_kind)
156    }
157
158    pub fn errors(&self) -> &[ValidationError] {
159        &self.0
160    }
161}
162
163impl std::error::Error for ValidationErrors {
164    fn source(&self) -> Option<&(dyn snafu::Error + 'static)> {
165        Some(&self.0[0])
166    }
167}
168
169impl std::fmt::Display for ValidationErrors {
170    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171        for error in &self.0 {
172            error.fmt(f)?;
173        }
174
175        Ok(())
176    }
177}
178
179#[derive(Debug, Clone, PartialEq, Eq)]
180#[allow(clippy::enum_variant_names)]
181pub enum ValidationError {
182    /// A required semantic meaning is missing.
183    MeaningMissing { identifier: String },
184
185    /// A semantic meaning has an invalid `[Kind]`.
186    MeaningKind {
187        identifier: String,
188        want: Kind,
189        got: Kind,
190    },
191
192    /// A semantic meaning is pointing to multiple paths.
193    MeaningDuplicate {
194        identifier: String,
195        paths: BTreeSet<OwnedTargetPath>,
196    },
197}
198
199impl ValidationError {
200    pub fn is_meaning_missing(&self) -> bool {
201        matches!(self, Self::MeaningMissing { .. })
202    }
203
204    pub fn is_meaning_kind(&self) -> bool {
205        matches!(self, Self::MeaningKind { .. })
206    }
207
208    pub fn is_meaning_duplicate(&self) -> bool {
209        matches!(self, Self::MeaningDuplicate { .. })
210    }
211}
212
213impl std::fmt::Display for ValidationError {
214    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
215        match self {
216            Self::MeaningMissing { identifier } => {
217                write!(f, "missing semantic meaning: {identifier}")
218            }
219            Self::MeaningKind {
220                identifier,
221                want,
222                got,
223            } => write!(
224                f,
225                "invalid semantic meaning: {identifier} (expected {want}, got {got})"
226            ),
227            Self::MeaningDuplicate { identifier, paths } => write!(
228                f,
229                "semantic meaning {} pointing to multiple fields: {}",
230                identifier,
231                paths
232                    .iter()
233                    .map(ToString::to_string)
234                    .collect::<Vec<_>>()
235                    .join(", ")
236            ),
237        }
238    }
239}
240
241impl std::error::Error for ValidationError {}
242
#[cfg(test)]
mod tests {
    use lookup::{lookup_v2::parse_target_path, owned_value_path};

    use super::*;

    /// With type validation disabled, a present meaning is accepted even if its kind mismatches.
    #[test]
    fn test_doesnt_validate_types() {
        let requirement = Requirement::empty().required_meaning("foo", Kind::boolean());
        let definition = Definition::default_for_namespace(&[LogNamespace::Vector].into())
            .with_event_field(&owned_value_path!("foo"), Kind::integer(), Some("foo"));

        assert_eq!(Ok(()), requirement.validate(&definition, false));
    }

    /// Validation only happens when at least one connected component uses the Vector namespace.
    #[test]
    fn test_doesnt_validate_legacy_namespace() {
        let requirement = Requirement::empty().required_meaning("foo", Kind::boolean());

        // We get an error if we have a connected component with the Vector namespace.
        let definition =
            Definition::default_for_namespace(&[LogNamespace::Vector, LogNamespace::Legacy].into())
                .with_event_field(&owned_value_path!("foo"), Kind::integer(), Some("foo"));

        assert_ne!(Ok(()), requirement.validate(&definition, true));

        // We don't get an error if we have a connected component with just the Legacy namespace.
        let definition = Definition::default_for_namespace(&[LogNamespace::Legacy].into())
            .with_event_field(&owned_value_path!("foo"), Kind::integer(), Some("foo"));

        assert_eq!(Ok(()), requirement.validate(&definition, true));
    }

    /// Table-driven check of `validate` with type validation enabled.
    #[test]
    #[allow(clippy::too_many_lines)]
    fn test_validate() {
        struct TestCase {
            requirement: Requirement,
            definition: Definition,
            errors: Vec<ValidationError>,
        }

        /// The base definition shared by all cases: a default definition for the Vector
        /// namespace.
        fn vector_definition() -> Definition {
            Definition::default_for_namespace(&[LogNamespace::Vector].into())
        }

        // A plain array keeps the cases in declaration order; no map lookup is ever needed.
        for (
            title,
            TestCase {
                requirement,
                definition,
                errors,
            },
        ) in [
            (
                "empty",
                TestCase {
                    requirement: Requirement::empty(),
                    definition: vector_definition(),
                    errors: vec![],
                },
            ),
            (
                "missing required meaning",
                TestCase {
                    requirement: Requirement::empty().required_meaning("foo", Kind::any()),
                    definition: vector_definition(),
                    errors: vec![ValidationError::MeaningMissing {
                        identifier: "foo".into(),
                    }],
                },
            ),
            (
                "missing required meanings",
                TestCase {
                    requirement: Requirement::empty()
                        .required_meaning("foo", Kind::any())
                        .required_meaning("bar", Kind::any()),
                    definition: vector_definition(),
                    // Errors follow the ordering of the meaning identifiers.
                    errors: vec![
                        ValidationError::MeaningMissing {
                            identifier: "bar".into(),
                        },
                        ValidationError::MeaningMissing {
                            identifier: "foo".into(),
                        },
                    ],
                },
            ),
            (
                "missing optional meaning",
                TestCase {
                    requirement: Requirement::empty().optional_meaning("foo", Kind::any()),
                    definition: vector_definition(),
                    errors: vec![],
                },
            ),
            (
                "missing mixed meanings",
                TestCase {
                    requirement: Requirement::empty()
                        .optional_meaning("foo", Kind::any())
                        .required_meaning("bar", Kind::any()),
                    definition: vector_definition(),
                    errors: vec![ValidationError::MeaningMissing {
                        identifier: "bar".into(),
                    }],
                },
            ),
            (
                "invalid required meaning kind",
                TestCase {
                    requirement: Requirement::empty().required_meaning("foo", Kind::boolean()),
                    definition: vector_definition().with_event_field(
                        &owned_value_path!("foo"),
                        Kind::integer(),
                        Some("foo"),
                    ),
                    errors: vec![ValidationError::MeaningKind {
                        identifier: "foo".into(),
                        want: Kind::boolean(),
                        got: Kind::integer(),
                    }],
                },
            ),
            (
                "invalid optional meaning kind",
                TestCase {
                    requirement: Requirement::empty().optional_meaning("foo", Kind::boolean()),
                    definition: vector_definition().with_event_field(
                        &owned_value_path!("foo"),
                        Kind::integer(),
                        Some("foo"),
                    ),
                    errors: vec![ValidationError::MeaningKind {
                        identifier: "foo".into(),
                        want: Kind::boolean(),
                        got: Kind::integer(),
                    }],
                },
            ),
            (
                "duplicate meaning pointers",
                TestCase {
                    requirement: Requirement::empty().optional_meaning("foo", Kind::boolean()),
                    definition: vector_definition()
                        .with_event_field(&owned_value_path!("foo"), Kind::integer(), Some("foo"))
                        .merge(vector_definition().with_event_field(
                            &owned_value_path!("bar"),
                            Kind::boolean(),
                            Some("foo"),
                        )),
                    errors: vec![ValidationError::MeaningDuplicate {
                        identifier: "foo".into(),
                        paths: BTreeSet::from([
                            parse_target_path("foo").unwrap(),
                            parse_target_path("bar").unwrap(),
                        ]),
                    }],
                },
            ),
        ] {
            let got = requirement.validate(&definition, true);
            let want = if errors.is_empty() {
                Ok(())
            } else {
                Err(ValidationErrors(errors))
            };

            assert_eq!(got, want, "{title}");
        }
    }
}